0%

btcd 源码分析系列:7 - rpc

参考:btcd

  • btcd实现了基于http和websocket的json-rpc,支持http Basic authentication。
  • websocket通道 支持所有的请求类型,http通道不支持通知类(notify)的请求
  • http/websocket是传输协议,json是序列化协议。

一、start

  • 启动一个server,监听路径"/"和"/ws",分别对应http/websocket。

  • 通过checkAuth()函数认证,基于Basic authentication

  • 通过limitConnections()对单个地址的并发连接数量做限制

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    // Plain HTTP endpoint: requests routed to "/" are read and answered by
    // s.jsonRPCRead once the connection limit and authentication checks pass.
    rpcServeMux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
    // Ask for the connection to be closed after this exchange and declare
    // the response body as JSON.
    w.Header().Set("Connection", "close")
    w.Header().Set("Content-Type", "application/json")
    r.Close = true

    // Limit the number of connections to max allowed.
    // A true result means the request was rejected, so stop here.
    if s.limitConnections(w, r.RemoteAddr) {
    return
    }

    // Keep track of the number of connected clients.
    // The deferred decrement runs when this handler returns, on every path.
    s.incrementClients()
    defer s.decrementClients()
    // Authenticate the request; on error reply via jsonAuthFail and stop.
    // NOTE(review): the second argument is true here and false on "/ws" —
    // presumably it makes credentials mandatory; confirm against checkAuth.
    _, isAdmin, err := s.checkAuth(r, true)
    if err != nil {
    jsonAuthFail(w)
    return
    }

    // Read and respond to the request.
    s.jsonRPCRead(w, r, isAdmin)
    })

    // Websocket endpoint.
    // Requests to "/ws" are authenticated, upgraded to a websocket connection
    // and then handed to s.WebsocketHandler.
    rpcServeMux.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
    // Authenticate before upgrading; both resulting flags are forwarded to
    // the websocket handler below.
    // NOTE(review): the second argument is false here and true on "/" —
    // presumably credentials are optional at this point; confirm against
    // checkAuth.
    authenticated, isAdmin, err := s.checkAuth(r, false)
    if err != nil {
    jsonAuthFail(w)
    return
    }

    // Attempt to upgrade the connection to a websocket connection
    // using the default size for read/write buffers.
    ws, err := websocket.Upgrade(w, r, nil, 0, 0)
    if err != nil {
    // Only errors that are not websocket.HandshakeError are logged; in
    // both cases the client receives a 400 response.
    if _, ok := err.(websocket.HandshakeError); !ok {
    rpcsLog.Errorf("Unexpected websocket error: %v",
    err)
    }
    http.Error(w, "400 Bad Request.", http.StatusBadRequest)
    return
    }
    // Hand over the upgraded connection together with the peer address and
    // the authentication result.
    s.WebsocketHandler(ws, r.RemoteAddr, authenticated, isAdmin)
    })

二、解析参数

  • 通过对body进行unmarshal,可以直接拿到此结构

    1
    2
    3
    4
    5
    6
    // Request is the envelope of a single JSON-RPC request as decoded from
    // the request body.
    //
    // Params is kept as raw JSON messages, so the individual parameters are
    // not decoded at this stage. ID is left untyped and holds whatever JSON
    // value the caller supplied.
    type Request struct {
        Jsonrpc string            `json:"jsonrpc"`
        Method  string            `json:"method"`
        Params  []json.RawMessage `json:"params"`
        ID      interface{}       `json:"id"`
    }
  • 进一步通过反射拿到此结构体,其中cmd字段对应method的cmd(实际上就是参数)结构体的反射值

    1
    2
    3
    4
    5
    6
    // parsedRPCCmd bundles one request after parsing: the request id, the
    // method name, the command value for that method and an RPC error slot.
    type parsedRPCCmd struct {
        // id is the identifier taken from the request.
        id interface{}
        // method is the key used to look up the handler.
        method string
        // cmd is passed to the handler as-is; handlers type-assert it to
        // their concrete command struct.
        cmd interface{}
        // err is presumably set when the request could not be parsed —
        // TODO confirm against the parsing code.
        err *hcjson.RPCError
    }
  • 一个具体的method,对应的cmd结构体,也就是method的参数

    1
    2
    3
    4
    5
    6
    7
    // CreateRawTransactionCmd defines the createrawtransaction JSON-RPC command.
    //
    // LockTime and PayLoad are pointers and may be nil; presumably that marks
    // them as optional parameters (verify against the command parser). The
    // field order is kept exactly as declared.
    type CreateRawTransactionCmd struct {
        Inputs   []TransactionInput
        Amounts  map[string]float64 `jsonrpcusage:"{\"address\":amount,...}"` // In HC
        LockTime *int64
        PayLoad  *string
    }

