[LevelDB/源码]Put操作的流程源码分析

LevelDB作为一个Key-Value的NoSQL数据库,其最基本的操作就是Put,即插入一对<key, value>记录, 本文将以源码走读的方式解析数据记录插入数据库的基本流程

  1. 如下代码所示是Put的函数实现,其和delete操作共用putRec函数。
func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
    return db.putRec(keyTypeVal, key, value, wo)
}

2.putRec函数是实际的Put插入实现函数单看putRec函数会发现该函数非常难理解。在这里作者使用writeMergeC、writeMergedC、writeAckC和writeLockC共同控制多线程的数据插入和合并操作。这四个channel的定义如下,注意其中writeLockC的channel可以缓存一个对象。

writeMergeC:  make(chan writeMerge),
writeMergedC: make(chan bool),
writeLockC:   make(chan struct{}, 1),
writeAckC:    make(chan error),

其中writeMergeC的channel可以携带插入的数据,其工作原理如下图所示, 假设同时有c1,c2和c3的线程尝试插入数据,这时他们会在writeMergeC和writeLockC上竞争,由于其定义的不同。第一个进入的线程必然会获取writeLockC,也即获取写锁我们假设是图中的C1。此时C1线程会将<key,value>数据线写入batch然后执行writeLock函数。而c2和c3则继续竞争,由于此时写锁被writeLockC独占。c2和c3只能竞争writeMergeC的写入权限。

3.在writeLocked函数内部,会对写入记录执行merge操作,也就是会从writeMergeC中读取当前堵塞的写入并在空间够用的情况下merge成一次写操作。我们假设c2和c3依次获取写入writeMergeC的操作,则其数据最终会形成图中Step2的状态。此时c2 和c3堵塞在如下代码处。

       if <-db.writeMergedC {
        return <-db.writeAckC
      }
阶段1和阶段2.png
  1. putRec函数解析
func (db *DB) putRec(kt keyType, key, value []byte, wo *opt.WriteOptions) error {
    if err := db.ok(); err != nil {
        return err
    }
    //merge 和sync 以数据库的初始化配置为主
    merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
    sync := wo.GetSync() && !db.s.o.GetNoSync()

    // Acquire write lock
    if merge {
        select {
        case db.writeMergeC <- writeMerge{sync: sync, keyType: kt, key: key, value: value}:
            //如果能向writeMergeC 写入新插入的key value 数据
            //则等待新的key value与老的数据进行merge操作
            if <-db.writeMergedC { //等待writeMerge的结果,如果merge失败则继续获取写锁

                //merge成功等待merge之后的数据写入结果
                // Write is merged.
                return <-db.writeAckC
            }
            // Write is not merged, the write lock is handed to us. Continue.
        case db.writeLockC <- struct{}{}: //尝试获取写锁
            // Write lock acquired.
        case err := <-db.compPerErrC:
            // Compaction error.
            return err
        case <-db.closeC:
            // Closed
            return ErrClosed
        }
    } else {
        //没有merge的情况直接尝试获取写入锁
        select {
        case db.writeLockC <- struct{}{}:
            // Write lock acquired.
        case err := <-db.compPerErrC:
            // Compaction error.
            return err
        case <-db.closeC:
            // Closed
            return ErrClosed
        }
    }

    batch := db.batchPool.Get().(*Batch)
    batch.Reset()
    batch.appendRec(kt, key, value)
    return db.writeLocked(batch, batch, merge, sync)
}

4.writeLocked 函数解析

writeLocked函数是c1的真正执行写入的函数,其写入的主要流程包括:

  • 获取内存数据库memDB,如果空间不足则扩容;
  • 如果任由merge空间和数据,执行merge逻辑;
  • 写日志信息;
  • 数据写入内存;
  • 释放相关所,以及通知给等待线程执行结果(如图中c2和c3)
func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error {
    // Try to flush memdb. This method would also trying to throttle writes
    // if it is too fast and compaction cannot catch-up.

    //1.尝试flush db的数据 如果有需要
    // 返回DB的mdb以及mdb的剩余空间,如果mdbFree不够则会对mdb进行扩容操作
    mdb, mdbFree, err := db.flush(batch.internalLen)

    if err != nil {
        db.unlockWrite(false, 0, err)
        return err
    }
    defer mdb.decref() //释放当前引用数量

    var (
        overflow bool
        merged   int
        batches  = []*Batch{batch}
    )

    if merge { // 需要merge的情况
        // Merge limit.
        var mergeLimit int
        //控制merge的数量不是特别大
        if batch.internalLen > 128<<10 {
            mergeLimit = (1 << 20) - batch.internalLen
        } else {
            mergeLimit = 128 << 10
        }
        mergeCap := mdbFree - batch.internalLen
        if mergeLimit > mergeCap {
            mergeLimit = mergeCap
        }
        //控制最大能够merge的量

    merge:
        for mergeLimit > 0 {
            select {
            case incoming := <-db.writeMergeC:
                if incoming.batch != nil { //writeMergeC 中存储的是batch的情况
                    // Merge batch.
                    if incoming.batch.internalLen > mergeLimit {
                        overflow = true
                        break merge
                    }
                    batches = append(batches, incoming.batch)
                    mergeLimit -= incoming.batch.internalLen
                } else {
                    // Merge put.
                    internalLen := len(incoming.key) + len(incoming.value) + 8
                    if internalLen > mergeLimit {
                        overflow = true
                        break merge
                    }
                    if ourBatch == nil {
                        ourBatch = db.batchPool.Get().(*Batch)
                        ourBatch.Reset()
                        batches = append(batches, ourBatch)
                    }
                    // We can use same batch since concurrent write doesn't
                    // guarantee write order.
                    ourBatch.appendRec(incoming.keyType, incoming.key, incoming.value)
                    mergeLimit -= internalLen
                }
                sync = sync || incoming.sync // 同步的情况需要通知写入的等待线程写入完毕
                merged++
                db.writeMergedC <- true

            default:
                break merge
            }
        }
    }

    // Seq number.
    seq := db.seq + 1 //seq是实际batch的数量编号, 此时db的实际seq并未更新

    // Write journal.
    // 2. batch 信息写入日志
    if err := db.writeJournal(batches, seq, sync); err != nil {
        db.unlockWrite(overflow, merged, err)
        return err
    }

    // Put batches.
    // 3. batch 数据写入内存数据库 mendb
    for _, batch := range batches {
        if err := batch.putMem(seq, mdb.DB); err != nil {
            panic(err)
        }
        seq += uint64(batch.Len())
    }

    // Incr seq number.
    db.addSeq(uint64(batchesLen(batches))) //更新db的seq

    // Rotate memdb if it's reach the threshold.
    if batch.internalLen >= mdbFree { //防止下次不够?
        db.rotateMem(0, false)
    }

    db.unlockWrite(overflow, merged, nil)
    return nil
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,670评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,928评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,926评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,238评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,112评论 4 356
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,138评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,545评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,232评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,496评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,596评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,369评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,226评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,600评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,906评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,185评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,516评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,721评论 2 335

推荐阅读更多精彩内容