以太坊源码(一)Merkle-Patricia Trie(MPT)的实现

Merkle-PatriciaTrie(MPT)是Ethereum中一种非常重要的数据结构,用来存储用户账户的状态及其变更、交易信息、交易的收据信息。

每一个以太坊的区块头包含三颗MPT树,分别是

  • 交易树
  • 收据树(交易执行过程中的一些数据)
  • 状态树(账号信息, 合约账户和用户账户)

下图中是两个区块头,其中state root,tx root receipt root分别存储了这三棵树的树根,第二个区块显示了当账号 175的数据变更(27 -> 45)的时候,只需要存储跟这个账号相关的部分数据,而且老的区块中的数据还是可以正常访问


state_root.png

Ethereum 使用的Merkle-PatriciaTrie(MPT)结构,源自于Trie结构,又分别继承了PatriciaTrie和MerkleTree的优点,并基于内部数据的特性,设计了全新的节点体系和插入/载入机制。

Trie树, Patricia Trie, 和Merkle树

Trie,又称为字典树或者前缀树 (prefix tree),属于查找树的一种。它与平衡二叉树的主要不同点包括:

  • 每个节点数据所携带的 key 不会存储在 Trie 的节点中,而是通过该节点在整个树形结构里位置来体现(下图中标注出完整的单词,只是为了演示Trie的原理);
  • 同一个父节点的子节点,共享该父节点的 key 作为它们各自 key 的前缀,因此根节点 key 为空;
  • 待存储的数据只存于叶子节点和部分内部节点中,非叶子节点帮助形成叶子节点 key 的前缀。下图展示了一个简单的 Trie 结构。


    trie.png

PatriciaTrie,又被称为 RadixTree 或紧凑前缀树 (compact prefix tree),是一种空间使用率经过优化的 Trie。与 Trie 不同的是,PatriciaTrie 里如果存在一个父节点只有一个子节点,那么这个父节点将与其子节点合并。这样可以缩短 Trie 中不必要的深度,大大加快搜索节点速度。

Patricia_trie.svg.png

MerkleTree,也叫哈希树 (hash tree),是密码学的一个概念,注意理论上它不一定是 Trie。在哈希树中,叶子节点的标签是它所关联数据块的哈希值,而非叶子节点的标签是它的所有子节点的标签拼接而成字符串的哈希值。

要了解 Merkle Tree 就要先从 Hash List 说起:

在点对点网络中作数据传输的时候,会同时从多个机器上下载数据,而且很多机器可以认为是不稳定或者不可信的。为了校验数据的完整性,更好的办法是把大的文件分割成小的数据块。这样的好处是,如果小块数据在传输过程中损坏了,那么只要重新下载这一小块数据就行了,不用重新下载整个文件。

怎么确定小的数据块没有损坏哪?只需要为每个数据块做 Hash。BT 下载的时候,在下载到真正数据之前,我们会先下载一个 Hash 列表。那么问题又来了,怎么确定这个 Hash 列表本事是正确的哪?答案是把每个小块数据的 Hash 值拼到一起,然后对这个长字符串在作一次 Hash 运算,这样就得到 Hash 列表的根 Hash(Top Hash or Root Hash)。下载数据的时候,首先从可信的数据源得到正确的根 Hash,就可以用它来校验 Hash 列表了,然后通过校验后的 Hash 列表校验数据块。

700px-Hash_list.svg.png

