单进程版流计算实现说明

一、概念

单进程版与Storm版的流计算实现有许多相似的概念,其中最重要的包括:Topology、Spout、Bolt。Topology是一个由Spout节点和Bolt节点组成的有向无环图(或称有方向的树),一个轻应用可以有一个或多个Topology。Spout是数据的来源,它是Topology的根节点,每个Topology只有一个Spout。Bolt是真正负责处理业务逻辑的节点。

Storm将父节点传输给子节点的数据称为Tuple,单进程版的流计算与此对应的概念是BoltParameter。一组Tuple或BoltParameter组成Stream,Stream是一个抽象概念,没有具体的实现。

单进程版相当于Storm版的简化,有一些Storm版的概念这里不会有,比如:

  • 分组(grouping)
  • Worker
  • Task
  • Reliability

上面列出的概念基本都与Storm的并行处理有关,单进程版的流计算不存在并发问题,也就没有这些概念。

在编写Storm版的流计算程序时,很多非业务逻辑的功能是Storm负责维护和处理的,而单进程版的程序需要自己来处理,这引入了一些新的概念:

  • SpoutProcessor,这个类负责维护Spout节点
  • BoltProcessor,这个类负责维护Bolt节点

下文将详细解释SpoutProcessor和BoltProcessor。

二、Topology的实现

之前提到Topology树是由Spout和Bolt组成,实际上Spout和Bolt里边是业务逻辑相关的定义,真正让它们组成一棵树的是SpoutProcessor和BoltProcessor,这两个类都实现了ProcessNode接口,该接口定义如下:

public interface ProcessNode {

    /**
     * 提取本处理结点的名字
     */
    String getName();
    /**
     * 提取本结点的Id,Id用来记录节点间的关系,全局唯一
     */
    String getId();
    /**
     * 向本处理结点增加一个儿子结点
     */
    void addchild(ProcessNode child);
    /**
     * 调用本处理结点的处理逻辑对数据进行处理
     */
    void run(Object param);

}

SpoutProcesser的定义:

public class SpoutProcessor implements ProcessNode {

    private static Logger logger = LoggerFactory.getLogger(SpoutProcessor.class);
    /**
     * 本拓扑待处理的数据队列
     */
    KafkaDataQueue queue = new KafkaDataQueue();
    /**
     * 数据生成器
     */
    private StreamSpout spout;
    /**
     * 本处理器对应的下一代处理器
     */
    private LinkedList<ProcessNode> childrens;
//后面的代码省略

SpoutProcesser的第一个变量是KafkaDataQueue,这个数据队列由KafkaThread类负责写入数据(它的数据来源是Kafka队列),由ProcessThread负责读取数据并处理。关于ProcessThread等线程类后边详细介绍。
SpoutProcesser的第二个变量是StreamSpout,这就是Storm中Spout的对应实现,是拓扑树的根节点。
SpoutProcesser第三个变量:private LinkedList<ProcessNode> childrens;,这个变量记录了它的下一级节点有哪些,这是组成Topology树的关键。

public class BoltProcessor implements ProcessNode {

    /**
     * 本处理器对应的Bolt
     */
    private Bolt bolt;

    /**
     * 本处理器对应的下一代处理器
     */
    private LinkedList<ProcessNode> childrens;
//后面的代码省略

BoltProcessor也是一样,维护了Bolt的下一级节点列表。

三、线程

单进程版流计算有四个线程:

  • KafkaThread,如前所述,它负责读取Kafka队列,并把数据放到SpoutProcesser的KafkaDataQueue。每个SpoutProcesser都会收到一份完全相同的数据的拷贝,有点类似于Storm的AllGrouping分组方式。全局只有一个KafkaThread。
  • ProcessThread,它通过调用SpoutProcesser的实例来处理KafkaDataQueue中的数据,每个Topology对应一个ProcessThread。
  • OutputThread,负责把流计算的结果存储到MongoDB,全局只有一个OutputThread。
  • ShutdownHookThread,当进程退出时,如kill -15 程序退出时,把未处理的kafka数据退回kafka队列,把已经处理生成的结果存入mongo,减少数据丢失。ShutdownListener类实现了ApplicationListener接口,当监听到ContextClosedEvent事件时启动ShutdownHookThread。

四、流计算业务逻辑的实现

关于Groovy脚本是如何在Java程序中运行的,可以参考《Groovy脚本使用方法》、《baas系统脚本说明》。这篇文档主要介绍StreamSpout、ConvertBolt和StatBolt的实现(AlarmBolt与ConvertBolt类似,不再重复介绍)。

(一)StreamSpout

StreamSpout是根节点的实现,它实现了Spout接口:

public interface Spout {
    /**
     * 得到本Spout的名字
     */
    String getName();
    /**
     * 得到本Spout的Id
     */
    String getId();
    /**
     * 准备本Bolt
     */
    void prepare();
    /**
     * 提取设备档案操作对象
     */
    IArchives getArchives();

