Broadcasting transactions using work shares #1843

Merged (5 commits, Jun 18, 2024)
3 changes: 1 addition & 2 deletions cmd/utils/hierarchical_coordinator.go
@@ -155,8 +155,7 @@ func (hc *HierarchicalCoordinator) startNode(logPath string, quaiBackend quai.Co

if quaiBackend.ProcessingState(location) && location.Context() == common.ZONE_CTX {
// Subscribe to the new topics after setting the api backend
hc.p2p.Subscribe(location, &types.WorkObjectHeader{})
hc.p2p.Subscribe(location, &types.Transactions{})
hc.p2p.Subscribe(location, &types.WorkObjectShareView{})
}

if location.Context() == common.PRIME_CTX || location.Context() == common.REGION_CTX || quaiBackend.ProcessingState(location) {
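With this hunk the zone node's subscription set here is reduced to the new WorkObjectShareView topic, which presumably bundles a work-object header with the transactions it commits to, so transactions no longer need their own gossip topic. A minimal sketch of how a receiving side might dispatch on that topic; the handler and helper names are hypothetical, not the actual go-quai code:

// Hedged sketch (hypothetical dispatch, not the real go-quai handler): with the
// share-view topic, one gossip message both proves work and delivers the
// transactions it commits to.
func handleZoneGossip(msg interface{}) {
	switch m := msg.(type) {
	case *types.WorkObjectShareView:
		processShareView(m) // hypothetical: unpack the share and its bundled txs
	default:
		_ = m // other message types are handled by other subscriptions
	}
}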
6 changes: 3 additions & 3 deletions common/bootnodes.go
@@ -12,9 +12,9 @@ var (
"/ip4/34.136.140.151/tcp/4001/p2p/12D3KooWJnWmBukEbZtGPPJvT1r4tQ97CRSGmnjHewcrjNB8oRxU",
},
"orchard": {
"/ip4/34.23.101.139/tcp/4001/p2p/12D3KooWF3uxeu5dTSDS3HGfTMMc8BSkrWTUi8aEntMNEvTifETm",
"/ip4/34.122.101.50/tcp/4001/p2p/12D3KooWJii9yWcyorgzDmeRxCtnKnbVtZ6tPy7UacSJECgEWyMg",
"/ip4/34.136.175.169/tcp/4001/p2p/12D3KooWC2PmuLv8kQfoqKdX7ta9ph5qJFKH8HuSdEdC5zZB5J2g",
"/ip4/34.23.101.139/tcp/4001/p2p/12D3KooW9tqyvhhEV5A6Fc1UDaPfF9vzQfr67tck8gHD6rhhyUV7",
"/ip4/34.122.101.50/tcp/4001/p2p/12D3KooWCeucchYeo3Ayn6RwYazEppCzsuVTFryqsut2F3jYtjvM",
"/ip4/34.136.175.169/tcp/4001/p2p/12D3KooWScCUPsMigdHmATmSjt2YH6qok2THVpXo6ea74zL3Emb9",
},
}
)
4 changes: 0 additions & 4 deletions consensus/blake3pow/consensus.go
@@ -272,10 +272,6 @@ func (blake3pow *Blake3pow) VerifyUncles(chain consensus.ChainReader, block *typ
if diff := new(big.Int).Sub(uncle.Number(), parentNumber); diff.Cmp(big.NewInt(1)) != 0 {
return consensus.ErrInvalidNumber
}

if !blake3pow.CheckIfValidWorkShare(uncle) {
return errors.New("invalid workshare included")
}
}
}
return nil
5 changes: 0 additions & 5 deletions consensus/progpow/consensus.go
@@ -273,11 +273,6 @@ func (progpow *Progpow) VerifyUncles(chain consensus.ChainReader, block *types.W
if diff := new(big.Int).Sub(uncle.Number(), parentNumber); diff.Cmp(big.NewInt(1)) != 0 {
return consensus.ErrInvalidNumber
}

if !progpow.CheckIfValidWorkShare(uncle) {
return errors.New("invalid workshare included")
}

}
}
return nil
1 change: 1 addition & 0 deletions core/block_validator.go
@@ -167,6 +167,7 @@ func (v *BlockValidator) ValidateState(block *types.WorkObject, statedb *state.S
// to keep the baseline gas close to the provided target, and increase it towards
// the target if the baseline gas is lower.
func CalcGasLimit(parent *types.WorkObject, gasCeil uint64) uint64 {
return params.MinGasLimit
// No Gas for TimeToStartTx days worth of zone blocks, this gives enough time to
// onboard new miners into the slice
if parent.NumberU64(common.ZONE_CTX) < params.TimeToStartTx {
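The early return pins the gas limit to the protocol minimum and makes the adaptive computation that follows unreachable for now. Effectively (a sketch of the resulting behavior, not the full original body):

func CalcGasLimit(parent *types.WorkObject, gasCeil uint64) uint64 {
	// Every block currently targets the protocol minimum; the parent-based
	// adjustment below the early return is dead code until it is re-enabled.
	return params.MinGasLimit
}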
17 changes: 13 additions & 4 deletions core/core.go
@@ -50,6 +50,7 @@ const (
c_normalListBackoffThreshold = 5 // Max multiple on the c_normalListProcCounter
c_maxRemoteTxQueue = 50000
c_remoteTxProcPeriod = 2 // Time between remote tx pool processing
c_asyncWorkShareTimer = 1 * time.Second
)

type blockNumberAndRetryCounter struct {
@@ -541,6 +542,10 @@ func (c *Core) TxPool() *TxPool {
return c.sl.txPool
}

func (c *Core) GetTxsFromBroadcastSet(hash common.Hash) (types.Transactions, error) {
return c.sl.GetTxsFromBroadcastSet(hash)
}

func (c *Core) Stop() {
// Delete the append queue
c.appendQueue.Purge()
@@ -651,6 +656,10 @@ func (c *Core) ConstructLocalMinedBlock(woHeader *types.WorkObject) (*types.Work
return c.sl.ConstructLocalMinedBlock(woHeader)
}

func (c *Core) GetPendingBlockBody(woHeader *types.WorkObjectHeader) *types.WorkObject {
return c.sl.GetPendingBlockBody(woHeader)
}

func (c *Core) SubRelayPendingHeader(slPendingHeader types.PendingHeader, newEntropy *big.Int, location common.Location, subReorg bool, order int) {
c.sl.SubRelayPendingHeader(slPendingHeader, newEntropy, location, subReorg, order)
}
@@ -945,6 +954,10 @@ func (c *Core) WriteAddressOutpoints(outpoints map[string]map[string]*types.Outp
return c.sl.hc.WriteAddressOutpoints(outpoints)
}

func (c *Core) GetMaxTxInWorkShare() uint64 {
return c.sl.hc.GetMaxTxInWorkShare()
}

//--------------------//
// BlockChain methods //
//--------------------//
@@ -994,10 +1007,6 @@ func (c *Core) TxLookupLimit() uint64 {
return 0
}

func (c *Core) SubscribeNewTxsEvent(ch chan<- NewTxsEvent) event.Subscription {
return c.sl.txPool.SubscribeNewTxsEvent(ch)
}

func (c *Core) SetExtra(extra []byte) error {
return c.sl.miner.SetExtra(extra)
}
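core.go gains thin pass-throughs so layers above Slice can reach the new work-share plumbing: GetTxsFromBroadcastSet resolves a tx-hash commitment back to its transactions, GetPendingBlockBody fetches a body by work-object header, and GetMaxTxInWorkShare exposes the per-share transaction cap, while the old SubscribeNewTxsEvent feed is dropped. A hedged sketch of a caller using two of these accessors (the call site is illustrative, not actual go-quai code):

// Illustrative consumer of the new Core accessors.
capPerShare := c.GetMaxTxInWorkShare() // upper bound on txs carried per work share

// Resolve the transactions a work share committed to via its tx hash.
txs, err := c.GetTxsFromBroadcastSet(txHash)
if err != nil {
	// Only the last few broadcast sets are cached, so misses are expected.
	txs = types.Transactions{}
}
_ = capPerShare
_ = txs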
2 changes: 1 addition & 1 deletion core/genesis.go
@@ -393,7 +393,7 @@ func DefaultOrchardGenesisBlock(consensusEngine string) *Genesis {
Nonce: 66,
ExtraData: hexutil.MustDecode("0x11bbe8db4e347b4e8c937c1c8370e4b5ed33adb3db69cbdb7a38e1e50b1b82fc"),
GasLimit: 5000000,
Difficulty: big.NewInt(40000000),
Difficulty: big.NewInt(900000),
}
}
return &Genesis{
8 changes: 8 additions & 0 deletions core/headerchain.go
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"math"
"math/big"
"runtime/debug"
"sync"
@@ -1080,3 +1081,10 @@ func (hc *HeaderChain) GetPrimeTerminus(header *types.WorkObject) *types.WorkObj
func (hc *HeaderChain) WriteAddressOutpoints(outpoints map[string]map[string]*types.OutpointAndDenomination) error {
return rawdb.WriteAddressOutpoints(hc.bc.db, outpoints)
}

func (hc *HeaderChain) GetMaxTxInWorkShare() uint64 {
currentGasLimit := hc.CurrentHeader().GasLimit()
maxEoaInBlock := currentGasLimit / params.TxGas
// (maxEoaInBlock*2)/(2^bits)
return (maxEoaInBlock * 2) / uint64(math.Pow(2, float64(params.WorkSharesThresholdDiff)))
}
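To get a feel for the cap, here is the same formula with assumed numbers: the 5,000,000 genesis gas limit from this PR, the standard 21,000 gas for a plain transfer as params.TxGas, and a hypothetical WorkSharesThresholdDiff of 3 bits (the real values live in params and may differ):

package main

import (
	"fmt"
	"math"
)

func main() {
	currentGasLimit := uint64(5000000) // genesis gas limit used in this PR
	txGas := uint64(21000)             // standard gas for a plain transfer (assumed params.TxGas)
	thresholdDiff := 3.0               // hypothetical params.WorkSharesThresholdDiff

	maxEoaInBlock := currentGasLimit / txGas // 238
	maxTxInWorkShare := (maxEoaInBlock * 2) / uint64(math.Pow(2, thresholdDiff))
	fmt.Println(maxTxInWorkShare) // 59 with these assumptions
}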
4 changes: 2 additions & 2 deletions core/rawdb/accessors_chain.go
@@ -750,7 +750,7 @@ func WriteWorkObjectBody(db ethdb.KeyValueWriter, hash common.Hash, workObject *
key := workObjectBodyKey(hash)
WriteHeaderNumber(db, hash, workObject.NumberU64(nodeCtx))

protoWorkObjectBody, err := workObject.Body().ProtoEncode()
protoWorkObjectBody, err := workObject.Body().ProtoEncode(woType)
if err != nil {
db.Logger().WithField("err", err).Fatal("Failed to proto encode work object body")
}
@@ -1044,7 +1044,7 @@ func (b badWorkObject) ProtoEncode() *ProtoBadWorkObject {
if err != nil {
log.Global.WithField("err", err).Fatal("Failed to proto encode header")
}
protoWorkObjectBody, err := b.woBody.ProtoEncode()
protoWorkObjectBody, err := b.woBody.ProtoEncode(types.BlockObject)
if err != nil {
log.Global.WithField("err", err).Fatal("Failed to proto encode body")
}
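ProtoEncode on the work-object body now takes the object's view, presumably so the encoder can decide which fields to serialize for a full block versus a share. Callers pass the view explicitly, as in this sketch mirroring the hunks above (the error wrapping is illustrative):

// Sketch of the updated call pattern with the new view argument.
protoBody, err := workObject.Body().ProtoEncode(types.BlockObject)
if err != nil {
	return fmt.Errorf("failed to proto encode work object body: %w", err)
}
_ = protoBody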
51 changes: 48 additions & 3 deletions core/slice.go
@@ -152,6 +152,7 @@ func NewSlice(db ethdb.Database, config *Config, txConfig *TxPoolConfig, txLooku

if nodeCtx == common.ZONE_CTX && sl.ProcessingState() {
go sl.asyncPendingHeaderLoop()
go sl.asyncWorkShareUpdateLoop()
}

return sl, nil
@@ -1323,7 +1324,7 @@ func (sl *Slice) ConstructLocalMinedBlock(wo *types.WorkObject) (*types.WorkObje
nodeCtx := sl.NodeLocation().Context()
var pendingBlockBody *types.WorkObject
if nodeCtx == common.ZONE_CTX {
pendingBlockBody = sl.GetPendingBlockBody(wo)
pendingBlockBody = sl.GetPendingBlockBody(wo.WorkObjectHeader())
if pendingBlockBody == nil {
sl.logger.WithFields(log.Fields{"wo.Hash": wo.Hash(),
"wo.Header": wo.HeaderHash(),
@@ -1413,8 +1414,10 @@ func (sl *Slice) combinePendingHeader(header *types.WorkObject, slPendingHeader
}

if inSlice {
combinedPendingHeader.Header().SetEtxRollupHash(header.EtxRollupHash())
combinedPendingHeader.WorkObjectHeader().SetDifficulty(header.Difficulty())
combinedPendingHeader.WorkObjectHeader().SetTxHash(header.TxHash())

combinedPendingHeader.Header().SetEtxRollupHash(header.EtxRollupHash())
combinedPendingHeader.Header().SetUncledS(header.Header().UncledS())
combinedPendingHeader.Header().SetUncleHash(header.UncleHash())
combinedPendingHeader.Header().SetTxHash(header.Header().TxHash())
@@ -1578,7 +1581,7 @@ func (sl *Slice) NewGenesisPendingHeader(domPendingHeader *types.WorkObject, dom
return nil
}

func (sl *Slice) GetPendingBlockBody(wo *types.WorkObject) *types.WorkObject {
func (sl *Slice) GetPendingBlockBody(wo *types.WorkObjectHeader) *types.WorkObject {
blockBody, _ := sl.miner.worker.GetPendingBlockBody(wo)
return blockBody
}
@@ -1877,6 +1880,48 @@ func (sl *Slice) GetSlicesRunning() []common.Location {
return sl.hc.SlicesRunning()
}

func (sl *Slice) asyncWorkShareUpdateLoop() {
defer func() {
if r := recover(); r != nil {
sl.logger.WithFields(log.Fields{
"error": r,
"stacktrace": string(debug.Stack()),
}).Error("Go-Quai Panicked")
}
}()
asyncTimer := time.NewTicker(c_asyncWorkShareTimer)
defer asyncTimer.Stop()
for {
select {
case <-asyncTimer.C:
// Every time we read the broadcast set, get the next set of transactions and
// update the phcache
// Get the latest transactions to be broadcasted from the pool
if len(sl.txPool.broadcastSet) > 0 {
txs := make(types.Transactions, len(sl.txPool.broadcastSet))
copy(txs, sl.txPool.broadcastSet)

hash := types.DeriveSha(txs, trie.NewStackTrie(nil))
bestPh, exists := sl.readPhCache(sl.bestPhKey)
if exists {
bestPh.WorkObject().WorkObjectHeader().SetLocation(sl.NodeLocation())
bestPh.WorkObject().WorkObjectHeader().SetTxHash(hash)
sl.writePhCache(sl.bestPhKey, bestPh)
sl.miner.worker.pendingHeaderFeed.Send(bestPh.WorkObject())
}
sl.txPool.broadcastSetCache.Add(hash, txs)
}
case <-sl.quit:
return
}
}
}

func (sl *Slice) GetTxsFromBroadcastSet(hash common.Hash) (types.Transactions, error) {
sl.txPool.broadcastSet = types.Transactions{}
return sl.txPool.GetTxsFromBroadcastSet(hash)
}

////// Expansion related logic

const (
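asyncWorkShareUpdateLoop ties the pieces together: once a second it snapshots the pool's broadcast set, hashes it with the same DeriveSha commitment used for a block's transaction list, stamps that hash into the cached best pending header, re-emits the header to the worker, and caches the hash-to-transactions mapping so the set can be recovered later through GetTxsFromBroadcastSet (which also clears the pool's pending broadcast set). A hedged sketch of the recovery side, assuming a TxHash getter matching the SetTxHash call used above:

// Illustrative lookup of the tx set a mined work share commits to.
txs, err := sl.GetTxsFromBroadcastSet(workShareHeader.TxHash())
if err != nil {
	// The cache holds only the last c_broadcastSetCacheSize sets, so a miss
	// simply means the share carries no recoverable broadcast set.
	return
}
gossipShareWithTxs(workShareHeader, txs) // hypothetical helper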
49 changes: 33 additions & 16 deletions core/tx_pool.go
@@ -59,6 +59,10 @@ const (
// c_reorgCounterThreshold determines the frequency of the timing prints
// around important functions in txpool
c_reorgCounterThreshold = 200

// c_broadcastSetCacheSize is the maximum number of latest broadcastSets that we keep in
// the pool
c_broadcastSetCacheSize = 10
)

var (
@@ -154,6 +158,7 @@ type blockChain interface {
GetHeaderOrCandidate(common.Hash, uint64) *types.WorkObject
NodeCtx() int
GetHeaderByHash(common.Hash) *types.WorkObject
GetMaxTxInWorkShare() uint64
}

// TxPoolConfig are the configuration parameters of the transaction pool.
@@ -187,14 +192,14 @@ var DefaultTxPoolConfig = TxPoolConfig{
PriceBump: 5,

AccountSlots: 10,
GlobalSlots: 9000 + 1024, // urgent + floating queue capacity with 4:1 ratio
MaxSenders: 10000, // 5 MB - at least 10 blocks worth of transactions in case of reorg or high production rate
SendersChBuffer: 1024, // at 500 TPS in zone, 2s buffer
AccountQueue: 1,
GlobalQueue: 2048,
GlobalSlots: 19000 + 1024, // urgent + floating queue capacity with 4:1 ratio
MaxSenders: 10000, // 5 MB - at least 10 blocks worth of transactions in case of reorg or high production rate
SendersChBuffer: 1024, // at 500 TPS in zone, 2s buffer
AccountQueue: 3,
GlobalQueue: 20048,
QiPoolSize: 10024,
Lifetime: 3 * time.Hour,
ReorgFrequency: 2 * time.Second,
ReorgFrequency: 1 * time.Second,
}

// sanitize checks the provided user configurations and changes anything that's
@@ -286,7 +291,6 @@ type TxPool struct {
chainconfig *params.ChainConfig
chain blockChain
gasPrice *big.Int
txFeed event.Feed
scope event.SubscriptionScope
signer types.Signer
mu sync.RWMutex
@@ -310,6 +314,10 @@ type TxPool struct {
localTxsCount int // count of txs in last 1 min. Purely for logging purpose
remoteTxsCount int // count of txs in last 1 min. Purely for logging purpose

broadcastSetCache *lru.Cache[common.Hash, types.Transactions]
broadcastSetMu sync.RWMutex
broadcastSet types.Transactions

reOrgCounter int // keeps track of the number of times the runReorg is called, it is reset every c_reorgCounterThreshold times

chainHeadCh chan ChainHeadEvent
@@ -379,6 +387,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
reqResetCh: make(chan *txpoolResetRequest),
reqPromoteCh: make(chan *accountSet),
queueTxEventCh: make(chan *types.Transaction),
broadcastSet: make(types.Transactions, 0),
reorgDoneCh: make(chan chan struct{}),
reorgShutdownCh: make(chan struct{}),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
@@ -388,6 +397,7 @@
logger: logger,
}
pool.senders, _ = lru.New[common.Hash, common.InternalAddress](int(config.MaxSenders))
pool.broadcastSetCache, _ = lru.New[common.Hash, types.Transactions](c_broadcastSetCacheSize)
pool.locals = newAccountSet(pool.signer)
for _, addr := range config.Locals {
logger.WithField("address", addr).Debug("Setting new local account")
@@ -524,12 +534,6 @@ func (pool *TxPool) Stop() {
pool.logger.Info("Transaction pool stopped")
}

// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and
// starts sending event to the given channel.
func (pool *TxPool) SubscribeNewTxsEvent(ch chan<- NewTxsEvent) event.Subscription {
return pool.scope.Track(pool.txFeed.Subscribe(ch))
}

// GasPrice returns the current gas price enforced by the transaction pool.
func (pool *TxPool) GasPrice() *big.Int {
pool.mu.RLock()
@@ -636,6 +640,14 @@ func (pool *TxPool) QiPoolPending() map[common.Hash]*types.TxWithMinerFee {
return qiTxs
}

func (pool *TxPool) GetTxsFromBroadcastSet(hash common.Hash) (types.Transactions, error) {
txs, ok := pool.broadcastSetCache.Get(hash)
if !ok {
return types.Transactions{}, fmt.Errorf("cannot find the txs in the broadcast set for txhash [%s]", hash)
}
return txs, nil
}

// Pending retrieves all currently processable transactions, grouped by origin
// account and sorted by nonce. The returned transaction set is a copy and can be
// freely modified by calling code.
@@ -1573,16 +1585,21 @@ func (pool *TxPool) runReorg(done chan struct{}, cancel chan struct{}, reset *tx
}
events[internal].Put(tx)
}
var txs []*types.Transaction
if len(events) > 0 {
var txs []*types.Transaction
for _, set := range events {
txs = append(txs, set.Flatten()...)
}
pool.txFeed.Send(NewTxsEvent{txs})
}
if len(queuedQiTxs) > 0 {
pool.txFeed.Send(NewTxsEvent{queuedQiTxs})
txs = append(txs, queuedQiTxs...)
}
if len(pool.broadcastSet)+len(txs) < int(pool.chain.GetMaxTxInWorkShare()) {
pool.broadcastSetMu.Lock()
pool.broadcastSet = append(pool.broadcastSet, txs...)
pool.broadcastSetMu.Unlock()
}

if pool.reOrgCounter == c_reorgCounterThreshold {
pool.logger.WithField("time", common.PrettyDuration(time.Since(start))).Debug("Time taken to runReorg in txpool")
pool.reOrgCounter = 0
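Note the change of flow at the end of runReorg: instead of pushing promoted transactions out through the removed txFeed, the pool now appends them to broadcastSet, and only while the pending set stays under GetMaxTxInWorkShare, so a single work share never commits to more transactions than it may carry. The gating invariant in isolation (a sketch with stand-in types, not the pool's actual code):

// appendBounded mirrors the runReorg gating: txs are queued for broadcast only
// if the combined size stays below the per-work-share capacity.
func appendBounded(broadcastSet, txs []int, maxTxInWorkShare int) []int {
	if len(broadcastSet)+len(txs) < maxTxInWorkShare {
		broadcastSet = append(broadcastSet, txs...)
	}
	return broadcastSet
}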
3 changes: 3 additions & 0 deletions core/types/block.go
@@ -836,6 +836,9 @@ func (h *Header) EmptyReceipts() bool {
// CopyHeader creates a deep copy of a block header to prevent side effects from
// modifying a header variable.
func CopyHeader(h *Header) *Header {
if h == nil {
return nil
}
cpy := *h
cpy.parentHash = make([]common.Hash, common.HierarchyDepth-1)
cpy.manifestHash = make([]common.Hash, common.HierarchyDepth)
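The nil guard in CopyHeader is a small robustness fix, possibly because work objects that circulate as shares need not carry a full inner header; copying one previously dereferenced nil. A minimal sketch of the behavior the guard enables (the caller is illustrative):

// With the guard, a nil header copies to nil instead of panicking on *h.
var h *Header
if cpy := CopyHeader(h); cpy == nil {
	// callers can now treat "no header present" as a normal case
}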