企业级大数据技术体系概述
大数据架构的6层
大数据从数据源开始,经过分析、挖掘到最终获得价值一般需要经过6个主要环节:
包括数据收集、数据存储、资源管理与服务协调、计算引擎、数据分析和数据可视化
Hadoop与Spark开源大数据技术栈:
大数据架构:Lambda Architecture
Hadoop MapReduce这样的批处理系统,可靠性高,而实时性差;Storm这样的流式处理系统来说,则情况正好相反。
通过结合这两类计算技术,LA可以在延迟、吞吐量和容错之间找到平衡点。
LA主要思想是将数据处理流程分解成三层:批处理层、流式处理层和服务层。
一个经典的LA应用案例是推荐系统:推荐系统的设计目的是根据用户的兴趣特点和购买行为,向用户推荐感兴趣的信息和商品。
推荐系统最核心的模块是推荐算法,推荐算法通常会根据用户的兴趣特点和历史行为数据构建推荐模型,以预测用户可能感兴趣的信息和商品,进而推荐给用户。
典型的推荐系统架构
数据首先流入Kafka,之后按照不同时间粒度导入批处理和流式处理两个系统中。
批处理层拥有所有历史数据(通常保存到HDFS/HBase中),通常用以实现推荐模型,它以当前数据(比如最近一小时数据)和历史数据为输入,通过特征工程、模型构建(通常是迭代算法,使用MapReduce/Spark实现)及模型评估等计算环节后,最终获得最优的模型并将产生的推荐结果存储(比如Redis)起来,整个过程延迟较大(分钟甚至小时级别);
为了解决推荐系统中的冷启动问题(新用户推荐),往往会引入流式处理层:它会实时收集用户的行为,并基于这些行为数据通过简单的推荐算法(通常使用Storm/SparkStreaming实现)快速产生推荐结果并存储起来。为了便于其他系统获取推荐结果,推荐系统往往通过服务层对外提供访问接口,比如网站后台在渲染某个访问页面时,可能从广告系统、推荐系统以及内容存储系统中获取对应的结果,并返回给客户端。
备注:用户冷启动,指“产品初期,从目标用户转化为种子用户的过程”
数据收集
关系型数据的收集
Sqoop(SQL to Hadoop)
为了能够利用大数据技术处理和存储这些关系型数据,首先需将数据导入到像HDFS、HBase这样的大数据存储系统中,以便使用MapReduce、Spark这样的分布式计算技术进行高效分析和处理。另一方面,为了便于与前端的数据可视化系统对接,我们通常需要将Hadoop大数据系统分析产生的结果(比如报表,通常数据量不会太大)导回到关系型数据库中。为了解决上述问题,高效地实现关系型数据库与Hadoop之间的数据导入导出,Hadoop生态系统提供了工具Sqoop(SQL toHadoop)
Sqoop采用MapReduce可进行全量关系型数据的收集。
数据增量收集CDC
除了收集数据库全量数据外,我们还希望只获取增量数据,即MySQL某个表从某个时刻开始修改/插入/删除的数据。捕获数据源中数据的更新,进而获取增量数据的过程,被称为CDC(“Change Data Capture”)。
CDC几种应用场景
- 异地机房同步。实现数据异地机房容灾。
- 数据库实时备份。类似于master/slave架构,实时对数据库进行备份。
- 业务Cache刷新。更新数据库成功的同时,刷新cache中的值。
- 数据全库迁移。创建任务队列表,逐步完成全库所有表的迁移。
Alibaba Canal 组件
Canal的主要定位是基于数据库增量日志解析,提供增量数据订阅和消费,目前主要支持了MySQL关系型数据库。Canal的主要原理是,模拟数据库的主备复制协议,接收主数据库产生的binary log(简称“binlog”),进而捕获更新数据。
步骤1:Canal实现MySQL主备复制协议,向MySQL Server发送dump协议。
步骤2:MySQL收到dump请求,开始推送binlog给Canal。
步骤3:Canal解析binlog对象,并发送给各个消费者。
Databus
相比于阿里巴巴的Canal系统,LinkedIn的Databus更加强大,包括支持更多数据源(Oracle和MySQL等)、扩展性更优的架构(比如高扩展的架构允许保存更长时间的更新数据)等
多机房数据同步系统Otter
Otter基于Canal开源产品,获取数据库增量日志数据,本身采用典型的管理系统架构:Manager(Web管理)+Node(工作节点)
- Manager负责发布同步任务配置,接收同步任务反馈的状态信息等。
- 工作节点负责执行同步任务,并将同步状态反馈给Manager。
为了解决分布式状态调度,允许多Node节点之间协同工作,Otter采用了开源分布式协调组件ZooKeeper。
为了让系统具有良好的扩展性和灵活性,Otter将整个同步流程抽象为Select(与数据源对接的阶段,为解决数据来源的差异性而引入)、Extract、Transform、Load(简称S、E、T、L)四个阶段(类似于数据仓库的ETL模型,即数据提取、数据转换和数据载入三个阶段)
Otter跨机房数据同步
数据涉及网络传输,S、E、T、L几个阶段会分散在2个或者更多Node节点上,多个Node之间通过ZooKeeper进行协同工作(一般是Select和Extract在一个机房的Node, Transform/Load落在另一个机房的Node)。
非关系型数据的收集
在现实世界中,非关系型数据量远大于关系型数据。非关系型数据种类繁多,包括网页、视频、图片、用户行为日志、机器日志等,其中日志类数据直接反映了(日志)生产者的现状和行为特征,通常会用在行为分析系统、推荐系统、广告系统中。日志数据具有流式、数据量大等特点,通常分散在各种设备上,由不同服务和组件产生,为了高效地收集这些流式日志,需要采用具有良好扩展性、伸缩性和容错性的分布式系统。
日志收集面临以下问题:
❑ 数据源种类繁多:各种服务均会产生日志,这些日志格式不同,产生日志的方式也不同(有的写到本地日志文件中,有的通过HTTP发到远端等)。
❑ 数据源是物理分布的:各种服务运行在不同机器上,有的甚至是跨机房的。设计日志收集系统时需考虑这种天然的分布式特征。
❑ 流式的,不间断产生:日志是实时产生的,需要实时或近实时收集到,以便于后端的分析和挖掘。
❑ 对可靠性有一定要求:日志收集过程中,希望能做到不丢数据(比如银行用户转账日志),或只丢失可控的少量数据(比如用户搜索日志)。
Flume
Cloudera公司开源的Flume系统便是解决以上这些流式数据收集问题的,它是一个通用的流式数据收集系统,可以将不同数据源产生的流式数据近实时地发送到后端中心化的存储系统中,具有分布式、良好的可靠性以及可用性等优点。
Flume NG基本架构
Flume的数据流是通过一系列称为Agent的组件构成的,如图3-2所示,一个Agent可从客户端或前一个Agent接收数据,经过过滤(可选)、路由等操作后,传递给下一个或多个Agent(完全分布式),直到抵达指定的目标系统。
Agent内部主要由三个组件构成,分别是Source, Channel和Sink:
- Source:
Flume将数据流水线中传递的数据称为“Event”
Flume数据流中接收Event的组件,通常从Client程序或上一个Agent接收数据 - Channel:
Channel是一个缓存区,它暂存Source写入的Event,直到被Sink发送出去。 - Sink:
Sink负责从Channel中读取数据,并发送给下一个Agent(的Source)。
Flume NG高级组件
除了Source、Channel和Sink外,Flume Agent还允许用户设置其他组件更灵活地控制数据流,包括Interceptor, Channel Selector和Sink Processor等
- Interceptor:
Interceptor组件允许用户修改(Timestamp Interceptor/Host Interceptor/UUID Interceptor)或丢弃(Regex Filtering Interceptor/Regex Extractor Interceptor)传输过程中的Event。 - Channel Selector:
Channel Selector允许Flume Source选择一个或多个目标Channel,并将当前Event写入这些Channel。 - Sink Processor:
Flume允许将多个Sink组装在一起形成一个逻辑实体(称为“Sink Group”),而SinkProcessor则在Sink Group基础上提供负载均衡以及容错的功能(当一个Sink挂掉了,可由另一个Sink接替)。
分布式消息队列Kafka
在实际应用中,不同服务器(数据生产者)产生的日志,比如指标监控数据、用户搜索日志、用点击日志等,需要同时传送到多个系统中以便进行相应的逻辑处理和挖掘,比如指标监控数据可能被同时写入Hadoop和Storm集群(数据消费者)进行离线和实时分析。为了降低数据生产者和消费者之间的耦合性、平衡两者处理能力的不对等,消息队列出现了。消息队列是位于生产者和消费者之间的“中间件”,它解除了生产者和消费者的直接依赖关系,使得软件架构更容易扩展和伸缩;它能够缓冲生产者产生的数据,防止消费者无法及时处理生产者产生的数据。
Kafka设计架构
Kafka架构由Producer、Broker和Consumer三类组件构成,其中Producer将数据写入Broker, Consumer则从Broker上读取数据进行处理,而Broker构成了连接Producer和Consumer的“缓冲区”。Broker和Consumer通过ZooKeeper做协调和服务发现[插图]。多个Broker构成一个可靠的分布式消息存储系统,避免数据丢失。Broker中的消息被划分成若干个topic,同属一个topic的所有数据按照某种策略被分成多个partition,以实现负载分摊和数据并行处理。
Kafka各组件详解
Kafka Producer
Kafka Producer是由用户使用Kafka提供的SDK开发的,Producer将数据转化成“消息”,并通过网络发送给Broker。在Kafka中,每条数据被称为“消息”,每条消息表示为一个三元组:
<topic,Key,Message>
❑ topic:表示该条消息所属的topic。topic是划分消息的逻辑概念,一个topic可以分布到多个不同的broker上。
❑ key:表示该条消息的主键。Kafka会根据主键将同一个topic下的消息划分成不同的分区(partition),默认是基于哈希取模的算法,用户也可以根据自己需要设计分区算法。
❑ message:表示该条消息的值。该数值的类型为字节数组,可以是普通字符串、JSON对象,或者经JSON, Avro, Thrift或Protobuf等序列化框架序列化后的对象。
Kafka Broker
在Kafka中,Broker一般有多个,它们组成一个分布式高容错的集群。Broker的主要职责是接受Producer和Consumer的请求,并把消息持久化到本地磁盘。如图所示,Broker以topic为单位将消息分成不同的分区(partition),每个分区可以有多个副本,通过数据冗余的方式实现容错。
当partition存在多个副本时,其中有一个是leader,对外提供读写请求,其他均是follower,不对外提供读写服务,只是同步leader中的数据,并在leader出现问题时,通过选举算法将其中的某一个提升为leader。
Kafka Broker能够保证同一topic下同一partition内部的消息是有序的,但无法保证partition之间的消息全局有序,这意味着一个Consumer读取某个topic下(多个分区中,如下图所示)的消息时,可能得到跟写入顺序不一致的消息序列。但在实际应用中,合理利用分区内部有序这一特征即可完成时序相关的需求。
Kafka Broker以追加的方式将消息写到磁盘文件中,且每个分区中的消息被赋予了唯一整数标识,称之为“offset”(偏移量),如图上所示,Broker仅提供基于offset的读取方式,不会维护各个Consumer当前已消费消息的offset值,而是由Consumer各自维护当前读取的进度。 Consumer读取数据时告诉Broker请求消息的起始offset值,Broker将之后的消息流式发送过去。Broker中保存的数据是有有效期的,比如7天,一旦超过了有效期,对应的数据将被移除以释放磁盘空间。只要数据在有效期内,Consumer可以重复读取而不受限制。
Kafka Consumer
Kafka Consumer主动从Kafka Broker拉取消息进行处理。每个Kafka Consumer自己维护最后一个已读取消息的offset,并在下次请求从这个offset开始的消息,这一点不同于ZeroMQ、RabbitMQ等其他消息队列,这种基于pull的机制大大降低了Broker的压力,使得Kafka Broker的吞吐率很高。
Kafka允许多个Consumer构成一个Consumer Group,共同读取同一topic中的数据,提高数据读取效率。Kafka可自动为同一Group中的Consumer分摊负载,从而实现消息的并发读取,并在某个Consumer发生故障时,自动将它处理的partition转移给同Group中其他Consumer处理。
ZooKeeper
在一个Kafka集群中,ZooKeeper担任分布式服务协调的作用,Broker和Consumer直接依赖于ZooKeeper才能正常工作:
❑ Broker与ZooKeeper:所有Broker会向ZooKeeper注册,将自己的位置、健康状态、维护的topic、partition等信息写入ZooKeeper,以便于其他Consumer可以发现和获取这些数据,当一个Consumer宕掉后,其他Consumer会通过ZooKeeper发现这一故障,并自动分摊该Consumer的负载,进而触发相应的容错机制。
❑ Consumer与ZooKeeper: Consumer Group通过ZooKeeper保证内部各个Consumer的负载均衡,并在某个Consumer或Broker出现故障时,重新分摊负载;Consumer(仅限于high-level API,如果是low-level API,用户需自己保存和恢复offset)会将最近所获取消息的offset写入ZooKeeper,以便出现故障重启后,能够接着故障前的断点继续读取数据。