Kafka Log Compaction 解析

最近查看Kafka文档, 发现 Kafka 有个 Log Compaction 功能是我们之前没有留意到的, 但是有着很高的潜在实用价值.
什么是Log Compaction
Kafka 中的每一条数据都有一对 Key 和 Value, 数据存放在磁盘上, 一般不会被永久保留, 而是在到达一定的量或者时间后对最早写入的数据进行删除. Log Compaction 在默认的删除规则之外提供了另一种删除过时数据(或者说保留有价值的数据)的方式, 就是对于有相同 Key 的不同数据, 只保留最后一条, 前面的数据在合适的情况下删除.
Log Compaction 的应用场景
Log Compaction 特性, 就实时计算而言, 可以在灾难恢复方面有很好地应用场景. 比如说我们在 Storm 里做计算时, 需要长期在内存里维护一些数据, 这些数据可能是通过聚合了一天或者一周的日志得到的, 这些数据一旦由于偶然的原因(磁盘,网络等)崩溃了, 从头开始计算需要漫长的时间.一个可行的应对方法是定时将内存里的数据备份到外部存储中, 比如 Redis 或者 Mysql 等, 当崩溃发生的时候再从外部存储读回来继续计算.
使用 Log Compaction 来代替这些外部存储有以下好处.
Kafka 既是数据源又是存储工具, 可以简化技术栈, 降低维护成本.

使用 Mysql 或者 Redis 作为外部存储的话, 需要将存储的 Key 记录下来, 恢复时再用这些 Key 将数据取回, 实现起来有一定的工程复杂度. 用Log Compaction 特性的话只要把数据一股脑儿地写进 Kafka, 等灾难恢复的时候再读回内存就行了.

Kafka 针对磁盘读写都有很高的顺序性, 相对于 Mysql 没有索引查询等工作量的负担, 可以实现高性能, 相对于 Redis 而言, 它可以充分利用廉价的磁盘而对内存要求很低, 在接近的性能下能实现非常高的性价比(仅仅针对灾难恢复这个场景而言).

实现方式的简要介绍
当 topic 的 cleanup.policy (默认为delete) 设置为 compact 时, Kafka 的后台线程会定时把 topic 遍历两次, 第一次把每个 key 的哈希值最后一次出现的 offset 都存下来, 第二次检查每个 offset 对应的 key 是否在更后面的日志中出现过,如果出现了就删除对应的日志.


源码解析
Log Compaction 的大部分功能由CleanerThread完成, 核心逻辑在 Cleaner 的 clean方法

