diff --git a/service/bfg/bfg.go b/service/bfg/bfg.go index 7c196779..c2e058c3 100644 --- a/service/bfg/bfg.go +++ b/service/bfg/bfg.go @@ -112,6 +112,11 @@ type Server struct { cfg *Config + // requests + requestLimit int // Request limiter queue depth + requestLimiter chan bool // Maximum in progress websocket commands + requestTimeout time.Duration + btcHeight uint64 hemiHeight uint32 @@ -151,12 +156,17 @@ func NewServer(cfg *Config) (*Server, error) { if cfg == nil { cfg = NewDefaultConfig() } + defaultRequestTimeout := 9 * time.Second // XXX + requestLimit := 1000 // XXX s := &Server{ - cfg: cfg, - popTXFinality: make(map[uint64][]*popTX), - btcHeight: cfg.BTCStartHeight, - server: http.NewServeMux(), - publicServer: http.NewServeMux(), + cfg: cfg, + requestLimiter: make(chan bool, requestLimit), + requestLimit: requestLimit, + requestTimeout: defaultRequestTimeout, + popTXFinality: make(map[uint64][]*popTX), + btcHeight: cfg.BTCStartHeight, + server: http.NewServeMux(), + publicServer: http.NewServeMux(), cmdsProcessed: prometheus.NewCounter(prometheus.CounterOpts{ Subsystem: promSubsystem, Name: "rpc_calls_total", @@ -164,6 +174,9 @@ func NewServer(cfg *Config) (*Server, error) { }), sessions: make(map[string]*bfgWs), } + for i := 0; i < requestLimit; i++ { + s.requestLimiter <- true + } var err error s.btcClient, err = electrumx.NewClient(cfg.EXBTCAddress) @@ -176,27 +189,46 @@ func NewServer(cfg *Config) (*Server, error) { return s, nil } -func (s *Server) writeResponse(ctx context.Context, conn protocol.APIConn, response any, id string) error { - if err := bfgapi.Write(ctx, conn, id, response); err != nil { - log.Errorf("error occurred writing bfgapi: %s", err) - return err +// handleRequest is called as a go routine to handle a long lived command. +func (s *Server) handleRequest(parrentCtx context.Context, bws *bfgWs, wsid string, requestType string, handler func(ctx context.Context) (any, error)) { + log.Tracef("handleRequest: %v", bws.addr) + defer log.Tracef("handleRequest exit: %v", bws.addr) + + ctx, cancel := context.WithTimeout(parrentCtx, s.requestTimeout) + defer cancel() + + select { + case <-s.requestLimiter: + default: + log.Infof("Request limiter hit %v: %v", bws.addr, requestType) + <-s.requestLimiter } + defer func() { s.requestLimiter <- true }() - return nil + log.Tracef("Handling request %v: %v", bws.addr, requestType) + + response, err := handler(ctx) + if err != nil { + log.Errorf("Failed to handle %v request %v: %v", + requestType, bws.addr, err) + } + if response == nil { + return + } + + log.Debugf("Responding to %v request with %v", requestType, spew.Sdump(response)) + + if err := bfgapi.Write(ctx, bws.conn, wsid, response); err != nil { + log.Errorf("Failed to handle %v request: protocol write failed: %v", + requestType, err) + } } -func (s *Server) handleBitcoinBalance(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { +func (s *Server) handleBitcoinBalance(ctx context.Context, bbr *bfgapi.BitcoinBalanceRequest) (any, error) { log.Tracef("handleBitcoinBalance") defer log.Tracef("handleBitcoinBalance exit") - // Increade command count - defer s.cmdsProcessed.Inc() - br, ok := payload.(*bfgapi.BitcoinBalanceRequest) - if !ok { - return nil, fmt.Errorf("not BitcoinBalanceRequest: %T", br) - } - - balance, err := s.btcClient.Balance(ctx, br.ScriptHash) + balance, err := s.btcClient.Balance(ctx, bbr.ScriptHash) if err != nil { e := protocol.NewInternalErrorf("bitcoin balance: %v", err) return &bfgapi.BitcoinBalanceResponse{ @@ -210,16 +242,9 @@ func (s *Server) handleBitcoinBalance(ctx context.Context, bws *bfgWs, payload a }, nil } -func (s *Server) handleBitcoinBroadcast(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { +func (s *Server) handleBitcoinBroadcast(ctx context.Context, bbr *bfgapi.BitcoinBroadcastRequest) (any, error) { log.Tracef("handleBitcoinBroadcast") defer log.Tracef("handleBitcoinBroadcast exit") - // Increade command count - defer s.cmdsProcessed.Inc() - - bbr, ok := payload.(*bfgapi.BitcoinBroadcastRequest) - if !ok { - return nil, fmt.Errorf("not a BitcoinBroadcastRequest: %T", bbr) - } rr := bytes.NewReader(bbr.Transaction) mb := wire.MsgTx{} @@ -250,7 +275,7 @@ func (s *Server) handleBitcoinBroadcast(ctx context.Context, bws *bfgWs, payload }, nil } - txHash, err := s.btcClient.Broadcast(context.TODO(), bbr.Transaction) + txHash, err := s.btcClient.Broadcast(ctx, bbr.Transaction) if err != nil { e := protocol.NewInternalErrorf("broadcast tx: %s", err) return &bfgapi.BitcoinBroadcastResponse{ @@ -286,16 +311,9 @@ func (s *Server) handleBitcoinBroadcast(ctx context.Context, bws *bfgWs, payload return &bfgapi.BitcoinBroadcastResponse{TXID: txHash}, nil } -func (s *Server) handleBitcoinInfo(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { +func (s *Server) handleBitcoinInfo(ctx context.Context, bir *bfgapi.BitcoinInfoRequest) (any, error) { log.Tracef("handleBitcoinInfo") defer log.Tracef("handleBitcoinInfo exit") - // Increade command count - defer s.cmdsProcessed.Inc() - - _, ok := payload.(*bfgapi.BitcoinInfoRequest) - if !ok { - return nil, fmt.Errorf("not a BitcoinInfoRequest %T", payload) - } height, err := s.btcClient.Height(ctx) if err != nil { @@ -310,20 +328,11 @@ func (s *Server) handleBitcoinInfo(ctx context.Context, bws *bfgWs, payload any, }, nil } -func (s *Server) handleBitcoinUTXOs(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { +func (s *Server) handleBitcoinUTXOs(ctx context.Context, bur *bfgapi.BitcoinUTXOsRequest) (any, error) { log.Tracef("handleBitcoinUTXOs") defer log.Tracef("handleBitcoinUTXOs exit") - // Increade command count - defer s.cmdsProcessed.Inc() - bur, ok := payload.(*bfgapi.BitcoinUTXOsRequest) - if !ok { - err := fmt.Errorf("not a BitcoinUTXOsRequest %T", payload) - log.Errorf(err.Error()) - return nil, err - } - - utxos, err := s.btcClient.UTXOs(context.TODO(), bur.ScriptHash) + utxos, err := s.btcClient.UTXOs(ctx, bur.ScriptHash) if err != nil { e := protocol.NewInternalErrorf("bitcoin utxos: %v", err) return &bfgapi.BitcoinUTXOsResponse{ @@ -343,17 +352,11 @@ func (s *Server) handleBitcoinUTXOs(ctx context.Context, bws *bfgWs, payload any return buResp, nil } -func (s *Server) handleAccessPublicKeyCreateRequest(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { +func (s *Server) handleAccessPublicKeyCreateRequest(ctx context.Context, acpkc *bfgapi.AccessPublicKeyCreateRequest) (any, error) { log.Tracef("handleAccessPublicKeyCreateRequest") defer log.Tracef("handleAccessPublicKeyCreateRequest exit") - accessPublicKeyCreateRequest, ok := payload.(*bfgapi.AccessPublicKeyCreateRequest) - if !ok { - err := fmt.Errorf("incorrect type: %T", payload) - return nil, err - } - - publicKey, err := hex.DecodeString(accessPublicKeyCreateRequest.PublicKey) + publicKey, err := hex.DecodeString(acpkc.PublicKey) if err != nil { return &bfgapi.AccessPublicKeyCreateResponse{ Error: protocol.WireErrorf("public key decode: %v", err), @@ -384,7 +387,7 @@ func (s *Server) handleAccessPublicKeyCreateRequest(ctx context.Context, bws *bf return &bfgapi.AccessPublicKeyCreateResponse{}, nil } -func (s *Server) handleAccessPublicKeyDelete(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { +func (s *Server) handleAccessPublicKeyDelete(ctx context.Context, payload any) (any, error) { log.Tracef("handleAccessPublicKeyDelete") defer log.Tracef("handleAccessPublicKeyDelete exit") @@ -631,9 +634,6 @@ func (s *Server) handleWebsocketPrivateRead(ctx context.Context, bws *bfgWs) { log.Tracef("handleWebsocketPrivateRead: %v", bws.addr) defer log.Tracef("handleWebsocketPrivateRead exit: %v", bws.addr) - // Command completed - defer s.cmdsProcessed.Inc() - for { cmd, id, payload, err := bfgapi.Read(ctx, bws.conn) if err != nil { @@ -651,42 +651,66 @@ func (s *Server) handleWebsocketPrivateRead(ctx context.Context, bws *bfgWs) { log.Tracef("handleWebsocketRead read %v: %v %v %v", bws.addr, cmd, id, spew.Sdump(payload)) - var response any - switch cmd { case bfgapi.CmdPingRequest: - response, err = s.handlePing(ctx, bws, payload, id) + err = s.handlePingRequest(ctx, bws, payload, id) case bfgapi.CmdPopTxForL2BlockRequest: - response, err = s.handlePopTxForL2Block(ctx, bws, payload, id) + handler := func(c context.Context) (any, error) { + msg := payload.(*bfgapi.PopTxsForL2BlockRequest) + return s.handlePopTxsForL2Block(c, msg) + } + + go s.handleRequest(ctx, bws, id, "handle pop for l2 block request", handler) case bfgapi.CmdNewL2KeystonesRequest: - response, err = s.handleNewL2Keystones(ctx, bws, payload, id) + handler := func(c context.Context) (any, error) { + msg := payload.(*bfgapi.NewL2KeystonesRequest) + return s.handleNewL2Keystones(c, msg) + } + + go s.handleRequest(ctx, bws, id, "handle new l2 keystones request", handler) case bfgapi.CmdBTCFinalityByRecentKeystonesRequest: - response, err = s.handleBtcFinalityByRecentKeystonesRequest(ctx, bws, payload, id) + handler := func(c context.Context) (any, error) { + msg := payload.(*bfgapi.BTCFinalityByRecentKeystonesRequest) + return s.handleBtcFinalityByRecentKeystonesRequest(c, msg) + } + + go s.handleRequest(ctx, bws, id, "handle finality recent keystones request", handler) case bfgapi.CmdBTCFinalityByKeystonesRequest: - response, err = s.handleBtcFinalityByKeystonesRequest(ctx, bws, payload, id) + handler := func(c context.Context) (any, error) { + msg := payload.(*bfgapi.BTCFinalityByKeystonesRequest) + return s.handleBtcFinalityByKeystonesRequest(c, msg) + } + + go s.handleRequest(ctx, bws, id, "handle finality keystones request", handler) case bfgapi.CmdAccessPublicKeyCreateRequest: - response, err = s.handleAccessPublicKeyCreateRequest(ctx, bws, payload, id) + handler := func(c context.Context) (any, error) { + msg := payload.(*bfgapi.AccessPublicKeyCreateRequest) + return s.handleAccessPublicKeyCreateRequest(c, msg) + } + + go s.handleRequest(ctx, bws, id, "handle access key create request", handler) case bfgapi.CmdAccessPublicKeyDeleteRequest: - response, err = s.handleAccessPublicKeyDelete(ctx, bws, payload, id) + handler := func(c context.Context) (any, error) { + msg := payload.(*bfgapi.AccessPublicKeyDeleteRequest) + return s.handleAccessPublicKeyDelete(c, msg) + } + + go s.handleRequest(ctx, bws, id, "handle access key delete request", handler) default: err = fmt.Errorf("unknown command") } - // if there was an error, close the websocket, only do this if we - // can't continue + // If set, it is a terminal error. if err != nil { - log.Errorf("handleWebsocketPrivateRead error %v %v: %v", bws.addr, cmd, err) - // XXX this should be handled in the caller + log.Errorf("handleWebsocketRead %v %v %v: %v", + bws.addr, cmd, id, err) + // XXX this needs to be handled by the caller bws.conn.CloseStatus(websocket.StatusProtocolError, err.Error()) return - } else { - if err := s.writeResponse(ctx, bws.conn, response, id); err != nil { - bws.conn.CloseStatus(websocket.StatusProtocolError, err.Error()) - return - } } + s.cmdsProcessed.Inc() } } @@ -696,9 +720,6 @@ func (s *Server) handleWebsocketPublicRead(ctx context.Context, bws *bfgWs) { log.Tracef("handleWebsocketPublicRead: %v", bws.addr) defer log.Tracef("handleWebsocketPublicRead exit: %v", bws.addr) - // Command completed - defer s.cmdsProcessed.Inc() - for { cmd, id, payload, err := bfgapi.Read(ctx, bws.conn) if err != nil { @@ -712,38 +733,61 @@ func (s *Server) handleWebsocketPublicRead(ctx context.Context, bws *bfgWs) { return } - var response any - switch cmd { case bfgapi.CmdPingRequest: - response, err = s.handlePing(ctx, bws, payload, id) + // quick call + err = s.handlePingRequest(ctx, bws, payload, id) case bfgapi.CmdL2KeystonesRequest: - response, err = s.handleL2KeystonesRequest(ctx, bws, payload, id) + handler := func(c context.Context) (any, error) { + msg := payload.(*bfgapi.L2KeystonesRequest) + return s.handleL2KeystonesRequest(c, msg) + } + + go s.handleRequest(ctx, bws, id, "handle l2 keystones request", handler) case bfgapi.CmdBitcoinBalanceRequest: - response, err = s.handleBitcoinBalance(ctx, bws, payload, id) + handler := func(c context.Context) (any, error) { + msg := payload.(*bfgapi.BitcoinBalanceRequest) + return s.handleBitcoinBalance(c, msg) + } + + go s.handleRequest(ctx, bws, id, "handle bitcoin balance request", handler) case bfgapi.CmdBitcoinBroadcastRequest: - response, err = s.handleBitcoinBroadcast(ctx, bws, payload, id) + handler := func(c context.Context) (any, error) { + msg := payload.(*bfgapi.BitcoinBroadcastRequest) + return s.handleBitcoinBroadcast(c, msg) + } + + go s.handleRequest(ctx, bws, id, "handle bitcoin broadcast request", handler) case bfgapi.CmdBitcoinInfoRequest: - response, err = s.handleBitcoinInfo(ctx, bws, payload, id) + handler := func(c context.Context) (any, error) { + msg := payload.(*bfgapi.BitcoinInfoRequest) + return s.handleBitcoinInfo(c, msg) + } + + go s.handleRequest(ctx, bws, id, "handle bitcoin broadcast request", handler) case bfgapi.CmdBitcoinUTXOsRequest: - response, err = s.handleBitcoinUTXOs(ctx, bws, payload, id) + + handler := func(c context.Context) (any, error) { + msg := payload.(*bfgapi.BitcoinUTXOsRequest) + return s.handleBitcoinUTXOs(c, msg) + } + + go s.handleRequest(ctx, bws, id, "handle bitcoin utxos request", handler) default: err = fmt.Errorf("unknown command") } + // If set, it is a terminal error. if err != nil { - log.Errorf("handleWebsocketPublicRead %v %v: %v", bws.addr, cmd, err) + log.Errorf("handleWebsocketRead %v %v %v: %v", + bws.addr, cmd, id, err) + // XXX this needs to be handled by the caller bws.conn.CloseStatus(websocket.StatusProtocolError, err.Error()) return - } else { - if err := s.writeResponse(ctx, bws.conn, response, id); err != nil { - // XXX this should be handled in the caller - bws.conn.CloseStatus(websocket.StatusProtocolError, err.Error()) - return - } } + s.cmdsProcessed.Inc() } } @@ -934,33 +978,33 @@ func (s *Server) handleWebsocketPublic(w http.ResponseWriter, r *http.Request) { bws.sessionId, r.RemoteAddr, bws.publicKey) } -func (s *Server) handlePing(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { - log.Tracef("handlePing: %v", bws.addr) - defer log.Tracef("handlePing exit: %v", bws.addr) +func (s *Server) handlePingRequest(ctx context.Context, bws *bfgWs, payload any, id string) error { + log.Tracef("handlePingRequest: %v", bws.addr) + defer log.Tracef("handlePingRequest exit: %v", bws.addr) p, ok := payload.(*bfgapi.PingRequest) if !ok { - return nil, fmt.Errorf("handlePing invalid payload type: %T", payload) + return fmt.Errorf("handlePingRequest invalid payload type: %T", payload) } response := &bfgapi.PingResponse{ OriginTimestamp: p.Timestamp, Timestamp: time.Now().Unix(), } - return response, nil -} - -func (s *Server) handlePopTxForL2Block(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { - log.Tracef("handlePopTxForL2Block: %v", bws.addr) - defer log.Tracef("handlePopTxForL2Block exit: %v", bws.addr) + log.Tracef("responding with %v", spew.Sdump(response)) - p, ok := payload.(*bfgapi.PopTxsForL2BlockRequest) - if !ok { - return nil, fmt.Errorf("handlePopTxForL2Block invalid payload type: %T", - payload) + if err := bfgapi.Write(ctx, bws.conn, id, response); err != nil { + return fmt.Errorf("handlePingRequest write: %v %v", + bws.addr, err) } + return nil +} + +func (s *Server) handlePopTxsForL2Block(ctx context.Context, ptl2 *bfgapi.PopTxsForL2BlockRequest) (any, error) { + log.Tracef("handlePopTxsForL2Block") + defer log.Tracef("handlePopTxsForL2Block exit") - hash := hemi.HashSerializedL2KeystoneAbrev(p.L2Block) + hash := hemi.HashSerializedL2KeystoneAbrev(ptl2.L2Block) var h [32]byte copy(h[:], hash) popTxs, err := s.db.PopBasisByL2KeystoneAbrevHash(ctx, h, true) @@ -989,17 +1033,8 @@ func (s *Server) handlePopTxForL2Block(ctx context.Context, bws *bfgWs, payload return response, nil } -func (s *Server) handleBtcFinalityByRecentKeystonesRequest(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { - p, ok := payload.(*bfgapi.BTCFinalityByRecentKeystonesRequest) - if ok == false { - // XXX this isn't right - return nil, fmt.Errorf( - "handleBtcFinalityByRecentKeystonesRequest invalid payload type %T", - payload, - ) - } - - finalities, err := s.db.L2BTCFinalityMostRecent(ctx, p.NumRecentKeystones) +func (s *Server) handleBtcFinalityByRecentKeystonesRequest(ctx context.Context, bfrk *bfgapi.BTCFinalityByRecentKeystonesRequest) (any, error) { + finalities, err := s.db.L2BTCFinalityMostRecent(ctx, bfrk.NumRecentKeystones) if err != nil { e := protocol.NewInternalErrorf("error getting finality: %v", err) return &bfgapi.BTCFinalityByRecentKeystonesResponse{ @@ -1029,18 +1064,9 @@ func (s *Server) handleBtcFinalityByRecentKeystonesRequest(ctx context.Context, }, nil } -func (s *Server) handleBtcFinalityByKeystonesRequest(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { - p, ok := payload.(*bfgapi.BTCFinalityByKeystonesRequest) - if !ok { - // XXX this isn't right - return nil, fmt.Errorf( - "handleBtcFinalityByKeystonesRequest invalid payload type %T", - payload, - ) - } - - l2KeystoneAbrevHashes := make([]database.ByteArray, 0, len(p.L2Keystones)) - for _, l := range p.L2Keystones { +func (s *Server) handleBtcFinalityByKeystonesRequest(ctx context.Context, bfkr *bfgapi.BTCFinalityByKeystonesRequest) (any, error) { + l2KeystoneAbrevHashes := make([]database.ByteArray, 0, len(bfkr.L2Keystones)) + for _, l := range bfkr.L2Keystones { a := hemi.L2KeystoneAbbreviate(l) l2KeystoneAbrevHashes = append(l2KeystoneAbrevHashes, a.Hash()) } @@ -1077,17 +1103,11 @@ func (s *Server) handleBtcFinalityByKeystonesRequest(ctx context.Context, bws *b }, nil } -func (s *Server) handleL2KeystonesRequest(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { - p, ok := payload.(*bfgapi.L2KeystonesRequest) - if !ok { - // XXX this isn't right - return nil, fmt.Errorf( - "handleL2KeystonesRequest invalid payload type %T", - payload, - ) - } +func (s *Server) handleL2KeystonesRequest(ctx context.Context, l2kr *bfgapi.L2KeystonesRequest) (any, error) { + log.Tracef("handleL2KeystonesRequest") + defer log.Tracef("handleL2KeystonesRequest exit") - results, err := s.db.L2KeystonesMostRecentN(ctx, uint32(p.NumL2Keystones)) + results, err := s.db.L2KeystonesMostRecentN(ctx, uint32(l2kr.NumL2Keystones)) if err != nil { e := protocol.NewInternalErrorf("error getting l2 keystones: %s", err) return &bfgapi.L2KeystonesResponse{ @@ -1189,8 +1209,8 @@ func hemiL2KeystonesToDb(l2ks []hemi.L2Keystone) []bfgd.L2Keystone { return dbks } -func (s *Server) handleNewL2Keystones(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { - ks := hemiL2KeystonesToDb(payload.(*bfgapi.NewL2KeystonesRequest).L2Keystones) +func (s *Server) handleNewL2Keystones(ctx context.Context, nlkr *bfgapi.NewL2KeystonesRequest) (any, error) { + ks := hemiL2KeystonesToDb(nlkr.L2Keystones) err := s.db.L2KeystonesInsert(ctx, ks) response := bfgapi.NewL2KeystonesResponse{} if err != nil { @@ -1429,7 +1449,7 @@ func (s *Server) Run(pctx context.Context) error { return fmt.Errorf("failed to create server: %w", err) } cs := []prometheus.Collector{ - s.cmdsProcessed, + s.cmdsProcessed, // XXX should we make two counters? priv/pub prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Subsystem: promSubsystem, Name: "running", diff --git a/service/bss/bss.go b/service/bss/bss.go index 38bc182c..3384d964 100644 --- a/service/bss/bss.go +++ b/service/bss/bss.go @@ -372,6 +372,7 @@ func (s *Server) handleWebsocketRead(ctx context.Context, bws *bssWs) { if err != nil { log.Errorf("handleWebsocketRead %v %v %v: %v", bws.addr, cmd, id, err) + // XXX this needs to be handled by the caller bws.conn.CloseStatus(websocket.StatusProtocolError, err.Error()) return