数据分类
整体分为 3 类:
内存数据
-
磁盘数据: 磁盘数据又分为
1. 快照 2. 事务日志
在zk启动过程中,3类数据之间的关系为:
内存数据
ZK 的数据模型:树
树中单个节点包含的内容:
节点数据
节点 ACL 信息
节点的路径
在Zookeeper中,数据存储分为两部分:内存数据存储和磁盘数据存储,我们主要分析服务器启动时内存数据库的初始化过程和主从服务器数据同步的过程,先介绍一下涉及的基本类
DataTree
Zookeeper的数据模型是一棵树,DataTree是内存数据存储的核心,代表了内存中一份完整的数据(最新),包括所有的节点路径,节点数据和ACL信息,对应watches等。类的主要属性为:
//节点路径为key,节点数据内容DataNode为value.实时存储了所有的zk节点,使用ConcurrentHashMap保证并发性
private final ConcurrentHashMap<String, DataNode> nodes =new ConcurrentHashMap<String, DataNode>();
//节点数据对应的watch
private final WatchManager dataWatches = new WatchManager();
//节点路径对应的watch
private final WatchManager childWatches = new WatchManager();
//key为sessionId,value为该会话对应的临时节点路径,方便实时访问和清理
private final Map<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<Long, HashSet<String>>();
//This set contains the paths of all container nodes
private final Set<String> containers =Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
//This set contains the paths of all ttl nodes
private final Set<String> ttls =Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
//内存数据库的最大zxid
public volatile long lastProcessedZxid = 0;
DataNode
数据存储的最小单元,包含节点的数据内容,节点状态,子节点列表,以及对子节点的操作接口等,主要属性为:
//节点内容
byte data[];
Long acl;
//节点状态,包括一些节点的元数据,如ephemeralOwner,czxid等
public StatPersisted stat;
//子节点相对父节点路径集合,不包括父节点路径
private Set<String> children = null;
抛出 2 个问题:
1. DataTree 中 nodes 是 Map,表示所有的 ZK 节点,那其内部 key 是什么?
Re:ZNode 的唯一标识 path 作为 key
2. ephemerals 是Map,用于存储临时节点,那其内部 key 是什么?value 又是什么?
Re:临时节点是跟 Session 绑定的,sessionId 作为 key
ZKDatabase
Zookeeper的内存数据库,负责管理Zookeeper的所有会话,DataTree存储和事务日志。它会定时向磁盘dump快照数据(snapCount主要控制),服务器启动时,会通过磁盘上的事务日志和快照数据文件恢复成完整的内存数据库。主要属性为:
protected DataTree dataTree;
//key为sessionId,value为会话过期时间
protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
//用于和磁盘交互事务日志文件和快照文件的类
protected FileTxnSnapLog snapLog;
//主从数据同步时使用
protected long minCommittedLog, maxCommittedLog;
public static final int commitLogCount = 500;
protected static int commitLogBuffer = 700;
protected LinkedList<Proposal> committedLog = new LinkedList<Proposal>();
zookeeper内存数据库的两种持久化方式:
FileTxnLog:事务日志文件,以日志追加的方式维护。
FileSnap:内存快照文件,保存内存数据库某一时刻的状态,所以数据不一定是最新的。
TxnLog:是一个读取日志的接口,提供了读取事务log的接口方法。
SnapShot:是一个操作日志快照的接口,提供了对快照文件操作的方法。
FileTxnLog:实现TxnLog接口,提供了读取事务日志的方法实现。
FileSnap:实现Snapshot接口,负责存储、序列化、反序列化、访问快照。
FileTxnSnapLog,封装了TxnLog和SnapShot。
Util,工具类,提供持久化所需的API。
我们可以看到,zookeeper总共提供了两种持久化文件,分别是内存快照SnapShot和事务日志TxnLog(这种日志有点类似MySQL数据库中的binlog,zookeeper会把所有涉及到修改内存数据结构的操作日志记录到该log中,也就是说,zookeeper会把每一个事务操作诸如添加、删除都会记录到这个日志文件中,当zookeeper出现异常时,可以借助该事务日志进行数据恢复)。
日志文件它主要是负责实时记录对服务端的每一次的事务操作日志(这里讲的事务和数据库中事务不一样,它是指涉及到对服务器端的内存数据库的增删改这种会变更内存数据库的操作行为,不记录查询)
通过这种日志文件,当zookeeper因为故障而发生重启时,我们就可以根据内存快照文件和事务日志使得内存数据库恢复最新的数据库状态。在zookeeper中,FileTxnLog就是负责日志文件持久化的逻辑对象,它是TxnLog的一个实现类。它会通过在分配内存时会预分配固定大的内存大小;同时保证每次写的时候都是直接追加顺序写入,从而保证日志文件的性能。
首先我们来看下TxnLog它给我们哪些方法:
void rollLog() throws IOException;// 滚动日志,从当前日志滚到下一个日志,不是回滚
boolean append(TxnHeader hdr, Record r) throws IOException;//追加一个请求至事务性日志
TxnIterator read(long zxid) throws IOException;// 可迭代读取事务性日志
long getLastLoggedZxid() throws IOException;//事务性操作的最新zxid
boolean truncate(long zxid) throws IOException;// 清空zxid以后的日志
long getDbId() throws IOException;// 获取数据库的id
void commit() throws IOException;// 提交事务并进行确认
void close() throws IOException;// 关闭事务性日志
FileTxnLog是TxnLog的一个实现类,所以它也就负责了实现该接口上的方法,我们来看下它是怎么实现的,对于一个日志文件,特别要关注的是它的读写操作的性能。
FileTxnLog
实现了TxnLog接口,提供了API可以获取日志和写入日志,
首先先看一下事务日志文件的格式
LogFile:
//一个日志文件由以下三个部分组成
* FileHeader TxnList ZeroPad
//1.文件头
* FileHeader: {
* magic 4bytes (ZKLG)
* version 4bytes
* dbid 8bytes
* }
//事务内容
* TxnList:
* Txn || Txn TxnList
* Txn:
//一条事务日志的组成部分
* checksum Txnlen TxnHeader Record 0x42
* checksum: 8bytes Adler32 is currently used
* calculated across payload -- Txnlen, TxnHeader, Record and 0x42
*
* Txnlen:
* len 4bytes
*
* TxnHeader: {
* sessionid 8bytes
* cxid 4bytes
* zxid 8bytes
* time 8bytes
* type 4bytes
* }
*
* Record:
* See Jute definition file for details on the various record types
*
* ZeroPad:
* 0 padded to EOF (filled during preallocation stage)
public class FileTxnLog implements TxnLog {
private static final Logger LOG;
//预分配64m大小
static long preAllocSize = 65536 * 1024;
//直接内存
private static final ByteBuffer fill = ByteBuffer.allocateDirect(1);
//魔数,用于校验日志文件的正确性,默认为1514884167
public final static int TXNLOG_MAGIC =ByteBuffer.wrap("ZKLG".getBytes()).getInt();
public final static int VERSION = 2;
//日志文件名好的前缀
public static final String LOG_FILE_PREFIX = "log";
/** Maximum time we allow for elapsed fsync before WARNing */
private final static long fsyncWarningThresholdMS;
static {
LOG = LoggerFactory.getLogger(FileTxnLog.class);
//获得系统参数,判断系统参数配置了预分配内存大小
String size = System.getProperty("zookeeper.preAllocSize");
if (size != null) {
try {
preAllocSize = Long.parseLong(size) * 1024;
} catch (NumberFormatException e) {
LOG.warn(size + " is not a valid value for preAllocSize");
}
}
/** Local variable to read fsync.warningthresholdms into */
Long fsyncWarningThreshold;
if ((fsyncWarningThreshold = Long.getLong("zookeeper.fsync.warningthresholdms")) == null)
fsyncWarningThreshold = Long.getLong("fsync.warningthresholdms", 1000);
fsyncWarningThresholdMS = fsyncWarningThreshold;
}
// 最大(也就是最新)的zxid
long lastZxidSeen;
volatile BufferedOutputStream logStream = null;
volatile OutputArchive oa;
volatile FileOutputStream fos = null;
//log目录文件
File logDir;
//是否强制同步,默认是yes
private final boolean forceSync = !System.getProperty("zookeeper.forceSync", "yes").equals("no");
long dbId;
private LinkedList<FileOutputStream> streamsToFlush =
new LinkedList<FileOutputStream>();
// 当前配置的大小
long currentSize;
// 写日志文件
File logFileWrite = null;
private volatile long syncElapsedMS = -1L;
}
FileTxnLog方法
append: 主要是负责日志追加,在对日志文件的写操作时,zookeeper主要是通过日志追加的方法
public synchronized boolean append(TxnHeader hdr, Record txn)
throws IOException
{
//校验头部不能为空,hdr主要包含了czxid、clientId、zxid等相关信息
if (hdr == null) {
return false;
}
//如果待写入的事务的事务id小于本地保存的最新的事务id,给提醒
if (hdr.getZxid() <= lastZxidSeen) {
LOG.warn("Current zxid " + hdr.getZxid()
+ " is <= " + lastZxidSeen + " for "
+ hdr.getType());
} else {
lastZxidSeen = hdr.getZxid();
}
//在第一次新建一个FileTxnLog时候,logStream还是空的,这个时候需要为它创建一个新的日志文件,并把logStream指向这个日志文件
if (logStream==null) {
if(LOG.isInfoEnabled()){
LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid()));
}
//根据待写入的事务id创建一个新的日志文件,我们可以看到文件名包含这个文件存放的事务的最小事务id
logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
fos = new FileOutputStream(logFileWrite);
logStream=new BufferedOutputStream(fos);
oa = BinaryOutputArchive.getArchive(logStream);
//根据魔数、版本号和数据库id生成日志文件头,dbId默认是0
FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
fhdr.serialize(oa, "fileheader");
// 确保在用0填充之前,先把魔数信息等写入到文件中,进行依次flush
logStream.flush();
currentSize = fos.getChannel().position();//获取当前文件流中的大小
streamsToFlush.add(fos);
}
//重新计算文件大小,保证文件的大小是预分配大小的整数倍
//可以让文件尽可能的占用连续的磁盘扇区,减少后续写入和读取文件时的磁盘寻道开销;
//迅速占用磁盘空间,防止使用过程中所需空间不足
currentSize = padFile(fos.getChannel());
//序列化TxnHeader Record记录到byte[]
byte[] buf = Util.marshallTxnEntry(hdr, txn);
if (buf == null || buf.length == 0) {
throw new IOException("Faulty serialization for header " +
"and txn");
}
//创建校验和的算法,默认是Adler32
//Adler-32可用于计算数据流的校验和 校验和,几乎与 CRC-32 一样可靠,但是能够更快地计算出来
Checksum crc = makeChecksumAlgorithm();
//使用指定的字节数组更新校验和
crc.update(buf, 0, buf.length);
//将更新的校验和值写入到日志文件中
oa.writeLong(crc.getValue(), "txnEntryCRC");
//将TxnHeader Record数据写入到输出流
Util.writeTxnBytes(oa, buf);
return true;
}
//1.先计算buf数据长度写入
//2.写入buf数组数据
// 3.记录尾部以’B’字符结尾,写入0x42
public static void writeTxnBytes(OutputArchive oa, byte[] bytes)
throws IOException {
oa.writeBuffer(bytes, "txnEntry");
oa.writeByte((byte) 0x42, "EOR"); // 'B'
}
我们再看看下它是怎么计算和填充为文件分配的大小
private long padFile(FileChannel fileChannel) throws IOException {
//计算新的文件大小,并通过填充0先占用未使用的byte空间,
//这样可以让文件尽可能的占用连续的磁盘扇区,减少后续写入和读取文件时的磁盘寻道开销
// currentSize默认是0
long newFileSize = calculateFileSizeWithPadding(fileChannel.position(), currentSize, preAllocSize);
if (currentSize != newFileSize) {//将整个日志文件中未使用的部分填充0
fileChannel.write((ByteBuffer) fill.position(0), newFileSize - fill.remaining());
currentSize = newFileSize;
}
return currentSize;
}
/***
*
* @param position 通过管道写入的字节长度
* @param fileSize 当前设置的文件大小
* @param preAllocSize 与分配大小
* @return
*/
public static long calculateFileSizeWithPadding(long position, long fileSize, long preAllocSize) {
// I如果剩余空间不足4k且预分配空间大于0
if (preAllocSize > 0 && position + 4096 >= fileSize) {
//如果已写入的长度超过了文件大小,文件大小扩为 写入的字节长度+预分配的大小
if (position > fileSize){//刚创建的时候肯定走这个,这样就可以保证fileSize始终是preAllocSize的整数倍
fileSize = position + preAllocSize;
//这边会重新调整到预分配块长度的整数倍(是否是为了方便管理统计等?)
fileSize -= fileSize % preAllocSize;
} else {
fileSize += preAllocSize;
}
}
return fileSize;
}
现在我们对FileTxnLog文件的写应该有一定的了解。也知道,在文件新建的时候会预分配文件内存大小,并用0来填充,从而保证文件的磁盘占用是连续的,同时通过日志追加的方式,我们可以保证对日志文件的写的顺序性,从而保证了写性能;我们也可以到,每次将事务写入到日志文件中时,都会先根据写入的事务计算并写入一个校验和,然后再把事务流写入到日志文件中,这样可以充分保证事务日志的安全性和完整性。
read:看完写文件操作,我们当然想知道读文件的操作。因为读写是一一对应的。文件的读取,zookeepeer给我们提供了两种重载的方法:
/***
* zxid:指定迭代读取日志文件中的第一个事务ID
* 默认fastForward=true
*/
TxnIterator read(long zxid)
/***
* zxid:指定迭代读取日志文件中的第一个事务ID
* fastForward:
* true:则返回的迭代器只包含大于等于zxid的事务
* fasle:则返回的迭代器除了包含大于等于zxid的事务,还包含了跟ZXID同一个日志文件的ZXID
*/
TxnIterator read(long zxid, boolean fastForward)
public TxnIterator read(long zxid, boolean fastForward) throws IOException {
return new FileTxnIterator(logDir, zxid, fastForward);
}
/***
* 读取事务日志, 这个方法在服务当机恢复的时候,用来遍历事务日志来恢复数据
* 根据目标事务zxid,从日志文件中读取大于该事务id的事务,并返回这些事务构成的迭代器TxnIterator
* 注意底层在遍历每一个日志文件的时候,会对文件进行魔数校验等,避免文件被损坏
* @param zxid 迭代器的开始位置
* @return
* @throws IOException
*/
public FileTxnIterator(File logDir, long zxid, boolean fastForward)
throws IOException {
this.logDir = logDir;//日志文件存放目录
this.zxid = zxid;//目标事务ID
init();
//在init()方法里,我们拿到目标文件的第一个事务ID,这个时候如果fastForward 是true的话,就要继续往下遍历,找出目标zxid的事务,才进行停止。
//这里要注意hdr是上一次遍历的事务头
if (fastForward && hdr != null) {
while (hdr.getZxid() < zxid) {
if (!next())
break;
}
}
}
void init() throws IOException {
storedFiles = new ArrayList<File>();
//排序目录下的日志文件,我们知道文件名称是根据事务id来创建的,所以,文件的排序也等价于事务的排序
List<File> files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), "log", false);
for (File f: files) {
//找出起始事务ID大于ZXID的日志文件
if (Util.getZxidFromName(f.getName(), "log") >= zxid) {
storedFiles.add(f);
}
//当第一次遍历到起始ID小于ZXID的日志文件后,要记得把该文件也作为查找目标文件,因为它里面可能包含大于ZXID的事务。
//同时停止遍历,因为后面继续遍历下去也没意思,都是小于ZXID的日志文件。
else if (Util.getZxidFromName(f.getName(), "log") < zxid) {
storedFiles.add(f);
break;
}
}
//找出已排好序且可能存在大于ZXID的日志文件后,打开第一个日志文件输入流准备读取
goToNextLog();
//注意这个时候调用next()只是获取第一个日志文件中的第一个事务ID,该事务ID并不一定是ZXID。
next();
}
//开始读取下一个事务
public boolean next() throws IOException {
if (ia == null) {
return false;
}
try {
//读取校验和的值
long crcValue = ia.readLong("crcvalue");
//读取事务
byte[] bytes = Util.readTxnBytes(ia);
//因为我们是采用预分配内存方式,会定义一个EOF作为空的事务。所以,当我们读取到一个空的,也就表明日志文件已读到末尾
if (bytes == null || bytes.length==0) {
throw new EOFException("Failed to read " + logFile);
}
//分析校验和的值是否正确,防止消息被破坏。这就是为什么我们在append的时候要加入校验和
Checksum crc = makeChecksumAlgorithm();
crc.update(bytes, 0, bytes.length);
if (crcValue != crc.getValue())
throw new IOException(CRC_ERROR);
//反序列事务
hdr = new TxnHeader();
record = SerializeUtils.deserializeTxn(bytes, hdr);
} catch (EOFException e) {
LOG.debug("EOF exception " + e);
inputStream.close();
inputStream = null;
ia = null;
hdr = null;
//日志文件已读到末尾了,所以要跳到下一个文件开始读取
if (!goToNextLog()) {
return false;
}
// if we went to the next log file, we should call next() again
return next();
} catch (IOException e) {
inputStream.close();
throw e;
}
return true;
}
FileTxnLog除了提供对事务日志的读写之外,还提供了其它的一些额外方法,下面我们继续看些这些方法
getLogFiles:获取可能包含比目标事务ID大的日志文件的数组(服务器启动并恢复内存数据库的时候会调用这个方法进行内存数据库恢复)
/***
*
* @param logDirList 日志文件列表
* @param snapshotZxid 通过内存快照恢复的最大的事务id,剩余的比snapshotZxid就要从日志文件里恢复
* @return 找出比包含有<=snapshotZxid的事务id的日志文件列表,当snapshotZxid=0时,获取所有的文件
*/
public static File[] getLogFiles(File[] logDirList,long snapshotZxid) {
//对日志文件进行排序,按照事务ID从高到低
List<File> files = Util.sortDataDir(logDirList, LOG_FILE_PREFIX, true);
long logZxid = 0;
// Find the log file that starts before or at the same time as the
// zxid of the snapshot
for (File f : files) {
long fzxid = Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX);
//如果文件名的事务id:fzxid>快照的最新的zxid
if (fzxid > snapshotZxid) {
continue;
}
//如果fzxid <= snapshotZxid && fzxid > logZxid
if (fzxid > logZxid) {
logZxid = fzxid;
}
}
List<File> v=new ArrayList<File>(5);
for (File f : files) {
long fzxid = Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX);
if (fzxid < logZxid) {//找出文件id大于logZxid的文件名
continue;
}
v.add(f);
}
return v.toArray(new File[0]);
}
getLastLoggedZxid获取最新的事务ID
//该方法只在节点服务器启动的时候被调用
public long getLastLoggedZxid() {//从日志文件中获取最大的Zxid
//找出所有的日志文件并排序(其实可以排序后拿第一个就好了啊?)
File[] files = getLogFiles(logDir.listFiles(), 0);
//排序日志文件,并从日志文件名称上获取包含最大zxid的日志文件的文件名中的日志id
long maxLog=files.length>0?
Util.getZxidFromName(files[files.length-1].getName(),LOG_FILE_PREFIX):-1;
// 在最新的日志文件里迭代查找最新的事务ID
long zxid = maxLog;
TxnIterator itr = null;
try {
FileTxnLog txn = new FileTxnLog(logDir);
//根据文件名的事务id遍历迭代该日志文件,获取整个内存数据库的最大事务id,
itr = txn.read(maxLog);
while (true) {
if(!itr.next())
break;
TxnHeader hdr = itr.getHeader();
zxid = hdr.getZxid();
}
} catch (IOException e) {
LOG.warn("Unexpected exception", e);
} finally {
close(itr);
}
return zxid;
}
commit方法,提交日志并刷至磁盘,force方法会把所有未写磁盘的数据都强制写入磁盘。 这是因为在操作系统中出于性能考虑回把数据放入缓冲区,所以不能保证数据在调用write写入文件通道后就及时写到磁盘上了,除非手动调用force方法。 force方法需要一个布尔参数,代表是否把meta data也一并强制写入。
/***
* 提交事务,确保日志刷新到磁盘中
* @throws IOException
*/
public synchronized void commit() throws IOException {
if (logStream != null) {//刷新
logStream.flush();
}
for (FileOutputStream log : streamsToFlush) {
log.flush();
if (forceSync) {//是否强制刷盘
long startSyncNS = System.nanoTime();
FileChannel channel = log.getChannel();
channel.force(false);
syncElapsedMS = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
if (syncElapsedMS > fsyncWarningThresholdMS) {
LOG.warn("fsync-ing the write ahead log in "
+ Thread.currentThread().getName()
+ " took " + syncElapsedMS
+ "ms which will adversely effect operation latency. "
+ "File size is " + channel.size() + " bytes. "
+ "See the ZooKeeper troubleshooting guide");
}
}
}
while (streamsToFlush.size() > 1) {
streamsToFlush.removeFirst().close();
}
}
truncate:截断删除比zxid大的事务
runcate清空大于给定的zxid事务日志,集群版learner向leader同步的时候,leader告诉learner需要回滚同步调用Learner#syncWithLeader
public boolean truncate(long zxid) throws IOException {
FileTxnIterator itr = null;
try {
//找出大于zxid的事务迭代器
itr = new FileTxnIterator(this.logDir, zxid);
PositionInputStream input = itr.inputStream;
if(input == null) {
throw new IOException("No log files found to truncate! This could " +
"happen if you still have snapshots from an old setup or " +
"log files were deleted accidentally or dataLogDir was changed in zoo.cfg.");
}
long pos = input.getPosition();
RandomAccessFile raf=new RandomAccessFile(itr.logFile,"rw");
raf.setLength(pos);//把当前log后面的部分(比zxid更大的)截断
raf.close();
while(itr.goToNextLog()) {
if (!itr.logFile.delete()) {//把后面的log文件都删除
LOG.warn("Unable to truncate {}", itr.logFile);
}
}
} finally {
close(itr);
}
return true;
}
FileTxnIterator
在这里我们发现在根据zxid进行read的时候会返回一个FileTxnIterator,所以有必要介绍这个FileTxnIterator
public static class FileTxnIterator implements TxnLog.TxnIterator {
File logDir;//日志文件存放目录
//开始读取的起始zxid
long zxid;//迭代器的开始zxid,也就是这个迭代器主要是用来存放比我们要查找的zxid大的事务
TxnHeader hdr;//事务头
Record record;
File logFile;//当前流指向的文件
InputArchive ia;
static final String CRC_ERROR="CRC check failed";
PositionInputStream inputStream=null;
//存放包含比我们需要查找的zxid大的事务id的日志文件列表
private ArrayList<File> storedFiles;
}
构造函数
public FileTxnIterator(File logDir, long zxid, boolean fastForward)
throws IOException {
this.logDir = logDir;
this.zxid = zxid;
//过滤出所有需要读的日志文件,并利用goToNextLog()方法打开第一个日志日志文件的输入流
init();
if (fastForward && hdr != null) {
while (hdr.getZxid() < zxid) {
if (!next())
break;
}
}
}
init方法中过滤出所有需要读的日志文件
void init() throws IOException {
//storedFiles按照事务id从大到小排序
storedFiles = new ArrayList<File>();
//排序日志文件
List<File> files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), LOG_FILE_PREFIX, false);
for (File f: files) {//迭代日志文件并找出可能存在事务id大于zxid的日志文件
if (Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX) >= zxid) {
storedFiles.add(f);
}
// 当执行到这步,说明后面的日志都比给定的zxid小,就没必要继续遍历,直接break
else if (Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX) < zxid) {
storedFiles.add(f);
break;
}
}
//获打开第一个日志日志文件的输入流,也就是zxid最小的
goToNextLog();
//next方法用来从日志文件中读取一条记录,校验并反序列化出来,
//读取成功返回true,如果读到了文件末尾则调goToNextLog()读下一个文件,以此递归直到最后
next();
}
goToNextLog
//打开第一个日志文件输入流
private boolean goToNextLog() throws IOException {
if (storedFiles.size() > 0) {
this.logFile = storedFiles.remove(storedFiles.size()-1);
ia = createInputArchive(this.logFile);
return true;
}
return false;
}
next
next方法用来从日志文件中读取一条记录,校验并反序列化出来,读取成功返回true,如果读到了文件末尾调goToNextLog()读下一个文件,以此递归直到最后
//读取下一个事务,并检查事务的万完整性,包括事务头信息
public boolean next() throws IOException {
if (ia == null) {
return false;
}
try {
long crcValue = ia.readLong("crcvalue");
byte[] bytes = Util.readTxnBytes(ia);
// Since we preallocate, we define EOF to be an
if (bytes == null || bytes.length==0) {
throw new EOFException("Failed to read " + logFile);
}
// 检查文件是否被破坏
Checksum crc = makeChecksumAlgorithm();
crc.update(bytes, 0, bytes.length);
if (crcValue != crc.getValue())
throw new IOException(CRC_ERROR);
hdr = new TxnHeader();
//反序列事务信息
record = SerializeUtils.deserializeTxn(bytes, hdr);
} catch (EOFException e) {
LOG.debug("EOF exception " + e);
inputStream.close();
inputStream = null;
ia = null;
hdr = null;
// 执行到这边意味着文件已经读到末尾了,就要把留指向下一个文件
if (!goToNextLog()) {
return false;
}
// if we went to the next log file, we should call next() again
return next();
} catch (IOException e) {
inputStream.close();
throw e;
}
return true;
}
事务日志小结:
事务日志频繁 flush 到磁盘,消耗大量磁盘 IO
磁盘空间预分配:事务日志剩余空间 < 4KB 时,将文件大小增加 64 MB
磁盘预分配的目标:减少磁盘 seek 次数
建议:事务日志,采用独立磁盘单独存放
事务序列化:本质是生成一个字节数组
包含:事务头、事务体的序列化
事务体:会话创建事务、节点创建事务、节点删除事务、节点数据更新事务
数据相关过程
ZK 服务器启动时,首先会进行数据初始化,将磁盘中数据,加载到内存中,恢复现场。
数据同步
ZK 集群服务器启动之后,会进行 2 个动作:
- 选举 Leader:分配角色
- Learner 向 Leader 服务器注册:数据同步
数据同步,本质:将没有在 Learner 上执行的事务,同步给 Learner。
集群启动后,什么时候能够对外提供服务?需要等所有 Learner 都完成数据同步吗?
过半策略:只需要半数 Learner 完成数据同步,Learder 向所有已经完成数据同步的 Learner 发送 UPTODATE 命令,表示集群具备了对外服务能力
FREQ
问题1、我们知道zookeeper每次生成的事务日志都带有当前文件的第一条事务的zxid,这有什么好处呢?
(1)它可以帮助我们快速的定位某一个事务操作所在的日志文件。
(2)我们知道,事务的zxid中高32位包含了epoch,这个是leader所属的周期,因此这样我们可以通过日志文件名就清楚的知道,当前运行时的zookeeper所属的leader周期。
问题2、在前面,我们知道,每次append写入事务的时,我们都会检测事务文件日志当前剩余的空间是否大于4kb,如果小于4kb,则会在现有的文件基础上加上64MB,然后使用0来填充?那么为什么要使用这种预分配的形式呢?
我们都知道,对于客户端每次的事务提交,都要将事务写入到事务日志中,所以事务日志写入的性能决定了zookeeper对客户端的请求的响应。也就是说,事务每次的请求可以看作是一次对底层磁盘的IO操作。严格的讲,文件的不断追加写入操作会触发底层磁盘IO为文件不断的开辟新的磁盘块,即磁盘seek。因此为了减少seek的频率,从而提高zookeeper的IO响应的时间,创建事务日志的时候都会进行文件的预分配--在文件处建之时,就会向操作系统预分配一块很大的磁盘块,默认是64mb,而一旦分配的磁盘块剩余的空间<4kb,则会再次分配,这样就可以避免随着每次事务的写入过程中导致日志文件的不断增长而需要不断的触发seek。事务预分配的大小,可以通过系统参数zookeeper.preAllocsize来设置。
问题3、事务日志文件是如何检查一个事务日志文件的完整性呢?
事务日志文件为了保证和检查其文件的完整性和数据的准确性。zookeeper在每次事务操作写入前,都会根据系列化的字节数组来计算checksum,这样当我们重新载入事务的时候,就可以检查这个事务文件的完整性了。zookeeper采用Adler32算法来计算checksum。
问题4、事务是什么时候刷盘的?
我们刚才讲过,事务每次刷盘都是一次IO操作,所以为了减少刷盘的次数,从而提高响应性能,zookeeper会将每次事务的请求写入都是先写到一个缓冲流中,而并非真正的刷盘到磁盘上去,那么在什么时候输盘到磁盘中呢?zookeeper服务器在启动的时候会单独启动一个requestProcessor线程来处理这个请求队列queuedRequests,如果队列里面有待处理的事务请求,则该线程将会取出队列事务并写入到事务日志文件中,这个时候的写入是先写入到一个缓冲流中,当requestProcessor统计写入缓冲流的事务超过1000或者队列已经没有事务了,则会开始将缓冲流中的数据刷到磁盘块中。至于刷盘的方式是可选择的,通过配置控制它是异步还是同步刷到磁盘中。
问题5、事务日志的截断方法什么请下会触发?
由于在zookeeper运行中,可能由于一些异常情况会导致learner的lastzxid比leader的还大,无论这种情况是怎么发生的,这都是一种不正常的现象。为了遵循一个集群中,只要存在leader,那么所有机器都必须与该leader的数据进行同步,所以leader会向learner触发truc方法,要求这个leaner对日志进行截断。