Reading the Prometheus Checkpoint Source Code

Since Prometheus storage borrows many design ideas from RocksDB, the RocksDB documentation's introduction to checkpoints is quoted below.

Checkpoint is a feature in RocksDB which provides the ability to take a snapshot of a running RocksDB database in a separate directory. Checkpoints can be used as a point in time snapshot, which can be opened Read-only to query rows as of the point in time or as a Writeable snapshot by opening it Read-Write. Checkpoints can be used for both full and incremental backups.

In short, a checkpoint is a snapshot taken at a particular point in time.

Concepts

Before getting into checkpoints, let's first look at some Prometheus terminology and the format of the data it scrapes.

# HELP node_cpu_seconds_total Seconds the cpus spent in each mode.
# TYPE node_cpu_seconds_total counter
node_cpu_seconds_total{cpu="0",mode="idle"} 52544.27
node_cpu_seconds_total{cpu="0",mode="nice"} 0
node_cpu_seconds_total{cpu="0",mode="system"} 6582
node_cpu_seconds_total{cpu="0",mode="user"} 7838.02
node_cpu_seconds_total{cpu="1",mode="idle"} 61939.86
node_cpu_seconds_total{cpu="1",mode="nice"} 0
node_cpu_seconds_total{cpu="1",mode="system"} 1991.59
node_cpu_seconds_total{cpu="1",mode="user"} 3031.76

From the scraped data we can see that the Prometheus data model mainly consists of metric names, labels, and samples.

Each time series is uniquely identified by its metric name and an optional set of labels:

<metric name>{<label name>=<label value>, ...}

Every time series is uniquely identified by its metric name and optional key-value pairs called labels.

Labels enable Prometheus's dimensional data model: any given combination of labels for the same metric name identifies a particular dimensional instantiation of that metric.

A sample consists of a float value and a millisecond-precision timestamp.

Samples form the actual time series data. Each sample consists of:

  • a float64 value
  • a millisecond-precision timestamp

Prometheus has four main metric types: Counter, Gauge, Histogram, and Summary.
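
As a quick illustration of the data model (not part of the checkpoint code), the sketch below uses prometheus/client_golang to expose a counter shaped like the node_cpu_seconds_total output above. The metric name is reused purely for illustration, and the listen address is arbitrary.

package main

import (
    "log"
    "net/http"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
    // A counter vector with "cpu" and "mode" labels; every distinct
    // label combination becomes its own time series.
    cpuSeconds := prometheus.NewCounterVec(prometheus.CounterOpts{
        Name: "node_cpu_seconds_total",
        Help: "Seconds the cpus spent in each mode.",
    }, []string{"cpu", "mode"})
    prometheus.MustRegister(cpuSeconds)

    cpuSeconds.WithLabelValues("0", "idle").Add(52544.27)
    cpuSeconds.WithLabelValues("0", "user").Add(7838.02)

    // Expose the metrics in the text format shown above.
    http.Handle("/metrics", promhttp.Handler())
    log.Fatal(http.ListenAndServe(":8080", nil))
}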


Source code analysis

(ENV) 🍺 /Users/xsky/go/src/github.com/microyahoo/prometheus ☞ tree data -h
data
├── [ 192]  01DPE8T5XPQ9ZYHSNJYBJBKGR6
│   ├── [  96]  chunks
│   │   └── [7.1K]  000001
│   ├── [ 22K]  index
│   ├── [ 272]  meta.json
│   └── [   9]  tombstones
├── [   0]  lock
├── [ 20K]  queries.active
└── [ 256]  wal
    ├── [   0]  00000050
    ├── [   0]  00000051
    ├── [   0]  00000052
    ├── [   0]  00000053
    ├── [ 10K]  00000054
    └── [  96]  checkpoint.000049
        └── [ 32K]  00000000

4 directories, 12 files

The source code analysis below refers to the directory layout above. Note that the wal directory contains a checkpoint.N directory, which in turn holds the corresponding checkpoint segment files.
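
Before stepping through Checkpoint itself, it helps to know roughly what LastCheckpoint (called near the top of the function) has to do: scan the WAL directory for checkpoint.N sub-directories and return the one with the highest index. The following is only a simplified sketch written for illustration; the real LastCheckpoint lives in the tsdb package and handles more cases.

package main

import (
    "fmt"
    "os"
    "path/filepath"
    "strconv"
    "strings"
)

const checkpointPrefix = "checkpoint."

// lastCheckpoint is a simplified, illustrative version of LastCheckpoint:
// it scans dir for checkpoint.N sub-directories and returns the path and
// index N of the most recent one.
func lastCheckpoint(dir string) (string, int, error) {
    entries, err := os.ReadDir(dir)
    if err != nil {
        return "", 0, err
    }
    maxIdx, found := -1, ""
    for _, e := range entries {
        if !e.IsDir() || !strings.HasPrefix(e.Name(), checkpointPrefix) {
            continue
        }
        idx, err := strconv.Atoi(e.Name()[len(checkpointPrefix):])
        if err != nil {
            continue // skip names that are not checkpoint.<number>
        }
        if idx > maxIdx {
            maxIdx, found = idx, filepath.Join(dir, e.Name())
        }
    }
    if maxIdx < 0 {
        return "", 0, fmt.Errorf("checkpoint not found")
    }
    return found, maxIdx, nil
}

func main() {
    dir, idx, err := lastCheckpoint("data/wal")
    fmt.Println(dir, idx, err) // e.g. data/wal/checkpoint.000049 49 <nil>
}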

// Checkpoint creates a compacted checkpoint of segments in range [first, last] in the given WAL.
// It includes the most recent checkpoint if it exists.
// All series not satisfying keep and samples below mint are dropped.
//
// The checkpoint is stored in a directory named checkpoint.N in the same
// segmented format as the original WAL itself.
// This makes it easy to read it through the WAL package and concatenate
// it with the original WAL.
func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) {
    stats := &CheckpointStats{}
    var sgmReader io.ReadCloser

    {

        var sgmRange []wal.SegmentRange
        // Look up the most recent checkpoint under the wal directory.
        // There may be several checkpoints, so the latest one is chosen.
        // It returns the checkpoint directory and that directory's numeric
        // suffix, e.g. idx=49 for the directory listing above.
        dir, idx, err := LastCheckpoint(w.Dir())
        if err != nil && err != ErrNotFound {
            return nil, errors.Wrap(err, "find last checkpoint")
        }
        last := idx + 1
        // This check is needed because a checkpoint may never have been taken before.
        if err == nil {
            if from > last {
                return nil, fmt.Errorf("unexpected gap to last checkpoint. expected:%v, requested:%v", last, from)
            }
            // Ignore WAL files below the checkpoint. They shouldn't exist to begin with.
            // from starts at the index of the last checkpoint + 1,
            // i.e. everything before it has already been checkpointed.
            from = last

            sgmRange = append(sgmRange, wal.SegmentRange{Dir: dir, Last: math.MaxInt32})
        }
        
        // The segment range [from, to] of the WAL itself.
        sgmRange = append(sgmRange, wal.SegmentRange{Dir: w.Dir(), First: from, Last: to})
        // Wrap the ranges into a single segments range reader.
        sgmReader, err = wal.NewSegmentsRangeReader(sgmRange...)
        if err != nil {
            return nil, errors.Wrap(err, "create segment reader")
        }
        defer sgmReader.Close()
    }

    cpdir := filepath.Join(w.Dir(), fmt.Sprintf(checkpointPrefix+"%06d", to))
    cpdirtmp := cpdir + ".tmp"

    // Create the temporary directory checkpoint.XXXXXX.tmp.
    if err := os.MkdirAll(cpdirtmp, 0777); err != nil {
        return nil, errors.Wrap(err, "create checkpoint dir")
    }
    // Create a WAL segment inside the temporary checkpoint.XXXXXX.tmp directory.
    // If the directory is empty, the initial WAL index is 0, i.e. 00000000.
    cp, err := wal.New(nil, nil, cpdirtmp, w.CompressionEnabled())
    if err != nil {
        return nil, errors.Wrap(err, "open checkpoint")
    }

    // Ensures that an early return caused by an error doesn't leave any tmp files.
    defer func() {
        cp.Close()
        os.RemoveAll(cpdirtmp)
    }()

    r := wal.NewReader(sgmReader)
    
    var (
        series  []RefSeries
        samples []RefSample
        tstones []Stone
        dec     RecordDecoder
        enc     RecordEncoder
        buf     []byte
        recs    [][]byte
    )
    // Read the records one by one.
    for r.Next() {
        series, samples, tstones = series[:0], samples[:0], tstones[:0]

        // We don't reset the buffer since we batch up multiple records
        // before writing them to the checkpoint.
        // Remember where the record for this iteration starts.
        start := len(buf)
        rec := r.Record()

        // Determine the record type.
        switch dec.Type(rec) {
        case RecordSeries:
            // Decode all series entries in the record first; explained in detail below.
            series, err = dec.Series(rec, series)
            if err != nil {
                return nil, errors.Wrap(err, "decode series")
            }
            // Drop irrelevant series in place.
            repl := series[:0]
            // Use keep to decide which series to retain.
            for _, s := range series {
                if keep(s.Ref) {
                    repl = append(repl, s)
                }
            }
            // Encode the retained series into buf.
            if len(repl) > 0 {
                buf = enc.Series(repl, buf)
            }
            // Track the total and dropped series counts.
            stats.TotalSeries += len(series)
            stats.DroppedSeries += len(series) - len(repl)

        case RecordSamples:
            // Decode all samples in the record; explained in detail below.
            samples, err = dec.Samples(rec, samples)
            if err != nil {
                return nil, errors.Wrap(err, "decode samples")
            }
            // Drop irrelevant samples in place.
            repl := samples[:0]
            for _, s := range samples {
                // Drop samples whose timestamp T is below mint.
                if s.T >= mint {
                    repl = append(repl, s)
                }
            }
            if len(repl) > 0 {
                // Encode the retained samples into buf.
                buf = enc.Samples(repl, buf)
            }
            stats.TotalSamples += len(samples)
            stats.DroppedSamples += len(samples) - len(repl)

        case RecordTombstones:
            tstones, err = dec.Tombstones(rec, tstones)
            if err != nil {
                return nil, errors.Wrap(err, "decode deletes")
            }
            // Drop irrelevant tombstones in place.
            repl := tstones[:0]
            for _, s := range tstones {
                for _, iv := range s.intervals {
                    // TODO why?
                    if iv.Maxt >= mint {
                        repl = append(repl, s)
                        break
                    }
                }
            }
            if len(repl) > 0 {
                buf = enc.Tombstones(repl, buf)
            }
            stats.TotalTombstones += len(tstones)
            stats.DroppedTombstones += len(tstones) - len(repl)

        default:
            return nil, errors.New("invalid record type")
        }
        if len(buf[start:]) == 0 {
            continue // All contents discarded.
        }
        recs = append(recs, buf[start:])

        // Flush records in 1 MB increments.
        // Flush once buf has accumulated more than 1 MB.
        if len(buf) > 1*1024*1024 {
            if err := cp.Log(recs...); err != nil {
                return nil, errors.Wrap(err, "flush records")
            }
            buf, recs = buf[:0], recs[:0]
        }
    }
    // If we hit any corruption during checkpointing, repairing is not an option.
    // The head won't know which series records are lost.
    if r.Err() != nil {
        return nil, errors.Wrap(r.Err(), "read segments")
    }

    // Flush remaining records.
    // Flush the remaining records.
    if err := cp.Log(recs...); err != nil {
        return nil, errors.Wrap(err, "flush records")
    }
    if err := cp.Close(); err != nil {
        return nil, errors.Wrap(err, "close checkpoint")
    }
    // Rename checkpoint.XXXXXX.tmp to checkpoint.XXXXXX.
    if err := fileutil.Replace(cpdirtmp, cpdir); err != nil {
        return nil, errors.Wrap(err, "rename checkpoint directory")
    }

    return stats, nil
}

As the code shows, all records are read sequentially from the segments range reader. For each record its type is determined first; there are three main record types, Series, Samples, and Tombstones, as shown below:

// RecordSeries is used to match WAL records of type Series.
RecordSeries RecordType = 1
// RecordSamples is used to match WAL records of type Samples.
RecordSamples RecordType = 2
// RecordTombstones is used to match WAL records of type Tombstones.
RecordTombstones RecordType = 3

As described in the code, the first byte of a record is its type.
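
A minimal sketch of how such a type check can look, assuming the RecordType constants above plus a RecordInvalid sentinel; the actual check is RecordDecoder.Type in the tsdb package and may differ in detail:

// recordType derives the record type from the first byte. RecordInvalid is
// assumed here as a sentinel value for empty or unknown records.
func recordType(rec []byte) RecordType {
    if len(rec) == 0 {
        return RecordInvalid
    }
    switch t := RecordType(rec[0]); t {
    case RecordSeries, RecordSamples, RecordTombstones:
        return t
    default:
        return RecordInvalid
    }
}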

  • If the type is Series, all series entries must be decoded from the record.
// Series appends series in rec to the given slice.
// Decode all series contained in rec.
func (d *RecordDecoder) Series(rec []byte, series []RefSeries) ([]RefSeries, error) {
    dec := encoding.Decbuf{B: rec}

    if RecordType(dec.Byte()) != RecordSeries {
        return nil, errors.New("invalid record type")
    }
    for len(dec.B) > 0 && dec.Err() == nil {
        ref := dec.Be64()

        lset := make(labels.Labels, dec.Uvarint())

        for i := range lset {
            lset[i].Name = dec.UvarintStr()
            lset[i].Value = dec.UvarintStr()
        }
        sort.Sort(lset)

        series = append(series, RefSeries{
            Ref:    ref,
            Labels: lset,
        })
    }
    if dec.Err() != nil {
        return nil, dec.Err()
    }
    if len(dec.B) > 0 {
        return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.B))
    }
    return series, nil
}
  • The layout breaks down as follows:
    • The first byte is the type.
    • The next 8 bytes are the ref.
    • Next, a uvarint is read that gives the number of labels.
    • This is followed by the name/value pairs that make up the labels.
      • Name and value are each parsed by first reading their length as a uvarint and then reading that many bytes, which are converted to a string.
      +---------------------------+---------------------------+
      |            Name           |           Value           |
      +---------------+-----------+---------------+-----------+
      | len <uvarint> |   bytes   | len <uvarint> |   bytes   |
      +---------------+-----------+---------------+-----------+
      
    • The labels are then sorted by name.
    • This yields one complete series record; the overall layout is shown below, and a sketch encoder for this layout follows the diagram.
+-----------+----------+-------------+------+--------+------+-------+-----+-----+----------+-------------+------+--------+------+-------+-----+-----+
| type <1b> | ref <8b> | len(labels) | name | value  | name | value | ... | ... | ref <8b> | len(labels) | name | value  | name | value | ... | ... |
+-----------+----------+-------------+------+--------+------+-------+-----+-----+----------+-------------+------+--------+------+-------+-----+-----+
    byte       uint64     Uvarint
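
The following is a hypothetical encoder for exactly this layout, written only to make the byte format concrete; the label, refSeries and encodeSeries names are invented for this sketch, and the real encoding lives in RecordEncoder.Series.

package main

import (
    "encoding/binary"
    "fmt"
)

// label and refSeries are simplified stand-ins for the tsdb types,
// defined only for this sketch.
type label struct{ Name, Value string }
type refSeries struct {
    Ref    uint64
    Labels []label
}

const recordSeries byte = 1

// encodeSeries writes the layout shown above: a type byte, then per series
// its ref (8 bytes, big endian), the number of labels as a uvarint, and
// uvarint-length-prefixed name/value strings.
func encodeSeries(series []refSeries) []byte {
    buf := []byte{recordSeries}
    var u64 [8]byte
    var tmp [binary.MaxVarintLen64]byte

    putUvarintStr := func(s string) {
        n := binary.PutUvarint(tmp[:], uint64(len(s)))
        buf = append(buf, tmp[:n]...)
        buf = append(buf, s...)
    }
    for _, s := range series {
        binary.BigEndian.PutUint64(u64[:], s.Ref)
        buf = append(buf, u64[:]...)
        n := binary.PutUvarint(tmp[:], uint64(len(s.Labels)))
        buf = append(buf, tmp[:n]...)
        for _, l := range s.Labels {
            putUvarintStr(l.Name)
            putUvarintStr(l.Value)
        }
    }
    return buf
}

func main() {
    rec := encodeSeries([]refSeries{{
        Ref:    1,
        Labels: []label{{"cpu", "0"}, {"mode", "idle"}},
    }})
    fmt.Printf("% x\n", rec)
}
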
  • If the type is Samples:
// Samples appends samples in rec to the given slice.
func (d *RecordDecoder) Samples(rec []byte, samples []RefSample) ([]RefSample, error) {
    dec := encoding.Decbuf{B: rec}

    if RecordType(dec.Byte()) != RecordSamples {
        return nil, errors.New("invalid record type")
    }
    if dec.Len() == 0 {
        return samples, nil
    }
    var (
        baseRef  = dec.Be64()
        baseTime = dec.Be64int64()
    )
    for len(dec.B) > 0 && dec.Err() == nil {
        dref := dec.Varint64()
        dtime := dec.Varint64()
        val := dec.Be64()

        samples = append(samples, RefSample{
            Ref: uint64(int64(baseRef) + dref),
            T:   baseTime + dtime,
            V:   math.Float64frombits(val),
        })
    }

    if dec.Err() != nil {
        return nil, errors.Wrapf(dec.Err(), "decode error after %d samples", len(samples))
    }
    if len(dec.B) > 0 {
        return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.B))
    }
    return samples, nil
}

The format is as follows:

+-----------+--------------+---------------+------+--------+------+-------+--------+-----+-----+-----+-----+
| type <1b> | baseRef <8b> | baseTime <8b> | dref | dtime  |  val | dref  |  dtime | val | ... | ... | ... | 
+-----------+--------------+---------------+------+--------+------+-------+--------+-----+-----+-----+-----+
    byte        uint64          int64      Varint64 Varint64 uint64

Each RefSample is reconstructed from the base values and the per-sample deltas (a sketch of this delta scheme follows the snippet):

RefSample{
    Ref: uint64(int64(baseRef) + dref),
    T:   baseTime + dtime,
    V:   math.Float64frombits(val),
}
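
To make the delta scheme concrete, here is a hypothetical encoder for the layout above: only the first sample's ref and timestamp are written at full width, and every sample then stores signed varint deltas against that base. The refSample and encodeSamples names are invented for this sketch; the real counterpart is RecordEncoder.Samples.

package main

import (
    "encoding/binary"
    "fmt"
    "math"
)

// refSample is a simplified stand-in for the tsdb type.
type refSample struct {
    Ref uint64
    T   int64
    V   float64
}

const recordSamples byte = 2

// encodeSamples writes the layout shown above: a type byte, baseRef and
// baseTime at full width, then per sample a varint ref delta, a varint
// time delta and the value as 8 big-endian bytes.
func encodeSamples(samples []refSample) []byte {
    buf := []byte{recordSamples}
    if len(samples) == 0 {
        return buf
    }
    base := samples[0]
    var u64 [8]byte
    var v [binary.MaxVarintLen64]byte

    binary.BigEndian.PutUint64(u64[:], base.Ref)
    buf = append(buf, u64[:]...)
    binary.BigEndian.PutUint64(u64[:], uint64(base.T))
    buf = append(buf, u64[:]...)

    for _, s := range samples {
        n := binary.PutVarint(v[:], int64(s.Ref)-int64(base.Ref))
        buf = append(buf, v[:n]...)
        n = binary.PutVarint(v[:], s.T-base.T)
        buf = append(buf, v[:n]...)
        binary.BigEndian.PutUint64(u64[:], math.Float64bits(s.V))
        buf = append(buf, u64[:]...)
    }
    return buf
}

func main() {
    rec := encodeSamples([]refSample{
        {Ref: 10, T: 1000, V: 52544.27},
        {Ref: 11, T: 1015, V: 7838.02},
    })
    fmt.Printf("%d bytes, type byte = %d\n", len(rec), rec[0])
}
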
  • If the type is Tombstones:
// Tombstones appends tombstones in rec to the given slice.
func (d *RecordDecoder) Tombstones(rec []byte, tstones []Stone) ([]Stone, error) {
    dec := encoding.Decbuf{B: rec}

    if RecordType(dec.Byte()) != RecordTombstones {
        return nil, errors.New("invalid record type")
    }
    for dec.Len() > 0 && dec.Err() == nil {
        // TODO: this still needs to be confirmed.
        // Does each stone here only carry a single interval?
        tstones = append(tstones, Stone{
            ref: dec.Be64(),
            intervals: Intervals{
                {Mint: dec.Varint64(), Maxt: dec.Varint64()},
            },
        })
    }
    if dec.Err() != nil {
        return nil, dec.Err()
    }
    if len(dec.B) > 0 {
        return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.B))
    }
    return tstones, nil
}

The format is as follows:

+-----------+----------+------+--------+-----------+------+---------+-----+-----+-----+
| type <1b> | ref <8b> | minT |  maxT  |  ref <8b> | minT |  maxT   | ... | ... | ... |
+-----------+----------+------+--------+-----------+------+---------+-----+-----+-----+
    byte      uint64   Varint64 Varint64 

Each Stone is built as:

Stone{
    ref: dec.Be64(),
    intervals: Intervals{
        {
            Mint: dec.Varint64(),
            Maxt: dec.Varint64(),
        },
    },
}
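
This also helps answer the TODO in the Checkpoint loop above: a tombstone whose intervals all end before mint can only delete samples that the checkpoint drops anyway, so only stones with at least one interval ending at or after mint are kept. A minimal sketch of that rule, using simplified stand-ins for the tsdb Stone and Interval types:

package main

import "fmt"

// interval and stone are simplified stand-ins for tsdb's Interval and Stone.
type interval struct{ Mint, Maxt int64 }

type stone struct {
    ref       uint64
    intervals []interval
}

// keepStone mirrors the tombstone filtering in Checkpoint: a stone survives
// if any of its deletion intervals ends at or after mint, i.e. it can still
// affect samples that the checkpoint retains.
func keepStone(s stone, mint int64) bool {
    for _, iv := range s.intervals {
        if iv.Maxt >= mint {
            return true
        }
    }
    return false
}

func main() {
    s := stone{ref: 7, intervals: []interval{{Mint: 0, Maxt: 900}}}
    fmt.Println(keepStone(s, 1000)) // false: the interval ends before mint
}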

Open questions

  • After a record is read from the segments range reader, its type is checked first. Is the type really stored in the first byte of the record? I have not yet located where this is guaranteed.
  • In RecordDecoder.Tombstones, does each Stone really correspond to only one Interval?