Merkle Tree 可以看做 Hash List 的泛化(Hash List 可以看作一种特殊的 Merkle Tree,即树高为 2 的多叉 Merkle Tree。

在最底层,和哈希列表一样,我们把数据分成小的数据块,有相应地哈希和它对应。但是往上走,并不是直接去运算根哈希,而是把相邻的两个哈希合并成一个字符串,然后运算这个字符串的哈希,这样每两个哈希就结婚生子,得到了一个” 子哈希 “。于是往上推,最终必然形成一棵倒挂的树,到了树根的这个位置,这一代就剩下一个根哈希了,我们把它叫做 Merkle Root。

 
MerkleTree.png

在 p2p 网络下载网络之前,先从可信的源获得文件的 Merkle Tree 树根。一旦获得了树根,就可以从其他从不可信的源获取 Merkle tree。通过可信的树根来检查接受到的 MerkleTree。如果 Merkle Tree 是损坏的或者虚假的,就从其他源获得另一个 Merkle Tree,直到获得一个与可信树根匹配的 MerkleTree。

Merkle Tree 和 HashList 的主要区别是,可以直接下载并立即验证 Merkle Tree 的一个分支。因为可以将文件切分成小的数据块,这样如果有一块数据损坏,仅仅重新下载这个数据块就行了。如果文件非常大,那么 Merkle tree 和 Hash list 都很大,但是 Merkle tree 可以一次下载一个分支,然后立即验证这个分支,如果分支验证通过,就可以下载数据了。而 Hash list 只有下载整个 hash list 才能验证。

Merkle-Patricia Trie(MPT) 的实现

MPT 是 Ethereum 自定义的 Trie 型数据结构。在代码中,trie.Trie 结构体用来管理一个 MPT 结构,其中每个节点都是行为接口 Node 的实现类。下图是 Trie 结构体和 node 接口族的 UML 关系图:

MPT.png
  • 在 Trie 结构体中,成员 root 始终作为整个 MPT 的根节点;

  • db是后端的KV存储,trie的结构最终都是需要通过KV的形式存储到数据库里面去,然后启动的时候是需要从数据库里面加载的

  • originalRoot 的作用是在创建 Trie 对象时承接入参 hashNode,通过这个hash值可以在数据库里面恢复出整颗的trie树

  • cachegen 是 cache 次数的计数器,每次 Trie 的变动提交后cachegen 自增 1。

    • cachegen会被附加在node节点上面(node.nodeFlag.gen),默认等于Trie的cachegen。如果Trie每次Commit的时候node有更新,那么Trie的cachegen会重新赋值给node的cachegen。否则node的cachegen将小于Trie的cachegen。
    • 如果当前Trie的cachegen - cachelimit大于node的cachegen,说明trie提交了cachelimit次之后,node一直没有更新。那么node会从cache里面卸载,以便节约内存。 其实这就是缓存更新的LRU算法, 如果一个缓存在多久没有被使用,那么就从缓存里面移除,以节约内存空间。
  • Trie 结构体提供包括对节点的插入、删除、更新,所有节点改动的提交,以及返回整个 MPT 的哈希值。

node 接口族担当整个 MPT 中的各种节点,node 接口分四种实现: fullNode,shortNode,valueNode,hashNode,其中只有 fullNode 和 shortNode 可以带有子节点。

fullNode 是一个可以携带多个子节点的节点。

  • 它有一个容量为 17 的 node 数组成员变量 Children
  • 数组中前 16 个空位分别对应 16 进制 (hex) 下的 0-9a-f,这样对于每个子节点,根据其 key 值 16 进制形式下的第一位的值,就可挂载到 Children 数组的某个位置,fullNode 本身不再需要额外 key 变量;
  • Children 数组的第 17 位,留给该 fullNode 的数据部分。fullNode 明显继承了原生 trie 的特点,而每个父节点最多拥有 16 个分支也包含了基于总体效率的考量。

shortNode 是一个仅有一个子节点的节点。

  • 它的成员变量 Val 指向一个子节点,而成员 Key 是一个字节数组[]byte。
  • 显然 shortNode 的设计体现了 PatriciaTrie 的特点,通过合并只有一个子节点的父节点和其子节点来缩短 trie 的深度

valueNode 承载了MPT结构中 真正数据部分的节点。

  • 它其实是字节数组 []byte 的一个别名,不带子节点。
  • 在使用中,valueNode 就是所携带数据部分的 RLP 哈希值,长度 32byte,数据的 RLP 编码值作为 valueNode 的匹配项存储在数据库里。

这三种类型构成了一个完整的PatriciaTrie结构。何一个[k,v]类型数据被插入一个MPT时,会以k字符串为路径沿着root向下延伸,在此次插入结束时首先成为一个shortNode,k会以自顶点root起到到该节点止的key path形式存在。但之后随着其他节点的不断插入和删除,根据MPT结构的要求,原有节点可能会变化成其他node实现类型,同时MPT中也会不断裂变或者合并出新的节点。比如:

  • 假设一个shortNode S已经有一个子节点A,现在要新插入一个子节点B,那么会有两种可能,如果节点A为fullNode,那么新节点B沿着A的路径继续向下,这样S的子节点会被更新;如果节点A为valueNode,那么S的Key分裂成两段,前一段的公共部分分配给S作为新的Key,同时裂变出一个新的fullNode作为S的子节点,以同时容纳B,以及需要更新的A。
  • 如果一个fullNode原本只有两个子节点,现在要删除其中一个子节点,那么这个fullNode就会退化为shortNode,同时保留的子节点如果是shortNode,还可以跟它再合并。
  • 如果一个shortNode的子节点是valueNode同时又被删除了,那么这个shortNode就会退化成一个valueNode

注意:黄皮书中把节点类型概括为了分支节点、扩展节点和叶子节点。fullNode对应了黄皮书里面的分支节点,shortNode对应了黄皮书里面的扩展节点和叶子节点(通过shortNode.Val的类型来对应到底是叶子节点还是分支节点,如果是valueNode,就是叶子节点,否则是分支节点)

特殊的那个 - hashNode

hashNode 跟 valueNode 一样,也是字符数组 []byte 的一个别名,同样存放 32byte 的哈希值,也没有子节点。不同的是,hashNode 是 fullNode 或者 shortNode 对象的 RLP 哈希值,所以它跟 valueNode 在使用上有着莫大的不同。

在 MPT 中,hashNode 几乎不会单独存在 (有时遍历遇到一个 hashNode 往往因为原本的 node 被折叠了),而是以 nodeFlag 结构体的成员(nodeFlag.hash) 的形式,被 fullNode 和 shortNode 间接持有。一旦 fullNode 或 shortNode 的成员变量 (包括子结构) 发生任何变化,它们的 hashNode 就一定需要更新。所以在 trie.Trie 结构体的 insert(),delete()等函数实现中,可以看到除了新创建的 fullNode、shortNode,那些子结构有所改变的 fullNode、shortNode 的 nodeFlag 成员也会被重设,hashNode 会被清空。在下次 trie.Hash()调用时,整个 MPT 自底向上的遍历过程中,所有清空的 hashNode 会被重新赋值。这样 trie.Hash()结束后,我们可以得到一个根节点 root 的 hashNode,它就是此时此刻这个 MPT 结构的哈希值。上文中提到的,Block 的成员变量 Root、TxHash、ReceiptHash 的生成,正是源出于此。

明显的,hashNode 体现了 MerkleTree 的特点:每个父节点的哈希值来源于所有子节点哈希值的组合,一个顶点的哈希值能够代表一整个树形结构。hashNode 加上之前的 fullNode,shortNode,valueNode,构成了一个完整的 Merkle-PatriciaTrie 结构

MPT 中对 key 的编码

MPT中涉及到了三种编码,分别为keybytes编码、Hex编码和Compact编码

keybytes 编码这种编码格式就是原生的key字节数组,大部分的Trie的API都是使用这边编码格式

Hex 编码: 当 [k,v] 数据插入 MPT 时,它们的 k(key)都必须经过编码。这时对 key 的编码,要保证原本是 []byte 类型的 key 能够以 16 进制形式按位进入 fullNode.Children[],因为 Children[] 数组最多只能容纳 16 个子节点。相应的,Ethereum 代码中在这里定义了一种编码方式叫 Hex,将 1byte 的字符大小限制在 4bit(16 进制)以内。

先来看 Hex 编码的实现,Hex 编码的基本逻辑如下图:

Hex.png

很简单,就是将 keybytes 中的 1byte 信息,将高 4bit 和低 4bit 分别放到两个 byte 里,最后在尾部加 1byte 标记当前属于 Hex 格式。这样新产生的 key 虽然形式还是 []byte,但是每个 byte 大小已经被限制在 4bit 以内,代码中把这种新数据的每一位称为 nibble。这样经过编码之后,带有[]nibble 格式的 key 的数据就可以顺利的进入 fullNode.Children[] 数组了。

节点被加载到内存里面的时候key使用的是这种编码,因为它的方便访问。

Hex 编码虽然解决了 key 是 keybytes 形式的数据插入 MPT 的问题,但代价也很大,就是数据冗余。典型的如 shortNode,目前 Hex 格式下的 Key,长度会变成是原来 keybytes 格式下的两倍。这一点对于节点的哈希计算,比如计算 hashNode,影响很大。所以 Ethereum 又定义了另一种编码格式叫 Compact,用来对 Hex 格式进行优化。

Compact 编码:又叫 hex prefix 编码,它的主要意图是将 Hex 格式的字符串恢复到 keybytes 的格式,同时要加入当前 Compact 格式的标记位,还要考虑在奇偶不同长度 Hex 格式字符串下,避免引入多余的 byte。

compact.png

如上图所示,

  • Compact 编码首先将 Hex 尾部标记 byte 去掉,然后将原本每 2 nibble 的数据合并到 1byte;
  • 增添 1byte 在输出数据头部以放置 Compact 格式标记位00100000;
  • 如果输入 Hex 格式字符串有效长度为奇数,还可以将 Hex 字符串的第一个 nibble 放置在标记位 byte 里的低 4bit,并增加奇数位标志 0011xxxx。

节点放入数据库时候的key用到的就是Compact编码,可以节约磁盘空间。

trie/encoding.go源码

encoding.go主要处理trie树中的三种编码格式的相互转换的工作

// Compact编码首先将Hex尾部标记byte去掉,然后将原本每2 nibble的数据合并到1byte;
// 增添1byte在输出数据头部以放置Compact格式标记位;
// 如果输入Hex格式字符串有效长度为奇数,还可以将Hex字符串的第一个nibble放置在标记位byte里的低4bit。

func hexToCompact(hex []byte) []byte {
    // 如果最后一位是16,terminator为1,否则为0
    terminator := byte(0)
    //包含terminator这个的肯定是叶子节点
    if hasTerm(hex) {
        terminator = 1
        //去除Hex标志位
        hex = hex[:len(hex)-1]
    }
    //定义Compact字节数组
    buf := make([]byte, len(hex)/2+1)
    // Compact格式标记位,如果最后一位是16,才会有Compact格式标记位
    // 因为要恢复nibble时,有Compact标志的,要在最后添加16
    buf[0] = terminator << 5 // the flag byte 00000000或者00100000
    //如果为奇数,添加奇数位标志,并把第一个nibble字节放入buf[0]的低四位
    if len(hex)&1 == 1 {
        buf[0] |= 1 << 4 // odd flag 奇数标志 00110000
        buf[0] |= hex[0] // first nibble is contained in the first byte 0011xxxx
        hex = hex[1:]
    }
    //将两个nibble字节合并成一个字节
    decodeNibbles(hex, buf[1:])
    return buf
}

/**
将compact编码转化为Hex编码
 */
func compactToHex(compact []byte) []byte {
    base := keybytesToHex(compact)
    base = base[:len(base)-1]
    // apply terminator flag
    // base[0]包括四种情况
    // 00000000 扩展节点偶数位
    // 00000001 扩展节点奇数位
    // 00000010 叶子节点偶数位
    // 00000011 叶子节点奇数位
    
    if base[0] >= 2 {
        //如果是叶子节点,末尾添加Hex标志位16
        base = append(base, 16)
    }
    // apply odd flag
    //如果是偶数位,chop等于2,否则等于1
    chop := 2 - base[0]&1
    //去除compact标志位。偶数位去除2个字节,奇数位去除1个字节(因为奇数位的低四位放的是nibble数据)
    base = base[chop:]
    return base
}

func keybytesToHex(str []byte) []byte {
    l := len(str)*2 + 1
    //将一个keybyte转化成两个字节
    var nibbles = make([]byte, l)
    for i, b := range str {
        nibbles[i*2] = b / 16
        nibbles[i*2+1] = b % 16
    }
    //末尾加入Hex标志位16
    nibbles[l-1] = 16
    return nibbles
}

// hexToKeybytes turns hex nibbles into key bytes.
// This can only be used for keys of even length.
func hexToKeybytes(hex []byte) []byte {
    if hasTerm(hex) {
        hex = hex[:len(hex)-1]
    }
    if len(hex)&1 != 0 {
        panic("can't convert hex key of odd length")
    }
    key := make([]byte, (len(hex)+1)/2)
    decodeNibbles(hex, key)
    return key
}

func decodeNibbles(nibbles []byte, bytes []byte) {
    for bi, ni := 0, 0; ni < len(nibbles); bi, ni = bi+1, ni+2 {
        bytes[bi] = nibbles[ni]<<4 | nibbles[ni+1]
    }
}

// prefixLen returns the length of the common prefix of a and b.
// prefixLen返回a和b的公共前缀的长度。
func prefixLen(a, b []byte) int {
    var i, length = 0, len(a)
    if len(b) < length {
        length = len(b)
    }
    for ; i < length; i++ {
        if a[i] != b[i] {
            break
        }
    }
    return i
}

// hasTerm returns whether a hex key has the terminator flag.
func hasTerm(s []byte) bool {
    return len(s) > 0 && s[len(s)-1] == 16
}

MPT源码解读

Trie树的插入,查找

Trie树的初始化调用New函数,函数接受一个hash值和一个Database参数,如果hash值不是空值的化,就说明是从数据库加载一个已经存在的Trie树, 就调用trie.resolveHash方法来加载整颗Trie树,这个方法后续会介绍。 如果root是空,那么就新建一颗Trie树返回。

func New(root common.Hash, db *Database) (*Trie, error) {
    if db == nil {
        panic("trie.New called without a database")
    }
    trie := &Trie{
        db:           db,
        originalRoot: root,
    }
    if (root != common.Hash{}) && root != emptyRoot {
        rootnode, err := trie.resolveHash(root[:], nil)
        if err != nil {
            return nil, err
        }
        trie.root = rootnode
    }
    return trie, nil
}

Trie树的插入,这是一个递归调用的方法,从根节点开始,一直往下找,直到找到可以插入的点,进行插入操作。

参数node是当前插入的节点, prefix是当前已经处理完的key(节点共有的前缀), key是还没有处理玩的部分key, 完整的key = prefix + key。 value是需要插入的值。

返回值bool是操作是否改变了Trie树(dirty),node是插入完成后的子树的根节点

  • 如果节点类型是nil(一颗全新的Trie树的节点就是nil的),这个时候整颗树是空的,直接返回shortNode{key, value, t.newFlag()}, 这个时候整颗树的根就含有了一个shortNode节点。
  • 如果当前的根节点类型是shortNode(也就是叶子节点),首先计算公共前缀,如果公共前缀就等于key,那么说明这两个key是一样的,如果value也一样的(dirty == false),那么返回错误。 如果没有错误就更新shortNode的值然后返回。如果公共前缀不完全匹配,那么就需要把公共前缀提取出来形成一个独立的节点(扩展节点),扩展节点后面连接一个branch节点,branch节点后面看情况连接两个short节点。首先构建一个branch节点(branch := &fullNode{flags: t.newFlag()}),然后再branch节点的Children位置调用t.insert插入剩下的两个short节点
  • 如果当前的节点是fullNode(也就是branch节点),那么直接往对应的孩子节点调用insert方法,然后把对应的孩子节点指向新生成的节点。
  • 如果当前节点是hashNode, hashNode的意思是当前节点还没有加载到内存里面来,还是存放在数据库里面,那么首先调用 t.resolveHash(n, prefix)来加载到内存,然后对加载出来的节点调用insert方法来进行插入。

插入代码

func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error) {
    if len(key) == 0 {
        if v, ok := n.(valueNode); ok {
            return !bytes.Equal(v, value.(valueNode)), value, nil
        }
        return true, value, nil
    }
    switch n := n.(type) {
    case *shortNode:
        matchlen := prefixLen(key, n.Key)
        // If the whole key matches, keep this short node as is
        // and only update the value.
        // 如果整个key匹配,请保持此short node不变并仅更新value。
        if matchlen == len(n.Key) {
            dirty, nn, err := t.insert(n.Val, append(prefix, key[:matchlen]...), key[matchlen:], value)
            if !dirty || err != nil {
                return false, n, err
            }
            return true, &shortNode{n.Key, nn, t.newFlag()}, nil
        }
        // Otherwise branch out at the index where they differ.
        branch := &fullNode{flags: t.newFlag()}
        var err error
        _, branch.Children[n.Key[matchlen]], err = t.insert(nil, append(prefix, n.Key[:matchlen+1]...), n.Key[matchlen+1:], n.Val)
        if err != nil {
            return false, nil, err
        }
        _, branch.Children[key[matchlen]], err = t.insert(nil, append(prefix, key[:matchlen+1]...), key[matchlen+1:], value)
        if err != nil {
            return false, nil, err
        }
        // Replace this shortNode with the branch if it occurs at index 0.
        if matchlen == 0 {
            return true, branch, nil
        }
        // Otherwise, replace it with a short node leading up to the branch.
        return true, &shortNode{key[:matchlen], branch, t.newFlag()}, nil

    case *fullNode:
        dirty, nn, err := t.insert(n.Children[key[0]], append(prefix, key[0]), key[1:], value)
        if !dirty || err != nil {
            return false, n, err
        }
        n = n.copy()
        n.flags = t.newFlag()
        n.Children[key[0]] = nn
        return true, n, nil

    case nil:
        return true, &shortNode{key, value, t.newFlag()}, nil

    case hashNode:
        // We've hit a part of the trie that isn't loaded yet. Load
        // the node and insert into it. This leaves all child nodes on
        // the path to the value in the trie.
        rn, err := t.resolveHash(n, prefix)
        if err != nil {
            return false, nil, err
        }
        dirty, nn, err := t.insert(rn, prefix, key, value)
        if !dirty || err != nil {
            return false, rn, err
        }
        return true, nn, nil

    default:
        panic(fmt.Sprintf("%T: invalid node: %v", n, n))
    }
}

