From 1df76911733cc325a1604ccd0f6a8d2d19e49853 Mon Sep 17 00:00:00 2001 From: gop Date: Thu, 25 Apr 2024 09:20:52 -0500 Subject: [PATCH] Workshares are included and being able to mine workshares are sub shares found in the process of producing a block. Including these sub share samples in the block makes the statistical convergence better. So in this PR, workshares have been added into the uncles list. The weight of the workshares is added into the parent entropy and also parent sub delta S. But a discount based on the reference depth and frequency of workshares is applied while adding the entropy of the workshare. --- cmd/utils/hierarchical_coordinator.go | 1 + consensus/blake3pow/consensus.go | 27 +++++++-- consensus/blake3pow/poem.go | 59 +++++++++++++++++++ consensus/blake3pow/sealer.go | 8 ++- consensus/consensus.go | 10 ++++ consensus/progpow/consensus.go | 36 ++++++++++-- consensus/progpow/poem.go | 62 +++++++++++++++++++ consensus/progpow/sealer.go | 8 ++- core/core.go | 8 +++ core/events.go | 3 +- core/headerchain.go | 2 +- core/slice.go | 2 +- core/worker.go | 85 +++++++++++++++------------ internal/quaiapi/backend.go | 3 + internal/quaiapi/quai_api.go | 25 ++++++++ p2p/node/api.go | 1 + p2p/pb/proto_services.go | 20 +++++++ p2p/pubsubManager/utils.go | 11 ++-- params/protocol_params.go | 3 + quai/api_backend.go | 12 ++++ quai/p2p_backend.go | 8 +++ quaiclient/quaiclient.go | 5 ++ 22 files changed, 341 insertions(+), 58 deletions(-) diff --git a/cmd/utils/hierarchical_coordinator.go b/cmd/utils/hierarchical_coordinator.go index 470f360516..f427bac307 100644 --- a/cmd/utils/hierarchical_coordinator.go +++ b/cmd/utils/hierarchical_coordinator.go @@ -137,6 +137,7 @@ func (hc *HierarchicalCoordinator) startNode(logPath string, quaiBackend quai.Co hc.p2p.Subscribe(location, &types.WorkObject{}) hc.p2p.Subscribe(location, common.Hash{}) hc.p2p.Subscribe(location, &types.Transaction{}) + hc.p2p.Subscribe(location, &types.WorkObjectHeader{}) StartNode(stack) diff --git a/consensus/blake3pow/consensus.go b/consensus/blake3pow/consensus.go index d72d701dae..c3ae192010 100644 --- a/consensus/blake3pow/consensus.go +++ b/consensus/blake3pow/consensus.go @@ -24,7 +24,6 @@ import ( // Blake3pow proof-of-work protocol constants. var ( - maxUncles = 2 // Maximum number of uncles allowed in a single block allowedFutureBlockTimeSeconds = int64(15) // Max seconds from current time allowed for blocks, before they're considered future blocks ContextTimeFactor = big10 @@ -196,7 +195,7 @@ func (blake3pow *Blake3pow) VerifyUncles(chain consensus.ChainReader, block *typ return nil } // Verify that there are at most 2 uncles included in this block - if len(block.Uncles()) > maxUncles { + if len(block.Uncles()) > params.MaxUncleCount { return errTooManyUncles } if len(block.Uncles()) == 0 { @@ -206,7 +205,7 @@ func (blake3pow *Blake3pow) VerifyUncles(chain consensus.ChainReader, block *typ uncles, ancestors := mapset.NewSet(), make(map[common.Hash]*types.WorkObject) number, parent := block.NumberU64(nodeCtx)-1, block.ParentHash(nodeCtx) - for i := 0; i < 7; i++ { + for i := 0; i < params.WorkSharesInclusionDepth; i++ { ancestorHeader := chain.GetHeader(parent, number) if ancestorHeader == nil { break @@ -244,8 +243,9 @@ func (blake3pow *Blake3pow) VerifyUncles(chain consensus.ChainReader, block *typ if ancestors[uncle.ParentHash()] == nil || uncle.ParentHash() == block.ParentHash(nodeCtx) { return errDanglingUncle } - // Verify the seal and get the powHash for the given header - err := blake3pow.verifySeal(uncle) + + // make sure that the work can be computed + _, err := blake3pow.ComputePowHash(uncle) if err != nil { return err } @@ -258,6 +258,19 @@ func (blake3pow *Blake3pow) VerifyUncles(chain consensus.ChainReader, block *typ if expected.Cmp(uncle.Difficulty()) != 0 { return fmt.Errorf("uncle has invalid difficulty: have %v, want %v", uncle.Difficulty(), expected) } + + // Verify that the work share number is parent's +1 + parentNumber := parent.Number(nodeCtx) + if chain.IsGenesisHash(parent.Hash()) { + parentNumber = big.NewInt(0) + } + if diff := new(big.Int).Sub(uncle.Number(), parentNumber); diff.Cmp(big.NewInt(1)) != 0 { + return consensus.ErrInvalidNumber + } + + if !blake3pow.CheckIfValidWorkShare(uncle) { + return errors.New("invalid workshare included") + } } } return nil @@ -627,3 +640,7 @@ func (blake3pow *Blake3pow) NodeLocation() common.Location { func (blake3pow *Blake3pow) ComputePowLight(header *types.WorkObjectHeader) (common.Hash, common.Hash) { panic("compute pow light doesnt exist for blake3") } + +func (blake3pow *Blake3pow) ComputePowHash(header *types.WorkObjectHeader) (common.Hash, error) { + return header.Hash(), nil +} diff --git a/consensus/blake3pow/poem.go b/consensus/blake3pow/poem.go index 9d6fc48ebe..4f8cdd8cc8 100644 --- a/consensus/blake3pow/poem.go +++ b/consensus/blake3pow/poem.go @@ -77,6 +77,11 @@ func (blake3pow *Blake3pow) TotalLogS(chain consensus.GenesisReader, header *typ if err != nil { return big.NewInt(0) } + workShareS, err := blake3pow.WorkShareLogS(header) + if err != nil { + return big.NewInt(0) + } + intrinsicS = new(big.Int).Add(intrinsicS, workShareS) switch order { case common.PRIME_CTX: totalS := new(big.Int).Add(header.ParentEntropy(common.PRIME_CTX), header.ParentDeltaS(common.REGION_CTX)) @@ -119,6 +124,11 @@ func (blake3pow *Blake3pow) DeltaLogS(chain consensus.GenesisReader, header *typ if err != nil { return big.NewInt(0) } + workShareS, err := blake3pow.WorkShareLogS(header) + if err != nil { + return big.NewInt(0) + } + intrinsicS = new(big.Int).Add(intrinsicS, workShareS) switch order { case common.PRIME_CTX: return big.NewInt(0) @@ -149,6 +159,41 @@ func (blake3pow *Blake3pow) UncledLogS(block *types.WorkObject) *big.Int { return totalUncledLogS } +func (blake3pow *Blake3pow) WorkShareLogS(wo *types.WorkObject) (*big.Int, error) { + workShares := wo.Uncles() + totalWsEntropy := big.NewInt(0) + for _, ws := range workShares { + powHash, err := blake3pow.ComputePowHash(ws) + if err != nil { + return big.NewInt(0), err + } + // compute the diff from the pow hash so that the work can be discounted + powDiff := new(big.Int).Div(common.Big2e256, new(big.Int).SetBytes(powHash.Bytes())) + // Two discounts need to be applied to the weight of each work share + // 1) Discount based on the amount of number of other possible work + // shares for the same entropy value + // 2) Discount based on the staleness of inclusion, for every block + // delay the weight gets reduced by the factor of 2 + + // Discount 1) only applies if the workshare has less weight than the + // work object threshold + wsEntropy := blake3pow.IntrinsicLogS(powHash) + woDiff := new(big.Int).Set(wo.Difficulty()) + target := new(big.Int).Div(common.Big2e256, woDiff) + if new(big.Int).SetBytes(powHash.Bytes()).Cmp(target) > 0 { // powHash > target + c, _ := mathutil.BinaryLog(powDiff, mantBits) + thresoldC, _ := mathutil.BinaryLog(woDiff, mantBits) + wsEntropy = new(big.Int).Div(wsEntropy, new(big.Int).Exp(big.NewInt(2), big.NewInt(int64(thresoldC-c)), nil)) + } + // Discount 2) applies to all shares regardless of the weight + wsEntropy = new(big.Int).Div(wsEntropy, new(big.Int).Exp(big.NewInt(2), big.NewInt(int64(wo.NumberU64(common.ZONE_CTX)-ws.NumberU64())), nil)) + + // Add the entropy into the total entropy once the discount calculation is done + totalWsEntropy.Add(totalWsEntropy, wsEntropy) + } + return totalWsEntropy, nil +} + func (blake3pow *Blake3pow) UncledSubDeltaLogS(chain consensus.GenesisReader, header *types.WorkObject) *big.Int { // Treating the genesis block differntly if chain.IsGenesisHash(header.Hash()) { @@ -203,3 +248,17 @@ func (blake3pow *Blake3pow) CalcRank(chain consensus.GenesisReader, header *type return 0, nil } + +func (blake3pow *Blake3pow) CheckIfValidWorkShare(workShare *types.WorkObjectHeader) bool { + // Extract some data from the header + diff := new(big.Int).Set(workShare.Difficulty()) + c, _ := mathutil.BinaryLog(diff, mantBits) + workShareThreshold := c - params.WorkSharesThresholdDiff + workShareDiff := new(big.Int).Exp(big.NewInt(2), big.NewInt(int64(workShareThreshold)), nil) + workShareMintarget := new(big.Int).Div(big2e256, workShareDiff) + powHash, err := blake3pow.ComputePowHash(workShare) + if err != nil { + return false + } + return new(big.Int).SetBytes(powHash.Bytes()).Cmp(workShareMintarget) <= 0 +} diff --git a/consensus/blake3pow/sealer.go b/consensus/blake3pow/sealer.go index f66663d1af..7268fd90d6 100644 --- a/consensus/blake3pow/sealer.go +++ b/consensus/blake3pow/sealer.go @@ -12,6 +12,8 @@ import ( "github.com/dominant-strategies/go-quai/core/types" "github.com/dominant-strategies/go-quai/log" + "github.com/dominant-strategies/go-quai/params" + "modernc.org/mathutil" ) const ( @@ -127,8 +129,12 @@ func (blake3pow *Blake3pow) Seal(header *types.WorkObject, results chan<- *types // seed that results in correct final header difficulty. func (blake3pow *Blake3pow) mine(header *types.WorkObject, id int, seed uint64, abort chan struct{}, found chan *types.WorkObject) { // Extract some data from the header + diff := new(big.Int).Set(header.Difficulty()) + c, _ := mathutil.BinaryLog(diff, mantBits) + workShareThreshold := c - params.WorkSharesThresholdDiff + workShareDiff := new(big.Int).Exp(big.NewInt(2), big.NewInt(int64(workShareThreshold)), nil) var ( - target = new(big.Int).Div(big2e256, header.Difficulty()) + target = new(big.Int).Div(big2e256, workShareDiff) ) // Start generating random nonces until we abort or find a good one var ( diff --git a/consensus/consensus.go b/consensus/consensus.go index 5be12b2b17..fbd262aa93 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -99,6 +99,13 @@ type Engine interface { // UncledLogS returns the log of the entropy reduction by uncles referenced in the block UncledLogS(block *types.WorkObject) *big.Int + // WorkShareLogS returns the log of the entropy reduction by the workshare referenced in the block + WorkShareLogS(block *types.WorkObject) (*big.Int, error) + + // CheckIfValidWorkShare checks if the workshare meets the work share + // requirements defined by the protocol + CheckIfValidWorkShare(workShare *types.WorkObjectHeader) bool + // UncledUncledSubDeltaLogS returns the log of the uncled entropy reduction since the past coincident UncledSubDeltaLogS(chain GenesisReader, header *types.WorkObject) *big.Int @@ -151,6 +158,9 @@ type Engine interface { // that a new block should have. CalcDifficulty(chain ChainHeaderReader, parent *types.WorkObjectHeader) *big.Int + // ComputePowHash returns the pow hash of the workobject header + ComputePowHash(header *types.WorkObjectHeader) (common.Hash, error) + // IsDomCoincident returns true if this block satisfies the difficulty order // of a dominant chain. If this node does not have a dominant chain (i.e. // if this is a prime node), then the function will always return false. diff --git a/consensus/progpow/consensus.go b/consensus/progpow/consensus.go index 20120e2f78..21849db532 100644 --- a/consensus/progpow/consensus.go +++ b/consensus/progpow/consensus.go @@ -25,7 +25,6 @@ import ( // Progpow proof-of-work protocol constants. var ( - maxUncles = 2 // Maximum number of uncles allowed in a single block allowedFutureBlockTimeSeconds = int64(15) // Max seconds from current time allowed for blocks, before they're considered future blocks ContextTimeFactor = big10 @@ -198,7 +197,7 @@ func (progpow *Progpow) VerifyUncles(chain consensus.ChainReader, block *types.W return nil } // Verify that there are at most 2 uncles included in this block - if len(block.Uncles()) > maxUncles { + if len(block.Uncles()) > params.MaxUncleCount { return errTooManyUncles } if len(block.Uncles()) == 0 { @@ -208,7 +207,7 @@ func (progpow *Progpow) VerifyUncles(chain consensus.ChainReader, block *types.W uncles, ancestors := mapset.NewSet(), make(map[common.Hash]*types.WorkObject) number, parent := block.NumberU64(nodeCtx)-1, block.ParentHash(nodeCtx) - for i := 0; i < 7; i++ { + for i := 0; i < params.WorkSharesInclusionDepth; i++ { ancestorHeader := chain.GetHeader(parent, number) if ancestorHeader == nil { break @@ -246,8 +245,7 @@ func (progpow *Progpow) VerifyUncles(chain consensus.ChainReader, block *types.W if ancestors[uncle.ParentHash()] == nil || uncle.ParentHash() == block.ParentHash(nodeCtx) { return errDanglingUncle } - // Verify the seal and get the powHash for the given header - _, err := progpow.verifySeal(uncle) + _, err := progpow.ComputePowHash(uncle) if err != nil { return err } @@ -260,6 +258,20 @@ func (progpow *Progpow) VerifyUncles(chain consensus.ChainReader, block *types.W if expected.Cmp(uncle.Difficulty()) != 0 { return fmt.Errorf("uncle has invalid difficulty: have %v, want %v", uncle.Difficulty(), expected) } + + // Verify that the work share number is parent's +1 + parentNumber := parent.Number(nodeCtx) + if chain.IsGenesisHash(parent.Hash()) { + parentNumber = big.NewInt(0) + } + if diff := new(big.Int).Sub(uncle.Number(), parentNumber); diff.Cmp(big.NewInt(1)) != 0 { + return consensus.ErrInvalidNumber + } + + if !progpow.CheckIfValidWorkShare(uncle) { + return errors.New("invalid workshare included") + } + } } return nil @@ -590,6 +602,20 @@ func (progpow *Progpow) verifySeal(header *types.WorkObjectHeader) (common.Hash, return powHash.(common.Hash), nil } +func (progpow *Progpow) ComputePowHash(header *types.WorkObjectHeader) (common.Hash, error) { + // Check progpow + mixHash := header.PowDigest.Load() + powHash := header.PowHash.Load() + if powHash == nil || mixHash == nil { + mixHash, powHash = progpow.ComputePowLight(header) + } + // Verify the calculated values against the ones provided in the header + if !bytes.Equal(header.MixHash().Bytes(), mixHash.(common.Hash).Bytes()) { + return common.Hash{}, errInvalidMixHash + } + return powHash.(common.Hash), nil +} + // Prepare implements consensus.Engine, initializing the difficulty field of a // header to conform to the progpow protocol. The changes are done inline. func (progpow *Progpow) Prepare(chain consensus.ChainHeaderReader, header *types.WorkObject, parent *types.WorkObject) error { diff --git a/consensus/progpow/poem.go b/consensus/progpow/poem.go index cb93439bdb..9db29fe0e6 100644 --- a/consensus/progpow/poem.go +++ b/consensus/progpow/poem.go @@ -77,6 +77,11 @@ func (progpow *Progpow) TotalLogS(chain consensus.GenesisReader, header *types.W if err != nil { return big.NewInt(0) } + workShareS, err := progpow.WorkShareLogS(header) + if err != nil { + return big.NewInt(0) + } + intrinsicS = new(big.Int).Add(intrinsicS, workShareS) switch order { case common.PRIME_CTX: totalS := new(big.Int).Add(header.ParentEntropy(common.PRIME_CTX), header.ParentDeltaS(common.REGION_CTX)) @@ -111,10 +116,18 @@ func (progpow *Progpow) TotalLogPhS(header *types.WorkObject) *big.Int { } func (progpow *Progpow) DeltaLogS(chain consensus.GenesisReader, header *types.WorkObject) *big.Int { + if chain.IsGenesisHash(header.Hash()) { + return big.NewInt(0) + } intrinsicS, order, err := progpow.CalcOrder(header) if err != nil { return big.NewInt(0) } + workShareS, err := progpow.WorkShareLogS(header) + if err != nil { + return big.NewInt(0) + } + intrinsicS = new(big.Int).Add(intrinsicS, workShareS) switch order { case common.PRIME_CTX: return big.NewInt(0) @@ -145,6 +158,41 @@ func (progpow *Progpow) UncledLogS(block *types.WorkObject) *big.Int { return totalUncledLogS } +func (progpow *Progpow) WorkShareLogS(wo *types.WorkObject) (*big.Int, error) { + workShares := wo.Uncles() + totalWsEntropy := big.NewInt(0) + for _, ws := range workShares { + powHash, err := progpow.ComputePowHash(ws) + if err != nil { + return big.NewInt(0), err + } + // compute the diff from the pow hash so that the work can be discounted + powDiff := new(big.Int).Div(common.Big2e256, new(big.Int).SetBytes(powHash.Bytes())) + // Two discounts need to be applied to the weight of each work share + // 1) Discount based on the amount of number of other possible work + // shares for the same entropy value + // 2) Discount based on the staleness of inclusion, for every block + // delay the weight gets reduced by the factor of 2 + + // Discount 1) only applies if the workshare has less weight than the + // work object threshold + wsEntropy := progpow.IntrinsicLogS(powHash) + woDiff := new(big.Int).Set(wo.Difficulty()) + target := new(big.Int).Div(common.Big2e256, woDiff) + if new(big.Int).SetBytes(powHash.Bytes()).Cmp(target) > 0 { // powHash > target + c, _ := mathutil.BinaryLog(powDiff, mantBits) + thresoldC, _ := mathutil.BinaryLog(woDiff, mantBits) + wsEntropy = new(big.Int).Div(wsEntropy, new(big.Int).Exp(big.NewInt(2), big.NewInt(int64(thresoldC-c)), nil)) + } + // Discount 2) applies to all shares regardless of the weight + wsEntropy = new(big.Int).Div(wsEntropy, new(big.Int).Exp(big.NewInt(2), big.NewInt(int64(wo.NumberU64(common.ZONE_CTX)-ws.NumberU64())), nil)) + + // Add the entropy into the total entropy once the discount calculation is done + totalWsEntropy.Add(totalWsEntropy, wsEntropy) + } + return totalWsEntropy, nil +} + func (progpow *Progpow) UncledSubDeltaLogS(chain consensus.GenesisReader, header *types.WorkObject) *big.Int { // Treating the genesis block differntly if chain.IsGenesisHash(header.Hash()) { @@ -203,3 +251,17 @@ func (progpow *Progpow) CalcRank(chain consensus.GenesisReader, header *types.Wo } return 0, nil } + +func (progpow *Progpow) CheckIfValidWorkShare(workShare *types.WorkObjectHeader) bool { + // Extract some data from the header + diff := new(big.Int).Set(workShare.Difficulty()) + c, _ := mathutil.BinaryLog(diff, mantBits) + workShareThreshold := c - params.WorkSharesThresholdDiff + workShareDiff := new(big.Int).Exp(big.NewInt(2), big.NewInt(int64(workShareThreshold)), nil) + workShareMintarget := new(big.Int).Div(big2e256, workShareDiff) + powHash, err := progpow.ComputePowHash(workShare) + if err != nil { + return false + } + return new(big.Int).SetBytes(powHash.Bytes()).Cmp(workShareMintarget) <= 0 +} diff --git a/consensus/progpow/sealer.go b/consensus/progpow/sealer.go index 2354f789a2..84d1d9b513 100644 --- a/consensus/progpow/sealer.go +++ b/consensus/progpow/sealer.go @@ -13,6 +13,8 @@ import ( "github.com/dominant-strategies/go-quai/common" "github.com/dominant-strategies/go-quai/core/types" "github.com/dominant-strategies/go-quai/log" + "github.com/dominant-strategies/go-quai/params" + "modernc.org/mathutil" ) const ( @@ -128,8 +130,12 @@ func (progpow *Progpow) Seal(header *types.WorkObject, results chan<- *types.Wor // seed that results in correct final block difficulty. func (progpow *Progpow) mine(header *types.WorkObject, id int, seed uint64, abort chan struct{}, found chan *types.WorkObject) { // Extract some data from the header + diff := new(big.Int).Set(header.Difficulty()) + c, _ := mathutil.BinaryLog(diff, mantBits) + workShareThreshold := c - params.WorkSharesThresholdDiff + workShareDiff := new(big.Int).Exp(big.NewInt(2), big.NewInt(int64(workShareThreshold)), nil) var ( - target = new(big.Int).Div(big2e256, header.Difficulty()) + target = new(big.Int).Div(big2e256, workShareDiff) nodeCtx = progpow.config.NodeLocation.Context() ) // Start generating random nonces until we abort or find a good one diff --git a/core/core.go b/core/core.go index c56791c7e5..5fe1a3d988 100644 --- a/core/core.go +++ b/core/core.go @@ -497,6 +497,10 @@ func (c *Core) Engine() consensus.Engine { return c.engine } +func (c *Core) CheckIfValidWorkShare(workShare *types.WorkObjectHeader) bool { + return c.engine.CheckIfValidWorkShare(workShare) +} + // Slice retrieves the slice struct. func (c *Core) Slice() *Slice { return c.sl @@ -1037,6 +1041,10 @@ func (c *Core) SubscribePendingHeader(ch chan<- *types.WorkObject) event.Subscri func (c *Core) IsMining() bool { return c.sl.miner.Mining() } +func (c *Core) SendWorkShare(workShare *types.WorkObjectHeader) error { + return c.sl.miner.worker.AddWorkShare(workShare) +} + //-------------------------// // State Processor methods // //-------------------------// diff --git a/core/events.go b/core/events.go index a5c019d2b0..076c454625 100644 --- a/core/events.go +++ b/core/events.go @@ -21,8 +21,7 @@ type ChainEvent struct { } type ChainSideEvent struct { - Blocks []*types.WorkObject - ResetUncles bool + Blocks []*types.WorkObject } type ChainHeadEvent struct { diff --git a/core/headerchain.go b/core/headerchain.go index 35dbcbc775..57a71624e0 100644 --- a/core/headerchain.go +++ b/core/headerchain.go @@ -422,7 +422,7 @@ func (hc *HeaderChain) SetCurrentHeader(head *types.WorkObject) error { blocks = append(blocks, block) } } - hc.chainSideFeed.Send(ChainSideEvent{Blocks: blocks, ResetUncles: true}) + hc.chainSideFeed.Send(ChainSideEvent{Blocks: blocks}) }() } diff --git a/core/slice.go b/core/slice.go index 5897a6de34..00adb75ab2 100644 --- a/core/slice.go +++ b/core/slice.go @@ -404,7 +404,7 @@ func (sl *Slice) Append(header *types.WorkObject, domPendingHeader *types.WorkOb "location": block.Location(), "parentHash": block.ParentHash(nodeCtx), }).Debug("Found uncle") - sl.hc.chainSideFeed.Send(ChainSideEvent{Blocks: []*types.WorkObject{block}, ResetUncles: false}) + sl.hc.chainSideFeed.Send(ChainSideEvent{Blocks: []*types.WorkObject{block}}) } // Chain head feed is only used by the Zone chains diff --git a/core/worker.go b/core/worker.go index 034f4ba0de..b6bf49a855 100644 --- a/core/worker.go +++ b/core/worker.go @@ -6,6 +6,7 @@ import ( "fmt" "math/big" "runtime/debug" + "sort" "strings" "sync" "sync/atomic" @@ -56,6 +57,10 @@ const ( // c_chainSideChanSize is the size of the channel listening to uncle events chainSideChanSize = 10 + + c_uncleCacheSize = 50 + + c_workShareFilterDist = 100 // the dist from the current block for the work share inclusion in the worker ) // environment is the worker's current environment and holds all @@ -209,9 +214,8 @@ type worker struct { wg sync.WaitGroup - localUncles map[common.Hash]*types.WorkObjectHeader // A set of side blocks generated locally as the possible uncle blocks. - remoteUncles map[common.Hash]*types.WorkObjectHeader // A set of side blocks as the possible uncle blocks. - uncleMu sync.RWMutex + Uncles *lru.Cache + uncleMu sync.RWMutex mu sync.RWMutex // The lock used to protect the coinbase and extra fields coinbase common.Address @@ -279,8 +283,6 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, db ethdb.Databas coinbase: config.Etherbase, isLocalBlock: isLocalBlock, workerDb: db, - localUncles: make(map[common.Hash]*types.WorkObjectHeader), - remoteUncles: make(map[common.Hash]*types.WorkObjectHeader), chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), chainSideCh: make(chan ChainSideEvent, chainSideChanSize), taskCh: make(chan *task), @@ -292,6 +294,8 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, db ethdb.Databas fillTransactionsRollingAverage: &RollingAverage{windowSize: 100}, logger: logger, } + // initialize a uncle cache + worker.Uncles, _ = lru.New(c_uncleCacheSize) // Set the GasFloor of the worker to the minGasLimit worker.config.GasFloor = params.MinGasLimit @@ -498,34 +502,12 @@ func (w *worker) asyncStateLoop() { }).Fatal("Go-Quai Panicked") } }() - if side.ResetUncles { - w.uncleMu.Lock() - w.localUncles = make(map[common.Hash]*types.WorkObjectHeader) - w.remoteUncles = make(map[common.Hash]*types.WorkObjectHeader) - w.uncleMu.Unlock() - } for _, wo := range side.Blocks { - // Short circuit for duplicate side blocks - w.uncleMu.RLock() - if _, exists := w.localUncles[wo.Hash()]; exists { - w.uncleMu.RUnlock() + if _, exists := w.Uncles.Get(wo.Hash()); exists { continue } - if _, exists := w.remoteUncles[wo.Hash()]; exists { - w.uncleMu.RUnlock() - continue - } - w.uncleMu.RUnlock() - if w.isLocalBlock != nil && w.isLocalBlock(wo) { - w.uncleMu.Lock() - w.localUncles[wo.Hash()] = wo.WorkObjectHeader() - w.uncleMu.Unlock() - } else { - w.uncleMu.Lock() - w.remoteUncles[wo.Hash()] = wo.WorkObjectHeader() - w.uncleMu.Unlock() - } + w.Uncles.ContainsOrAdd(wo.Hash(), wo.WorkObjectHeader()) } }() case <-w.exitCh: @@ -715,7 +697,7 @@ func (w *worker) makeEnv(parent *types.WorkObject, proposedWo *types.WorkObject, etxPLimit: etxPLimit, } // when 08 is processed ancestors contain 07 (quick block) - for _, ancestor := range w.hc.GetBlocksFromHash(parent.Header().Hash(), 7) { + for _, ancestor := range w.hc.GetBlocksFromHash(parent.Header().Hash(), params.WorkSharesInclusionDepth) { for _, uncle := range ancestor.Uncles() { env.family.Add(uncle.Hash()) } @@ -1221,29 +1203,42 @@ func (w *worker) prepareWork(genParams *generateParams, wo *types.WorkObject) (* return nil, err } // Accumulate the uncles for the sealing work. - commitUncles := func(wos map[common.Hash]*types.WorkObjectHeader) { - for hash, uncle := range wos { + commitUncles := func(wos *lru.Cache) { + var uncles []*types.WorkObjectHeader + keys := wos.Keys() + for _, hash := range keys { + if value, exist := wos.Peek(hash); exist { + uncle := value.(*types.WorkObjectHeader) + uncles = append(uncles, uncle) + } + } + // sort the uncles in the decreasing order of entropy + sort.Slice(uncles, func(i, j int) bool { + powHash1, _ := w.engine.ComputePowHash(uncles[i]) + powHash2, _ := w.engine.ComputePowHash(uncles[j]) + return new(big.Int).SetBytes(powHash1.Bytes()).Cmp(new(big.Int).SetBytes(powHash2.Bytes())) < 0 + }) + for _, uncle := range uncles { env.uncleMu.RLock() - if len(env.uncles) == 2 { + if len(env.uncles) == params.MaxUncleCount { env.uncleMu.RUnlock() break } env.uncleMu.RUnlock() if err := w.commitUncle(env, uncle); err != nil { w.logger.WithFields(log.Fields{ - "hash": hash, + "hash": uncle.Hash(), "reason": err, }).Trace("Possible uncle rejected") } else { - w.logger.WithField("hash", hash).Debug("Committing new uncle to block") + w.logger.WithField("hash", uncle.Hash()).Debug("Committing new uncle to block") } } } if nodeCtx == common.ZONE_CTX && w.hc.ProcessingState() { w.uncleMu.RLock() // Prefer to locally generated uncle - commitUncles(w.localUncles) - commitUncles(w.remoteUncles) + commitUncles(w.Uncles) w.uncleMu.RUnlock() } return env, nil @@ -1353,7 +1348,7 @@ func (w *worker) FinalizeAssemble(chain consensus.ChainHeaderReader, newWo *type // Once the uncles list is assembled in the block if nodeCtx == common.ZONE_CTX { - wo.Header().SetUncledS(w.engine.UncledLogS(wo)) + wo.Header().SetUncledS(w.engine.UncledLogS(parent)) } manifestHash := w.ComputeManifestHash(parent) @@ -1420,6 +1415,20 @@ func totalFees(block *types.WorkObject, receipts []*types.Receipt) *big.Float { return new(big.Float).Quo(new(big.Float).SetInt(feesWei), new(big.Float).SetInt(big.NewInt(params.Ether))) } +func (w *worker) AddWorkShare(workShare *types.WorkObjectHeader) error { + if !w.engine.CheckIfValidWorkShare(workShare) { + return errors.New("workshare is not valid") + } + + // Don't add the workshare into the list if its farther than the worksharefilterdist + if workShare.NumberU64()+c_workShareFilterDist < w.hc.CurrentHeader().NumberU64(common.ZONE_CTX) { + return nil + } + + w.Uncles.ContainsOrAdd(workShare.Hash(), workShare) + return nil +} + func (w *worker) CurrentInfo(header *types.WorkObject) bool { if w.headerPrints.Contains(header.Hash()) { return false diff --git a/internal/quaiapi/backend.go b/internal/quaiapi/backend.go index fac1263d0b..f1b235f93a 100644 --- a/internal/quaiapi/backend.go +++ b/internal/quaiapi/backend.go @@ -99,6 +99,8 @@ type Backend interface { AddGenesisPendingEtxs(block *types.WorkObject) SubscribeExpansionEvent(ch chan<- core.ExpansionEvent) event.Subscription WriteGenesisBlock(block *types.WorkObject, location common.Location) + SendWorkShare(workShare *types.WorkObjectHeader) error + CheckIfValidWorkShare(workShare *types.WorkObjectHeader) bool // Transaction pool API SendTx(ctx context.Context, signedTx *types.Transaction) error @@ -129,6 +131,7 @@ type Backend interface { // P2P apis BroadcastBlock(block *types.WorkObject, location common.Location) error + BroadcastWorkShare(workShare *types.WorkObjectHeader, location common.Location) error } func GetAPIs(apiBackend Backend) []rpc.API { diff --git a/internal/quaiapi/quai_api.go b/internal/quaiapi/quai_api.go index aa92617d9c..b8db092b31 100644 --- a/internal/quaiapi/quai_api.go +++ b/internal/quaiapi/quai_api.go @@ -675,6 +675,31 @@ func (s *PublicBlockChainQuaiAPI) ReceiveMinedHeader(ctx context.Context, raw js return nil } +func (s *PublicBlockChainQuaiAPI) ReceiveWorkShare(ctx context.Context, raw json.RawMessage) error { + nodeCtx := s.b.NodeCtx() + if nodeCtx != common.ZONE_CTX { + return errors.New("work shares cannot be broadcasted in non-zone chain") + } + var workShare *types.WorkObjectHeader + if err := json.Unmarshal(raw, &workShare); err != nil { + return err + } + if workShare != nil { + // check if the workshare is valid before broadcasting as a sanity + if !s.b.CheckIfValidWorkShare(workShare) { + return errors.New("work share is invalid") + } + + s.b.Logger().WithField("number", workShare.NumberU64()).Info("Received Work Share") + err := s.b.BroadcastWorkShare(workShare, s.b.NodeLocation()) + if err != nil { + log.Global.WithField("err", err).Error("Error broadcasting work share") + return err + } + } + return nil +} + type tdBlock struct { Header *types.WorkObject `json:"header"` Manifest types.BlockManifest `json:"manifest"` diff --git a/p2p/node/api.go b/p2p/node/api.go index 0208449f2e..f0b31dcd82 100644 --- a/p2p/node/api.go +++ b/p2p/node/api.go @@ -315,6 +315,7 @@ func (p *P2PNode) handleBroadcast(sourcePeer peer.ID, data interface{}, nodeLoca p.cacheAdd(v.Hash(), &v, nodeLocation) // TODO: send it to consensus case types.Transaction: + case types.WorkObjectHeader: default: log.Global.Debugf("received unsupported block broadcast") // TODO: ban the peer which sent it? diff --git a/p2p/pb/proto_services.go b/p2p/pb/proto_services.go index 0f20051077..25aeee883a 100644 --- a/p2p/pb/proto_services.go +++ b/p2p/pb/proto_services.go @@ -215,6 +215,13 @@ func ConvertAndMarshal(data interface{}) ([]byte, error) { log.Global.Tracef("marshalling hash: %+v", data) protoHash := data.ProtoEncode() return proto.Marshal(protoHash) + case *types.WorkObjectHeader: + log.Global.Tracef("marshalling block header: %+v", data) + protoWoHeader, err := data.ProtoEncode() + if err != nil { + return nil, err + } + return proto.Marshal(protoWoHeader) default: return nil, errors.New("unsupported data type") } @@ -239,6 +246,19 @@ func UnmarshalAndConvert(data []byte, sourceLocation common.Location, dataPtr *i } *dataPtr = *workObject return nil + case *types.WorkObjectHeader: + protoWorkObjectHeader := &types.ProtoWorkObjectHeader{} + err := proto.Unmarshal(data, protoWorkObjectHeader) + if err != nil { + return err + } + workObjectHeader := &types.WorkObjectHeader{} + err = workObjectHeader.ProtoDecode(protoWorkObjectHeader) + if err != nil { + return err + } + *dataPtr = *workObjectHeader + return nil case *types.Header: protoHeader := &types.ProtoHeader{} err := proto.Unmarshal(data, protoHeader) diff --git a/p2p/pubsubManager/utils.go b/p2p/pubsubManager/utils.go index 5a0aa51a48..de2e6dd29e 100644 --- a/p2p/pubsubManager/utils.go +++ b/p2p/pubsubManager/utils.go @@ -10,10 +10,11 @@ import ( const ( // Data types for gossipsub topics - C_workObjectType = "blocks" - C_transactionType = "transactions" - C_headerType = "headers" - C_hashType = "hash" + C_workObjectType = "blocks" + C_transactionType = "transactions" + C_headerType = "headers" + C_hashType = "hash" + C_workObjectHeaderType = "woHeaders" ) // gets the name of the topic for the given type of data @@ -26,6 +27,8 @@ func TopicName(genesis common.Hash, location common.Location, data interface{}) return strings.Join([]string{baseTopic, C_hashType}, "/"), nil case *types.Transaction: return strings.Join([]string{baseTopic, C_transactionType}, "/"), nil + case *types.WorkObjectHeader: + return strings.Join([]string{baseTopic, C_workObjectHeaderType}, "/"), nil default: return "", ErrUnsupportedType } diff --git a/params/protocol_params.go b/params/protocol_params.go index dffdc4ade2..31c04443c6 100644 --- a/params/protocol_params.go +++ b/params/protocol_params.go @@ -201,4 +201,7 @@ var ( DifficultyAdjustmentPeriod = big.NewInt(360) // This is the number of blocks over which the average has to be taken DifficultyAdjustmentFactor int64 = 40 // This is the factor that divides the log of the change in the difficulty MinQuaiConversionAmount = new(big.Int).Mul(big.NewInt(1), big.NewInt(GWei)) // 0.000000001 Quai + MaxUncleCount = 10 + WorkSharesThresholdDiff = 3 // Number of bits lower than the target that the default consensus engine uses + WorkSharesInclusionDepth = 7 // Number of blocks upto which the work shares can be referenced and this is protocol enforced ) diff --git a/quai/api_backend.go b/quai/api_backend.go index ded98db698..eb1739ff9f 100644 --- a/quai/api_backend.go +++ b/quai/api_backend.go @@ -574,9 +574,21 @@ func (b *QuaiAPIBackend) SubscribeExpansionEvent(ch chan<- core.ExpansionEvent) return b.quai.core.SubscribeExpansionEvent(ch) } +func (b *QuaiAPIBackend) SendWorkShare(workShare *types.WorkObjectHeader) error { + return b.quai.core.SendWorkShare(workShare) +} + +func (b *QuaiAPIBackend) CheckIfValidWorkShare(workShare *types.WorkObjectHeader) bool { + return b.quai.core.CheckIfValidWorkShare(workShare) +} + // /////////////////////////// // /////// P2P /////////////// // /////////////////////////// func (b *QuaiAPIBackend) BroadcastBlock(block *types.WorkObject, location common.Location) error { return b.quai.p2p.Broadcast(location, block) } + +func (b *QuaiAPIBackend) BroadcastWorkShare(workShare *types.WorkObjectHeader, location common.Location) error { + return b.quai.p2p.Broadcast(location, workShare) +} diff --git a/quai/p2p_backend.go b/quai/p2p_backend.go index a088481d1b..ab1f005730 100644 --- a/quai/p2p_backend.go +++ b/quai/p2p_backend.go @@ -103,6 +103,14 @@ func (qbe *QuaiBackend) OnNewBroadcast(sourcePeer p2p.PeerID, data interface{}, backend.SendRemoteTx(&tx) } // TODO: Handle the error here and mark the peers accordingly + case types.WorkObjectHeader: + woHeader := data.(types.WorkObjectHeader) + backend := *qbe.GetBackend(nodeLocation) + if backend == nil { + log.Global.Error("no backend found") + return false + } + backend.SendWorkShare(&woHeader) } // If it was a good broadcast, mark the peer as lively diff --git a/quaiclient/quaiclient.go b/quaiclient/quaiclient.go index bc84712a1d..79f0ce28a9 100644 --- a/quaiclient/quaiclient.go +++ b/quaiclient/quaiclient.go @@ -303,6 +303,11 @@ func (ec *Client) ReceiveMinedHeader(ctx context.Context, header *types.WorkObje return ec.c.CallContext(ctx, nil, "quai_receiveMinedHeader", data) } +func (ec *Client) ReceiveWorkShare(ctx context.Context, header *types.WorkObjectHeader) error { + data := header.RPCMarshalWorkObjectHeader() + return ec.c.CallContext(ctx, nil, "quai_receiveWorkShare", data) +} + // Filters // SubscribeFilterLogs subscribes to the results of a streaming filter query.