/** * Clean the given log * * @param cleanable The log to be cleaned * * @return The first offset not cleaned and the statistics for this round of cleaning */ private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = { val stats = new CleanerStats() info("Beginning cleaning of log %s.".format(cleanable.log.name)) val log = cleanable.log // build the offset map info("Building offset map for %s...".format(cleanable.log.name)) val upperBoundOffset = cleanable.firstUncleanableOffset buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap, stats) // <----- 这里第一次遍历所有offset将key索引 val endOffset = offsetMap.latestOffset + 1 stats.indexDone() // figure out the timestamp below which it is safe to remove delete tombstones // this position is defined to be a configurable time beneath the last modified time of the last clean segment val deleteHorizonMs = log.logSegments(0, cleanable.firstDirtyOffset).lastOption match { case None => 0L case Some(seg) => seg.lastModified - log.config.deleteRetentionMs } // determine the timestamp up to which the log will be cleaned // this is the lower of the last active segment and the compaction lag val cleanableHorizonMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.lastModified).getOrElse(0L) // group the segments and clean the groups info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs))) for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize)) cleanSegments(log, group, offsetMap, deleteHorizonMs, stats) // <-- 这里第二次遍历所有offset,删除冗余的日志,并且将多个小的segment合并为一个 // record buffer utilization stats.bufferUtilization = offsetMap.utilization stats.allDone() (endOffset, stats) }

log compaction 通过两次遍历所有数据来实现, 两次遍历之间交流的媒介就是一个OffsetMap, 下面是OffsetMap的签名
trait OffsetMap { def slots: Int def put(key: ByteBuffer, offset: Long) def get(key: ByteBuffer): Long def clear() def size: Int def utilization: Double = size.toDouble / slots def latestOffset: Long}

这基本就是个普通的mutable map, 在 Kafka 项目中,它的实现只有一个, 叫做SkimpyOffsetMap
put方法
put 方法会为每个 key 生成一份摘要,默认使用 md5 方法生成一个 16byte 的摘要, 根据这个摘要在 bytes
中哈希的到一个下标, 如果这个下标已经被别的摘要占据, 则线性查找到下个空余的下标为止, 然后在对应位置插入该 key 对应的 offset
/** * Associate this offset to the given key. * @param key The key * @param offset The offset */override def put(key: ByteBuffer, offset: Long) { require(entries < slots, "Attempt to add a new entry to a full offset map.") lookups += 1 hashInto(key, hash1) // probe until we find the first empty slot var attempt = 0 var pos = positionOf(hash1, attempt) while(!isEmpty(pos)) { bytes.position(pos) bytes.get(hash2) if(Arrays.equals(hash1, hash2)) { // we found an existing entry, overwrite it and return (size does not change) bytes.putLong(offset) lastOffset = offset return } attempt += 1 pos = positionOf(hash1, attempt) } // found an empty slot, update it--size grows by 1 bytes.position(pos) bytes.put(hash1) bytes.putLong(offset) lastOffset = offset entries += 1}

get方法
get 方法使用和 put 同样的摘要算法获得 key 的摘要, 通过摘要获得 offset 的存储位置
/** * Get the offset associated with this key. * @param key The key * @return The offset associated with this key or -1 if the key is not found */ override def get(key: ByteBuffer): Long = { lookups += 1 hashInto(key, hash1) // search for the hash of this key by repeated probing until we find the hash we are looking for or we find an empty slot var attempt = 0 var pos = 0 //we need to guard against attempt integer overflow if the map is full //limit attempt to number of slots once positionOf(..) enters linear search mode val maxAttempts = slots + hashSize - 4 do { if(attempt >= maxAttempts) return -1L pos = positionOf(hash1, attempt) bytes.position(pos) if(isEmpty(pos)) return -1L bytes.get(hash2) attempt += 1 } while(!Arrays.equals(hash1, hash2)) bytes.getLong() }

可能的空间问题 性能问题 冲突问题
空间问题
默认情况下, Kafka 用 16 个 byte 存放key的摘要, 用 8 个 byte 存放摘要对应的 offset, 1GB 的空间可以保存 1024* 1024*1024 / 24 = 44,739,242.666...
个 key 对应的数据.
性能问题
这个 log compaction 的原理挺简单, 就是定期把所有日志读两遍,写一遍, cpu 的速度超过磁盘完全不是问题, 只要日志的量对应的读两遍写一遍的时间在可接受的范围内, 它的性能就是可以接受的.
冲突问题
现在的 OffsetMap 唯一的实现名字叫做 SkimpyOffsetMap, 相信你们已经从这个名字里看出端倪, 最初的作者本身也认为这样的实现不够严谨. 这个算法在两个 key 的 md5 值相同的情况下就判断 key 是相同的, 如果遇到了 key 不同而 md5 值相同的情况, 那两个 key 中其中一个的消息就丢失了. 虽然 md5 值相同的概率很低, 但如果真的碰上了, 那就是100%, 概率值再低也没用, 而且从网上的反映看似乎冲突还不少见.
我个人目前想到的处理方案是, 大部分的 key 总长度并不算长, 可以把这类 key 所有可能的情况都md5一遍看一下是否有冲突, 如果没有的话就放心用.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,670评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,928评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,926评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,238评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,112评论 4 356
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,138评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,545评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,232评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,496评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,596评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,369评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,226评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,600评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,906评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,185评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,516评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,721评论 2 335

推荐阅读更多精彩内容