bugfix: Fixing the sync of blocks after download
* The downloader queue was not working when two consecutive dom blocks were received
* doneCh in the sync was blocking future downloads; fixed by increasing the channel size (see the sketch after this list)
* Changed the minimum peers for sync from 1 to 3
* Updated forceSyncCycle from 10 seconds to 60 seconds
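The doneCh fix is the standard Go buffered-channel pattern: a channel of capacity 1 holds only one result, so a goroutine sending a second sync result blocks until the first is drained. Below is a minimal, runnable sketch of the idea; runSync is a hypothetical stand-in for handler.doSync, not the repo's code.

package main

import (
	"fmt"
	"time"
)

// runSync stands in for one sync cycle (hypothetical, for illustration).
func runSync(id int) error {
	time.Sleep(10 * time.Millisecond)
	return nil
}

func main() {
	// With a buffer of 1, the second send below would block until the
	// receiver drains the channel; a larger buffer (the commit uses 10)
	// lets completed cycles land without stalling future downloads.
	doneCh := make(chan error, 10)

	for i := 0; i < 3; i++ {
		id := i
		go func() { doneCh <- runSync(id) }() // mirrors startSync's send
	}
	// The consumer drains results later, at its own pace.
	for i := 0; i < 3; i++ {
		fmt.Println("sync result:", <-doneCh)
	}
}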
gameofpointers committed Sep 6, 2023
1 parent 1cb95df commit 91bc728
Showing 5 changed files with 37 additions and 38 deletions.
core/slice.go (2 changes: 0 additions & 2 deletions)
@@ -52,7 +52,6 @@ type Slice struct {
 	quit chan struct{} // slice quit channel

 	domClient  *quaiclient.Client
-	domUrl     string
 	subClients []*quaiclient.Client

 	wg sync.WaitGroup
@@ -80,7 +79,6 @@ func NewSlice(db ethdb.Database, config *Config, txConfig *TxPoolConfig, txLooku
 		config:         chainConfig,
 		engine:         engine,
 		sliceDb:        db,
-		domUrl:         domClientUrl,
 		quit:           make(chan struct{}),
 		badHashesCache: make(map[common.Hash]bool),
 	}
core/tx_pool.go (6 changes: 3 additions & 3 deletions)
@@ -1428,14 +1428,14 @@ func (pool *TxPool) promoteExecutables(accounts []common.InternalAddress) []*typ
 			hash := tx.Hash()
 			pool.all.Remove(hash)
 		}
-		log.Debug("Removed old queued transactions", "count", len(forwards))
+		log.Trace("Removed old queued transactions", "count", len(forwards))
 		// Drop all transactions that are too costly (low balance or out of gas)
 		drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
 		for _, tx := range drops {
 			hash := tx.Hash()
 			pool.all.Remove(hash)
 		}
-		log.Debug("Removed unpayable queued transactions", "count", len(drops))
+		log.Trace("Removed unpayable queued transactions", "count", len(drops))
 		queuedNofundsMeter.Mark(int64(len(drops)))

 		// Gather all executable transactions and promote them
@@ -1446,7 +1446,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.InternalAddress) []*typ
 				promoted = append(promoted, tx)
 			}
 		}
-		log.Debug("Promoted queued transactions", "count", len(promoted))
+		log.Trace("Promoted queued transactions", "count", len(promoted))
 		queuedGauge.Dec(int64(len(readies)))

 		// Drop all transactions over the allowed limit
eth/downloader/downloader.go (6 changes: 3 additions & 3 deletions)
@@ -1128,8 +1128,6 @@ func (d *Downloader) processFullSyncContent(peerHeight uint64) error {
 		if err := d.importBlockResults(results); err != nil {
 			return err
 		}
-		d.headNumber = results[len(results)-1].Header.NumberU64()
-		d.headEntropy = d.core.TotalLogS(results[len(results)-1].Header)
 		// If all the blocks are fetched, we exit the sync process
 		if d.headNumber == peerHeight {
 			return errNoFetchesPending
@@ -1152,7 +1150,7 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
 	}
 	// Retrieve a batch of results to import
 	first, last := results[0].Header, results[len(results)-1].Header
-	log.Debug("Inserting downloaded chain", "items", len(results),
+	log.Info("Inserting downloaded chain", "items", len(results),
 		"firstnum", first.Number(), "firsthash", first.Hash(),
 		"lastnum", last.Number(), "lasthash", last.Hash(),
 	)
@@ -1162,6 +1160,8 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
 		if d.core.IsBlockHashABadHash(block.Hash()) {
 			return errBadBlockFound
 		}
+		d.headNumber = block.NumberU64()
+		d.headEntropy = d.core.TotalLogS(block.Header())
 		d.core.WriteBlock(block)
 	}
 	return nil
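This change moves head tracking from once per batch (after importBlockResults returned) to once per block inside the write loop, so an abort on a bad hash leaves headNumber and headEntropy matching what was actually written. A simplified, runnable sketch of that pattern follows, with illustrative types rather than the repo's:

package main

import "fmt"

// block is an illustrative stand-in for a fetchResult.
type block struct {
	number uint64
	bad    bool
}

// importBlocks advances the head marker per block, so an early abort
// leaves it pointing at the last block actually written rather than at
// the end of a batch that never fully landed.
func importBlocks(blocks []block, headNumber *uint64) error {
	for _, b := range blocks {
		if b.bad {
			return fmt.Errorf("bad block %d found", b.number)
		}
		*headNumber = b.number
		// the real code writes the block at this point (d.core.WriteBlock)
	}
	return nil
}

func main() {
	var head uint64
	err := importBlocks([]block{{1, false}, {2, false}, {3, true}}, &head)
	fmt.Println("head:", head, "err:", err) // head: 2, err: bad block 3 found
}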
eth/downloader/queue.go (46 changes: 23 additions & 23 deletions)
@@ -653,33 +653,31 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh
 		headers[i], headers[j] = headers[j], headers[i]
 	}

-	if len(headers) == 0 && accepted {
-		return 0, nil
-	}
-
-	if accepted {
-		if headers[len(headers)-1].Number().Uint64() != request.From {
-			logger.Info("First header broke chain ordering", "number", headers[0].Number(), "hash", headers[0].Hash(), "expected", request.From)
-			accepted = false
-		} else if headers[0].NumberU64() != targetTo {
-			if targetTo != 0 {
-				logger.Info("Last header broke skeleton structure ", "number", headers[0].Number(), "expected", targetTo)
-				accepted = false
-			}
-		}
-	}
-
-	if accepted {
-		parentHash := headers[0].Hash()
-		for _, header := range headers[1:] {
-			hash := header.Hash()
-			if parentHash != header.ParentHash() {
-				logger.Warn("Header broke chain ancestry", "number", header.Number(), "hash", hash)
-				accepted = false
-				break
-			}
-			// Set-up parent hash for next round
-			parentHash = hash
-		}
-	}
+	if len(headers) > 0 && accepted {
+		if accepted {
+			if headers[len(headers)-1].Number().Uint64() != request.From {
+				logger.Info("First header broke chain ordering", "number", headers[0].Number(), "hash", headers[0].Hash(), "expected", request.From)
+				accepted = false
+			} else if headers[0].NumberU64() != targetTo {
+				if targetTo != 0 {
+					logger.Info("Last header broke skeleton structure ", "number", headers[0].Number(), "expected", targetTo)
+					accepted = false
+				}
+			}
+		}
+
+		if accepted {
+			parentHash := headers[0].Hash()
+			for _, header := range headers[1:] {
+				hash := header.Hash()
+				if parentHash != header.ParentHash() {
+					logger.Warn("Header broke chain ancestry", "number", header.Number(), "hash", hash)
+					accepted = false
+					break
+				}
+				// Set-up parent hash for next round
+				parentHash = hash
+			}
+		}
+	}
 	// If the batch of headers wasn't accepted, mark as unavailable
@@ -697,7 +695,9 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh
 		return 0, errors.New("delivery not accepted")
 	}

-	copy(q.headerResults[targetTo-q.headerOffset:], headers)
+	if len(headers) > 0 {
+		copy(q.headerResults[targetTo-q.headerOffset:], headers)
+	}

 	// Clean up a successful fetch and try to deliver any sub-results
 	delete(q.headerTaskPool, request.From+1)
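The queue fix replaces the early return on empty deliveries with len(headers) > 0 guards: the header-indexed validation and the copy are skipped, but execution falls through to the cleanup below them (deleting the task from headerTaskPool), so the queue keeps making progress. A runnable sketch of the guard pattern, with simplified types that are not the repo's:

package main

import "fmt"

// deliver sketches the fix: never index an empty slice, and never
// short-circuit past the shared bookkeeping on an empty delivery.
func deliver(headers []uint64, from uint64) bool {
	accepted := true
	if len(headers) > 0 {
		// Safe only behind the guard: headers[len(headers)-1] panics
		// on an empty slice.
		if (headers[len(headers)-1]) != from {
			accepted = false
		}
	}
	// Cleanup runs for empty and non-empty deliveries alike.
	fmt.Println("task cleaned up, accepted =", accepted)
	return accepted
}

func main() {
	deliver(nil, 100)           // empty delivery no longer stalls the queue
	deliver([]uint64{100}, 100) // normal delivery
}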
eth/sync.go (15 changes: 8 additions & 7 deletions)
@@ -30,8 +30,8 @@ import (
 )

 const (
-	forceSyncCycle      = 10 * time.Second // Time interval to force syncs, even if few peers are available
-	defaultMinSyncPeers = 1                // Amount of peers desired to start syncing
+	forceSyncCycle      = 60 * time.Second // Time interval to force syncs, even if few peers are available
+	defaultMinSyncPeers = 3                // Amount of peers desired to start syncing

 	// This is the target size for the packs of transactions sent by txsyncLoop64.
 	// A pack can get larger than this if a single transaction exceeds this size.
@@ -274,24 +274,25 @@ func (cs *chainSyncer) modeAndLocalHead() (downloader.SyncMode, *big.Int) {

 // startSync launches doSync in a new goroutine.
 func (cs *chainSyncer) startSync(op *chainSyncOp) {
-	cs.doneCh = make(chan error, 1)
+	cs.doneCh = make(chan error, 10)
 	go func() { cs.doneCh <- cs.handler.doSync(op) }()
 }

 // doSync synchronizes the local blockchain with a remote peer.
 func (h *handler) doSync(op *chainSyncOp) error {
 	// Run the sync cycle, and disable fast sync if we're past the pivot block
 	err := h.downloader.Synchronise(op.peer.ID(), op.head, op.entropy, op.mode)
+	log.Info("Downloader exited", "err", err)
 	if err != nil {
 		return err
 	}
 	// If we've successfully finished a sync cycle and passed any required checkpoint,
 	// enable accepting transactions from the network.
 	head := h.core.CurrentBlock()
-	if head == nil {
-		log.Warn("doSync: head is nil", "hash", h.core.CurrentHeader().Hash(), "number", h.core.CurrentHeader().NumberArray())
-		return nil
-	}
+	if head == nil {
+		log.Warn("doSync: head is nil", "hash", h.core.CurrentHeader().Hash(), "number", h.core.CurrentHeader().NumberArray())
+		return nil
+	}
 	if head.NumberU64() > 0 {
 		// We've completed a sync cycle, notify all peers of new state. This path is
 		// essential in star-topology networks where a gateway node needs to notify
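A sketch of how the two retuned constants typically interact in a sync driver (illustrative only; shouldSync is not the repo's chainSyncer logic): syncing starts once defaultMinSyncPeers peers are connected, while forceSyncCycle caps how long a poorly connected node waits before forcing a cycle anyway. Raising them means waiting for 3 peers and forcing a sync every 60 seconds instead of every 10.

package main

import (
	"fmt"
	"time"
)

const (
	forceSyncCycle      = 60 * time.Second // force a sync even with few peers
	defaultMinSyncPeers = 3                // peers desired before syncing
)

// shouldSync decides whether to start a cycle: enough peers are
// connected, or the force-sync interval has elapsed since the last one.
func shouldSync(peers int, lastSync time.Time) bool {
	if peers >= defaultMinSyncPeers {
		return true
	}
	return time.Since(lastSync) >= forceSyncCycle
}

func main() {
	fmt.Println(shouldSync(3, time.Now()))                     // true: enough peers
	fmt.Println(shouldSync(1, time.Now()))                     // false: too few peers, too soon
	fmt.Println(shouldSync(1, time.Now().Add(-2*time.Minute))) // true: force interval elapsed
}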