    /**
     * 设置设备档案操作对象
     */
    void setArchives(IArchives archives);

    /**
     * 执行Spout内部的判断逻辑,判别是否应该交由本Spout进行处理
     */
    BoltParameter execute(SpoutParameter spoutParameter);
}

prepare方法只在初始化的时候执行一次,它负责做一些准备工作。
execute方法每收到一个数据就会运行一次,处理真正的业务逻辑,数据通过参数BoltParameter传递进来,通过返回BoltParameter传递给下一级节点。
getArchives方法返回一个IArchives接口,通过这个接口提供的方法可以获取设备档案。

StreamSpout的定义(核心片段,非完整代码):

public class StreamSpout implements Spout {
    /**
     * 本Spout的配置
     */
    private SpoutConfig config;

    /**
     * 访问设备档案的对象
     */
    private Archives archives;

    @Override
    public void prepare() {

    }

    @Override
    public BoltParameter execute(SpoutParameter spoutParameter) {
//省略具体实现
    }

//省略后面的代码

第一个成员变量是SpoutConfig,这就是用户在轻应用平台上所做的配置,由单进程流计算的入口类Executor从数据库中读取填充。
第二个成员变量Archives,这个对象包含一个deviceId成员变量,当StreamSpout收到数据时,execute方法会给deviceId赋值,在后面的节点中将用来获取档案信息。
StreamSpout的prepare方法目前为空,没有任何准备工作要做。
execute方法首先判断数据流名称是否和用户配置的一致,然后构造BoltParameter对象(由流计算的上下文、内置输入对象、档案操作对象组成),返回该对象给下一级节点。

(二)ConvertBolt

ConvertBolt的定义(核心片段,非完整代码):

public class ConvertBolt extends BaseBolt {
    /**
     * 本Bolt的配置
     */
    private ConvertBoltConfig config;
    /**
     * 脚本对象
     */
    private IConvertProcess process;
    /**
     * 把数据保存到Mongo的队列
     */
    private OutputQueue outputQueue;
//省略非业务逻辑私有变量

    @Override
    public void prepare() {
//省略具体实现
    }

    @Override
    public List<BoltParameter> execute(BoltParameter parameter) {
 //省略具体实现
    }
}

ConvertBolt继承了BaseBolt,后者非常简单,不影响整体理解,细节请阅读源代码。
第一个成员变量是ConvertBoltConfig,和SpoutConfig一样,这是用户在轻应用平台上所做的配置,由单进程流计算的入口类Executor从数据库中读取填充。
第二个成员变量IConvertProcess,用来引用Groovy脚本的实例,执行Groovy脚本的时候用到。
第三个成员变量OutputQueue,OutputThread会将这个队列的数据存储到MongoDB。
prepare方法只在初始化的时候执行一次,它负责做一些准备工作,例如解析ConvertBoltConfig并加载Groovy脚本。
execute方法每收到一个数据就会运行一次,处理真正的业务逻辑,数据通过参数BoltParameter传递进来,通过返回List<BoltParameter>传递给下一级节点。

(三)StatBolt

StatBolt的定义(核心片段,非完整代码):

public class StatBolt extends BaseBolt {
    /**
     * 统计单元的配置
     */
    private StaticsBoltConfig config;

    /**
     * 统计脚本对象
     */
    private IStatProcess statProcessor;

    /**
     * 统计缓存对象
     */
    private Caches caches;

    /**
     * 统计过程中访问Redis的对象,用于保存和提取中间结果
     */
    private RedisClient redisClient;

