diff --git a/core/core.go b/core/core.go index ac7aa3e2c6..9926d6b000 100644 --- a/core/core.go +++ b/core/core.go @@ -541,6 +541,10 @@ func (c *Core) TxPool() *TxPool { return c.sl.txPool } +func (c *Core) GetTxsFromBroadcastSet(hash common.Hash) (types.Transactions, error) { + return c.sl.GetTxsFromBroadcastSet(hash) +} + func (c *Core) Stop() { // Delete the append queue c.appendQueue.Purge() @@ -1192,10 +1196,6 @@ func (c *Core) TxPoolPending(enforceTips bool) (map[common.AddressBytes]types.Tr return c.sl.txPool.TxPoolPending(enforceTips) } -func (c *Core) GetTxsFromBroadcastSet(hash common.Hash) (types.Transactions, error) { - return c.sl.txPool.GetTxsFromBroadcastSet(hash) -} - func (c *Core) Get(hash common.Hash) *types.Transaction { return c.sl.txPool.Get(hash) } diff --git a/core/slice.go b/core/slice.go index 32980895fb..dc7c2d5b39 100644 --- a/core/slice.go +++ b/core/slice.go @@ -1879,6 +1879,29 @@ func (sl *Slice) GetSlicesRunning() []common.Location { return sl.hc.SlicesRunning() } +func (sl *Slice) GetTxsFromBroadcastSet(hash common.Hash) (types.Transactions, error) { + // Every time we read the broadcast set, get the next set of transactions and + // update the phcache + // Get the latest transactions to be broadcasted from the pool + sl.txPool.broadcastSetMu.RLock() + if len(sl.txPool.broadcastSet) > 0 { + txs := sl.txPool.broadcastSet + hash := types.DeriveSha(txs, trie.NewStackTrie(nil)) + bestPh, exists := sl.readPhCache(sl.bestPhKey) + if exists { + bestPh.WorkObject().WorkObjectHeader().SetLocation(sl.NodeLocation()) + bestPh.WorkObject().WorkObjectHeader().SetTxHash(hash) + sl.writePhCache(sl.bestPhKey, bestPh) + sl.miner.worker.pendingHeaderFeed.Send(bestPh.WorkObject()) + } + sl.txPool.broadcastSetCache.Add(hash, txs) + sl.txPool.broadcastSet = types.Transactions{} + sl.logger.Error("Adding txs set into the broadcastSetCache", hash, "len txs ", len(txs)) + } + sl.txPool.broadcastSetMu.RUnlock() + return sl.txPool.GetTxsFromBroadcastSet(hash) +} + ////// Expansion related logic const (