From de91f83c116413436a1403196d16cde542a6f88f Mon Sep 17 00:00:00 2001 From: Marco Peereboom Date: Thu, 29 Feb 2024 08:11:17 +0000 Subject: [PATCH] Move internal error into protocol package (#10) * Move internal error into protocol package. This is a bit awkward and probably does not belong here. However, this code is copy and pasted everywhere and pretty brittle. So to prevent reinventing the wheel and bad copy paste it is allowed here. Co-authored-by: ClaytonNorthey92 Co-authored-by: Joshua Sing --- api/protocol/protocol.go | 93 +++++- e2e/e2e_ext_test.go | 8 +- service/bfg/bfg.go | 628 +++++++++++++++++++-------------------- service/bss/bss.go | 145 +++------ 4 files changed, 446 insertions(+), 428 deletions(-) diff --git a/api/protocol/protocol.go b/api/protocol/protocol.go index 09a9ddd1..95d79a31 100644 --- a/api/protocol/protocol.go +++ b/api/protocol/protocol.go @@ -41,8 +41,7 @@ func (he HandshakeError) Error() string { } func (he HandshakeError) Is(target error) bool { - _, ok := target.(HandshakeError) - return ok + return errors.Is(he, target) } var PublicKeyAuthError = websocket.CloseError{ @@ -230,17 +229,18 @@ type Message struct { Payload json.RawMessage `json:"payload"` } -// Error is a protocol Error type that can be used for additional error -// context. It embeds an 8 byte number that can be used to trace calls on both the -// client and server side. +// Error is a protocol error type that is used to provide additional error +// information between a server and client. +// +// A unique "trace" string may be embedded, which can be used to trace errors +// between a server and client. type Error struct { Timestamp int64 `json:"timestamp"` - Trace string `json:"trace"` - Message string `json:"error"` + Trace string `json:"trace,omitempty"` + Message string `json:"message"` } -// Errorf is a client induced protocol error (e.g. "invalid height"). This is a -// pretty printable error on the client and server and is not fatal. +// Errorf returns a protocol Error type with an embedded trace. func Errorf(msg string, args ...interface{}) *Error { trace, _ := random(8) return &Error{ @@ -250,10 +250,83 @@ func Errorf(msg string, args ...interface{}) *Error { } } +// String pretty prints a protocol error. func (e Error) String() string { + if len(e.Trace) == 0 { + return e.Message + } return fmt.Sprintf("%v [%v:%v]", e.Message, e.Trace, e.Timestamp) } +// RequestError wraps an error to create a protocol request error. +// +// Request errors are usually something caused by a client, e.g. validation or +// input errors, and therefore should not be logged server-side and do not +// contain an embedded trace. +func RequestError(err error) *Error { + return &Error{ + Timestamp: time.Now().Unix(), + Message: err.Error(), + } +} + +// RequestErrorf creates a new protocol request error. +// +// Request errors are usually something caused by a client, e.g. validation or +// input errors, and therefore should not be logged server-side and do not +// contain an embedded trace. +func RequestErrorf(msg string, args ...any) *Error { + return &Error{ + Timestamp: time.Now().Unix(), + Message: fmt.Sprintf(msg, args...), + } +} + +// InternalError represents an internal application error. +// +// Internal errors are errors that occurred within the application and are not +// caused by a client (e.g. validation or input errors). The actual error +// message should not be sent to clients, as it is internal to the application, +// and may be server-operator specific. +type InternalError struct { + protocol *Error + internal error +} + +// ProtocolError returns the protocol error representation. +// This error is intended to be sent to clients. +func (ie InternalError) ProtocolError() *Error { + return ie.protocol +} + +// Error satisfies the error interface. +func (ie InternalError) Error() string { + if ie.internal != nil { + return fmt.Sprintf("%v [%v:%v]", ie.internal.Error(), + ie.protocol.Timestamp, ie.protocol.Trace) + } + return ie.protocol.String() +} + +// Unwrap returns the error wrapped by this internal error. +func (ie InternalError) Unwrap() error { + return ie.internal +} + +// NewInternalError returns an InternalError wrapping the given error. +func NewInternalError(err error) *InternalError { + return NewInternalErrorf("internal error: %w", err) +} + +// NewInternalErrorf returns an InternalError constructed from the passed +// message and arguments. +func NewInternalErrorf(msg string, args ...interface{}) *InternalError { + return &InternalError{ + protocol: Errorf("internal error"), + internal: fmt.Errorf(msg, args...), + } +} + // Ping type PingRequest struct { Timestamp int64 `json:"timestamp"` // Local timestamp @@ -416,7 +489,7 @@ func (ac *Conn) IsOnline() bool { return ac.wsc != nil } -// Close close a websocket connection with normal status. +// Close closes a websocket connection with normal status. func (ac *Conn) Close() error { return ac.CloseStatus(websocket.StatusNormalClosure, "") } diff --git a/e2e/e2e_ext_test.go b/e2e/e2e_ext_test.go index ce694d0b..276a4a26 100644 --- a/e2e/e2e_ext_test.go +++ b/e2e/e2e_ext_test.go @@ -980,7 +980,7 @@ func TestBFGPublicErrorCases(t *testing.T) { }, { name: "bitcoin broadcast deserialize error", - expectedError: "failed to deserialized tx: unexpected EOF", + expectedError: "failed to deserialize tx: unexpected EOF", requests: []bfgapi.BitcoinBroadcastRequest{ { Transaction: []byte("invalid..."), @@ -1000,7 +1000,7 @@ func TestBFGPublicErrorCases(t *testing.T) { }, { name: "bitcoin broadcast database error", - expectedError: "pop_basis already exists", + expectedError: "pop basis already exists", requests: []bfgapi.BitcoinBroadcastRequest{ { Transaction: btx, @@ -1158,7 +1158,7 @@ func TestBFGPrivateErrorCases(t *testing.T) { }, { name: "public key is invalid", - expectedError: "encoding/hex: invalid byte: U+006C 'l'", + expectedError: "public key decode: encoding/hex: invalid byte: U+006C 'l'", requests: []bfgapi.AccessPublicKeyCreateRequest{ { PublicKey: "blahblahblah", @@ -3315,7 +3315,7 @@ func TestBFGAuthPingThenRemoval(t *testing.T) { var v interface{} err = wsjson.Read(ctx, c, &v) - if err != nil && !strings.Contains(err.Error(), "status = StatusCode(4100)") { + if err != nil && !strings.Contains(err.Error(), "status = StatusProtocolError and reason = \"killed\"") { t.Fatal(err) } diff --git a/service/bfg/bfg.go b/service/bfg/bfg.go index 305a145e..daec4161 100644 --- a/service/bfg/bfg.go +++ b/service/bfg/bfg.go @@ -62,40 +62,6 @@ func init() { loggo.ConfigureLoggers(logLevel) } -// InternalError is an error type to differentiates between caller and callee -// errors. An internal error is used whne something internal to the application -// fails. -type InternalError struct { - internal *protocol.Error - actual error -} - -// Err return the protocol.Error that can be sent over the wire. -func (ie InternalError) Err() *protocol.Error { - return ie.internal -} - -// String return the actual underlying error. -func (ie InternalError) String() string { - i := ie.internal - return fmt.Sprintf("%v [%v:%v]", ie.actual.Error(), i.Trace, i.Timestamp) -} - -// Error satifies the error interface. -func (ie InternalError) Error() string { - if ie.internal == nil { - return "internal error" - } - return ie.internal.String() -} - -func NewInternalErrorf(msg string, args ...interface{}) *InternalError { - return &InternalError{ - internal: protocol.Errorf("internal error"), - actual: fmt.Errorf(msg, args...), - } -} - func NewDefaultConfig() *Config { return &Config{ EXBTCAddress: "localhost:18001", @@ -146,6 +112,11 @@ type Server struct { cfg *Config + // requests + requestLimit int // Request limiter queue depth + requestLimiter chan bool // Maximum in progress websocket commands + requestTimeout time.Duration + btcHeight uint64 hemiHeight uint32 @@ -185,12 +156,17 @@ func NewServer(cfg *Config) (*Server, error) { if cfg == nil { cfg = NewDefaultConfig() } + defaultRequestTimeout := 9 * time.Second // XXX + requestLimit := 1000 // XXX s := &Server{ - cfg: cfg, - popTXFinality: make(map[uint64][]*popTX), - btcHeight: cfg.BTCStartHeight, - server: http.NewServeMux(), - publicServer: http.NewServeMux(), + cfg: cfg, + requestLimiter: make(chan bool, requestLimit), + requestLimit: requestLimit, + requestTimeout: defaultRequestTimeout, + popTXFinality: make(map[uint64][]*popTX), + btcHeight: cfg.BTCStartHeight, + server: http.NewServeMux(), + publicServer: http.NewServeMux(), cmdsProcessed: prometheus.NewCounter(prometheus.CounterOpts{ Subsystem: promSubsystem, Name: "rpc_calls_total", @@ -198,6 +174,9 @@ func NewServer(cfg *Config) (*Server, error) { }), sessions: make(map[string]*bfgWs), } + for i := 0; i < requestLimit; i++ { + s.requestLimiter <- true + } var err error s.btcClient, err = electrumx.NewClient(cfg.EXBTCAddress) @@ -210,88 +189,103 @@ func NewServer(cfg *Config) (*Server, error) { return s, nil } -func (s *Server) writeResponse(ctx context.Context, conn protocol.APIConn, response any, id string) error { - if err := bfgapi.Write(ctx, conn, id, response); err != nil { - log.Errorf("error occurred writing bfgapi: %s", err) - return err +// handleRequest is called as a go routine to handle a long lived command. +func (s *Server) handleRequest(parrentCtx context.Context, bws *bfgWs, wsid string, requestType string, handler func(ctx context.Context) (any, error)) { + log.Tracef("handleRequest: %v", bws.addr) + defer log.Tracef("handleRequest exit: %v", bws.addr) + + ctx, cancel := context.WithTimeout(parrentCtx, s.requestTimeout) + defer cancel() + + select { + case <-s.requestLimiter: + default: + log.Infof("Request limiter hit %v: %v", bws.addr, requestType) + <-s.requestLimiter } + defer func() { s.requestLimiter <- true }() - return nil -} + log.Tracef("Handling request %v: %v", bws.addr, requestType) -func (s *Server) handleBitcoinBalance(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { - log.Tracef("handleBitcoinBalance") - defer log.Tracef("handleBitcoinBalance exit") - // Increade command count - defer s.cmdsProcessed.Inc() + response, err := handler(ctx) + if err != nil { + log.Errorf("Failed to handle %v request %v: %v", + requestType, bws.addr, err) + } + if response == nil { + return + } - br, ok := payload.(*bfgapi.BitcoinBalanceRequest) - if !ok { - return nil, fmt.Errorf("not BitcoinBalanceRequest: %T", br) + log.Debugf("Responding to %v request with %v", requestType, spew.Sdump(response)) + + if err := bfgapi.Write(ctx, bws.conn, wsid, response); err != nil { + log.Errorf("Failed to handle %v request: protocol write failed: %v", + requestType, err) } +} - bResp := &bfgapi.BitcoinBalanceResponse{} +func (s *Server) handleBitcoinBalance(ctx context.Context, bbr *bfgapi.BitcoinBalanceRequest) (any, error) { + log.Tracef("handleBitcoinBalance") + defer log.Tracef("handleBitcoinBalance exit") - balance, err := s.btcClient.Balance(ctx, br.ScriptHash) + balance, err := s.btcClient.Balance(ctx, bbr.ScriptHash) if err != nil { - ie := NewInternalErrorf("error getting bitcoin balance: %s", err) - log.Errorf(ie.actual.Error()) - bResp.Error = ie.internal - return bResp, nil + e := protocol.NewInternalErrorf("bitcoin balance: %w", err) + return &bfgapi.BitcoinBalanceResponse{ + Error: e.ProtocolError(), + }, e } - bResp.Confirmed = balance.Confirmed - bResp.Unconfirmed = balance.Unconfirmed - return bResp, nil + return &bfgapi.BitcoinBalanceResponse{ + Confirmed: balance.Confirmed, + Unconfirmed: balance.Unconfirmed, + }, nil } -func (s *Server) handleBitcoinBroadcast(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { +func (s *Server) handleBitcoinBroadcast(ctx context.Context, bbr *bfgapi.BitcoinBroadcastRequest) (any, error) { log.Tracef("handleBitcoinBroadcast") defer log.Tracef("handleBitcoinBroadcast exit") - // Increade command count - defer s.cmdsProcessed.Inc() - - bbr, ok := payload.(*bfgapi.BitcoinBroadcastRequest) - if !ok { - return nil, fmt.Errorf("not a BitcoinBroadcastRequest: %T", bbr) - } - - bbResp := &bfgapi.BitcoinBroadcastResponse{} rr := bytes.NewReader(bbr.Transaction) mb := wire.MsgTx{} if err := mb.Deserialize(rr); err != nil { - bbResp.Error = protocol.Errorf("failed to deserialized tx: %s", err) - return bbResp, nil + return &bfgapi.BitcoinBroadcastResponse{Error: protocol.RequestErrorf( + "failed to deserialize tx: %s", err, + )}, nil } - var tl2 *pop.TransactionL2 - var err error + var ( + tl2 *pop.TransactionL2 + err error + ) for _, v := range mb.TxOut { tl2, err = pop.ParseTransactionL2FromOpReturn(v.PkScript) - if err != nil { - log.Errorf(err.Error()) // handle real error below + if err == nil { + break // Found the pop transaction. } } + if tl2 == nil { - bbResp.Error = protocol.Errorf("could not find l2 keystone abbrev in btc tx") - return bbResp, nil + return &bfgapi.BitcoinBroadcastResponse{ + Error: protocol.RequestErrorf("could not find l2 keystone abbrev in btc tx"), + }, nil } publicKeyUncompressed, err := pop.ParsePublicKeyFromSignatureScript(mb.TxIn[0].SignatureScript) if err != nil { - bbResp.Error = protocol.Errorf("could not parse signature script: %s", err) - return bbResp, nil + return &bfgapi.BitcoinBroadcastResponse{ + Error: protocol.RequestErrorf("could not parse signature script: %v", err), + }, nil } - txHash, err := s.btcClient.Broadcast(context.TODO(), bbr.Transaction) + txHash, err := s.btcClient.Broadcast(ctx, bbr.Transaction) if err != nil { - ie := NewInternalErrorf("error broadcasting to bitcoin: %s", err) - log.Errorf(ie.actual.Error()) - bbResp.Error = ie.internal - return bbResp, nil + // This may not alwyas be an internal error. + e := protocol.NewInternalErrorf("broadcast tx: %w", err) + return &bfgapi.BitcoinBroadcastResponse{ + Error: e.ProtocolError(), + }, e } - bbResp.TXID = txHash if err := s.db.PopBasisInsertPopMFields(ctx, &bfgd.PopBasis{ BtcTxId: txHash, @@ -300,72 +294,57 @@ func (s *Server) handleBitcoinBroadcast(ctx context.Context, bws *bfgWs, payload L2KeystoneAbrevHash: tl2.L2Keystone.Hash(), }); err != nil { if errors.Is(err, database.ErrDuplicate) { - bbResp.Error = protocol.Errorf("pop_basis already exists") - return bbResp, nil + return &bfgapi.BitcoinBroadcastResponse{ + Error: protocol.RequestErrorf("pop basis already exists"), + }, nil } if errors.Is(err, database.ErrValidation) { - log.Errorf("invalid pop basis: %s", err) - bbResp.Error = protocol.Errorf("invalid pop_basis") - return bbResp, nil + e := protocol.NewInternalErrorf("invalid pop basis: %w", err) + return &bfgapi.BitcoinBroadcastResponse{ + Error: e.ProtocolError(), + }, e } - ie := NewInternalErrorf("error inserting pop basis: %s", err) - bbResp.Error = ie.internal - log.Errorf(ie.actual.Error()) - return bbResp, nil + e := protocol.NewInternalErrorf("insert pop basis: %w", err) + return &bfgapi.BitcoinBroadcastResponse{ + Error: e.ProtocolError(), + }, e } - return bbResp, nil + return &bfgapi.BitcoinBroadcastResponse{TXID: txHash}, nil } -func (s *Server) handleBitcoinInfo(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { +func (s *Server) handleBitcoinInfo(ctx context.Context, bir *bfgapi.BitcoinInfoRequest) (any, error) { log.Tracef("handleBitcoinInfo") defer log.Tracef("handleBitcoinInfo exit") - // Increade command count - defer s.cmdsProcessed.Inc() - - _, ok := payload.(*bfgapi.BitcoinInfoRequest) - if !ok { - return nil, fmt.Errorf("not a BitcoinInfoRequest %T", payload) - } - - biResp := &bfgapi.BitcoinInfoResponse{} height, err := s.btcClient.Height(ctx) if err != nil { - ie := NewInternalErrorf("error getting bitcoin height: %s", err) - log.Errorf(ie.actual.Error()) - biResp.Error = ie.internal - return biResp, nil + e := protocol.NewInternalErrorf("bitcoin height: %w", err) + return &bfgapi.BitcoinInfoResponse{ + Error: e.ProtocolError(), + }, e } - biResp.Height = height - return biResp, nil + return &bfgapi.BitcoinInfoResponse{ + Height: height, + }, nil } -func (s *Server) handleBitcoinUTXOs(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { +func (s *Server) handleBitcoinUTXOs(ctx context.Context, bur *bfgapi.BitcoinUTXOsRequest) (any, error) { log.Tracef("handleBitcoinUTXOs") defer log.Tracef("handleBitcoinUTXOs exit") - // Increade command count - defer s.cmdsProcessed.Inc() - - bur, ok := payload.(*bfgapi.BitcoinUTXOsRequest) - if !ok { - err := fmt.Errorf("not a BitcoinUTXOsRequest %T", payload) - log.Errorf(err.Error()) - return nil, err - } - - buResp := &bfgapi.BitcoinUTXOsResponse{} - utxos, err := s.btcClient.UTXOs(context.TODO(), bur.ScriptHash) + utxos, err := s.btcClient.UTXOs(ctx, bur.ScriptHash) if err != nil { - ie := NewInternalErrorf("error getting bitcoin utxos: %s", err) - log.Errorf(ie.actual.Error()) - buResp.Error = ie.internal - return buResp, nil + e := protocol.NewInternalErrorf("bitcoin utxos: %w", err) + return &bfgapi.BitcoinUTXOsResponse{ + Error: e.ProtocolError(), + }, e + } + buResp := bfgapi.BitcoinUTXOsResponse{} for _, utxo := range utxos { buResp.UTXOs = append(buResp.UTXOs, &bfgapi.BitcoinUTXO{ Hash: utxo.Hash, @@ -377,47 +356,42 @@ func (s *Server) handleBitcoinUTXOs(ctx context.Context, bws *bfgWs, payload any return buResp, nil } -func (s *Server) handleAccessPublicKeyCreateRequest(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { +func (s *Server) handleAccessPublicKeyCreateRequest(ctx context.Context, acpkc *bfgapi.AccessPublicKeyCreateRequest) (any, error) { log.Tracef("handleAccessPublicKeyCreateRequest") defer log.Tracef("handleAccessPublicKeyCreateRequest exit") - accessPublicKeyCreateRequest, ok := payload.(*bfgapi.AccessPublicKeyCreateRequest) - if !ok { - err := fmt.Errorf("incorrect type: %T", payload) - return nil, err - } - - response := &bfgapi.AccessPublicKeyCreateResponse{} - - publicKey, err := hex.DecodeString(accessPublicKeyCreateRequest.PublicKey) + publicKey, err := hex.DecodeString(acpkc.PublicKey) if err != nil { - response.Error = protocol.Errorf(err.Error()) - return response, nil + return &bfgapi.AccessPublicKeyCreateResponse{ + Error: protocol.RequestErrorf("public key decode: %v", err), + }, nil } if err := s.db.AccessPublicKeyInsert(ctx, &bfgd.AccessPublicKey{ PublicKey: publicKey, }); err != nil { if errors.Is(err, database.ErrDuplicate) { - response.Error = protocol.Errorf("public key already exists") - return response, nil + return &bfgapi.AccessPublicKeyCreateResponse{ + Error: protocol.RequestErrorf("public key already exists"), + }, nil } if errors.Is(err, database.ErrValidation) { - response.Error = protocol.Errorf("invalid access public key") - return response, nil + return &bfgapi.AccessPublicKeyCreateResponse{ + Error: protocol.RequestErrorf("invalid access public key"), + }, nil } - ie := NewInternalErrorf("error inserting access public key: %s", err) - response.Error = ie.internal - log.Errorf(ie.actual.Error()) - return response, nil + e := protocol.NewInternalErrorf("insert public key: %w", err) + return &bfgapi.AccessPublicKeyCreateResponse{ + Error: protocol.RequestErrorf("invalid access public key"), + }, e } - return response, nil + return &bfgapi.AccessPublicKeyCreateResponse{}, nil } -func (s *Server) handleAccessPublicKeyDelete(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { +func (s *Server) handleAccessPublicKeyDelete(ctx context.Context, payload any) (any, error) { log.Tracef("handleAccessPublicKeyDelete") defer log.Tracef("handleAccessPublicKeyDelete exit") @@ -426,32 +400,34 @@ func (s *Server) handleAccessPublicKeyDelete(ctx context.Context, bws *bfgWs, pa return nil, fmt.Errorf("incorrect type %T", payload) } - response := &bfgapi.AccessPublicKeyDeleteResponse{} - b, err := hex.DecodeString(accessPublicKeyDeleteRequest.PublicKey) if err != nil { - response.Error = protocol.Errorf(err.Error()) - return response, nil + return &bfgapi.AccessPublicKeyDeleteResponse{ + Error: protocol.RequestErrorf("public key decode: %v", err), + }, nil } if err := s.db.AccessPublicKeyDelete(ctx, &bfgd.AccessPublicKey{ PublicKey: b, }); err != nil { if errors.Is(err, database.ErrNotFound) { - response.Error = protocol.Errorf("public key not found") - return response, nil + // XXX not sure I like giving this information away. + return &bfgapi.AccessPublicKeyDeleteResponse{ + Error: protocol.RequestErrorf("public key not found"), + }, nil } - ie := NewInternalErrorf("error deleting access public key: %s", err) - response.Error = ie.internal - log.Errorf(ie.actual.Error()) - return response, nil + e := protocol.NewInternalErrorf("error deleting access public key: %w", + err) + return &bfgapi.AccessPublicKeyDeleteResponse{ + Error: e.ProtocolError(), + }, e } - return response, nil + return &bfgapi.AccessPublicKeyDeleteResponse{}, nil } func (s *Server) processBitcoinBlock(ctx context.Context, height uint64) error { - log.Infof("Processing Bitcoin block at height %d...", height) + log.Tracef("Processing Bitcoin block at height %d...", height) rbh, err := s.btcClient.RawBlockHeader(ctx, height) if err != nil { @@ -662,9 +638,6 @@ func (s *Server) handleWebsocketPrivateRead(ctx context.Context, bws *bfgWs) { log.Tracef("handleWebsocketPrivateRead: %v", bws.addr) defer log.Tracef("handleWebsocketPrivateRead exit: %v", bws.addr) - // Command completed - defer s.cmdsProcessed.Inc() - for { cmd, id, payload, err := bfgapi.Read(ctx, bws.conn) if err != nil { @@ -682,41 +655,63 @@ func (s *Server) handleWebsocketPrivateRead(ctx context.Context, bws *bfgWs) { log.Tracef("handleWebsocketRead read %v: %v %v %v", bws.addr, cmd, id, spew.Sdump(payload)) - var response any - switch cmd { case bfgapi.CmdPingRequest: - response, err = s.handlePing(ctx, bws, payload, id) + err = s.handlePingRequest(ctx, bws, payload, id) case bfgapi.CmdPopTxForL2BlockRequest: - response, err = s.handlePopTxForL2Block(ctx, bws, payload, id) + handler := func(c context.Context) (any, error) { + msg := payload.(*bfgapi.PopTxsForL2BlockRequest) + return s.handlePopTxsForL2Block(c, msg) + } + + go s.handleRequest(ctx, bws, id, "handle pop for l2 block request", handler) case bfgapi.CmdNewL2KeystonesRequest: - response, err = s.handleNewL2Keystones(ctx, bws, payload, id) + handler := func(c context.Context) (any, error) { + msg := payload.(*bfgapi.NewL2KeystonesRequest) + return s.handleNewL2Keystones(c, msg) + } + + go s.handleRequest(ctx, bws, id, "handle new l2 keystones request", handler) case bfgapi.CmdBTCFinalityByRecentKeystonesRequest: - response, err = s.handleBtcFinalityByRecentKeystonesRequest(ctx, bws, payload, id) + handler := func(c context.Context) (any, error) { + msg := payload.(*bfgapi.BTCFinalityByRecentKeystonesRequest) + return s.handleBtcFinalityByRecentKeystonesRequest(c, msg) + } + + go s.handleRequest(ctx, bws, id, "handle finality recent keystones request", handler) case bfgapi.CmdBTCFinalityByKeystonesRequest: - response, err = s.handleBtcFinalityByKeystonesRequest(ctx, bws, payload, id) + handler := func(c context.Context) (any, error) { + msg := payload.(*bfgapi.BTCFinalityByKeystonesRequest) + return s.handleBtcFinalityByKeystonesRequest(c, msg) + } + + go s.handleRequest(ctx, bws, id, "handle finality keystones request", handler) case bfgapi.CmdAccessPublicKeyCreateRequest: - response, err = s.handleAccessPublicKeyCreateRequest(ctx, bws, payload, id) + handler := func(c context.Context) (any, error) { + msg := payload.(*bfgapi.AccessPublicKeyCreateRequest) + return s.handleAccessPublicKeyCreateRequest(c, msg) + } + + go s.handleRequest(ctx, bws, id, "handle access key create request", handler) case bfgapi.CmdAccessPublicKeyDeleteRequest: - response, err = s.handleAccessPublicKeyDelete(ctx, bws, payload, id) + handler := func(c context.Context) (any, error) { + msg := payload.(*bfgapi.AccessPublicKeyDeleteRequest) + return s.handleAccessPublicKeyDelete(c, msg) + } + + go s.handleRequest(ctx, bws, id, "handle access key delete request", handler) default: err = fmt.Errorf("unknown command") } - // if there was an error, close the websocket, only do this if we - // can't continue + // If set, it is a terminal error. if err != nil { - log.Errorf("handleWebsocketPrivateRead error %v %v: %v", bws.addr, cmd, err) - bws.conn.CloseStatus(websocket.StatusProtocolError, - err.Error()) + log.Errorf("handleWebsocketRead %v %v %v: %v", + bws.addr, cmd, id, err) return - } else { - if err := s.writeResponse(ctx, bws.conn, response, id); err != nil { - bws.conn.CloseStatus(websocket.StatusProtocolError, err.Error()) - return - } } + s.cmdsProcessed.Inc() } } @@ -726,9 +721,6 @@ func (s *Server) handleWebsocketPublicRead(ctx context.Context, bws *bfgWs) { log.Tracef("handleWebsocketPublicRead: %v", bws.addr) defer log.Tracef("handleWebsocketPublicRead exit: %v", bws.addr) - // Command completed - defer s.cmdsProcessed.Inc() - for { cmd, id, payload, err := bfgapi.Read(ctx, bws.conn) if err != nil { @@ -742,37 +734,58 @@ func (s *Server) handleWebsocketPublicRead(ctx context.Context, bws *bfgWs) { return } - var response any - switch cmd { case bfgapi.CmdPingRequest: - response, err = s.handlePing(ctx, bws, payload, id) + // quick call + err = s.handlePingRequest(ctx, bws, payload, id) case bfgapi.CmdL2KeystonesRequest: - response, err = s.handleL2KeystonesRequest(ctx, bws, payload, id) + handler := func(c context.Context) (any, error) { + msg := payload.(*bfgapi.L2KeystonesRequest) + return s.handleL2KeystonesRequest(c, msg) + } + + go s.handleRequest(ctx, bws, id, "handle l2 keystones request", handler) case bfgapi.CmdBitcoinBalanceRequest: - response, err = s.handleBitcoinBalance(ctx, bws, payload, id) + handler := func(c context.Context) (any, error) { + msg := payload.(*bfgapi.BitcoinBalanceRequest) + return s.handleBitcoinBalance(c, msg) + } + + go s.handleRequest(ctx, bws, id, "handle bitcoin balance request", handler) case bfgapi.CmdBitcoinBroadcastRequest: - response, err = s.handleBitcoinBroadcast(ctx, bws, payload, id) + handler := func(c context.Context) (any, error) { + msg := payload.(*bfgapi.BitcoinBroadcastRequest) + return s.handleBitcoinBroadcast(c, msg) + } + + go s.handleRequest(ctx, bws, id, "handle bitcoin broadcast request", handler) case bfgapi.CmdBitcoinInfoRequest: - response, err = s.handleBitcoinInfo(ctx, bws, payload, id) + handler := func(c context.Context) (any, error) { + msg := payload.(*bfgapi.BitcoinInfoRequest) + return s.handleBitcoinInfo(c, msg) + } + + go s.handleRequest(ctx, bws, id, "handle bitcoin broadcast request", handler) case bfgapi.CmdBitcoinUTXOsRequest: - response, err = s.handleBitcoinUTXOs(ctx, bws, payload, id) + + handler := func(c context.Context) (any, error) { + msg := payload.(*bfgapi.BitcoinUTXOsRequest) + return s.handleBitcoinUTXOs(c, msg) + } + + go s.handleRequest(ctx, bws, id, "handle bitcoin utxos request", handler) default: err = fmt.Errorf("unknown command") } + // If set, it is a terminal error. if err != nil { - log.Errorf("handleWebsocketPublicRead %v %v: %v", bws.addr, cmd, err) - bws.conn.CloseStatus(websocket.StatusProtocolError, - err.Error()) + log.Errorf("handleWebsocketRead %v %v %v: %v", + bws.addr, cmd, id, err) return - } else { - if err := s.writeResponse(ctx, bws.conn, response, id); err != nil { - bws.conn.CloseStatus(websocket.StatusProtocolError, err.Error()) - return - } } + s.cmdsProcessed.Inc() } } @@ -800,21 +813,19 @@ func (s *Server) newSession(bws *bfgWs) (string, error) { } } -func (s *Server) killSession(id string, why websocket.StatusCode) { +func (s *Server) deleteSession(id string) { + log.Tracef("deleteSession") + defer log.Tracef("deleteSession exit") + s.mtx.Lock() - bws, ok := s.sessions[id] + _, ok := s.sessions[id] if ok { delete(s.sessions, id) } s.mtx.Unlock() if !ok { - log.Errorf("killSession: id not found in sessions %s", id) - } else { - if err := bws.conn.CloseStatus(why, ""); err != nil { - // XXX this is too noisy. - log.Debugf("session close %v: %v", id, err) - } + log.Errorf("deleteSession: id not found in sessions %s", id) } } @@ -833,6 +844,7 @@ func (s *Server) handleWebsocketPrivate(w http.ResponseWriter, r *http.Request) r.RemoteAddr, err) return } + defer conn.Close(websocket.StatusProtocolError, "") bws := &bfgWs{ addr: r.RemoteAddr, @@ -850,7 +862,7 @@ func (s *Server) handleWebsocketPrivate(w http.ResponseWriter, r *http.Request) } defer func() { - s.killSession(bws.sessionId, websocket.StatusNormalClosure) + s.deleteSession(bws.sessionId) }() bws.wg.Add(1) @@ -941,7 +953,7 @@ func (s *Server) handleWebsocketPublic(w http.ResponseWriter, r *http.Request) { return } defer func() { - s.killSession(bws.sessionId, websocket.StatusNormalClosure) + s.deleteSession(bws.sessionId) }() // Always ping, required by protocol. @@ -963,47 +975,45 @@ func (s *Server) handleWebsocketPublic(w http.ResponseWriter, r *http.Request) { bws.sessionId, r.RemoteAddr, bws.publicKey) } -func (s *Server) handlePing(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { - log.Tracef("handlePing: %v", bws.addr) - defer log.Tracef("handlePing exit: %v", bws.addr) +func (s *Server) handlePingRequest(ctx context.Context, bws *bfgWs, payload any, id string) error { + log.Tracef("handlePingRequest: %v", bws.addr) + defer log.Tracef("handlePingRequest exit: %v", bws.addr) p, ok := payload.(*bfgapi.PingRequest) if !ok { - return nil, fmt.Errorf("handlePing invalid payload type: %T", payload) + return fmt.Errorf("handlePingRequest invalid payload type: %T", payload) } response := &bfgapi.PingResponse{ OriginTimestamp: p.Timestamp, Timestamp: time.Now().Unix(), } - return response, nil -} + log.Tracef("responding with %v", spew.Sdump(response)) -func (s *Server) handlePopTxForL2Block(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { - log.Tracef("handlePopTxForL2Block: %v", bws.addr) - defer log.Tracef("handlePopTxForL2Block exit: %v", bws.addr) - - p, ok := payload.(*bfgapi.PopTxsForL2BlockRequest) - if !ok { - return nil, fmt.Errorf("handlePopTxForL2Block invalid payload type: %T", - payload) + if err := bfgapi.Write(ctx, bws.conn, id, response); err != nil { + return fmt.Errorf("handlePingRequest write: %v %v", + bws.addr, err) } + return nil +} - response := bfgapi.PopTxsForL2BlockResponse{} +func (s *Server) handlePopTxsForL2Block(ctx context.Context, ptl2 *bfgapi.PopTxsForL2BlockRequest) (any, error) { + log.Tracef("handlePopTxsForL2Block") + defer log.Tracef("handlePopTxsForL2Block exit") - hash := hemi.HashSerializedL2KeystoneAbrev(p.L2Block) + hash := hemi.HashSerializedL2KeystoneAbrev(ptl2.L2Block) var h [32]byte copy(h[:], hash) popTxs, err := s.db.PopBasisByL2KeystoneAbrevHash(ctx, h, true) if err != nil { - ie := NewInternalErrorf("error getting pop basis: %s", err) - response.Error = ie.internal - log.Errorf(ie.actual.Error()) - return response, nil + e := protocol.NewInternalErrorf("error getting pop basis: %w", err) + return &bfgapi.PopTxsForL2BlockResponse{ + Error: e.ProtocolError(), + }, e } + response := &bfgapi.PopTxsForL2BlockResponse{} response.PopTxs = make([]bfgapi.PopTx, 0, len(popTxs)) - for k := range popTxs { response.PopTxs = append(response.PopTxs, bfgapi.PopTx{ BtcTxId: api.ByteSlice(popTxs[k].BtcTxId), @@ -1020,57 +1030,46 @@ func (s *Server) handlePopTxForL2Block(ctx context.Context, bws *bfgWs, payload return response, nil } -func (s *Server) handleBtcFinalityByRecentKeystonesRequest(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { - p, ok := payload.(*bfgapi.BTCFinalityByRecentKeystonesRequest) - if ok == false { - return nil, fmt.Errorf( - "handleBtcFinalityByRecentKeystonesRequest invalid payload type %T", - payload, - ) - } - - response := bfgapi.BTCFinalityByRecentKeystonesResponse{} +func (s *Server) handleBtcFinalityByRecentKeystonesRequest(ctx context.Context, bfrk *bfgapi.BTCFinalityByRecentKeystonesRequest) (any, error) { + log.Tracef("handleBtcFinalityByRecentKeystonesRequest") + defer log.Tracef("handleBtcFinalityByRecentKeystonesRequest exit") - finalities, err := s.db.L2BTCFinalityMostRecent(ctx, p.NumRecentKeystones) + finalities, err := s.db.L2BTCFinalityMostRecent(ctx, bfrk.NumRecentKeystones) if err != nil { - ie := NewInternalErrorf("error getting finality: %s", err) - response.Error = ie.internal - log.Errorf(ie.actual.Error()) - return response, nil + e := protocol.NewInternalErrorf("error getting finality: %w", err) + return &bfgapi.BTCFinalityByRecentKeystonesResponse{ + Error: e.ProtocolError(), + }, e } - apiFinalities := []hemi.L2BTCFinality{} - for _, finality := range finalities { + apiFinalities := make([]hemi.L2BTCFinality, 0, len(finalities)) + for k, finality := range finalities { apiFinality, err := hemi.L2BTCFinalityFromBfgd( &finality, finality.BTCTipHeight, finality.EffectiveHeight, ) if err != nil { - return nil, err + e := protocol.NewInternalErrorf("error getting finality (%v): %w", + k, err) + return &bfgapi.BTCFinalityByRecentKeystonesResponse{ + Error: e.ProtocolError(), + }, e } apiFinalities = append(apiFinalities, *apiFinality) } - response.L2BTCFinalities = apiFinalities - - return response, nil + return &bfgapi.BTCFinalityByRecentKeystonesResponse{ + L2BTCFinalities: apiFinalities, + }, nil } -func (s *Server) handleBtcFinalityByKeystonesRequest(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { - p, ok := payload.(*bfgapi.BTCFinalityByKeystonesRequest) - if ok == false { - return nil, fmt.Errorf( - "handleBtcFinalityByKeystonesRequest invalid payload type %T", - payload, - ) - } - - response := bfgapi.BTCFinalityByKeystonesResponse{} - - l2KeystoneAbrevHashes := []database.ByteArray{} +func (s *Server) handleBtcFinalityByKeystonesRequest(ctx context.Context, bfkr *bfgapi.BTCFinalityByKeystonesRequest) (any, error) { + log.Tracef("handleBtcFinalityByKeystonesRequest") + defer log.Tracef("handleBtcFinalityByKeystonesRequest exit") - for _, l := range p.L2Keystones { + l2KeystoneAbrevHashes := make([]database.ByteArray, 0, len(bfkr.L2Keystones)) + for _, l := range bfkr.L2Keystones { a := hemi.L2KeystoneAbbreviate(l) l2KeystoneAbrevHashes = append(l2KeystoneAbrevHashes, a.Hash()) } @@ -1080,13 +1079,13 @@ func (s *Server) handleBtcFinalityByKeystonesRequest(ctx context.Context, bws *b l2KeystoneAbrevHashes, ) if err != nil { - ie := NewInternalErrorf("error getting l2 keystones: %s", err) - response.Error = ie.internal - log.Errorf(ie.actual.Error()) - return response, nil + e := protocol.NewInternalErrorf("l2 keystones: %w", err) + return &bfgapi.BTCFinalityByKeystonesResponse{ + Error: e.ProtocolError(), + }, e } - apiFinalities := []hemi.L2BTCFinality{} + apiFinalities := make([]hemi.L2BTCFinality, 0, len(finalities)) for _, finality := range finalities { apiFinality, err := hemi.L2BTCFinalityFromBfgd( &finality, @@ -1094,38 +1093,34 @@ func (s *Server) handleBtcFinalityByKeystonesRequest(ctx context.Context, bws *b finality.EffectiveHeight, ) if err != nil { - return nil, err + e := protocol.NewInternalErrorf("l2 btc finality: %w", err) + return &bfgapi.BTCFinalityByKeystonesResponse{ + Error: e.ProtocolError(), + }, e } apiFinalities = append(apiFinalities, *apiFinality) } - response.L2BTCFinalities = apiFinalities - - return response, nil + return &bfgapi.BTCFinalityByKeystonesResponse{ + L2BTCFinalities: apiFinalities, + }, nil } -func (s *Server) handleL2KeystonesRequest(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { - p, ok := payload.(*bfgapi.L2KeystonesRequest) - if ok == false { - return nil, fmt.Errorf( - "handleL2KeystonesRequest invalid payload type %T", - payload, - ) - } +func (s *Server) handleL2KeystonesRequest(ctx context.Context, l2kr *bfgapi.L2KeystonesRequest) (any, error) { + log.Tracef("handleL2KeystonesRequest") + defer log.Tracef("handleL2KeystonesRequest exit") - gkhResp := &bfgapi.L2KeystonesResponse{} - - results, err := s.db.L2KeystonesMostRecentN(ctx, - uint32(p.NumL2Keystones)) + results, err := s.db.L2KeystonesMostRecentN(ctx, uint32(l2kr.NumL2Keystones)) if err != nil { - ie := NewInternalErrorf("error getting l2 keystones: %s", err) - gkhResp.Error = ie.internal - log.Errorf(ie.actual.Error()) - return gkhResp, nil + e := protocol.NewInternalErrorf("error getting l2 keystones: %w", err) + return &bfgapi.L2KeystonesResponse{ + Error: e.ProtocolError(), + }, e } + l2Keystones := make([]hemi.L2Keystone, 0, len(results)) for _, v := range results { - gkhResp.L2Keystones = append(gkhResp.L2Keystones, hemi.L2Keystone{ + l2Keystones = append(l2Keystones, hemi.L2Keystone{ Version: uint8(v.Version), L1BlockNumber: v.L1BlockNumber, L2BlockNumber: v.L2BlockNumber, @@ -1136,28 +1131,27 @@ func (s *Server) handleL2KeystonesRequest(ctx context.Context, bws *bfgWs, paylo }) } - return gkhResp, nil + return &bfgapi.L2KeystonesResponse{ + L2Keystones: l2Keystones, + }, nil } func writeNotificationResponse(bws *bfgWs, response any) { if err := bfgapi.Write(bws.requestContext, bws.conn, "", response); err != nil { - log.Errorf( - "handleBtcFinalityNotification write: %v %v", - bws.addr, - err, - ) + log.Errorf("handleBtcFinalityNotification write: %v %v", bws.addr, err) } } func (s *Server) handleBtcFinalityNotification() error { - response := bfgapi.BTCFinalityNotification{} + log.Tracef("handleBtcFinalityNotification") + defer log.Tracef("handleBtcFinalityNotification exit") s.mtx.Lock() for _, bws := range s.sessions { if _, ok := bws.notify[notifyBtcFinalities]; !ok { continue } - go writeNotificationResponse(bws, response) + go writeNotificationResponse(bws, &bfgapi.BTCFinalityNotification{}) } s.mtx.Unlock() @@ -1165,14 +1159,15 @@ func (s *Server) handleBtcFinalityNotification() error { } func (s *Server) handleBtcBlockNotification() error { - response := bfgapi.BTCNewBlockNotification{} + log.Tracef("handleBtcBlockNotification") + defer log.Tracef("handleBtcBlockNotification exit") s.mtx.Lock() for _, bws := range s.sessions { if _, ok := bws.notify[notifyBtcBlocks]; !ok { continue } - go writeNotificationResponse(bws, response) + go writeNotificationResponse(bws, &bfgapi.BTCNewBlockNotification{}) } s.mtx.Unlock() @@ -1180,14 +1175,15 @@ func (s *Server) handleBtcBlockNotification() error { } func (s *Server) handleL2KeystonesNotification() error { - response := bfgapi.L2KeystonesNotification{} + log.Tracef("handleL2KeystonesNotification") + defer log.Tracef("handleL2KeystonesNotification exit") s.mtx.Lock() for _, bws := range s.sessions { if _, ok := bws.notify[notifyL2Keystones]; !ok { continue } - go writeNotificationResponse(bws, response) + go writeNotificationResponse(bws, &bfgapi.L2KeystonesNotification{}) } s.mtx.Unlock() @@ -1215,8 +1211,11 @@ func hemiL2KeystonesToDb(l2ks []hemi.L2Keystone) []bfgd.L2Keystone { return dbks } -func (s *Server) handleNewL2Keystones(ctx context.Context, bws *bfgWs, payload any, id string) (any, error) { - ks := hemiL2KeystonesToDb(payload.(*bfgapi.NewL2KeystonesRequest).L2Keystones) +func (s *Server) handleNewL2Keystones(ctx context.Context, nlkr *bfgapi.NewL2KeystonesRequest) (any, error) { + log.Tracef("handleNewL2Keystones") + defer log.Tracef("handleNewL2Keystones exit") + + ks := hemiL2KeystonesToDb(nlkr.L2Keystones) err := s.db.L2KeystonesInsert(ctx, ks) response := bfgapi.NewL2KeystonesResponse{} if err != nil { @@ -1306,8 +1305,6 @@ func (s *Server) handleAccessPublicKeys(table string, action string, payload, pa return } - // XXX this is racing with killSession but protected. We should - // create a killSessions that takes an encoded PublicKey s.mtx.Lock() for _, v := range s.sessions { // if public key does not exist on session, it's not an authenticated @@ -1320,8 +1317,7 @@ func (s *Server) handleAccessPublicKeys(table string, action string, payload, pa // encoding, ensure that the session string does for an equal comparison sessionPublicKeyEncoded := fmt.Sprintf("\\x%s", hex.EncodeToString(v.publicKey)) if sessionPublicKeyEncoded == accessPublicKey.PublicKeyEncoded { - sessionId := v.sessionId - go s.killSession(sessionId, protocol.StatusHandshakeErr) + v.conn.CloseStatus(websocket.StatusProtocolError, "killed") } } s.mtx.Unlock() @@ -1455,7 +1451,7 @@ func (s *Server) Run(pctx context.Context) error { return fmt.Errorf("failed to create server: %w", err) } cs := []prometheus.Collector{ - s.cmdsProcessed, + s.cmdsProcessed, // XXX should we make two counters? priv/pub prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Subsystem: promSubsystem, Name: "running", diff --git a/service/bss/bss.go b/service/bss/bss.go index 002c2c2c..5e377732 100644 --- a/service/bss/bss.go +++ b/service/bss/bss.go @@ -43,40 +43,6 @@ func init() { loggo.ConfigureLoggers(logLevel) } -// InternalError is an error type to differentiates between caller and callee -// errors. An internal error is used whne something internal to the application -// fails. -type InternalError struct { - internal *protocol.Error - actual error -} - -// Err return the protocol.Error that can be sent over the wire. -func (ie InternalError) Err() *protocol.Error { - return ie.internal -} - -// String return the actual underlying error. -func (ie InternalError) String() string { - i := ie.internal - return fmt.Sprintf("%v [%v:%v]", ie.actual.Error(), i.Trace, i.Timestamp) -} - -// Error satifies the error interface. -func (ie InternalError) Error() string { - if ie.internal == nil { - return "internal error" - } - return ie.internal.String() -} - -func NewInternalErrorf(msg string, args ...interface{}) *InternalError { - return &InternalError{ - internal: protocol.Errorf("internal error"), - actual: fmt.Errorf(msg, args...), - } -} - // Wrap for calling bfg commands type bfgCmd struct { msg any @@ -213,18 +179,8 @@ func (s *Server) handleRequest(parrentCtx context.Context, bws *bssWs, wsid stri response, err := handler(ctx) if err != nil { - // XXX these errors print an invalid trace for some reason. It - // mostly works but have a look at it and compare with client - // output and fix. - var ie *InternalError - if errors.As(err, &ie) { - log.Errorf("[INTERNAL ERROR] Failed to handle %v request %v: %v", - requestType, bws.addr, ie.String()) - } else { - // This may be too loud and can be silenced once in production. - log.Errorf("Failed to handle %v request %v: %v", - requestType, bws.addr, err) - } + log.Errorf("Failed to handle %v request %v: %v", + requestType, bws.addr, err) } if response == nil { return @@ -233,7 +189,8 @@ func (s *Server) handleRequest(parrentCtx context.Context, bws *bssWs, wsid stri log.Debugf("Responding to %v request with %v", requestType, spew.Sdump(response)) if err := bssapi.Write(ctx, bws.conn, wsid, response); err != nil { - log.Errorf("Failed to handle %v request: protocol write failed: %v", requestType, err) + log.Errorf("Failed to handle %v request: protocol write failed: %v", + requestType, err) } } @@ -271,56 +228,51 @@ func (s *Server) handlePopPayoutsRequest(ctx context.Context, msg *bssapi.PopPay log.Tracef("handlePopPayoutsRequest") defer log.Tracef("handlePopPayoutsRequest exit") - popTxsForL2BlockRequest := bfgapi.PopTxsForL2BlockRequest{ + popTxsForL2BlockRes, err := s.callBFG(ctx, bfgapi.PopTxsForL2BlockRequest{ L2Block: msg.L2BlockForPayout, - } - - popTxsForL2BlockRes, err := s.callBFG(ctx, &popTxsForL2BlockRequest) + }) if err != nil { + e := protocol.NewInternalErrorf("pop tx for l2: block %w", err) return &bssapi.PopPayoutsResponse{ - Error: protocol.Errorf("%v", err), - }, err - } - - popPayouts := ConvertPopTxsToPopPayouts( - (popTxsForL2BlockRes.(*bfgapi.PopTxsForL2BlockResponse)).PopTxs, - ) - - popPayoutsResponse := bssapi.PopPayoutsResponse{ - PopPayouts: popPayouts, + Error: e.ProtocolError(), + }, e } - return &popPayoutsResponse, nil + return &bssapi.PopPayoutsResponse{ + PopPayouts: ConvertPopTxsToPopPayouts( + (popTxsForL2BlockRes.(*bfgapi.PopTxsForL2BlockResponse)).PopTxs, + ), + }, nil } func (s *Server) handleL2KeytoneRequest(ctx context.Context, msg *bssapi.L2KeystoneRequest) (*bssapi.L2KeystoneResponse, error) { log.Tracef("handleL2KeytoneRequest") defer log.Tracef("handleL2KeytoneRequest exit") - newKeystoneHeadersRequest := bfgapi.NewL2KeystonesRequest{ - L2Keystones: []hemi.L2Keystone{ - msg.L2Keystone, - }, - } - - resp := &bssapi.L2KeystoneResponse{} - _, err := s.callBFG(ctx, &newKeystoneHeadersRequest) + _, err := s.callBFG(ctx, &bfgapi.NewL2KeystonesRequest{ + L2Keystones: []hemi.L2Keystone{msg.L2Keystone}, + }) if err != nil { - resp.Error = protocol.Errorf("%v", err) + e := protocol.NewInternalErrorf("new l2 keytsones: %w", err) + return &bssapi.L2KeystoneResponse{ + Error: e.ProtocolError(), + }, e } - return resp, err + return &bssapi.L2KeystoneResponse{}, nil } func (s *Server) handleBtcFinalityByRecentKeystonesRequest(ctx context.Context, msg *bssapi.BTCFinalityByRecentKeystonesRequest) (*bssapi.BTCFinalityByRecentKeystonesResponse, error) { - request := bfgapi.BTCFinalityByRecentKeystonesRequest{ - NumRecentKeystones: msg.NumRecentKeystones, - } + log.Tracef("handleBtcFinalityByRecentKeystonesRequest") + defer log.Tracef("handleBtcFinalityByRecentKeystonesRequest exit") - response, err := s.callBFG(ctx, &request) + response, err := s.callBFG(ctx, &bfgapi.BTCFinalityByRecentKeystonesRequest{ + NumRecentKeystones: msg.NumRecentKeystones, + }) if err != nil { + e := protocol.NewInternalErrorf("btc finality recent: %w", err) return &bssapi.BTCFinalityByRecentKeystonesResponse{ - Error: protocol.Errorf("%v", err), + Error: e.ProtocolError(), }, err } @@ -330,14 +282,16 @@ func (s *Server) handleBtcFinalityByRecentKeystonesRequest(ctx context.Context, } func (s *Server) handleBtcFinalityByKeystonesRequest(ctx context.Context, msg *bssapi.BTCFinalityByKeystonesRequest) (*bssapi.BTCFinalityByKeystonesResponse, error) { - request := bfgapi.BTCFinalityByKeystonesRequest{ - L2Keystones: msg.L2Keystones, - } + log.Tracef("handleBtcFinalityByKeystonesRequest") + defer log.Tracef("handleBtcFinalityByKeystonesRequest exit") - response, err := s.callBFG(ctx, &request) + response, err := s.callBFG(ctx, &bfgapi.BTCFinalityByKeystonesRequest{ + L2Keystones: msg.L2Keystones, + }) if err != nil { + e := protocol.NewInternalErrorf("btc finality keystones: %w", err) return &bssapi.BTCFinalityByKeystonesResponse{ - Error: protocol.Errorf("%v", err), + Error: e.ProtocolError(), }, err } @@ -415,8 +369,6 @@ func (s *Server) handleWebsocketRead(ctx context.Context, bws *bssWs) { if err != nil { log.Errorf("handleWebsocketRead %v %v %v: %v", bws.addr, cmd, id, err) - bws.conn.CloseStatus(websocket.StatusProtocolError, - err.Error()) return } @@ -458,7 +410,6 @@ func (s *Server) handleWebsocket(w http.ResponseWriter, r *http.Request) { defer func() { s.deleteSession(bws.sessionId) - conn.Close(websocket.StatusNormalClosure, "") // Force shutdown connection }() bws.wg.Add(1) @@ -508,11 +459,13 @@ func (s *Server) newSession(bws *bssWs) (string, error) { func (s *Server) deleteSession(id string) { s.mtx.Lock() - if _, ok := s.sessions[id]; !ok { - log.Errorf("id not found in sessions %s", id) - } + _, ok := s.sessions[id] delete(s.sessions, id) s.mtx.Unlock() + + if !ok { + log.Errorf("id not found in sessions %s", id) + } } func writeNotificationResponse(bws *bssWs, response any) { @@ -526,11 +479,9 @@ func writeNotificationResponse(bws *bssWs, response any) { } func (s *Server) handleBtcFinalityNotification() error { - response := bssapi.BTCFinalityNotification{} - s.mtx.Lock() for _, bws := range s.sessions { - go writeNotificationResponse(bws, response) + go writeNotificationResponse(bws, &bssapi.BTCFinalityNotification{}) } s.mtx.Unlock() @@ -538,11 +489,9 @@ func (s *Server) handleBtcFinalityNotification() error { } func (s *Server) handleBtcBlockNotification() error { - response := bssapi.BTCNewBlockNotification{} - s.mtx.Lock() for _, bws := range s.sessions { - go writeNotificationResponse(bws, response) + go writeNotificationResponse(bws, &bssapi.BTCNewBlockNotification{}) } s.mtx.Unlock() @@ -593,7 +542,7 @@ func (s *Server) handleBFGWebsocketReadUnauth(ctx context.Context, conn *protoco go s.handleBtcBlockNotification() default: log.Errorf("unknown command: %v", cmd) - return // XXX exit for now to cause a ruckus in the logs + return } } } @@ -652,17 +601,17 @@ func (s *Server) callBFG(parrentCtx context.Context, msg any) (any, error) { // attempt to send select { case <-ctx.Done(): - return nil, NewInternalErrorf("callBFG send context error: %v", + return nil, protocol.NewInternalErrorf("callBFG send context error: %w", ctx.Err()) case s.bfgCmdCh <- bc: default: - return nil, NewInternalErrorf("bfg command queue full") + return nil, protocol.NewInternalErrorf("bfg command queue full") } // Wait for response select { case <-ctx.Done(): - return nil, NewInternalErrorf("callBFG received context error: %v", + return nil, protocol.NewInternalErrorf("callBFG received context error: %w", ctx.Err()) case payload := <-bc.ch: if err, ok := payload.(error); ok {