Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[actpool] impl a store to cache actions on disk #4362

Merged
merged 8 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
270 changes: 270 additions & 0 deletions actpool/blobstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
package actpool

import (
"encoding/hex"
"fmt"
"os"
"sync"

"github.com/ethereum/go-ethereum/params"
"github.com/holiman/billy"
"github.com/iotexproject/go-pkgs/hash"
"github.com/pkg/errors"
"go.uber.org/zap"

"github.com/iotexproject/iotex-core/action"
"github.com/iotexproject/iotex-core/pkg/lifecycle"
"github.com/iotexproject/iotex-core/pkg/log"
)

const (
// blobSize is the protocol constrained byte size of a single blob in a
// transaction. There can be multiple of these embedded into a single tx.
blobSize = params.BlobTxFieldElementsPerBlob * params.BlobTxBytesPerFieldElement

// maxBlobsPerTransaction is the maximum number of blobs a single transaction
// is allowed to contain. Whilst the spec states it's unlimited, the block
// data slots are protocol bound, which implicitly also limit this.
maxBlobsPerTransaction = params.MaxBlobGasPerBlock / params.BlobTxBlobGasPerBlob

// txAvgSize is an approximate byte size of a transaction metadata to avoid
// tiny overflows causing all txs to move a shelf higher, wasting disk space.
txAvgSize = 4 * 1024

// txMaxSize is the maximum size a single transaction can have, outside
// the included blobs. Since blob transactions are pulled instead of pushed,
// and only a small metadata is kept in ram, the rest is on disk, there is
// no critical limit that should be enforced. Still, capping it to some sane
// limit can never hurt.
txMaxSize = 1024 * 1024
)

type (
blobStore struct {
lifecycle.Readiness
config blobStoreConfig // Configuration for the blob store

store billy.Database // Persistent data store for the tx
stored uint64 // Useful data size of all transactions on disk

lookup map[hash.Hash256]uint64 // Lookup table mapping hashes to tx billy entries
lock sync.RWMutex // Mutex protecting the store

encode encodeAction // Encoder for the tx
decode decodeAction // Decoder for the tx
}
blobStoreConfig struct {
Datadir string `yaml:"datadir"` // Data directory containing the currently executable blobs
dustinxie marked this conversation as resolved.
Show resolved Hide resolved
Datacap uint64 `yaml:"datacap"` // Soft-cap of database storage (hard cap is larger due to overhead)
}

onAction func(selp *action.SealedEnvelope) error
encodeAction func(selp *action.SealedEnvelope) ([]byte, error)
decodeAction func([]byte) (*action.SealedEnvelope, error)
)

var (
errBlobNotFound = fmt.Errorf("blob not found")
errStoreNotOpen = fmt.Errorf("blob store is not open")
)

var defaultBlobStoreConfig = blobStoreConfig{
Datadir: "blobpool",
Datacap: 10 * 1024 * 1024 * 1024,
dustinxie marked this conversation as resolved.
Show resolved Hide resolved
}

func newBlobStore(cfg blobStoreConfig, encode encodeAction, decode decodeAction) (*blobStore, error) {
if len(cfg.Datadir) == 0 {
return nil, errors.New("datadir is empty")

Check warning on line 78 in actpool/blobstore.go

View check run for this annotation

Codecov / codecov/patch

actpool/blobstore.go#L78

Added line #L78 was not covered by tests
}
return &blobStore{
config: cfg,
lookup: make(map[hash.Hash256]uint64),
encode: encode,
decode: decode,
}, nil
}

