Skip to content

Commit

Permalink
Merge commit '0610e2e32317' into upstream-v2.47.0-0610e2e
Browse files Browse the repository at this point in the history
  • Loading branch information
ImTei committed Jul 20, 2023
2 parents a5298b6 + 0610e2e commit 433a605
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 166 deletions.
4 changes: 3 additions & 1 deletion compress/decompress.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func NewDecompressor(compressedFilePath string) (d *Decompressor, err error) {
fileName: fName,
}
defer func() {

if rec := recover(); rec != nil {
err = fmt.Errorf("decompressing file: %s, %+v, trace: %s", compressedFilePath, rec, dbg.Stack())
}
Expand All @@ -171,9 +172,10 @@ func NewDecompressor(compressedFilePath string) (d *Decompressor, err error) {
if d.mmapHandle1, d.mmapHandle2, err = mmap.Mmap(d.f, int(d.size)); err != nil {
return nil, err
}

// read patterns from file
d.data = d.mmapHandle1[:d.size]
defer d.EnableReadAhead().DisableReadAhead() //speedup opening on slow drives

d.wordsCount = binary.BigEndian.Uint64(d.data[:8])
d.emptyWordsCount = binary.BigEndian.Uint64(d.data[8:16])
dictSize := binary.BigEndian.Uint64(d.data[16:24])
Expand Down
2 changes: 2 additions & 0 deletions recsplit/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ func OpenIndex(indexFilePath string) (*Index, error) {
return nil, err
}
idx.data = idx.mmapHandle1[:idx.size]
defer idx.EnableReadAhead().DisableReadAhead()

// Read number of keys and bytes per record
idx.baseDataID = binary.BigEndian.Uint64(idx.data[:8])
idx.keyCount = binary.BigEndian.Uint64(idx.data[8:16])
Expand Down
63 changes: 25 additions & 38 deletions state/aggregator_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/ledgerwatch/erigon-lib/common/background"
"github.com/ledgerwatch/erigon-lib/common/cmp"
"github.com/ledgerwatch/erigon-lib/common/dbg"
"github.com/ledgerwatch/erigon-lib/etl"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/bitmapdb"
"github.com/ledgerwatch/erigon-lib/kv/iter"
Expand Down Expand Up @@ -660,25 +659,18 @@ func (a *AggregatorV3) HasNewFrozenFiles() bool {
return a.needSaveFilesListInDB.CompareAndSwap(true, false)
}

func (a *AggregatorV3) Unwind(ctx context.Context, txUnwindTo uint64, stateLoad etl.LoadFunc) error {
stateChanges := etl.NewCollector(a.logPrefix, a.tmpdir, etl.NewOldestEntryBuffer(etl.BufferOptimalSize), a.logger)
defer stateChanges.Close()
if err := a.accounts.pruneF(txUnwindTo, math2.MaxUint64, func(_ uint64, k, v []byte) error {
return stateChanges.Collect(k, v)
}); err != nil {
func (a *AggregatorV3) Unwind(ctx context.Context, txUnwindTo uint64) error {
logEvery := time.NewTicker(30 * time.Second)
defer logEvery.Stop()
if err := a.accounts.prune(ctx, txUnwindTo, math2.MaxUint64, math2.MaxUint64, logEvery); err != nil {
return err
}
if err := a.storage.pruneF(txUnwindTo, math2.MaxUint64, func(_ uint64, k, v []byte) error {
return stateChanges.Collect(k, v)
}); err != nil {
if err := a.storage.prune(ctx, txUnwindTo, math2.MaxUint64, math2.MaxUint64, logEvery); err != nil {
return err
}

if err := stateChanges.Load(a.rwTx, kv.PlainState, stateLoad, etl.TransformArgs{Quit: ctx.Done()}); err != nil {
if err := a.code.prune(ctx, txUnwindTo, math2.MaxUint64, math2.MaxUint64, logEvery); err != nil {
return err
}
logEvery := time.NewTicker(30 * time.Second)
defer logEvery.Stop()
if err := a.logAddrs.prune(ctx, txUnwindTo, math2.MaxUint64, math2.MaxUint64, logEvery); err != nil {
return err
}
Expand Down Expand Up @@ -1347,30 +1339,25 @@ func (a *AggregatorV3) EnableMadvNormal() *AggregatorV3 {
return a
}

// -- range
func (ac *AggregatorV3Context) LogAddrRange(addr []byte, startTxNum, endTxNum int, asc order.By, limit int, tx kv.Tx) (iter.U64, error) {
return ac.logAddrs.IdxRange(addr, startTxNum, endTxNum, asc, limit, tx)
}

func (ac *AggregatorV3Context) LogTopicRange(topic []byte, startTxNum, endTxNum int, asc order.By, limit int, tx kv.Tx) (iter.U64, error) {
return ac.logTopics.IdxRange(topic, startTxNum, endTxNum, asc, limit, tx)
}

func (ac *AggregatorV3Context) TraceFromRange(addr []byte, startTxNum, endTxNum int, asc order.By, limit int, tx kv.Tx) (iter.U64, error) {
return ac.tracesFrom.IdxRange(addr, startTxNum, endTxNum, asc, limit, tx)
}

func (ac *AggregatorV3Context) TraceToRange(addr []byte, startTxNum, endTxNum int, asc order.By, limit int, tx kv.Tx) (iter.U64, error) {
return ac.tracesTo.IdxRange(addr, startTxNum, endTxNum, asc, limit, tx)
}
func (ac *AggregatorV3Context) AccountHistoyIdxRange(addr []byte, startTxNum, endTxNum int, asc order.By, limit int, tx kv.Tx) (iter.U64, error) {
return ac.accounts.IdxRange(addr, startTxNum, endTxNum, asc, limit, tx)
}
func (ac *AggregatorV3Context) StorageHistoyIdxRange(addr []byte, startTxNum, endTxNum int, asc order.By, limit int, tx kv.Tx) (iter.U64, error) {
return ac.storage.IdxRange(addr, startTxNum, endTxNum, asc, limit, tx)
}
func (ac *AggregatorV3Context) CodeHistoyIdxRange(addr []byte, startTxNum, endTxNum int, asc order.By, limit int, tx kv.Tx) (iter.U64, error) {
return ac.code.IdxRange(addr, startTxNum, endTxNum, asc, limit, tx)
func (ac *AggregatorV3Context) IndexRange(name kv.InvertedIdx, k []byte, fromTs, toTs int, asc order.By, limit int, tx kv.Tx) (timestamps iter.U64, err error) {
switch name {
case kv.AccountsHistoryIdx:
return ac.accounts.IdxRange(k, fromTs, toTs, asc, limit, tx)
case kv.StorageHistoryIdx:
return ac.storage.IdxRange(k, fromTs, toTs, asc, limit, tx)
case kv.CodeHistoryIdx:
return ac.code.IdxRange(k, fromTs, toTs, asc, limit, tx)
case kv.LogTopicIdx:
return ac.logTopics.IdxRange(k, fromTs, toTs, asc, limit, tx)
case kv.LogAddrIdx:
return ac.logAddrs.IdxRange(k, fromTs, toTs, asc, limit, tx)
case kv.TracesFromIdx:
return ac.tracesFrom.IdxRange(k, fromTs, toTs, asc, limit, tx)
case kv.TracesToIdx:
return ac.tracesTo.IdxRange(k, fromTs, toTs, asc, limit, tx)
default:
return nil, fmt.Errorf("unexpected history name: %s", name)
}
}

// -- range end
Expand Down
Loading

0 comments on commit 433a605

Please sign in to comment.