diff --git a/api/protocol/protocol.go b/api/protocol/protocol.go index 09a9ddd1..95d79a31 100644 --- a/api/protocol/protocol.go +++ b/api/protocol/protocol.go @@ -41,8 +41,7 @@ func (he HandshakeError) Error() string { } func (he HandshakeError) Is(target error) bool { - _, ok := target.(HandshakeError) - return ok + return errors.Is(he, target) } var PublicKeyAuthError = websocket.CloseError{ @@ -230,17 +229,18 @@ type Message struct { Payload json.RawMessage `json:"payload"` } -// Error is a protocol Error type that can be used for additional error -// context. It embeds an 8 byte number that can be used to trace calls on both the -// client and server side. +// Error is a protocol error type that is used to provide additional error +// information between a server and client. +// +// A unique "trace" string may be embedded, which can be used to trace errors +// between a server and client. type Error struct { Timestamp int64 `json:"timestamp"` - Trace string `json:"trace"` - Message string `json:"error"` + Trace string `json:"trace,omitempty"` + Message string `json:"message"` } -// Errorf is a client induced protocol error (e.g. "invalid height"). This is a -// pretty printable error on the client and server and is not fatal. +// Errorf returns a protocol Error type with an embedded trace. func Errorf(msg string, args ...interface{}) *Error { trace, _ := random(8) return &Error{ @@ -250,10 +250,83 @@ func Errorf(msg string, args ...interface{}) *Error { } } +// String pretty prints a protocol error. func (e Error) String() string { + if len(e.Trace) == 0 { + return e.Message + } return fmt.Sprintf("%v [%v:%v]", e.Message, e.Trace, e.Timestamp) } +// RequestError wraps an error to create a protocol request error. +// +// Request errors are usually something caused by a client, e.g. validation or +// input errors, and therefore should not be logged server-side and do not +// contain an embedded trace. +func RequestError(err error) *Error { + return &Error{ + Timestamp: time.Now().Unix(), + Message: err.Error(), + } +} + +// RequestErrorf creates a new protocol request error. +// +// Request errors are usually something caused by a client, e.g. validation or +// input errors, and therefore should not be logged server-side and do not +// contain an embedded trace. +func RequestErrorf(msg string, args ...any) *Error { + return &Error{ + Timestamp: time.Now().Unix(), + Message: fmt.Sprintf(msg, args...), + } +} + +// InternalError represents an internal application error. +// +// Internal errors are errors that occurred within the application and are not +// caused by a client (e.g. validation or input errors). The actual error +// message should not be sent to clients, as it is internal to the application, +// and may be server-operator specific. +type InternalError struct { + protocol *Error + internal error +} + +// ProtocolError returns the protocol error representation. +// This error is intended to be sent to clients. +func (ie InternalError) ProtocolError() *Error { + return ie.protocol +} + +// Error satisfies the error interface. +func (ie InternalError) Error() string { + if ie.internal != nil { + return fmt.Sprintf("%v [%v:%v]", ie.internal.Error(), + ie.protocol.Timestamp, ie.protocol.Trace) + } + return ie.protocol.String() +} + +// Unwrap returns the error wrapped by this internal error. +func (ie InternalError) Unwrap() error { + return ie.internal +} + +// NewInternalError returns an InternalError wrapping the given error. +func NewInternalError(err error) *InternalError { + return NewInternalErrorf("internal error: %w", err) +} + +// NewInternalErrorf returns an InternalError constructed from the passed +// message and arguments. +func NewInternalErrorf(msg string, args ...interface{}) *InternalError { + return &InternalError{ + protocol: Errorf("internal error"), + internal: fmt.Errorf(msg, args...), + } +} + // Ping type PingRequest struct { Timestamp int64 `json:"timestamp"` // Local timestamp @@ -416,7 +489,7 @@ func (ac *Conn) IsOnline() bool { return ac.wsc != nil } -// Close close a websocket connection with normal status. +// Close closes a websocket connection with normal status. func (ac *Conn) Close() error { return ac.CloseStatus(websocket.StatusNormalClosure, "") } diff --git a/e2e/e2e_ext_test.go b/e2e/e2e_ext_test.go index ce694d0b..276a4a26 100644 --- a/e2e/e2e_ext_test.go +++ b/e2e/e2e_ext_test.go @@ -980,7 +980,7 @@ func TestBFGPublicErrorCases(t *testing.T) { }, { name: "bitcoin broadcast deserialize error", - expectedError: "failed to deserialized tx: unexpected EOF", + expectedError: "failed to deserialize tx: unexpected EOF", requests: []bfgapi.BitcoinBroadcastRequest{ { Transaction: []byte("invalid..."), @@ -1000,7 +1000,7 @@ func TestBFGPublicErrorCases(t *testing.T) { }, { name: "bitcoin broadcast database error", - expectedError: "pop_basis already exists", + expectedError: "pop basis already exists", requests: []bfgapi.BitcoinBroadcastRequest{ { Transaction: btx, @@ -1158,7 +1158,7 @@ func TestBFGPrivateErrorCases(t *testing.T) { }, { name: "public key is invalid", - expectedError: "encoding/hex: invalid byte: U+006C 'l'", + expectedError: "public key decode: encoding/hex: invalid byte: U+006C 'l'", requests: []bfgapi.AccessPublicKeyCreateRequest{ { PublicKey: "blahblahblah", @@ -3315,7 +3315,7 @@ func TestBFGAuthPingThenRemoval(t *testing.T) { var v interface{} err = wsjson.Read(ctx, c, &v) - if err != nil && !strings.Contains(err.Error(), "status = StatusCode(4100)") { + if err != nil && !strings.Contains(err.Error(), "status = StatusProtocolError and reason = \"killed\"") { t.Fatal(err) } diff --git a/service/bfg/bfg.go b/service/bfg/bfg.go index 305a145e..daec4161 100644 --- a/service/bfg/bfg.go +++ b/service/bfg/bfg.go @@ -62,40 +62,6 @@ func init() { loggo.ConfigureLoggers(logLevel) } -// InternalError is an error type to differentiates between caller and callee -// errors. An internal error is used whne something internal to the application -// fails. -type InternalError struct { - internal *protocol.Error - actual error -} - -// Err return the protocol.Error that can be sent over the wire. -func (ie InternalError) Err() *protocol.Error { - return ie.internal -} - -// String return the actual underlying error. -func (ie InternalError) String() string { - i := ie.internal - return fmt.Sprintf("%v [%v:%v]", ie.actual.Error(), i.Trace, i.Timestamp) -} - -// Error satifies the error interface. -func (ie InternalError) Error() string { - if ie.internal == nil { - return "internal error" - } - return ie.internal.String() -} - -func NewInternalErrorf(msg string, args ...interface{}) *InternalError { - return &InternalError{ - internal: protocol.Errorf("internal error"), - actual: fmt.Errorf(msg, args...), - } -} - func NewDefaultConfig() *Config { return &Config{ EXBTCAddress: "localhost:18001", @@ -146,6 +112,11 @@ type Server struct { cfg *Config + // requests + requestLimit int // Request limiter queue depth + requestLimiter chan bool // Maximum in progress websocket commands + requestTimeout time.Duration + btcHeight uint64 hemiHeight uint32 @@ -185,12 +156,17 @@ func NewServer(cfg *Config) (*Server, error) { if cfg == nil { cfg = NewDefaultConfig() } + defaultRequestTimeout := 9 * time.Second // XXX + requestLimit := 1000 // XXX s := &Server{ - cfg: cfg, - popTXFinality: make(map[uint64][]*popTX), - btcHeight: cfg.BTCStartHeight, - server: http.NewServeMux(), - publicServer: http.NewServeMux(), + cfg: cfg, + requestLimiter: make(chan bool, requestLimit), + requestLimit: requestLimit, + requestTimeout: defaultRequestTimeout, + popTXFinality: make(map[uint64][]*popTX), + btcHeight: cfg.BTCStartHeight, + server: http.NewServeMux(), + publicServer: http.NewServeMux(), cmdsProcessed: prometheus.NewCounter(prometheus.CounterOpts{ Subsystem: promSubsystem, Name: "rpc_calls_total", @@ -198,6 +174,9 @@ func NewServer(cfg *Config) (*Server, error) { }), sessions: make(map[string]*bfgWs), } + for i := 0; i < requestLimit; i++ { + s.requestLimiter <- true + } var err error s.btcClient, err = electrumx.NewClient(cfg.EXBTCAddress) @@ -210,88 +189,103 @@ func NewServer(cfg *Config) (*Server, error) { return s, nil } -func (s *Server) writeResponse(ctx context.Context, conn protocol.APIConn, response any, id string) error { - if err := bfgapi.Write(ctx, conn, id, response); err != nil { - log.Errorf("error occurred writing bfgapi: %s", err) - return err +// handleRequest is called as a go routine to handle a long lived command. +func (s *Server) handleRequest(parrentCtx context.Context, bws *bfgWs, wsid string, requestType string, handler func(ctx context.Context) (any, error)) { + log.Tracef("handleRequest: %v", bws.addr) + defer log.Tracef("handleRequest exit: %v", bws.addr) + + ctx, cancel := context.WithTimeout(parrentCtx, s.requestTimeout) + defer cancel() + + select { + case <-s.requestLimiter: + default: + log.Infof("Request limiter hit %v: %v", bws.addr, requestType) + <-s.requestLimiter } + defer func() { s.requestLimiter <- true }() - return nil -} + log.Tracef("Handling request %v: %v", bws.addr, requestType) -func (s *Server) handleBitcoinBalance(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { - log.Tracef("handleBitcoinBalance") - defer log.Tracef("handleBitcoinBalance exit") - // Increade command count - defer s.cmdsProcessed.Inc() + response, err := handler(ctx) + if err != nil { + log.Errorf("Failed to handle %v request %v: %v", + requestType, bws.addr, err) + } + if response == nil { + return + } - br, ok := payload.(*bfgapi.BitcoinBalanceRequest) - if !ok { - return nil, fmt.Errorf("not BitcoinBalanceRequest: %T", br) + log.Debugf("Responding to %v request with %v", requestType, spew.Sdump(response)) + + if err := bfgapi.Write(ctx, bws.conn, wsid, response); err != nil { + log.Errorf("Failed to handle %v request: protocol write failed: %v", + requestType, err) } +} - bResp := &bfgapi.BitcoinBalanceResponse{} +func (s *Server) handleBitcoinBalance(ctx context.Context, bbr *bfgapi.BitcoinBalanceRequest) (any, error) { + log.Tracef("handleBitcoinBalance") + defer log.Tracef("handleBitcoinBalance exit") - balance, err := s.btcClient.Balance(ctx, br.ScriptHash) + balance, err := s.btcClient.Balance(ctx, bbr.ScriptHash) if err != nil { - ie := NewInternalErrorf("error getting bitcoin balance: %s", err) - log.Errorf(ie.actual.Error()) - bResp.Error = ie.internal - return bResp, nil + e := protocol.NewInternalErrorf("bitcoin balance: %w", err) + return &bfgapi.BitcoinBalanceResponse{ + Error: e.ProtocolError(), + }, e } - bResp.Confirmed = balance.Confirmed - bResp.Unconfirmed = balance.Unconfirmed - return bResp, nil + return &bfgapi.BitcoinBalanceResponse{ + Confirmed: balance.Confirmed, + Unconfirmed: balance.Unconfirmed, + }, nil } -func (s *Server) handleBitcoinBroadcast(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { +func (s *Server) handleBitcoinBroadcast(ctx context.Context, bbr *bfgapi.BitcoinBroadcastRequest) (any, error) { log.Tracef("handleBitcoinBroadcast") defer log.Tracef("handleBitcoinBroadcast exit") - // Increade command count - defer s.cmdsProcessed.Inc() - - bbr, ok := payload.(*bfgapi.BitcoinBroadcastRequest) - if !ok { - return nil, fmt.Errorf("not a BitcoinBroadcastRequest: %T", bbr) - } - - bbResp := &bfgapi.BitcoinBroadcastResponse{} rr := bytes.NewReader(bbr.Transaction) mb := wire.MsgTx{} if err := mb.Deserialize(rr); err != nil { - bbResp.Error = protocol.Errorf("failed to deserialized tx: %s", err) - return bbResp, nil + return &bfgapi.BitcoinBroadcastResponse{Error: protocol.RequestErrorf( + "failed to deserialize tx: %s", err, + )}, nil } - var tl2 *pop.TransactionL2 - var err error + var ( + tl2 *pop.TransactionL2 + err error + ) for _, v := range mb.TxOut { tl2, err = pop.ParseTransactionL2FromOpReturn(v.PkScript) - if err != nil { - log.Errorf(err.Error()) // handle real error below + if err == nil { + break // Found the pop transaction. } } + if tl2 == nil { - bbResp.Error = protocol.Errorf("could not find l2 keystone abbrev in btc tx") - return bbResp, nil + return &bfgapi.BitcoinBroadcastResponse{ + Error: protocol.RequestErrorf("could not find l2 keystone abbrev in btc tx"), + }, nil } publicKeyUncompressed, err := pop.ParsePublicKeyFromSignatureScript(mb.TxIn[0].SignatureScript) if err != nil { - bbResp.Error = protocol.Errorf("could not parse signature script: %s", err) - return bbResp, nil + return &bfgapi.BitcoinBroadcastResponse{ + Error: protocol.RequestErrorf("could not parse signature script: %v", err), + }, nil } - txHash, err := s.btcClient.Broadcast(context.TODO(), bbr.Transaction) + txHash, err := s.btcClient.Broadcast(ctx, bbr.Transaction) if err != nil { - ie := NewInternalErrorf("error broadcasting to bitcoin: %s", err) - log.Errorf(ie.actual.Error()) - bbResp.Error = ie.internal - return bbResp, nil + // This may not alwyas be an internal error. + e := protocol.NewInternalErrorf("broadcast tx: %w", err) + return &bfgapi.BitcoinBroadcastResponse{ + Error: e.ProtocolError(), + }, e } - bbResp.TXID = txHash if err := s.db.PopBasisInsertPopMFields(ctx, &bfgd.PopBasis{ BtcTxId: txHash, @@ -300,72 +294,57 @@ func (s *Server) handleBitcoinBroadcast(ctx context.Context, bws *bfgWs, payload L2KeystoneAbrevHash: tl2.L2Keystone.Hash(), }); err != nil { if errors.Is(err, database.ErrDuplicate) { - bbResp.Error = protocol.Errorf("pop_basis already exists") - return bbResp, nil + return &bfgapi.BitcoinBroadcastResponse{ + Error: protocol.RequestErrorf("pop basis already exists"), + }, nil } if errors.Is(err, database.ErrValidation) { - log.Errorf("invalid pop basis: %s", err) - bbResp.Error = protocol.Errorf("invalid pop_basis") - return bbResp, nil + e := protocol.NewInternalErrorf("invalid pop basis: %w", err) + return &bfgapi.BitcoinBroadcastResponse{ + Error: e.ProtocolError(), + }, e } - ie := NewInternalErrorf("error inserting pop basis: %s", err) - bbResp.Error = ie.internal - log.Errorf(ie.actual.Error()) - return bbResp, nil + e := protocol.NewInternalErrorf("insert pop basis: %w", err) + return &bfgapi.BitcoinBroadcastResponse{ + Error: e.ProtocolError(), + }, e } - return bbResp, nil + return &bfgapi.BitcoinBroadcastResponse{TXID: txHash}, nil } -func (s *Server) handleBitcoinInfo(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { +func (s *Server) handleBitcoinInfo(ctx context.Context, bir *bfgapi.BitcoinInfoRequest) (any, error) { log.Tracef("handleBitcoinInfo") defer log.Tracef("handleBitcoinInfo exit") - // Increade command count - defer s.cmdsProcessed.Inc() - - _, ok := payload.(*bfgapi.BitcoinInfoRequest) - if !ok { - return nil, fmt.Errorf("not a BitcoinInfoRequest %T", payload) - } - - biResp := &bfgapi.BitcoinInfoResponse{} height, err := s.btcClient.Height(ctx) if err != nil { - ie := NewInternalErrorf("error getting bitcoin height: %s", err) - log.Errorf(ie.actual.Error()) - biResp.Error = ie.internal - return biResp, nil + e := protocol.NewInternalErrorf("bitcoin height: %w", err) + return &bfgapi.BitcoinInfoResponse{ + Error: e.ProtocolError(), + }, e } - biResp.Height = height - return biResp, nil + return &bfgapi.BitcoinInfoResponse{ + Height: height, + }, nil } -func (s *Server) handleBitcoinUTXOs(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { +func (s *Server) handleBitcoinUTXOs(ctx context.Context, bur *bfgapi.BitcoinUTXOsRequest) (any, error) { log.Tracef("handleBitcoinUTXOs") defer log.Tracef("handleBitcoinUTXOs exit") - // Increade command count - defer s.cmdsProcessed.Inc() - - bur, ok := payload.(*bfgapi.BitcoinUTXOsRequest) - if !ok { - err := fmt.Errorf("not a BitcoinUTXOsRequest %T", payload) - log.Errorf(err.Error()) - return nil, err - } - - buResp := &bfgapi.BitcoinUTXOsResponse{} - utxos, err := s.btcClient.UTXOs(context.TODO(), bur.ScriptHash) + utxos, err := s.btcClient.UTXOs(ctx, bur.ScriptHash) if err != nil { - ie := NewInternalErrorf("error getting bitcoin utxos: %s", err) - log.Errorf(ie.actual.Error()) - buResp.Error = ie.internal - return buResp, nil + e := protocol.NewInternalErrorf("bitcoin utxos: %w", err) + return &bfgapi.BitcoinUTXOsResponse{ + Error: e.ProtocolError(), + }, e + } + buResp := bfgapi.BitcoinUTXOsResponse{} for _, utxo := range utxos { buResp.UTXOs = append(buResp.UTXOs, &bfgapi.BitcoinUTXO{ Hash: utxo.Hash, @@ -377,47 +356,42 @@ func (s *Server) handleBitcoinUTXOs(ctx context.Context, bws *bfgWs, payload any return buResp, nil } -func (s *Server) handleAccessPublicKeyCreateRequest(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { +func (s *Server) handleAccessPublicKeyCreateRequest(ctx context.Context, acpkc *bfgapi.AccessPublicKeyCreateRequest) (any, error) { log.Tracef("handleAccessPublicKeyCreateRequest") defer log.Tracef("handleAccessPublicKeyCreateRequest exit") - accessPublicKeyCreateRequest, ok := payload.(*bfgapi.AccessPublicKeyCreateRequest) - if !ok { - err := fmt.Errorf("incorrect type: %T", payload) - return nil, err - } - - response := &bfgapi.AccessPublicKeyCreateResponse{} - - publicKey, err := hex.DecodeString(accessPublicKeyCreateRequest.PublicKey) + publicKey, err := hex.DecodeString(acpkc.PublicKey) if err != nil { - response.Error = protocol.Errorf(err.Error()) - return response, nil + return &bfgapi.AccessPublicKeyCreateResponse{ + Error: protocol.RequestErrorf("public key decode: %v", err), + }, nil } if err := s.db.AccessPublicKeyInsert(ctx, &bfgd.AccessPublicKey{ PublicKey: publicKey, }); err != nil { if errors.Is(err, database.ErrDuplicate) { - response.Error = protocol.Errorf("public key already exists") - return response, nil + return &bfgapi.AccessPublicKeyCreateResponse{ + Error: protocol.RequestErrorf("public key already exists"), + }, nil } if errors.Is(err, database.ErrValidation) { - response.Error = protocol.Errorf("invalid access public key") - return response, nil + return &bfgapi.AccessPublicKeyCreateResponse{ + Error: protocol.RequestErrorf("invalid access public key"), + }, nil } - ie := NewInternalErrorf("error inserting access public key: %s", err) - response.Error = ie.internal - log.Errorf(ie.actual.Error()) - return response, nil + e := protocol.NewInternalErrorf("insert public key: %w", err) + return &bfgapi.AccessPublicKeyCreateResponse{ + Error: protocol.RequestErrorf("invalid access public key"), + }, e } - return response, nil + return &bfgapi.AccessPublicKeyCreateResponse{}, nil } -func (s *Server) handleAccessPublicKeyDelete(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { +func (s *Server) handleAccessPublicKeyDelete(ctx context.Context, payload any) (any, error) { log.Tracef("handleAccessPublicKeyDelete") defer log.Tracef("handleAccessPublicKeyDelete exit") @@ -426,32 +400,34 @@ func (s *Server) handleAccessPublicKeyDelete(ctx context.Context, bws *bfgWs, pa return nil, fmt.Errorf("incorrect type %T", payload) } - response := &bfgapi.AccessPublicKeyDeleteResponse{} - b, err := hex.DecodeString(accessPublicKeyDeleteRequest.PublicKey) if err != nil { - response.Error = protocol.Errorf(err.Error()) - return response, nil + return &bfgapi.AccessPublicKeyDeleteResponse{ + Error: protocol.RequestErrorf("public key decode: %v", err), + }, nil } if err := s.db.AccessPublicKeyDelete(ctx, &bfgd.AccessPublicKey{ PublicKey: b, }); err != nil { if errors.Is(err, database.ErrNotFound) { - response.Error = protocol.Errorf("public key not found") - return response, nil + // XXX not sure I like giving this information away. + return &bfgapi.AccessPublicKeyDeleteResponse{ + Error: protocol.RequestErrorf("public key not found"), + }, nil } - ie := NewInternalErrorf("error deleting access public key: %s", err) - response.Error = ie.internal - log.Errorf(ie.actual.Error()) - return response, nil + e := protocol.NewInternalErrorf("error deleting access public key: %w", + err) + return &bfgapi.AccessPublicKeyDeleteResponse{ + Error: e.ProtocolError(), + }, e } - return response, nil + return &bfgapi.AccessPublicKeyDeleteResponse{}, nil } func (s *Server) processBitcoinBlock(ctx context.Context, height uint64) error { - log.Infof("Processing Bitcoin block at height %d...", height) + log.Tracef("Processing Bitcoin block at height %d...", height) rbh, err := s.btcClient.RawBlockHeader(ctx, height) if err != nil { @@ -662,9 +638,6 @@ func (s *Server) handleWebsocketPrivateRead(ctx context.Context, bws *bfgWs) { log.Tracef("handleWebsocketPrivateRead: %v", bws.addr) defer log.Tracef("handleWebsocketPrivateRead exit: %v", bws.addr) - // Command completed - defer s.cmdsProcessed.Inc() - for { cmd, id, payload, err := bfgapi.Read(ctx, bws.conn) if err != nil { @@ -682,41 +655,63 @@ func (s *Server) handleWebsocketPrivateRead(ctx context.Context, bws *bfgWs) { log.Tracef("handleWebsocketRead read %v: %v %v %v", bws.addr, cmd, id, spew.Sdump(payload)) - var response any - switch cmd { case bfgapi.CmdPingRequest: - response, err = s.handlePing(ctx, bws, payload, id) + err = s.handlePingRequest(ctx, bws, payload, id) case bfgapi.CmdPopTxForL2BlockRequest: - response, err = s.handlePopTxForL2Block(ctx, bws, payload, id) + handler := func(c context.Context) (any, error) { + msg := payload.(*bfgapi.PopTxsForL2BlockRequest) + return s.handlePopTxsForL2Block(c, msg) + } + + go s.handleRequest(ctx, bws, id, "handle pop for l2 block request", handler) case bfgapi.CmdNewL2KeystonesRequest: - response, err = s.handleNewL2Keystones(ctx, bws, payload, id) + handler := func(c context.Context) (any, error) { + msg := payload.(*bfgapi.NewL2KeystonesRequest) + return s.handleNewL2Keystones(c, msg) + } + + go s.handleRequest(ctx, bws, id, "handle new l2 keystones request", handler) case bfgapi.CmdBTCFinalityByRecentKeystonesRequest: - response, err = s.handleBtcFinalityByRecentKeystonesRequest(ctx, bws, payload, id) + handler := func(c context.Context) (any, error) { + msg := payload.(*bfgapi.BTCFinalityByRecentKeystonesRequest) + return s.handleBtcFinalityByRecentKeystonesRequest(c, msg) + } + + go s.handleRequest(ctx, bws, id, "handle finality recent keystones request", handler) case bfgapi.CmdBTCFinalityByKeystonesRequest: - response, err = s.handleBtcFinalityByKeystonesRequest(ctx, bws, payload, id) + handler := func(c context.Context) (any, error) { + msg := payload.(*bfgapi.BTCFinalityByKeystonesRequest) + return s.handleBtcFinalityByKeystonesRequest(c, msg) + } + + go s.handleRequest(ctx, bws, id, "handle finality keystones request", handler) case bfgapi.CmdAccessPublicKeyCreateRequest: - response, err = s.handleAccessPublicKeyCreateRequest(ctx, bws, payload, id) + handler := func(c context.Context) (any, error) { + msg := payload.(*bfgapi.AccessPublicKeyCreateRequest) + return s.handleAccessPublicKeyCreateRequest(c, msg) + } + + go s.handleRequest(ctx, bws, id, "handle access key create request", handler) case bfgapi.CmdAccessPublicKeyDeleteRequest: - response, err = s.handleAccessPublicKeyDelete(ctx, bws, payload, id) + handler := func(c context.Context) (any, error) { + msg := payload.(*bfgapi.AccessPublicKeyDeleteRequest) + return s.handleAccessPublicKeyDelete(c, msg) + } + + go s.handleRequest(ctx, bws, id, "handle access key delete request", handler) default: err = fmt.Errorf("unknown command") } - // if there was an error, close the websocket, only do this if we - // can't continue + // If set, it is a terminal error. if err != nil { - log.Errorf("handleWebsocketPrivateRead error %v %v: %v", bws.addr, cmd, err) - bws.conn.CloseStatus(websocket.StatusProtocolError, - err.Error()) + log.Errorf("handleWebsocketRead %v %v %v: %v", + bws.addr, cmd, id, err) return - } else { - if err := s.writeResponse(ctx, bws.conn, response, id); err != nil { - bws.conn.CloseStatus(websocket.StatusProtocolError, err.Error()) - return - } } + s.cmdsProcessed.Inc() } } @@ -726,9 +721,6 @@ func (s *Server) handleWebsocketPublicRead(ctx context.Context, bws *bfgWs) { log.Tracef("handleWebsocketPublicRead: %v", bws.addr) defer log.Tracef("handleWebsocketPublicRead exit: %v", bws.addr) - // Command completed - defer s.cmdsProcessed.Inc() - for { cmd, id, payload, err := bfgapi.Read(ctx, bws.conn) if err != nil { @@ -742,37 +734,58 @@ func (s *Server) handleWebsocketPublicRead(ctx context.Context, bws *bfgWs) { return } - var response any - switch cmd { case bfgapi.CmdPingRequest: - response, err = s.handlePing(ctx, bws, payload, id) + // quick call + err = s.handlePingRequest(ctx, bws, payload, id) case bfgapi.CmdL2KeystonesRequest: - response, err = s.handleL2KeystonesRequest(ctx, bws, payload, id) + handler := func(c context.Context) (any, error) { + msg := payload.(*bfgapi.L2KeystonesRequest) + return s.handleL2KeystonesRequest(c, msg) + } + + go s.handleRequest(ctx, bws, id, "handle l2 keystones request", handler) case bfgapi.CmdBitcoinBalanceRequest: - response, err = s.handleBitcoinBalance(ctx, bws, payload, id) + handler := func(c context.Context) (any, error) { + msg := payload.(*bfgapi.BitcoinBalanceRequest) + return s.handleBitcoinBalance(c, msg) + } + + go s.handleRequest(ctx, bws, id, "handle bitcoin balance request", handler) case bfgapi.CmdBitcoinBroadcastRequest: - response, err = s.handleBitcoinBroadcast(ctx, bws, payload, id) + handler := func(c context.Context) (any, error) { + msg := payload.(*bfgapi.BitcoinBroadcastRequest) + return s.handleBitcoinBroadcast(c, msg) + } + + go s.handleRequest(ctx, bws, id, "handle bitcoin broadcast request", handler) case bfgapi.CmdBitcoinInfoRequest: - response, err = s.handleBitcoinInfo(ctx, bws, payload, id) + handler := func(c context.Context) (any, error) { + msg := payload.(*bfgapi.BitcoinInfoRequest) + return s.handleBitcoinInfo(c, msg) + } + + go s.handleRequest(ctx, bws, id, "handle bitcoin broadcast request", handler) case bfgapi.CmdBitcoinUTXOsRequest: - response, err = s.handleBitcoinUTXOs(ctx, bws, payload, id) + + handler := func(c context.Context) (any, error) { + msg := payload.(*bfgapi.BitcoinUTXOsRequest) + return s.handleBitcoinUTXOs(c, msg) + } + + go s.handleRequest(ctx, bws, id, "handle bitcoin utxos request", handler) default: err = fmt.Errorf("unknown command") } + // If set, it is a terminal error. if err != nil { - log.Errorf("handleWebsocketPublicRead %v %v: %v", bws.addr, cmd, err) - bws.conn.CloseStatus(websocket.StatusProtocolError, - err.Error()) + log.Errorf("handleWebsocketRead %v %v %v: %v", + bws.addr, cmd, id, err) return - } else { - if err := s.writeResponse(ctx, bws.conn, response, id); err != nil { - bws.conn.CloseStatus(websocket.StatusProtocolError, err.Error()) - return - } } + s.cmdsProcessed.Inc() } } @@ -800,21 +813,19 @@ func (s *Server) newSession(bws *bfgWs) (string, error) { } } -func (s *Server) killSession(id string, why websocket.StatusCode) { +func (s *Server) deleteSession(id string) { + log.Tracef("deleteSession") + defer log.Tracef("deleteSession exit") + s.mtx.Lock() - bws, ok := s.sessions[id] + _, ok := s.sessions[id] if ok { delete(s.sessions, id) } s.mtx.Unlock() if !ok { - log.Errorf("killSession: id not found in sessions %s", id) - } else { - if err := bws.conn.CloseStatus(why, ""); err != nil { - // XXX this is too noisy. - log.Debugf("session close %v: %v", id, err) - } + log.Errorf("deleteSession: id not found in sessions %s", id) } } @@ -833,6 +844,7 @@ func (s *Server) handleWebsocketPrivate(w http.ResponseWriter, r *http.Request) r.RemoteAddr, err) return } + defer conn.Close(websocket.StatusProtocolError, "") bws := &bfgWs{ addr: r.RemoteAddr, @@ -850,7 +862,7 @@ func (s *Server) handleWebsocketPrivate(w http.ResponseWriter, r *http.Request) } defer func() { - s.killSession(bws.sessionId, websocket.StatusNormalClosure) + s.deleteSession(bws.sessionId) }() bws.wg.Add(1) @@ -941,7 +953,7 @@ func (s *Server) handleWebsocketPublic(w http.ResponseWriter, r *http.Request) { return } defer func() { - s.killSession(bws.sessionId, websocket.StatusNormalClosure) + s.deleteSession(bws.sessionId) }() // Always ping, required by protocol. @@ -963,47 +975,45 @@ func (s *Server) handleWebsocketPublic(w http.ResponseWriter, r *http.Request) { bws.sessionId, r.RemoteAddr, bws.publicKey) } -func (s *Server) handlePing(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { - log.Tracef("handlePing: %v", bws.addr) - defer log.Tracef("handlePing exit: %v", bws.addr) +func (s *Server) handlePingRequest(ctx context.Context, bws *bfgWs, payload any, id string) error { + log.Tracef("handlePingRequest: %v", bws.addr) + defer log.Tracef("handlePingRequest exit: %v", bws.addr) p, ok := payload.(*bfgapi.PingRequest) if !ok { - return nil, fmt.Errorf("handlePing invalid payload type: %T", payload) + return fmt.Errorf("handlePingRequest invalid payload type: %T", payload) } response := &bfgapi.PingResponse{ OriginTimestamp: p.Timestamp, Timestamp: time.Now().Unix(), } - return response, nil -} + log.Tracef("responding with %v", spew.Sdump(response)) -func (s *Server) handlePopTxForL2Block(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { - log.Tracef("handlePopTxForL2Block: %v", bws.addr) - defer log.Tracef("handlePopTxForL2Block exit: %v", bws.addr) - - p, ok := payload.(*bfgapi.PopTxsForL2BlockRequest) - if !ok { - return nil, fmt.Errorf("handlePopTxForL2Block invalid payload type: %T", - payload) + if err := bfgapi.Write(ctx, bws.conn, id, response); err != nil { + return fmt.Errorf("handlePingRequest write: %v %v", + bws.addr, err) } + return nil +} - response := bfgapi.PopTxsForL2BlockResponse{} +func (s *Server) handlePopTxsForL2Block(ctx context.Context, ptl2 *bfgapi.PopTxsForL2BlockRequest) (any, error) { + log.Tracef("handlePopTxsForL2Block") + defer log.Tracef("handlePopTxsForL2Block exit") - hash := hemi.HashSerializedL2KeystoneAbrev(p.L2Block) + hash := hemi.HashSerializedL2KeystoneAbrev(ptl2.L2Block) var h [32]byte copy(h[:], hash) popTxs, err := s.db.PopBasisByL2KeystoneAbrevHash(ctx, h, true) if err != nil { - ie := NewInternalErrorf("error getting pop basis: %s", err) - response.Error = ie.internal - log.Errorf(ie.actual.Error()) - return response, nil + e := protocol.NewInternalErrorf("error getting pop basis: %w", err) + return &bfgapi.PopTxsForL2BlockResponse{ + Error: e.ProtocolError(), + }, e } + response := &bfgapi.PopTxsForL2BlockResponse{} response.PopTxs = make([]bfgapi.PopTx, 0, len(popTxs)) - for k := range popTxs { response.PopTxs = append(response.PopTxs, bfgapi.PopTx{ BtcTxId: api.ByteSlice(popTxs[k].BtcTxId), @@ -1020,57 +1030,46 @@ func (s *Server) handlePopTxForL2Block(ctx context.Context, bws *bfgWs, payload return response, nil } -func (s *Server) handleBtcFinalityByRecentKeystonesRequest(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { - p, ok := payload.(*bfgapi.BTCFinalityByRecentKeystonesRequest) - if ok == false { - return nil, fmt.Errorf( - "handleBtcFinalityByRecentKeystonesRequest invalid payload type %T", - payload, - ) - } - - response := bfgapi.BTCFinalityByRecentKeystonesResponse{} +func (s *Server) handleBtcFinalityByRecentKeystonesRequest(ctx context.Context, bfrk *bfgapi.BTCFinalityByRecentKeystonesRequest) (any, error) { + log.Tracef("handleBtcFinalityByRecentKeystonesRequest") + defer log.Tracef("handleBtcFinalityByRecentKeystonesRequest exit") - finalities, err := s.db.L2BTCFinalityMostRecent(ctx, p.NumRecentKeystones) + finalities, err := s.db.L2BTCFinalityMostRecent(ctx, bfrk.NumRecentKeystones) if err != nil { - ie := NewInternalErrorf("error getting finality: %s", err) - response.Error = ie.internal - log.Errorf(ie.actual.Error()) - return response, nil + e := protocol.NewInternalErrorf("error getting finality: %w", err) + return &bfgapi.BTCFinalityByRecentKeystonesResponse{ + Error: e.ProtocolError(), + }, e } - apiFinalities := []hemi.L2BTCFinality{} - for _, finality := range finalities { + apiFinalities := make([]hemi.L2BTCFinality, 0, len(finalities)) + for k, finality := range finalities { apiFinality, err := hemi.L2BTCFinalityFromBfgd( &finality, finality.BTCTipHeight, finality.EffectiveHeight, ) if err != nil { - return nil, err + e := protocol.NewInternalErrorf("error getting finality (%v): %w", + k, err) + return &bfgapi.BTCFinalityByRecentKeystonesResponse{ + Error: e.ProtocolError(), + }, e } apiFinalities = append(apiFinalities, *apiFinality) } - response.L2BTCFinalities = apiFinalities - - return response, nil + return &bfgapi.BTCFinalityByRecentKeystonesResponse{ + L2BTCFinalities: apiFinalities, + }, nil } -func (s *Server) handleBtcFinalityByKeystonesRequest(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { - p, ok := payload.(*bfgapi.BTCFinalityByKeystonesRequest) - if ok == false { - return nil, fmt.Errorf( - "handleBtcFinalityByKeystonesRequest invalid payload type %T", - payload, - ) - } - - response := bfgapi.BTCFinalityByKeystonesResponse{} - - l2KeystoneAbrevHashes := []database.ByteArray{} +func (s *Server) handleBtcFinalityByKeystonesRequest(ctx context.Context, bfkr *bfgapi.BTCFinalityByKeystonesRequest) (any, error) { + log.Tracef("handleBtcFinalityByKeystonesRequest") + defer log.Tracef("handleBtcFinalityByKeystonesRequest exit") - for _, l := range p.L2Keystones { + l2KeystoneAbrevHashes := make([]database.ByteArray, 0, len(bfkr.L2Keystones)) + for _, l := range bfkr.L2Keystones { a := hemi.L2KeystoneAbbreviate(l) l2KeystoneAbrevHashes = append(l2KeystoneAbrevHashes, a.Hash()) } @@ -1080,13 +1079,13 @@ func (s *Server) handleBtcFinalityByKeystonesRequest(ctx context.Context, bws *b l2KeystoneAbrevHashes, ) if err != nil { - ie := NewInternalErrorf("error getting l2 keystones: %s", err) - response.Error = ie.internal - log.Errorf(ie.actual.Error()) - return response, nil + e := protocol.NewInternalErrorf("l2 keystones: %w", err) + return &bfgapi.BTCFinalityByKeystonesResponse{ + Error: e.ProtocolError(), + }, e } - apiFinalities := []hemi.L2BTCFinality{} + apiFinalities := make([]hemi.L2BTCFinality, 0, len(finalities)) for _, finality := range finalities { apiFinality, err := hemi.L2BTCFinalityFromBfgd( &finality, @@ -1094,38 +1093,34 @@ func (s *Server) handleBtcFinalityByKeystonesRequest(ctx context.Context, bws *b finality.EffectiveHeight, ) if err != nil { - return nil, err + e := protocol.NewInternalErrorf("l2 btc finality: %w", err) + return &bfgapi.BTCFinalityByKeystonesResponse{ + Error: e.ProtocolError(), + }, e } apiFinalities = append(apiFinalities, *apiFinality) } - response.L2BTCFinalities = apiFinalities - - return response, nil + return &bfgapi.BTCFinalityByKeystonesResponse{ + L2BTCFinalities: apiFinalities, + }, nil } -func (s *Server) handleL2KeystonesRequest(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { - p, ok := payload.(*bfgapi.L2KeystonesRequest) - if ok == false { - return nil, fmt.Errorf( - "handleL2KeystonesRequest invalid payload type %T", - payload, - ) - } +func (s *Server) handleL2KeystonesRequest(ctx context.Context, l2kr *bfgapi.L2KeystonesRequest) (any, error) { + log.Tracef("handleL2KeystonesRequest") + defer log.Tracef("handleL2KeystonesRequest exit") - gkhResp := &bfgapi.L2KeystonesResponse{} - - results, err := s.db.L2KeystonesMostRecentN(ctx, - uint32(p.NumL2Keystones)) + results, err := s.db.L2KeystonesMostRecentN(ctx, uint32(l2kr.NumL2Keystones)) if err != nil { - ie := NewInternalErrorf("error getting l2 keystones: %s", err) - gkhResp.Error = ie.internal - log.Errorf(ie.actual.Error()) - return gkhResp, nil + e := protocol.NewInternalErrorf("error getting l2 keystones: %w", err) + return &bfgapi.L2KeystonesResponse{ + Error: e.ProtocolError(), + }, e } + l2Keystones := make([]hemi.L2Keystone, 0, len(results)) for _, v := range results { - gkhResp.L2Keystones = append(gkhResp.L2Keystones, hemi.L2Keystone{ + l2Keystones = append(l2Keystones, hemi.L2Keystone{ Version: uint8(v.Version), L1BlockNumber: v.L1BlockNumber, L2BlockNumber: v.L2BlockNumber, @@ -1136,28 +1131,27 @@ func (s *Server) handleL2KeystonesRequest(ctx context.Context, bws *bfgWs, paylo }) } - return gkhResp, nil + return &bfgapi.L2KeystonesResponse{ + L2Keystones: l2Keystones, + }, nil } func writeNotificationResponse(bws *bfgWs, response any) { if err := bfgapi.Write(bws.requestContext, bws.conn, "", response); err != nil { - log.Errorf( - "handleBtcFinalityNotification write: %v %v", - bws.addr, - err, - ) + log.Errorf("handleBtcFinalityNotification write: %v %v", bws.addr, err) } } func (s *Server) handleBtcFinalityNotification() error { - response := bfgapi.BTCFinalityNotification{} + log.Tracef("handleBtcFinalityNotification") + defer log.Tracef("handleBtcFinalityNotification exit") s.mtx.Lock() for _, bws := range s.sessions { if _, ok := bws.notify[notifyBtcFinalities]; !ok { continue } - go writeNotificationResponse(bws, response) + go writeNotificationResponse(bws, &bfgapi.BTCFinalityNotification{}) } s.mtx.Unlock() @@ -1165,14 +1159,15 @@ func (s *Server) handleBtcFinalityNotification() error { } func (s *Server) handleBtcBlockNotification() error { - response := bfgapi.BTCNewBlockNotification{} + log.Tracef("handleBtcBlockNotification") + defer log.Tracef("handleBtcBlockNotification exit") s.mtx.Lock() for _, bws := range s.sessions { if _, ok := bws.notify[notifyBtcBlocks]; !ok { continue } - go writeNotificationResponse(bws, response) + go writeNotificationResponse(bws, &bfgapi.BTCNewBlockNotification{}) } s.mtx.Unlock() @@ -1180,14 +1175,15 @@ func (s *Server) handleBtcBlockNotification() error { } func (s *Server) handleL2KeystonesNotification() error { - response := bfgapi.L2KeystonesNotification{} + log.Tracef("handleL2KeystonesNotification") + defer log.Tracef("handleL2KeystonesNotification exit") s.mtx.Lock() for _, bws := range s.sessions { if _, ok := bws.notify[notifyL2Keystones]; !ok { continue } - go writeNotificationResponse(bws, response) + go writeNotificationResponse(bws, &bfgapi.L2KeystonesNotification{}) } s.mtx.Unlock() @@ -1215,8 +1211,11 @@ func hemiL2KeystonesToDb(l2ks []hemi.L2Keystone) []bfgd.L2Keystone { return dbks } -func (s *Server) handleNewL2Keystones(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { - ks := hemiL2KeystonesToDb(payload.(*bfgapi.NewL2KeystonesRequest).L2Keystones) +func (s *Server) handleNewL2Keystones(ctx context.Context, nlkr *bfgapi.NewL2KeystonesRequest) (any, error) { + log.Tracef("handleNewL2Keystones") + defer log.Tracef("handleNewL2Keystones exit") + + ks := hemiL2KeystonesToDb(nlkr.L2Keystones) err := s.db.L2KeystonesInsert(ctx, ks) response := bfgapi.NewL2KeystonesResponse{} if err != nil { @@ -1306,8 +1305,6 @@ func (s *Server) handleAccessPublicKeys(table string, action string, payload, pa return } - // XXX this is racing with killSession but protected. We should - // create a killSessions that takes an encoded PublicKey s.mtx.Lock() for _, v := range s.sessions { // if public key does not exist on session, it's not an authenticated @@ -1320,8 +1317,7 @@ func (s *Server) handleAccessPublicKeys(table string, action string, payload, pa // encoding, ensure that the session string does for an equal comparison sessionPublicKeyEncoded := fmt.Sprintf("\\x%s", hex.EncodeToString(v.publicKey)) if sessionPublicKeyEncoded == accessPublicKey.PublicKeyEncoded { - sessionId := v.sessionId - go s.killSession(sessionId, protocol.StatusHandshakeErr) + v.conn.CloseStatus(websocket.StatusProtocolError, "killed") } } s.mtx.Unlock() @@ -1455,7 +1451,7 @@ func (s *Server) Run(pctx context.Context) error { return fmt.Errorf("failed to create server: %w", err) } cs := []prometheus.Collector{ - s.cmdsProcessed, + s.cmdsProcessed, // XXX should we make two counters? priv/pub prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Subsystem: promSubsystem, Name: "running", diff --git a/service/bss/bss.go b/service/bss/bss.go index 002c2c2c..5e377732 100644 --- a/service/bss/bss.go +++ b/service/bss/bss.go @@ -43,40 +43,6 @@ func init() { loggo.ConfigureLoggers(logLevel) } -// InternalError is an error type to differentiates between caller and callee -// errors. An internal error is used whne something internal to the application -// fails. -type InternalError struct { - internal *protocol.Error - actual error -} - -// Err return the protocol.Error that can be sent over the wire. -func (ie InternalError) Err() *protocol.Error { - return ie.internal -} - -// String return the actual underlying error. -func (ie InternalError) String() string { - i := ie.internal - return fmt.Sprintf("%v [%v:%v]", ie.actual.Error(), i.Trace, i.Timestamp) -} - -// Error satifies the error interface. -func (ie InternalError) Error() string { - if ie.internal == nil { - return "internal error" - } - return ie.internal.String() -} - -func NewInternalErrorf(msg string, args ...interface{}) *InternalError { - return &InternalError{ - internal: protocol.Errorf("internal error"), - actual: fmt.Errorf(msg, args...), - } -} - // Wrap for calling bfg commands type bfgCmd struct { msg any @@ -213,18 +179,8 @@ func (s *Server) handleRequest(parrentCtx context.Context, bws *bssWs, wsid stri response, err := handler(ctx) if err != nil { - // XXX these errors print an invalid trace for some reason. It - // mostly works but have a look at it and compare with client - // output and fix. - var ie *InternalError - if errors.As(err, &ie) { - log.Errorf("[INTERNAL ERROR] Failed to handle %v request %v: %v", - requestType, bws.addr, ie.String()) - } else { - // This may be too loud and can be silenced once in production. - log.Errorf("Failed to handle %v request %v: %v", - requestType, bws.addr, err) - } + log.Errorf("Failed to handle %v request %v: %v", + requestType, bws.addr, err) } if response == nil { return @@ -233,7 +189,8 @@ func (s *Server) handleRequest(parrentCtx context.Context, bws *bssWs, wsid stri log.Debugf("Responding to %v request with %v", requestType, spew.Sdump(response)) if err := bssapi.Write(ctx, bws.conn, wsid, response); err != nil { - log.Errorf("Failed to handle %v request: protocol write failed: %v", requestType, err) + log.Errorf("Failed to handle %v request: protocol write failed: %v", + requestType, err) } } @@ -271,56 +228,51 @@ func (s *Server) handlePopPayoutsRequest(ctx context.Context, msg *bssapi.PopPay log.Tracef("handlePopPayoutsRequest") defer log.Tracef("handlePopPayoutsRequest exit") - popTxsForL2BlockRequest := bfgapi.PopTxsForL2BlockRequest{ + popTxsForL2BlockRes, err := s.callBFG(ctx, bfgapi.PopTxsForL2BlockRequest{ L2Block: msg.L2BlockForPayout, - } - - popTxsForL2BlockRes, err := s.callBFG(ctx, &popTxsForL2BlockRequest) + }) if err != nil { + e := protocol.NewInternalErrorf("pop tx for l2: block %w", err) return &bssapi.PopPayoutsResponse{ - Error: protocol.Errorf("%v", err), - }, err - } - - popPayouts := ConvertPopTxsToPopPayouts( - (popTxsForL2BlockRes.(*bfgapi.PopTxsForL2BlockResponse)).PopTxs, - ) - - popPayoutsResponse := bssapi.PopPayoutsResponse{ - PopPayouts: popPayouts, + Error: e.ProtocolError(), + }, e } - return &popPayoutsResponse, nil + return &bssapi.PopPayoutsResponse{ + PopPayouts: ConvertPopTxsToPopPayouts( + (popTxsForL2BlockRes.(*bfgapi.PopTxsForL2BlockResponse)).PopTxs, + ), + }, nil } func (s *Server) handleL2KeytoneRequest(ctx context.Context, msg *bssapi.L2KeystoneRequest) (*bssapi.L2KeystoneResponse, error) { log.Tracef("handleL2KeytoneRequest") defer log.Tracef("handleL2KeytoneRequest exit") - newKeystoneHeadersRequest := bfgapi.NewL2KeystonesRequest{ - L2Keystones: []hemi.L2Keystone{ - msg.L2Keystone, - }, - } - - resp := &bssapi.L2KeystoneResponse{} - _, err := s.callBFG(ctx, &newKeystoneHeadersRequest) + _, err := s.callBFG(ctx, &bfgapi.NewL2KeystonesRequest{ + L2Keystones: []hemi.L2Keystone{msg.L2Keystone}, + }) if err != nil { - resp.Error = protocol.Errorf("%v", err) + e := protocol.NewInternalErrorf("new l2 keytsones: %w", err) + return &bssapi.L2KeystoneResponse{ + Error: e.ProtocolError(), + }, e } - return resp, err + return &bssapi.L2KeystoneResponse{}, nil } func (s *Server) handleBtcFinalityByRecentKeystonesRequest(ctx context.Context, msg *bssapi.BTCFinalityByRecentKeystonesRequest) (*bssapi.BTCFinalityByRecentKeystonesResponse, error) { - request := bfgapi.BTCFinalityByRecentKeystonesRequest{ - NumRecentKeystones: msg.NumRecentKeystones, - } + log.Tracef("handleBtcFinalityByRecentKeystonesRequest") + defer log.Tracef("handleBtcFinalityByRecentKeystonesRequest exit") - response, err := s.callBFG(ctx, &request) + response, err := s.callBFG(ctx, &bfgapi.BTCFinalityByRecentKeystonesRequest{ + NumRecentKeystones: msg.NumRecentKeystones, + }) if err != nil { + e := protocol.NewInternalErrorf("btc finality recent: %w", err) return &bssapi.BTCFinalityByRecentKeystonesResponse{ - Error: protocol.Errorf("%v", err), + Error: e.ProtocolError(), }, err } @@ -330,14 +282,16 @@ func (s *Server) handleBtcFinalityByRecentKeystonesRequest(ctx context.Context, } func (s *Server) handleBtcFinalityByKeystonesRequest(ctx context.Context, msg *bssapi.BTCFinalityByKeystonesRequest) (*bssapi.BTCFinalityByKeystonesResponse, error) { - request := bfgapi.BTCFinalityByKeystonesRequest{ - L2Keystones: msg.L2Keystones, - } + log.Tracef("handleBtcFinalityByKeystonesRequest") + defer log.Tracef("handleBtcFinalityByKeystonesRequest exit") - response, err := s.callBFG(ctx, &request) + response, err := s.callBFG(ctx, &bfgapi.BTCFinalityByKeystonesRequest{ + L2Keystones: msg.L2Keystones, + }) if err != nil { + e := protocol.NewInternalErrorf("btc finality keystones: %w", err) return &bssapi.BTCFinalityByKeystonesResponse{ - Error: protocol.Errorf("%v", err), + Error: e.ProtocolError(), }, err } @@ -415,8 +369,6 @@ func (s *Server) handleWebsocketRead(ctx context.Context, bws *bssWs) { if err != nil { log.Errorf("handleWebsocketRead %v %v %v: %v", bws.addr, cmd, id, err) - bws.conn.CloseStatus(websocket.StatusProtocolError, - err.Error()) return } @@ -458,7 +410,6 @@ func (s *Server) handleWebsocket(w http.ResponseWriter, r *http.Request) { defer func() { s.deleteSession(bws.sessionId) - conn.Close(websocket.StatusNormalClosure, "") // Force shutdown connection }() bws.wg.Add(1) @@ -508,11 +459,13 @@ func (s *Server) newSession(bws *bssWs) (string, error) { func (s *Server) deleteSession(id string) { s.mtx.Lock() - if _, ok := s.sessions[id]; !ok { - log.Errorf("id not found in sessions %s", id) - } + _, ok := s.sessions[id] delete(s.sessions, id) s.mtx.Unlock() + + if !ok { + log.Errorf("id not found in sessions %s", id) + } } func writeNotificationResponse(bws *bssWs, response any) { @@ -526,11 +479,9 @@ func writeNotificationResponse(bws *bssWs, response any) { } func (s *Server) handleBtcFinalityNotification() error { - response := bssapi.BTCFinalityNotification{} - s.mtx.Lock() for _, bws := range s.sessions { - go writeNotificationResponse(bws, response) + go writeNotificationResponse(bws, &bssapi.BTCFinalityNotification{}) } s.mtx.Unlock() @@ -538,11 +489,9 @@ func (s *Server) handleBtcFinalityNotification() error { } func (s *Server) handleBtcBlockNotification() error { - response := bssapi.BTCNewBlockNotification{} - s.mtx.Lock() for _, bws := range s.sessions { - go writeNotificationResponse(bws, response) + go writeNotificationResponse(bws, &bssapi.BTCNewBlockNotification{}) } s.mtx.Unlock() @@ -593,7 +542,7 @@ func (s *Server) handleBFGWebsocketReadUnauth(ctx context.Context, conn *protoco go s.handleBtcBlockNotification() default: log.Errorf("unknown command: %v", cmd) - return // XXX exit for now to cause a ruckus in the logs + return } } } @@ -652,17 +601,17 @@ func (s *Server) callBFG(parrentCtx context.Context, msg any) (any, error) { // attempt to send select { case <-ctx.Done(): - return nil, NewInternalErrorf("callBFG send context error: %v", + return nil, protocol.NewInternalErrorf("callBFG send context error: %w", ctx.Err()) case s.bfgCmdCh <- bc: default: - return nil, NewInternalErrorf("bfg command queue full") + return nil, protocol.NewInternalErrorf("bfg command queue full") } // Wait for response select { case <-ctx.Done(): - return nil, NewInternalErrorf("callBFG received context error: %v", + return nil, protocol.NewInternalErrorf("callBFG received context error: %w", ctx.Err()) case payload := <-bc.ch: if err, ok := payload.(error); ok {