func (s *blobStore) Open(onData onAction) error {
s.lock.Lock()
defer s.lock.Unlock()

dir := s.config.Datadir
if err := os.MkdirAll(dir, 0700); err != nil {
dustinxie marked this conversation as resolved.
Show resolved Hide resolved
return errors.Wrap(err, "failed to create blob store directory")
}
// Index all transactions on disk and delete anything inprocessable
var fails []uint64
index := func(id uint64, size uint32, blob []byte) {
act, err := s.decode(blob)
if err != nil {
fails = append(fails, id)
log.L().Warn("Failed to decode action", zap.Error(err))
return

Check warning on line 103 in actpool/blobstore.go

View check run for this annotation

Codecov / codecov/patch

actpool/blobstore.go#L101-L103

Added lines #L101 - L103 were not covered by tests
}
if err = onData(act); err != nil {
dustinxie marked this conversation as resolved.
Show resolved Hide resolved
fails = append(fails, id)
log.L().Warn("Failed to process action", zap.Error(err))
return

Check warning on line 108 in actpool/blobstore.go

View check run for this annotation

Codecov / codecov/patch

actpool/blobstore.go#L106-L108

Added lines #L106 - L108 were not covered by tests
}
s.stored += uint64(size)
h, _ := act.Hash()
s.lookup[h] = id
}
store, err := billy.Open(billy.Options{Path: dir}, newSlotter(), index)
if err != nil {
return errors.Wrap(err, "failed to open blob store")
}
s.store = store

if len(fails) > 0 {
log.L().Warn("Dropping invalidated blob transactions", zap.Int("count", len(fails)))

Check warning on line 121 in actpool/blobstore.go

View check run for this annotation

Codecov / codecov/patch

actpool/blobstore.go#L121

Added line #L121 was not covered by tests

for _, id := range fails {
if err := s.store.Delete(id); err != nil {

Check warning on line 124 in actpool/blobstore.go

View check run for this annotation

Codecov / codecov/patch

actpool/blobstore.go#L123-L124

Added lines #L123 - L124 were not covered by tests
s.Close()
return errors.Wrap(err, "failed to delete blob from store")
}
}
}

return s.TurnOn()
}

func (s *blobStore) Close() error {
s.lock.Lock()
defer s.lock.Unlock()

if err := s.TurnOff(); err != nil {
return err
}
return s.store.Close()
}

func (s *blobStore) Get(hash hash.Hash256) (*action.SealedEnvelope, error) {
if !s.IsReady() {
return nil, errors.Wrap(errStoreNotOpen, "")

Check warning on line 146 in actpool/blobstore.go

View check run for this annotation

Codecov / codecov/patch

actpool/blobstore.go#L144-L146

Added lines #L144 - L146 were not covered by tests
}
s.lock.RLock()
defer s.lock.RUnlock()

Check warning on line 149 in actpool/blobstore.go

View check run for this annotation

Codecov / codecov/patch

actpool/blobstore.go#L148-L149

Added lines #L148 - L149 were not covered by tests

id, ok := s.lookup[hash]
if !ok {
return nil, errors.Wrap(errBlobNotFound, "")

Check warning on line 153 in actpool/blobstore.go

View check run for this annotation

Codecov / codecov/patch

actpool/blobstore.go#L151-L153

Added lines #L151 - L153 were not covered by tests
}
blob, err := s.store.Get(id)
if err != nil {

Check warning on line 156 in actpool/blobstore.go

View check run for this annotation

Codecov / codecov/patch

actpool/blobstore.go#L155-L156

Added lines #L155 - L156 were not covered by tests
return nil, errors.Wrap(err, "failed to get blob from store")
}
return s.decode(blob)

Check warning on line 159 in actpool/blobstore.go

View check run for this annotation

Codecov / codecov/patch

actpool/blobstore.go#L159

Added line #L159 was not covered by tests
}

func (s *blobStore) Put(act *action.SealedEnvelope) error {
if !s.IsReady() {
return errors.Wrap(errStoreNotOpen, "")

Check warning on line 164 in actpool/blobstore.go

View check run for this annotation

Codecov / codecov/patch

actpool/blobstore.go#L164

Added line #L164 was not covered by tests
}
s.lock.Lock()
defer s.lock.Unlock()

h, _ := act.Hash()
// if action is already stored, nothing to do
if _, ok := s.lookup[h]; ok {
return nil

Check warning on line 172 in actpool/blobstore.go

View check run for this annotation

Codecov / codecov/patch

actpool/blobstore.go#L172

Added line #L172 was not covered by tests
}
// insert it into the database and update the indices
blob, err := s.encode(act)
if err != nil {
return errors.Wrap(err, "failed to encode action")
}
id, err := s.store.Put(blob)
if err != nil {
return errors.Wrap(err, "failed to put blob into store")
}
s.stored += uint64(len(blob))
s.lookup[h] = id
// if the datacap is exceeded, remove old data
if s.stored > s.config.Datacap {
s.drop()

Check warning on line 187 in actpool/blobstore.go

View check run for this annotation

Codecov / codecov/patch

actpool/blobstore.go#L187

Added line #L187 was not covered by tests
}
return nil
}

