需求来了
今年上半年做了一个需求,需求目的是获取实时的广告曝光在各个时刻各个类目的占比,发现曝光不足的类目,针对对应类目商家进行引导广告投放。团队当时用的存储介质只有mysql,es,mongodb和公司自研的搜索引擎,都不太适合放大量的时序数据,最后比较了一堆开源框架,从使用和维护成本出发考虑决定还是用了influxdb,(使用感受,啥都好,唯一缺点:分布式要钱,我们也么得钱, 穷是原罪o(╥﹏╥)o),数据不是T0级的数据,不需要长时间的存储,所以单机也就搞定了。
投入上线以后效果还是挺好的,实时性很强,写入也没有延迟什么的,除了前期偶尔出现了一两次influxdb进程突然挂掉以外(应该是我写入姿势太暴力了),后面基本不需要维护了。最后数据的界面大致如下:
原理了解一发
时序数据库算是比较小众的数据库了,除非是做监控什么的,感觉在业务开发上很少用到,influxdb的优点都可以百度到,不赘述了。之前大部分接触的mysql、es的原理也无非B+树,Trie树 ,读了一些简介以后发现influxdb实现的原理是lsm-tree,就研究了一下这种数据结构。
lsm-tree
全称 Log Structured Merge Tree,理解下字面意思,日志结构的合并树,很明显了,日志结构嘛,就是有顺序的,一条条往后面怼的,这是个粗略的理解。顺序的读比B+树这种随机读肯定是要快很多了。关于lsm的原理,网络上有很多文章,摘选一个可以作为基础阅读了解一下 lsm概述
看着这个图也可以发现几个特点了,树的数据在内存和磁盘中都有,读源码可以发现,influxdb文件级别的树总共有4层。
针对这一块的原理,可以直接开始看influxdb的文件结构。influxdb有两个核心的文件目录,data和wal,wal不赘述了,预写日志,很多数据库都有这个,data下面存放的是实际的数据,截了一个实际的表里的数据:
可以发现tsm的一些文件,其实就是lsm,前面是一个序列号,后面的是实际的lsm-tree的层级。根据更新时间就可以发现层级较大的文件合并的时间间隔较长,04是八小时一次,02是两小时一次,刚刚好四个小时的02 level的可以聚合成一个04 level的文件;文件大小也是随层级增大而增大的,但是为什么01和02的差距那么小?可能和压缩的比例有关?
看到这里感觉lsm和一个数据结构很相似 -- skiplist,跳跃表也是一种提升查询效率的数据结构,数据也是按照分层级按照一定顺序存储的。只不过lsm里面是一个树形结构不是完全的链表形式。
因为主要被分为内存和磁盘两块代码,先从内存级入手捋一捋。
C0 tree -- cache.go
先来看看C0级别的源码: tsdb/tsm1/cache.go
cache设计的初衷是为了让写在wal的数据也能够被查询到,因为可能还没有被写入文件。
// Cache maintains an in-memory store of Values for a set of keys.
type Cache struct {
mu sync.RWMutex
store *ring
maxSize uint64
// snapshots are the cache objects that are currently being written to tsm files
// they're kept in memory while flushing so they can be queried along with the cache.
// they are read only and should never be modified
snapshot *Cache
snapshotting bool
//监控数据忽略
tracker *cacheTracker
lastSnapshot time.Time
lastWriteTime time.Time
}
其中有一个snapshot,注释写的很清楚,内存中的数据可以被查询但是不会被修改,因为准备被写入tsm文件了,这个只是为了提升查询的速度,减少磁盘的读取,如果能在内存中读到就不会去磁盘进行读取了。store是实际存储数据的结构ring,里面包含了多个分区,每个分区会根据你的key -- 存储的索引hash出来的值,来进行存储。所以实际cache的写入就是按照索引的值分区存入内存。cacheTracker及后续的两个时间都是监控所用,暂时忽略。
type ring struct {
// Number of keys within the ring. This is used to provide a hint for
// allocating the return values in keys(). It will not be perfectly accurate
// since it doesn't consider adding duplicate keys, or trying to remove non-
// existent keys.
keysHint int64
// 16个分区
partitions [numPartitions]*partition
}
// partition provides safe access to a map of series keys to entries.
type partition struct {
mu sync.RWMutex
store map[string]*entry
}
// entry is a set of values and some metadata.
//string是索引的key 跟踪entry
type entry struct {
mu sync.RWMutex
values Values // All stored values.
// 数据类型
vtype byte
}
//values的枚举
type (
Value = value.Value
IntegerValue = value.IntegerValue
UnsignedValue = value.UnsignedValue
FloatValue = value.FloatValue
BooleanValue = value.BooleanValue
StringValue = value.StringValue
)
// Value represents a TSM-encoded value. 这行注释很明显了,tsm数据的结构,
//里面包含时间戳,实际值
type Value interface {
// UnixNano returns the timestamp of the value in nanoseconds since unix epoch.
UnixNano() int64
// Value returns the underlying value.
Value() interface{}
// Size returns the number of bytes necessary to represent the value and its timestamp.
Size() int
// String returns the string representation of the value and its timestamp.
String() string
// internalOnly is unexported to ensure implementations of Value
// can only originate in this package.
internalOnly()
}
上面的源代码中values是一个数组,数组的顺序是按时间顺序直接往后加的,连搜索特定时间点的数据的search(v int64)这个方法是一个根据时间戳的二分查找,那么问题来了,用过influxdb的api你会发现数据时间戳你是可以指定的,如果小的时间戳的数据被后写入了,如何保证最后写入文件的数据是按时间戳排序呢?(不按时间排序你还叫啥时序数据库2333),从Values数组的代码里就有了答案,在merge过程中会删除重复的时间戳并且判断有时间戳乱序后重新排序,这样的话即使写入内存的时候时间戳是乱序的,最终写入文件的时候依旧是按时间戳排序的。核心代码如下:
//去重的函数
func (a Values) Deduplicate() Values {
if len(a) <= 1 {
return a
}
// See if we're already sorted and deduped
var needSort bool
//先遍历看是否乱序或有重复值,是则进行排序
for i := 1; i < len(a); i++ {
if a[i-1].UnixNano() >= a[i].UnixNano() {
needSort = true
break
}
}
if !needSort {
return a
}
//排序
sort.Stable(a)
//去重
var i int
for j := 1; j < len(a); j++ {
v := a[j]
if v.UnixNano() != a[i].UnixNano() {
i++
}
a[i] = v
}
return a[:i+1]
}
下面这个图是从另外一个博客中取的,画得比较好不重复画了:
其他的一些核心方法注意一下的就是数据的批量写入,WriteMulti的操作不是原子的,超出内存部分会抛异常但是可以写入的部分会成功,本质就是Write操作写了个for循环……
再来看快照方法,snapshot的内容最终会被写入tsm文件:
// Snapshot takes a snapshot of the current cache, adds it to the slice of caches that
// are being flushed, and resets the c[图片上传中...(image.png-c14626-1565837031888-0)]
urrent cache with new values.
func (c *Cache) Snapshot() (*Cache, error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.snapshotting {
return nil, ErrSnapshotInProgress
}
c.snapshotting = true
c.tracker.IncSnapshotsActive() // increment the number of times we tried to do this
// If no snapshot exists, create a new one, otherwise update the existing snapshot
if c.snapshot == nil {
c.snapshot = &Cache{
store: newRing(),
tracker: newCacheTracker(c.tracker.metrics, c.tracker.labels),
}
}
// Did a prior snapshot exist that failed? If so, return the existing
// snapshot to retry.
if c.snapshot.Size() > 0 {
return c.snapshot, nil
}
//这是个语法糖 这一段就是在snapshot为空时把当前的cache的ring存到了snapshot中,把c.store置为空,
//这里操作是不按顺序执行的 ,是类似java的交换变量,会使用一个中间临时变量的
c.snapshot.store, c.store = c.store, c.snapshot.store
snapshotSize := c.Size()
// Reset the cache's store.
c.store.reset()
//省略监控代码
return c.snapshot, nil
}
这一段的逻辑其实就是lsm中把memtable变成immutable memtable的一段实现。
到此l0核心部分的代码分析完毕。
总结一下
总体来看内存里的逻辑并不复杂,主要核心是要理清楚ring的层级结构。下一篇讲一下C1 tree的磁盘文件的合并策略。点我前往下半场