From c68aca8a47e09affb4f93ad1d6881eaab6aacb9b Mon Sep 17 00:00:00 2001 From: envestcc Date: Wed, 14 Aug 2024 06:50:32 +0800 Subject: [PATCH 1/7] init blobstore --- actpool/blobstore.go | 203 +++++++++++++++++++++++++++++++++++++++++++ go.mod | 1 + 2 files changed, 204 insertions(+) create mode 100644 actpool/blobstore.go diff --git a/actpool/blobstore.go b/actpool/blobstore.go new file mode 100644 index 0000000000..7389137fa6 --- /dev/null +++ b/actpool/blobstore.go @@ -0,0 +1,203 @@ +package actpool + +import ( + "fmt" + "os" + "sync" + + "github.com/ethereum/go-ethereum/params" + "github.com/holiman/billy" + "github.com/iotexproject/go-pkgs/hash" + "github.com/pkg/errors" + "go.uber.org/zap" + + "github.com/iotexproject/iotex-core/action" + "github.com/iotexproject/iotex-core/pkg/log" +) + +const ( + // blobSize is the protocol constrained byte size of a single blob in a + // transaction. There can be multiple of these embedded into a single tx. + blobSize = params.BlobTxFieldElementsPerBlob * params.BlobTxBytesPerFieldElement + + // maxBlobsPerTransaction is the maximum number of blobs a single transaction + // is allowed to contain. Whilst the spec states it's unlimited, the block + // data slots are protocol bound, which implicitly also limit this. + maxBlobsPerTransaction = params.MaxBlobGasPerBlock / params.BlobTxBlobGasPerBlob + + // txAvgSize is an approximate byte size of a transaction metadata to avoid + // tiny overflows causing all txs to move a shelf higher, wasting disk space. + txAvgSize = 4 * 1024 + + // txMaxSize is the maximum size a single transaction can have, outside + // the included blobs. Since blob transactions are pulled instead of pushed, + // and only a small metadata is kept in ram, the rest is on disk, there is + // no critical limit that should be enforced. Still, capping it to some sane + // limit can never hurt. + txMaxSize = 1024 * 1024 +) + +type ( + blobStore struct { + config blobStoreConfig // Configuration for the blob store + + store billy.Database // Persistent data store for the tx metadata and blobs + stored uint64 // Useful data size of all transactions on disk + + lookup map[hash.Hash256]uint64 // Lookup table mapping hashes to tx billy entries + lock sync.RWMutex // Mutex protecting the store + + encode encodeAction // Encoder for the tx metadata and blobs + decode decodeAction // Decoder for the tx metadata and blobs + } + + blobStoreConfig struct { + Datadir string // Data directory containing the currently executable blobs + Datacap uint64 // Soft-cap of database storage (hard cap is larger due to overhead) + } + + onAction func(selp *action.SealedEnvelope) + encodeAction func(selp *action.SealedEnvelope) ([]byte, error) + decodeAction func([]byte) (*action.SealedEnvelope, error) +) + +var defaultBlobStoreConfig = blobStoreConfig{ + Datadir: "blobpool", + Datacap: 10 * 1024 * 1024 * 1024, +} + +var ( + ErrBlobNotFound = fmt.Errorf("blob not found") +) + +func newBlobStore(cfg blobStoreConfig, encode encodeAction, decode decodeAction) (*blobStore, error) { + if len(cfg.Datadir) == 0 { + return nil, errors.New("datadir is empty") + } + return &blobStore{ + config: cfg, + lookup: make(map[hash.Hash256]uint64), + encode: encode, + decode: decode, + }, nil +} + +func (s *blobStore) Open(onData onAction) error { + dir := s.config.Datadir + if err := os.MkdirAll(dir, 0700); err != nil { + return errors.Wrap(err, "failed to create blob store directory") + } + // Index all transactions on disk and delete anything inprocessable + var fails []uint64 + index := func(id uint64, size uint32, blob []byte) { + act, err := s.decode(blob) + if err != nil { + fails = append(fails, id) + log.L().Warn("Failed to decode action", zap.Error(err)) + return + } + s.stored += uint64(size) + h, _ := act.Hash() + s.lookup[h] = id + + onData(act) + } + store, err := billy.Open(billy.Options{Path: dir}, newSlotter(), index) + if err != nil { + return errors.Wrap(err, "failed to open blob store") + } + s.store = store + + if len(fails) > 0 { + log.L().Warn("Dropping invalidated blob transactions", zap.Int("count", len(fails))) + + for _, id := range fails { + if err := s.store.Delete(id); err != nil { + s.Close() + return errors.Wrap(err, "failed to delete blob from store") + } + } + } + return nil +} + +func (s *blobStore) Close() error { + return s.store.Close() +} + +func (s *blobStore) Get(hash hash.Hash256) (*action.SealedEnvelope, error) { + s.lock.RLock() + defer s.lock.RUnlock() + + id, ok := s.lookup[hash] + if !ok { + return nil, errors.Wrap(ErrBlobNotFound, "") + } + blob, err := s.store.Get(id) + if err != nil { + return nil, errors.Wrap(err, "failed to get blob from store") + } + return s.decode(blob) +} + +func (s *blobStore) Put(act *action.SealedEnvelope) error { + s.lock.Lock() + defer s.lock.Unlock() + h, _ := act.Hash() + // if action is already stored, nothing to do + if _, ok := s.lookup[h]; ok { + return nil + } + // insert it into the database and update the indices + blob, err := s.encode(act) + if err != nil { + return errors.Wrap(err, "failed to encode action") + } + id, err := s.store.Put(blob) + if err != nil { + return errors.Wrap(err, "failed to put blob into store") + } + s.stored += uint64(len(blob)) + s.lookup[h] = id + // if the datacap is exceeded, remove old data + if s.stored > s.config.Datacap { + // TODO: remove old data + } + return nil +} + +func (s *blobStore) Delete(hash hash.Hash256) error { + s.lock.Lock() + defer s.lock.Unlock() + + id, ok := s.lookup[hash] + if !ok { + return nil + } + if err := s.store.Delete(id); err != nil { + return errors.Wrap(err, "failed to delete blob from store") + } + delete(s.lookup, hash) + return nil +} + +// newSlotter creates a helper method for the Billy datastore that returns the +// individual shelf sizes used to store transactions in. +// +// The slotter will create shelves for each possible blob count + some tx metadata +// wiggle room, up to the max permitted limits. +// +// The slotter also creates a shelf for 0-blob transactions. Whilst those are not +// allowed in the current protocol, having an empty shelf is not a relevant use +// of resources, but it makes stress testing with junk transactions simpler. +func newSlotter() func() (uint32, bool) { + slotsize := uint32(txAvgSize) + slotsize -= uint32(blobSize) // underflows, it's ok, will overflow back in the first return + + return func() (size uint32, done bool) { + slotsize += blobSize + finished := slotsize > maxBlobsPerTransaction*blobSize+txMaxSize + + return slotsize, finished + } +} diff --git a/go.mod b/go.mod index 6ceba1f2cf..92fb659932 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.2.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/hashicorp/vault/api v1.1.0 + github.com/holiman/billy v0.0.0-20230718173358-1c7e68d277a7 github.com/holiman/uint256 v1.2.4 github.com/iotexproject/go-fsm v1.0.0 github.com/iotexproject/go-p2p v0.3.7-0.20240327085559-423bb9cc8f5f From 5bc10e7e1d80bb70ef9b6061ae3bc75151cb2241 Mon Sep 17 00:00:00 2001 From: envestcc Date: Wed, 14 Aug 2024 07:37:40 +0800 Subject: [PATCH 2/7] add test --- actpool/blobstore_test.go | 131 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 131 insertions(+) create mode 100644 actpool/blobstore_test.go diff --git a/actpool/blobstore_test.go b/actpool/blobstore_test.go new file mode 100644 index 0000000000..5f96f16e4c --- /dev/null +++ b/actpool/blobstore_test.go @@ -0,0 +1,131 @@ +package actpool + +import ( + "context" + "crypto/rand" + "math/big" + mrand "math/rand" + "path" + "testing" + + "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/require" + + "github.com/iotexproject/go-pkgs/hash" + "github.com/iotexproject/iotex-proto/golang/iotextypes" + + "github.com/iotexproject/iotex-core/action" + "github.com/iotexproject/iotex-core/db" + "github.com/iotexproject/iotex-core/test/identityset" +) + +func TestBlobStore(t *testing.T) { + r := require.New(t) + cfg := blobStoreConfig{ + Datadir: t.TempDir(), + Datacap: 10 * 1024 * 1024, + } + t.Log(cfg) + encode := func(selp *action.SealedEnvelope) ([]byte, error) { + return proto.Marshal(selp.Proto()) + } + decode := func(blob []byte) (*action.SealedEnvelope, error) { + d := &action.Deserializer{} + d.SetEvmNetworkID(4689) + + a := &iotextypes.Action{} + if err := proto.Unmarshal(blob, a); err != nil { + return nil, err + } + se, err := d.ActionToSealedEnvelope(a) + if err != nil { + return nil, err + } + return se, nil + } + store, err := newBlobStore(cfg, encode, decode) + r.NoError(err) + r.NoError(store.Open(func(selp *action.SealedEnvelope) { + r.FailNow("should not be called") + })) + act, err := action.SignedExecution("", identityset.PrivateKey(1), 1, big.NewInt(1), 100, big.NewInt(100), nil) + r.NoError(err) + r.NoError(store.Put(act)) + r.NoError(store.Close()) + + store, err = newBlobStore(cfg, encode, decode) + r.NoError(err) + acts := []*action.SealedEnvelope{} + r.NoError(store.Open(func(selp *action.SealedEnvelope) { + acts = append(acts, selp) + })) + acts[0].Hash() + r.Len(acts, 1) + r.Equal(act, acts[0]) +} + +func BenchmarkDatabase(b *testing.B) { + r := require.New(b) + blobs := make([][]byte, 100) + for i := range blobs { + blob := make([]byte, 4096*32) + _, err := rand.Read(blob) + r.NoError(err) + blobs[i] = blob + } + keys := make([]hash.Hash256, 100) + for i := range keys { + key := make([]byte, 32) + _, err := rand.Read(key) + r.NoError(err) + copy(keys[i][:], key) + } + + b.Run("billy-put", func(b *testing.B) { + cfg := blobStoreConfig{ + Datadir: b.TempDir(), + Datacap: 10 * 1024 * 1024, + } + store, err := newBlobStore(cfg, nil, nil) + r.NoError(err) + r.NoError(store.Open(func(selp *action.SealedEnvelope) { + r.FailNow("should not be called") + })) + + b.StartTimer() + for i := 0; i < b.N; i++ { + _, err := store.store.Put(blobs[mrand.Intn(len(blobs))]) + r.NoError(err) + } + b.StopTimer() + r.NoError(store.store.Close()) + }) + b.Run("pebbledb-put", func(b *testing.B) { + cfg := db.DefaultConfig + cfg.DbPath = path.Join(b.TempDir(), "pebbledb") + store := db.NewPebbleDB(cfg) + r.NoError(store.Start(context.Background())) + b.StartTimer() + for i := 0; i < b.N; i++ { + idx := mrand.Intn(len(blobs)) + err := store.Put("ns", keys[idx][:], blobs[idx]) + r.NoError(err) + } + b.StopTimer() + r.NoError(store.Stop(context.Background())) + }) + b.Run("boltdb-put", func(b *testing.B) { + cfg := db.DefaultConfig + cfg.DbPath = path.Join(b.TempDir(), "boltdb") + store := db.NewBoltDB(cfg) + r.NoError(store.Start(context.Background())) + b.StartTimer() + for i := 0; i < b.N; i++ { + idx := mrand.Intn(len(blobs)) + err := store.Put("ns", keys[idx][:], blobs[idx]) + r.NoError(err) + } + b.StopTimer() + r.NoError(store.Stop(context.Background())) + }) +} From 0e51a96f98406238cddec1600fc3ac5ff327fea6 Mon Sep 17 00:00:00 2001 From: envestcc Date: Thu, 15 Aug 2024 10:42:42 +0800 Subject: [PATCH 3/7] blobstore evict --- actpool/blobstore.go | 59 +++++++++++++++++++++++++++++---------- actpool/blobstore_test.go | 16 ++++++----- 2 files changed, 54 insertions(+), 21 deletions(-) diff --git a/actpool/blobstore.go b/actpool/blobstore.go index 7389137fa6..9afd0fb126 100644 --- a/actpool/blobstore.go +++ b/actpool/blobstore.go @@ -1,6 +1,7 @@ package actpool import ( + "encoding/hex" "fmt" "os" "sync" @@ -39,29 +40,29 @@ const ( type ( blobStore struct { - config blobStoreConfig // Configuration for the blob store + config StoreConfig // Configuration for the blob store - store billy.Database // Persistent data store for the tx metadata and blobs + store billy.Database // Persistent data store for the tx stored uint64 // Useful data size of all transactions on disk lookup map[hash.Hash256]uint64 // Lookup table mapping hashes to tx billy entries lock sync.RWMutex // Mutex protecting the store - encode encodeAction // Encoder for the tx metadata and blobs - decode decodeAction // Decoder for the tx metadata and blobs + encode encodeAction // Encoder for the tx + decode decodeAction // Decoder for the tx } - - blobStoreConfig struct { - Datadir string // Data directory containing the currently executable blobs - Datacap uint64 // Soft-cap of database storage (hard cap is larger due to overhead) + // StoreConfig is the configuration for the blob store + StoreConfig struct { + Datadir string `yaml:"datadir"` // Data directory containing the currently executable blobs + Datacap uint64 `yaml:"datacap"` // Soft-cap of database storage (hard cap is larger due to overhead) } - onAction func(selp *action.SealedEnvelope) + onAction func(selp *action.SealedEnvelope) error encodeAction func(selp *action.SealedEnvelope) ([]byte, error) decodeAction func([]byte) (*action.SealedEnvelope, error) ) -var defaultBlobStoreConfig = blobStoreConfig{ +var defaultStoreConfig = StoreConfig{ Datadir: "blobpool", Datacap: 10 * 1024 * 1024 * 1024, } @@ -70,7 +71,7 @@ var ( ErrBlobNotFound = fmt.Errorf("blob not found") ) -func newBlobStore(cfg blobStoreConfig, encode encodeAction, decode decodeAction) (*blobStore, error) { +func newBlobStore(cfg StoreConfig, encode encodeAction, decode decodeAction) (*blobStore, error) { if len(cfg.Datadir) == 0 { return nil, errors.New("datadir is empty") } @@ -96,11 +97,14 @@ func (s *blobStore) Open(onData onAction) error { log.L().Warn("Failed to decode action", zap.Error(err)) return } + if err = onData(act); err != nil { + fails = append(fails, id) + log.L().Warn("Failed to process action", zap.Error(err)) + return + } s.stored += uint64(size) h, _ := act.Hash() s.lookup[h] = id - - onData(act) } store, err := billy.Open(billy.Options{Path: dir}, newSlotter(), index) if err != nil { @@ -161,7 +165,7 @@ func (s *blobStore) Put(act *action.SealedEnvelope) error { s.lookup[h] = id // if the datacap is exceeded, remove old data if s.stored > s.config.Datacap { - // TODO: remove old data + s.drop() } return nil } @@ -181,6 +185,33 @@ func (s *blobStore) Delete(hash hash.Hash256) error { return nil } +func (s *blobStore) drop() { + for { + h, ok := s.evict() + if !ok { + log.L().Error("no worst action found") + return + } + id, ok := s.lookup[h] + if !ok { + log.L().Error("worst action not found in lookup", zap.String("hash", hex.EncodeToString(h[:]))) + continue + } + if err := s.store.Delete(id); err != nil { + log.L().Error("failed to delete worst action", zap.Error(err)) + } + return + } +} + +// TODO: implement a proper eviction policy +func (s *blobStore) evict() (hash.Hash256, bool) { + for h := range s.lookup { + return h, true + } + return hash.ZeroHash256, false +} + // newSlotter creates a helper method for the Billy datastore that returns the // individual shelf sizes used to store transactions in. // diff --git a/actpool/blobstore_test.go b/actpool/blobstore_test.go index 5f96f16e4c..c8809bf788 100644 --- a/actpool/blobstore_test.go +++ b/actpool/blobstore_test.go @@ -21,11 +21,10 @@ import ( func TestBlobStore(t *testing.T) { r := require.New(t) - cfg := blobStoreConfig{ + cfg := StoreConfig{ Datadir: t.TempDir(), Datacap: 10 * 1024 * 1024, } - t.Log(cfg) encode := func(selp *action.SealedEnvelope) ([]byte, error) { return proto.Marshal(selp.Proto()) } @@ -45,8 +44,9 @@ func TestBlobStore(t *testing.T) { } store, err := newBlobStore(cfg, encode, decode) r.NoError(err) - r.NoError(store.Open(func(selp *action.SealedEnvelope) { + r.NoError(store.Open(func(selp *action.SealedEnvelope) error { r.FailNow("should not be called") + return nil })) act, err := action.SignedExecution("", identityset.PrivateKey(1), 1, big.NewInt(1), 100, big.NewInt(100), nil) r.NoError(err) @@ -56,8 +56,9 @@ func TestBlobStore(t *testing.T) { store, err = newBlobStore(cfg, encode, decode) r.NoError(err) acts := []*action.SealedEnvelope{} - r.NoError(store.Open(func(selp *action.SealedEnvelope) { + r.NoError(store.Open(func(selp *action.SealedEnvelope) error { acts = append(acts, selp) + return nil })) acts[0].Hash() r.Len(acts, 1) @@ -82,14 +83,15 @@ func BenchmarkDatabase(b *testing.B) { } b.Run("billy-put", func(b *testing.B) { - cfg := blobStoreConfig{ + cfg := StoreConfig{ Datadir: b.TempDir(), - Datacap: 10 * 1024 * 1024, + Datacap: 1024, } store, err := newBlobStore(cfg, nil, nil) r.NoError(err) - r.NoError(store.Open(func(selp *action.SealedEnvelope) { + r.NoError(store.Open(func(selp *action.SealedEnvelope) error { r.FailNow("should not be called") + return nil })) b.StartTimer() From a1911aba5bddb1939ec76341c44a7a9a4cffa35f Mon Sep 17 00:00:00 2001 From: envestcc Date: Thu, 15 Aug 2024 16:04:04 +0800 Subject: [PATCH 4/7] range blobstore --- actpool/blobstore.go | 31 +++++++++++++++++++++---------- actpool/blobstore_test.go | 4 ++-- 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/actpool/blobstore.go b/actpool/blobstore.go index 9afd0fb126..a867756868 100644 --- a/actpool/blobstore.go +++ b/actpool/blobstore.go @@ -40,7 +40,7 @@ const ( type ( blobStore struct { - config StoreConfig // Configuration for the blob store + config blobStoreConfig // Configuration for the blob store store billy.Database // Persistent data store for the tx stored uint64 // Useful data size of all transactions on disk @@ -51,8 +51,7 @@ type ( encode encodeAction // Encoder for the tx decode decodeAction // Decoder for the tx } - // StoreConfig is the configuration for the blob store - StoreConfig struct { + blobStoreConfig struct { Datadir string `yaml:"datadir"` // Data directory containing the currently executable blobs Datacap uint64 `yaml:"datacap"` // Soft-cap of database storage (hard cap is larger due to overhead) } @@ -62,16 +61,16 @@ type ( decodeAction func([]byte) (*action.SealedEnvelope, error) ) -var defaultStoreConfig = StoreConfig{ +var ( + errBlobNotFound = fmt.Errorf("blob not found") +) + +var defaultBlobStoreConfig = blobStoreConfig{ Datadir: "blobpool", Datacap: 10 * 1024 * 1024 * 1024, } -var ( - ErrBlobNotFound = fmt.Errorf("blob not found") -) - -func newBlobStore(cfg StoreConfig, encode encodeAction, decode decodeAction) (*blobStore, error) { +func newBlobStore(cfg blobStoreConfig, encode encodeAction, decode decodeAction) (*blobStore, error) { if len(cfg.Datadir) == 0 { return nil, errors.New("datadir is empty") } @@ -135,7 +134,7 @@ func (s *blobStore) Get(hash hash.Hash256) (*action.SealedEnvelope, error) { id, ok := s.lookup[hash] if !ok { - return nil, errors.Wrap(ErrBlobNotFound, "") + return nil, errors.Wrap(errBlobNotFound, "") } blob, err := s.store.Get(id) if err != nil { @@ -185,6 +184,18 @@ func (s *blobStore) Delete(hash hash.Hash256) error { return nil } +// Range iterates over all stored with hashes +func (s *blobStore) Range(fn func(hash.Hash256) bool) { + s.lock.RLock() + defer s.lock.RUnlock() + + for h := range s.lookup { + if !fn(h) { + return + } + } +} + func (s *blobStore) drop() { for { h, ok := s.evict() diff --git a/actpool/blobstore_test.go b/actpool/blobstore_test.go index c8809bf788..7244b75724 100644 --- a/actpool/blobstore_test.go +++ b/actpool/blobstore_test.go @@ -21,7 +21,7 @@ import ( func TestBlobStore(t *testing.T) { r := require.New(t) - cfg := StoreConfig{ + cfg := blobStoreConfig{ Datadir: t.TempDir(), Datacap: 10 * 1024 * 1024, } @@ -83,7 +83,7 @@ func BenchmarkDatabase(b *testing.B) { } b.Run("billy-put", func(b *testing.B) { - cfg := StoreConfig{ + cfg := blobStoreConfig{ Datadir: b.TempDir(), Datacap: 1024, } From 824475371df0c4d2b29ef112c0908e0abff61313 Mon Sep 17 00:00:00 2001 From: envestcc Date: Fri, 16 Aug 2024 11:06:57 +0800 Subject: [PATCH 5/7] fix delete --- actpool/blobstore.go | 1 + 1 file changed, 1 insertion(+) diff --git a/actpool/blobstore.go b/actpool/blobstore.go index a867756868..8bb4cd0af2 100644 --- a/actpool/blobstore.go +++ b/actpool/blobstore.go @@ -211,6 +211,7 @@ func (s *blobStore) drop() { if err := s.store.Delete(id); err != nil { log.L().Error("failed to delete worst action", zap.Error(err)) } + delete(s.lookup, h) return } } From 285a07b67a753954c31a06d3795edd9e98243a1f Mon Sep 17 00:00:00 2001 From: envestcc Date: Fri, 16 Aug 2024 17:48:56 +0800 Subject: [PATCH 6/7] protect --- actpool/blobstore.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/actpool/blobstore.go b/actpool/blobstore.go index 8bb4cd0af2..f3ec7e523a 100644 --- a/actpool/blobstore.go +++ b/actpool/blobstore.go @@ -63,6 +63,7 @@ type ( var ( errBlobNotFound = fmt.Errorf("blob not found") + errStoreNotOpen = fmt.Errorf("blob store is not open") ) var defaultBlobStoreConfig = blobStoreConfig{ @@ -83,6 +84,8 @@ func newBlobStore(cfg blobStoreConfig, encode encodeAction, decode decodeAction) } func (s *blobStore) Open(onData onAction) error { + s.lock.Lock() + defer s.lock.Unlock() dir := s.config.Datadir if err := os.MkdirAll(dir, 0700); err != nil { return errors.Wrap(err, "failed to create blob store directory") @@ -125,12 +128,17 @@ func (s *blobStore) Open(onData onAction) error { } func (s *blobStore) Close() error { + s.lock.Lock() + defer s.lock.Unlock() return s.store.Close() } func (s *blobStore) Get(hash hash.Hash256) (*action.SealedEnvelope, error) { s.lock.RLock() defer s.lock.RUnlock() + if s.store == nil { + return nil, errors.Wrap(errStoreNotOpen, "") + } id, ok := s.lookup[hash] if !ok { @@ -146,6 +154,9 @@ func (s *blobStore) Get(hash hash.Hash256) (*action.SealedEnvelope, error) { func (s *blobStore) Put(act *action.SealedEnvelope) error { s.lock.Lock() defer s.lock.Unlock() + if s.store == nil { + return errors.Wrap(errStoreNotOpen, "") + } h, _ := act.Hash() // if action is already stored, nothing to do if _, ok := s.lookup[h]; ok { @@ -172,6 +183,9 @@ func (s *blobStore) Put(act *action.SealedEnvelope) error { func (s *blobStore) Delete(hash hash.Hash256) error { s.lock.Lock() defer s.lock.Unlock() + if s.store == nil { + return errors.Wrap(errStoreNotOpen, "") + } id, ok := s.lookup[hash] if !ok { From c7c461ddfa29a8461b3b3cc4762d0292be957368 Mon Sep 17 00:00:00 2001 From: envestcc Date: Mon, 19 Aug 2024 11:03:41 +0800 Subject: [PATCH 7/7] address comment --- actpool/blobstore.go | 60 ++++++++++++++++++++++++++------------------ 1 file changed, 35 insertions(+), 25 deletions(-) diff --git a/actpool/blobstore.go b/actpool/blobstore.go index f3ec7e523a..38e1e4d955 100644 --- a/actpool/blobstore.go +++ b/actpool/blobstore.go @@ -13,6 +13,7 @@ import ( "go.uber.org/zap" "github.com/iotexproject/iotex-core/action" + "github.com/iotexproject/iotex-core/pkg/lifecycle" "github.com/iotexproject/iotex-core/pkg/log" ) @@ -40,6 +41,7 @@ const ( type ( blobStore struct { + lifecycle.Readiness config blobStoreConfig // Configuration for the blob store store billy.Database // Persistent data store for the tx @@ -86,6 +88,7 @@ func newBlobStore(cfg blobStoreConfig, encode encodeAction, decode decodeAction) func (s *blobStore) Open(onData onAction) error { s.lock.Lock() defer s.lock.Unlock() + dir := s.config.Datadir if err := os.MkdirAll(dir, 0700); err != nil { return errors.Wrap(err, "failed to create blob store directory") @@ -124,21 +127,26 @@ func (s *blobStore) Open(onData onAction) error { } } } - return nil + + return s.TurnOn() } func (s *blobStore) Close() error { s.lock.Lock() defer s.lock.Unlock() + + if err := s.TurnOff(); err != nil { + return err + } return s.store.Close() } func (s *blobStore) Get(hash hash.Hash256) (*action.SealedEnvelope, error) { - s.lock.RLock() - defer s.lock.RUnlock() - if s.store == nil { + if !s.IsReady() { return nil, errors.Wrap(errStoreNotOpen, "") } + s.lock.RLock() + defer s.lock.RUnlock() id, ok := s.lookup[hash] if !ok { @@ -152,11 +160,12 @@ func (s *blobStore) Get(hash hash.Hash256) (*action.SealedEnvelope, error) { } func (s *blobStore) Put(act *action.SealedEnvelope) error { - s.lock.Lock() - defer s.lock.Unlock() - if s.store == nil { + if !s.IsReady() { return errors.Wrap(errStoreNotOpen, "") } + s.lock.Lock() + defer s.lock.Unlock() + h, _ := act.Hash() // if action is already stored, nothing to do if _, ok := s.lookup[h]; ok { @@ -181,11 +190,11 @@ func (s *blobStore) Put(act *action.SealedEnvelope) error { } func (s *blobStore) Delete(hash hash.Hash256) error { - s.lock.Lock() - defer s.lock.Unlock() - if s.store == nil { + if !s.IsReady() { return errors.Wrap(errStoreNotOpen, "") } + s.lock.Lock() + defer s.lock.Unlock() id, ok := s.lookup[hash] if !ok { @@ -200,6 +209,9 @@ func (s *blobStore) Delete(hash hash.Hash256) error { // Range iterates over all stored with hashes func (s *blobStore) Range(fn func(hash.Hash256) bool) { + if !s.IsReady() { + return + } s.lock.RLock() defer s.lock.RUnlock() @@ -211,23 +223,21 @@ func (s *blobStore) Range(fn func(hash.Hash256) bool) { } func (s *blobStore) drop() { - for { - h, ok := s.evict() - if !ok { - log.L().Error("no worst action found") - return - } - id, ok := s.lookup[h] - if !ok { - log.L().Error("worst action not found in lookup", zap.String("hash", hex.EncodeToString(h[:]))) - continue - } - if err := s.store.Delete(id); err != nil { - log.L().Error("failed to delete worst action", zap.Error(err)) - } - delete(s.lookup, h) + h, ok := s.evict() + if !ok { + log.L().Debug("no worst action found") return } + id, ok := s.lookup[h] + if !ok { + log.L().Warn("worst action not found in lookup", zap.String("hash", hex.EncodeToString(h[:]))) + return + } + if err := s.store.Delete(id); err != nil { + log.L().Error("failed to delete worst action", zap.Error(err)) + } + delete(s.lookup, h) + return } // TODO: implement a proper eviction policy