Trie树的Get方法,基本上就是遍历Trie树,来获取Key对应的value信息。

func (t *Trie) tryGet(origNode node, key []byte, pos int) (value []byte, newnode node, didResolve bool, err error) {
    switch n := (origNode).(type) {
    case nil:
        return nil, nil, false, nil
    case valueNode:
        return n, n, false, nil
    case *shortNode:
        if len(key)-pos < len(n.Key) || !bytes.Equal(n.Key, key[pos:pos+len(n.Key)]) {
            // key not found in trie
            return nil, n, false, nil
        }
        value, newnode, didResolve, err = t.tryGet(n.Val, key, pos+len(n.Key))
        if err == nil && didResolve {
            n = n.copy()
            n.Val = newnode
            n.flags.gen = t.cachegen
        }
        return value, n, didResolve, err
    case *fullNode:
        value, newnode, didResolve, err = t.tryGet(n.Children[key[pos]], key, pos+1)
        if err == nil && didResolve {
            n = n.copy()
            n.flags.gen = t.cachegen
            n.Children[key[pos]] = newnode
        }
        return value, n, didResolve, err
    case hashNode:
        child, err := t.resolveHash(n, key[:pos])
        if err != nil {
            return nil, n, true, err
        }
        value, newnode, _, err := t.tryGet(child, key, pos)
        return value, newnode, true, err
    default:
        panic(fmt.Sprintf("%T: invalid node: %v", origNode, origNode))
    }
}

