Write path: put as an example
Client-side flow
(1) When a user issues a put, the HBase client first adds it to a local buffer and, once certain conditions are met, submits the buffered requests as an asynchronous batch through AsyncProcess. By default HBase sets autoflush=true, meaning every put is submitted to the server immediately. Setting autoflush=false makes puts accumulate in the local buffer, which is only submitted once it exceeds a size threshold (2MB by default, configurable). The latter is a group-commit mechanism and can improve write throughput dramatically, but it has no durability protection: if the client crashes, the buffered but unsubmitted requests are lost. A usage sketch follows the code below.
// First add the put to the local write buffer
@Override
public void put(final Put put) throws InterruptedIOException,
    RetriesExhaustedWithDetailsException {
  getBufferedMutator().mutate(put);
  if (autoFlush) {
    flushCommits();
  }
}
private void doMutate(Mutation m) throws InterruptedIOException,
    RetriesExhaustedWithDetailsException {
  if (closed) {
    throw new IllegalStateException("Cannot put when the BufferedMutator is closed.");
  }
  if (!(m instanceof Put) && !(m instanceof Delete)) {
    throw new IllegalArgumentException("Pass a Delete or a Put");
  }
  // This behavior is highly non-intuitive... it does not protect us against
  // 94-incompatible behavior, which is a timing issue because hasError, the below code
  // and setter of hasError are not synchronized. Perhaps it should be removed.
  if (ap.hasError()) {
    writeAsyncBuffer.add(m);
    backgroundFlushCommits(true);
  }
  if (m instanceof Put) {
    validatePut((Put) m);
  }
  currentWriteBufferSize += m.heapSize();
  writeAsyncBuffer.add(m);
  // Once the buffered size exceeds writeBufferSize, trigger background (async)
  // flushes until it drops back below the threshold
  while (currentWriteBufferSize > writeBufferSize) {
    backgroundFlushCommits(false);
  }
}
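As promised above, a minimal client-side sketch of buffered writes. The table name, column family, and the explicit 2MB buffer size are illustrative assumptions; ConnectionFactory, BufferedMutatorParams, and BufferedMutator are the standard HBase 1.x+ client API:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class BufferedPutExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf)) {
      BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("t1"))
          .writeBufferSize(2 * 1024 * 1024); // same 2MB threshold as the default
      try (BufferedMutator mutator = conn.getBufferedMutator(params)) {
        for (int i = 0; i < 10000; i++) {
          Put put = new Put(Bytes.toBytes("row-" + i));
          put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes(i));
          mutator.mutate(put); // buffered locally; flushed when the buffer fills
        }
      } // close() flushes any remaining buffered mutations
    }
  }
}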
(2) Before submitting, HBase looks up each rowkey in the metadata table hbase:meta to find the region server that owns it; this lookup goes through HConnection's locateRegion method. For a batch request, the rowkeys are additionally grouped by HRegionLocation, with each group corresponding to one RPC request.
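The same lookup can be reproduced from user code through the public RegionLocator API. An illustrative sketch (the table name and rowkey are assumptions):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.util.Bytes;

public class LocateRegionExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
         RegionLocator locator = conn.getRegionLocator(TableName.valueOf("t1"))) {
      // the same rowkey -> region server mapping that AsyncProcess resolves internally
      HRegionLocation loc = locator.getRegionLocation(Bytes.toBytes("row-42"));
      System.out.println("row-42 -> " + loc.getServerName()
          + " / " + loc.getRegionInfo().getRegionNameAsString());
    }
  }
}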
(3) HBase constructs one remote RPC request, MultiServerCallable<Row>, per HRegionLocation and executes it via rpcCallerFactory.<MultiResponse> newCaller(). Leaving retry-on-failure and error handling aside, the client-side submission ends here.
Server-side flow
First, a brief overview of the various locks inside HRegion:
(1) scannerReadPointsLock: an increment needs the smallest readPoint among all scanners to guarantee atomicity; this lock controls the concurrency between new RegionScannerImpl construction and getSmallestReadPoint.
(2) lock: ensures the region closes cleanly.
(3) updatesLock: taken when updating shared resources such as mvcc/memstore/WAL (see the sketch after this list).
(4) RowLockContext: the row lock; a put must lock the row it mutates.
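The interplay between updatesLock and flush deserves a sketch: mutations take the read side, so many writes proceed concurrently, while the flush prepare phase briefly takes the write side so it can snapshot the memstore with no mutation in flight. This is a conceptual sketch built on a plain ReentrantReadWriteLock, not HBase's actual code:

import java.util.concurrent.locks.ReentrantReadWriteLock;

class UpdatesLockSketch {
  private final ReentrantReadWriteLock updatesLock = new ReentrantReadWriteLock();

  void applyMutation(Runnable writeToWalAndMemstore) {
    updatesLock.readLock().lock();  // many mutations may hold this at once
    try {
      writeToWalAndMemstore.run();
    } finally {
      updatesLock.readLock().unlock();
    }
  }

  void prepareFlush(Runnable snapshotMemstore) {
    updatesLock.writeLock().lock(); // briefly blocks all mutations
    try {
      snapshotMemstore.run();
    } finally {
      updatesLock.writeLock().unlock();
    }
  }
}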
HRegion also contains many replay checks. A replay happens during region server failover and log splitting, when WAL entries are re-applied to a region server; since many of the steps below already completed before the original WAL write, they need not be repeated on replay.
The main work happens in HRegion.doMiniBatchMutation. Walking through the source code step by step:
(1) First, acquire the row lock
// A read lock is taken here: mvcc already guarantees consistency, so a plain put
// needs no write lock. increment/append/batch take the write lock because they
// must be atomic.
rowLock = getRowLock(mutation.getRow(), true);
(2) Update the cell timestamps: a cell with an explicit timestamp is left untouched; otherwise it is stamped with the current time
public static boolean updateLatestStamp(Cell cell, byte[] ts, int tsOffset) throws IOException {
  if (cell.getTimestamp() == HConstants.LATEST_TIMESTAMP) {
    setTimestamp(cell, ts, tsOffset);
    return true;
  }
  return false;
}
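From the client's perspective this means a Put without an explicit timestamp travels with HConstants.LATEST_TIMESTAMP and is stamped on the server. A small illustrative example (table, family, and qualifier names are assumptions):

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class TimestampExample {
  public static void main(String[] args) {
    Put explicitTs = new Put(Bytes.toBytes("r1"));
    // carries ts=1234567890L; updateLatestStamp leaves it untouched
    explicitTs.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), 1234567890L, Bytes.toBytes("v"));

    Put serverTs = new Put(Bytes.toBytes("r1"));
    // carries HConstants.LATEST_TIMESTAMP until the region server stamps it with "now"
    serverTs.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
  }
}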
(3) Write the mutation's edits into the WAL buffer
// The updatesLock read lock is taken
walEdit = new WALEdit(cellCount, isInReplay);
lock(this.updatesLock.readLock(), numReadyToWrite);
.......
// Append the edits to the in-memory WALEdit buffer
addFamilyMapToWALEdit(familyMaps[i], walEdit);
(4) Append the final edit to the WAL buffer, still without persisting it
// Build the WAL key; it carries the sequenceID and the mvcc WriteEntry
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
    this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
    mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc, this.getReplicationScope());
// Important: txid is a transaction ID used later to sync the WAL; the HDFS
// write model is covered in a separate chapter
txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
(5) Write the data into the memstore. Since the WAL has not been synced, the write is not yet complete, and mvcc keeps the data invisible to readers
applyFamilyMapToMemstore(familyMaps[i], memstoreSize);
(6) Release the updates lock and the row locks
if (locked) {
  this.updatesLock.readLock().unlock();
  locked = false;
}
releaseRowLocks(acquiredRowLocks);
(7) Sync the WAL
(8) Advance the mvcc version number
mvcc.completeMemstoreInsert(writeEntry);
(9) Finally, run the remaining coprocessor hooks
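The visibility rule behind steps (5) and (8) fits in a few lines. Below is a conceptual MVCC sketch, simplified to assume writes complete in order; it is not HBase's MultiVersionConcurrencyControl:

// A write is assigned a write number when it begins; readers only see cells
// whose write number is <= the read point, which advances only when the write
// completes (i.e. after the WAL sync in step (7)).
class MvccSketch {
  private long writePoint = 0; // last write number handed out
  private long readPoint = 0;  // highest write number visible to readers

  synchronized long begin() {
    return ++writePoint;       // step (5): memstore cells carry this number
  }

  synchronized void complete(long writeNumber) {
    readPoint = Math.max(readPoint, writeNumber); // step (8): data becomes visible
  }

  synchronized boolean isVisible(long cellWriteNumber) {
    return cellWriteNumber <= readPoint; // scanners filter on their read point
  }
}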
After the put completes, a flush request may be placed onto a task queue, depending on conditions; a dedicated thread executes these requests and ultimately calls HRegion.flush.
Flush:
When the memstore grows past a threshold, or a compaction requires it, the in-memory data is flushed to disk. mvcc is involved here as well: because some memstore data may not yet be on the WAL, a transaction is inserted to ensure all earlier transactions have completed before flushing, guaranteeing that everything flushed to disk has been fully written to the WAL.
// flush happens in two phases: a prepare phase and a commit phase
protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
    final Collection<Store> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker)
    throws IOException {
  PrepareFlushResult result
      = internalPrepareFlushCache(wal, myseqid, storesToFlush, status, writeFlushWalMarker);
  if (result.result == null) {
    return internalFlushCacheAndCommit(wal, status, result, storesToFlush);
  } else {
    return result.result; // early exit due to failure from prepare stage
  }
}
// First, the memstore structure: a memstore holds two segments, a mutableSegment that absorbs writes and an immutableSegment used for flushing. Each flush turns the mutable segment into an immutable one to be flushed, and creates a fresh mutableSegment for incoming writes
public abstract class AbstractMemStore implements MemStore {
  // active segment absorbs write operations
  protected volatile MutableSegment active;
  // Snapshot of memstore. Made for flusher.
  protected volatile ImmutableSegment snapshot;
  ......................
}
// Each column family has one HStore, and each HStore has one StoreFlushContext; the prepare phase calls StoreFlushContext.prepare()
@Override
public void prepare() {
  // passing the current sequence number of the wal - to allow bookkeeping in the memstore
  this.snapshot = memstore.snapshot();
  this.cacheFlushCount = snapshot.getCellsCount();
  this.cacheFlushSize = snapshot.getDataSize();
  committedFiles = new ArrayList<Path>(1);
}
// Taking a memstore snapshot simply turns the current active segment into an immutableSegment and returns it:
@Override
public MemStoreSnapshot snapshot() {
  // If snapshot currently has entries, then flusher failed or didn't call
  // cleanup. Log a warning.
  if (!this.snapshot.isEmpty()) {
    LOG.warn("Snapshot called again without clearing previous. " +
        "Doing nothing. Another ongoing flush or did we fail last attempt?");
  } else {
    this.snapshotId = EnvironmentEdgeManager.currentTime();
    if (!this.active.isEmpty()) {
      ImmutableSegment immutableSegment = SegmentFactory.instance().
          createImmutableSegment(this.active);
      this.snapshot = immutableSegment;
      resetActive();
    }
  }
  return new MemStoreSnapshot(this.snapshotId, this.snapshot);
}
// The prepare phase runs while holding updatesLock.writeLock(), so writes are blocked for its duration
(1) Prepare phase: iterate over every Memstore in the region, take a snapshot of the Memstore's current data set (kvset), and create a fresh kvset. All subsequent writes go into the new kvset; during the flush, reads first traverse both the kvset and the snapshot, falling back to HFiles if the data is not found there. The prepare phase holds the updatesLock write lock, blocking write requests, and releases it when done; since nothing in this phase is expensive, the lock is held only briefly.
// Loop over every HStore and flush it
for (StoreFlushContext flush : storeFlushCtxs.values()) {
  flush.flushCache(status);
}
// There are two concrete implementations, DefaultStoreFlusher and StripeStoreFlusher; DefaultStoreFlusher is shown here
@Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
    MonitoredTask status, ThroughputController throughputController) throws IOException {
  ArrayList<Path> result = new ArrayList<Path>();
  int cellsCount = snapshot.getCellsCount();
  if (cellsCount == 0) return result; // don't flush if there are no entries
  // Use a store scanner to find which rows to flush.
  // The smallest readPoint across all current scanners is used so that the
  // flushed file is guaranteed fully readable; the flush simply scans KVs out
  // of the snapshot and writes them into a temporary HFile
  long smallestReadPoint = store.getSmallestReadPoint();
  InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
  if (scanner == null) {
    return result; // NULL scanner returned from coprocessor hooks means skip normal processing
  }
  StoreFile.Writer writer;
  try {
    // TODO: We can fail in the below block before we complete adding this flush to
    // list of store files. Add cleanup of anything put on filesystem if we fail.
    synchronized (flushLock) {
      status.setStatus("Flushing " + store + ": creating writer");
      // Write the map out to the disk
      writer = store.createWriterInTmp(cellsCount, store.getFamily().getCompression(),
          /* isCompaction = */ false,
          /* includeMVCCReadpoint = */ true,
          /* includesTags = */ snapshot.isTagsPresent(),
          /* shouldDropBehind = */ false,
          snapshot.getTimeRangeTracker());
      IOException e = null;
      try {
        performFlush(scanner, writer, smallestReadPoint, throughputController);
      } catch (IOException ioe) {
        e = ioe;
        // throw the exception out
        throw ioe;
      } finally {
        if (e != null) {
          writer.close();
        } else {
          finalizeWriter(writer, cacheFlushId, status);
        }
      }
    }
  } finally {
    scanner.close();
  }
  LOG.info("Flushed, sequenceid=" + cacheFlushId + ", memsize="
      + StringUtils.humanReadableInt(snapshot.getDataSize()) +
      ", hasBloomFilter=" + writer.hasGeneralBloom() +
      ", into tmp file " + writer.getPath());
  result.add(writer.getPath());
  return result;
}
(2) Flush phase: iterate over all Memstores and persist the snapshots generated in the prepare phase as temporary files, collected under the .tmp directory. Because this involves disk IO, it is comparatively time-consuming.
/*
 * Change storeFiles adding into place the Reader produced by this new flush.
 * @param sfs Store files
 * @param snapshotId
 * @throws IOException
 * @return Whether compaction is required.
 */
private boolean updateStorefiles(final List<StoreFile> sfs, final long snapshotId)
    throws IOException {
  this.lock.writeLock().lock();
  try {
    this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
    if (snapshotId > 0) {
      this.memstore.clearSnapshot(snapshotId);
    }
  } finally {
    // We need the lock, as long as we are updating the storeFiles
    // or changing the memstore. Let us release it before calling
    // notifyChangeReadersObservers. See HBASE-4485 for a possible
    // deadlock scenario that could have happened if continue to hold
    // the lock.
    this.lock.writeLock().unlock();
  }
  // Tell listeners of the change in readers.
  notifyChangedReadersObservers();
  if (LOG.isTraceEnabled()) {
    long totalSize = 0;
    for (StoreFile sf : sfs) {
      totalSize += sf.getReader().length();
    }
    String traceMessage = "FLUSH time,count,size,store size,store files ["
        + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize
        + "," + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
    LOG.trace(traceMessage);
  }
  // After moving the files into place, check whether a compaction is needed
  return needsCompaction();
}
(3) Commit phase: iterate over all Memstores, move the temporary files produced in the flush phase into the corresponding ColumnFamily directories, create a storefile and Reader for each HFile, add the storefiles to HStore's storefiles list, and finally clear the snapshot created in the prepare phase.
Read path: scan as an example
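Before diving into the server side, a minimal client-side scan for orientation (table and column family names are assumptions); each iteration of the ResultScanner below is ultimately served by the RegionScanner code that follows:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Table table = conn.getTable(TableName.valueOf("t1"))) {
      Scan scan = new Scan();
      scan.addFamily(Bytes.toBytes("cf"));
      scan.setReversed(false); // a reversed scan would use ReversedRegionScannerImpl
      try (ResultScanner rs = table.getScanner(scan)) {
        for (Result r : rs) {
          System.out.println(Bytes.toString(r.getRow()));
        }
      }
    }
  }
}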
protected RegionScanner instantiateRegionScanner(Scan scan,
    List<KeyValueScanner> additionalScanners) throws IOException {
  // Reversed scans get their own scanner implementation
  if (scan.isReversed()) {
    if (scan.getFilter() != null) {
      scan.getFilter().setReversed(true);
    }
    // Reverse scan support; the discussion below assumes a forward scan
    return new ReversedRegionScannerImpl(scan, additionalScanners, this);
  }
  return new RegionScannerImpl(scan, additionalScanners, this);
}
// Inside the RegionScannerImpl constructor: a region has multiple stores, and each store gets a corresponding KeyValueScanner
try {
  for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
    Store store = stores.get(entry.getKey());
    KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
    instantiatedScanners.add(scanner);
    // The scanner is essential when: 1. there is no filter; 2. the scan does not
    // load column families on demand; or 3. the filter declares this cf essential
    if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
        || this.filter.isFamilyEssential(entry.getKey())) {
      scanners.add(scanner);
    } else {
      joinedScanners.add(scanner);
    }
  }
......
// Return the next row
@Override
public boolean nextRaw(List<Cell> outResults, ScannerContext scannerContext)
    throws IOException {
  if (storeHeap == null) {
    // scanner is closed
    throw new UnknownScannerException("Scanner was closed");
  }
  boolean moreValues = false;
  if (outResults.isEmpty()) {
    // Usually outResults is empty. This is true when next is called
    // to handle scan or get operation.
    moreValues = nextInternal(outResults, scannerContext);
  } else {
    List<Cell> tmpList = new ArrayList<Cell>();
    moreValues = nextInternal(tmpList, scannerContext);
    outResults.addAll(tmpList);
  }
  // If the size limit was reached it means a partial Result is being returned.
  // Returning a partial Result means that we should not reset the filters;
  // filters should only be reset between rows
  if (!scannerContext.midRowResultFormed())
    resetFilters();
  if (isFilterDoneInternal()) {
    moreValues = false;
  }
  return moreValues;
}