介绍
HDFS支持由Data Node管理的写入到堆栈内存的功能。Data Node会异步的将数据从内存持久化至磁盘,从而在性能敏感的IO Path中移去昂贵的磁盘IO和校验,因此我们称之为Lazy Persist。HDFS尽可能的保证在Lazy Persist策略下的持久性。在副本还未持久化至磁盘,节点重启了,则有可能会发生罕见的数据遗失。我们可以选择Lazy Persist Writes的策略来减少延迟,但可能会损失一定的持久性。
上文描述的原理在图一的表示其实是4,6的步骤.写数据的RAM,然后异步的写到Disk.前面几个步骤是如何设置StorageType的操作,这个在下文种会具体提到.所以上图所示的大体步骤可以归纳为如下:
- 对目标文件目录设置StoragePolicy为LAZY_PERSIST的内存存储策略.
- 客户端进程向NameNode发起创建/写文件的请求.
- 请求到具体的DataNode,DataNode会把这些数据块写入RAM内存中,同时启动异步线程服务将内存数据持久化到磁盘上.
内存的异步持久化存储,就是明显不同于其他介质存储数据的地方.这应该也是LAZY_PERSIST的名称的源由吧,数据不是马上落盘,而是”lazy persisit”懒惰的方式,延时的处理.
源码
在之前的篇幅中已经提到过,数据存储的同时会有另外一批数据会被异步的持久化,所以这里一定会涉及到多个服务对象的合作.这些服务对象的指挥者是FsDatasetImpl.他是一个掌管DataNode所有磁盘读写数据的管家.
在FsDatasetImpl中,与内存存储相关的服务对象有如下的3个.
LazyWriter:lazyWriter是一个线程服务,此线程会不断的循环着从数据块列表中取出数据块,加入到异步持久化线程池RamDiskAsyncLazyPersistService中去执行.
RamDiskAsyncLazyPersistService:此对象就是异步持久化线程服务,里面针对每一个磁盘块设置一个对应的线程池,然后需要持久化到给定的磁盘块的数据块会被提交到对应的线程池中去.每个线程池的最大线程数为1.
RamDiskReplicaLruTracker:副本块跟踪类,此类种维护了所有已持久化,未持久化的副本以及总副本数据信息.所以当一个副本被最终存储到内存种后,相应的会有副本所属队列信息的变更.其次当节点内存不足的时候,部分距离最近最久没有被访问的副本块会在此类中被移除.
RamDiskReplicaLruTracker
在以上3者中,RamDiskReplicaLruTracker的角色起到了一个中间人的角色.因为他内部维护了多个关系的数据块信息.主要的就是以下3类.
public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker {
...
/**
* <block pool ID <block ID, RamDiskRelicaLru>
* Map of blockpool ID to <map of blockID to ReplicaInfo>.
*/
Map<String, Map<Long, RamDiskReplicaLru>> replicaMaps;
/**
* 将会被写入到磁盘的副本队列
* Queue of replicas that need to be written to disk.
* Stale entries are GC'd by dequeueNextReplicaToPersist.
*/
Queue<RamDiskReplicaLru> replicasNotPersisted;
/**
* 已经被持久化的副本,按照上次使用的时间排序
* Map of persisted replicas ordered by their last use times.
*/
TreeMultimap<Long, RamDiskReplicaLru> replicasPersisted;
...
}
三种数据块的关系如图三所示,replicaMaps是<block pool ID <block ID, RamDiskRelicaLru>的Map(关于Block pool,在上篇HDFS Federation中有介绍),Ram副本都会在replicaMaps中存在。replicasNotPersisted保存的是将会写入到磁盘的队列。replicasPersisted保存的是已经被持久化的副本,并且按照上次使用的时间排序。
列出几个RamDiskReplicaLruTracker类中关键的方法
- addReplica 新添副本
@Override
synchronized void addReplica(final String bpid, final long blockId,
final FsVolumeImpl transientVolume) {
Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
if (map == null) {
map = new HashMap<Long, RamDiskReplicaLru>();
replicaMaps.put(bpid, map);
}
RamDiskReplicaLru ramDiskReplicaLru = new RamDiskReplicaLru(bpid, blockId, transientVolume);
map.put(blockId, ramDiskReplicaLru);
//将ramDiskReplicaLru放入到replicasNotPersisted之中
replicasNotPersisted.add(ramDiskReplicaLru);
}
- dequeueNextReplicaToPersist 从replicasNotPersisted队列中拿出下一个将要被持久化的副本
@Override
//从replicasNotPersisted中拿到下个将会被持久化的block
synchronized RamDiskReplicaLru dequeueNextReplicaToPersist() {
while (replicasNotPersisted.size() != 0) {
RamDiskReplicaLru ramDiskReplicaLru = replicasNotPersisted.remove();
Map<Long, RamDiskReplicaLru> replicaMap =
replicaMaps.get(ramDiskReplicaLru.getBlockPoolId());
if (replicaMap != null && replicaMap.get(ramDiskReplicaLru.getBlockId()) != null) {
return ramDiskReplicaLru;
}
// The replica no longer exists, look for the next one.
}
return null;
}
- recordStartLazyPersist 记录开始Lazy Persist持久化操作
@Override
synchronized void recordStartLazyPersist(
final String bpid, final long blockId, FsVolumeImpl checkpointVolume) {
Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
RamDiskReplicaLru ramDiskReplicaLru = map.get(blockId);
//设置被持久化的Volume
ramDiskReplicaLru.setLazyPersistVolume(checkpointVolume);
}
- recordEndLazyPersist 记录结束Lazy Persist持久化操作
@Override
synchronized void recordEndLazyPersist(
final String bpid, final long blockId, final File[] savedFiles) {
Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
RamDiskReplicaLru ramDiskReplicaLru = map.get(blockId);
if (ramDiskReplicaLru == null) {
throw new IllegalStateException("Unknown replica bpid=" +
bpid + "; blockId=" + blockId);
}
ramDiskReplicaLru.recordSavedBlockFiles(savedFiles);
if (replicasNotPersisted.peek() == ramDiskReplicaLru) {
// Common case.
replicasNotPersisted.remove();
} else {
// Caller error? Fallback to O(n) removal.
replicasNotPersisted.remove(ramDiskReplicaLru);
}
ramDiskReplicaLru.lastUsedTime = Time.monotonicNow();
replicasPersisted.put(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru);
ramDiskReplicaLru.isPersisted = true;
}
- reenqueueReplicaNotPersisted 持久化失败,则将该副本重新加入replicasNotPersisted队列
@Override
synchronized void reenqueueReplicaNotPersisted(final RamDiskReplica ramDiskReplicaLru) {
replicasNotPersisted.add((RamDiskReplicaLru) ramDiskReplicaLru);
}
而这些方法大概的执行顺序如图四所示
当节点重启或者有新的文件被设置了LAZY_PERSIST策略后,就会有新的副本块被存储到内存中,同时会加入到replicaNotPersisted队列中.然后经过中间的dequeueNextReplicaToPersist取出下一个将被持久化的副本块,进行写磁盘的操作.recordStartLazyPersist,recordEndLazyPersist这2个方法会在持久化的过程中被调用,标志着持久化状态的变更.
还有三个方法虽然与持久化无关,但是也比较重要
discardReplica:当此副本已经被检测出不需要的时候,包括已被删除,或已损坏的情况,可以从内存中移除,撤销.
touch:恰好与Linux种的touch同名,此方法意味访问了一次某特定的副本块,并会更新此副本块的lastUesdTime. lastUesdTime会在后面提到的LRU算法中起到关键的作用.
//touch会更新最近访问的时间
synchronized void touch(final String bpid,
final long blockId) {
Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
RamDiskReplicaLru ramDiskReplicaLru = map.get(blockId);
...
// Reinsert the replica with its new timestamp.
// 更新最近访问时间戳,并重新插入数据
if (replicasPersisted.remove(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru)) {
ramDiskReplicaLru.lastUsedTime = Time.monotonicNow();
replicasPersisted.put(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru);
}
}
//第二步获取候选移除块
synchronized RamDiskReplicaLru getNextCandidateForEviction() {
// 获取replicasPersisted迭代器进行遍历
final Iterator<RamDiskReplicaLru> it = replicasPersisted.values().iterator();
while (it.hasNext()) {
// 因为replicasPersisted已经根据时间排好序了,所以取出当前的块进行移除即可
final RamDiskReplicaLru ramDiskReplicaLru = it.next();
it.remove();
Map<Long, RamDiskReplicaLru> replicaMap =
replicaMaps.get(ramDiskReplicaLru.getBlockPoolId());
if (replicaMap != null && replicaMap.get(ramDiskReplicaLru.getBlockId()) != null) {
return ramDiskReplicaLru;
}
// The replica no longer exists, look for the next one.
}
return null;
}
- getNextCandidateForEviction:此方法在DataNode内存空间不足,需要内存额外预留出空间给新的副本块存放时被调用.此方法会根据所设置的eviction scheme模式,选择需要被移除的块,默认的是LRU策略的.
/**
* Attempt to evict one or more transient block replicas until we
* have at least bytesNeeded bytes free.
*/
//根据已持久化的块的访问时间来进行筛选移除,而不是直接是内存中的块
public void evictBlocks(long bytesNeeded) throws IOException {
int iterations = 0;
final long cacheCapacity = cacheManager.getCacheCapacity();
// 当检测到内存空间不满足外界需要的大小时
while (iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION &&
(cacheCapacity - cacheManager.getCacheUsed()) < bytesNeeded) {
// 获取待移除副本信息
RamDiskReplica replicaState = ramDiskReplicaTracker.getNextCandidateForEviction();
if (replicaState == null) {
break;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Evicting block " + replicaState);
}
...
// 移除内存中的相关块并释放空间
// Delete the block+meta files from RAM disk and release locked
// memory.
removeOldReplica(replicaInfo, newReplicaInfo, blockFile, metaFile,
blockFileUsed, metaFileUsed, bpid);
}
}
}
LazyWriter
LazyWriter是一个线程服务,他是一个发动机,循环不断的从队列中取出待持久化的数据块,提交到异步持久化服务中去.直接来看主要的run方法.
public void run() {
int numSuccessiveFailures = 0;
while (fsRunning && shouldRun) {
try {
// 取出新的副本块并提交到异步服务中,返回是否提交成功布尔值
numSuccessiveFailures = saveNextReplica() ? 0 : (numSuccessiveFailures + 1);
// Sleep if we have no more work to do or if it looks like we are not
// making any forward progress. This is to ensure that if all persist
// operations are failing we don't keep retrying them in a tight loop.
if (numSuccessiveFailures >= ramDiskReplicaTracker.numReplicasNotPersisted()) {
Thread.sleep(checkpointerInterval * 1000);
numSuccessiveFailures = 0;
}
} catch (InterruptedException e) {
LOG.info("LazyWriter was interrupted, exiting");
break;
} catch (Exception e) {
LOG.warn("Ignoring exception in LazyWriter:", e);
}
}
}
进入saveNextReplica方法的处理
private boolean saveNextReplica() {
RamDiskReplica block = null;
FsVolumeReference targetReference;
FsVolumeImpl targetVolume;
ReplicaInfo replicaInfo;
boolean succeeded = false;
try {
// 从队列种取出新的待持久化的块
block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
if (block != null) {
synchronized (FsDatasetImpl.this) {
...
// 提交到异步服务中去
asyncLazyPersistService.submitLazyPersistTask(
block.getBlockPoolId(), block.getBlockId(),
replicaInfo.getGenerationStamp(), block.getCreationTime(),
replicaInfo.getMetaFile(), replicaInfo.getBlockFile(),
targetReference);
}
}
}
succeeded = true;
} catch(IOException ioe) {
LOG.warn("Exception saving replica " + block, ioe);
} finally {
if (!succeeded && block != null) {
LOG.warn("Failed to save replica " + block + ". re-enqueueing it.");
onFailLazyPersist(block.getBlockPoolId(), block.getBlockId());
}
}
return succeeded;
}
所以LazyWriter线程服务的流程图可以归纳为如图五所示:
RamDiskAsyncLazyPersistService
RamDiskAsyncLazyPersistService主要围绕着Volume磁盘和Executor线程池这2部分的内容.秉持着下面一个原则
一个磁盘服务对应一个线程池,并且一个线程池的最大线程数也只有1个.
线程池列表定义如下
class RamDiskAsyncLazyPersistService {
...
private Map<File, ThreadPoolExecutor> executors
= new HashMap<File, ThreadPoolExecutor>();
...
当服务启动的时候,就会有新的磁盘目录加入.
synchronized void addVolume(File volume) {
if (executors == null) {
throw new RuntimeException("AsyncLazyPersistService is already shutdown");
}
ThreadPoolExecutor executor = executors.get(volume);
// 如果当前已存在此磁盘目录对应的线程池,则跑异常
if (executor != null) {
throw new RuntimeException("Volume " + volume + " is already existed.");
}
// 否则进行添加
addExecutorForVolume(volume);
}
进入addExecutorForVolume方法
private void addExecutorForVolume(final File volume) {
...
// 新建线程池,最大线程执行数为
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME,
THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), threadFactory);
// This can reduce the number of running threads
executor.allowCoreThreadTimeOut(true);
// 加入到executors中,以为volume作为key
executors.put(volume, executor);
}
还有一个需要注意的是提交执行方法submitLazyPersistTask.
void submitLazyPersistTask(String bpId, long blockId,
long genStamp, long creationTime,
File metaFile, File blockFile,
FsVolumeReference target) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("LazyWriter schedule async task to persist RamDisk block pool id: "
+ bpId + " block id: " + blockId);
}
// 获取需要持久化到目标磁盘实例
FsVolumeImpl volume = (FsVolumeImpl)target.getVolume();
File lazyPersistDir = volume.getLazyPersistDir(bpId);
if (!lazyPersistDir.exists() && !lazyPersistDir.mkdirs()) {
FsDatasetImpl.LOG.warn("LazyWriter failed to create " + lazyPersistDir);
throw new IOException("LazyWriter fail to find or create lazy persist dir: "
+ lazyPersistDir.toString());
}
// 新建此服务Task
ReplicaLazyPersistTask lazyPersistTask = new ReplicaLazyPersistTask(
bpId, blockId, genStamp, creationTime, blockFile, metaFile,
target, lazyPersistDir);
// 提交到对应volume的线程池中执行
execute(volume.getCurrentDir(), lazyPersistTask);
}
如果在上述执行的过程中发生失败,会调用失败处理的方法,并会重新将此副本块插入到replicateNotPersisted队列等待下一次的持久化.
public void onFailLazyPersist(String bpId, long blockId) {
RamDiskReplica block = null;
block = ramDiskReplicaTracker.getReplica(bpId, blockId);
if (block != null) {
LOG.warn("Failed to save replica " + block + ". re-enqueueing it.");
// 重新插入队列操作
ramDiskReplicaTracker.reenqueueReplicaNotPersisted(block);
}
}
RamDiskAsyncLazyPersistService总的结构图图6所示。
配置
安装RAM磁盘
使用Unix mount命令安装RAM磁盘分区。 例如。 在/ mnt / dn-tmpfs /下安装32 GB tmpfs分区
sudo mount -t tmpfs -o size = 32g tmpfs / mnt / dn-tmpfs /
建议您在/ etc / fstab中创建一个条目,以便在节点重新启动时自动重新创建RAM磁盘。 另一个选项是使用/ dev / shm下的子目录,它是默认情况下在大多数Linux发行版上可用的tmpfs安装。 确保安装的大小大于或等于您的dfs.datanode.max.locked.memory设置,否则在/ etc / fstab中重写它。 不建议对每个数据节点使用多个tmpfs分区进行Lazy Persist写入。
使用RAM_DISK存储类型标记tmpfs卷
通过hdfs-site.xml中的dfs.datanode.data.dir配置设置将tmpfs目录标记为RAM_DISK存储类型。 例如。 在具有三个硬盘卷/ grid / 0,/ grid / 1和/ grid / 2和tmpfs mount / mnt / dn-tmpfs的数据节点上,必须按如下所示设置dfs.datanode.data.dir:
<property>
<name>dfs.datanode.data.dir</name>
<value>/grid/0,/grid/1,/grid/2,[RAM_DISK]/mnt/dn-tmpfs</value>
</property>
这一步是至关重要的。 如果没有RAM_DISK标记,HDFS会将tmpfs卷视为非易失性存储,并且数据不会保存到永久存储。 您将在节点重新启动时丢失数据。
启用存储策略
使用上面提到的LAZY_PERSIST,而不是使用默认的StoragePolicy.DEFAULT,默认策略的存储介质是DISK类型的.设置存储策略的方法目前有2种:
- 第一种,通过命令行的方式,调用如下命令
hdfs storagepolicies -setStoragePolicy -path <path> -policy LAZY_PERSIST
- 第二种,通过调用对应的程序方法,比如调用暴露到外部的create文件方法,但是得带上参数CreateFlag.LAZY_PERSIST.例子如下:
FSDataOutputStream fos =
fs.create(
path,
FsPermission.getFileDefault(),
EnumSet.of(CreateFlag.CREATE, CreateFlag.LAZY_PERSIST),
bufferLength,
replicationFactor,
blockSize,
null);
总结
Lazy Persist策略可以损失一些持久化的保证,以减少延迟,达到懒持久的目的。