Trie树的序列化和反序列化

序列化主要是指把内存表示的数据存放到数据库里面, 反序列化是指把数据库里面的Trie数据加载成内存表示的数据。
序列化的目的主要是方便存储,减少存储大小等。 反序列化的目的是把存储的数据加载到内存,方便Trie树的插入,查询,修改等需求。

Trie的序列化和反序列化主要应用了前面介绍的Compat 编码和 RLP编码格式。

Trie树的使用方法在trie_test.go里面有比较详细的参考。 这里我列出一个简单的使用流程。首先创建一个空的Trie树,然后插入一些数据,最后调用trie.Commit()方法进行序列化并得到一个hash值(root)

func TestInsert(t *testing.T) {
    trie := newEmpty()
    updateString(trie, "doe", "reindeer")
    updateString(trie, "dog", "puppy")
    updateString(trie, "do", "cat")
    root, err := trie.Commit()
}

下面我们来分析下Commit()的主要流程。 经过一系列的调用,最终调用了hasher.go的hash方法。

// Commit writes all nodes to the trie's memory database, tracking the internal
// and external (for account tries) references.
func (t *Trie) Commit(onleaf LeafCallback) (root common.Hash, err error) {
    if t.db == nil {
        panic("commit called on trie with nil database")
    }
    hash, cached, err := t.hashRoot(t.db, onleaf)
    if err != nil {
        return common.Hash{}, err
    }
    t.root = cached
    t.cachegen++
    return common.BytesToHash(hash.(hashNode)), nil
}

