diff --git a/store/init_test.go b/store/init_test.go index bad02ea7..ad26ee43 100644 --- a/store/init_test.go +++ b/store/init_test.go @@ -20,13 +20,13 @@ func TestInitStore_NoReinit(t *testing.T) { suite := headertest.NewTestSuite(t) head := suite.Head() - exchange := local.NewExchange(NewTestStore(ctx, t, head)) ds := sync.MutexWrap(datastore.NewMapDatastore()) + exchange := local.NewExchange(NewTestStore(t, ctx, ds, head)) store, err := NewStore[*headertest.DummyHeader](ds) require.NoError(t, err) - err = Init[*headertest.DummyHeader](ctx, store, exchange, head.Hash()) + err = Init(ctx, store, exchange, head.Hash()) assert.NoError(t, err) err = store.Start(ctx) diff --git a/store/store.go b/store/store.go index ddffa3d0..4b12ffe5 100644 --- a/store/store.go +++ b/store/store.go @@ -64,21 +64,6 @@ func NewStore[H header.Header[H]](ds datastore.Batching, opts ...Option) (*Store return newStore[H](ds, opts...) } -// NewStoreWithHead initiates a new Store and forcefully sets a given trusted header as head. -func NewStoreWithHead[H header.Header[H]]( - ctx context.Context, - ds datastore.Batching, - head H, - opts ...Option, -) (*Store[H], error) { - store, err := newStore[H](ds, opts...) - if err != nil { - return nil, err - } - - return store, store.Init(ctx, head) -} - func newStore[H header.Header[H]](ds datastore.Batching, opts ...Option) (*Store[H], error) { params := DefaultParameters() for _, opt := range opts { @@ -142,6 +127,13 @@ func (s *Store[H]) Init(ctx context.Context, initial H) error { } func (s *Store[H]) Start(context.Context) error { + // closed s.writesDn means that store was stopped before, recreate chan. + select { + case <-s.writesDn: + s.writesDn = make(chan struct{}) + default: + } + go s.flushLoop() return nil } diff --git a/store/store_test.go b/store/store_test.go index a96a1ef4..53d40d55 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -20,11 +20,7 @@ func TestStore(t *testing.T) { suite := headertest.NewTestSuite(t) ds := sync.MutexWrap(datastore.NewMapDatastore()) - store, err := NewStoreWithHead(ctx, ds, suite.Head()) - require.NoError(t, err) - - err = store.Start(ctx) - require.NoError(t, err) + store := NewTestStore(t, ctx, ds, suite.Head()) head, err := store.Head(ctx) require.NoError(t, err) @@ -81,9 +77,6 @@ func TestStore(t *testing.T) { // check that the store can be successfully started after previous stop // with all data being flushed. - store, err = NewStore[*headertest.DummyHeader](ds) - require.NoError(t, err) - err = store.Start(ctx) require.NoError(t, err) @@ -94,9 +87,6 @@ func TestStore(t *testing.T) { out, err = store.getRangeByHeight(ctx, 1, 13) require.NoError(t, err) assert.Len(t, out, 12) - - err = store.Stop(ctx) - require.NoError(t, err) } // TestStore_GetRangeByHeight_ExpectedRange @@ -107,11 +97,7 @@ func TestStore_GetRangeByHeight_ExpectedRange(t *testing.T) { suite := headertest.NewTestSuite(t) ds := sync.MutexWrap(datastore.NewMapDatastore()) - store, err := NewStoreWithHead(ctx, ds, suite.Head()) - require.NoError(t, err) - - err = store.Start(ctx) - require.NoError(t, err) + store := NewTestStore(t, ctx, ds, suite.Head()) head, err := store.Head(ctx) require.NoError(t, err) @@ -143,11 +129,7 @@ func TestStore_Append_BadHeader(t *testing.T) { suite := headertest.NewTestSuite(t) ds := sync.MutexWrap(datastore.NewMapDatastore()) - store, err := NewStoreWithHead(ctx, ds, suite.Head()) - require.NoError(t, err) - - err = store.Start(ctx) - require.NoError(t, err) + store := NewTestStore(t, ctx, ds, suite.Head()) head, err := store.Head(ctx) require.NoError(t, err) @@ -168,11 +150,7 @@ func TestStore_GetRange(t *testing.T) { suite := headertest.NewTestSuite(t) ds := sync.MutexWrap(datastore.NewMapDatastore()) - store, err := NewStoreWithHead(ctx, ds, suite.Head()) - require.NoError(t, err) - - err = store.Start(ctx) - require.NoError(t, err) + store := NewTestStore(t, ctx, ds, suite.Head()) head, err := store.Head(ctx) require.NoError(t, err) @@ -216,6 +194,9 @@ func TestStore_GetRange(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + firstHeaderInRangeHeight := tt.from lastHeaderInRangeHeight := tt.to - 1 to := lastHeaderInRangeHeight + 1 @@ -245,15 +226,12 @@ func TestStorePendingCacheMiss(t *testing.T) { ds := sync.MutexWrap(datastore.NewMapDatastore()) - store, err := NewStoreWithHead(ctx, ds, suite.Head(), + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(100), WithStoreCacheSize(100), ) - require.NoError(t, err) - err = store.Start(ctx) - require.NoError(t, err) - err = store.Append(ctx, suite.GenDummyHeaders(100)...) + err := store.Append(ctx, suite.GenDummyHeaders(100)...) require.NoError(t, err) err = store.Append(ctx, suite.GenDummyHeaders(50)...) diff --git a/store/testing.go b/store/testing.go index b415bcea..9662b0b4 100644 --- a/store/testing.go +++ b/store/testing.go @@ -5,24 +5,27 @@ import ( "testing" "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/sync" "github.com/stretchr/testify/require" - "github.com/celestiaorg/go-header" "github.com/celestiaorg/go-header/headertest" ) // NewTestStore creates initialized and started in memory header Store which is useful for testing. -func NewTestStore(ctx context.Context, t *testing.T, head *headertest.DummyHeader) header.Store[*headertest.DummyHeader] { - store, err := NewStoreWithHead(ctx, sync.MutexWrap(datastore.NewMapDatastore()), head) - require.NoError(t, err) +func NewTestStore(tb testing.TB, ctx context.Context, + ds datastore.Batching, head *headertest.DummyHeader, opts ...Option, +) *Store[*headertest.DummyHeader] { + store, err := NewStore[*headertest.DummyHeader](ds, opts...) + require.NoError(tb, err) + + err = store.Init(ctx, head) + require.NoError(tb, err) err = store.Start(ctx) - require.NoError(t, err) + require.NoError(tb, err) - t.Cleanup(func() { + tb.Cleanup(func() { err := store.Stop(ctx) - require.NoError(t, err) + require.NoError(tb, err) }) return store } diff --git a/sync/sync_head_test.go b/sync/sync_head_test.go index 29b75ff4..b952b232 100644 --- a/sync/sync_head_test.go +++ b/sync/sync_head_test.go @@ -7,15 +7,12 @@ import ( "testing" "time" - "github.com/ipfs/go-datastore" - sync2 "github.com/ipfs/go-datastore/sync" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/celestiaorg/go-header" "github.com/celestiaorg/go-header/headertest" "github.com/celestiaorg/go-header/local" - "github.com/celestiaorg/go-header/store" ) func TestSyncer_incomingNetworkHeadRaces(t *testing.T) { @@ -61,11 +58,10 @@ func TestSyncer_HeadWithTrustedHead(t *testing.T) { suite := headertest.NewTestSuite(t) head := suite.Head() - localStore := store.NewTestStore(ctx, t, head) + localStore := newTestStore(t, ctx, head) + remoteStore := newTestStore(t, ctx, head) - remoteStore, err := store.NewStoreWithHead(ctx, sync2.MutexWrap(datastore.NewMapDatastore()), head) - require.NoError(t, err) - err = remoteStore.Append(ctx, suite.GenDummyHeaders(100)...) + err := remoteStore.Append(ctx, suite.GenDummyHeaders(100)...) require.NoError(t, err) // create a wrappedGetter to track exchange interactions diff --git a/sync/sync_test.go b/sync/sync_test.go index dc108cc5..17b5a36e 100644 --- a/sync/sync_test.go +++ b/sync/sync_test.go @@ -5,6 +5,8 @@ import ( "testing" "time" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/sync" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -21,15 +23,15 @@ func TestSyncSimpleRequestingHead(t *testing.T) { suite := headertest.NewTestSuite(t) head := suite.Head() - remoteStore := store.NewTestStore(ctx, t, head) + remoteStore := newTestStore(t, ctx, head) err := remoteStore.Append(ctx, suite.GenDummyHeaders(100)...) require.NoError(t, err) _, err = remoteStore.GetByHeight(ctx, 100) require.NoError(t, err) - localStore := store.NewTestStore(ctx, t, head) - syncer, err := NewSyncer[*headertest.DummyHeader]( + localStore := newTestStore(t, ctx, head) + syncer, err := NewSyncer( local.NewExchange(remoteStore), localStore, headertest.NewDummySubscriber(), @@ -67,9 +69,9 @@ func TestDoSyncFullRangeFromExternalPeer(t *testing.T) { suite := headertest.NewTestSuite(t) head := suite.Head() - remoteStore := store.NewTestStore(ctx, t, head) - localStore := store.NewTestStore(ctx, t, head) - syncer, err := NewSyncer[*headertest.DummyHeader]( + remoteStore := newTestStore(t, ctx, head) + localStore := newTestStore(t, ctx, head) + syncer, err := NewSyncer( local.NewExchange(remoteStore), localStore, headertest.NewDummySubscriber(), @@ -106,9 +108,9 @@ func TestSyncCatchUp(t *testing.T) { suite := headertest.NewTestSuite(t) head := suite.Head() - remoteStore := store.NewTestStore(ctx, t, head) - localStore := store.NewTestStore(ctx, t, head) - syncer, err := NewSyncer[*headertest.DummyHeader]( + remoteStore := newTestStore(t, ctx, head) + localStore := newTestStore(t, ctx, head) + syncer, err := NewSyncer( local.NewExchange(remoteStore), localStore, headertest.NewDummySubscriber(), @@ -157,9 +159,9 @@ func TestSyncPendingRangesWithMisses(t *testing.T) { suite := headertest.NewTestSuite(t) head := suite.Head() - remoteStore := store.NewTestStore(ctx, t, head) - localStore := store.NewTestStore(ctx, t, head) - syncer, err := NewSyncer[*headertest.DummyHeader]( + remoteStore := newTestStore(t, ctx, head) + localStore := newTestStore(t, ctx, head) + syncer, err := NewSyncer( local.NewExchange(remoteStore), localStore, headertest.NewDummySubscriber(), @@ -224,9 +226,9 @@ func TestSyncer_FindHeadersReturnsCorrectRange(t *testing.T) { suite := headertest.NewTestSuite(t) head := suite.Head() - remoteStore := store.NewTestStore(ctx, t, head) - localStore := store.NewTestStore(ctx, t, head) - syncer, err := NewSyncer[*headertest.DummyHeader]( + remoteStore := newTestStore(t, ctx, head) + localStore := newTestStore(t, ctx, head) + syncer, err := NewSyncer( local.NewExchange(remoteStore), localStore, headertest.NewDummySubscriber(), @@ -260,9 +262,9 @@ func TestSyncerIncomingDuplicate(t *testing.T) { suite := headertest.NewTestSuite(t) head := suite.Head() - remoteStore := store.NewTestStore(ctx, t, head) - localStore := store.NewTestStore(ctx, t, head) - syncer, err := NewSyncer[*headertest.DummyHeader]( + remoteStore := newTestStore(t, ctx, head) + localStore := newTestStore(t, ctx, head) + syncer, err := NewSyncer( &delayedGetter[*headertest.DummyHeader]{Getter: local.NewExchange(remoteStore)}, localStore, headertest.NewDummySubscriber(), @@ -301,12 +303,12 @@ func TestSync_InvalidSyncTarget(t *testing.T) { head := suite.Head() // create a local store which is initialised at genesis height - localStore := store.NewTestStore(ctx, t, head) + localStore := newTestStore(t, ctx, head) // create a peer which is already on height 100 - remoteStore := headertest.NewStore[*headertest.DummyHeader](t, suite, 100) + remoteStore := headertest.NewStore(t, suite, 100) - syncer, err := NewSyncer[*headertest.DummyHeader]( - local.NewExchange[*headertest.DummyHeader](remoteStore), + syncer, err := NewSyncer( + local.NewExchange(remoteStore), localStore, headertest.NewDummySubscriber(), WithBlockTime(time.Nanosecond), @@ -396,3 +398,9 @@ func (d *delayedGetter[H]) GetRangeByHeight(ctx context.Context, from H, to uint return nil, ctx.Err() } } + +// newTestStore creates initialized and started in memory header Store which is useful for testing. +func newTestStore(tb testing.TB, ctx context.Context, head *headertest.DummyHeader) header.Store[*headertest.DummyHeader] { + ds := sync.MutexWrap(datastore.NewMapDatastore()) + return store.NewTestStore(tb, ctx, ds, head) +}