Reference: btcd

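  • btcd provides an in-memory transaction pool (mempool) that holds transactions not yet mined into a block.
  • The UTXO set and the block index are stored in leveldb, not in the mempool discussed here.
  • Before a transaction is inserted, it goes through a series of validity checks (in maybeAcceptTransaction).
  • An orphan tx (one whose inputs reference a transaction that cannot be found in the main chain or the mempool) is placed temporarily in the orphan pool (by ProcessTransaction).
  • When a new block is connected to the main chain, the block's transactions are removed from the mempool, and orphans in the orphan pool that depend on them are moved into the mempool (by ProcessOrphans).
  • When a block is disconnected from the main chain, the transactions in that block are processed again (by MaybeAcceptTransaction).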

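1. Creating the mempool object

  • This function is called when the server is created to build the mempool object.

  • As the code shows, pool holds regular transactions and orphans holds orphan transactions. orphansByPrev indexes orphans by the outpoints they spend, and outpoints maps each outpoint spent by a pool transaction to that transaction.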

    
    // New returns a new memory pool for validating and storing standalone
    // transactions until they are mined into a block.
    func New(cfg *Config) *TxPool {
        return &TxPool{
            cfg:            *cfg,
            pool:           make(map[chainhash.Hash]*TxDesc),
            orphans:        make(map[chainhash.Hash]*orphanTx),
            orphansByPrev:  make(map[wire.OutPoint]map[chainhash.Hash]*btcutil.Tx),
            nextExpireScan: time.Now().Add(orphanExpireScanInterval),
            outpoints:      make(map[wire.OutPoint]*btcutil.Tx),
        }
    }
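
To make the role of the four maps more concrete, here is a minimal sketch of how they could be kept in sync. The types `outPoint`, `tx`, and `toyPool`, and the helpers `addTx` and `addOrphan`, are simplified stand-ins invented for this example; they are not btcd's types or methods.

```go
package main

import "fmt"

// outPoint and tx are simplified stand-ins (assumptions for this sketch),
// not btcd's wire.OutPoint or btcutil.Tx.
type outPoint struct {
	txID  string
	index uint32
}

type tx struct {
	id     string
	inputs []outPoint
}

// toyPool mirrors the four maps that New initializes.
type toyPool struct {
	pool          map[string]*tx
	orphans       map[string]*tx
	orphansByPrev map[outPoint]map[string]*tx
	outpoints     map[outPoint]*tx
}

func newToyPool() *toyPool {
	return &toyPool{
		pool:          make(map[string]*tx),
		orphans:       make(map[string]*tx),
		orphansByPrev: make(map[outPoint]map[string]*tx),
		outpoints:     make(map[outPoint]*tx),
	}
}

// addTx stores a regular transaction and records every outpoint it spends.
func (p *toyPool) addTx(t *tx) {
	p.pool[t.id] = t
	for _, in := range t.inputs {
		p.outpoints[in] = t
	}
}

// addOrphan stores an orphan and indexes it by each outpoint it is waiting on.
func (p *toyPool) addOrphan(t *tx) {
	p.orphans[t.id] = t
	for _, in := range t.inputs {
		if p.orphansByPrev[in] == nil {
			p.orphansByPrev[in] = make(map[string]*tx)
		}
		p.orphansByPrev[in][t.id] = t
	}
}

func main() {
	p := newToyPool()
	p.addTx(&tx{id: "a", inputs: []outPoint{{"confirmed", 0}}})
	p.addOrphan(&tx{id: "c", inputs: []outPoint{{"b", 1}}})

	fmt.Println("spender of confirmed:0 =", p.outpoints[outPoint{"confirmed", 0}].id)
	fmt.Println("orphans waiting on b:1 =", len(p.orphansByPrev[outPoint{"b", 1}]))
}
```

The outpoints map answers "which pool transaction spends this output?", while orphansByPrev answers "which orphans are waiting for this output?".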
    

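2. ProcessTransaction

  • This method handles raw transactions submitted over RPC (handleSendRawTransaction) and transactions relayed over the P2P network (OnTx).

  • It supports inserting orphan transactions: when allowOrphan is set, a transaction with missing parents goes into the orphan pool.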

    
    // ProcessTransaction is the main workhorse for handling insertion of new
    // free-standing transactions into the memory pool.  It includes functionality
    // such as rejecting duplicate transactions, ensuring transactions follow all
    // rules, orphan transaction handling, and insertion into the memory pool.
    //
    // It returns a slice of transactions added to the mempool.  When the
    // error is nil, the list will include the passed transaction itself along
    // with any additional orphan transactions that were added as a result of
    // the passed one being accepted.
    //
    // This function is safe for concurrent access.
    func (mp *TxPool) ProcessTransaction(tx *btcutil.Tx, allowOrphan, rateLimit bool, tag Tag) ([]*TxDesc, error) {
        log.Tracef("Processing transaction %v", tx.Hash())
    
        // Protect concurrent access.
        mp.mtx.Lock()
        defer mp.mtx.Unlock()
    
        // Potentially accept the transaction to the memory pool.
        missingParents, txD, err := mp.maybeAcceptTransaction(tx, true, rateLimit,
            true)
        if err != nil {
            return nil, err
        }
    
        if len(missingParents) == 0 {
            // Accept any orphan transactions that depend on this
            // transaction (they may no longer be orphans if all inputs
            // are now available) and repeat for those accepted
            // transactions until there are no more.
            newTxs := mp.processOrphans(tx)
            acceptedTxs := make([]*TxDesc, len(newTxs)+1)
    
            // Add the parent transaction first so remote nodes
            // do not add orphans.
            acceptedTxs[0] = txD
            copy(acceptedTxs[1:], newTxs)
    
            return acceptedTxs, nil
        }
    
        // The transaction is an orphan (has inputs missing).  Reject
        // it if the flag to allow orphans is not set.
        if !allowOrphan {
            // Only use the first missing parent transaction in
            // the error message.
            //
            // NOTE: RejectDuplicate is really not an accurate
            // reject code here, but it matches the reference
            // implementation and there isn't a better choice due
            // to the limited number of reject codes.  Missing
            // inputs is assumed to mean they are already spent
            // which is not really always the case.
            str := fmt.Sprintf("orphan transaction %v references "+
                "outputs of unknown or fully-spent "+
                "transaction %v", tx.Hash(), missingParents[0])
            return nil, txRuleError(wire.RejectDuplicate, str)
        }
    
        // Potentially add the orphan transaction to the orphan pool.
        err = mp.maybeAddOrphan(tx, tag)
        return nil, err
    }
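
The three possible outcomes of the function above (accepted, stored as an orphan, or rejected) can be sketched in a toy model. Everything here (`tx`, `toyPool`, `process`, `errOrphan`) is hypothetical and greatly simplified; validation and the promotion of waiting orphans are left out.

```go
package main

import (
	"errors"
	"fmt"
)

// tx is a simplified stand-in: it only lists the IDs of its parent transactions.
type tx struct {
	id      string
	parents []string
}

type toyPool struct {
	known   map[string]bool // IDs available on chain or in the pool
	orphans map[string]*tx
}

var errOrphan = errors.New("orphan transaction not allowed")

// process mirrors the three outcomes of ProcessTransaction:
// accepted, stored as an orphan, or rejected.
func (p *toyPool) process(t *tx, allowOrphan bool) (string, error) {
	var missing []string
	for _, parent := range t.parents {
		if !p.known[parent] {
			missing = append(missing, parent)
		}
	}
	if len(missing) == 0 {
		p.known[t.id] = true
		return "accepted", nil
	}
	if !allowOrphan {
		// Like the original, report only the first missing parent.
		return "", fmt.Errorf("%w: %s is missing parent %s", errOrphan, t.id, missing[0])
	}
	p.orphans[t.id] = t
	return "orphaned", nil
}

func main() {
	p := &toyPool{known: map[string]bool{"genesis": true}, orphans: map[string]*tx{}}

	fmt.Println(p.process(&tx{id: "a", parents: []string{"genesis"}}, true))
	fmt.Println(p.process(&tx{id: "c", parents: []string{"b"}}, true))
	fmt.Println(p.process(&tx{id: "d", parents: []string{"b"}}, false))
}
```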
    

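3. MaybeAcceptTransaction

  • When a block is connected to or disconnected from the main chain (blockchain.NTBlockConnected / blockchain.NTBlockDisconnected), this function is used to process the transactions in the block (the coinbase is excluded).

  • It does not insert orphan transactions; the missing parents are returned to the caller instead.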

    
    // MaybeAcceptTransaction is the main workhorse for handling insertion of new
    // free-standing transactions into a memory pool.  It includes functionality
    // such as rejecting duplicate transactions, ensuring transactions follow all
    // rules, detecting orphan transactions, and insertion into the memory pool.
    //
    // If the transaction is an orphan (missing parent transactions), the
    // transaction is NOT added to the orphan pool, but each unknown referenced
    // parent is returned.  Use ProcessTransaction instead if new orphans should
    // be added to the orphan pool.
    //
    // This function is safe for concurrent access.
    func (mp *TxPool) MaybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit bool) ([]*chainhash.Hash, *TxDesc, error) {
        // Protect concurrent access.
        mp.mtx.Lock()
        hashes, txD, err := mp.maybeAcceptTransaction(tx, isNew, rateLimit, true)
        mp.mtx.Unlock()
    
        return hashes, txD, err
    }
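
The exported/unexported split seen above (MaybeAcceptTransaction takes the lock, maybeAcceptTransaction does the work) matters because Go's sync.Mutex and sync.RWMutex are not reentrant. A minimal sketch of the pattern, using a hypothetical `toyPool` with made-up `Add`, `add`, and `AddAll` methods:

```go
package main

import (
	"fmt"
	"sync"
)

type toyPool struct {
	mtx sync.RWMutex
	txs map[string]bool
}

// add does the real work. It assumes the caller already holds mtx for writes.
func (p *toyPool) add(id string) bool {
	if p.txs[id] {
		return false
	}
	p.txs[id] = true
	return true
}

// Add is the concurrency-safe entry point: lock, delegate, unlock.
func (p *toyPool) Add(id string) bool {
	p.mtx.Lock()
	defer p.mtx.Unlock()
	return p.add(id)
}

// AddAll takes the lock once and calls the unexported helper repeatedly.
// Calling Add here instead would deadlock, because the lock is not reentrant.
func (p *toyPool) AddAll(ids ...string) int {
	p.mtx.Lock()
	defer p.mtx.Unlock()
	n := 0
	for _, id := range ids {
		if p.add(id) {
			n++
		}
	}
	return n
}

func main() {
	p := &toyPool{txs: make(map[string]bool)}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			p.Add("same-tx") // only one goroutine actually inserts
		}()
	}
	wg.Wait()
	added := p.AddAll("a", "b", "a")
	fmt.Println(len(p.txs), added)
}
```

ProcessTransaction follows the same idea: it holds the lock itself and therefore calls the unexported maybeAcceptTransaction and processOrphans directly.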
    

4. maybeAcceptTransaction

  • Both ProcessTransaction and MaybeAcceptTransaction end up calling this method.
  • It performs a series of validity checks and then inserts the tx into the mempool.
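
Since no code is shown for this step, the following is only an illustrative sketch of what "check, then insert" can look like. It covers a small, assumed subset of checks (duplicate, standalone coinbase, pool double spend, missing parents); the real function performs many more, and every name here (`toyPool`, `maybeAccept`, the error values) is hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

type outPoint struct {
	txID  string
	index uint32
}

type tx struct {
	id       string
	coinbase bool
	inputs   []outPoint
}

type toyPool struct {
	pool      map[string]*tx
	outpoints map[outPoint]string // spent outpoint -> ID of the pool tx spending it
	confirmed map[string]bool     // stand-in for "parent exists on chain"
}

var (
	errDuplicate   = errors.New("already have transaction")
	errCoinbase    = errors.New("standalone coinbase transaction")
	errDoubleSpend = errors.New("input already spent by a pool transaction")
)

func newToyPool(confirmed ...string) *toyPool {
	p := &toyPool{
		pool:      make(map[string]*tx),
		outpoints: make(map[outPoint]string),
		confirmed: make(map[string]bool),
	}
	for _, id := range confirmed {
		p.confirmed[id] = true
	}
	return p
}

// maybeAccept returns the missing parent IDs when t is an orphan, an error
// when t is invalid, and (nil, nil) when t was inserted into the pool.
func (p *toyPool) maybeAccept(t *tx) ([]string, error) {
	if _, ok := p.pool[t.id]; ok {
		return nil, errDuplicate
	}
	if t.coinbase {
		return nil, errCoinbase
	}
	for _, in := range t.inputs {
		if _, spent := p.outpoints[in]; spent {
			return nil, errDoubleSpend
		}
	}
	var missing []string
	for _, in := range t.inputs {
		if _, inPool := p.pool[in.txID]; !inPool && !p.confirmed[in.txID] {
			missing = append(missing, in.txID)
		}
	}
	if len(missing) > 0 {
		return missing, nil // orphan: report parents, do not insert
	}
	p.pool[t.id] = t
	for _, in := range t.inputs {
		p.outpoints[in] = t.id
	}
	return nil, nil
}

func main() {
	p := newToyPool("genesis")
	a := &tx{id: "a", inputs: []outPoint{{"genesis", 0}}}
	rival := &tx{id: "b", inputs: []outPoint{{"genesis", 0}}}
	orphan := &tx{id: "c", inputs: []outPoint{{"unknown", 0}}}
	for _, t := range []*tx{a, a, rival, orphan} {
		missing, err := p.maybeAccept(t)
		fmt.Printf("%s: missing=%v err=%v\n", t.id, missing, err)
	}
}
```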

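5. ProcessOrphans

  • When a new block is connected to the main chain, each tx in the block is checked for orphan transactions that depend on it, and any that are no longer orphans are moved into the mempool.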

    // ProcessOrphans determines if there are any orphans which depend on the passed
    // transaction hash (it is possible that they are no longer orphans) and
    // potentially accepts them to the memory pool.  It repeats the process for the
    // newly accepted transactions (to detect further orphans which may no longer be
    // orphans) until there are no more.
    //
    // It returns a slice of transactions added to the mempool.  A nil slice means
    // no transactions were moved from the orphan pool to the mempool.
    //
    // This function is safe for concurrent access.
    func (mp *TxPool) ProcessOrphans(acceptedTx *btcutil.Tx) []*TxDesc {
        mp.mtx.Lock()
        acceptedTxns := mp.processOrphans(acceptedTx)
        mp.mtx.Unlock()
    
        return acceptedTxns
    }
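
The "repeat for newly accepted transactions" behavior described in the comment can be sketched as a work queue. This is a toy model: `toyPool`, `spends`, and `ready` are hypothetical, and the sketch scans every orphan where the real pool has the orphansByPrev index available.

```go
package main

import "fmt"

type tx struct {
	id      string
	parents []string
}

type toyPool struct {
	pool    map[string]bool
	orphans map[string]*tx
}

// spends reports whether t spends an output of the given parent.
func spends(t *tx, parent string) bool {
	for _, p := range t.parents {
		if p == parent {
			return true
		}
	}
	return false
}

// ready reports whether all parents of t are now in the pool.
func (p *toyPool) ready(t *tx) bool {
	for _, parent := range t.parents {
		if !p.pool[parent] {
			return false
		}
	}
	return true
}

// processOrphans promotes orphans that depend on acceptedID, then orphans that
// depend on those, and so on until the queue is empty.
func (p *toyPool) processOrphans(acceptedID string) []string {
	var promoted []string
	queue := []string{acceptedID}
	for len(queue) > 0 {
		cur := queue[0]
		queue = queue[1:]
		for id, o := range p.orphans {
			if !spends(o, cur) || !p.ready(o) {
				continue
			}
			delete(p.orphans, id) // deleting during range is safe in Go
			p.pool[id] = true
			promoted = append(promoted, id)
			queue = append(queue, id)
		}
	}
	return promoted
}

func main() {
	p := &toyPool{
		pool: map[string]bool{"a": true},
		orphans: map[string]*tx{
			"b": {id: "b", parents: []string{"a"}},
			"c": {id: "c", parents: []string{"b"}},
			"e": {id: "e", parents: []string{"a", "unknown"}},
		},
	}
	fmt.Println(p.processOrphans("a"), len(p.orphans))
}
```

Here b is promoted because a is present, c follows because b was just promoted, and e stays an orphan because one of its parents is still unknown.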
    

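6. FetchTransaction

  • Fetches the tx with the given hash from the mempool; the orphan pool is not searched.

  • Used by several RPC handlers (searchrawtransactions, getrawtransaction, gettxout) and when sending tx messages over the P2P network.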

    
    // FetchTransaction returns the requested transaction from the transaction pool.
    // This only fetches from the main transaction pool and does not include
    // orphans.
    //
    // This function is safe for concurrent access.
    func (mp *TxPool) FetchTransaction(txHash *chainhash.Hash) (*btcutil.Tx, error) {
        // Protect concurrent access.
        mp.mtx.RLock()
        txDesc, exists := mp.pool[*txHash]
        mp.mtx.RUnlock()
    
        if exists {
            return txDesc.Tx, nil
        }
    
        return nil, fmt.Errorf("transaction is not in the pool")
    }
    

7. MiningDescs

  • The getBlockTemplate RPC uses this method to gather the data needed for mining.

    
    // MiningDescs returns a slice of mining descriptors for all the transactions
    // in the pool.
    //
    // This is part of the mining.TxSource interface implementation and is safe for
    // concurrent access as required by the interface contract.
    func (mp *TxPool) MiningDescs() []*mining.TxDesc {
        mp.mtx.RLock()
        descs := make([]*mining.TxDesc, len(mp.pool))
        i := 0
        for _, desc := range mp.pool {
            descs[i] = &desc.TxDesc
            i++
        }
        mp.mtx.RUnlock()
    
        return descs
    }
    

8. RemoveTransaction

  • Removes a transaction from the mempool.

  • When a new block is connected to the main chain, the transactions in that block are removed.

    
    // RemoveTransaction removes the passed transaction from the mempool. When the
    // removeRedeemers flag is set, any transactions that redeem outputs from the
    // removed transaction will also be removed recursively from the mempool, as
    // they would otherwise become orphans.
    //
    // This function is safe for concurrent access.
    func (mp *TxPool) RemoveTransaction(tx *btcutil.Tx, removeRedeemers bool) {
        // Protect concurrent access.
        mp.mtx.Lock()
        mp.removeTransaction(tx, removeRedeemers)
        mp.mtx.Unlock()
    }
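
The effect of the removeRedeemers flag can be sketched with a toy pool in which each transaction lists its parents. `toyPool` and `remove` are hypothetical; the real pool can find redeemers through its outpoints map, whereas this sketch simply scans the pool.

```go
package main

import "fmt"

type tx struct {
	id      string
	parents []string
}

type toyPool struct {
	pool map[string]*tx
}

// remove deletes id from the pool. With removeRedeemers set, every pool
// transaction that spends id is removed first, recursively.
func (p *toyPool) remove(id string, removeRedeemers bool) {
	if removeRedeemers {
		for childID, child := range p.pool {
			for _, parent := range child.parents {
				if parent == id {
					p.remove(childID, true)
					break
				}
			}
		}
	}
	delete(p.pool, id) // no-op if id is not in the pool
}

func newChain() *toyPool {
	return &toyPool{pool: map[string]*tx{
		"a": {id: "a"},
		"b": {id: "b", parents: []string{"a"}},
		"c": {id: "c", parents: []string{"b"}},
		"d": {id: "d", parents: []string{"x"}},
	}}
}

func main() {
	p := newChain()
	p.remove("a", true) // a, b and c go; d is unrelated
	fmt.Println(len(p.pool))

	q := newChain()
	q.remove("a", false) // only a goes
	fmt.Println(len(q.pool))
}
```

When a block is connected, its transactions are now confirmed, so their descendants in the pool remain valid and the flag can be left unset. The recursive form is for cases where the removed transaction has become invalid.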
    
    

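9. checkPoolDoubleSpend

  • Double-spend detection.

  • It compares the PreviousOutPoint of each txin of the candidate transaction against the outpoints already recorded in the mempool.

  • Note that the snippet below uses the hcutil and stake packages, so it appears to come from an HC codebase with staking support rather than from btcd itself; the lookup in mp.outpoints works the same way.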

    
    // checkPoolDoubleSpend checks whether or not the passed transaction is
    // attempting to spend coins already spent by other transactions in the pool.
    // Note it does not check for double spends against transactions already in the
    // main chain.
    //
    // This function MUST be called with the mempool lock held (for reads).
    func (mp *TxPool) checkPoolDoubleSpend(tx *hcutil.Tx, txType stake.TxType) error {
        for i, txIn := range tx.MsgTx().TxIn {
            // We don't care about double spends of stake bases.
            if i == 0 && (txType == stake.TxTypeSSGen || txType == stake.TxTypeSSRtx) {
                continue
            }
    
            if txR, exists := mp.outpoints[txIn.PreviousOutPoint]; exists {
                str := fmt.Sprintf("transaction %v in the pool "+
                    "already spends the same coins", txR.Hash())
                return txRuleError(wire.RejectDuplicate, str)
            }
        }
    
        return nil
    }
    
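  • An outpoint consists of a transaction hash and an output index. The struct shown here is the HC variant, which adds a Tree field.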

    
    // OutPoint defines a HC data type that is used to track previous
    // transaction outputs.
    type OutPoint struct {
        Hash  chainhash.Hash
        Index uint32
        Tree  int8
    }
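
Because every field of an outpoint struct is comparable, the struct can be used directly as a Go map key, which is what makes the double-spend lookup a single map access. A minimal sketch, with a hypothetical `spentIndex` type standing in for mp.outpoints and the Tree field left out:

```go
package main

import "fmt"

// outPoint is a simplified stand-in for the OutPoint struct above. All of its
// fields are comparable, so it is a valid map key.
type outPoint struct {
	hash  string
	index uint32
}

// spentIndex maps each outpoint spent by a pool transaction to that
// transaction's ID.
type spentIndex map[outPoint]string

// conflict returns the ID of a pool transaction that already spends one of
// the given inputs, if there is one.
func (s spentIndex) conflict(inputs []outPoint) (string, bool) {
	for _, in := range inputs {
		if id, exists := s[in]; exists {
			return id, true
		}
	}
	return "", false
}

// claim records that txID now spends the given inputs.
func (s spentIndex) claim(txID string, inputs []outPoint) {
	for _, in := range inputs {
		s[in] = txID
	}
}

func main() {
	idx := spentIndex{}
	idx.claim("tx1", []outPoint{{"aa", 0}})

	by, dup := idx.conflict([]outPoint{{"aa", 0}}) // same hash, same index
	_, other := idx.conflict([]outPoint{{"aa", 1}}) // same hash, different index
	fmt.Println(by, dup, other)
}
```

Spending a different output of the same transaction is not a conflict; only an identical hash and index pair is.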
    
    

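10. PruneExpiredTx

  • Removes expired transactions based on block height.

  • This happens when a block is synced from another peer and during a reorganization.

  • The check relies on a per-transaction Expiry field, which belongs to the HC-style transaction format used in the previous section; btcd's wire.MsgTx has no such field.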

    
    // PruneExpiredTx prunes expired transactions from the mempool that may no longer
    // be able to be included into a block.
    func (mp *TxPool) PruneExpiredTx(height int64) {
        // Protect concurrent access.
        mp.mtx.Lock()
        mp.pruneExpiredTx(height)
        mp.mtx.Unlock()
    }
    
    func (mp *TxPool) pruneExpiredTx(height int64) {
        for _, tx := range mp.pool {
            if tx.Tx.MsgTx().Expiry != 0 {
                if height >= int64(tx.Tx.MsgTx().Expiry) {
                    log.Debugf("Pruning expired transaction %v "+
                        "from the mempool", tx.Tx.Hash())
                    mp.removeTransaction(tx.Tx, true)
                }
            }
        }
    }
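
The expiry rule in pruneExpiredTx (an expiry of 0 means "never expires", and a transaction is pruned once the height reaches its expiry) can be checked in isolation. The `tx` type and `pruneExpired` function below are hypothetical simplifications that skip the removal of redeemers.

```go
package main

import (
	"fmt"
	"sort"
)

// tx is a simplified stand-in that carries only an ID and an expiry height.
type tx struct {
	id     string
	expiry uint32 // 0 means the transaction never expires
}

// pruneExpired deletes every transaction whose expiry height has been reached
// and returns the pruned IDs in sorted order.
func pruneExpired(pool map[string]*tx, height int64) []string {
	var pruned []string
	for id, t := range pool {
		if t.expiry != 0 && height >= int64(t.expiry) {
			delete(pool, id) // deleting during range is safe in Go
			pruned = append(pruned, id)
		}
	}
	sort.Strings(pruned) // map iteration order is random
	return pruned
}

func main() {
	pool := map[string]*tx{
		"a": {id: "a", expiry: 0},
		"b": {id: "b", expiry: 100},
		"c": {id: "c", expiry: 101},
	}
	fmt.Println(pruneExpired(pool, 100), len(pool))
	fmt.Println(pruneExpired(pool, 200), len(pool))
}
```

At height 100 only b is pruned, since c's expiry has not been reached yet and a has no expiry at all.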