关于名字的来历
Pinot团队的Team Lead Kishore解释说:
1,团队的人都喜欢黑皮诺。
2,黑皮诺是红葡萄酒中最挑剔和难照料的品种,但是你可以用它产生最为复杂的葡萄酒,就像数据一样,收集和分析都很困难,但是当你使得数据能够正确的处理和工作,就能够产生巨大的用途。
Pinot的Github地址
https://github.com/linkedin/pinot
Pinot特性
1. 一个面向列式存储的数据库,支持RunLength Encoding和Fixed Bit Length Encoding的压缩
2. 可插入的索引技术 – SortedIndex, Bitmap Index, Inverted Index
3. 可以根据Query和Segment元数据进行查询和执行计划的优化
4. 从kafka的准实时数据灌入和从hadoop的批量数据灌入
5. 类似于SQL的查询语言,支持在事实表上的selection, aggregation, filtering, group by, order by, distinct
6. 支持多值字段
7. 水平扩展和容错
需要注意的是Pinot适合的是在数据不变的append only的大规模数据集上进行低延时的分析。
Pinot的几个重要的概念:
Table– 在Pinot中,table是一组有关系的数据的逻辑抽象,它包含行和列。Table的schema定义列名和它的的元数据描述,这个和数据库中的table概念是一样的。
Segment– 一个逻辑表(Table)可以分为多个物理单元,叫做Segment,实际上是数据的真实存储。
Pinot包含如下的组件:
PinotController– 它管理集群中的节点,职责包括:
。对Table和Segment的创建、更新和删除操作的处理
。计算Table和Segment在Pinot Server上的分配
PinotServer– 保存一个或者多个物理的Segment,职责包括:
。当被分配一个预先创建的segment,下载并且装载这个Segment,当被分配一个Kafka Topic,从kafka的partion的一个子集中消费数据
。执行查询请求并将结果返回给PinotBroker
Pinot Broker– 接收客户端的查询请求,并且将他们路由到多个服务上(根据路由策略), 合并接收的查询结果并返回给客户端。
Pinot的架构:
Pinot比较high level的架构如下:
看到这个架构,不由得不去和另外一个准实时分布式OLAP系统Druid进行对比,下边是Druid的架构:
两个架构非常的相似,Pinot Controller和Druid的Coordinator Node角色类似。Pinot Broker则和Druid中的Broker Node功能差不多。而在Pinot和Druid中,都有RealTime Nodes和Historical Nodes,分别处理从消息总线(Kafka)来的准实时数据和存储历史的Segment(二者都用Segment这个概念,并且都是按照时间来划分)。另外二者都支持流式和批量的数据装入。回到Pinot,历史数据处理数据流图如下:
在HDFS上的文件通过MR任务将数据变成有索引的Segment,然后推送到Pinot的集群的历史节点,去提供Query的能力。有索引的Segment可以配置保留日期,在过期后可以被自动的删除。对于实时数据的装入,数据流图如下:
实时数据处理是直接消费Kafka的数据,然后在内存中生成Segment索引,然后定期将数据flush到磁盘中。实时节点的数据的保留日期会相对比较短,比如保留3天的数据,实时节点的数据在时效前会存储到历史节点中。
查询路由
用户的所有的查询会发送到Pinot Broker,用户并不需要关心查询是被发往实时还是历史节点。PinotBroker会根据Query的情况自动的将请求切分然后按照需要发送给实时节点和历史节点,并且能够将结果进行自动的合并。比如:select count(*) from table where time >T会被分为两个Query,第一个Query是select count(*) from table where time >T and time from table where time> T1。
第一个Query会被送到历史节点执行,第二个Query则会被送到实时节点执行。
Pinot更细粒度的组件架构图如下:
可以看到,整个系统利用Apache Helix作为集群的管理,还利用了Zookeeper存储集群的状态,同时保存Helix和Pinot的配置。另外,Pinot利用NFS来将在HDFS上用MR产生的Segment推到PinotServer上。
对于历史节点,数据流程如下图:
从上图可以看到,原始的数据存储在HDFS上,然后被分成数据分片,分片的大小一般是256M或者512M。Pinot索引创建任务会对每个分片生成一个mapper任务去生成新的索引化的Segment。Segment在Hadoop上建立之后,需要发送到线上Pinot服务上去。这个过程是通过Pinot提供的推送任务来完成的。推送任务不是hadoop map任务,而是一个由Azkaban调度的java任务。它从HDFS读取数据,通过httppost送到Pinot Controller服务机器上。PinotController会将Segment存储在mount在Pinot Controller机器上的NFS中。然后Pinot Controller会将Segment分配给一个Pinot Server,而分配的相关信息则是由Helix来维护和管理。
Helix会监控Pinot Server的存活状态,当一个server启动时,Helix会通知PinotServer分配给该Server的segment。Pinot Server通过Helix获取Segment相关的元数据以及下载uri等信息,然后从Controller Server下载Segment并且装入到本地的磁盘中。解压缩的Segment中包含元数据以及每列的正排和倒排索引。根据装载模式(内存,mmap),索引或者被装载到内存中,或者被mmap到服务器上。Segment装载过程是由Helix触发的离线在线切换,当切换完成,Helix会通知Broker节点改Segment可以在该服务器被使用,Broker会在查询时将查询路由到该服务器。
大部分的Pinot数据是按照时间进行分区的,保留时间可以按照用例来配置(天、月、年粒度)。Pinot控制服务有一个后台清理线程去根据元数据删除过期的Segment。删除Segment会清理掉Controller服务中NFS上的数据,以及Helix服务上的元数据信息,Helix会通知Pinot Server对Segment进行在线离线切换,将Segment变为离线状态,然后变为删除状态,从本地磁盘删掉数据。
对于实时节点,数据流图如下:
实时节点是从Kafka消费数据,当在Pinot上创建一个resource时,Pinot会分配一组实例从Kafka topic消费数据。Pinot采用Kafka High Level API消费数据从而实现对Kafka数据消费的分布性。当Pinot消费了预先配置好数量的事件后,会在内存中将数据转变为离线Segment。当Segment创建成功后,Pinot commit offset到Kafka,如果失败,Pinot会从上次的checkpoint重新生成Segment。实时节点生成的Segment的格式和历史节点生成Segment格式相同,从而方便segment从实时节点到历史节点的重新分发。实时节点的Segment有效期的粒度只有天,而且会比较短。具体原因是从Kafka消费实时数据很难做到在离线节点相同的优化,另外这些Segment可以转到离线节点进行进一步的优化。还有就是Kafka很难保证消息只是消费一次,这样可能会造成数据精确性的问题,这些问题也可以在离线节点进行修复。
Pinot集群管理
Pinot是通过通用集群管理软件Helix进行集群管理的。在Helix中,节点被分为三种类型:
1.参与者– 实际提供分布式资源和能力的节点
2.观察者– 观察每个参与者状态的节点,并且将请求路由给参与者。
3.控制者– 观察和控制参与者节点,负责维护整个集群的状态正常和稳定。
在Pinot当中,Pinot Server是Helix中的参与者角色,Pinot Broker是观察者角色,Pinot Controller则是控制者角色。
Pinot索引Segment
Pinot 索引Segment是将原始的数据通过列式进行表达。原始数据一般都是行式表达的数据,将行式数据转化为列式可以减少存储开销并且可以提供对列的更快速的访问。
查询处理过程
Pinot查询处理过程如上图
Pinot支持类似于SQL的PQL。PQL是SQL的子集,目前PQL不支持join,嵌套子查询。Pinot使用Antlr进行PQL的Parser,将查询语句翻译成一个语法树。语法树会被Logical Planner进一步翻译成逻辑计划树。逻辑计划树在物理计划阶段会进行优化,将执行计划按照segment进行处理,形成每个Segment的一个物理执行计划。最后,物理执行计划会根据segment所属的server,交给pinotserver进行执行。
Pinot编译和安装
Pinot是纯粹用Java开发的,可以直接从git上获取源代码进行安装和部署,假设你在linux上,需要你先安装mvn,笔者采用的是最新的mvn 3.3.3,具体下载安装过程如下:
git clone https://github.com/linkedin/pinot.git
cd pinot
mvn install package -Dskip.Tests
需要注意的是,pinot支持的是JDK1.7以上的版本,JDK 1.6在mvn编译过程中就会发生错误。
如果mvn成功,则可以
cd pinot-distribution /target/pinot-0.016-pkg
然后执行:
bin/quick-start-offline.sh
这样就可以体验pinot了,pinot会启动一个web服务,默认监听9000端口。访问http://{yourhost}:9000/query/index.html,可以进入一个交互式查询的web页面:
在这里,可以执行SQL,执行的结果会以JSON的格式返回。
总结
LinkedIn Pinot的出现实际上也代表了当前大数据分析的潮流,越来越多的实时分析的要求促进了大数据技术的进一步发展。Hadoop MR解决了大数据分析从不能到能的问题,但是如果想要更好、更快的进行分析,原来Hadoop MR粗犷的数据分析模式则不能够适应要求。各个公司于是都开始探索如何能够解决实时的OLAP的需求,Druid和Pinot都是为了这个而产生出来的。而这两个的技术架构很类似,甚至我们自己在解决问题时用的方法也是类似的。核心点是将数据分为实时和历史两个部分,实时部分从消息队列消费数据,数据消费时就对数据进行分区和索引,存储转化为列式存储从而方便OLAP类型的查询。由于数据的分区采用的是基于时间的分区,在查询执行时,可以方便的路由查询到实时和历史节点,然后将结果进行合并返回。两个产品都不支持Join,对于OLAP来讲,这还是很大的限制。如果想实现join的效果,则只能构造巨大的事实表来解决。
回到当前的现状,如果设计一个类似的系统,我想架构会是类似的,不过技术的选择则更为丰富。生成Segment采用Spark将会变得更为快捷和方便。而TalkingData采用Bitmap索引来进行Segment创建,在多维度交叉上无疑更为快速和方便。