From 95623f4aa535f31ce488a9197c88e0f65a6d6240 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Mon, 19 Jun 2023 11:34:12 +0700 Subject: [PATCH 1/4] faster opening of snapshots and indices (#1025) - mostly by MADV_SEQUENTIAL during opening --- compress/decompress.go | 4 +++- recsplit/index.go | 2 ++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/compress/decompress.go b/compress/decompress.go index 1e5fe3089..7e0c5b481 100644 --- a/compress/decompress.go +++ b/compress/decompress.go @@ -150,6 +150,7 @@ func NewDecompressor(compressedFilePath string) (d *Decompressor, err error) { fileName: fName, } defer func() { + if rec := recover(); rec != nil { err = fmt.Errorf("decompressing file: %s, %+v, trace: %s", compressedFilePath, rec, dbg.Stack()) } @@ -171,9 +172,10 @@ func NewDecompressor(compressedFilePath string) (d *Decompressor, err error) { if d.mmapHandle1, d.mmapHandle2, err = mmap.Mmap(d.f, int(d.size)); err != nil { return nil, err } - // read patterns from file d.data = d.mmapHandle1[:d.size] + defer d.EnableReadAhead().DisableReadAhead() //speedup opening on slow drives + d.wordsCount = binary.BigEndian.Uint64(d.data[:8]) d.emptyWordsCount = binary.BigEndian.Uint64(d.data[8:16]) dictSize := binary.BigEndian.Uint64(d.data[16:24]) diff --git a/recsplit/index.go b/recsplit/index.go index 73c6452eb..5942fbb5d 100644 --- a/recsplit/index.go +++ b/recsplit/index.go @@ -95,6 +95,8 @@ func OpenIndex(indexFilePath string) (*Index, error) { return nil, err } idx.data = idx.mmapHandle1[:idx.size] + defer idx.EnableReadAhead().DisableReadAhead() + // Read number of keys and bytes per record idx.baseDataID = binary.BigEndian.Uint64(idx.data[:8]) idx.keyCount = binary.BigEndian.Uint64(idx.data[8:16]) From 43e3155ed0ca8b1821de93b6038e82f144cadedb Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Mon, 19 Jun 2023 11:48:14 +0700 Subject: [PATCH 2/4] e3: simplify unwind (#1026) --- state/aggregator_v3.go | 20 ++++-------- state/history.go | 71 ------------------------------------------ 2 files changed, 6 insertions(+), 85 deletions(-) diff --git a/state/aggregator_v3.go b/state/aggregator_v3.go index 065b59701..47bb4e850 100644 --- a/state/aggregator_v3.go +++ b/state/aggregator_v3.go @@ -33,7 +33,6 @@ import ( "github.com/ledgerwatch/erigon-lib/common/background" "github.com/ledgerwatch/erigon-lib/common/cmp" "github.com/ledgerwatch/erigon-lib/common/dbg" - "github.com/ledgerwatch/erigon-lib/etl" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/bitmapdb" "github.com/ledgerwatch/erigon-lib/kv/iter" @@ -660,25 +659,18 @@ func (a *AggregatorV3) HasNewFrozenFiles() bool { return a.needSaveFilesListInDB.CompareAndSwap(true, false) } -func (a *AggregatorV3) Unwind(ctx context.Context, txUnwindTo uint64, stateLoad etl.LoadFunc) error { - stateChanges := etl.NewCollector(a.logPrefix, a.tmpdir, etl.NewOldestEntryBuffer(etl.BufferOptimalSize), a.logger) - defer stateChanges.Close() - if err := a.accounts.pruneF(txUnwindTo, math2.MaxUint64, func(_ uint64, k, v []byte) error { - return stateChanges.Collect(k, v) - }); err != nil { +func (a *AggregatorV3) Unwind(ctx context.Context, txUnwindTo uint64) error { + logEvery := time.NewTicker(30 * time.Second) + defer logEvery.Stop() + if err := a.accounts.prune(ctx, txUnwindTo, math2.MaxUint64, math2.MaxUint64, logEvery); err != nil { return err } - if err := a.storage.pruneF(txUnwindTo, math2.MaxUint64, func(_ uint64, k, v []byte) error { - return stateChanges.Collect(k, v) - }); err != nil { + if err := a.storage.prune(ctx, txUnwindTo, math2.MaxUint64, math2.MaxUint64, logEvery); err != nil { return err } - - if err := stateChanges.Load(a.rwTx, kv.PlainState, stateLoad, etl.TransformArgs{Quit: ctx.Done()}); err != nil { + if err := a.code.prune(ctx, txUnwindTo, math2.MaxUint64, math2.MaxUint64, logEvery); err != nil { return err } - logEvery := time.NewTicker(30 * time.Second) - defer logEvery.Stop() if err := a.logAddrs.prune(ctx, txUnwindTo, math2.MaxUint64, math2.MaxUint64, logEvery); err != nil { return err } diff --git a/state/history.go b/state/history.go index a72e2dd59..d4fb3d82a 100644 --- a/state/history.go +++ b/state/history.go @@ -1149,77 +1149,6 @@ func (h *History) prune(ctx context.Context, txFrom, txTo, limit uint64, logEver return nil } -func (h *History) pruneF(txFrom, txTo uint64, f func(txNum uint64, k, v []byte) error) error { - historyKeysCursor, err := h.tx.RwCursorDupSort(h.indexKeysTable) - if err != nil { - return fmt.Errorf("create %s history cursor: %w", h.filenameBase, err) - } - defer historyKeysCursor.Close() - var txKey [8]byte - binary.BigEndian.PutUint64(txKey[:], txFrom) - var k, v []byte - var valsC kv.RwCursor - var valsCDup kv.RwCursorDupSort - if h.largeValues { - valsC, err = h.tx.RwCursor(h.historyValsTable) - if err != nil { - return err - } - defer valsC.Close() - } else { - valsCDup, err = h.tx.RwCursorDupSort(h.historyValsTable) - if err != nil { - return err - } - defer valsCDup.Close() - } - for k, v, err = historyKeysCursor.Seek(txKey[:]); err == nil && k != nil; k, v, err = historyKeysCursor.Next() { - txNum := binary.BigEndian.Uint64(k) - if txNum >= txTo { - break - } - - if h.largeValues { - seek := append(common.Copy(v), k...) - kk, vv, err := valsC.SeekExact(seek) - if err != nil { - return err - } - if err := f(txNum, kk[:len(kk)-8], vv); err != nil { - return err - } - if kk != nil { - if err = valsC.DeleteCurrent(); err != nil { - return err - } - } - } else { - vv, err := valsCDup.SeekBothRange(v, k) - if err != nil { - return err - } - if binary.BigEndian.Uint64(vv) != txNum { - continue - } - if err := f(txNum, v, vv[8:]); err != nil { - return err - } - if err = valsCDup.DeleteCurrent(); err != nil { - return err - } - } - - // This DeleteCurrent needs to the last in the loop iteration, because it invalidates k and v - if err = historyKeysCursor.DeleteCurrent(); err != nil { - return err - } - } - if err != nil { - return fmt.Errorf("iterate over %s history keys: %w", h.filenameBase, err) - } - return nil -} - type HistoryContext struct { h *History ic *InvertedIndexContext From 9090990e1696145cb6c62a5c86fc8508c8cec104 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Mon, 19 Jun 2023 13:42:15 +0700 Subject: [PATCH 3/4] e3: simplify unwind (#1027) --- state/history.go | 304 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 212 insertions(+), 92 deletions(-) diff --git a/state/history.go b/state/history.go index d4fb3d82a..b237c7ff7 100644 --- a/state/history.go +++ b/state/history.go @@ -525,7 +525,14 @@ type historyWAL struct { historyKey []byte buffered bool discard bool - largeValues bool + + // not large: + // keys: txNum -> key1+key2 + // vals: key1+key2 -> txNum + value (DupSort) + // large: + // keys: txNum -> key1+key2 + // vals: key1+key2+txNum -> value (not DupSort) + largeValues bool } func (h *historyWAL) close() { @@ -1038,114 +1045,182 @@ func (h *History) prune(ctx context.Context, txFrom, txTo, limit uint64, logEver defer historyKeysCursor.Close() var txKey [8]byte binary.BigEndian.PutUint64(txKey[:], txFrom) - - k, v, err := historyKeysCursor.Seek(txKey[:]) - if err != nil { - return err - } - if k == nil { - return nil - } - txFrom = binary.BigEndian.Uint64(k) - if limit != math.MaxUint64 && limit != 0 { - txTo = cmp.Min(txTo, txFrom+limit) - } - if txFrom >= txTo { - return nil - } - - collector := etl.NewCollector("snapshots", h.tmpdir, etl.NewOldestEntryBuffer(etl.BufferOptimalSize), h.logger) - defer collector.Close() - - // Invariant: if some `txNum=N` pruned - it's pruned Fully - // Means: can use DeleteCurrentDuplicates all values of given `txNum` - for ; err == nil && k != nil; k, v, err = historyKeysCursor.NextNoDup() { - txNum := binary.BigEndian.Uint64(k) - if txNum >= txTo { - break - } - for ; err == nil && k != nil; k, v, err = historyKeysCursor.NextDup() { - if err := collector.Collect(v, nil); err != nil { - return err - } - } - - // This DeleteCurrent needs to the last in the loop iteration, because it invalidates k and v - if err = historyKeysCursor.DeleteCurrentDuplicates(); err != nil { - return err - } - } - + var k, v []byte + var valsC kv.RwCursor + var valsCDup kv.RwCursorDupSort if h.largeValues { - valsC, err := h.tx.RwCursor(h.historyValsTable) + valsC, err = h.tx.RwCursor(h.historyValsTable) if err != nil { return err } defer valsC.Close() - - if err := collector.Load(h.tx, "", func(key, _ []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error { - for k, _, err := valsC.Seek(key); k != nil; k, _, err = valsC.Next() { - if err != nil { - return err - } - if !bytes.HasPrefix(k, key) { - break - } - txNum := binary.BigEndian.Uint64(k[len(k)-8:]) - if txNum >= txTo { - break - } - if err = valsC.DeleteCurrent(); err != nil { - return err - } - - select { - case <-logEvery.C: - log.Info("[snapshots] prune history", "name", h.filenameBase, "to_step", fmt.Sprintf("%.2f", float64(txTo)/float64(h.aggregationStep)), "prefix", fmt.Sprintf("%x", key[:8])) - default: - } - } - return nil - }, etl.TransformArgs{Quit: ctx.Done()}); err != nil { - return err - } - if err != nil { - return fmt.Errorf("iterate over %s history keys: %w", h.filenameBase, err) - } } else { - valsC, err := h.tx.RwCursorDupSort(h.historyValsTable) + valsCDup, err = h.tx.RwCursorDupSort(h.historyValsTable) if err != nil { return err } - defer valsC.Close() + defer valsCDup.Close() + } + for k, v, err = historyKeysCursor.Seek(txKey[:]); err == nil && k != nil; k, v, err = historyKeysCursor.Next() { + txNum := binary.BigEndian.Uint64(k) + if txNum >= txTo { + break + } + if limit == 0 { + return nil + } + limit-- - if err := collector.Load(h.tx, "", func(key, _ []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error { - for k, v, err := valsC.SeekExact(key); k != nil; k, v, err = valsC.NextDup() { - if err != nil { - return err - } - txNum := binary.BigEndian.Uint64(v) - if txNum >= txTo { - break - } + if h.largeValues { + seek := append(common.Copy(v), k...) + kk, _, err := valsC.SeekExact(seek) + if err != nil { + return err + } + if kk != nil { if err = valsC.DeleteCurrent(); err != nil { return err } - - select { - case <-logEvery.C: - log.Info("[snapshots] prune history", "name", h.filenameBase, "to_step", fmt.Sprintf("%.2f", float64(txTo)/float64(h.aggregationStep)), "prefix", fmt.Sprintf("%x", key[:8])) - default: - } } - return nil - }, etl.TransformArgs{Quit: ctx.Done()}); err != nil { - return err + } else { + vv, err := valsCDup.SeekBothRange(v, k) + if err != nil { + return err + } + if binary.BigEndian.Uint64(vv) != txNum { + continue + } + if err = valsCDup.DeleteCurrent(); err != nil { + return err + } } - if err != nil { - return fmt.Errorf("iterate over %s history keys: %w", h.filenameBase, err) + + // This DeleteCurrent needs to the last in the loop iteration, because it invalidates k and v + if err = historyKeysCursor.DeleteCurrent(); err != nil { + return err } } + if err != nil { + return fmt.Errorf("iterate over %s history keys: %w", h.filenameBase, err) + } + + /* + historyKeysCursor, err := h.tx.RwCursorDupSort(h.indexKeysTable) + if err != nil { + return fmt.Errorf("create %s history cursor: %w", h.filenameBase, err) + } + defer historyKeysCursor.Close() + var txKey [8]byte + binary.BigEndian.PutUint64(txKey[:], txFrom) + + k, v, err := historyKeysCursor.Seek(txKey[:]) + if err != nil { + return err + } + if k == nil { + return nil + } + txFrom = binary.BigEndian.Uint64(k) + if limit != math.MaxUint64 && limit != 0 { + txTo = cmp.Min(txTo, txFrom+limit) + } + if txFrom >= txTo { + return nil + } + + collector := etl.NewCollector("snapshots", h.tmpdir, etl.NewOldestEntryBuffer(etl.BufferOptimalSize), h.logger) + defer collector.Close() + + // Invariant: if some `txNum=N` pruned - it's pruned Fully + // Means: can use DeleteCurrentDuplicates all values of given `txNum` + for ; err == nil && k != nil; k, v, err = historyKeysCursor.NextNoDup() { + txNum := binary.BigEndian.Uint64(k) + if txNum >= txTo { + break + } + for ; err == nil && k != nil; k, v, err = historyKeysCursor.NextDup() { + if err := collector.Collect(v, nil); err != nil { + return err + } + } + + // This DeleteCurrent needs to the last in the loop iteration, because it invalidates k and v + if err = historyKeysCursor.DeleteCurrentDuplicates(); err != nil { + return err + } + } + + if h.largeValues { + valsC, err := h.tx.RwCursor(h.historyValsTable) + if err != nil { + return err + } + defer valsC.Close() + + if err := collector.Load(h.tx, "", func(key, _ []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error { + for k, _, err := valsC.Seek(key); k != nil; k, _, err = valsC.Next() { + if err != nil { + return err + } + if !bytes.HasPrefix(k, key) { + break + } + txNum := binary.BigEndian.Uint64(k[len(k)-8:]) + if txNum >= txTo { + break + } + if err = valsC.DeleteCurrent(); err != nil { + return err + } + + select { + case <-logEvery.C: + log.Info("[snapshots] prune history", "name", h.filenameBase, "to_step", fmt.Sprintf("%.2f", float64(txTo)/float64(h.aggregationStep)), "prefix", fmt.Sprintf("%x", key[:8])) + default: + } + } + return nil + }, etl.TransformArgs{Quit: ctx.Done()}); err != nil { + return err + } + if err != nil { + return fmt.Errorf("iterate over %s history keys: %w", h.filenameBase, err) + } + } else { + valsC, err := h.tx.RwCursorDupSort(h.historyValsTable) + if err != nil { + return err + } + defer valsC.Close() + + if err := collector.Load(h.tx, "", func(key, _ []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error { + for k, v, err := valsC.SeekExact(key); k != nil; k, v, err = valsC.NextDup() { + if err != nil { + return err + } + txNum := binary.BigEndian.Uint64(v) + if txNum >= txTo { + break + } + if err = valsC.DeleteCurrent(); err != nil { + return err + } + + select { + case <-logEvery.C: + log.Info("[snapshots] prune history", "name", h.filenameBase, "to_step", fmt.Sprintf("%.2f", float64(txTo)/float64(h.aggregationStep)), "prefix", fmt.Sprintf("%x", key[:8])) + default: + } + } + return nil + }, etl.TransformArgs{Quit: ctx.Done()}); err != nil { + return err + } + if err != nil { + return fmt.Errorf("iterate over %s history keys: %w", h.filenameBase, err) + } + } + */ return nil } @@ -2170,6 +2245,51 @@ func (hi *HistoryChangesIterDBDup) advance() (err error) { return nil } +func (hi *HistoryChangesIterDBDup) advance2() (err error) { + var k, v []byte + if hi.txNum2kCursor == nil { + if hi.valsCursor, err = hi.roTx.CursorDupSort(hi.valsTable); err != nil { + return err + } + if hi.txNum2kCursor, err = hi.roTx.CursorDupSort(hi.idxKeysTable); err != nil { + return err + } + + k, v, err = hi.txNum2kCursor.Seek(hi.startTxKey[:]) + } else { + k, v, err = hi.txNum2kCursor.Next() + } + if err != nil { + return err + } + + if k == nil { + hi.nextKey = nil + return nil + } + + txNum := binary.BigEndian.Uint64(k) + if hi.endTxNum >= 0 && int(txNum) >= hi.endTxNum { + hi.nextKey = nil + return nil + } + + // not large: + // keys: txNum -> key1+key2 + // vals: key1+key2 -> txNum + value (DupSort) + + vv, err := hi.valsCursor.SeekBothRange(v, k) + if err != nil { + return err + } + if binary.BigEndian.Uint64(vv) != txNum { + panic(1) + } + hi.nextKey = v + hi.nextVal = vv[8:] + return nil +} + func (hi *HistoryChangesIterDBDup) HasNext() bool { if hi.err != nil { // always true, then .Next() call will return this error return true From 0610e2e32317cbea1b192e9f96b69439e542a6d9 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Tue, 20 Jun 2023 10:57:25 +0700 Subject: [PATCH 4/4] e3: idx range method (#1028) --- state/aggregator_v3.go | 43 +++++++++++++++++++----------------------- 1 file changed, 19 insertions(+), 24 deletions(-) diff --git a/state/aggregator_v3.go b/state/aggregator_v3.go index 47bb4e850..373dbdad9 100644 --- a/state/aggregator_v3.go +++ b/state/aggregator_v3.go @@ -1339,30 +1339,25 @@ func (a *AggregatorV3) EnableMadvNormal() *AggregatorV3 { return a } -// -- range -func (ac *AggregatorV3Context) LogAddrRange(addr []byte, startTxNum, endTxNum int, asc order.By, limit int, tx kv.Tx) (iter.U64, error) { - return ac.logAddrs.IdxRange(addr, startTxNum, endTxNum, asc, limit, tx) -} - -func (ac *AggregatorV3Context) LogTopicRange(topic []byte, startTxNum, endTxNum int, asc order.By, limit int, tx kv.Tx) (iter.U64, error) { - return ac.logTopics.IdxRange(topic, startTxNum, endTxNum, asc, limit, tx) -} - -func (ac *AggregatorV3Context) TraceFromRange(addr []byte, startTxNum, endTxNum int, asc order.By, limit int, tx kv.Tx) (iter.U64, error) { - return ac.tracesFrom.IdxRange(addr, startTxNum, endTxNum, asc, limit, tx) -} - -func (ac *AggregatorV3Context) TraceToRange(addr []byte, startTxNum, endTxNum int, asc order.By, limit int, tx kv.Tx) (iter.U64, error) { - return ac.tracesTo.IdxRange(addr, startTxNum, endTxNum, asc, limit, tx) -} -func (ac *AggregatorV3Context) AccountHistoyIdxRange(addr []byte, startTxNum, endTxNum int, asc order.By, limit int, tx kv.Tx) (iter.U64, error) { - return ac.accounts.IdxRange(addr, startTxNum, endTxNum, asc, limit, tx) -} -func (ac *AggregatorV3Context) StorageHistoyIdxRange(addr []byte, startTxNum, endTxNum int, asc order.By, limit int, tx kv.Tx) (iter.U64, error) { - return ac.storage.IdxRange(addr, startTxNum, endTxNum, asc, limit, tx) -} -func (ac *AggregatorV3Context) CodeHistoyIdxRange(addr []byte, startTxNum, endTxNum int, asc order.By, limit int, tx kv.Tx) (iter.U64, error) { - return ac.code.IdxRange(addr, startTxNum, endTxNum, asc, limit, tx) +func (ac *AggregatorV3Context) IndexRange(name kv.InvertedIdx, k []byte, fromTs, toTs int, asc order.By, limit int, tx kv.Tx) (timestamps iter.U64, err error) { + switch name { + case kv.AccountsHistoryIdx: + return ac.accounts.IdxRange(k, fromTs, toTs, asc, limit, tx) + case kv.StorageHistoryIdx: + return ac.storage.IdxRange(k, fromTs, toTs, asc, limit, tx) + case kv.CodeHistoryIdx: + return ac.code.IdxRange(k, fromTs, toTs, asc, limit, tx) + case kv.LogTopicIdx: + return ac.logTopics.IdxRange(k, fromTs, toTs, asc, limit, tx) + case kv.LogAddrIdx: + return ac.logAddrs.IdxRange(k, fromTs, toTs, asc, limit, tx) + case kv.TracesFromIdx: + return ac.tracesFrom.IdxRange(k, fromTs, toTs, asc, limit, tx) + case kv.TracesToIdx: + return ac.tracesTo.IdxRange(k, fromTs, toTs, asc, limit, tx) + default: + return nil, fmt.Errorf("unexpected history name: %s", name) + } } // -- range end