func (s *blobStore) Delete(hash hash.Hash256) error {
if !s.IsReady() {
return errors.Wrap(errStoreNotOpen, "")

Check warning on line 194 in actpool/blobstore.go

View check run for this annotation

Codecov / codecov/patch

actpool/blobstore.go#L192-L194

Added lines #L192 - L194 were not covered by tests
}
s.lock.Lock()
defer s.lock.Unlock()

Check warning on line 197 in actpool/blobstore.go

View check run for this annotation

Codecov / codecov/patch

actpool/blobstore.go#L196-L197

Added lines #L196 - L197 were not covered by tests

id, ok := s.lookup[hash]
if !ok {
return nil

Check warning on line 201 in actpool/blobstore.go

View check run for this annotation

Codecov / codecov/patch

actpool/blobstore.go#L199-L201

Added lines #L199 - L201 were not covered by tests
}
if err := s.store.Delete(id); err != nil {

Check warning on line 203 in actpool/blobstore.go

View check run for this annotation

Codecov / codecov/patch

actpool/blobstore.go#L203

Added line #L203 was not covered by tests
return errors.Wrap(err, "failed to delete blob from store")
}
delete(s.lookup, hash)
return nil

Check warning on line 207 in actpool/blobstore.go

View check run for this annotation

Codecov / codecov/patch

actpool/blobstore.go#L206-L207

Added lines #L206 - L207 were not covered by tests
}

// Range iterates over all stored with hashes
func (s *blobStore) Range(fn func(hash.Hash256) bool) {
if !s.IsReady() {
return

Check warning on line 213 in actpool/blobstore.go

View check run for this annotation

Codecov / codecov/patch

actpool/blobstore.go#L211-L213

Added lines #L211 - L213 were not covered by tests
}
s.lock.RLock()
defer s.lock.RUnlock()

Check warning on line 216 in actpool/blobstore.go

View check run for this annotation

Codecov / codecov/patch

actpool/blobstore.go#L215-L216

Added lines #L215 - L216 were not covered by tests

for h := range s.lookup {
if !fn(h) {
return

Check warning on line 220 in actpool/blobstore.go

View check run for this annotation

Codecov / codecov/patch

actpool/blobstore.go#L218-L220

Added lines #L218 - L220 were not covered by tests
}
}
}

func (s *blobStore) drop() {
h, ok := s.evict()
if !ok {
log.L().Debug("no worst action found")
return

Check warning on line 229 in actpool/blobstore.go

View check run for this annotation

Codecov / codecov/patch

actpool/blobstore.go#L225-L229

Added lines #L225 - L229 were not covered by tests
}
id, ok := s.lookup[h]
if !ok {
log.L().Warn("worst action not found in lookup", zap.String("hash", hex.EncodeToString(h[:])))
return

Check warning on line 234 in actpool/blobstore.go

View check run for this annotation

Codecov / codecov/patch

actpool/blobstore.go#L231-L234

Added lines #L231 - L234 were not covered by tests
}
if err := s.store.Delete(id); err != nil {
log.L().Error("failed to delete worst action", zap.Error(err))

Check warning on line 237 in actpool/blobstore.go

View check run for this annotation

Codecov / codecov/patch

actpool/blobstore.go#L236-L237

Added lines #L236 - L237 were not covered by tests
}
delete(s.lookup, h)
return

Check warning on line 240 in actpool/blobstore.go

View check run for this annotation

Codecov / codecov/patch

actpool/blobstore.go#L239-L240

Added lines #L239 - L240 were not covered by tests
}

// TODO: implement a proper eviction policy
func (s *blobStore) evict() (hash.Hash256, bool) {
for h := range s.lookup {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a TODO, to find the worst action according to some policy

return h, true

Check warning on line 246 in actpool/blobstore.go

View check run for this annotation

Codecov / codecov/patch

actpool/blobstore.go#L244-L246

Added lines #L244 - L246 were not covered by tests
}
return hash.ZeroHash256, false

Check warning on line 248 in actpool/blobstore.go

View check run for this annotation

Codecov / codecov/patch

actpool/blobstore.go#L248

Added line #L248 was not covered by tests
}

// newSlotter creates a helper method for the Billy datastore that returns the
// individual shelf sizes used to store transactions in.
//
// The slotter will create shelves for each possible blob count + some tx metadata
// wiggle room, up to the max permitted limits.
//
// The slotter also creates a shelf for 0-blob transactions. Whilst those are not
// allowed in the current protocol, having an empty shelf is not a relevant use
// of resources, but it makes stress testing with junk transactions simpler.
func newSlotter() func() (uint32, bool) {
slotsize := uint32(txAvgSize)
slotsize -= uint32(blobSize) // underflows, it's ok, will overflow back in the first return

return func() (size uint32, done bool) {
slotsize += blobSize
finished := slotsize > maxBlobsPerTransaction*blobSize+txMaxSize

return slotsize, finished
}
}
Loading
Loading