// 折叠node的入口是hasher.hash(),在执行中,hash()和hashChildren()相互调用以遍历整个MPT结构,
// store()对节点作RLP哈希计算。折叠node的基本逻辑是:
// 如果node没有子节点,那么直接返回;
// 如果这个node带有子节点,那么首先将子节点折叠成hashNode。当这个node的子节点全都变成哈希值hashNode之后,
// 再对这个node作RLP+哈希计算,得到它的哈希值,亦即hashNode。
// 注意到hash()和hashChildren()返回两个node类型对象,第一个@hash是入参n经过折叠的hashNode哈希值,
// 第二个@cached是没有经过折叠的n,并且n的hashNode还被赋值了。
func (t *Trie) hashRoot(db *Database, onleaf LeafCallback) (node, node, error) {
    if t.root == nil {
        return hashNode(emptyRoot.Bytes()), nil, nil
    }
    h := newHasher(t.cachegen, t.cachelimit, onleaf)
    defer returnHasherToPool(h)
    return h.hash(t.root, db, true)
}

下面我们简单介绍下hash方法,hash方法主要做了两个操作。 一个是保留原有的树形结构,并用cache变量中, 另一个是计算原有树形结构的hash并把hash值存放到cache变量中保存下来。

计算原有hash值的主要流程是首先调用h.hashChildren(n,db)把所有的子节点的hash值求出来,把原有的子节点替换成子节点的hash值。 这是一个递归调用的过程,会从树叶依次往上计算直到树根。然后调用store方法计算当前节点的hash值,并把当前节点的hash值放入cache节点,设置dirty参数为false(新创建的节点的dirty值是为true的),然后返回。

返回值说明, cache变量包含了原有的node节点,并且包含了node节点的hash值。 hash变量返回了当前节点的hash值(这个值其实是根据node和node的所有子节点计算出来的)。

有一个小细节: 根节点调用hash函数的时候, force参数是为true的,其他的子节点调用的时候force参数是为false的。 force参数的用途是当节点的RLP字节长度小于32也对节点的RLP进行hash计算,这样保证无论如何也会对根节点进行Hash计算。

// hash collapses a node down into a hash node, also returning a copy of the
// original node initialized with the computed hash to replace the original one.
// hash将节点向下折叠为hash node,同时返回用计算出的散列初始化的原始节点的副本以替换原始节点。

// 如果 trie.cachegen - node.cachegen > cachelimit,就可以把节点从内存里面卸载掉。
// 也就是说节点经过几次Commit,都没有修改,那么就把节点从内存里面卸载,以便节约内存给其他节点使用。

// 卸载过程在我们的 hasher.hash方法中
// 这个方法是在commit的时候调用。如果方法的canUnload方法调用返回真,那么就卸载节点
// 观察他的返回值,只返回了hash节点,而没有返回node节点,这样节点就没有引用,不久就会被gc清除掉。
// 节点被卸载掉之后,会用一个hashNode节点来表示这个节点以及其子节点。
// 如果后续需要使用,可以通过方法把这个节点加载到内存里面来。
func (h *hasher) hash(n node, db *Database, force bool) (node, node, error) {
    // If we're not storing the node, just hashing, use available cached data
    if hash, dirty := n.cache(); hash != nil {
        if db == nil {
            return hash, n, nil
        }
        if n.canUnload(h.cachegen, h.cachelimit) {
            // Unload the node from cache. All of its subnodes will have a lower or equal
            // cache generation number.
            // 从缓存中卸载节点。它的所有子节点将具有较低或相等的缓存世代号码。
            cacheUnloadCounter.Inc(1)
            return hash, hash, nil
        }
        if !dirty {
            return hash, n, nil
        }
    }
    // Trie not processed yet or needs storage, walk the children
    collapsed, cached, err := h.hashChildren(n, db)
    if err != nil {
        return hashNode{}, n, err
    }
    hashed, err := h.store(collapsed, db, force)
    if err != nil {
        return hashNode{}, n, err
    }
    // Cache the hash of the node for later reuse and remove
    // the dirty flag in commit mode. It's fine to assign these values directly
    // without copying the node first because hashChildren copies it.
    cachedHash, _ := hashed.(hashNode)
    switch cn := cached.(type) {
    case *shortNode:
        cn.flags.hash = cachedHash
        if db != nil {
            cn.flags.dirty = false
        }
    case *fullNode:
        cn.flags.hash = cachedHash
        if db != nil {
            cn.flags.dirty = false
        }
    }
    return hashed, cached, nil
}

hashChildren方法,这个方法把所有的子节点替换成他们的hash,可以看到cache变量接管了原来的Trie树的完整结构,collapsed变量把子节点替换成子节点的hash值。

  • 如果当前节点是shortNode, 首先把collapsed.Key从Hex Encoding 替换成 Compact Encoding, 然后递归调用hash方法计算子节点的hash和cache,这样就把子节点替换成了子节点的hash值,
  • 如果当前节点是fullNode, 那么遍历每个子节点,把子节点替换成子节点的Hash值,
  • 否则的化这个节点没有children。直接返回。

代码

