Skip to content

Commit

Permalink
reuse test helper and fix start-stop error
Browse files Browse the repository at this point in the history
  • Loading branch information
cristaloleg committed May 31, 2024
1 parent edbc71d commit 04064ab
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 85 deletions.
4 changes: 2 additions & 2 deletions store/init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ func TestInitStore_NoReinit(t *testing.T) {

suite := headertest.NewTestSuite(t)
head := suite.Head()
exchange := local.NewExchange(NewTestStore(ctx, t, head))

ds := sync.MutexWrap(datastore.NewMapDatastore())
exchange := local.NewExchange(NewTestStore(t, ctx, ds, head))
store, err := NewStore[*headertest.DummyHeader](ds)
require.NoError(t, err)

err = Init[*headertest.DummyHeader](ctx, store, exchange, head.Hash())
err = Init(ctx, store, exchange, head.Hash())
assert.NoError(t, err)

err = store.Start(ctx)
Expand Down
22 changes: 7 additions & 15 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,6 @@ func NewStore[H header.Header[H]](ds datastore.Batching, opts ...Option) (*Store
return newStore[H](ds, opts...)
}

// NewStoreWithHead initiates a new Store and forcefully sets a given trusted header as head.
func NewStoreWithHead[H header.Header[H]](
ctx context.Context,
ds datastore.Batching,
head H,
opts ...Option,
) (*Store[H], error) {
store, err := newStore[H](ds, opts...)
if err != nil {
return nil, err
}

return store, store.Init(ctx, head)
}

func newStore[H header.Header[H]](ds datastore.Batching, opts ...Option) (*Store[H], error) {
params := DefaultParameters()
for _, opt := range opts {
Expand Down Expand Up @@ -142,6 +127,13 @@ func (s *Store[H]) Init(ctx context.Context, initial H) error {
}

func (s *Store[H]) Start(context.Context) error {
// closed s.writesDn means that store was stopped before, recreate chan.
select {
case <-s.writesDn:
s.writesDn = make(chan struct{})
default:
}

go s.flushLoop()
return nil
}
Expand Down
40 changes: 9 additions & 31 deletions store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,7 @@ func TestStore(t *testing.T) {
suite := headertest.NewTestSuite(t)

ds := sync.MutexWrap(datastore.NewMapDatastore())
store, err := NewStoreWithHead(ctx, ds, suite.Head())
require.NoError(t, err)

err = store.Start(ctx)
require.NoError(t, err)
store := NewTestStore(t, ctx, ds, suite.Head())

head, err := store.Head(ctx)
require.NoError(t, err)
Expand Down Expand Up @@ -81,9 +77,6 @@ func TestStore(t *testing.T) {

// check that the store can be successfully started after previous stop
// with all data being flushed.
store, err = NewStore[*headertest.DummyHeader](ds)
require.NoError(t, err)

err = store.Start(ctx)
require.NoError(t, err)

Expand All @@ -94,9 +87,6 @@ func TestStore(t *testing.T) {
out, err = store.getRangeByHeight(ctx, 1, 13)
require.NoError(t, err)
assert.Len(t, out, 12)

err = store.Stop(ctx)
require.NoError(t, err)
}

// TestStore_GetRangeByHeight_ExpectedRange
Expand All @@ -107,11 +97,7 @@ func TestStore_GetRangeByHeight_ExpectedRange(t *testing.T) {
suite := headertest.NewTestSuite(t)

ds := sync.MutexWrap(datastore.NewMapDatastore())
store, err := NewStoreWithHead(ctx, ds, suite.Head())
require.NoError(t, err)

err = store.Start(ctx)
require.NoError(t, err)
store := NewTestStore(t, ctx, ds, suite.Head())

head, err := store.Head(ctx)
require.NoError(t, err)
Expand Down Expand Up @@ -143,11 +129,7 @@ func TestStore_Append_BadHeader(t *testing.T) {
suite := headertest.NewTestSuite(t)

ds := sync.MutexWrap(datastore.NewMapDatastore())
store, err := NewStoreWithHead(ctx, ds, suite.Head())
require.NoError(t, err)

err = store.Start(ctx)
require.NoError(t, err)
store := NewTestStore(t, ctx, ds, suite.Head())

head, err := store.Head(ctx)
require.NoError(t, err)
Expand All @@ -168,11 +150,7 @@ func TestStore_GetRange(t *testing.T) {
suite := headertest.NewTestSuite(t)

ds := sync.MutexWrap(datastore.NewMapDatastore())
store, err := NewStoreWithHead(ctx, ds, suite.Head())
require.NoError(t, err)

err = store.Start(ctx)
require.NoError(t, err)
store := NewTestStore(t, ctx, ds, suite.Head())

head, err := store.Head(ctx)
require.NoError(t, err)
Expand Down Expand Up @@ -216,6 +194,9 @@ func TestStore_GetRange(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()

firstHeaderInRangeHeight := tt.from
lastHeaderInRangeHeight := tt.to - 1
to := lastHeaderInRangeHeight + 1
Expand Down Expand Up @@ -245,15 +226,12 @@ func TestStorePendingCacheMiss(t *testing.T) {

ds := sync.MutexWrap(datastore.NewMapDatastore())

store, err := NewStoreWithHead(ctx, ds, suite.Head(),
store := NewTestStore(t, ctx, ds, suite.Head(),
WithWriteBatchSize(100),
WithStoreCacheSize(100),
)
require.NoError(t, err)

err = store.Start(ctx)
require.NoError(t, err)
err = store.Append(ctx, suite.GenDummyHeaders(100)...)
err := store.Append(ctx, suite.GenDummyHeaders(100)...)
require.NoError(t, err)

err = store.Append(ctx, suite.GenDummyHeaders(50)...)
Expand Down
19 changes: 11 additions & 8 deletions store/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,27 @@ import (
"testing"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/go-header"
"github.com/celestiaorg/go-header/headertest"
)

// NewTestStore creates initialized and started in memory header Store which is useful for testing.
func NewTestStore(ctx context.Context, t *testing.T, head *headertest.DummyHeader) header.Store[*headertest.DummyHeader] {
store, err := NewStoreWithHead(ctx, sync.MutexWrap(datastore.NewMapDatastore()), head)
require.NoError(t, err)
func NewTestStore(tb testing.TB, ctx context.Context,
ds datastore.Batching, head *headertest.DummyHeader, opts ...Option,
) *Store[*headertest.DummyHeader] {
store, err := NewStore[*headertest.DummyHeader](ds, opts...)
require.NoError(tb, err)

err = store.Init(ctx, head)
require.NoError(tb, err)

err = store.Start(ctx)
require.NoError(t, err)
require.NoError(tb, err)

t.Cleanup(func() {
tb.Cleanup(func() {
err := store.Stop(ctx)
require.NoError(t, err)
require.NoError(tb, err)
})
return store
}
10 changes: 3 additions & 7 deletions sync/sync_head_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,12 @@ import (
"testing"
"time"

"github.com/ipfs/go-datastore"
sync2 "github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/go-header"
"github.com/celestiaorg/go-header/headertest"
"github.com/celestiaorg/go-header/local"
"github.com/celestiaorg/go-header/store"
)

func TestSyncer_incomingNetworkHeadRaces(t *testing.T) {
Expand Down Expand Up @@ -61,11 +58,10 @@ func TestSyncer_HeadWithTrustedHead(t *testing.T) {
suite := headertest.NewTestSuite(t)
head := suite.Head()

localStore := store.NewTestStore(ctx, t, head)
localStore := newTestStore(t, ctx, head)
remoteStore := newTestStore(t, ctx, head)

remoteStore, err := store.NewStoreWithHead(ctx, sync2.MutexWrap(datastore.NewMapDatastore()), head)
require.NoError(t, err)
err = remoteStore.Append(ctx, suite.GenDummyHeaders(100)...)
err := remoteStore.Append(ctx, suite.GenDummyHeaders(100)...)
require.NoError(t, err)

// create a wrappedGetter to track exchange interactions
Expand Down
52 changes: 30 additions & 22 deletions sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"testing"
"time"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -21,15 +23,15 @@ func TestSyncSimpleRequestingHead(t *testing.T) {
suite := headertest.NewTestSuite(t)
head := suite.Head()

remoteStore := store.NewTestStore(ctx, t, head)
remoteStore := newTestStore(t, ctx, head)
err := remoteStore.Append(ctx, suite.GenDummyHeaders(100)...)
require.NoError(t, err)

_, err = remoteStore.GetByHeight(ctx, 100)
require.NoError(t, err)

localStore := store.NewTestStore(ctx, t, head)
syncer, err := NewSyncer[*headertest.DummyHeader](
localStore := newTestStore(t, ctx, head)
syncer, err := NewSyncer(
local.NewExchange(remoteStore),
localStore,
headertest.NewDummySubscriber(),
Expand Down Expand Up @@ -67,9 +69,9 @@ func TestDoSyncFullRangeFromExternalPeer(t *testing.T) {
suite := headertest.NewTestSuite(t)
head := suite.Head()

remoteStore := store.NewTestStore(ctx, t, head)
localStore := store.NewTestStore(ctx, t, head)
syncer, err := NewSyncer[*headertest.DummyHeader](
remoteStore := newTestStore(t, ctx, head)
localStore := newTestStore(t, ctx, head)
syncer, err := NewSyncer(
local.NewExchange(remoteStore),
localStore,
headertest.NewDummySubscriber(),
Expand Down Expand Up @@ -106,9 +108,9 @@ func TestSyncCatchUp(t *testing.T) {
suite := headertest.NewTestSuite(t)
head := suite.Head()

remoteStore := store.NewTestStore(ctx, t, head)
localStore := store.NewTestStore(ctx, t, head)
syncer, err := NewSyncer[*headertest.DummyHeader](
remoteStore := newTestStore(t, ctx, head)
localStore := newTestStore(t, ctx, head)
syncer, err := NewSyncer(
local.NewExchange(remoteStore),
localStore,
headertest.NewDummySubscriber(),
Expand Down Expand Up @@ -157,9 +159,9 @@ func TestSyncPendingRangesWithMisses(t *testing.T) {
suite := headertest.NewTestSuite(t)
head := suite.Head()

remoteStore := store.NewTestStore(ctx, t, head)
localStore := store.NewTestStore(ctx, t, head)
syncer, err := NewSyncer[*headertest.DummyHeader](
remoteStore := newTestStore(t, ctx, head)
localStore := newTestStore(t, ctx, head)
syncer, err := NewSyncer(
local.NewExchange(remoteStore),
localStore,
headertest.NewDummySubscriber(),
Expand Down Expand Up @@ -224,9 +226,9 @@ func TestSyncer_FindHeadersReturnsCorrectRange(t *testing.T) {
suite := headertest.NewTestSuite(t)
head := suite.Head()

remoteStore := store.NewTestStore(ctx, t, head)
localStore := store.NewTestStore(ctx, t, head)
syncer, err := NewSyncer[*headertest.DummyHeader](
remoteStore := newTestStore(t, ctx, head)
localStore := newTestStore(t, ctx, head)
syncer, err := NewSyncer(
local.NewExchange(remoteStore),
localStore,
headertest.NewDummySubscriber(),
Expand Down Expand Up @@ -260,9 +262,9 @@ func TestSyncerIncomingDuplicate(t *testing.T) {
suite := headertest.NewTestSuite(t)
head := suite.Head()

remoteStore := store.NewTestStore(ctx, t, head)
localStore := store.NewTestStore(ctx, t, head)
syncer, err := NewSyncer[*headertest.DummyHeader](
remoteStore := newTestStore(t, ctx, head)
localStore := newTestStore(t, ctx, head)
syncer, err := NewSyncer(
&delayedGetter[*headertest.DummyHeader]{Getter: local.NewExchange(remoteStore)},
localStore,
headertest.NewDummySubscriber(),
Expand Down Expand Up @@ -301,12 +303,12 @@ func TestSync_InvalidSyncTarget(t *testing.T) {
head := suite.Head()

// create a local store which is initialised at genesis height
localStore := store.NewTestStore(ctx, t, head)
localStore := newTestStore(t, ctx, head)
// create a peer which is already on height 100
remoteStore := headertest.NewStore[*headertest.DummyHeader](t, suite, 100)
remoteStore := headertest.NewStore(t, suite, 100)

syncer, err := NewSyncer[*headertest.DummyHeader](
local.NewExchange[*headertest.DummyHeader](remoteStore),
syncer, err := NewSyncer(
local.NewExchange(remoteStore),
localStore,
headertest.NewDummySubscriber(),
WithBlockTime(time.Nanosecond),
Expand Down Expand Up @@ -396,3 +398,9 @@ func (d *delayedGetter[H]) GetRangeByHeight(ctx context.Context, from H, to uint
return nil, ctx.Err()
}
}

// newTestStore creates initialized and started in memory header Store which is useful for testing.
func newTestStore(tb testing.TB, ctx context.Context, head *headertest.DummyHeader) header.Store[*headertest.DummyHeader] {
ds := sync.MutexWrap(datastore.NewMapDatastore())
return store.NewTestStore(tb, ctx, ds, head)
}

0 comments on commit 04064ab

Please sign in to comment.