diff --git a/blockindex/sgd_indexer_test.go b/blockindex/sgd_indexer_test.go index 7097f53588..02ff4e908a 100644 --- a/blockindex/sgd_indexer_test.go +++ b/blockindex/sgd_indexer_test.go @@ -9,6 +9,8 @@ import ( "github.com/iotexproject/go-pkgs/hash" "github.com/iotexproject/iotex-address/address" + "github.com/stretchr/testify/require" + "github.com/iotexproject/iotex-core/action" "github.com/iotexproject/iotex-core/blockchain/block" "github.com/iotexproject/iotex-core/blockchain/genesis" @@ -16,7 +18,6 @@ import ( "github.com/iotexproject/iotex-core/state" "github.com/iotexproject/iotex-core/test/identityset" "github.com/iotexproject/iotex-core/testutil" - "github.com/stretchr/testify/require" ) const ( diff --git a/blockindex/sync_indexers.go b/blockindex/sync_indexers.go new file mode 100644 index 0000000000..7324755db5 --- /dev/null +++ b/blockindex/sync_indexers.go @@ -0,0 +1,125 @@ +// Copyright (c) 2023 IoTeX Foundation +// This source code is provided 'as is' and no warranties are given as to title or non-infringement, merchantability +// or fitness for purpose and, to the extent permitted by law, all liability for your use of the code is disclaimed. +// This source code is governed by Apache License 2.0 that can be found in the LICENSE file. + +package blockindex + +import ( + "context" + + "github.com/iotexproject/iotex-core/blockchain/block" + "github.com/iotexproject/iotex-core/blockchain/blockdao" +) + +// SyncIndexers is a special index that includes multiple indexes, +// which stay in sync when blocks are added. +type SyncIndexers struct { + indexers []blockdao.BlockIndexer + startHeights []uint64 // start height of each indexer, which will be determined when the indexer is started + minStartHeight uint64 // minimum start height of all indexers +} + +// NewSyncIndexers creates a new SyncIndexers +// each indexer will PutBlock one by one in the order of the indexers +func NewSyncIndexers(indexers ...blockdao.BlockIndexer) *SyncIndexers { + return &SyncIndexers{indexers: indexers} +} + +// Start starts the indexer group +func (ig *SyncIndexers) Start(ctx context.Context) error { + for _, indexer := range ig.indexers { + if err := indexer.Start(ctx); err != nil { + return err + } + } + return ig.initStartHeight() +} + +// Stop stops the indexer group +func (ig *SyncIndexers) Stop(ctx context.Context) error { + for _, indexer := range ig.indexers { + if err := indexer.Stop(ctx); err != nil { + return err + } + } + return nil +} + +// PutBlock puts a block into the indexers in the group +func (ig *SyncIndexers) PutBlock(ctx context.Context, blk *block.Block) error { + for i, indexer := range ig.indexers { + // check if the block is higher than the indexer's start height + if blk.Height() < ig.startHeights[i] { + continue + } + // check if the block is higher than the indexer's height + height, err := indexer.Height() + if err != nil { + return err + } + if blk.Height() <= height { + continue + } + // put block + if err := indexer.PutBlock(ctx, blk); err != nil { + return err + } + } + return nil +} + +// DeleteTipBlock deletes the tip block from the indexers in the group +func (ig *SyncIndexers) DeleteTipBlock(ctx context.Context, blk *block.Block) error { + for _, indexer := range ig.indexers { + if err := indexer.DeleteTipBlock(ctx, blk); err != nil { + return err + } + } + return nil +} + +// StartHeight returns the minimum start height of the indexers in the group +func (ig *SyncIndexers) StartHeight() uint64 { + return ig.minStartHeight +} + +// Height returns the minimum height of the indexers in the group +func (ig *SyncIndexers) Height() (uint64, error) { + var height uint64 + for i, indexer := range ig.indexers { + h, err := indexer.Height() + if err != nil { + return 0, err + } + if i == 0 || h < height { + height = h + } + } + return height, nil +} + +// initStartHeight initializes the start height of the indexers in the group +// for every indexer, the start height is the maximum of tipheight+1 and startheight +func (ig *SyncIndexers) initStartHeight() error { + ig.minStartHeight = 0 + ig.startHeights = make([]uint64, len(ig.indexers)) + for i, indexer := range ig.indexers { + tipHeight, err := indexer.Height() + if err != nil { + return err + } + indexStartHeight := tipHeight + 1 + if indexerWithStart, ok := indexer.(blockdao.BlockIndexerWithStart); ok { + startHeight := indexerWithStart.StartHeight() + if startHeight > indexStartHeight { + indexStartHeight = startHeight + } + } + ig.startHeights[i] = indexStartHeight + if i == 0 || indexStartHeight < ig.minStartHeight { + ig.minStartHeight = indexStartHeight + } + } + return nil +} diff --git a/blockindex/sync_indexers_test.go b/blockindex/sync_indexers_test.go new file mode 100644 index 0000000000..62daa6c85b --- /dev/null +++ b/blockindex/sync_indexers_test.go @@ -0,0 +1,218 @@ +// Copyright (c) 2023 IoTeX Foundation +// This source code is provided 'as is' and no warranties are given as to title or non-infringement, merchantability +// or fitness for purpose and, to the extent permitted by law, all liability for your use of the code is disclaimed. +// This source code is governed by Apache License 2.0 that can be found in the LICENSE file. + +package blockindex + +import ( + "context" + "strconv" + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + + "github.com/iotexproject/iotex-core/blockchain/block" + "github.com/iotexproject/iotex-core/blockchain/blockdao" + "github.com/iotexproject/iotex-core/test/identityset" + "github.com/iotexproject/iotex-core/test/mock/mock_blockdao" +) + +func TestSyncIndexers_StartHeight(t *testing.T) { + require := require.New(t) + ctrl := gomock.NewController(t) + + cases := []struct { + name string + indexers [][2]uint64 // [startHeight, height] + expect uint64 + }{ + {"no indexers", nil, 0}, + {"one indexer without start height", [][2]uint64{{0, 100}}, 101}, + {"one indexer with start height I", [][2]uint64{{100, 200}}, 201}, + {"one indexer with start height II", [][2]uint64{{300, 200}}, 300}, + {"two indexers with start height I", [][2]uint64{{100, 200}, {200, 300}}, 201}, + {"two indexers with start height II", [][2]uint64{{100, 200}, {400, 300}}, 201}, + {"two indexers with start height III", [][2]uint64{{100, 350}, {400, 300}}, 351}, + {"two indexers one with start height I", [][2]uint64{{0, 1}, {150, 1}}, 2}, + {"two indexers one with start height II", [][2]uint64{{0, 1}, {150, 200}}, 2}, + {"two indexers one with start height III", [][2]uint64{{0, 200}, {250, 1}}, 201}, + {"two indexers one with start height IV", [][2]uint64{{0, 200}, {150, 1}}, 150}, + {"two indexers I", [][2]uint64{{0, 5}, {0, 1}}, 2}, + {"two indexers II", [][2]uint64{{0, 5}, {0, 5}}, 6}, + {"two indexers III", [][2]uint64{{0, 5}, {0, 6}}, 6}, + {"multiple indexers I", [][2]uint64{{0, 5}, {0, 6}, {0, 7}}, 6}, + {"multiple indexers II", [][2]uint64{{0, 5}, {10, 6}, {0, 7}}, 6}, + {"multiple indexers III", [][2]uint64{{10, 5}, {0, 6}, {0, 7}}, 7}, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + var indexers []blockdao.BlockIndexer + for _, indexer := range c.indexers { + if indexer[0] > 0 { + mockIndexerWithStart := mock_blockdao.NewMockBlockIndexerWithStart(ctrl) + mockIndexerWithStart.EXPECT().Start(gomock.Any()).Return(nil).Times(1) + mockIndexerWithStart.EXPECT().StartHeight().Return(indexer[0]).Times(1) + mockIndexerWithStart.EXPECT().Height().Return(indexer[1], nil).Times(1) + indexers = append(indexers, mockIndexerWithStart) + } else { + mockIndexer := mock_blockdao.NewMockBlockIndexer(ctrl) + mockIndexer.EXPECT().Start(gomock.Any()).Return(nil).Times(1) + mockIndexer.EXPECT().Height().Return(indexer[1], nil).Times(1) + indexers = append(indexers, mockIndexer) + } + } + ig := NewSyncIndexers(indexers...) + err := ig.Start(context.Background()) + require.NoError(err) + height := ig.StartHeight() + require.Equal(c.expect, height) + }) + } + +} + +func TestSyncIndexers_Height(t *testing.T) { + require := require.New(t) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + cases := []struct { + heights []uint64 + expect uint64 + }{ + {[]uint64{}, 0}, + {[]uint64{100}, 100}, + {[]uint64{100, 100}, 100}, + {[]uint64{90, 100}, 90}, + {[]uint64{100, 90}, 90}, + {[]uint64{100, 100, 100}, 100}, + {[]uint64{90, 100, 100}, 90}, + {[]uint64{90, 80, 100}, 80}, + {[]uint64{90, 80, 70}, 70}, + } + + for i := range cases { + name := strconv.FormatUint(uint64(i), 10) + t.Run(name, func(t *testing.T) { + var indexers []blockdao.BlockIndexer + for _, height := range cases[i].heights { + mockIndexer := mock_blockdao.NewMockBlockIndexer(ctrl) + mockIndexer.EXPECT().Height().Return(height, nil).Times(1) + indexers = append(indexers, mockIndexer) + } + ig := NewSyncIndexers(indexers...) + height, err := ig.Height() + require.NoError(err) + require.Equal(cases[i].expect, height) + }) + } +} + +func TestSyncIndexers_PutBlock(t *testing.T) { + require := require.New(t) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + cases := []struct { + indexers [][2]uint64 // [startHeight, height] + blocks []uint64 // blocks to put + expectBlocks [][]uint64 // expect blocks to put on every indexer + }{ + { + [][2]uint64{}, + []uint64{100}, + [][]uint64{}, + }, + { + [][2]uint64{{100, 10}}, + []uint64{10, 20, 90, 100, 101}, + [][]uint64{{100, 101}}, + }, + { + [][2]uint64{{100, 210}}, + []uint64{10, 20, 90, 100, 101, 210, 211}, + [][]uint64{{211}}, + }, + { + [][2]uint64{{0, 200}, {250, 1}}, + []uint64{1, 2, 201, 249, 250, 251}, + [][]uint64{{201, 249, 250, 251}, {250, 251}}, + }, + { + [][2]uint64{{0, 250}, {250, 250}}, + []uint64{1, 2, 201, 249, 250, 251, 252}, + [][]uint64{{251, 252}, {251, 252}}, + }, + { + [][2]uint64{{0, 200}, {250, 1}, {300, 1}}, + []uint64{1, 2, 201, 249, 250, 251, 300, 301}, + [][]uint64{{201, 249, 250, 251, 300, 301}, {250, 251, 300, 301}, {300, 301}}, + }, + { + [][2]uint64{{0, 250}, {250, 250}, {300, 250}}, + []uint64{1, 2, 201, 249, 250, 251, 300, 301}, + [][]uint64{{251, 300, 301}, {251, 300, 301}, {300, 301}}, + }, + { + [][2]uint64{{0, 300}, {250, 300}, {300, 300}}, + []uint64{1, 2, 201, 249, 250, 251, 300, 301}, + [][]uint64{{301}, {301}, {301}}, + }, + { + [][2]uint64{{0, 400}, {250, 400}, {300, 400}}, + []uint64{1, 2, 201, 249, 250, 251, 300, 301, 400, 401}, + [][]uint64{{401}, {401}, {401}}, + }, + } + + for _, c := range cases { + t.Run("", func(t *testing.T) { + var indexers []blockdao.BlockIndexer + putBlocks := make([][]uint64, len(c.indexers)) + indexersHeight := make([]uint64, len(c.indexers)) + for id, indexer := range c.indexers { + idx := id + indexersHeight[idx] = indexer[1] + if indexer[0] > 0 { + mockIndexerWithStart := mock_blockdao.NewMockBlockIndexerWithStart(ctrl) + mockIndexerWithStart.EXPECT().Start(gomock.Any()).Return(nil).Times(1) + mockIndexerWithStart.EXPECT().StartHeight().Return(indexer[0]).Times(1) + mockIndexerWithStart.EXPECT().Height().DoAndReturn(func() (uint64, error) { + return indexersHeight[idx], nil + }).AnyTimes() + mockIndexerWithStart.EXPECT().PutBlock(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, blk *block.Block) error { + putBlocks[idx] = append(putBlocks[idx], blk.Height()) + indexersHeight[idx] = blk.Height() + return nil + }).Times(len(c.expectBlocks[idx])) + indexers = append(indexers, mockIndexerWithStart) + } else { + mockIndexer := mock_blockdao.NewMockBlockIndexer(ctrl) + mockIndexer.EXPECT().Start(gomock.Any()).Return(nil).Times(1) + mockIndexer.EXPECT().Height().DoAndReturn(func() (uint64, error) { + return indexersHeight[idx], nil + }).AnyTimes() + mockIndexer.EXPECT().PutBlock(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, blk *block.Block) error { + putBlocks[idx] = append(putBlocks[idx], blk.Height()) + indexersHeight[idx] = blk.Height() + return nil + }).Times(len(c.expectBlocks[idx])) + indexers = append(indexers, mockIndexer) + } + } + ig := NewSyncIndexers(indexers...) + err := ig.Start(context.Background()) + require.NoError(err) + for _, blkHeight := range c.blocks { + blk, err := block.NewBuilder(block.RunnableActions{}).SetHeight(blkHeight).SignAndBuild(identityset.PrivateKey(0)) + require.NoError(err) + err = ig.PutBlock(context.Background(), &blk) + require.NoError(err) + } + require.Equal(c.expectBlocks, putBlocks) + }) + } +} diff --git a/chainservice/builder.go b/chainservice/builder.go index 83fc3ae6ae..0a4deaaea7 100644 --- a/chainservice/builder.go +++ b/chainservice/builder.go @@ -257,19 +257,26 @@ func (builder *Builder) buildBlockDAO(forTest bool) error { } var indexers []blockdao.BlockIndexer - indexers = append(indexers, builder.cs.factory) + // indexers in synchronizedIndexers will need to run PutBlock() one by one + // factory is dependent on sgdIndexer and contractStakingIndexer, so it should be put in the first place + synchronizedIndexers := []blockdao.BlockIndexer{builder.cs.factory} + if builder.cs.contractStakingIndexer != nil { + synchronizedIndexers = append(synchronizedIndexers, builder.cs.contractStakingIndexer) + } + if builder.cs.sgdIndexer != nil { + synchronizedIndexers = append(synchronizedIndexers, builder.cs.sgdIndexer) + } + if len(synchronizedIndexers) > 1 { + indexers = append(indexers, blockindex.NewSyncIndexers(synchronizedIndexers...)) + } else { + indexers = append(indexers, builder.cs.factory) + } if !builder.cfg.Chain.EnableAsyncIndexWrite && builder.cs.indexer != nil { indexers = append(indexers, builder.cs.indexer) } if builder.cs.bfIndexer != nil { indexers = append(indexers, builder.cs.bfIndexer) } - if builder.cs.sgdIndexer != nil { - indexers = append(indexers, builder.cs.sgdIndexer) - } - if builder.cs.contractStakingIndexer != nil { - indexers = append(indexers, builder.cs.contractStakingIndexer) - } if forTest { builder.cs.blockdao = blockdao.NewBlockDAOInMemForTest(indexers) } else {