以太坊源码分析(3)txpool交易池源码分析

交易池:txpool源码分析

交易池的源码位于:core/tx_pool.go文件

txpool交易池由两部分构成分别是pending和queued组成。主要适用于存放当前提交等待被区块确认提交的交易,本地交易和网络远程交易都有

1、pending:等待执行的交易会被放在pending队列中

2、queued:提交但是不能够执行的交易,放在queue中等待执行

通过阅读tx_pool_test.go这个txpool的测试文件源码可以发现txpool主要功能如下:

1、检查交易的信息数据是否合法,包括Gas,余额是否不足,Nonce大小等等

2、检查时间状态,将nonce过高的交易放在queue队列,将可以执行的交易放在 pending队列

3、在资源有限的情况下(例如当前池满了,或者网络拥堵),会优先执行GasPrice高的交易

4、如果当前交易的额度大于当前账户的额度,交易会被删除

5、对于相同的account对应的相同nonce的交易只会保存GasPrice高的哪个交易

6、本地交易会使用journal的功能将信息存在本地磁盘

7、如果account没有余额了,那么对应queue队列和pending队列中的交易会被删除

txpool的数据结构

type TxPool struct {
    //配置信息
   config       TxPoolConfig
    //链配置
   chainconfig  *params.ChainConfig
    //当前的链
   chain        blockChain
    //最低的gas价格
   gasPrice     *big.Int
    //通过txFedd订阅TxPool的消息
   txFeed       event.Feed
    //提供了同时取消多个订阅的功能
   scope        event.SubscriptionScope
    //当有了新的区块的产生会收到消息,订阅区块头消息
   chainHeadCh  chan ChainHeadEvent
    //区块头消息订阅器
   chainHeadSub event.Subscription
    //对事物进行签名处理
   signer       types.Signer
    //读写互斥锁
   mu           sync.RWMutex
    //当前区块链头部的状态
   currentState  *state.StateDB
    //挂起状态跟踪虚拟nonces
   pendingState  *state.ManagedState 
    // 目前交易的费用上限
   currentMaxGas uint64              
    //一套豁免驱逐规则的本地交易
   locals  *accountSet 
    //本地事务日志备份到磁盘
   journal *txJournal 
    //等待队列
   pending map[common.Address]*txList  
    //排队但不可处理的事务
   queue   map[common.Address]*txList   
    //每个已知帐户的最后一次心跳
   beats   map[common.Address]time.Time 
    //所有允许查询的事务
   all     *txLookup   
    //所有按价格排序的交易
   priced  *txPricedList                
    //关闭同步
   wg sync.WaitGroup 
    //家园版本??
   homestead bool
}

TxPool的配置信息包括一下

type TxPoolConfig struct {
    // 默认情况下应视为本地的地址
   Locals    []common.Address 
    // 是否应该禁用本地事务处理
   NoLocals  bool  
    // 本地事务日志,以便在节点重新启动时存活
   Journal   string  
    // 重新生成本地事务日志的时间间隔
   Rejournal time.Duration 
    // 最低gas价格,以接受入池
   PriceLimit uint64
    // 更换//现有交易的最低价格增幅(一次)  
   PriceBump  uint64 
    //每个帐户保证的可执行事务槽数
   AccountSlots uint64 
    //所有帐户的可执行事务槽的最大数量
   GlobalSlots  uint64 
    //每个帐户允许的非可执行事务槽的最大数量
   AccountQueue uint64 
    //所有帐户的非可执行事务槽的最大数量
   GlobalQueue  uint64
    //非可执行事务的最大排队时间
   Lifetime time.Duration 
}

NewTxPool() 构建