// hashChildren replaces the children of a node with their hashes if the encoded
// size of the child is larger than a hash, returning the collapsed node as well
// as a replacement for the original node with the child hashes cached in.
func (h *hasher) hashChildren(original node, db *Database) (node, node, error) {
    var err error

    switch n := original.(type) {
    case *shortNode:
        // Hash the short node's child, caching the newly hashed subtree
        collapsed, cached := n.copy(), n.copy()
        collapsed.Key = hexToCompact(n.Key)
        cached.Key = common.CopyBytes(n.Key)

        if _, ok := n.Val.(valueNode); !ok {
            collapsed.Val, cached.Val, err = h.hash(n.Val, db, false)
            if err != nil {
                return original, original, err
            }
        }
        if collapsed.Val == nil {
            collapsed.Val = valueNode(nil) // Ensure that nil children are encoded as empty strings.
        }
        return collapsed, cached, nil

    case *fullNode:
        // Hash the full node's children, caching the newly hashed subtrees
        collapsed, cached := n.copy(), n.copy()

        for i := 0; i < 16; i++ {
            if n.Children[i] != nil {
                collapsed.Children[i], cached.Children[i], err = h.hash(n.Children[i], db, false)
                if err != nil {
                    return original, original, err
                }
            } else {
                collapsed.Children[i] = valueNode(nil) // Ensure that nil children are encoded as empty strings.
            }
        }
        cached.Children[16] = n.Children[16]
        if collapsed.Children[16] == nil {
            collapsed.Children[16] = valueNode(nil)
        }
        return collapsed, cached, nil

    default:
        // Value and hash nodes don't have children so they're left as were
        return n, original, nil
    }
}

store方法,如果一个node的所有子节点都替换成了子节点的hash值,那么直接调用rlp.Encode方法对这个节点进行编码,如果编码后的值小于32,并且这个节点不是根节点,那么就把他们直接存储在他们的父节点里面,否者调用h.sha.Write方法进行hash计算, 然后把hash值和编码后的数据存储到数据库里面,然后返回hash值。

可以看到每个值大于32的节点的值和hash都存储到了数据库里面,

// store方法,如果一个node的所有子节点都替换成了子节点的hash值,
// 那么直接调用rlp.Encode方法对这个节点进行编码
// 如果编码后的值小于32,并且这个节点不是根节点,那么就把他们直接存储在他们的父节点里面
// 否者调用h.sha.Write方法进行hash计算,
// 然后把hash值和编码后的数据存储到数据库里面,然后返回hash值。
// 可以看到每个值大于32的节点的值和hash都存储到了数据库里面,
func (h *hasher) store(n node, db *Database, force bool) (node, error) {
    // Don't store hashes or empty nodes.
    if _, isHash := n.(hashNode); n == nil || isHash {
        return n, nil
    }
    // Generate the RLP encoding of the node
    h.tmp.Reset()
    if err := rlp.Encode(h.tmp, n); err != nil {
        panic("encode error: " + err.Error())
    }
    if h.tmp.Len() < 32 && !force {
        // 小于32字节的节点存储在其父节点内
        return n, nil // Nodes smaller than 32 bytes are stored inside their parent
    }
    // Larger nodes are replaced by their hash and stored in the database.
    // RLP做sha hash
    hash, _ := n.cache()
    if hash == nil {
        h.sha.Reset()
        h.sha.Write(h.tmp.Bytes())
        hash = hashNode(h.sha.Sum(nil))
    }
    if db != nil {
        // We are pooling the trie nodes into an intermediate memory cache
        db.lock.Lock()

        //这里来了一次字节数组与hash的转换
        hash := common.BytesToHash(hash)
        // 果然数据库里面插入的key是node的RLP之后的hash值(其实就是hashNode),
        // value为node的RLP值的字节数组
        db.insert(hash, h.tmp.Bytes())

        // Track all direct parent->child node references
        switch n := n.(type) {
        case *shortNode:
            if child, ok := n.Val.(hashNode); ok {
                db.reference(common.BytesToHash(child), hash)
            }
        case *fullNode:
            for i := 0; i < 16; i++ {
                if child, ok := n.Children[i].(hashNode); ok {
                    db.reference(common.BytesToHash(child), hash)
                }
            }
        }
        db.lock.Unlock()

        // Track external references from account->storage trie
        if h.onleaf != nil {
            switch n := n.(type) {
            case *shortNode:
                if child, ok := n.Val.(valueNode); ok {
                    h.onleaf(child, hash)
                }
            case *fullNode:
                for i := 0; i < 16; i++ {
                    if child, ok := n.Children[i].(valueNode); ok {
                        h.onleaf(child, hash)
                    }
                }
            }
        }
    }
    return hash, nil
}

Trie的反序列化过程。还记得之前创建Trie树的流程么。 如果参数root的hash值不为空,那么就会调用rootnode, err := trie.resolveHash(root[:], nil) 方法来得到rootnode节点。 首先从数据库里面通过hash值获取节点的RLP编码后的内容。 然后调用decodeNode来解析内容。

func (t *Trie) resolveHash(n hashNode, prefix []byte) (node, error) {
    cacheMissCounter.Inc(1)

    hash := common.BytesToHash(n)
    //通过hash解析出node的RLP值
    enc, err := t.db.Node(hash)
    if err != nil || enc == nil {
        return nil, &MissingNodeError{NodeHash: hash, Path: prefix}
    }
    return mustDecodeNode(n, enc, t.cachegen), nil
}

decodeNode方法,这个方法根据rlp的list的长度来判断这个编码到底属于什么节点,如果是2个字段那么就是shortNode节点,如果是17个字段,那么就是fullNode,然后分别调用各自的解析函数。

// decodeNode parses the RLP encoding of a trie node.
func decodeNode(hash, buf []byte, cachegen uint16) (node, error) {
    if len(buf) == 0 {
        return nil, io.ErrUnexpectedEOF
    }
    elems, _, err := rlp.SplitList(buf)
    if err != nil {
        return nil, fmt.Errorf("decode error: %v", err)
    }
    switch c, _ := rlp.CountValues(elems); c {
    case 2:
        n, err := decodeShort(hash, buf, elems, cachegen)
        return n, wrapError(err, "short")
    case 17:
        n, err := decodeFull(hash, buf, elems, cachegen)
        return n, wrapError(err, "full")
    default:
        return nil, fmt.Errorf("invalid number of list elements: %v", c)
    }
}

decodeShort方法,通过key是否有终结符号来判断到底是叶子节点还是扩展节点。如果有终结符那么就是叶子结点,通过SplitString方法解析出来val然后生成一个shortNode。 不过没有终结符,那么说明是扩展节点, 通过decodeRef来解析剩下的节点,然后生成一个shortNode。

