Skip to content

Commit

Permalink
Working broadcast
Browse files Browse the repository at this point in the history
  • Loading branch information
marcopeereboom committed Oct 22, 2024
1 parent 27aaeb2 commit 06479d7
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 7 deletions.
18 changes: 17 additions & 1 deletion api/tbcapi/tbcapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ const (

CmdTxBroadcastRequest = "tbcapi-tx-broadcast-request"
CmdTxBroadcastResponse = "tbcapi-tx-broadcast-response"

CmdTxBroadcastRawRequest = "tbcapi-tx-broadcast-raw-request"
CmdTxBroadcastRawResponse = "tbcapi-tx-broadcast-raw-response"
)

var (
Expand Down Expand Up @@ -237,14 +240,25 @@ type TxByIdResponse struct {
}

type TxBroadcastRequest struct {
Tx *wire.MsgTx `json:"tx"`
Tx *wire.MsgTx `json:"tx"`
Force bool `json:"force"`
}

type TxBroadcastResponse struct {
TxID *chainhash.Hash `json:"tx_id"`
Error *protocol.Error `json:"error,omitempty"`
}

type TxBroadcastRawRequest struct {
Tx api.ByteSlice `json:"tx"`
Force bool `json:"force"`
}

type TxBroadcastRawResponse struct {
TxID *chainhash.Hash `json:"tx_id"`
Error *protocol.Error `json:"error,omitempty"`
}

var commands = map[protocol.Command]reflect.Type{
CmdPingRequest: reflect.TypeOf(PingRequest{}),
CmdPingResponse: reflect.TypeOf(PingResponse{}),
Expand Down Expand Up @@ -272,6 +286,8 @@ var commands = map[protocol.Command]reflect.Type{
CmdTxByIdResponse: reflect.TypeOf(TxByIdResponse{}),
CmdTxBroadcastRequest: reflect.TypeOf(TxBroadcastRequest{}),
CmdTxBroadcastResponse: reflect.TypeOf(TxBroadcastResponse{}),
CmdTxBroadcastRawRequest: reflect.TypeOf(TxBroadcastRawRequest{}),
CmdTxBroadcastRawResponse: reflect.TypeOf(TxBroadcastRawResponse{}),
}

type tbcAPI struct{}
Expand Down
23 changes: 23 additions & 0 deletions service/tbc/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,29 @@ func (pm *PeerManager) All(ctx context.Context, f func(ctx context.Context, p *p
}
}

func (pm *PeerManager) AllBlock(ctx context.Context, f func(ctx context.Context, p *peer)) {
log.Tracef("AllBlock")
defer log.Tracef("AllBlock")

var wgAll sync.WaitGroup

pm.mtx.RLock()
for _, p := range pm.peers {
if !p.isConnected() {
continue
}
wgAll.Add(1)
go func() {
defer wgAll.Done()
f(ctx, p)
}()
}
pm.mtx.RUnlock()

log.Infof("AllBlock waiting")
wgAll.Wait()
}

// RandomConnect blocks until there is a peer ready to use.
func (pm *PeerManager) RandomConnect(ctx context.Context) (*peer, error) {
log.Tracef("RandomConnect")
Expand Down
43 changes: 39 additions & 4 deletions service/tbc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,13 @@ func (s *Server) handleWebsocketRead(ctx context.Context, ws *tbcWs) {
return s.handleTxBroadcastRequest(ctx, req)
}

go s.handleRequest(ctx, ws, id, cmd, handler)
case tbcapi.CmdTxBroadcastRawRequest:
handler := func(ctx context.Context) (any, error) {
req := payload.(*tbcapi.TxBroadcastRawRequest)
return s.handleTxBroadcastRawRequest(ctx, req)
}

go s.handleRequest(ctx, ws, id, cmd, handler)
default:
err = fmt.Errorf("unknown command: %v", cmd)
Expand Down Expand Up @@ -506,12 +513,40 @@ func (s *Server) handleTxBroadcastRequest(ctx context.Context, req *tbcapi.TxBro
log.Tracef("handleTxBroadcastRequest")
defer log.Tracef("handleTxBroadcastRequest exit")

txid, err := s.TxBroadcast(ctx, req.Tx)
txid, err := s.TxBroadcast(ctx, req.Tx, req.Force)
if err != nil {
var responseErr *protocol.Error
if errors.Is(err, ErrTxAlreadyBroadcast) || errors.Is(err, ErrTxBroadcastNoPeers) {
responseErr = protocol.RequestError(err)
} else {
responseErr = protocol.NewInternalError(err).ProtocolError()
}
return &tbcapi.TxBroadcastResponse{Error: responseErr}, responseErr
}

return &tbcapi.TxBroadcastResponse{TxID: txid}, nil
}

func (s *Server) handleTxBroadcastRawRequest(ctx context.Context, req *tbcapi.TxBroadcastRawRequest) (any, error) {
log.Tracef("handleTxBroadcastRawRequest")
defer log.Tracef("handleTxBroadcastRawRequest exit")

tx := wire.NewMsgTx(0)
err := tx.Deserialize(bytes.NewBuffer(req.Tx))
if err != nil {
responseErr := protocol.NewInternalError(err)
return &tbcapi.TxBroadcastResponse{
Error: responseErr.ProtocolError(),
}, responseErr
Error: protocol.RequestError(err),
}, nil
}
txid, err := s.TxBroadcast(ctx, tx, req.Force)
if err != nil {
var responseErr *protocol.Error
if errors.Is(err, ErrTxAlreadyBroadcast) || errors.Is(err, ErrTxBroadcastNoPeers) {
responseErr = protocol.RequestError(err)
} else {
responseErr = protocol.NewInternalError(err).ProtocolError()
}
return &tbcapi.TxBroadcastResponse{Error: responseErr}, responseErr
}

return &tbcapi.TxBroadcastResponse{TxID: txid}, nil
Expand Down
90 changes: 88 additions & 2 deletions service/tbc/tbc.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"reflect"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/btcsuite/btcd/blockchain"
Expand Down Expand Up @@ -63,6 +64,9 @@ var (
localnetSeeds = []string{
"127.0.0.1:18444",
}

ErrTxAlreadyBroadcast = errors.New("tx already broadcast")
ErrTxBroadcastNoPeers = errors.New("can't broadcast tx, no peers")
)

var log = loggo.GetLogger("tbc")
Expand Down Expand Up @@ -114,6 +118,9 @@ type Server struct {
// mempool
mempool *mempool

// broadcast
broadcast map[chainhash.Hash]*wire.MsgTx

// bitcoin network
seeds []string // XXX remove
wireNet wire.BitcoinNet
Expand Down Expand Up @@ -173,6 +180,7 @@ func NewServer(cfg *Config) (*Server, error) {
}),
sessions: make(map[string]*tbcWs),
requestTimeout: defaultRequestTimeout,
broadcast: make(map[chainhash.Hash]*wire.MsgTx, 16),
}

log.Infof("MEMPOOL IS CURRENTLY BROKEN AND HAS BEEN DISABLED")
Expand Down Expand Up @@ -309,6 +317,14 @@ func (s *Server) handleGeneric(ctx context.Context, p *peer, msg wire.Message, r
return false, fmt.Errorf("handle generic not found: %w", err)
}

case *wire.MsgGetData:
if err := s.handleGetData(ctx, p, m, raw); err != nil {
return false, fmt.Errorf("handle generic get data: %w", err)
}

case *wire.MsgMemPool:
log.Infof("mempool: %v", spew.Sdump(m))

default:
return false, nil
}
Expand Down Expand Up @@ -1293,6 +1309,43 @@ func (s *Server) handleNotFound(ctx context.Context, p *peer, msg *wire.MsgNotFo
return nil
}

func (s *Server) handleGetData(ctx context.Context, p *peer, msg *wire.MsgGetData, raw []byte) error {
log.Infof("handleGetData %v", p)
defer log.Infof("handleGetData %v exit", p)

for _, v := range msg.InvList {
switch v.Type {
case wire.InvTypeError:
log.Errorf("get data error: %v", v.Hash)
case wire.InvTypeTx:
s.mtx.RLock()
if tx, ok := s.broadcast[v.Hash]; ok {
log.Infof("handleGetData %v", spew.Sdump(msg))
txc := tx.Copy()
err := p.write(defaultCmdTimeout, txc)
if err != nil {
log.Errorf("write tx: %v", err)
}
}
s.mtx.RUnlock()
case wire.InvTypeBlock:
log.Infof("get data block: %v", v.Hash)
case wire.InvTypeFilteredBlock:
log.Infof("get data filtered block: %v", v.Hash)
case wire.InvTypeWitnessBlock:
log.Infof("get data witness block: %v", v.Hash)
case wire.InvTypeWitnessTx:
log.Infof("get data witness tx: %v", v.Hash)
case wire.InvTypeFilteredWitnessBlock:
log.Infof("get data filtered witness block: %v", v.Hash)
default:
log.Errorf("get data unknown: %v", spew.Sdump(v.Hash))
}
}

return nil
}

func (s *Server) insertGenesis(ctx context.Context) error {
log.Tracef("insertGenesis")
defer log.Tracef("insertGenesis exit")
Expand Down Expand Up @@ -1530,11 +1583,44 @@ func (s *Server) TxById(ctx context.Context, txId *chainhash.Hash) (*wire.MsgTx,
return nil, database.ErrNotFound
}

func (s *Server) TxBroadcast(ctx context.Context, tx *wire.MsgTx) (*chainhash.Hash, error) {
func (s *Server) TxBroadcast(ctx context.Context, tx *wire.MsgTx, force bool) (*chainhash.Hash, error) {
log.Tracef("TxBroadcast")
defer log.Tracef("TxBroadcast exit")

return nil, fmt.Errorf("GFY")
s.mtx.Lock()
if _, ok := s.broadcast[tx.TxHash()]; ok && !force {
s.mtx.Unlock()
return nil, ErrTxAlreadyBroadcast
}
s.broadcast[tx.TxHash()] = tx
txb := tx.Copy()
s.mtx.Unlock()

txHash := txb.TxHash()
invTx := wire.NewMsgInv()
err := invTx.AddInvVect(wire.NewInvVect(wire.InvTypeTx, &txHash))
if err != nil {
return nil, fmt.Errorf("invalid vector: %w", err)
}
var success atomic.Uint64
inv := func(ctx context.Context, p *peer) {
log.Infof("inv %v", p)
defer log.Infof("inv %v exit", p)

err := p.write(defaultCmdTimeout, invTx)
if err != nil {
log.Debugf("inv %v: %v", p, err)
return
}
success.Add(1)
}
s.pm.AllBlock(ctx, inv)

if success.Load() == 0 {
return nil, ErrTxBroadcastNoPeers
}

return &txHash, nil
}

func feesFromTransactions(txs []*btcutil.Tx) error {
Expand Down

0 comments on commit 06479d7

Please sign in to comment.