Skip to content

Commit

Permalink
Handle batched transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
Djadih authored and jdowning100 committed May 3, 2024
1 parent 2addf67 commit f327c7d
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 11 deletions.
4 changes: 4 additions & 0 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -1122,6 +1122,10 @@ func (c *Core) AddRemote(tx *types.Transaction) error {
return c.sl.txPool.AddRemote(tx)
}

func (c *Core) AddRemotes(txs types.Transactions) []error {
return c.sl.txPool.AddRemotes(txs)
}

func (c *Core) TxPoolPending(enforceTips bool) (map[common.AddressBytes]types.Transactions, error) {
return c.sl.txPool.TxPoolPending(enforceTips)
}
Expand Down
3 changes: 3 additions & 0 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -1522,6 +1522,9 @@ func (pool *TxPool) runReorg(done chan struct{}, cancel chan struct{}, reset *tx

// Notify subsystems for newly added transactions
for _, tx := range promoted {
if !tx.IsLocal() {
continue
}
addr, err := types.Sender(pool.signer, tx)
if err != nil {
continue
Expand Down
1 change: 1 addition & 0 deletions internal/quaiapi/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type Backend interface {
// Transaction pool API
SendTx(ctx context.Context, signedTx *types.Transaction) error
SendRemoteTx(tx *types.Transaction) error
SendRemoteTxs(txs types.Transactions) []error
GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error)
GetPoolTransactions() (types.Transactions, error)
GetPoolTransaction(txHash common.Hash) *types.Transaction
Expand Down
1 change: 1 addition & 0 deletions p2p/node/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ func (p *P2PNode) handleBroadcast(sourcePeer peer.ID, data interface{}, nodeLoca
p.cacheAdd(v.Hash(), &v, nodeLocation)
// TODO: send it to consensus
case types.Transaction:
case types.Transactions:
default:
log.Global.Debugf("received unsupported block broadcast")
// TODO: ban the peer which sent it?
Expand Down
46 changes: 35 additions & 11 deletions p2p/node/pubsubManager/gossipsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ import (
"github.com/dominant-strategies/go-quai/quai"
)

const numWorkers = 10 // Number of workers per stream
const msgChanSize = 500 // 500 requests per subscription

var (
ErrUnsupportedType = errors.New("data type not supported")
)
Expand Down Expand Up @@ -118,29 +121,50 @@ func (g *PubsubManager) Subscribe(location common.Location, datatype interface{}
}).Fatal("Go-Quai Panicked")
}
}()
// Create a channel for messages
msgChan := make(chan *pubsub.Message, msgChanSize)
full := 0
// Start worker goroutines
for i := 0; i < numWorkers; i++ {
go func(location common.Location) {
for msg := range msgChan { // This should exit when msgChan is closed
var data interface{}
// unmarshal the received data depending on the topic's type
err = pb.UnmarshalAndConvert(msg.Data, location, &data, datatype)
if err != nil {
log.Global.Errorf("error unmarshalling data: %s", err)
continue
}

// handle the received data
if g.onReceived != nil {
g.onReceived(msg.ReceivedFrom, data, location)
}
}
}(location)
}
log.Global.Debugf("waiting for first message on subscription: %s", sub.Topic())
for {
msg, err := sub.Next(g.ctx)
if err != nil || msg == nil {
// if context was cancelled, then we are shutting down
if g.ctx.Err() != nil || msg == nil {
close(msgChan)
return
}
log.Global.Errorf("error getting next message from subscription: %s", err)
continue
}
log.Global.Tracef("received message on topic: %s", topicName)

var data interface{}
// unmarshal the received data depending on the topic's type
err = pb.UnmarshalAndConvert(msg.Data, location, &data, datatype)
if err != nil {
log.Global.Errorf("error unmarshalling data: %s", err)
return
}

// handle the received data
if g.onReceived != nil {
g.onReceived(msg.ReceivedFrom, data, location)
// Send to worker goroutines
select {
case msgChan <- msg:
default:
if full%1000 == 0 {
log.Global.WithField("topic", topicName).Warnf("message channel full. Lost messages: %d", full)
}
full++
}
}
}(location, subscription)
Expand Down
8 changes: 8 additions & 0 deletions quai/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,14 @@ func (b *QuaiAPIBackend) SendRemoteTx(remoteTx *types.Transaction) error {
return b.quai.Core().AddRemote(remoteTx)
}

func (b *QuaiAPIBackend) SendRemoteTxs(remoteTxs types.Transactions) []error {
nodeCtx := b.quai.core.NodeCtx()
if nodeCtx != common.ZONE_CTX {
return []error{errors.New("SendRemoteTxs can only be called in zone chain")}
}
return b.quai.Core().AddRemotes(remoteTxs)
}

func (b *QuaiAPIBackend) GetPoolTransactions() (types.Transactions, error) {
nodeCtx := b.quai.core.NodeCtx()
if nodeCtx != common.ZONE_CTX {
Expand Down
3 changes: 3 additions & 0 deletions quai/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ func (h *handler) txBroadcastLoop() {
case event := <-h.txsCh:
transactions = append(transactions, event.Txs...)
case <-broadcastTransactionsTicker.C:
if len(transactions) == 0 {
continue
}
err := h.p2pBackend.Broadcast(h.nodeLocation, &transactions)
if err != nil {
h.logger.Error("Error broadcasting transactions", err)
Expand Down
7 changes: 7 additions & 0 deletions quai/p2p_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ func (qbe *QuaiBackend) OnNewBroadcast(sourcePeer p2p.PeerID, data interface{},
backend.SendRemoteTx(&data)
}
// TODO: Handle the error here and mark the peers accordingly
case types.Transactions:
backend := *qbe.GetBackend(nodeLocation)
if backend == nil {
log.Global.Error("no backend found")
return false
}
backend.SendRemoteTxs(data.(types.Transactions))

Check failure on line 110 in quai/p2p_backend.go

View workflow job for this annotation

GitHub Actions / call-common-workflow / buildDeployDevGo / build

invalid operation: data (variable of type "github.com/dominant-strategies/go-quai/core/types".Transactions) is not an interface
}

// If it was a good broadcast, mark the peer as lively
Expand Down

0 comments on commit f327c7d

Please sign in to comment.