CEP简介
CEP全称为 Complex Event Processing 复杂事件处理,其可以通过在流式数据中发现符合某种特征的模式进而触发对应的后续动作,其既支持基于单条事件的简单无状态的模式匹配(例如基于事件中的某个字段进行筛选过滤),也可以支持基于关联/聚合/时间窗口等跨事件的复杂有状态模式匹配(例如计算滑动时间窗口移动均值)。受益于其直接作用于流式数据,无需查询持久化数据库,对底层数据库不会产生任何压力,以及其强大的模式发现能力,在监控系统中,如果把CEP与流处理引擎结合,在IT运维管理中,可以大大增强告警的实时性以及适用范围。
CEP对IT运维的价值
传统的ITOM主要对底层的软硬件基础架构对单一指标基于静态的阈值进行监控告警,这里有两个关键词:基础架构 以及 单一, 这其实正好对应了传统IT监控的两大痛点。单一导致的结果就是误报,服务器的CPU利用率上升有时是因为交易量的上升带来的正常现象,只要在合理区间内就无需告警,但是CPU利用率的孤立上升就可能是因为代码缺陷造成的,有经验的工程师一眼就能看出是否是故障,是因为工程师在一瞬间就综合分析了各个相关指标。针对基础架构则让运维人员的生活非常苦逼,有功劳都是其他部门的,出了故障都是运维的兄弟在顶包,所以近些年来基本上所有的企业都在做APM,通过网络抓包或者日志埋点等方式可以提取交易成功率/交易量/成功率等反映业务性能的指标,做了不少漂亮工程,不过不管是交易量还是成功率都还是从系统的角度去看问题,真正能带来多少业务价值其实也很难说,大屏上那些五颜六色的图表可能更多时候也是在领导检查或者参观时才体现其价值。从数据来看,其实IT运维过程中的数据是最完整的,既有包含了服务器,网络设备等基础设施的底层运行信息,也有包含中间件和数据库的中间层系统信息,还有包含了全部业务过程的上层应用日志信息,在这个数据时代,IT运维正在向IT运营转变,理应能够发挥更大的业务价值,举例对于一个金融企业,如果能从应用日志中提取到同一个账户在1分钟内在距离50公里的两地柜员机取款的类欺诈信息来支持风控,IT运维所承担的就不只是保障作用,而是直接参与到了业务决策过程当中。虽然CEP不是为IT运维管理而生,但是在一定程度上CEP确实可以解决上述两个问题,CEP最强大的就是其模式匹配引擎,其不仅可以作用于不同类型的事件,更可以按照时间窗口,发生顺序和次数以及其他状态聚合结果进行模式匹配,可以和各种业务规则进行对应。另外CEP是直接作用于流式数据,而非通过定期查询数据库的方式,因此最实时,且对数据库没有任何压力。目前的Flink流引擎已经自带了CEP模块,Flink官方给出的CEP例子正好就是针对数据中心监控的场景,案例中需要对Rack的温度进行监控,对于同一个Rack,当10秒内连续两次温度超过温度阈值时预警,当20秒内产生连续两个预警,且第二次预警温度高于第一次时进行告警,该模式中有不同状态的切换,有时间窗口的滑动等,如果没有CEP,需要自行构建对应的状态机进行处理,但是基于CEP强大的模式挖掘能力之上的实现非常简单,由此可见CEP的威力。不过Flink中的CEP模块是与其流失API紧密结合的,如果我们的前置流引擎不是Flink,则无法直接使用。我们在构建智能化监控系统时,从性能方面去考虑,在流引擎与CEP之间添加了基于AKKA的多层路由模块,可以按照业务系统,服务器,实例以及指标级别进行消息的路由,CEP引擎内嵌在每个Actor内部,可以在不同的路由级别对不同范围的消息进行模式匹配。下图是对应的架构图。 监控系统的架构以及对应实现不在我们这篇文章范围之内,本文接下来介绍下如何使用CEP来完成一个复杂监控告警从生成到关闭的过程。
CEP作为监控告警的规则引擎
本文中使用了一个开源的CEP组件esper(https://github.com/espertechinc/esper),目前该组件已经被weblogic等中间件厂商集成到自己的日志异常检测功能中,其本身是一个JAR库,无需其他运行时框架,所以使用起来非常方便。其支持一种DSL语言EPL,与SQL类似,用户通过输入不同的EPL来定义不同的匹配模式。本文不包含基本的EPL语法介绍,有兴趣的同学可以移步这里(http://blog.csdn.net/luonanqin/article/details/21300263) 以及esper的官网(http://www.espertech.com/esper/)。本文主要从一个实际的监控需求案例出发,来介绍esper的相关功能。
监控告警需求
监控原始事件定义如下:
//name代表监控指标的名称,例如CPU.CPUUtil
//timestamp指某一个具体的时间点
//value代表在timestamp时间点的指标值
//tags里面存储了一些其他描述信息,例如服务器名,ip地址,所属业务系统等等
case class GenericMetric(timestamp:Long, name:String, value:Any, tags:Map[String, Any])
//timestamp代表Warning产生的时间点
//name代表指标名称
//value代表产生Warning前最后一个事件的值
case class Warning(timestamp:Long, name:String, value:Any, tags:Map[String, Any])
//timestamp代表关闭事件产生的时间点
//name代表指标名称
case class Close(timestamp:Long, name:String)
//timestamp代表Critical产生的时间点
//name代表指标名称
//value代表产生Critical前最后一个事件的值
case class Critical(timestamp:Long, name:String, value:Any, tags:Map[String, Any])
告警规则1
当value连续10次 >= 0.5,< 0.8时产生Warning级别的告警,当value连续10次 >= 0.8时产生Critical级别的告警。
当产生Warning级别的告警后,当出现不在 >= 0.5 < 0.8区间的事件数据时,产生Close事件把Warning告警关闭。
当产生Critical级别的告警后,当出现不在 < 0.8区间的事件数据时,产生Close事件把Critical告警关闭。
Warning create epl:
insert into Warn select (select value from GenericMetric.std:lastevent()) as value, (select name from GenericMetric.std:lastevent()) as name, (select tags from GenericMetric.std:lastevent()) as tags,current_timestamp as timestamp from pattern[ every [10] (GenericMetric(value >= 0.5 and value < 0.8) and not GenericMetric(value < 0.5 or value >= 0.8))]
Critical create epl:
insert into Critical select (select value from GenericMetric.std:lastevent()) as value, (select name from GenericMetric.std:lastevent()) as name, (select tags from GenericMetric.std:lastevent()) as tags, current_timestamp as timestamp from pattern[ every [10] (GenericMetric(value >= 0.8) and not GenericMetric(value < 0.8))]
Warn close epl:
insert into Close select (select name from GenericMetric.std:lastevent()) as name,current_timestamp as timestamp from pattern [every [1] (Warn -> (GenericMetric(value < 0.5 or value >= 0.8)))]
Critical close epl:
insert into Close select (select name from GenericMetric.std:lastevent()) as name,current_timestamp as timestamp from pattern [every [1] (Critical -> (GenericMetric(value < 0.8)))]
告警规则2
当value连续10次 >= 0.5,< 0.8时产生Warning级别的告警,当value连续10次 >= 0.8时产生Critical级别的告警。
当产生Warning级别的告警后,当连续出现10次< 0.5的事件数据时,产生Close事件把Warning告警关闭。
当产生Critical级别的告警后,当连续出现10次 < 0.8区间的事件数据时,产生Close事件把Critical告警关闭。
告警的生成规则和规则1是相同的,这里就不再重复给出,需要关注的是关闭的规则与之前不同:
Warn close epl:
insert into Close select (select name from GenericMetric.std:lastevent()) as name,current_timestamp as timestamp from pattern [every [1] (Warn -> every [10](GenericMetric(value < 0.5)))]
Critical close epl:
insert into Close select (select name from GenericMetric.std:lastevent()) as name,current_timestamp as timestamp from pattern [every [1] (Critical -> every [10](GenericMetric(value < 0.8)))]
告警规则3
当value连续1分钟 >= 0.5,< 0.8时产生Warning级别的告警,当value连续1分钟 >= 0.8时产生Critical级别的告警。
当产生Warning级别的告警后,当连续出现1分钟< 0.5的事件数据时,产生Close事件把Warning告警关闭。
当产生Critical级别的告警后,当连续出现1分钟 < 0.8区间的事件数据时,产生Close事件把Critical告警关闭。
Warn create epl:
insert into Warn select (select value from GenericMetric.std:lastevent()) as value, (select name from GenericMetric.std:lastevent()) as name, (select tags from GenericMetric.std:lastevent()) as tags,current_timestamp as timestamp from pattern[ every (GenericMetric(value >= 0.5 and value < 0.8) -> ( timer:interval(60 sec) and not GenericMetric(value < 0.5 or value >= 0.8)))]
Critical create epl:
insert into Critical select (select value from GenericMetric.std:lastevent()) as value, (select name from GenericMetric.std:lastevent()) as name, (select tags from GenericMetric.std:lastevent()) as tags,current_timestamp as timestamp from pattern[ every (GenericMetric(value >= 0.8) -> ( timer:interval(60 sec) and not GenericMetric(value < 0.8)))]
Warn close epl:
insert into Close select (select name from GenericMetric.std:lastevent()) as name,current_timestamp as timestamp from pattern [every (Warn -> ( timer:interval(60 sec) and GenericMetric(value < 0.5) and not GenericMetric(value >= 0.5)))]
Critical close epl:
insert into Close select (select name from GenericMetric.std:lastevent()) as name,current_timestamp as timestamp from pattern [every (Critical -> ( timer:interval(60 sec) and GenericMetric(value < 0.8) and not GenericMetric(value >= 0.8)))]
如果 没有CEP,要实现上述告警规则需要构建状态机,以及对状态跳转过程中的历史数据的记录,实现起来比较复杂,但是基于esper,对应每个规则只是一句简单的EPL,而且多个规则可以同时起作用,代码可读性以及扩展性都比构建状态机要好很多。
小结
本篇是从基础架构监控的层面来介绍了CEP,当然CEP更强大的功能是对结构化后的日志数据进行模式匹配,与复杂的业务规则进行对应,发挥更大的业务价值,后续的CEP系列文章里面会更多从日志数据挖掘的方面去做相关介绍。此外,基于AKKA构建分布式的监控告警系统也是一个很有意思的主题,充分利用了actor的轻量级优势,通过创建百万甚至千万级别的actor与各指标进行对应,充分利用了服务器的计算资源,在相对普通的服务器集群上可以支撑海量的指标实时有状态监控,后续也会单独进行介绍。