    /**
     * 把数据存储到Mongo中的队列
     */
    private OutputQueue outputQueue;

    @Override
    public void prepare() {
//省略具体实现
    }

    @Override
    public List<BoltParameter> execute(BoltParameter parameter) {
 //省略具体实现
    }
}

StatBolt和ConvertBolt结构基本一致,相同的部分不再重复说明。
成员变量Caches是内置的统计计算所需的缓存类,在prepare方法中初始化,它实现了ICaches接口,包括单个设备统计用到的group函数和全局统计用到的group(Object group)。虽然名为Caches,但它更多的是作为计算所需的内置对象,实际的缓存功能是由它内部的RedisClient类实现。
execute方法判断是否到达输出时间(上一次输出时间可通过Caches获取),如果到达则执行输出脚本,如果没有到达则执行计算脚本。

五、缓存的实现

采用了两级缓存机制:Redis和Ehcache,前者是远程缓存,后者是本地缓存。事实上正是由于远程缓存性能不够好才引入了本地缓存,但另一方面,如果只使用本地缓存,程序意外终止时会丢失数据,所以两者结合使用。
CacheUtils类提供了Ehcache的访问,RedisClient类除了提供了Redis的访问,还包含CacheUtils的调用并提供其他类所需的接口方法。

缓存的细节信息见下表:

缓存类型 最大数量级 key值构成 Ehcache缓存名字 备注
设备档案 设备数量 arch-、archiveId、deviceId archiveCache
单个统计中间结果 设备数量*统计节点数量 cache-、statId、deviceId midStatCache 单个统计时缓存数量远大于全局统计
单个统计的上一次输出时间 设备数量*统计节点数量 stat_last_time-、statId、deviceId lastStatOutputCache 单个统计时缓存数量远大于全局统计
全局统计中间结果 档案字段数量*统计节点数量 cache-、statId、group midStatCache group是档案字段的值
全局统计的上一次输出时间 档案字段数量*统计节点数量 stat_last_time-、statId、group lastStatOutputCache group是档案字段的值
上一次告警输出的时间(数据来源为非统计节点) 设备数量*告警节点数量 alarm_last-、alarmId、deviceId lastAlarmCache
上一次告警输出的时间(数据来源为单个统计节点) 设备数量*告警节点数量 alarm_last-、alarmId、deviceId lastAlarmCache key值看起来和上一行相同,但alarmId实际会不一样。
上一次告警输出的时间(数据来源为全局统计节点) 档案字段数量*告警节点数量 alarm_last-、alarmId、group lastAlarmCache group是档案字段的值
全局统计的维度值列表 档案字段数量*统计节点数量 statId statDimensionsCache value是Set<String>,Set里边放的group;单线程版本的流计算是存储在Redis

六、程序的入口:Executor类

Executor类的init方法是单进程版流计算程序的入口,init方法主要做了两件事:创建Topology树和启动各个线程,当init方法执行完毕之后,单进程流计算程序就开始读取Kafka队列中的数据并处理,这个过程将会一直运行下去,只能用户手动停止。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,390评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,821评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,632评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,170评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,033评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,098评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,511评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,204评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,479评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,572评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,341评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,893评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,171评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,486评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,676评论 2 335

推荐阅读更多精彩内容

  • Date: Nov 17-24, 2017 1. 目的 积累Storm为主的流式大数据处理平台对实时数据处理的相关...
    一只很努力爬树的猫阅读 2,145评论 0 4
  • 这是一个JStorm使用教程,不包含环境搭建教程,直接在公司现有集群上跑任务,关于JStorm集群环境搭建,后续研...
    Coselding阅读 6,259评论 1 9
  • 原文链接Storm Tutorial 本人原创翻译,转载请注明出处 这个教程内容包含如何创建topologies及...
    quiterr阅读 1,593评论 0 6
  • 目录 场景假设 调优步骤和方法 Storm 的部分特性 Storm 并行度 Storm 消息机制 Storm UI...
    mtide阅读 16,976评论 30 60
  • 忽然就想煲点鸡汤了,听着外面悉悉索索的雨声却完全没有一点睡意,什么是生活?生活又是什么?我又在过怎样的生活?这些我...
    贺新凉baby阅读 157评论 0 0