func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
   //对输入进行消毒,确保不会设定易受影响的天然气价格
   // sanitize检查所提供的用户配置,并更改任何不合理或不可行的配置
    config = (&config).sanitize()
    //创建带有初始设置的交易池
   // Create the transaction pool with its initial settings
   pool := &TxPool{
      config:      config,
      chainconfig: chainconfig,
      chain:       chain,
      signer:      types.NewEIP155Signer(chainconfig.ChainID),
      pending:     make(map[common.Address]*txList),
      queue:       make(map[common.Address]*txList),
      beats:       make(map[common.Address]time.Time),
      all:         newTxLookup(),
      chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
      gasPrice:    new(big.Int).SetUint64(config.PriceLimit),
   }
   pool.locals = newAccountSet(pool.signer)
   for _, addr := range config.Locals {
      log.Info("Setting new local account", "address", addr)
      pool.locals.add(addr)
   }
   pool.priced = newTxPricedList(pool.all)
   pool.reset(nil, chain.CurrentBlock().Header())
    ////如果启用了本地事务和日志记录,则从磁盘加载
   // If local transactions and journaling is enabled, load from disk
   if !config.NoLocals && config.Journal != "" {
      pool.journal = newTxJournal(config.Journal)

      if err := pool.journal.load(pool.AddLocals); err != nil {
         log.Warn("Failed to load transaction journal", "err", err)
      }
      if err := pool.journal.rotate(pool.local()); err != nil {
         log.Warn("Failed to rotate transaction journal", "err", err)
      }
   }
   // 从区块链订阅事件
   pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)

   // 启动事件循环并返回
   pool.wg.Add(1)
   go pool.loop()

   return pool
}

add() 方法

验证交易并将其插入到future queue. 如果这个交易是替换了当前存在的某个交易,那么会返回之前的那个交易,这样外部就不用调用promote方法. 如果某个新增加的交易被标记为local, 那么它的发送账户会进入白名单,这个账户的关联的交易将不会因为价格的限制或者其他的一些限制被删除

func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
    //如果这个交易已经知道,就丢弃他
   hash := tx.Hash()
   if pool.all.Get(hash) != nil {
      log.Trace("Discarding already known transaction", "hash", hash)
      return false, fmt.Errorf("known transaction: %x", hash)
   }
   // 如果交易不能通过基本数据验证,就丢弃它
   if err := pool.validateTx(tx, local); err != nil {
      log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
      invalidTxCounter.Inc(1)
      return false, err
   }
   // 如果交易池已经满了,就丢弃交易费用低的交易
   if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
      // 如果新的交易费用交易过低就不接受
      if !local && pool.priced.Underpriced(tx, pool.locals) {
         log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice())
         underpricedTxCounter.Inc(1)
         return false, ErrUnderpriced
      }
      // 如果新的交易比旧的交易号好,就添加新的交易
      drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
      for _, tx := range drop {
         log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
         underpricedTxCounter.Inc(1)
         pool.removeTx(tx.Hash(), false)
      }
   }
   // 如果交易正在替换已经挂起的交易,请直接执行
   from, _ := types.Sender(pool.signer, tx) // already validated
   if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
      // 一旦已经挂起,检查是否满足要求的价格上涨
      inserted, old := list.Add(tx, pool.config.PriceBump)
      if !inserted {
         pendingDiscardCounter.Inc(1)
         return false, ErrReplaceUnderpriced
      }
      // 新交易更好,替换掉旧交易
      if old != nil {
         pool.all.Remove(old.Hash())
         pool.priced.Removed()
         pendingReplaceCounter.Inc(1)
      }
      pool.all.Add(tx)
      pool.priced.Put(tx)
      pool.journalTx(from, tx)

      log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())

      // 我们直接注入了一个新的交易,通知子系统
      go pool.txFeed.Send(NewTxsEvent{types.Transactions{tx}})

      return old != nil, nil
   }
   // New transaction isn't replacing a pending one, push into queue
   replace, err := pool.enqueueTx(hash, tx)
   if err != nil {
      return false, err
   }
   // 标记本地地址并记录本地交易
   if local {
      if !pool.locals.contains(from) {
         log.Info("Setting new local account", "address", from)
         pool.locals.add(from)
      }
   }
   pool.journalTx(from, tx)

   log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
   return replace, nil
}

validateTx() 方法

使用一致性规则来检查一个交易是否有效,并采用本地节点的一些启发式的限制