func decodeShort(hash, buf, elems []byte, cachegen uint16) (node, error) {
    // kbuf代表compact key
    // rest代表shortnode的value
    kbuf, rest, err := rlp.SplitString(elems)
    if err != nil {
        return nil, err
    }
    flag := nodeFlag{hash: hash, gen: cachegen}
    key := compactToHex(kbuf)
    if hasTerm(key) {
        // value node
        val, _, err := rlp.SplitString(rest)
        if err != nil {
            return nil, fmt.Errorf("invalid value node: %v", err)
        }
        return &shortNode{key, append(valueNode{}, val...), flag}, nil
    }
    r, _, err := decodeRef(rest, cachegen)
    if err != nil {
        return nil, wrapError(err, "val")
    }
    return &shortNode{key, r, flag}, nil
}

decodeRef方法根据数据类型进行解析,如果类型是list,那么有可能是内容<32的值,那么调用decodeNode进行解析。 如果是空节点,那么返回空,如果是hash值,那么构造一个hashNode进行返回,注意的是这里没有继续进行解析,如果需要继续解析这个hashNode,那么需要继续调用resolveHash方法。 到这里decodeShort方法就调用完成了。

func decodeRef(buf []byte, cachegen uint16) (node, []byte, error) {
    kind, val, rest, err := rlp.Split(buf)
    if err != nil {
        return nil, buf, err
    }
    switch {
    case kind == rlp.List:
        // 'embedded' node reference. The encoding must be smaller
        // than a hash in order to be valid.
        // len(buf) - len(rest)得到的结果应该是类型加内容的长度
        if size := len(buf) - len(rest); size > hashLen {
            err := fmt.Errorf("oversized embedded node (size is %d bytes, want size < %d)", size, hashLen)
            return nil, buf, err
        }
        n, err := decodeNode(nil, buf, cachegen)
        return n, rest, err
    case kind == rlp.String && len(val) == 0:
        // empty node
        return nil, rest, nil
    case kind == rlp.String && len(val) == 32:
        return append(hashNode{}, val...), rest, nil
    default:
        return nil, nil, fmt.Errorf("invalid RLP string size %d (want 0 or 32)", len(val))
    }
}

decodeFull方法。根decodeShort方法的流程差不多。

func decodeFull(hash, buf, elems []byte, cachegen uint16) (*fullNode, error) {
    n := &fullNode{flags: nodeFlag{hash: hash, gen: cachegen}}
    for i := 0; i < 16; i++ {
        cld, rest, err := decodeRef(elems, cachegen)
        if err != nil {
            return n, wrapError(err, fmt.Sprintf("[%d]", i))
        }
        n.Children[i], elems = cld, rest
    }
    val, _, err := rlp.SplitString(elems)
    if err != nil {
        return n, err
    }
    if len(val) > 0 {
        n.Children[16] = append(valueNode{}, val...)
    }
    return n, nil
}

Trie树的cache管理

Trie树的cache管理。 还记得Trie树的结构里面有两个参数, 一个是cachegen,一个是cachelimit。这两个参数就是cache控制的参数。 Trie树每一次调用Commit方法,会导致当前的cachegen增加1。

func (t *Trie) Commit(onleaf LeafCallback) (root common.Hash, err error) {
    if t.db == nil {
        panic("commit called on trie with nil database")
    }
    hash, cached, err := t.hashRoot(t.db, onleaf)
    if err != nil {
        return common.Hash{}, err
    }
    t.root = cached
    t.cachegen++
    return common.BytesToHash(hash.(hashNode)), nil
}

然后在Trie树插入的时候,会把当前的cachegen存放到节点中。

func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error) {
            ....
            return true, &shortNode{n.Key, nn, t.newFlag()}, nil
        }

// newFlag returns the cache flag value for a newly created node.
func (t *Trie) newFlag() nodeFlag {
    return nodeFlag{dirty: true, gen: t.cachegen}
}

如果 trie.cachegen - node.cachegen > cachelimit,就可以把节点从内存里面卸载掉。 也就是说节点经过几次Commit,都没有修改,那么就把节点从内存里面卸载,以便节约内存给其他节点使用。

卸载过程在我们的 hasher.hash方法中, 这个方法是在commit的时候调用。如果方法的canUnload方法调用返回真,那么就卸载节点,观察他的返回值,只返回了hash节点,而没有返回node节点,这样节点就没有引用,不久就会被gc清除掉。 节点被卸载掉之后,会用一个hashNode节点来表示这个节点以及其子节点。 如果后续需要使用,可以通过方法把这个节点加载到内存里面来。

func (h *hasher) hash(n node, db *Database, force bool) (node, node, error) {
    // If we're not storing the node, just hashing, use available cached data
    if hash, dirty := n.cache(); hash != nil {
        if db == nil {
            return hash, n, nil
        }
        if n.canUnload(h.cachegen, h.cachelimit) {
            // Unload the node from cache. All of its subnodes will have a lower or equal
            // cache generation number.
            // 从缓存中卸载节点。它的所有子节点将具有较低或相等的缓存世代号码。
            cacheUnloadCounter.Inc(1)
            return hash, hash, nil
        }
        if !dirty {
            return hash, n, nil
        }
    }
    ....
    return hashed, cached, nil
}

canUnload方法是一个接口,不同的node调用不同的方法。

// canUnload tells whether a node can be unloaded.
func (n *nodeFlag) canUnload(cachegen, cachelimit uint16) bool {
    return !n.dirty && cachegen-n.gen >= cachelimit
}

func (n *fullNode) canUnload(gen, limit uint16) bool  { return n.flags.canUnload(gen, limit) }
func (n *shortNode) canUnload(gen, limit uint16) bool { return n.flags.canUnload(gen, limit) }
func (n hashNode) canUnload(uint16, uint16) bool      { return false }
func (n valueNode) canUnload(uint16, uint16) bool     { return false }

func (n *fullNode) cache() (hashNode, bool)  { return n.flags.hash, n.flags.dirty }
func (n *shortNode) cache() (hashNode, bool) { return n.flags.hash, n.flags.dirty }
func (n hashNode) cache() (hashNode, bool)   { return nil, true }
func (n valueNode) cache() (hashNode, bool)  { return nil, true }

security_trie.go 加密的Trie

为了避免刻意使用很长的key导致访问时间的增加, security_trie包装了一下trie树, 所有的key都转换成keccak256算法计算的hash值。同时在数据库里面存储hash值对应的原始的key。

