diff --git a/core/core.go b/core/core.go index c56791c7e5..b4b5299d7c 100644 --- a/core/core.go +++ b/core/core.go @@ -1122,6 +1122,10 @@ func (c *Core) AddRemote(tx *types.Transaction) error { return c.sl.txPool.AddRemote(tx) } +func (c *Core) AddRemotes(txs types.Transactions) []error { + return c.sl.txPool.AddRemotes(txs) +} + func (c *Core) TxPoolPending(enforceTips bool) (map[common.AddressBytes]types.Transactions, error) { return c.sl.txPool.TxPoolPending(enforceTips) } diff --git a/core/tx_pool.go b/core/tx_pool.go index 0cbf801f0a..e675d57b15 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -1522,6 +1522,9 @@ func (pool *TxPool) runReorg(done chan struct{}, cancel chan struct{}, reset *tx // Notify subsystems for newly added transactions for _, tx := range promoted { + if !tx.IsLocal() { + continue + } addr, err := types.Sender(pool.signer, tx) if err != nil { continue diff --git a/internal/quaiapi/backend.go b/internal/quaiapi/backend.go index d5d6b9436c..3c9cc92967 100644 --- a/internal/quaiapi/backend.go +++ b/internal/quaiapi/backend.go @@ -103,6 +103,7 @@ type Backend interface { // Transaction pool API SendTx(ctx context.Context, signedTx *types.Transaction) error SendRemoteTx(tx *types.Transaction) error + SendRemoteTxs(txs types.Transactions) []error GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) GetPoolTransactions() (types.Transactions, error) GetPoolTransaction(txHash common.Hash) *types.Transaction diff --git a/p2p/node/api.go b/p2p/node/api.go index 6f47c0a981..58c16c6e08 100644 --- a/p2p/node/api.go +++ b/p2p/node/api.go @@ -313,6 +313,7 @@ func (p *P2PNode) handleBroadcast(sourcePeer peer.ID, data interface{}, nodeLoca p.cacheAdd(v.Hash(), &v, nodeLocation) // TODO: send it to consensus case types.Transaction: + case types.Transactions: default: log.Global.Debugf("received unsupported block broadcast") // TODO: ban the peer which sent it? diff --git a/p2p/node/pubsubManager/gossipsub.go b/p2p/node/pubsubManager/gossipsub.go index ab2e6d7c21..2ebfd5d8f8 100644 --- a/p2p/node/pubsubManager/gossipsub.go +++ b/p2p/node/pubsubManager/gossipsub.go @@ -17,6 +17,9 @@ import ( "github.com/dominant-strategies/go-quai/quai" ) +const numWorkers = 10 // Number of workers per stream +const msgChanSize = 500 // 500 requests per subscription + var ( ErrUnsupportedType = errors.New("data type not supported") ) @@ -118,29 +121,50 @@ func (g *PubsubManager) Subscribe(location common.Location, datatype interface{} }).Fatal("Go-Quai Panicked") } }() + // Create a channel for messages + msgChan := make(chan *pubsub.Message, msgChanSize) + full := 0 + // Start worker goroutines + for i := 0; i < numWorkers; i++ { + go func(location common.Location) { + for msg := range msgChan { // This should exit when msgChan is closed + var data interface{} + // unmarshal the received data depending on the topic's type + err = pb.UnmarshalAndConvert(msg.Data, location, &data, datatype) + if err != nil { + log.Global.Errorf("error unmarshalling data: %s", err) + continue + } + + // handle the received data + if g.onReceived != nil { + g.onReceived(msg.ReceivedFrom, data, location) + } + } + }(location) + } log.Global.Debugf("waiting for first message on subscription: %s", sub.Topic()) for { msg, err := sub.Next(g.ctx) if err != nil || msg == nil { // if context was cancelled, then we are shutting down if g.ctx.Err() != nil || msg == nil { + close(msgChan) return } log.Global.Errorf("error getting next message from subscription: %s", err) + continue } log.Global.Tracef("received message on topic: %s", topicName) - var data interface{} - // unmarshal the received data depending on the topic's type - err = pb.UnmarshalAndConvert(msg.Data, location, &data, datatype) - if err != nil { - log.Global.Errorf("error unmarshalling data: %s", err) - return - } - - // handle the received data - if g.onReceived != nil { - g.onReceived(msg.ReceivedFrom, data, location) + // Send to worker goroutines + select { + case msgChan <- msg: + default: + if full%1000 == 0 { + log.Global.WithField("topic", topicName).Warnf("message channel full. Lost messages: %d", full) + } + full++ } } }(location, subscription) diff --git a/quai/api_backend.go b/quai/api_backend.go index 8a3d795ee8..2f2f957e41 100644 --- a/quai/api_backend.go +++ b/quai/api_backend.go @@ -319,6 +319,14 @@ func (b *QuaiAPIBackend) SendRemoteTx(remoteTx *types.Transaction) error { return b.quai.Core().AddRemote(remoteTx) } +func (b *QuaiAPIBackend) SendRemoteTxs(remoteTxs types.Transactions) []error { + nodeCtx := b.quai.core.NodeCtx() + if nodeCtx != common.ZONE_CTX { + return []error{errors.New("SendRemoteTxs can only be called in zone chain")} + } + return b.quai.Core().AddRemotes(remoteTxs) +} + func (b *QuaiAPIBackend) GetPoolTransactions() (types.Transactions, error) { nodeCtx := b.quai.core.NodeCtx() if nodeCtx != common.ZONE_CTX { diff --git a/quai/handler.go b/quai/handler.go index bd5cff6b68..b493dbf6b0 100644 --- a/quai/handler.go +++ b/quai/handler.go @@ -149,6 +149,9 @@ func (h *handler) txBroadcastLoop() { case event := <-h.txsCh: transactions = append(transactions, event.Txs...) case <-broadcastTransactionsTicker.C: + if len(transactions) == 0 { + continue + } err := h.p2pBackend.Broadcast(h.nodeLocation, &transactions) if err != nil { h.logger.Error("Error broadcasting transactions", err) diff --git a/quai/p2p_backend.go b/quai/p2p_backend.go index bc04428283..c1603a5b4f 100644 --- a/quai/p2p_backend.go +++ b/quai/p2p_backend.go @@ -101,6 +101,13 @@ func (qbe *QuaiBackend) OnNewBroadcast(sourcePeer p2p.PeerID, data interface{}, backend.SendRemoteTx(&data) } // TODO: Handle the error here and mark the peers accordingly + case types.Transactions: + backend := *qbe.GetBackend(nodeLocation) + if backend == nil { + log.Global.Error("no backend found") + return false + } + backend.SendRemoteTxs(data.(types.Transactions)) } // If it was a good broadcast, mark the peer as lively