From 099b52ead8ad88a30b17cdef059e47d545f5922d Mon Sep 17 00:00:00 2001 From: envestcc Date: Thu, 15 Aug 2024 10:55:53 +0800 Subject: [PATCH 1/5] actpool disk cache --- actpool/actpool.go | 90 ++++++++++++++++++++++++++++++++++++++++- actpool/config.go | 13 ++++++ actpool/options.go | 21 ++++++++++ actpool/queueworker.go | 7 +++- chainservice/builder.go | 27 ++++++++++++- 5 files changed, 154 insertions(+), 4 deletions(-) diff --git a/actpool/actpool.go b/actpool/actpool.go index 81f1a580f3..4284a64be1 100644 --- a/actpool/actpool.go +++ b/actpool/actpool.go @@ -11,6 +11,7 @@ import ( "sort" "sync" "sync/atomic" + "time" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -28,6 +29,7 @@ import ( "github.com/iotexproject/iotex-core/blockchain/genesis" "github.com/iotexproject/iotex-core/pkg/log" "github.com/iotexproject/iotex-core/pkg/prometheustimer" + "github.com/iotexproject/iotex-core/pkg/routine" "github.com/iotexproject/iotex-core/pkg/tracer" ) @@ -103,6 +105,9 @@ type actPool struct { senderBlackList map[string]bool jobQueue []chan workerJob worker []*queueWorker + + store *blobStore // store is the persistent cache for actpool + storeSync *routine.RecurringTask // storeSync is the recurring task to sync actions from store to memory } // NewActPool constructs a new actpool @@ -142,7 +147,7 @@ func NewActPool(g genesis.Genesis, sf protocol.StateReader, cfg Config, opts ... return nil, err } ap.timerFactory = timerFactory - + // TODO: move to Start for i := 0; i < _numWorker; i++ { ap.jobQueue[i] = make(chan workerJob, ap.cfg.WorkerBufferSize) ap.worker[i] = newQueueWorker(ap, ap.jobQueue[i]) @@ -153,6 +158,33 @@ func NewActPool(g genesis.Genesis, sf protocol.StateReader, cfg Config, opts ... return ap, nil } +func (ap *actPool) Start(ctx context.Context) error { + // open action store and load all actions + if ap.store != nil { + err := ap.store.Open(func(selp *action.SealedEnvelope) error { + return ap.add(ctx, selp) + }) + if err != nil { + return err + } + return ap.storeSync.Start(ctx) + } + return nil +} + +func (ap *actPool) Stop(ctx context.Context) error { + for i := 0; i < _numWorker; i++ { + if err := ap.worker[i].Stop(); err != nil { + return err + } + } + if ap.store != nil { + ap.storeSync.Stop(ctx) + return ap.store.Close() + } + return nil +} + func (ap *actPool) AddActionEnvelopeValidators(fs ...action.SealedEnvelopeValidator) { ap.actionEnvelopeValidators = append(ap.actionEnvelopeValidators, fs...) } @@ -217,6 +249,10 @@ func (ap *actPool) PendingActionMap() map[string][]*action.SealedEnvelope { } func (ap *actPool) Add(ctx context.Context, act *action.SealedEnvelope) error { + return ap.add(ctx, act) +} + +func (ap *actPool) add(ctx context.Context, act *action.SealedEnvelope) error { ctx, span := tracer.NewSpan(ap.context(ctx), "actPool.Add") defer span.End() ctx = ap.context(ctx) @@ -345,6 +381,16 @@ func (ap *actPool) GetUnconfirmedActs(addrStr string) []*action.SealedEnvelope { func (ap *actPool) GetActionByHash(hash hash.Hash256) (*action.SealedEnvelope, error) { act, ok := ap.allActions.Get(hash) if !ok { + if ap.store != nil { + act, err := ap.store.Get(hash) + switch errors.Cause(err) { + case nil: + return act, nil + case errBlobNotFound: + default: + return nil, err + } + } return nil, errors.Wrapf(action.ErrNotFound, "action hash %x does not exist in pool", hash) } return act.(*action.SealedEnvelope), nil @@ -412,6 +458,21 @@ func (ap *actPool) validate(ctx context.Context, selp *action.SealedEnvelope) er return nil } +func (ap *actPool) removeReplacedActs(acts []*action.SealedEnvelope) { + for _, act := range acts { + hash, err := act.Hash() + if err != nil { + log.L().Debug("Skipping action due to hash error", zap.Error(err)) + continue + } + log.L().Debug("Removed replaced action.", log.Hex("hash", hash[:])) + ap.allActions.Delete(hash) + intrinsicGas, _ := act.IntrinsicGas() + atomic.AddUint64(&ap.gasInPool, ^uint64(intrinsicGas-1)) + ap.accountDesActs.delete(act) + } +} + func (ap *actPool) removeInvalidActs(acts []*action.SealedEnvelope) { for _, act := range acts { hash, err := act.Hash() @@ -424,6 +485,11 @@ func (ap *actPool) removeInvalidActs(acts []*action.SealedEnvelope) { intrinsicGas, _ := act.IntrinsicGas() atomic.AddUint64(&ap.gasInPool, ^uint64(intrinsicGas-1)) ap.accountDesActs.delete(act) + if ap.store != nil { + if err = ap.store.Delete(hash); err != nil { + log.L().Warn("Failed to delete action from store", zap.Error(err), log.Hex("hash", hash[:])) + } + } } } @@ -461,6 +527,28 @@ func (ap *actPool) allocatedWorker(senderAddr address.Address) int { return int(lastByte) % _numWorker } +func (ap *actPool) syncFromStore() { + if ap.store == nil { + return + } + ap.store.Range(func(h hash.Hash256) bool { + if _, exist := ap.allActions.Get(h); !exist { + act, err := ap.store.Get(h) + if err != nil { + log.L().Warn("Failed to get action from store", zap.Error(err)) + return true + } + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := ap.add(ctx, act); err != nil { + log.L().Warn("Failed to add action to pool", zap.Error(err)) + return true + } + } + return true + }) +} + type destinationMap struct { mu sync.Mutex acts map[string]map[hash.Hash256]*action.SealedEnvelope diff --git a/actpool/config.go b/actpool/config.go index 453f8d3efc..f69ac8f926 100644 --- a/actpool/config.go +++ b/actpool/config.go @@ -37,6 +37,8 @@ type Config struct { MinGasPriceStr string `yaml:"minGasPrice"` // BlackList lists the account address that are banned from initiating actions BlackList []string `yaml:"blackList"` + // Store defines the config for persistent cache + Store *StoreConfig `yaml:"store"` } // MinGasPrice returns the minimal gas price threshold @@ -47,3 +49,14 @@ func (ap Config) MinGasPrice() *big.Int { } return mgp } + +// StoreConfig is the configuration for the blob store +type StoreConfig struct { + Store blobStoreConfig `yaml:"store"` + ReadInterval time.Duration `yaml:"readInterval"` // Interval to read from store to actpool memory +} + +var defaultStoreConfig = StoreConfig{ + Store: defaultBlobStoreConfig, + ReadInterval: 10 * time.Minute, +} diff --git a/actpool/options.go b/actpool/options.go index 821f9523ab..335cfde19d 100644 --- a/actpool/options.go +++ b/actpool/options.go @@ -6,9 +6,12 @@ package actpool import ( + "errors" "time" "github.com/facebookgo/clock" + + "github.com/iotexproject/iotex-core/pkg/routine" ) // ActQueueOption is the option for actQueue. @@ -33,3 +36,21 @@ func WithTimeOut(ttl time.Duration) interface{ ActQueueOption } { } func (o *ttlOption) SetActQueueOption(aq *actQueue) { aq.ttl = o.ttl } + +// WithStore is the option to set store encode and decode functions. +func WithStore(cfg StoreConfig, encode encodeAction, decode decodeAction) func(*actPool) error { + return func(a *actPool) error { + if encode == nil || decode == nil { + return errors.New("encode and decode functions must be provided") + } + store, err := newBlobStore(cfg.Store, encode, decode) + if err != nil { + return err + } + a.store = store + a.storeSync = routine.NewRecurringTask(func() { + a.syncFromStore() + }, cfg.ReadInterval) + return nil + } +} diff --git a/actpool/queueworker.go b/actpool/queueworker.go index aa78871f6c..54fcebfaf5 100644 --- a/actpool/queueworker.go +++ b/actpool/queueworker.go @@ -105,6 +105,11 @@ func (worker *queueWorker) Handle(job workerJob) error { } worker.ap.allActions.Set(actHash, act) + if worker.ap.store != nil { + if err := worker.ap.store.Put(act); err != nil { + log.L().Warn("failed to store action", zap.Error(err), log.Hex("hash", actHash[:])) + } + } if desAddress, ok := act.Destination(); ok && !strings.EqualFold(sender, desAddress) { if err := worker.ap.accountDesActs.addAction(act); err != nil { @@ -123,7 +128,7 @@ func (worker *queueWorker) Handle(job workerJob) error { log.L().Warn("UNEXPECTED ERROR: action pool is full, but no action to drop") return nil } - worker.ap.removeInvalidActs([]*action.SealedEnvelope{actToReplace}) + worker.ap.removeReplacedActs([]*action.SealedEnvelope{actToReplace}) if actToReplace.SenderAddress().String() == sender && actToReplace.Nonce() == nonce { err = action.ErrTxPoolOverflow _actpoolMtc.WithLabelValues("overMaxNumActsPerPool").Inc() diff --git a/chainservice/builder.go b/chainservice/builder.go index 16cb6ab656..9122ecd6cd 100644 --- a/chainservice/builder.go +++ b/chainservice/builder.go @@ -12,6 +12,7 @@ import ( "github.com/iotexproject/iotex-address/address" "github.com/iotexproject/iotex-election/committee" + "github.com/iotexproject/iotex-proto/golang/iotextypes" "github.com/pkg/errors" "go.uber.org/zap" "google.golang.org/protobuf/proto" @@ -245,7 +246,29 @@ func (builder *Builder) createElectionCommittee() (committee.Committee, error) { func (builder *Builder) buildActionPool() error { if builder.cs.actpool == nil { - ac, err := actpool.NewActPool(builder.cfg.Genesis, builder.cs.factory, builder.cfg.ActPool) + options := []actpool.Option{} + if builder.cfg.ActPool.Store != nil { + evmID := builder.cfg.Chain.EVMNetworkID + options = append(options, actpool.WithStore( + *builder.cfg.ActPool.Store, + func(selp *action.SealedEnvelope) ([]byte, error) { + return proto.Marshal(selp.Proto()) + }, + func(blob []byte) (*action.SealedEnvelope, error) { + d := &action.Deserializer{} + d.SetEvmNetworkID(evmID) + a := &iotextypes.Action{} + if err := proto.Unmarshal(blob, a); err != nil { + return nil, err + } + se, err := d.ActionToSealedEnvelope(a) + if err != nil { + return nil, err + } + return se, nil + })) + } + ac, err := actpool.NewActPool(builder.cfg.Genesis, builder.cs.factory, builder.cfg.ActPool, options...) if err != nil { return errors.Wrap(err, "failed to create actpool") } @@ -428,7 +451,7 @@ func (builder *Builder) createGateWayComponents(forTest bool) ( func (builder *Builder) buildBlockchain(forSubChain, forTest bool) error { builder.cs.chain = builder.createBlockchain(forSubChain, forTest) builder.cs.lifecycle.Add(builder.cs.chain) - + builder.cs.lifecycle.Add(builder.cs.actpool) if err := builder.cs.chain.AddSubscriber(builder.cs.actpool); err != nil { return errors.Wrap(err, "failed to add actpool as subscriber") } From e276d52670a6c9dec1997e22f4957378f3d58287 Mon Sep 17 00:00:00 2001 From: envestcc Date: Tue, 20 Aug 2024 11:58:56 +0800 Subject: [PATCH 2/5] address comment --- chainservice/builder.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/chainservice/builder.go b/chainservice/builder.go index 9122ecd6cd..e8480595c2 100644 --- a/chainservice/builder.go +++ b/chainservice/builder.go @@ -248,15 +248,14 @@ func (builder *Builder) buildActionPool() error { if builder.cs.actpool == nil { options := []actpool.Option{} if builder.cfg.ActPool.Store != nil { - evmID := builder.cfg.Chain.EVMNetworkID + d := &action.Deserializer{} + d.SetEvmNetworkID(builder.cfg.Chain.EVMNetworkID) options = append(options, actpool.WithStore( *builder.cfg.ActPool.Store, func(selp *action.SealedEnvelope) ([]byte, error) { return proto.Marshal(selp.Proto()) }, func(blob []byte) (*action.SealedEnvelope, error) { - d := &action.Deserializer{} - d.SetEvmNetworkID(evmID) a := &iotextypes.Action{} if err := proto.Unmarshal(blob, a); err != nil { return nil, err From b6b4365b0771e6b8d8637b528c7c447d839492cf Mon Sep 17 00:00:00 2001 From: envestcc Date: Wed, 21 Aug 2024 22:32:21 +0800 Subject: [PATCH 3/5] address comment --- actpool/{blobstore.go => actionstore.go} | 30 +++++++++---------- ...{blobstore_test.go => actionstore_test.go} | 4 +-- actpool/actpool.go | 2 +- actpool/config.go | 6 ++-- actpool/queueworker.go | 3 +- 5 files changed, 23 insertions(+), 22 deletions(-) rename actpool/{blobstore.go => actionstore.go} (89%) rename actpool/{blobstore_test.go => actionstore_test.go} (98%) diff --git a/actpool/blobstore.go b/actpool/actionstore.go similarity index 89% rename from actpool/blobstore.go rename to actpool/actionstore.go index 38e1e4d955..9e5804f7bb 100644 --- a/actpool/blobstore.go +++ b/actpool/actionstore.go @@ -40,9 +40,9 @@ const ( ) type ( - blobStore struct { + actionStore struct { lifecycle.Readiness - config blobStoreConfig // Configuration for the blob store + config actionStoreConfig // Configuration for the blob store store billy.Database // Persistent data store for the tx stored uint64 // Useful data size of all transactions on disk @@ -53,7 +53,7 @@ type ( encode encodeAction // Encoder for the tx decode decodeAction // Decoder for the tx } - blobStoreConfig struct { + actionStoreConfig struct { Datadir string `yaml:"datadir"` // Data directory containing the currently executable blobs Datacap uint64 `yaml:"datacap"` // Soft-cap of database storage (hard cap is larger due to overhead) } @@ -68,16 +68,16 @@ var ( errStoreNotOpen = fmt.Errorf("blob store is not open") ) -var defaultBlobStoreConfig = blobStoreConfig{ - Datadir: "blobpool", +var defaultActionStoreConfig = actionStoreConfig{ + Datadir: "actionstore", Datacap: 10 * 1024 * 1024 * 1024, } -func newBlobStore(cfg blobStoreConfig, encode encodeAction, decode decodeAction) (*blobStore, error) { +func newBlobStore(cfg actionStoreConfig, encode encodeAction, decode decodeAction) (*actionStore, error) { if len(cfg.Datadir) == 0 { return nil, errors.New("datadir is empty") } - return &blobStore{ + return &actionStore{ config: cfg, lookup: make(map[hash.Hash256]uint64), encode: encode, @@ -85,7 +85,7 @@ func newBlobStore(cfg blobStoreConfig, encode encodeAction, decode decodeAction) }, nil } -func (s *blobStore) Open(onData onAction) error { +func (s *actionStore) Open(onData onAction) error { s.lock.Lock() defer s.lock.Unlock() @@ -131,7 +131,7 @@ func (s *blobStore) Open(onData onAction) error { return s.TurnOn() } -func (s *blobStore) Close() error { +func (s *actionStore) Close() error { s.lock.Lock() defer s.lock.Unlock() @@ -141,7 +141,7 @@ func (s *blobStore) Close() error { return s.store.Close() } -func (s *blobStore) Get(hash hash.Hash256) (*action.SealedEnvelope, error) { +func (s *actionStore) Get(hash hash.Hash256) (*action.SealedEnvelope, error) { if !s.IsReady() { return nil, errors.Wrap(errStoreNotOpen, "") } @@ -159,7 +159,7 @@ func (s *blobStore) Get(hash hash.Hash256) (*action.SealedEnvelope, error) { return s.decode(blob) } -func (s *blobStore) Put(act *action.SealedEnvelope) error { +func (s *actionStore) Put(act *action.SealedEnvelope) error { if !s.IsReady() { return errors.Wrap(errStoreNotOpen, "") } @@ -189,7 +189,7 @@ func (s *blobStore) Put(act *action.SealedEnvelope) error { return nil } -func (s *blobStore) Delete(hash hash.Hash256) error { +func (s *actionStore) Delete(hash hash.Hash256) error { if !s.IsReady() { return errors.Wrap(errStoreNotOpen, "") } @@ -208,7 +208,7 @@ func (s *blobStore) Delete(hash hash.Hash256) error { } // Range iterates over all stored with hashes -func (s *blobStore) Range(fn func(hash.Hash256) bool) { +func (s *actionStore) Range(fn func(hash.Hash256) bool) { if !s.IsReady() { return } @@ -222,7 +222,7 @@ func (s *blobStore) Range(fn func(hash.Hash256) bool) { } } -func (s *blobStore) drop() { +func (s *actionStore) drop() { h, ok := s.evict() if !ok { log.L().Debug("no worst action found") @@ -241,7 +241,7 @@ func (s *blobStore) drop() { } // TODO: implement a proper eviction policy -func (s *blobStore) evict() (hash.Hash256, bool) { +func (s *actionStore) evict() (hash.Hash256, bool) { for h := range s.lookup { return h, true } diff --git a/actpool/blobstore_test.go b/actpool/actionstore_test.go similarity index 98% rename from actpool/blobstore_test.go rename to actpool/actionstore_test.go index 7244b75724..879fc55758 100644 --- a/actpool/blobstore_test.go +++ b/actpool/actionstore_test.go @@ -21,7 +21,7 @@ import ( func TestBlobStore(t *testing.T) { r := require.New(t) - cfg := blobStoreConfig{ + cfg := actionStoreConfig{ Datadir: t.TempDir(), Datacap: 10 * 1024 * 1024, } @@ -83,7 +83,7 @@ func BenchmarkDatabase(b *testing.B) { } b.Run("billy-put", func(b *testing.B) { - cfg := blobStoreConfig{ + cfg := actionStoreConfig{ Datadir: b.TempDir(), Datacap: 1024, } diff --git a/actpool/actpool.go b/actpool/actpool.go index 4284a64be1..5509d2cccc 100644 --- a/actpool/actpool.go +++ b/actpool/actpool.go @@ -106,7 +106,7 @@ type actPool struct { jobQueue []chan workerJob worker []*queueWorker - store *blobStore // store is the persistent cache for actpool + store *actionStore // store is the persistent cache for actpool storeSync *routine.RecurringTask // storeSync is the recurring task to sync actions from store to memory } diff --git a/actpool/config.go b/actpool/config.go index f69ac8f926..48b62e8cc1 100644 --- a/actpool/config.go +++ b/actpool/config.go @@ -52,11 +52,11 @@ func (ap Config) MinGasPrice() *big.Int { // StoreConfig is the configuration for the blob store type StoreConfig struct { - Store blobStoreConfig `yaml:"store"` - ReadInterval time.Duration `yaml:"readInterval"` // Interval to read from store to actpool memory + Store actionStoreConfig `yaml:"store"` + ReadInterval time.Duration `yaml:"readInterval"` // Interval to read from store to actpool memory } var defaultStoreConfig = StoreConfig{ - Store: defaultBlobStoreConfig, + Store: defaultActionStoreConfig, ReadInterval: 10 * time.Minute, } diff --git a/actpool/queueworker.go b/actpool/queueworker.go index 54fcebfaf5..c3560f2d8f 100644 --- a/actpool/queueworker.go +++ b/actpool/queueworker.go @@ -105,7 +105,8 @@ func (worker *queueWorker) Handle(job workerJob) error { } worker.ap.allActions.Set(actHash, act) - if worker.ap.store != nil { + isBlobTx := false // TODO: only store blob tx + if worker.ap.store != nil && isBlobTx { if err := worker.ap.store.Put(act); err != nil { log.L().Warn("failed to store action", zap.Error(err), log.Hex("hash", actHash[:])) } From 1f104cd76e9572fb469b96a7dd277f333a8ed489 Mon Sep 17 00:00:00 2001 From: envestcc Date: Wed, 11 Sep 2024 11:41:59 +0800 Subject: [PATCH 4/5] address comment --- actpool/actionstore.go | 4 ++-- actpool/actionstore_test.go | 6 +++--- actpool/actpool.go | 39 ++++++++++++------------------------- actpool/options.go | 2 +- actpool/queueworker.go | 2 +- 5 files changed, 19 insertions(+), 34 deletions(-) diff --git a/actpool/actionstore.go b/actpool/actionstore.go index 9e5804f7bb..a6c51282cc 100644 --- a/actpool/actionstore.go +++ b/actpool/actionstore.go @@ -70,10 +70,10 @@ var ( var defaultActionStoreConfig = actionStoreConfig{ Datadir: "actionstore", - Datacap: 10 * 1024 * 1024 * 1024, + Datacap: 1024 * 1024 * 1024, } -func newBlobStore(cfg actionStoreConfig, encode encodeAction, decode decodeAction) (*actionStore, error) { +func newActionStore(cfg actionStoreConfig, encode encodeAction, decode decodeAction) (*actionStore, error) { if len(cfg.Datadir) == 0 { return nil, errors.New("datadir is empty") } diff --git a/actpool/actionstore_test.go b/actpool/actionstore_test.go index 879fc55758..c7cb7508e7 100644 --- a/actpool/actionstore_test.go +++ b/actpool/actionstore_test.go @@ -42,7 +42,7 @@ func TestBlobStore(t *testing.T) { } return se, nil } - store, err := newBlobStore(cfg, encode, decode) + store, err := newActionStore(cfg, encode, decode) r.NoError(err) r.NoError(store.Open(func(selp *action.SealedEnvelope) error { r.FailNow("should not be called") @@ -53,7 +53,7 @@ func TestBlobStore(t *testing.T) { r.NoError(store.Put(act)) r.NoError(store.Close()) - store, err = newBlobStore(cfg, encode, decode) + store, err = newActionStore(cfg, encode, decode) r.NoError(err) acts := []*action.SealedEnvelope{} r.NoError(store.Open(func(selp *action.SealedEnvelope) error { @@ -87,7 +87,7 @@ func BenchmarkDatabase(b *testing.B) { Datadir: b.TempDir(), Datacap: 1024, } - store, err := newBlobStore(cfg, nil, nil) + store, err := newActionStore(cfg, nil, nil) r.NoError(err) r.NoError(store.Open(func(selp *action.SealedEnvelope) error { r.FailNow("should not be called") diff --git a/actpool/actpool.go b/actpool/actpool.go index 5509d2cccc..f689c03f98 100644 --- a/actpool/actpool.go +++ b/actpool/actpool.go @@ -380,20 +380,20 @@ func (ap *actPool) GetUnconfirmedActs(addrStr string) []*action.SealedEnvelope { // GetActionByHash returns the pending action in pool given action's hash func (ap *actPool) GetActionByHash(hash hash.Hash256) (*action.SealedEnvelope, error) { act, ok := ap.allActions.Get(hash) - if !ok { - if ap.store != nil { - act, err := ap.store.Get(hash) - switch errors.Cause(err) { - case nil: - return act, nil - case errBlobNotFound: - default: - return nil, err - } + if ok { + return act.(*action.SealedEnvelope), nil + } + if ap.store != nil { + act, err := ap.store.Get(hash) + switch errors.Cause(err) { + case nil: + return act, nil + case errBlobNotFound: + default: + return nil, err } - return nil, errors.Wrapf(action.ErrNotFound, "action hash %x does not exist in pool", hash) } - return act.(*action.SealedEnvelope), nil + return nil, errors.Wrapf(action.ErrNotFound, "action hash %x does not exist in pool", hash) } // GetSize returns the act pool size @@ -458,21 +458,6 @@ func (ap *actPool) validate(ctx context.Context, selp *action.SealedEnvelope) er return nil } -func (ap *actPool) removeReplacedActs(acts []*action.SealedEnvelope) { - for _, act := range acts { - hash, err := act.Hash() - if err != nil { - log.L().Debug("Skipping action due to hash error", zap.Error(err)) - continue - } - log.L().Debug("Removed replaced action.", log.Hex("hash", hash[:])) - ap.allActions.Delete(hash) - intrinsicGas, _ := act.IntrinsicGas() - atomic.AddUint64(&ap.gasInPool, ^uint64(intrinsicGas-1)) - ap.accountDesActs.delete(act) - } -} - func (ap *actPool) removeInvalidActs(acts []*action.SealedEnvelope) { for _, act := range acts { hash, err := act.Hash() diff --git a/actpool/options.go b/actpool/options.go index 335cfde19d..235ecc6dd0 100644 --- a/actpool/options.go +++ b/actpool/options.go @@ -43,7 +43,7 @@ func WithStore(cfg StoreConfig, encode encodeAction, decode decodeAction) func(* if encode == nil || decode == nil { return errors.New("encode and decode functions must be provided") } - store, err := newBlobStore(cfg.Store, encode, decode) + store, err := newActionStore(cfg.Store, encode, decode) if err != nil { return err } diff --git a/actpool/queueworker.go b/actpool/queueworker.go index c3560f2d8f..66ba926a2f 100644 --- a/actpool/queueworker.go +++ b/actpool/queueworker.go @@ -129,7 +129,7 @@ func (worker *queueWorker) Handle(job workerJob) error { log.L().Warn("UNEXPECTED ERROR: action pool is full, but no action to drop") return nil } - worker.ap.removeReplacedActs([]*action.SealedEnvelope{actToReplace}) + worker.ap.removeInvalidActs([]*action.SealedEnvelope{actToReplace}) if actToReplace.SenderAddress().String() == sender && actToReplace.Nonce() == nonce { err = action.ErrTxPoolOverflow _actpoolMtc.WithLabelValues("overMaxNumActsPerPool").Inc() From 68cbd80a3e9d9bdad04c9bebc59d89aa4ccde020 Mon Sep 17 00:00:00 2001 From: envestcc Date: Wed, 18 Sep 2024 19:19:48 +0800 Subject: [PATCH 5/5] address comment --- actpool/actpool.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/actpool/actpool.go b/actpool/actpool.go index f689c03f98..4213c0408d 100644 --- a/actpool/actpool.go +++ b/actpool/actpool.go @@ -159,17 +159,16 @@ func NewActPool(g genesis.Genesis, sf protocol.StateReader, cfg Config, opts ... } func (ap *actPool) Start(ctx context.Context) error { - // open action store and load all actions - if ap.store != nil { - err := ap.store.Open(func(selp *action.SealedEnvelope) error { - return ap.add(ctx, selp) - }) - if err != nil { - return err - } - return ap.storeSync.Start(ctx) + if ap.store == nil { + return nil } - return nil + err := ap.store.Open(func(selp *action.SealedEnvelope) error { + return ap.add(ctx, selp) + }) + if err != nil { + return err + } + return ap.storeSync.Start(ctx) } func (ap *actPool) Stop(ctx context.Context) error {