// SecureTrie wraps a trie with key hashing. In a secure trie, all
// access operations hash the key using keccak256. This prevents
// calling code from creating long chains of nodes that
// increase the access time.
//
// Contrary to a regular trie, a SecureTrie can only be created with
// New and must have an attached database. The database also stores
// the preimage of each key.
//
// SecureTrie is not safe for concurrent use.
// 之所以叫SecureTrie,是因为树中所有的key都是经过keccak256加密的
type SecureTrie struct {
    trie             Trie //原始的Trie树
    hashKeyBuf       [common.HashLength]byte //计算hash值的buf
    secKeyCache      map[string][]byte //记录hash值和对应的key的映射
    secKeyCacheOwner *SecureTrie // Pointer to self, replace the key cache on mismatch
}

// NewSecure creates a trie with an existing root node from a backing database
// and optional intermediate in-memory node pool.
//
// If root is the zero hash or the sha3 hash of an empty string, the
// trie is initially empty. Otherwise, New will panic if db is nil
// and returns MissingNodeError if the root node cannot be found.
//
// Accessing the trie loads nodes from the database or node pool on demand.
// Loaded nodes are kept around until their 'cache generation' expires.
// A new cache generation is created by each call to Commit.
// cachelimit sets the number of past cache generations to keep.
func NewSecure(root common.Hash, db *Database, cachelimit uint16) (*SecureTrie, error) {
    if db == nil {
        panic("trie.NewSecure called without a database")
    }
    trie, err := New(root, db)
    if err != nil {
        return nil, err
    }
    trie.SetCacheLimit(cachelimit)
    return &SecureTrie{trie: *trie}, nil
}

// Get returns the value for key stored in the trie.
// The value bytes must not be modified by the caller.
func (t *SecureTrie) Get(key []byte) []byte {
    res, err := t.TryGet(key)
    if err != nil {
        log.Error(fmt.Sprintf("Unhandled trie error: %v", err))
    }
    return res
}

// TryGet returns the value for key stored in the trie.
// The value bytes must not be modified by the caller.
// If a node was not found in the database, a MissingNodeError is returned.
func (t *SecureTrie) TryGet(key []byte) ([]byte, error) {
    return t.trie.TryGet(t.hashKey(key))
}

// Update associates key with value in the trie. Subsequent calls to
// Get will return value. If value has length zero, any existing value
// is deleted from the trie and calls to Get will return nil.
//
// The value bytes must not be modified by the caller while they are
// stored in the trie.
func (t *SecureTrie) Update(key, value []byte) {
    if err := t.TryUpdate(key, value); err != nil {
        log.Error(fmt.Sprintf("Unhandled trie error: %v", err))
    }
}

// TryUpdate associates key with value in the trie. Subsequent calls to
// Get will return value. If value has length zero, any existing value
// is deleted from the trie and calls to Get will return nil.
//
// The value bytes must not be modified by the caller while they are
// stored in the trie.
//
// If a node was not found in the database, a MissingNodeError is returned.
func (t *SecureTrie) TryUpdate(key, value []byte) error {
    // hk代表HashKey
    hk := t.hashKey(key)
    err := t.trie.TryUpdate(hk, value)
    if err != nil {
        return err
    }
    t.getSecKeyCache()[string(hk)] = common.CopyBytes(key)
    return nil
}

// Delete removes any existing value for key from the trie.
func (t *SecureTrie) Delete(key []byte) {
    if err := t.TryDelete(key); err != nil {
        log.Error(fmt.Sprintf("Unhandled trie error: %v", err))
    }
}

// TryDelete removes any existing value for key from the trie.
// If a node was not found in the database, a MissingNodeError is returned.
func (t *SecureTrie) TryDelete(key []byte) error {
    hk := t.hashKey(key)
    delete(t.getSecKeyCache(), string(hk))
    return t.trie.TryDelete(hk)
}

// GetKey returns the sha3 preimage of a hashed key that was
// previously used to store a value.
func (t *SecureTrie) GetKey(shaKey []byte) []byte {
    if key, ok := t.getSecKeyCache()[string(shaKey)]; ok {
        return key
    }
    key, _ := t.trie.db.preimage(common.BytesToHash(shaKey))
    return key
}

// Commit writes all nodes and the secure hash pre-images to the trie's database.
// Nodes are stored with their sha3 hash as the key.
//
// Committing flushes nodes from memory. Subsequent Get calls will load nodes
// from the database.
func (t *SecureTrie) Commit(onleaf LeafCallback) (root common.Hash, err error) {
    // Write all the pre-images to the actual disk database
    if len(t.getSecKeyCache()) > 0 {
        t.trie.db.lock.Lock()
        for hk, key := range t.secKeyCache {
            t.trie.db.insertPreimage(common.BytesToHash([]byte(hk)), key)
        }
        t.trie.db.lock.Unlock()

        t.secKeyCache = make(map[string][]byte)
    }
    // Commit the trie to its intermediate node database
    return t.trie.Commit(onleaf)
}

func (t *SecureTrie) Hash() common.Hash {
    return t.trie.Hash()
}

func (t *SecureTrie) Root() []byte {
    return t.trie.Root()
}

func (t *SecureTrie) Copy() *SecureTrie {
    cpy := *t
    return &cpy
}

// NodeIterator returns an iterator that returns nodes of the underlying trie. Iteration
// starts at the key after the given start key.
func (t *SecureTrie) NodeIterator(start []byte) NodeIterator {
    return t.trie.NodeIterator(start)
}

// hashKey returns the hash of key as an ephemeral buffer.
// The caller must not hold onto the return value because it will become
// invalid on the next call to hashKey or secKey.
func (t *SecureTrie) hashKey(key []byte) []byte {
    h := newHasher(0, 0, nil)
    h.sha.Reset()
    h.sha.Write(key)
    buf := h.sha.Sum(t.hashKeyBuf[:0])
    returnHasherToPool(h)
    return buf
}

// getSecKeyCache returns the current secure key cache, creating a new one if
// ownership changed (i.e. the current secure trie is a copy of another owning
// the actual cache).
func (t *SecureTrie) getSecKeyCache() map[string][]byte {
    if t != t.secKeyCacheOwner {
        t.secKeyCacheOwner = t
        t.secKeyCache = make(map[string][]byte)
    }
    return t.secKeyCache
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容