From 06479d745d7c0853511437dc23fb5fc6fbb1530e Mon Sep 17 00:00:00 2001 From: Marco Peereboom Date: Tue, 22 Oct 2024 17:54:30 +0100 Subject: [PATCH] Working broadcast --- api/tbcapi/tbcapi.go | 18 +++++++- service/tbc/peer_manager.go | 23 ++++++++++ service/tbc/rpc.go | 43 ++++++++++++++++-- service/tbc/tbc.go | 90 ++++++++++++++++++++++++++++++++++++- 4 files changed, 167 insertions(+), 7 deletions(-) diff --git a/api/tbcapi/tbcapi.go b/api/tbcapi/tbcapi.go index 53fe01fe..ede64578 100644 --- a/api/tbcapi/tbcapi.go +++ b/api/tbcapi/tbcapi.go @@ -61,6 +61,9 @@ const ( CmdTxBroadcastRequest = "tbcapi-tx-broadcast-request" CmdTxBroadcastResponse = "tbcapi-tx-broadcast-response" + + CmdTxBroadcastRawRequest = "tbcapi-tx-broadcast-raw-request" + CmdTxBroadcastRawResponse = "tbcapi-tx-broadcast-raw-response" ) var ( @@ -237,7 +240,8 @@ type TxByIdResponse struct { } type TxBroadcastRequest struct { - Tx *wire.MsgTx `json:"tx"` + Tx *wire.MsgTx `json:"tx"` + Force bool `json:"force"` } type TxBroadcastResponse struct { @@ -245,6 +249,16 @@ type TxBroadcastResponse struct { Error *protocol.Error `json:"error,omitempty"` } +type TxBroadcastRawRequest struct { + Tx api.ByteSlice `json:"tx"` + Force bool `json:"force"` +} + +type TxBroadcastRawResponse struct { + TxID *chainhash.Hash `json:"tx_id"` + Error *protocol.Error `json:"error,omitempty"` +} + var commands = map[protocol.Command]reflect.Type{ CmdPingRequest: reflect.TypeOf(PingRequest{}), CmdPingResponse: reflect.TypeOf(PingResponse{}), @@ -272,6 +286,8 @@ var commands = map[protocol.Command]reflect.Type{ CmdTxByIdResponse: reflect.TypeOf(TxByIdResponse{}), CmdTxBroadcastRequest: reflect.TypeOf(TxBroadcastRequest{}), CmdTxBroadcastResponse: reflect.TypeOf(TxBroadcastResponse{}), + CmdTxBroadcastRawRequest: reflect.TypeOf(TxBroadcastRawRequest{}), + CmdTxBroadcastRawResponse: reflect.TypeOf(TxBroadcastRawResponse{}), } type tbcAPI struct{} diff --git a/service/tbc/peer_manager.go b/service/tbc/peer_manager.go index bd5072ae..b1b44813 100644 --- a/service/tbc/peer_manager.go +++ b/service/tbc/peer_manager.go @@ -281,6 +281,29 @@ func (pm *PeerManager) All(ctx context.Context, f func(ctx context.Context, p *p } } +func (pm *PeerManager) AllBlock(ctx context.Context, f func(ctx context.Context, p *peer)) { + log.Tracef("AllBlock") + defer log.Tracef("AllBlock") + + var wgAll sync.WaitGroup + + pm.mtx.RLock() + for _, p := range pm.peers { + if !p.isConnected() { + continue + } + wgAll.Add(1) + go func() { + defer wgAll.Done() + f(ctx, p) + }() + } + pm.mtx.RUnlock() + + log.Infof("AllBlock waiting") + wgAll.Wait() +} + // RandomConnect blocks until there is a peer ready to use. func (pm *PeerManager) RandomConnect(ctx context.Context) (*peer, error) { log.Tracef("RandomConnect") diff --git a/service/tbc/rpc.go b/service/tbc/rpc.go index e91dd386..ab07bfbc 100644 --- a/service/tbc/rpc.go +++ b/service/tbc/rpc.go @@ -154,6 +154,13 @@ func (s *Server) handleWebsocketRead(ctx context.Context, ws *tbcWs) { return s.handleTxBroadcastRequest(ctx, req) } + go s.handleRequest(ctx, ws, id, cmd, handler) + case tbcapi.CmdTxBroadcastRawRequest: + handler := func(ctx context.Context) (any, error) { + req := payload.(*tbcapi.TxBroadcastRawRequest) + return s.handleTxBroadcastRawRequest(ctx, req) + } + go s.handleRequest(ctx, ws, id, cmd, handler) default: err = fmt.Errorf("unknown command: %v", cmd) @@ -506,12 +513,40 @@ func (s *Server) handleTxBroadcastRequest(ctx context.Context, req *tbcapi.TxBro log.Tracef("handleTxBroadcastRequest") defer log.Tracef("handleTxBroadcastRequest exit") - txid, err := s.TxBroadcast(ctx, req.Tx) + txid, err := s.TxBroadcast(ctx, req.Tx, req.Force) + if err != nil { + var responseErr *protocol.Error + if errors.Is(err, ErrTxAlreadyBroadcast) || errors.Is(err, ErrTxBroadcastNoPeers) { + responseErr = protocol.RequestError(err) + } else { + responseErr = protocol.NewInternalError(err).ProtocolError() + } + return &tbcapi.TxBroadcastResponse{Error: responseErr}, responseErr + } + + return &tbcapi.TxBroadcastResponse{TxID: txid}, nil +} + +func (s *Server) handleTxBroadcastRawRequest(ctx context.Context, req *tbcapi.TxBroadcastRawRequest) (any, error) { + log.Tracef("handleTxBroadcastRawRequest") + defer log.Tracef("handleTxBroadcastRawRequest exit") + + tx := wire.NewMsgTx(0) + err := tx.Deserialize(bytes.NewBuffer(req.Tx)) if err != nil { - responseErr := protocol.NewInternalError(err) return &tbcapi.TxBroadcastResponse{ - Error: responseErr.ProtocolError(), - }, responseErr + Error: protocol.RequestError(err), + }, nil + } + txid, err := s.TxBroadcast(ctx, tx, req.Force) + if err != nil { + var responseErr *protocol.Error + if errors.Is(err, ErrTxAlreadyBroadcast) || errors.Is(err, ErrTxBroadcastNoPeers) { + responseErr = protocol.RequestError(err) + } else { + responseErr = protocol.NewInternalError(err).ProtocolError() + } + return &tbcapi.TxBroadcastResponse{Error: responseErr}, responseErr } return &tbcapi.TxBroadcastResponse{TxID: txid}, nil diff --git a/service/tbc/tbc.go b/service/tbc/tbc.go index 7603ae20..923d8f9c 100644 --- a/service/tbc/tbc.go +++ b/service/tbc/tbc.go @@ -16,6 +16,7 @@ import ( "reflect" "strconv" "sync" + "sync/atomic" "time" "github.com/btcsuite/btcd/blockchain" @@ -63,6 +64,9 @@ var ( localnetSeeds = []string{ "127.0.0.1:18444", } + + ErrTxAlreadyBroadcast = errors.New("tx already broadcast") + ErrTxBroadcastNoPeers = errors.New("can't broadcast tx, no peers") ) var log = loggo.GetLogger("tbc") @@ -114,6 +118,9 @@ type Server struct { // mempool mempool *mempool + // broadcast + broadcast map[chainhash.Hash]*wire.MsgTx + // bitcoin network seeds []string // XXX remove wireNet wire.BitcoinNet @@ -173,6 +180,7 @@ func NewServer(cfg *Config) (*Server, error) { }), sessions: make(map[string]*tbcWs), requestTimeout: defaultRequestTimeout, + broadcast: make(map[chainhash.Hash]*wire.MsgTx, 16), } log.Infof("MEMPOOL IS CURRENTLY BROKEN AND HAS BEEN DISABLED") @@ -309,6 +317,14 @@ func (s *Server) handleGeneric(ctx context.Context, p *peer, msg wire.Message, r return false, fmt.Errorf("handle generic not found: %w", err) } + case *wire.MsgGetData: + if err := s.handleGetData(ctx, p, m, raw); err != nil { + return false, fmt.Errorf("handle generic get data: %w", err) + } + + case *wire.MsgMemPool: + log.Infof("mempool: %v", spew.Sdump(m)) + default: return false, nil } @@ -1293,6 +1309,43 @@ func (s *Server) handleNotFound(ctx context.Context, p *peer, msg *wire.MsgNotFo return nil } +func (s *Server) handleGetData(ctx context.Context, p *peer, msg *wire.MsgGetData, raw []byte) error { + log.Infof("handleGetData %v", p) + defer log.Infof("handleGetData %v exit", p) + + for _, v := range msg.InvList { + switch v.Type { + case wire.InvTypeError: + log.Errorf("get data error: %v", v.Hash) + case wire.InvTypeTx: + s.mtx.RLock() + if tx, ok := s.broadcast[v.Hash]; ok { + log.Infof("handleGetData %v", spew.Sdump(msg)) + txc := tx.Copy() + err := p.write(defaultCmdTimeout, txc) + if err != nil { + log.Errorf("write tx: %v", err) + } + } + s.mtx.RUnlock() + case wire.InvTypeBlock: + log.Infof("get data block: %v", v.Hash) + case wire.InvTypeFilteredBlock: + log.Infof("get data filtered block: %v", v.Hash) + case wire.InvTypeWitnessBlock: + log.Infof("get data witness block: %v", v.Hash) + case wire.InvTypeWitnessTx: + log.Infof("get data witness tx: %v", v.Hash) + case wire.InvTypeFilteredWitnessBlock: + log.Infof("get data filtered witness block: %v", v.Hash) + default: + log.Errorf("get data unknown: %v", spew.Sdump(v.Hash)) + } + } + + return nil +} + func (s *Server) insertGenesis(ctx context.Context) error { log.Tracef("insertGenesis") defer log.Tracef("insertGenesis exit") @@ -1530,11 +1583,44 @@ func (s *Server) TxById(ctx context.Context, txId *chainhash.Hash) (*wire.MsgTx, return nil, database.ErrNotFound } -func (s *Server) TxBroadcast(ctx context.Context, tx *wire.MsgTx) (*chainhash.Hash, error) { +func (s *Server) TxBroadcast(ctx context.Context, tx *wire.MsgTx, force bool) (*chainhash.Hash, error) { log.Tracef("TxBroadcast") defer log.Tracef("TxBroadcast exit") - return nil, fmt.Errorf("GFY") + s.mtx.Lock() + if _, ok := s.broadcast[tx.TxHash()]; ok && !force { + s.mtx.Unlock() + return nil, ErrTxAlreadyBroadcast + } + s.broadcast[tx.TxHash()] = tx + txb := tx.Copy() + s.mtx.Unlock() + + txHash := txb.TxHash() + invTx := wire.NewMsgInv() + err := invTx.AddInvVect(wire.NewInvVect(wire.InvTypeTx, &txHash)) + if err != nil { + return nil, fmt.Errorf("invalid vector: %w", err) + } + var success atomic.Uint64 + inv := func(ctx context.Context, p *peer) { + log.Infof("inv %v", p) + defer log.Infof("inv %v exit", p) + + err := p.write(defaultCmdTimeout, invTx) + if err != nil { + log.Debugf("inv %v: %v", p, err) + return + } + success.Add(1) + } + s.pm.AllBlock(ctx, inv) + + if success.Load() == 0 { + return nil, ErrTxBroadcastNoPeers + } + + return &txHash, nil } func feesFromTransactions(txs []*btcutil.Tx) error {