Skip to content

Commit

Permalink
Transactions are broadcast using the workshares
Browse files Browse the repository at this point in the history
  • Loading branch information
gameofpointers committed Jun 13, 2024
1 parent 81a1d9e commit c1089b2
Show file tree
Hide file tree
Showing 29 changed files with 695 additions and 590 deletions.
3 changes: 1 addition & 2 deletions cmd/utils/hierarchical_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,7 @@ func (hc *HierarchicalCoordinator) startNode(logPath string, quaiBackend quai.Co

if quaiBackend.ProcessingState(location) && location.Context() == common.ZONE_CTX {
// Subscribe to the new topics after setting the api backend
hc.p2p.Subscribe(location, &types.WorkObjectHeader{})
hc.p2p.Subscribe(location, &types.Transactions{})
hc.p2p.Subscribe(location, &types.WorkObjectShareView{})
}

if location.Context() == common.PRIME_CTX || location.Context() == common.REGION_CTX || quaiBackend.ProcessingState(location) {
Expand Down
4 changes: 0 additions & 4 deletions consensus/blake3pow/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,6 @@ func (blake3pow *Blake3pow) VerifyUncles(chain consensus.ChainReader, block *typ
if diff := new(big.Int).Sub(uncle.Number(), parentNumber); diff.Cmp(big.NewInt(1)) != 0 {
return consensus.ErrInvalidNumber
}

if !blake3pow.CheckIfValidWorkShare(uncle) {
return errors.New("invalid workshare included")
}
}
}
return nil
Expand Down
5 changes: 0 additions & 5 deletions consensus/progpow/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,6 @@ func (progpow *Progpow) VerifyUncles(chain consensus.ChainReader, block *types.W
if diff := new(big.Int).Sub(uncle.Number(), parentNumber); diff.Cmp(big.NewInt(1)) != 0 {
return consensus.ErrInvalidNumber
}

if !progpow.CheckIfValidWorkShare(uncle) {
return errors.New("invalid workshare included")
}

}
}
return nil
Expand Down
13 changes: 9 additions & 4 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const (
c_normalListBackoffThreshold = 5 // Max multiple on the c_normalListProcCounter
c_maxRemoteTxQueue = 50000
c_remoteTxProcPeriod = 2 // Time between remote tx pool processing
c_asyncWorkShareTimer = 100 * time.Millisecond
)

