btcd source code analysis series: 6 - mempool

Reference: btcd

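  • btcd provides an in-memory pool (mempool) that stores transactions not yet mined into a block.
  • The UTXO set and the block index are stored in LevelDB, not in the mempool discussed here.
  • Before a transaction is inserted, it goes through a series of validity checks (via maybeAcceptTransaction).
  • An orphan tx (one whose inputs reference a parent tx that cannot be found in the main chain or the mempool) is temporarily placed in the orphan pool (via ProcessTransaction).
  • When a new block is connected to the main chain, the block's transactions are removed from the mempool, and orphan pool transactions that depend on this block are moved into the mempool (via ProcessOrphans).
  • When a block is disconnected from the main chain, the block's transactions are reprocessed (via MaybeAcceptTransaction).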

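1. Creating the mempool object

  • This method is called when the server is created, to build a mempool object.

  • As the code shows, pool stores regular transactions and orphans stores orphan transactions.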

    // New returns a new memory pool for validating and storing standalone
    // transactions until they are mined into a block.
    func New(cfg *Config) *TxPool {
        return &TxPool{
            cfg:            *cfg,
            pool:           make(map[chainhash.Hash]*TxDesc),
            orphans:        make(map[chainhash.Hash]*orphanTx),
            orphansByPrev:  make(map[wire.OutPoint]map[chainhash.Hash]*btcutil.Tx),
            nextExpireScan: time.Now().Add(orphanExpireScanInterval),
            outpoints:      make(map[wire.OutPoint]*btcutil.Tx),
        }
    }

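2. ProcessTransaction

  • This method handles raw transactions submitted through RPC (handleSendRawTransaction) and transactions relayed over the P2P network (OnTx).

  • Orphan transactions are supported: they can be added to the orphan pool.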

    // ProcessTransaction is the main workhorse for handling insertion of new
    // free-standing transactions into the memory pool. It includes functionality
    // such as rejecting duplicate transactions, ensuring transactions follow all
    // rules, orphan transaction handling, and insertion into the memory pool.
    //
    // It returns a slice of transactions added to the mempool. When the
    // error is nil, the list will include the passed transaction itself along
    // with any additional orphan transactions that were added as a result of
    // the passed one being accepted.
    //
    // This function is safe for concurrent access.
    func (mp *TxPool) ProcessTransaction(tx *btcutil.Tx, allowOrphan, rateLimit bool, tag Tag) ([]*TxDesc, error) {
        log.Tracef("Processing transaction %v", tx.Hash())

        // Protect concurrent access.
        mp.mtx.Lock()
        defer mp.mtx.Unlock()

        // Potentially accept the transaction to the memory pool.
        missingParents, txD, err := mp.maybeAcceptTransaction(tx, true, rateLimit,
            true)
        if err != nil {
            return nil, err
        }

        if len(missingParents) == 0 {
            // Accept any orphan transactions that depend on this
            // transaction (they may no longer be orphans if all inputs
            // are now available) and repeat for those accepted
            // transactions until there are no more.
            newTxs := mp.processOrphans(tx)
            acceptedTxs := make([]*TxDesc, len(newTxs)+1)

            // Add the parent transaction first so remote nodes
            // do not add orphans.
            acceptedTxs[0] = txD
            copy(acceptedTxs[1:], newTxs)

            return acceptedTxs, nil
        }

        // The transaction is an orphan (has inputs missing). Reject
        // it if the flag to allow orphans is not set.
        if !allowOrphan {
            // Only use the first missing parent transaction in
            // the error message.
            //
            // NOTE: RejectDuplicate is really not an accurate
            // reject code here, but it matches the reference
            // implementation and there isn't a better choice due
            // to the limited number of reject codes. Missing
            // inputs is assumed to mean they are already spent
            // which is not really always the case.
            str := fmt.Sprintf("orphan transaction %v references "+
                "outputs of unknown or fully-spent "+
                "transaction %v", tx.Hash(), missingParents[0])
            return nil, txRuleError(wire.RejectDuplicate, str)
        }

        // Potentially add the orphan transaction to the orphan pool.
        err = mp.maybeAddOrphan(tx, tag)
        return nil, err
    }
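The three outcomes of ProcessTransaction (accepted, parked as an orphan, or rejected) can be condensed into a toy model. This is a minimal sketch and not btcd's code: `toyTx`, `toyPool`, `newToyPool` and `process` are hypothetical names, transactions are identified by plain strings, and validation is reduced to checking whether every parent is known.

```go
package main

import (
	"errors"
	"fmt"
)

// toyTx is a hypothetical stand-in for btcutil.Tx: an ID plus the IDs of
// the transactions whose outputs it spends.
type toyTx struct {
	id      string
	parents []string
}

// toyPool is a hypothetical, heavily simplified mempool.
type toyPool struct {
	confirmed map[string]bool   // transactions assumed to be on the main chain
	pool      map[string]*toyTx // accepted but unconfirmed transactions
	orphans   map[string]*toyTx // transactions waiting for a missing parent
}

var errOrphan = errors.New("orphan transaction rejected")

func newToyPool(confirmed ...string) *toyPool {
	p := &toyPool{
		confirmed: make(map[string]bool),
		pool:      make(map[string]*toyTx),
		orphans:   make(map[string]*toyTx),
	}
	for _, id := range confirmed {
		p.confirmed[id] = true
	}
	return p
}

// missingParents returns the parents found neither on chain nor in the pool.
func (p *toyPool) missingParents(tx *toyTx) []string {
	var missing []string
	for _, parent := range tx.parents {
		if !p.confirmed[parent] && p.pool[parent] == nil {
			missing = append(missing, parent)
		}
	}
	return missing
}

// process reports whether tx entered the pool. A tx with missing parents is
// either parked in the orphan pool or rejected, depending on allowOrphan.
func (p *toyPool) process(tx *toyTx, allowOrphan bool) (bool, error) {
	if len(p.missingParents(tx)) == 0 {
		p.pool[tx.id] = tx
		return true, nil
	}
	if !allowOrphan {
		return false, errOrphan
	}
	p.orphans[tx.id] = tx
	return false, nil
}

func main() {
	p := newToyPool("a")
	child := &toyTx{id: "c", parents: []string{"b"}}

	_, err := p.process(child, false)
	fmt.Println("orphans disallowed:", err)

	accepted, err := p.process(child, true)
	fmt.Println("orphans allowed:", accepted, err, len(p.orphans))

	accepted, err = p.process(&toyTx{id: "b", parents: []string{"a"}}, true)
	fmt.Println("parent:", accepted, err)
}
```

The sketch deliberately leaves out the promotion of waiting orphans once their parent arrives; in the real method that step is handled by processOrphans.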

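3. MaybeAcceptTransaction

  • When the chain notifications for a block arrive (blockchain.NTBlockConnected / blockchain.NTBlockDisconnected), the block's transactions (coinbase excluded) are processed again; for a disconnected block this is done with MaybeAcceptTransaction.

  • Orphan transactions are not added to the orphan pool; the missing parents are returned to the caller instead.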

    // MaybeAcceptTransaction is the main workhorse for handling insertion of new
    // free-standing transactions into a memory pool. It includes functionality
    // such as rejecting duplicate transactions, ensuring transactions follow all
    // rules, detecting orphan transactions, and insertion into the memory pool.
    //
    // If the transaction is an orphan (missing parent transactions), the
    // transaction is NOT added to the orphan pool, but each unknown referenced
    // parent is returned. Use ProcessTransaction instead if new orphans should
    // be added to the orphan pool.
    //
    // This function is safe for concurrent access.
    func (mp *TxPool) MaybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit bool) ([]*chainhash.Hash, *TxDesc, error) {
        // Protect concurrent access.
        mp.mtx.Lock()
        hashes, txD, err := mp.maybeAcceptTransaction(tx, isNew, rateLimit, true)
        mp.mtx.Unlock()

        return hashes, txD, err
    }
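The exported method above only takes the lock and delegates to an unexported method of almost the same name, a convention that recurs throughout this file. A minimal sketch of why the split is useful, with a hypothetical `lockedPool` type that is not part of btcd: Go's `sync.RWMutex` is not reentrant, so code that already holds the lock must call the unexported worker rather than the exported wrapper.

```go
package main

import (
	"fmt"
	"sync"
)

// lockedPool is a hypothetical type used only to illustrate the locking
// convention.
type lockedPool struct {
	mtx sync.RWMutex
	txs map[string]struct{}
}

// add is the unexported worker. It MUST be called with mtx held for writes.
func (p *lockedPool) add(id string) bool {
	if _, exists := p.txs[id]; exists {
		return false
	}
	p.txs[id] = struct{}{}
	return true
}

// Add is the exported wrapper: it takes the lock and delegates to add.
func (p *lockedPool) Add(id string) bool {
	p.mtx.Lock()
	defer p.mtx.Unlock()
	return p.add(id)
}

// AddAll holds the lock once for the whole batch and reuses add. Calling
// Add here instead would block forever, because the lock is already held.
func (p *lockedPool) AddAll(ids ...string) int {
	p.mtx.Lock()
	defer p.mtx.Unlock()
	added := 0
	for _, id := range ids {
		if p.add(id) {
			added++
		}
	}
	return added
}

// Count only reads, so a read lock is enough.
func (p *lockedPool) Count() int {
	p.mtx.RLock()
	defer p.mtx.RUnlock()
	return len(p.txs)
}

func main() {
	p := &lockedPool{txs: make(map[string]struct{})}

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			p.Add(fmt.Sprintf("tx%d", i%4))
		}(i)
	}
	wg.Wait()

	fmt.Println("distinct transactions:", p.Count())
}
```

ProcessTransaction follows the same rule: it locks once and then calls maybeAcceptTransaction and processOrphans directly.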

4. maybeAcceptTransaction

  • Both ProcessTransaction and MaybeAcceptTransaction ultimately call this method.
  • It runs a series of validity checks and then inserts the tx into the mempool.
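The article does not list the individual checks, so the following is only an illustrative sketch of the "validate, then insert" shape. All names (`sketchTx`, `sketchPool`, `checkFunc`, the error values) are hypothetical, and the three checks shown (duplicate in pool, missing inputs or outputs, standalone coinbase) are a small assumed subset in an arbitrary order; the real method performs many more, and in a different arrangement.

```go
package main

import (
	"errors"
	"fmt"
)

// sketchTx is a hypothetical, minimal transaction description.
type sketchTx struct {
	id       string
	inputs   int
	outputs  int
	coinbase bool
}

// sketchPool is a hypothetical pool keyed by transaction ID.
type sketchPool struct {
	pool map[string]*sketchTx
}

var (
	errDuplicate = errors.New("transaction already in pool")
	errNoInputs  = errors.New("transaction has no inputs")
	errNoOutputs = errors.New("transaction has no outputs")
	errCoinbase  = errors.New("standalone coinbase transaction")
)

// checkFunc is one validation step; a non-nil error rejects the transaction.
type checkFunc func(p *sketchPool, tx *sketchTx) error

func checkDuplicate(p *sketchPool, tx *sketchTx) error {
	if _, exists := p.pool[tx.id]; exists {
		return errDuplicate
	}
	return nil
}

func checkSanity(_ *sketchPool, tx *sketchTx) error {
	if tx.inputs == 0 {
		return errNoInputs
	}
	if tx.outputs == 0 {
		return errNoOutputs
	}
	return nil
}

func checkNotCoinbase(_ *sketchPool, tx *sketchTx) error {
	if tx.coinbase {
		return errCoinbase
	}
	return nil
}

// checks run in order; the first failure wins.
var checks = []checkFunc{checkDuplicate, checkSanity, checkNotCoinbase}

// maybeAccept inserts tx only if every check passes.
func (p *sketchPool) maybeAccept(tx *sketchTx) error {
	for _, check := range checks {
		if err := check(p, tx); err != nil {
			return err
		}
	}
	p.pool[tx.id] = tx
	return nil
}

func main() {
	p := &sketchPool{pool: make(map[string]*sketchTx)}
	txs := []*sketchTx{
		{id: "t1", inputs: 1, outputs: 2},
		{id: "t1", inputs: 1, outputs: 2},
		{id: "t2", inputs: 0, outputs: 1},
		{id: "t3", inputs: 1, outputs: 1, coinbase: true},
	}
	for _, tx := range txs {
		fmt.Printf("%s: %v\n", tx.id, p.maybeAccept(tx))
	}
}
```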

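5. ProcessOrphans

  • When a new block is connected to the main chain, each tx in the block is checked for orphan transactions that depend on it, and any such orphans whose inputs are now all available are moved into the mempool.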

    // ProcessOrphans determines if there are any orphans which depend on the passed
    // transaction hash (it is possible that they are no longer orphans) and
    // potentially accepts them to the memory pool. It repeats the process for the
    // newly accepted transactions (to detect further orphans which may no longer be
    // orphans) until there are no more.
    //
    // It returns a slice of transactions added to the mempool. A nil slice means
    // no transactions were moved from the orphan pool to the mempool.
    //
    // This function is safe for concurrent access.
    func (mp *TxPool) ProcessOrphans(acceptedTx *btcutil.Tx) []*TxDesc {
        mp.mtx.Lock()
        acceptedTxns := mp.processOrphans(acceptedTx)
        mp.mtx.Unlock()

        return acceptedTxns
    }
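The doc comment describes a "repeat until there are no more" loop. One way to picture it is a work queue driven by an index from parent to waiting orphans. The sketch below is an assumption-laden simplification, not btcd's implementation: `orphTx`, `orphPool` and their methods are hypothetical, the index is keyed by parent transaction ID rather than by outpoint, and a parent counts as available only if it is in the pool.

```go
package main

import "fmt"

// orphTx is a hypothetical transaction: an ID and the IDs of its parents.
type orphTx struct {
	id      string
	parents []string
}

// orphPool is a hypothetical pool with an orphan index keyed by parent ID.
type orphPool struct {
	pool          map[string]*orphTx
	orphans       map[string]*orphTx
	orphansByPrev map[string]map[string]*orphTx
}

func newOrphPool() *orphPool {
	return &orphPool{
		pool:          make(map[string]*orphTx),
		orphans:       make(map[string]*orphTx),
		orphansByPrev: make(map[string]map[string]*orphTx),
	}
}

// addOrphan stores tx and indexes it under each of its parents.
func (p *orphPool) addOrphan(tx *orphTx) {
	p.orphans[tx.id] = tx
	for _, parent := range tx.parents {
		if p.orphansByPrev[parent] == nil {
			p.orphansByPrev[parent] = make(map[string]*orphTx)
		}
		p.orphansByPrev[parent][tx.id] = tx
	}
}

// removeOrphan drops tx from the orphan pool and from every index entry.
func (p *orphPool) removeOrphan(tx *orphTx) {
	delete(p.orphans, tx.id)
	for _, parent := range tx.parents {
		delete(p.orphansByPrev[parent], tx.id)
		if len(p.orphansByPrev[parent]) == 0 {
			delete(p.orphansByPrev, parent)
		}
	}
}

// ready reports whether every parent of tx is in the pool.
func (p *orphPool) ready(tx *orphTx) bool {
	for _, parent := range tx.parents {
		if p.pool[parent] == nil {
			return false
		}
	}
	return true
}

// processOrphans promotes orphans unlocked by accepted, then orphans
// unlocked by those, and so on until the queue is empty.
func (p *orphPool) processOrphans(accepted *orphTx) []*orphTx {
	var promoted []*orphTx
	queue := []*orphTx{accepted}
	for len(queue) > 0 {
		next := queue[0]
		queue = queue[1:]
		for _, orphan := range p.orphansByPrev[next.id] {
			if !p.ready(orphan) {
				continue
			}
			p.removeOrphan(orphan)
			p.pool[orphan.id] = orphan
			promoted = append(promoted, orphan)
			queue = append(queue, orphan)
		}
	}
	return promoted
}

func main() {
	p := newOrphPool()
	p.addOrphan(&orphTx{id: "c", parents: []string{"b"}})
	p.addOrphan(&orphTx{id: "b", parents: []string{"a"}})

	a := &orphTx{id: "a"}
	p.pool[a.id] = a
	for _, tx := range p.processOrphans(a) {
		fmt.Println("promoted", tx.id)
	}
}
```

A queue rather than recursion keeps the traversal depth bounded when a long chain of orphans is released at once.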

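6. FetchTransaction

  • Fetches the tx with the given hash from the mempool; the orphan pool is not searched.

  • Used by several RPC methods (searchrawtransactions, getrawtransaction, gettxout) and when sending tx messages over the P2P network.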

    // FetchTransaction returns the requested transaction from the transaction pool.
    // This only fetches from the main transaction pool and does not include
    // orphans.
    //
    // This function is safe for concurrent access.
    func (mp *TxPool) FetchTransaction(txHash *chainhash.Hash) (*btcutil.Tx, error) {
        // Protect concurrent access.
        mp.mtx.RLock()
        txDesc, exists := mp.pool[*txHash]
        mp.mtx.RUnlock()

        if exists {
            return txDesc.Tx, nil
        }

        return nil, fmt.Errorf("transaction is not in the pool")
    }

7. MiningDescs

  • The getblocktemplate RPC uses this method to gather the data needed for mining.

    // MiningDescs returns a slice of mining descriptors for all the transactions
    // in the pool.
    //
    // This is part of the mining.TxSource interface implementation and is safe for
    // concurrent access as required by the interface contract.
    func (mp *TxPool) MiningDescs() []*mining.TxDesc {
        mp.mtx.RLock()
        descs := make([]*mining.TxDesc, len(mp.pool))
        i := 0
        for _, desc := range mp.pool {
            descs[i] = &desc.TxDesc
            i++
        }
        mp.mtx.RUnlock()

        return descs
    }

8. RemoveTransaction

  • Removes a transaction from the mempool.

  • When a new block is connected to the main chain, the transactions in that block are removed.

    // RemoveTransaction removes the passed transaction from the mempool. When the
    // removeRedeemers flag is set, any transactions that redeem outputs from the
    // removed transaction will also be removed recursively from the mempool, as
    // they would otherwise become orphans.
    //
    // This function is safe for concurrent access.
    func (mp *TxPool) RemoveTransaction(tx *btcutil.Tx, removeRedeemers bool) {
        // Protect concurrent access.
        mp.mtx.Lock()
        mp.removeTransaction(tx, removeRedeemers)
        mp.mtx.Unlock()
    }
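The unexported removeTransaction is not shown in the article. The following is a hedged sketch of how the removeRedeemers flag could work, using a spent-outpoint index like the `outpoints` map created in New. `rmTx`, `rmPool`, `outPoint` and their fields are hypothetical simplifications, and the real method may do additional bookkeeping.

```go
package main

import "fmt"

// outPoint identifies one output of a transaction (hypothetical, simplified).
type outPoint struct {
	txID  string
	index uint32
}

// rmTx is a hypothetical transaction: the outpoints it spends and how many
// outputs it creates.
type rmTx struct {
	id      string
	inputs  []outPoint
	outputs uint32
}

// rmPool tracks pooled transactions and, for each spent outpoint, the
// pooled transaction that spends it.
type rmPool struct {
	pool      map[string]*rmTx
	outpoints map[outPoint]*rmTx
}

func newRmPool() *rmPool {
	return &rmPool{
		pool:      make(map[string]*rmTx),
		outpoints: make(map[outPoint]*rmTx),
	}
}

// add inserts tx and records every outpoint it spends.
func (p *rmPool) add(tx *rmTx) {
	p.pool[tx.id] = tx
	for _, in := range tx.inputs {
		p.outpoints[in] = tx
	}
}

// remove deletes tx from the pool. With removeRedeemers set, it first
// removes, recursively, every pooled transaction spending one of tx's outputs.
func (p *rmPool) remove(tx *rmTx, removeRedeemers bool) {
	if removeRedeemers {
		for i := uint32(0); i < tx.outputs; i++ {
			op := outPoint{txID: tx.id, index: i}
			if spender, ok := p.outpoints[op]; ok {
				p.remove(spender, true)
			}
		}
	}
	if _, ok := p.pool[tx.id]; ok {
		for _, in := range tx.inputs {
			delete(p.outpoints, in)
		}
		delete(p.pool, tx.id)
	}
}

func main() {
	p := newRmPool()
	a := &rmTx{id: "a", outputs: 1}
	b := &rmTx{id: "b", inputs: []outPoint{{txID: "a", index: 0}}, outputs: 1}
	c := &rmTx{id: "c", inputs: []outPoint{{txID: "b", index: 0}}, outputs: 1}
	p.add(a)
	p.add(b)
	p.add(c)

	p.remove(a, true)
	fmt.Println("transactions left:", len(p.pool))
}
```

When a block is connected, its transactions are now confirmed, so their in-pool descendants remain valid and the flag is left unset; the flag matters when a transaction is dropped as invalid and its descendants must go with it.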

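9. checkPoolDoubleSpend

  • Double-spend detection.

  • It compares the PreviousOutPoint of each txin in the candidate transaction with the outpoints already recorded in the mempool. Note that the excerpt below uses hcutil and Decred-style stake types (stake.TxTypeSSGen), so it appears to come from a Decred-derived codebase rather than from btcd itself.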

    // checkPoolDoubleSpend checks whether or not the passed transaction is
    // attempting to spend coins already spent by other transactions in the pool.
    // Note it does not check for double spends against transactions already in the
    // main chain.
    //
    // This function MUST be called with the mempool lock held (for reads).
    func (mp *TxPool) checkPoolDoubleSpend(tx *hcutil.Tx, txType stake.TxType) error {
        for i, txIn := range tx.MsgTx().TxIn {
            // We don't care about double spends of stake bases.
            if i == 0 && (txType == stake.TxTypeSSGen || txType == stake.TxTypeSSRtx) {
                continue
            }

            if txR, exists := mp.outpoints[txIn.PreviousOutPoint]; exists {
                str := fmt.Sprintf("transaction %v in the pool "+
                    "already spends the same coins", txR.Hash())
                return txRuleError(wire.RejectDuplicate, str)
            }
        }

        return nil
    }

  • An outpoint consists of a transaction hash and an output index (the version shown here also carries a Tree field).

    // OutPoint defines a HC data type that is used to track previous
    // transaction outputs.
    type OutPoint struct {
        Hash  chainhash.Hash
        Index uint32
        Tree  int8
    }
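Because an outpoint is a plain struct of comparable fields, Go allows it to be used directly as a map key, which is what makes the lookup in checkPoolDoubleSpend a single map access. A minimal sketch of that idea, with hypothetical names (`dsTx`, `dsPool`, a string hash instead of chainhash.Hash) and no stake-specific exceptions:

```go
package main

import "fmt"

// outPoint is a simplified, hypothetical outpoint; all fields are comparable,
// so the struct can serve as a map key.
type outPoint struct {
	hash  string
	index uint32
}

// dsTx is a hypothetical transaction with the outpoints it spends.
type dsTx struct {
	id     string
	inputs []outPoint
}

// dsPool maps each spent outpoint to the ID of the pooled spender.
type dsPool struct {
	outpoints map[outPoint]string
}

// checkDoubleSpend fails if any input of tx is already spent in the pool.
func (p *dsPool) checkDoubleSpend(tx *dsTx) error {
	for _, in := range tx.inputs {
		if spender, exists := p.outpoints[in]; exists {
			return fmt.Errorf("transaction %s in the pool already spends %s:%d",
				spender, in.hash, in.index)
		}
	}
	return nil
}

// add records tx's inputs as spent, unless that would be a double spend.
func (p *dsPool) add(tx *dsTx) error {
	if err := p.checkDoubleSpend(tx); err != nil {
		return err
	}
	for _, in := range tx.inputs {
		p.outpoints[in] = tx.id
	}
	return nil
}

func main() {
	p := &dsPool{outpoints: make(map[outPoint]string)}
	fmt.Println(p.add(&dsTx{id: "tx1", inputs: []outPoint{{hash: "p", index: 0}}}))
	fmt.Println(p.add(&dsTx{id: "tx2", inputs: []outPoint{{hash: "p", index: 0}}}))
	fmt.Println(p.add(&dsTx{id: "tx3", inputs: []outPoint{{hash: "p", index: 1}}}))
}
```

As in the original comment, this only detects conflicts between pooled transactions; conflicts with transactions already in the main chain need a separate check.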

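10. PruneExpiredTx

  • Removes expired transactions based on block height.

  • This happens when a block is synced from another peer and during a chain reorganization.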

    // PruneExpiredTx prunes expired transactions from the mempool that may no longer
    // be able to be included into a block.
    func (mp *TxPool) PruneExpiredTx(height int64) {
        // Protect concurrent access.
        mp.mtx.Lock()
        mp.pruneExpiredTx(height)
        mp.mtx.Unlock()
    }

    func (mp *TxPool) pruneExpiredTx(height int64) {
        for _, tx := range mp.pool {
            if tx.Tx.MsgTx().Expiry != 0 {
                if height >= int64(tx.Tx.MsgTx().Expiry) {
                    log.Debugf("Pruning expired transaction %v "+
                        "from the mempool", tx.Tx.Hash())
                    mp.removeTransaction(tx.Tx, true)
                }
            }
        }
    }
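The pruning rule above (an Expiry of 0 means "never expires"; otherwise the tx is dropped once the height reaches its expiry) can be tried out in isolation. This is a minimal sketch with hypothetical `expTx` and `expPool` types; it deletes entries directly instead of calling removeTransaction, so it does not remove dependent transactions. Deleting from a map while ranging over it is permitted in Go, which is what the original loop relies on as well.

```go
package main

import "fmt"

// expTx is a hypothetical transaction with an expiry height (0 = no expiry).
type expTx struct {
	id     string
	expiry uint32
}

// expPool is a hypothetical pool keyed by transaction ID.
type expPool struct {
	pool map[string]*expTx
}

// pruneExpired removes every transaction whose expiry height has been
// reached and returns how many were removed.
func (p *expPool) pruneExpired(height int64) int {
	removed := 0
	for id, tx := range p.pool {
		if tx.expiry != 0 && height >= int64(tx.expiry) {
			delete(p.pool, id) // safe during range in Go
			removed++
		}
	}
	return removed
}

func main() {
	p := &expPool{pool: map[string]*expTx{
		"a": {id: "a", expiry: 0},
		"b": {id: "b", expiry: 100},
		"c": {id: "c", expiry: 200},
	}}
	fmt.Println("removed at height 150:", p.pruneExpired(150))
	fmt.Println("remaining:", len(p.pool))
}
```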