参考:btcd

  • addrmanager 主要提供了peer地址的管理功能,包括地址的增删查改
  • 通过存储json序列化后的数据到本地文件实现持久化
  • btc启动的时候,会读取该json文件,将保存的节点信息读取到内存中
  • btc运行的过程中,每过一段时间(十分钟)就持久化一次,以备下次启动时使用
  • btc退出时,会再次持久化一次

一、创建addrmanager对象

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// New returns a new bitcoin address manager.
// Use Start to begin processing asynchronous address updates.
func New(dataDir string, lookupFunc func(string) ([]net.IP, error)) *AddrManager {
	am := AddrManager{
		peersFile:      filepath.Join(dataDir, "peers.json"),
		lookupFunc:     lookupFunc,
		rand:           rand.New(rand.NewSource(time.Now().UnixNano())),
		quit:           make(chan struct{}),
		localAddresses: make(map[string]*localAddress),
		version:        serialisationVersion,
	}
	am.reset()
	return &am
}

二、启动

  • 通过started字段保证一个addrManager只能start一次

  • loadPeers()就是读取硬盘上保存的peer信息到内存中

  • addressHander 启动一个协程处理周期性的持久化和

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    
    // Start begins the core address handler which manages a pool of known
    // addresses, timeouts, and interval based writes.
    func (a *AddrManager) Start() {
        // Already started?
        if atomic.AddInt32(&a.started, 1) != 1 {
            return
        }
    
        log.Trace("Starting address manager")
    
        // Load peers we already know about from file.
        a.loadPeers()
    
        // Start the address ticker to save addresses periodically.
        a.wg.Add(1)
        go a.addressHandler()
    }
    

三、停止

  • stop的时候通过close channel 通知并等待子协程(处理持久化任务)的退出

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    
    // Stop gracefully shuts down the address manager by stopping the main handler.
    func (a *AddrManager) Stop() error {
        if atomic.AddInt32(&a.shutdown, 1) != 1 {
            log.Warnf("Address manager is already in the process of " +
                "shutting down")
            return nil
        }
    
        log.Infof("Address manager shutting down")
        close(a.quit)
        a.wg.Wait()
        return nil
    }
    
    // addressHandler is the main handler for the address manager.  It must be run
    // as a goroutine.
    func (a *AddrManager) addressHandler() {
        dumpAddressTicker := time.NewTicker(dumpAddressInterval)
        defer dumpAddressTicker.Stop()
    out:
        for {
            select {
            case <-dumpAddressTicker.C:
                a.savePeers()
    
            case <-a.quit:
                break out
            }
        }
        a.savePeers()
        a.wg.Done()
        log.Trace("Address handler done")
    }
    

四、获取地址

  • 用了自己的随机算法选取部分地址返回

  • 这个随机算法简单的说就是,先根据百分比确定返回的数量max,然后选中第i个,并与i-n之间的随机一个交换,i从0依次递增到max-1,这样一共选出来max个

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    
    // AddressCache returns the current address cache.  It must be treated as
    // read-only (but since it is a copy now, this is not as dangerous).
    func (a *AddrManager) AddressCache() []*wire.NetAddress {
        allAddr := a.getAddresses()
    
        numAddresses := len(allAddr) * getAddrPercent / 100
        if numAddresses > getAddrMax {
            numAddresses = getAddrMax
        }
    
        // Fisher-Yates shuffle the array. We only need to do the first
        // `numAddresses' since we are throwing the rest.
        for i := 0; i < numAddresses; i++ {
            // pick a number between current index and the end
            j := rand.Intn(len(allAddr)-i) + i
            allAddr[i], allAddr[j] = allAddr[j], allAddr[i]
        }
    
        // slice off the limit we are willing to share.
        return allAddr[0:numAddresses]
    }