func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
   // 拒绝超过32KB的事务,以防止DOSS攻击
   if tx.Size() > 32*1024 {
      return ErrOversizedData
   }
   // 交易不能是负的。这可能永远不会发生使用RLP解码
   // 但如果使用RPC创建交易,则可能发生交易。
   if tx.Value().Sign() < 0 {
      return ErrNegativeValue
   }
   // 确保交易使用的Gas不超过当前块限制的GasLimit
   if pool.currentMaxGas < tx.Gas() {
      return ErrGasLimit
   }
   // 确保交易签名正确
   from, err := types.Sender(pool.signer, tx)
   if err != nil {
      return ErrInvalidSender
   }
   // 在我们自己的最低接受Gas价格下放弃非本地交易
   local = local || pool.locals.contains(from) 
    //即使交易从网络到达,帐户也可能是本地的
   if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 {
      return ErrUnderpriced
   }
   // 确保交易符合即时的nonce 也就是当前的交易的nonce必须要等与当前账户的   //nonce
   if pool.currentState.GetNonce(from) > tx.Nonce() {
      return ErrNonceTooLow
   }
   // 发起交易的一方应该有足够的资金来支付费用,判断余额是否足够
   // cost == V + GP * GL
   if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
      return ErrInsufficientFunds
   }
   intrGas, err := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead)
   if err != nil {
      return err
   }
   if tx.Gas() < intrGas {
      return ErrIntrinsicGas
   }
   return nil
}

loop() 方法

是txPool的一个goroutine.也是主要的事件循环.等待和响应外部区块链事件以及各种报告和交易驱逐事件

func (pool *TxPool) loop() {
    //等待组计数器减一
   defer pool.wg.Done()

   // 启动统计报表和交易退出提示符
   var prevPending, prevQueued, prevStales int
    
   report := time.NewTicker(statsReportInterval)
   defer report.Stop()

   evict := time.NewTicker(evictionInterval)
   defer evict.Stop()

   journal := time.NewTicker(pool.config.Rejournal)
   defer journal.Stop()

   // 跟踪交易重组的前一个头标头
   head := pool.chain.CurrentBlock()

   // 不断等待和应对各种事件
   for {
      select {
      // 处理链头事件
      case ev := <-pool.chainHeadCh:
         if ev.Block != nil {
            pool.mu.Lock()
            if pool.chainconfig.IsHomestead(ev.Block.Number()) {
               pool.homestead = true
            }
            pool.reset(head.Header(), ev.Block.Header())
            head = ev.Block

            pool.mu.Unlock()
         }
      //由于系统停止而取消订阅
      case <-pool.chainHeadSub.Err():
         return

      // 处理统计报表刻度
      case <-report.C:
         pool.mu.RLock()
         pending, queued := pool.stats()
         stales := pool.priced.stales
         pool.mu.RUnlock()

         if pending != prevPending || queued != prevQueued || stales != prevStales {
            log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales)
            prevPending, prevQueued, prevStales = pending, queued, stales
         }

      // 处理非活动帐户事务退出
      case <-evict.C:
         pool.mu.Lock()
         for addr := range pool.queue {
            // 从退出机制中跳过本地事务
            if pool.locals.contains(addr) {
               continue
            }
            // 任何时间足够长的非本地交易信息都应该被清除
            if time.Since(pool.beats[addr]) > pool.config.Lifetime {
               for _, tx := range pool.queue[addr].Flatten() {
                  pool.removeTx(tx.Hash(), true)
               }
            }
         }
         pool.mu.Unlock()

      // 处理本地交易日志的轮换
      case <-journal.C:
         if pool.journal != nil {
            pool.mu.Lock()
            if err := pool.journal.rotate(pool.local()); err != nil {
               log.Warn("Failed to rotate local tx journal", "err", err)
            }
            pool.mu.Unlock()
         }
      }
   }
}

promoteTx() 方法

把某个交易加入到pending 队列. 这个方法假设已经获取到了锁

func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) bool {
   // 尝试将交易插入到等待挂起队列中
   if pool.pending[addr] == nil {
      pool.pending[addr] = newTxList(true)
   }
   list := pool.pending[addr]

   inserted, old := list.Add(tx, pool.config.PriceBump)
   if !inserted {
      // 如果旧的交易更好就丢弃这个交易
      pool.all.Remove(hash)
      pool.priced.Removed()

      pendingDiscardCounter.Inc(1)
      return false
   }
   // 如果新的交易更好就丢弃以前的任何事务并标记此事务
   if old != nil {
      pool.all.Remove(old.Hash())
      pool.priced.Removed()

      pendingReplaceCounter.Inc(1)
   }
   // Failsafe to work around direct pending inserts (tests)
   if pool.all.Get(hash) == nil {
      pool.all.Add(tx)
      pool.priced.Put(tx)
   }
   // Set the potentially new pending nonce and notify any subsystems of the new tx
   pool.beats[addr] = time.Now()
   pool.pendingState.SetNonce(addr, tx.Nonce()+1)

   return true
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343