数据流
读取文件数据的剖析
为了知道客户端与HDFS,NameNode,DataNode交互过程中数据的流向,请看图3-2,这张图显示了读取文件过程中主要的事件顺序。客户端通过调用FileSystem对象的open()方法打开一个希望从中读取数据的文件,对于HDFS来说,FileSystem是一个DistributedFileSystem的实例对象(图3-2 步骤1)。DistributedFileSystem远程调用名称节点(NameNode)得到文件开头几个块的位置。对于每一个块,名称节点返回包含这个块复本的所有数据节点(DataNode)的地址。进一步,这些数据节点会根据集群的网络拓扑结构按照距离客户端的远近进行排序。如果客户端本身是一个数据节点(例如一个MapReduce任务),而这个数据节点包含要读取的块的复本,则客户端会直接从本地读取。
DistributedFileSystem返回一个FSDataInputStream对象给客户端,用于从文件中读取数据。FSDataInputStream是一个输入流,支持文件寻位(seek)。FSDataInputStream里包装了一个DFSInputStream类,这个类支持数据节点和名称节点的I/O操作。
客户端调用read()方法从流中读取数据。DFSInputStream存储了文件中开头几个块所在的数据节点的地址。首先连接第一个块所在的最近的数据节点,数据从数据节点被读取到客户端,然后不断地从这个流中读取(步骤4)直接这个块数据被读完,然后DFSInputStream将会关闭到这个数据节点的连接,寻找下一个块所在的最近的数据节点(步骤5)。这一系列操作对客户端来说是透明的,它不用管。从客户端的角度来看,它仅仅是在读取一个连续的数据流。
块按顺序依次被读取。当客户端从数据流中读数的时候,DFSInputStream依次建立和关闭和数据节点的连接。如果需要,DistributedFileSystem将再次调用名称节点得到下一批块所有数据节点的位置。当客户端完成了所有数据的读取,它会调用FSDataInputStream的close()方法关闭流(步骤6)。
在读取的过程中,如果DFSInputStream在与数据节点交互的过程中出现了错误,它将会尝试当前块所在的最近的下一个数据节点。它也会记住那些交互失败的数据节点以便读取其它块时不再在这些失败的数据节点中读取。DFSInputStream也会校验从数据节点传过来的数据,如果块中数据损坏了,它将尝试从另一个包含这个块复本的数据节点中读取。它也会向名称节点报告这个损坏的块。
这样设计一个重要的方面是客户端直接与数据节点交互,并通过名称节点的引导,找到每一个块所在的最好的数据节点。这样设计可以让HDFS响应大量同时并发请求的客户端。因为数据分布在集群中所有的数据节点中。而且,名称节点仅仅需要响应获取块所有位置的请求(这个位置信息存储在内存中,所以非常高效)而不需要响应获取文件数据的请求。如果名称节点还响应读取文件数据的请求,那么随着客户端数据增多,很快会出现瓶颈。
Hadoop网络拓扑结构
本地网络的两个节点对彼此"关闭"是什么意思呢?在大批量数据处理环境中,限制速度的因素是节点之前传输的速率,带宽几乎对速度没有一点贡献,所以可以用节点间的带宽做为衡量节点间距离的尺码。但在实践中并不直接去测试两个节点间的带宽,因为这很困难。Hadoop采取了一个简单的途径,网络以树的形式表示,两个节点的距离等于各自距离他们共同上层节点的距离之和。树中的层级并不是预先设定好的,通常层级中有数据中心,机架(Rack)和正在运行进程的节点。下面场景中带宽依次递减:图3-3更加形象显示了上面示例:
- 相同节点上的处理
- 同一机架不同节点上的处理
- 同一数据中心不同机架中节点上的处理
- 不同数据中心中节点上的处理
例如:节点n1,在机架r1上,机架在数据中心d1上。用/d1/r1/n1,以这为列,来看看下面四个场景中节点间距离:- distance(/d1/r1/n1,/d1/r1/n1)=0(相同节点上的处理)
- distance(/d1/r1/n1,/d1/r1/n2)=2(相同机架上不同节点)
- distance(/d1/r1/n1,/d1/r2/n3)=4(相同数据中心不同节点)
- distance(/d1/r1/n1,/d2/r3/n4)=6(不同数据中心节点)
最后,你要知道hadoop并不知道你的网络拓扑图,需要你进行配置。然而,默认的情况下,hadoop会假设所有节点在同一数据中心中一机架上。对于小型集群,确实是这种情况,这样的话,就不需要进行额外的配置。
写入数据到文件的剖析
下一步,我们将看看数据怎么写入到HDFS中的。虽然这是很细节的东西,但它有助于理解HDFS模型如何保证数据一致。
我们考虑这一种情况,在HDFS中创建一个新文件,写入数据,然后关闭文件。如图3-4所示:客户端通过调用DistributedFileSystem类的create()方法创建文件(图3-4步骤1)。DistributedFileSystem远程调用名称节点在文件系统的名称空间中创建一个新文件,没有块与这个新文件关联(步骤2)。名称节点做各种各样的检查确保文件之前没有被创建过,而且客户端有权限创建这个文件。如果检查通过,名称节点将会记录这个新文件,否则将创建失败,抛给客户端一个IOException异常。如果成功创建,则DistributedFileSystem返回一个FSDataOutputStream对象给客户端,以便客户端写入数据。正如读数据那样,FSDataOutputStream封闭了DFSOutputStream类,用此类来与数据节点与名称节点交互。
当客户端写数据的时候(步骤3),DFSOutputStream首先将数据拆分成多个包,写入"数据队列"中。然后,DataStreamer过来消费这个数据队列,它会向名称节点请求一些合适的新块用于存储复本数据。名称节点会返回包含这些新块的数据节点列表。这些数据节点形成了一个通道,这里,我们假设复制级别是3,所以在这个通道中有三个节点。DataStreamer首先向这个通道中第一个数据节点写入之前被拆分的包数据。第一个数据节点写完后,会前进到第二个数据节点,第二个数据节点存储包数据后继续前进到第三个也是最后一个数据节点(步骤4)。
DFSOutStream也会在内部维护一个"包队列"。只有当某一个包被所有节点存储后,这个包才会从包队列中删除(步骤5)。
如果在数据写入过程中,任何一个数据节点写入失败了,那么将么执行如下操作(这些操作对客户端来说是透明的)。首先,通道关闭,包队列中的所有包都将会放到数据队列前面。这样,失败数据节点的下游数据节点不会错过任何一个包。在好的数据节点上的当前块被给予一个新的身份标识,将它传送给名称节点,以便以后当失败的数据节点恢复后,它上面已经保存的部分块数据将会被删除。失败的数据节点从通道中移除,再基于剩下两个好的数据节点建立一个新通道。数据块中剩余的数据写到管道中剩下好的数据节点中。名称节点知道这个块还需要复制,所以它会把它复制到另外一个节点中.余下的块照常处理。
虽然不太可能,但在写入数据的时候仍有可能几个数据节点同时失败,只要dfs.namenode.replication.min复本数(默认是1)有值,就会写入成功。块将会在集群中异步复制直到达到设定的复本复制数(dfs.replication默认是3)。
当客户端写入数据完成后,将会调用close()方法关闭流(步骤6)。这个方法将会清除数据节点通道中剩下的包,并等待所有包数据写入完成,然后通知名称节点,整个文件已经写入完成(步骤7)。名称节点知道这个文件由哪些块组成(因为DataStreamer是向名称节点请求得到块的位置的),所以它仅需要等待块完成了最小复制就可以成功返回了。
复本存储
名称节点是怎么知道选择哪些数据节点存储复本呢?这是在综合权衡了可靠性,写入数据带宽和读取数据带宽之后得到的结果。例如:如果将所有复本放在一个节点上将会造成最小的写入带宽(因为复制通道运行在一个节点上),而且,这不是真正的冗余,因为如果这个节点损坏了,块数据就会丢失。但是读数据的带宽会很高。另一种极端的情况,将复本放在不同的数据中心,这样或许能最大化冗余度,但是却很消耗带宽。即使在相同的数据中心中,也会有很多种不同的存储策略。
一旦复本的存储位置确定了,就会建立一个通道,结合考虑hadoop的网络拓扑结构之后进行数据的写入。对于复本个数为3的情况,通道也许如图3-5所示:
Hadoop默认的策略是将第一个复本存放在客户机所在的节点中(对于运行在集群外的客户端来说,将会随机选择一个节点,系统尽量不会选择已经存储很满或工作太忙的节点)。第二个复本存储时将会选择与第一个节点不在同一个硬盘阵列的另外一个机架,随机选择一个节点存储。第三个复本将会放在与第二个节点相同的机架中,但是存储在随机选择的另外一个节点中。其它的复本将会存储在集群中随机选择的节点中,系统尽量避免将太量复本放到相同的机架中。
总之,这个策略在可靠性(块被存储在两个机架中),写入带宽(写数据时仅需要通过一个网络交换机),读取性能(可以选择两个机架中任意一个读取),块的分布性(客户端仅在本地机架中写入一个块)这些因素之间做了比较好的权衡。
一致性模型
文件系统的一致性模型描述了读取文件中的数据或向文件写入数据的可见性。HDFS为了性能牺牲了一些POSIX标准的要求,导致一些操作可能与你期望的不一样。
在创建一个文件后,正如所期望的那样,在文件系统名称空间中看见了这个文件。
Path p=new Path("p");
fs.create(p);
assertThat(fs.exists(p),is(true));
然而,任何写入到这个文件的数据不一定可见,即使输出流被flush刷新了。这个文件的长度仍为0。
Path p=new Path("p");
OutputStream out=fs.create(p);
out.write("content".getBytes("UTF-8"));
out.flush();
assertThat(fs.getFileStatus(p).getLen(),is(0L));
一旦超过一个hadoop块的数据写入了,第一个块将对读取器可见。对于后续的块也是如此。当前正在被写入数据的块总是对新来的读取器不可见。
HDFS通过FSDataOutputStream的hflush()方法可以强迫缓存中的数据flush进数据节点。在hflush()方法成功返回后,HDFS确保已经写入文件的数据都存进了写数据管道中的数据节点中,并且对新来的读取器可见。
Path p=new Path("p");
FSDataOutputStream out=fs.create(p);
out.write("content".getBytes("UTF-8"));
out.hflush();
assertThat(fs.getFileStatus(p).getLen(),is((long)"contents".length()));
注意hflush()不能确保数据节点已经将数据写入磁盘中,仅仅确保数据存储在数据节点的内存中(所以如果数据中心断电了,数据将会丢失)。如果需要确保数据能写入磁盘,请使用hsync()。
hsync()方法内部的操作与POSIX标准中fsync()标准命令相似,都会提交缓存中的数据到磁盘。例如,使用标准的JAVA API将数据写入本地文件,在flush数据流和同步数据到磁盘后,就可以确保能看见已经写入文件的内容。
FileOutputStream out=new FileOutputStream(localFile);
out.write("contents".getBytes("UTF-8"));
out.flush();//flush操作系统
out.getFD().sync();//同步进磁盘
assertThat(localFile.length(),is((long)"contents".length()));
关闭HDFS的文件流时内部也会执行hflush()方法。
Path p=new Path("p");
OutputStream out=fs.create(p);
out.write("contents".getBytes("UTF-8"));
out.close();
assertThat(fs.getFileStatus().getLen(),is((long)"contents".length()));
应用设计的重要性
一致性模型已经蕴涵了设计应用的方法。如果不调用hflush()和hsync(),当客户端或系统故障时,你将会丢失大量数据。对很多应用来说,这是不可接受的。所以你应该在合适的时机调用hflush(),例如在写入相当一部分数据记录或字节之后。虽然hflush()这个方法在设计时考虑到不对HDFS造成太大负担,但是它确实对性能有一些影响(hsync()有更多影响)。所以在数据健壮性与传输率之间要有一个权衡。一个可接受的平衡点是当以不同频率调用hflush(),并在考量应用性能前提下,那些依赖应用,合适的数据都能被读取到时。
使用distcp并发复制
到目前为止我们看到的HDFS获取数据的形式都是单线程的。例如,通过指定文件通配符的方法,我们可以同时操作大量文件。但要想有效地并发处理这些文件 ,你必须自己编程。Hadoop提供了一个有用的程序,叫做distcp,用于并发地将数据复制到hadoop或从Hadoop复制数据。
distcp其中的一个用途是替代hadoop fs -cp命令。例如,你可以复制一个文件到另一个文件中通过使用
% hadoop distcp file1 file2
你也可以复制目录:
% hadoop distcp dir1 dir2
如果目录dir2不存在,hadoop将会创建它。并且目录1中的内容将复制到目录dir2中。你可以指定多个源路径,所有这些源路径下的文件都将会复制到目的目录中。
如果目录dir2已经存在了,dir1目录将复制到它下一级,创建目录结构dir2/dir1。如果这不是你所想要的,你可以通过使用-overwrite选项,将数据以覆盖的形式复制到dir2目录下。你也可以只更新那些已经改变的文件,使用-update选项。我们通过一个示例说明。如果我们在目录dir1中修改了一个文件,我们将会使用如下命令将dir1目录的修改同步进dir2中。
% hadoop distcp -update dir1 dir2
distcp使用MapReduce作业方式实现,在集群中并发运行多个map来进行复制工作,没有reducer。每一个file使用一个map复制。Distcp粗略地将所有文件等分成几份,以便给每一个map分配近似相等的数据量。默认情况下,最多使用20个map。但是这个值可以通过在distcp中指定-m参数改变。
使用distcp一个非常常用的用途是在两个HDFS集群间传输数据。例如,下面命令在第二个集群中创建了第一个集群/foo目录下文件的备份。
% hadoop distcp -update -delete -p hdfs://namenode1/foo hdfs://namenod2/foo
-delete参数使用distcp删除目的目录下有而源目录没有的文件或目录。-p参数意思是文件的状态属性像权限,块大小和复本个数都保留。你可以不带任何参数运行distcp命令来查看参数的详细使用说明。
如果这两个集群运行不同版本的HDFS,那么你可以使用webhdfs协议在两个集群间复制。
% hadoop distcp webhdfs://namenode1:50070/foo webhdfs://namenode2:50070/foo
另一种变通的方法可以使用HTTPFS代理做为distcp的源或目的地(它也使用了webhdfs协议,可以设置防火墙和控制带宽,参看"HTTP章节")。
保持HDFS集群平衡
当将数据复制到HDFS中时,考虑集群的平衡性很重要。当文件块在集群中均匀连续存储时,HDFS能够表现地最好。所以你使用distcp时也要确保不打破这个规则。例如,如果你如果指定-m 1,将会有一个map进行复制工作,先不考虑这样做效率很低,没有充分有效地利用集群资源,这样做就意味着每一个块的第一个复本将位于运行map任务的节点上(直到磁盘满了)。第二个和第三个复本将会在集群其它节点上。但是这样就达不到平衡,如果使集群中map任务数比节点数多,就可以避免这个问题。所以,当运行distcp命令时,最好使用默认的每一个节点20个map任务。
然而,不可能一直保持集群平衡。也许你想要限制map任务的个数,以便节点上资源能够被其它作业使用。这种情况下,你可以使用平衡工具(可参看"平衡器章节")使集群中的块分布地更加均衡。
本文是笔者翻译自《OReilly.Hadoop.The.Definitive.Guide.4th.Edition》第一部分第3章,后续将继续翻译其它章节。虽尽力翻译,但奈何水平有限,错误再所难免,如果有问题,请不吝指出!希望本文对你有所帮助。