三、执行

  • 通过method找到相应的handler,传入之前拿到的参数,并执行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    // standardCmdResult picks the handler for cmd.method and runs it with the
    // parsed command value and closeChan, returning whatever the handler
    // returns.
    //
    // The lookup order is rpcHandlers first, then rpcAskWallet (served by
    // handleAskWallet), then rpcUnimplemented (served by handleUnimplemented).
    // A method found in none of them yields hcjson.ErrRPCMethodNotFound.
    func (s *rpcServer) standardCmdResult(cmd *parsedRPCCmd, closeChan <-chan struct{}) (interface{}, error) {
        handler, found := rpcHandlers[cmd.method]
        if !found {
            // Fall back to the secondary tables, in the same order as before.
            if _, forWallet := rpcAskWallet[cmd.method]; forWallet {
                handler = handleAskWallet
            } else if _, stubbed := rpcUnimplemented[cmd.method]; stubbed {
                handler = handleUnimplemented
            } else {
                return nil, hcjson.ErrRPCMethodNotFound
            }
        }

        return handler(s, cmd.cmd, closeChan)
    }
  • 具体的执行方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21

    // handleCreateRawTransaction handles createrawtransaction commands.
    //
    // cmd must hold a *hcjson.CreateRawTransactionCmd; the unchecked type
    // assertion below panics otherwise. Validation failures are returned as
    // RPC errors with a nil result.
    func handleCreateRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) {
    c := cmd.(*hcjson.CreateRawTransactionCmd)

    // Validate the locktime, if given.
    // Accepted range is 0 through wire.MaxTxInSequenceNum inclusive.
    if c.LockTime != nil &&
    (*c.LockTime < 0 ||
    *c.LockTime > int64(wire.MaxTxInSequenceNum)) {
    return nil, rpcInvalidError("Locktime out of range")
    }

    // Add all transaction inputs to a new transaction after performing
    // some validity checks.
    mtx := wire.NewMsgTx()
    for _, input := range c.Inputs {
    // Each input's Txid string must parse as a hash; otherwise the
    // request is rejected with a hex decode error naming that Txid.
    txHash, err := chainhash.NewHashFromStr(input.Txid)
    if err != nil {
    return nil, rpcDecodeHexError(input.Txid)
    }
    }
    // NOTE: the excerpt stops here; the remainder of the function is not shown.

四、返回

  • 最后将结果序列化并返回,responseID=requestID

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    // Marshal the response.
    // responseID, result and jsonErr come from the surrounding function; a
    // marshalling failure is logged and nothing is written.
    msg, err := createMarshalledReply(responseID, result, jsonErr)
    if err != nil {
    rpcsLog.Errorf("Failed to marshal reply: %v", err)
    return
    }

    // Write the response.
    // Headers go out first with status 200; if that fails, the error is
    // logged and the body is skipped.
    err = s.writeHTTPResponseHeaders(r, w.Header(), http.StatusOK, buf)
    if err != nil {
    rpcsLog.Error(err)
    return
    }
    // Write the marshalled reply to buf; a write error is only logged.
    // NOTE(review): buf is declared outside this excerpt — presumably a
    // buffered writer over the client connection; confirm.
    if _, err := buf.Write(msg); err != nil {
    rpcsLog.Errorf("Failed to write marshalled reply: %v", err)
    }