type blockNumberAndRetryCounter struct {
Expand Down Expand Up @@ -541,6 +542,10 @@ func (c *Core) TxPool() *TxPool {
return c.sl.txPool
}

// GetTxsFromBroadcastSet returns the set of transactions that the underlying
// slice cached in its broadcast-set cache under the given hash, or an error
// when no set is cached for that hash.
func (c *Core) GetTxsFromBroadcastSet(hash common.Hash) (types.Transactions, error) {
	txs, err := c.sl.GetTxsFromBroadcastSet(hash)
	return txs, err
}

func (c *Core) Stop() {
// Delete the append queue
c.appendQueue.Purge()
Expand Down Expand Up @@ -651,6 +656,10 @@ func (c *Core) ConstructLocalMinedBlock(woHeader *types.WorkObject) (*types.Work
return c.sl.ConstructLocalMinedBlock(woHeader)
}

// GetPendingBlockBody returns the pending block body that the slice has cached
// for the given work object header; a nil result means none is cached.
func (c *Core) GetPendingBlockBody(woHeader *types.WorkObjectHeader) *types.WorkObject {
	body := c.sl.GetPendingBlockBody(woHeader)
	return body
}

// SubRelayPendingHeader forwards the pending header received from the dominant
// chain to the slice, passing through the new entropy, origin location,
// sub-reorg flag and chain order unchanged.
func (c *Core) SubRelayPendingHeader(slPendingHeader types.PendingHeader, newEntropy *big.Int, location common.Location, subReorg bool, order int) {
	c.sl.SubRelayPendingHeader(slPendingHeader, newEntropy, location, subReorg, order)
}
Expand Down Expand Up @@ -994,10 +1003,6 @@ func (c *Core) TxLookupLimit() uint64 {
return 0
}

// SubscribeNewTxsEvent registers a subscription for NewTxsEvent on the
// underlying transaction pool and starts sending events to the given channel.
func (c *Core) SubscribeNewTxsEvent(ch chan<- NewTxsEvent) event.Subscription {
	sub := c.sl.txPool.SubscribeNewTxsEvent(ch)
	return sub
}

// SetExtra hands the extra-data bytes to the miner; the returned error comes
// straight from the miner's own validation.
func (c *Core) SetExtra(extra []byte) error {
	err := c.sl.miner.SetExtra(extra)
	return err
}
Expand Down
4 changes: 2 additions & 2 deletions core/rawdb/accessors_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,7 @@ func WriteWorkObjectBody(db ethdb.KeyValueWriter, hash common.Hash, workObject *
key := workObjectBodyKey(hash)
WriteHeaderNumber(db, hash, workObject.NumberU64(nodeCtx))

protoWorkObjectBody, err := workObject.Body().ProtoEncode()
protoWorkObjectBody, err := workObject.Body().ProtoEncode(woType)
if err != nil {
db.Logger().WithField("err", err).Fatal("Failed to proto encode work object body")
}
Expand Down Expand Up @@ -1044,7 +1044,7 @@ func (b badWorkObject) ProtoEncode() *ProtoBadWorkObject {
if err != nil {
log.Global.WithField("err", err).Fatal("Failed to proto encode header")
}
protoWorkObjectBody, err := b.woBody.ProtoEncode()
protoWorkObjectBody, err := b.woBody.ProtoEncode(types.BlockObject)
if err != nil {
log.Global.WithField("err", err).Fatal("Failed to proto encode body")
}
Expand Down
51 changes: 48 additions & 3 deletions core/slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func NewSlice(db ethdb.Database, config *Config, txConfig *TxPoolConfig, txLooku

if nodeCtx == common.ZONE_CTX && sl.ProcessingState() {
go sl.asyncPendingHeaderLoop()
go sl.asyncWorkShareUpdateLoop()
}

return sl, nil
Expand Down Expand Up @@ -1323,7 +1324,7 @@ func (sl *Slice) ConstructLocalMinedBlock(wo *types.WorkObject) (*types.WorkObje
nodeCtx := sl.NodeLocation().Context()
var pendingBlockBody *types.WorkObject
if nodeCtx == common.ZONE_CTX {
pendingBlockBody = sl.GetPendingBlockBody(wo)
pendingBlockBody = sl.GetPendingBlockBody(wo.WorkObjectHeader())
if pendingBlockBody == nil {
sl.logger.WithFields(log.Fields{"wo.Hash": wo.Hash(),
"wo.Header": wo.HeaderHash(),
Expand Down Expand Up @@ -1413,8 +1414,10 @@ func (sl *Slice) combinePendingHeader(header *types.WorkObject, slPendingHeader
}

if inSlice {
combinedPendingHeader.Header().SetEtxRollupHash(header.EtxRollupHash())
combinedPendingHeader.WorkObjectHeader().SetDifficulty(header.Difficulty())
combinedPendingHeader.WorkObjectHeader().SetTxHash(header.TxHash())

combinedPendingHeader.Header().SetEtxRollupHash(header.EtxRollupHash())
combinedPendingHeader.Header().SetUncledS(header.Header().UncledS())
combinedPendingHeader.Header().SetUncleHash(header.UncleHash())
combinedPendingHeader.Header().SetTxHash(header.Header().TxHash())
Expand Down Expand Up @@ -1578,7 +1581,7 @@ func (sl *Slice) NewGenesisPendingHeader(domPendingHeader *types.WorkObject, dom
return nil
}

func (sl *Slice) GetPendingBlockBody(wo *types.WorkObject) *types.WorkObject {
// GetPendingBlockBody looks up the pending block body the worker cached for
// the given work object header. A nil result means no body is cached.
func (sl *Slice) GetPendingBlockBody(wo *types.WorkObjectHeader) *types.WorkObject {
	// The worker's lookup error is deliberately discarded here; callers treat
	// a nil body as "not found".
	body, _ := sl.miner.worker.GetPendingBlockBody(wo)
	return body
}
Expand Down Expand Up @@ -1877,6 +1880,48 @@ func (sl *Slice) GetSlicesRunning() []common.Location {
return sl.hc.SlicesRunning()
}

// asyncWorkShareUpdateLoop periodically folds the transactions accumulated in
// the tx pool's broadcast set into the best pending header (so miners pick up
// the new tx hash) and caches the set under its derived hash for later
// retrieval via GetTxsFromBroadcastSet. It runs until sl.quit is closed.
func (sl *Slice) asyncWorkShareUpdateLoop() {
	defer func() {
		if r := recover(); r != nil {
			sl.logger.WithFields(log.Fields{
				"error":      r,
				"stacktrace": string(debug.Stack()),
			}).Error("Go-Quai Panicked")
		}
	}()
	asyncTimer := time.NewTicker(c_asyncWorkShareTimer)
	defer asyncTimer.Stop()
	for {
		select {
		case <-asyncTimer.C:
			// Snapshot the broadcast set while holding only the read lock, then
			// release it before the heavier work (hashing, phcache update, feed
			// send) so the pool's writers (runReorg) are not blocked meanwhile.
			sl.txPool.broadcastSetMu.RLock()
			txs := sl.txPool.broadcastSet
			sl.txPool.broadcastSetMu.RUnlock()
			if len(txs) == 0 {
				continue
			}
			hash := types.DeriveSha(txs, trie.NewStackTrie(nil))
			bestPh, exists := sl.readPhCache(sl.bestPhKey)
			if exists {
				bestPh.WorkObject().WorkObjectHeader().SetLocation(sl.NodeLocation())
				bestPh.WorkObject().WorkObjectHeader().SetTxHash(hash)
				sl.writePhCache(sl.bestPhKey, bestPh)
				sl.miner.worker.pendingHeaderFeed.Send(bestPh.WorkObject())
			}
			sl.txPool.broadcastSetCache.Add(hash, txs)
		case <-sl.quit:
			return
		}
	}
}

// GetTxsFromBroadcastSet returns the transactions cached in the pool's
// broadcast-set cache under the given hash and resets the pool's pending
// broadcast set.
func (sl *Slice) GetTxsFromBroadcastSet(hash common.Hash) (types.Transactions, error) {
	// Reset the pending set under its mutex: runReorg appends to
	// pool.broadcastSet while holding broadcastSetMu, and
	// asyncWorkShareUpdateLoop reads it under the read lock. Writing it here
	// without the lock is a data race.
	sl.txPool.broadcastSetMu.Lock()
	sl.txPool.broadcastSet = types.Transactions{}
	sl.txPool.broadcastSetMu.Unlock()
	return sl.txPool.GetTxsFromBroadcastSet(hash)
}

////// Expansion related logic

const (
Expand Down
36 changes: 25 additions & 11 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ const (
// c_reorgCounterThreshold determines the frequency of the timing prints
// around important functions in txpool
c_reorgCounterThreshold = 200

// c_broadcastSetCacheSize is the maximum number of latest broadcastSets that we keep in
// the pool
c_broadcastSetCacheSize = 10
)

var (
Expand Down Expand Up @@ -194,7 +198,7 @@ var DefaultTxPoolConfig = TxPoolConfig{
GlobalQueue: 2048,
QiPoolSize: 10024,
Lifetime: 3 * time.Hour,
ReorgFrequency: 2 * time.Second,
ReorgFrequency: 1 * time.Second,
}

// sanitize checks the provided user configurations and changes anything that's
Expand Down Expand Up @@ -286,7 +290,6 @@ type TxPool struct {
chainconfig *params.ChainConfig
chain blockChain
gasPrice *big.Int
txFeed event.Feed
scope event.SubscriptionScope
signer types.Signer
mu sync.RWMutex
Expand All @@ -310,6 +313,10 @@ type TxPool struct {
localTxsCount int // count of txs in last 1 min. Purely for logging purpose
remoteTxsCount int // count of txs in last 1 min. Purely for logging purpose

broadcastSetCache *lru.Cache[common.Hash, types.Transactions]
broadcastSetMu sync.RWMutex
broadcastSet types.Transactions

reOrgCounter int // keeps track of the number of times the runReorg is called, it is reset every c_reorgCounterThreshold times

chainHeadCh chan ChainHeadEvent
Expand Down Expand Up @@ -379,6 +386,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
reqResetCh: make(chan *txpoolResetRequest),
reqPromoteCh: make(chan *accountSet),
queueTxEventCh: make(chan *types.Transaction),
broadcastSet: make(types.Transactions, 0),
reorgDoneCh: make(chan chan struct{}),
reorgShutdownCh: make(chan struct{}),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
Expand All @@ -388,6 +396,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
logger: logger,
}
pool.senders, _ = lru.New[common.Hash, common.InternalAddress](int(config.MaxSenders))
pool.broadcastSetCache, _ = lru.New[common.Hash, types.Transactions](c_broadcastSetCacheSize)
pool.locals = newAccountSet(pool.signer)
for _, addr := range config.Locals {
logger.WithField("address", addr).Debug("Setting new local account")
Expand Down Expand Up @@ -524,12 +533,6 @@ func (pool *TxPool) Stop() {
pool.logger.Info("Transaction pool stopped")
}

// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and starts
// sending events to the given channel; the subscription is tracked by the
// pool's scope so it is torn down with the pool.
func (pool *TxPool) SubscribeNewTxsEvent(ch chan<- NewTxsEvent) event.Subscription {
	sub := pool.txFeed.Subscribe(ch)
	return pool.scope.Track(sub)
}

// GasPrice returns the current gas price enforced by the transaction pool.
func (pool *TxPool) GasPrice() *big.Int {
pool.mu.RLock()
Expand Down Expand Up @@ -636,6 +639,14 @@ func (pool *TxPool) QiPoolPending() map[common.Hash]*types.TxWithMinerFee {
return qiTxs
}

// GetTxsFromBroadcastSet looks up the broadcast-set cache by the derived tx
// hash and returns the cached transactions, or an error when the hash is not
// present.
func (pool *TxPool) GetTxsFromBroadcastSet(hash common.Hash) (types.Transactions, error) {
	if txs, ok := pool.broadcastSetCache.Get(hash); ok {
		return txs, nil
	}
	return types.Transactions{}, fmt.Errorf("cannot find the txs in the broadcast set for txhash [%s]", hash)
}

// Pending retrieves all currently processable transactions, grouped by origin
// account and sorted by nonce. The returned transaction set is a copy and can be
// freely modified by calling code.
Expand Down Expand Up @@ -1573,16 +1584,19 @@ func (pool *TxPool) runReorg(done chan struct{}, cancel chan struct{}, reset *tx
}
events[internal].Put(tx)
}
var txs []*types.Transaction
if len(events) > 0 {
var txs []*types.Transaction
for _, set := range events {
txs = append(txs, set.Flatten()...)
}
pool.txFeed.Send(NewTxsEvent{txs})
}
if len(queuedQiTxs) > 0 {
pool.txFeed.Send(NewTxsEvent{queuedQiTxs})
txs = append(txs, queuedQiTxs...)
}
pool.broadcastSetMu.Lock()
pool.broadcastSet = append(pool.broadcastSet, txs...)
pool.broadcastSetMu.Unlock()

if pool.reOrgCounter == c_reorgCounterThreshold {
pool.logger.WithField("time", common.PrettyDuration(time.Since(start))).Debug("Time taken to runReorg in txpool")
pool.reOrgCounter = 0
Expand Down
3 changes: 3 additions & 0 deletions core/types/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,9 @@ func (h *Header) EmptyReceipts() bool {
// CopyHeader creates a deep copy of a block header to prevent side effects from
// modifying a header variable.
func CopyHeader(h *Header) *Header {
if h == nil {
return nil
}
cpy := *h
cpy.parentHash = make([]common.Hash, common.HierarchyDepth-1)
cpy.manifestHash = make([]common.Hash, common.HierarchyDepth)
Expand Down
Loading

0 comments on commit c1089b2

Please sign in to comment.