Skip to content

Commit

Permalink
Merge pull request #5751 from multiversx/sharded-persister-static-sto…
Browse files Browse the repository at this point in the history
…rers

Use sharded persister for static storers
  • Loading branch information
iulianpascalau authored Dec 7, 2023
2 parents 861ba44 + ab86556 commit dce39c1
Show file tree
Hide file tree
Showing 14 changed files with 228 additions and 45 deletions.
17 changes: 8 additions & 9 deletions dataRetriever/factory/dataPoolFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,14 +176,13 @@ func createTrieSyncDB(args ArgsDataPool) (storage.Persister, error) {
return disabled.NewPersister(), nil
}

dbCfg := factory.GetDBFromConfig(mainConfig.TrieSyncStorage.DB)
shardId := core.GetShardIDString(args.ShardCoordinator.SelfId())
argDB := storageunit.ArgDB{
DBType: dbCfg.Type,
Path: args.PathManager.PathForStatic(shardId, mainConfig.TrieSyncStorage.DB.FilePath),
BatchDelaySeconds: dbCfg.BatchDelaySeconds,
MaxBatchSize: dbCfg.MaxBatchSize,
MaxOpenFiles: dbCfg.MaxOpenFiles,
path := args.PathManager.PathForStatic(shardId, mainConfig.TrieSyncStorage.DB.FilePath)

dbConfigHandler := factory.NewDBConfigHandler(mainConfig.TrieSyncStorage.DB)
persisterFactory, err := factory.NewPersisterFactory(dbConfigHandler)
if err != nil {
return nil, err
}

if mainConfig.TrieSyncStorage.DB.UseTmpAsFilePath {
Expand All @@ -192,10 +191,10 @@ func createTrieSyncDB(args ArgsDataPool) (storage.Persister, error) {
return nil, errTempDir
}

argDB.Path = filePath
path = filePath
}

db, err := storageunit.NewDB(argDB)
db, err := storageunit.NewDB(persisterFactory, path)
if err != nil {
return nil, fmt.Errorf("%w while creating the db for the trie nodes", err)
}
Expand Down
14 changes: 10 additions & 4 deletions epochStart/metachain/systemSCs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/multiversx/mx-chain-go/state/storagePruningManager"
"github.com/multiversx/mx-chain-go/state/storagePruningManager/evictionWaitingList"
"github.com/multiversx/mx-chain-go/storage"
storageFactory "github.com/multiversx/mx-chain-go/storage/factory"
"github.com/multiversx/mx-chain-go/storage/storageunit"
"github.com/multiversx/mx-chain-go/testscommon"
"github.com/multiversx/mx-chain-go/testscommon/cryptoMocks"
Expand Down Expand Up @@ -76,16 +77,21 @@ func createPhysicalUnit(t *testing.T) (storage.Storer, string) {
Shards: 0,
}
dir := t.TempDir()
persisterConfig := storageunit.ArgDB{
Path: dir,
DBType: "LvlDBSerial",

dbConfig := config.DBConfig{
FilePath: dir,
Type: "LvlDBSerial",
BatchDelaySeconds: 2,
MaxBatchSize: 45000,
MaxOpenFiles: 10,
}

dbConfigHandler := storageFactory.NewDBConfigHandler(dbConfig)
persisterFactory, err := storageFactory.NewPersisterFactory(dbConfigHandler)
assert.Nil(t, err)

cache, _ := storageunit.NewCache(cacheConfig)
persist, _ := storageunit.NewDB(persisterConfig)
persist, _ := storageunit.NewDB(persisterFactory, dir)
unit, _ := storageunit.NewStorageUnit(cache, persist)

return unit, dir
Expand Down
8 changes: 8 additions & 0 deletions genesis/process/genesisBlockCreator.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,17 @@ func (gbc *genesisBlockCreator) createHardForkImportHandler() error {
func createStorer(storageConfig config.StorageConfig, folder string) (storage.Storer, error) {
dbConfig := factory.GetDBFromConfig(storageConfig.DB)
dbConfig.FilePath = path.Join(folder, storageConfig.DB.FilePath)

dbConfigHandler := factory.NewDBConfigHandler(storageConfig.DB)
persisterFactory, err := factory.NewPersisterFactory(dbConfigHandler)
if err != nil {
return nil, err
}

store, err := storageunit.NewStorageUnitFromConf(
factory.GetCacherFromConfig(storageConfig.Cache),
dbConfig,
persisterFactory,
)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/multiversx/mx-chain-es-indexer-go v1.4.17
github.com/multiversx/mx-chain-logger-go v1.0.13
github.com/multiversx/mx-chain-scenario-go v1.2.1
github.com/multiversx/mx-chain-storage-go v1.0.13
github.com/multiversx/mx-chain-storage-go v1.0.14
github.com/multiversx/mx-chain-vm-common-go v1.5.8
github.com/multiversx/mx-chain-vm-go v1.5.21
github.com/multiversx/mx-chain-vm-v1_2-go v1.2.63
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -396,8 +396,8 @@ github.com/multiversx/mx-chain-logger-go v1.0.13 h1:eru/TETo0MkO4ZTnXsQDKf4PBRpA
github.com/multiversx/mx-chain-logger-go v1.0.13/go.mod h1:MZJhTAtZTJxT+yK2EHc4ZW3YOHUc1UdjCD0iahRNBZk=
github.com/multiversx/mx-chain-scenario-go v1.2.1 h1:9eC6VcOEAKRRKZ7EbSWPLzCdNIMWwuNBtAZlgR4cSMA=
github.com/multiversx/mx-chain-scenario-go v1.2.1/go.mod h1:EuZY7DpNFHVNSxJR8dKE1z2I8gBYfEFFPSwNUOXptqE=
github.com/multiversx/mx-chain-storage-go v1.0.13 h1:i41VPDJZ0pn5gf18zTXrac5xeiolUOztNuzL3wEXRuI=
github.com/multiversx/mx-chain-storage-go v1.0.13/go.mod h1:sJ2q49tgjxNpMpsHysjABqCAB0FLBmDblbjBkQ8XfmA=
github.com/multiversx/mx-chain-storage-go v1.0.14 h1:h0acoqPS3FKJ4S3cKBEriTU0OabSQnpxai5WKhi1YCs=
github.com/multiversx/mx-chain-storage-go v1.0.14/go.mod h1:sJ2q49tgjxNpMpsHysjABqCAB0FLBmDblbjBkQ8XfmA=
github.com/multiversx/mx-chain-vm-common-go v1.5.8 h1:IRHB9/DasGrK5HU8TVeeLrEWt7tcb9zfnzHRLa6KES8=
github.com/multiversx/mx-chain-vm-common-go v1.5.8/go.mod h1:sqkKMCnwkWl8DURdb9q7pctK8IANghdHY1KJLE0ox2c=
github.com/multiversx/mx-chain-vm-go v1.5.21 h1:tA4fZ4ZwjsRcvMEyTgWStzxJFP/ypSmpwgLCEUl7NXw=
Expand Down
8 changes: 8 additions & 0 deletions process/smartContract/hooks/blockChainHook.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,9 +816,17 @@ func (bh *BlockChainHookImpl) makeCompiledSCStorage() error {

dbConfig := factory.GetDBFromConfig(bh.configSCStorage.DB)
dbConfig.FilePath = path.Join(bh.workingDir, defaultCompiledSCPath, bh.configSCStorage.DB.FilePath)

dbConfigHandler := factory.NewDBConfigHandler(bh.configSCStorage.DB)
persisterFactory, err := factory.NewPersisterFactory(dbConfigHandler)
if err != nil {
return err
}

store, err := storageunit.NewStorageUnitFromConf(
factory.GetCacherFromConfig(bh.configSCStorage.Cache),
dbConfig,
persisterFactory,
)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions storage/factory/persisterCreator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func newPersisterCreator(config config.DBConfig) *persisterCreator {
}

// Create will create the persister for the provided path
// TODO: refactor to use max tries mechanism
func (pc *persisterCreator) Create(path string) (storage.Persister, error) {
if len(path) == 0 {
return nil, storage.ErrInvalidFilePath
Expand Down
116 changes: 106 additions & 10 deletions storage/factory/storageServiceFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func checkArgs(args StorageServiceFactoryArgs) error {
return nil
}

// TODO: refactor this function, split it into multiple ones
func (psf *StorageServiceFactory) createAndAddBaseStorageUnits(
store dataRetriever.StorageService,
customDatabaseRemover storage.CustomDatabaseRemoverHandler,
Expand Down Expand Up @@ -215,9 +216,18 @@ func (psf *StorageServiceFactory) createAndAddBaseStorageUnits(
metaHdrHashNonceUnitConfig := GetDBFromConfig(psf.generalConfig.MetaHdrNonceHashStorage.DB)
dbPath := psf.pathManager.PathForStatic(shardID, psf.generalConfig.MetaHdrNonceHashStorage.DB.FilePath)
metaHdrHashNonceUnitConfig.FilePath = dbPath

dbConfigHandler := NewDBConfigHandler(psf.generalConfig.MetaHdrNonceHashStorage.DB)
metaHdrHashNoncePersisterCreator, err := NewPersisterFactory(dbConfigHandler)
if err != nil {
return err
}

metaHdrHashNonceUnit, err := storageunit.NewStorageUnitFromConf(
GetCacherFromConfig(psf.generalConfig.MetaHdrNonceHashStorage.Cache),
metaHdrHashNonceUnitConfig)
metaHdrHashNonceUnitConfig,
metaHdrHashNoncePersisterCreator,
)
if err != nil {
return fmt.Errorf("%w for MetaHdrNonceHashStorage", err)
}
Expand Down Expand Up @@ -263,9 +273,18 @@ func (psf *StorageServiceFactory) createAndAddBaseStorageUnits(
shardId := core.GetShardIDString(psf.shardCoordinator.SelfId())
dbPath = psf.pathManager.PathForStatic(shardId, psf.generalConfig.StatusMetricsStorage.DB.FilePath)
statusMetricsDbConfig.FilePath = dbPath

dbConfigHandler = NewDBConfigHandler(psf.generalConfig.StatusMetricsStorage.DB)
statusMetricsPersisterCreator, err := NewPersisterFactory(dbConfigHandler)
if err != nil {
return err
}

statusMetricsStorageUnit, err := storageunit.NewStorageUnitFromConf(
GetCacherFromConfig(psf.generalConfig.StatusMetricsStorage.Cache),
statusMetricsDbConfig)
statusMetricsDbConfig,
statusMetricsPersisterCreator,
)
if err != nil {
return fmt.Errorf("%w for StatusMetricsStorage", err)
}
Expand Down Expand Up @@ -297,9 +316,18 @@ func (psf *StorageServiceFactory) CreateForShard() (dataRetriever.StorageService
shardHdrHashNonceConfig := GetDBFromConfig(psf.generalConfig.ShardHdrNonceHashStorage.DB)
dbPath := psf.pathManager.PathForStatic(shardID, psf.generalConfig.ShardHdrNonceHashStorage.DB.FilePath) + shardID
shardHdrHashNonceConfig.FilePath = dbPath

dbConfigHandler := NewDBConfigHandler(psf.generalConfig.ShardHdrNonceHashStorage.DB)
shardHdrHashNoncePersisterCreator, err := NewPersisterFactory(dbConfigHandler)
if err != nil {
return nil, err
}

shardHdrHashNonceUnit, err := storageunit.NewStorageUnitFromConf(
GetCacherFromConfig(psf.generalConfig.ShardHdrNonceHashStorage.Cache),
shardHdrHashNonceConfig)
shardHdrHashNonceConfig,
shardHdrHashNoncePersisterCreator,
)
if err != nil {
return nil, fmt.Errorf("%w for ShardHdrNonceHashStorage", err)
}
Expand Down Expand Up @@ -368,9 +396,18 @@ func (psf *StorageServiceFactory) CreateForMeta() (dataRetriever.StorageService,
shardID = core.GetShardIDString(core.MetachainShardId)
dbPath := psf.pathManager.PathForStatic(shardID, psf.generalConfig.ShardHdrNonceHashStorage.DB.FilePath) + fmt.Sprintf("%d", i)
shardHdrHashNonceConfig.FilePath = dbPath

dbConfigHandler := NewDBConfigHandler(psf.generalConfig.ShardHdrNonceHashStorage.DB)
shardHdrHashNoncePersisterCreator, err := NewPersisterFactory(dbConfigHandler)
if err != nil {
return nil, err
}

shardHdrHashNonceUnits[i], err = storageunit.NewStorageUnitFromConf(
GetCacherFromConfig(psf.generalConfig.ShardHdrNonceHashStorage.Cache),
shardHdrHashNonceConfig)
shardHdrHashNonceConfig,
shardHdrHashNoncePersisterCreator,
)
if err != nil {
return nil, fmt.Errorf("%w for ShardHdrNonceHashStorage on shard %d", err, i)
}
Expand Down Expand Up @@ -501,7 +538,18 @@ func (psf *StorageServiceFactory) setUpDbLookupExtensions(chainStorer *dataRetri
miniblockHashByTxHashDbConfig := GetDBFromConfig(miniblockHashByTxHashConfig.DB)
miniblockHashByTxHashDbConfig.FilePath = psf.pathManager.PathForStatic(shardID, miniblockHashByTxHashConfig.DB.FilePath)
miniblockHashByTxHashCacherConfig := GetCacherFromConfig(miniblockHashByTxHashConfig.Cache)
miniblockHashByTxHashUnit, err := storageunit.NewStorageUnitFromConf(miniblockHashByTxHashCacherConfig, miniblockHashByTxHashDbConfig)

dbConfigHandler := NewDBConfigHandler(miniblockHashByTxHashConfig.DB)
miniblockHashByTxHashPersisterCreator, err := NewPersisterFactory(dbConfigHandler)
if err != nil {
return err
}

miniblockHashByTxHashUnit, err := storageunit.NewStorageUnitFromConf(
miniblockHashByTxHashCacherConfig,
miniblockHashByTxHashDbConfig,
miniblockHashByTxHashPersisterCreator,
)
if err != nil {
return fmt.Errorf("%w for DbLookupExtensions.MiniblockHashByTxHashStorageConfig", err)
}
Expand All @@ -513,7 +561,18 @@ func (psf *StorageServiceFactory) setUpDbLookupExtensions(chainStorer *dataRetri
blockHashByRoundDBConfig := GetDBFromConfig(blockHashByRoundConfig.DB)
blockHashByRoundDBConfig.FilePath = psf.pathManager.PathForStatic(shardID, blockHashByRoundConfig.DB.FilePath)
blockHashByRoundCacherConfig := GetCacherFromConfig(blockHashByRoundConfig.Cache)
blockHashByRoundUnit, err := storageunit.NewStorageUnitFromConf(blockHashByRoundCacherConfig, blockHashByRoundDBConfig)

dbConfigHandler = NewDBConfigHandler(blockHashByRoundConfig.DB)
blockHashByRoundPersisterCreator, err := NewPersisterFactory(dbConfigHandler)
if err != nil {
return err
}

blockHashByRoundUnit, err := storageunit.NewStorageUnitFromConf(
blockHashByRoundCacherConfig,
blockHashByRoundDBConfig,
blockHashByRoundPersisterCreator,
)
if err != nil {
return fmt.Errorf("%w for DbLookupExtensions.RoundHashStorageConfig", err)
}
Expand All @@ -525,7 +584,18 @@ func (psf *StorageServiceFactory) setUpDbLookupExtensions(chainStorer *dataRetri
epochByHashDbConfig := GetDBFromConfig(epochByHashConfig.DB)
epochByHashDbConfig.FilePath = psf.pathManager.PathForStatic(shardID, epochByHashConfig.DB.FilePath)
epochByHashCacherConfig := GetCacherFromConfig(epochByHashConfig.Cache)
epochByHashUnit, err := storageunit.NewStorageUnitFromConf(epochByHashCacherConfig, epochByHashDbConfig)

dbConfigHandler = NewDBConfigHandler(epochByHashConfig.DB)
epochByHashPersisterCreator, err := NewPersisterFactory(dbConfigHandler)
if err != nil {
return err
}

epochByHashUnit, err := storageunit.NewStorageUnitFromConf(
epochByHashCacherConfig,
epochByHashDbConfig,
epochByHashPersisterCreator,
)
if err != nil {
return fmt.Errorf("%w for DbLookupExtensions.EpochByHashStorageConfig", err)
}
Expand Down Expand Up @@ -564,7 +634,16 @@ func (psf *StorageServiceFactory) createEsdtSuppliesUnit(shardIDStr string) (sto
esdtSuppliesDbConfig := GetDBFromConfig(esdtSuppliesConfig.DB)
esdtSuppliesDbConfig.FilePath = psf.pathManager.PathForStatic(shardIDStr, esdtSuppliesConfig.DB.FilePath)
esdtSuppliesCacherConfig := GetCacherFromConfig(esdtSuppliesConfig.Cache)
return storageunit.NewStorageUnitFromConf(esdtSuppliesCacherConfig, esdtSuppliesDbConfig)

dbConfigHandler := NewDBConfigHandler(esdtSuppliesConfig.DB)
esdtSuppliesPersisterCreator, err := NewPersisterFactory(dbConfigHandler)
if err != nil {
return nil, err
}

return storageunit.NewStorageUnitFromConf(
esdtSuppliesCacherConfig, esdtSuppliesDbConfig,
esdtSuppliesPersisterCreator)
}

func (psf *StorageServiceFactory) createPruningStorerArgs(
Expand Down Expand Up @@ -617,9 +696,18 @@ func (psf *StorageServiceFactory) createTrieEpochRootHashStorerIfNeeded() (stora
shardId := core.GetShardIDString(psf.shardCoordinator.SelfId())
dbPath := psf.pathManager.PathForStatic(shardId, psf.generalConfig.TrieEpochRootHashStorage.DB.FilePath)
trieEpochRootHashDbConfig.FilePath = dbPath

dbConfigHandler := NewDBConfigHandler(psf.generalConfig.TrieEpochRootHashStorage.DB)
esdtSuppliesPersisterCreator, err := NewPersisterFactory(dbConfigHandler)
if err != nil {
return nil, err
}

trieEpochRootHashStorageUnit, err := storageunit.NewStorageUnitFromConf(
GetCacherFromConfig(psf.generalConfig.TrieEpochRootHashStorage.Cache),
trieEpochRootHashDbConfig)
trieEpochRootHashDbConfig,
esdtSuppliesPersisterCreator,
)
if err != nil {
return nil, fmt.Errorf("%w for TrieEpochRootHashStorage", err)
}
Expand All @@ -634,9 +722,17 @@ func (psf *StorageServiceFactory) createTriePersister(
shardID := core.GetShardIDString(psf.shardCoordinator.SelfId())
dbPath := psf.pathManager.PathForStatic(shardID, storageConfig.DB.FilePath)
trieDBConfig.FilePath = dbPath

dbConfigHandler := NewDBConfigHandler(storageConfig.DB)
persisterFactory, err := NewPersisterFactory(dbConfigHandler)
if err != nil {
return nil, err
}

trieUnit, err := storageunit.NewStorageUnitFromConf(
GetCacherFromConfig(storageConfig.Cache),
trieDBConfig)
trieDBConfig,
persisterFactory)
if err != nil {
return nil, err
}
Expand Down
6 changes: 6 additions & 0 deletions storage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,3 +207,9 @@ type ManagedPeersHolder interface {
IsMultiKeyMode() bool
IsInterfaceNil() bool
}

// PersisterFactoryHandler defines the behaviour of a component which is able to create persisters
type PersisterFactoryHandler interface {
Create(path string) (Persister, error)
IsInterfaceNil() bool
}
8 changes: 4 additions & 4 deletions storage/storageunit/storageunit.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ func NewCache(config CacheConfig) (storage.Cacher, error) {
}

// NewDB creates a new database from database config
func NewDB(argDB ArgDB) (storage.Persister, error) {
return storageUnit.NewDB(argDB)
func NewDB(persisterFactory storage.PersisterFactoryHandler, path string) (storage.Persister, error) {
return storageUnit.NewDB(persisterFactory, path)
}

// NewStorageUnitFromConf creates a new storage unit from a storage unit config
func NewStorageUnitFromConf(cacheConf CacheConfig, dbConf DBConfig) (*Unit, error) {
return storageUnit.NewStorageUnitFromConf(cacheConf, dbConf)
func NewStorageUnitFromConf(cacheConf CacheConfig, dbConf DBConfig, persisterFactory storage.PersisterFactoryHandler) (*Unit, error) {
return storageUnit.NewStorageUnitFromConf(cacheConf, dbConf, persisterFactory)
}

// NewNilStorer will return a nil storer
Expand Down
Loading

0 comments on commit dce39c1

Please sign in to comment.