From 13c2daba63c9d999b9de8c017749187035ad34d3 Mon Sep 17 00:00:00 2001
From: Vlad <13818348+walldiss@users.noreply.github.com>
Date: Tue, 1 Aug 2023 20:21:43 +0800
Subject: [PATCH 1/2] fix(share/eds): turn off eds store dagstore gc by default
 (#2529)

Dagstore GC performs garbage collection by reclaiming transient files of
shards that are currently available but inactive, or errored.
We don't use transient files right now, so GC is turned off by default.
---
 share/eds/store.go | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/share/eds/store.go b/share/eds/store.go
index 0683df0d99..9bc539590b 100644
--- a/share/eds/store.go
+++ b/share/eds/store.go
@@ -34,7 +34,10 @@ const (
 	indexPath      = "/index/"
 	transientsPath = "/transients/"
 
-	defaultGCInterval = time.Hour
+	// GC performs DAG store garbage collection by reclaiming transient files of
+	// shards that are currently available but inactive, or errored.
+	// We don't use transient files right now, so GC is turned off by default.
+	defaultGCInterval = 0
 )
 
 var ErrNotFound = errors.New("eds not found in store")
@@ -117,13 +120,16 @@ func (s *Store) Start(ctx context.Context) error {
 		return err
 	}
 	// start Store only if DagStore succeeds
-	ctx, cancel := context.WithCancel(context.Background())
+	runCtx, cancel := context.WithCancel(context.Background())
 	s.cancel = cancel
 	// initialize empty gc result to avoid panic on access
 	s.lastGCResult.Store(&dagstore.GCResult{
 		Shards: make(map[shard.Key]error),
 	})
-	go s.gc(ctx)
+
+	if s.gcInterval != 0 {
+		go s.gc(runCtx)
+	}
 	return nil
 }
 
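The effect of the change above is that a zero interval now disables background
GC entirely: the gc goroutine is never spawned. A minimal sketch of that gating
pattern, assuming a ticker-driven loop (the store's actual gc method is not part
of this patch, so the small store type and the loop body below are illustrative
only):

package main

import (
	"context"
	"fmt"
	"time"
)

type store struct {
	gcInterval time.Duration
}

// start launches background GC only when the interval is non-zero. With
// defaultGCInterval = 0, no goroutine and no ticker are created at all;
// time.NewTicker would panic on a non-positive interval anyway.
func (s *store) start(ctx context.Context) {
	if s.gcInterval != 0 {
		go s.gc(ctx)
	}
}

func (s *store) gc(ctx context.Context) {
	ticker := time.NewTicker(s.gcInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// reclaim transient files of inactive or errored shards here
			fmt.Println("gc tick")
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	(&store{gcInterval: 0}).start(ctx) // GC disabled: nothing runs
}
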
From 08ffb69f0c14776f6b2acd7cdf774d92524533fd Mon Sep 17 00:00:00 2001
From: Vlad <13818348+walldiss@users.noreply.github.com>
Date: Tue, 1 Aug 2023 20:39:28 +0800
Subject: [PATCH 2/2] feat(share/shrex/nd): add swamp test for shrex nd (#2227)

## Overview

Closes https://github.com/celestiaorg/celestia-node/issues/2119

---------

Co-authored-by: rene <41963722+renaynay@users.noreply.github.com>
---
 nodebuilder/tests/nd_test.go | 199 +++++++++++++++++++++++++++++++++++
 share/p2p/shrexnd/server.go  |  24 +++--
 2 files changed, 215 insertions(+), 8 deletions(-)
 create mode 100644 nodebuilder/tests/nd_test.go

diff --git a/nodebuilder/tests/nd_test.go b/nodebuilder/tests/nd_test.go
new file mode 100644
index 0000000000..955cff2ffe
--- /dev/null
+++ b/nodebuilder/tests/nd_test.go
@@ -0,0 +1,199 @@
+package tests
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/libp2p/go-libp2p/core/host"
+	"github.com/libp2p/go-libp2p/core/network"
+	"github.com/stretchr/testify/require"
+	"go.uber.org/fx"
+
+	"github.com/celestiaorg/celestia-node/nodebuilder"
+	"github.com/celestiaorg/celestia-node/nodebuilder/node"
+	"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
+	"github.com/celestiaorg/celestia-node/nodebuilder/tests/swamp"
+	"github.com/celestiaorg/celestia-node/share"
+	"github.com/celestiaorg/celestia-node/share/eds"
+	"github.com/celestiaorg/celestia-node/share/getters"
+	"github.com/celestiaorg/celestia-node/share/p2p/shrexnd"
+)
+
+func TestShrexNDFromLights(t *testing.T) {
+	const (
+		blocks = 10
+		btime  = time.Millisecond * 300
+		bsize  = 16
+	)
+
+	ctx, cancel := context.WithTimeout(context.Background(), swamp.DefaultTestTimeout)
+	t.Cleanup(cancel)
+
+	sw := swamp.NewSwamp(t, swamp.WithBlockTime(btime))
+	fillDn := swamp.FillBlocks(ctx, sw.ClientContext, sw.Accounts, bsize, blocks)
+
+	bridge := sw.NewBridgeNode()
+	sw.SetBootstrapper(t, bridge)
+
+	cfg := nodebuilder.DefaultConfig(node.Light)
+	cfg.Share.Discovery.PeersLimit = 1
+	light := sw.NewNodeWithConfig(node.Light, cfg)
+
+	err := bridge.Start(ctx)
+	require.NoError(t, err)
+	err = light.Start(ctx)
+	require.NoError(t, err)
+
+	// wait for chain to be filled
+	require.NoError(t, <-fillDn)
+
+	// first 2 blocks are not filled with data
+	for i := 3; i < blocks; i++ {
+		h, err := bridge.HeaderServ.GetByHeight(ctx, uint64(i))
+		require.NoError(t, err)
+
+		reqCtx, cancel := context.WithTimeout(ctx, time.Second*5)
+
+		// ensure to fetch random namespace (not the reserved namespace)
+		namespace := h.DAH.RowRoots[1][:share.NamespaceSize]
+
+		expected, err := bridge.ShareServ.GetSharesByNamespace(reqCtx, h.DAH, namespace)
+		require.NoError(t, err)
+		got, err := light.ShareServ.GetSharesByNamespace(reqCtx, h.DAH, namespace)
+		require.NoError(t, err)
+
+		require.True(t, len(got[0].Shares) > 0)
+		require.Equal(t, expected, got)
+
+		cancel()
+	}
+}
+
+func TestShrexNDFromLightsWithBadFulls(t *testing.T) {
+	const (
+		blocks        = 10
+		btime         = time.Millisecond * 300
+		bsize         = 16
+		amountOfFulls = 5
+		testTimeout   = time.Second * 10
+	)
+
+	ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
+	t.Cleanup(cancel)
+
+	sw := swamp.NewSwamp(t, swamp.WithBlockTime(btime))
+	fillDn := swamp.FillBlocks(ctx, sw.ClientContext, sw.Accounts, bsize, blocks)
+
+	bridge := sw.NewBridgeNode()
+	sw.SetBootstrapper(t, bridge)
+
+	// create full nodes with basic stream.reset handler
+	ndHandler := func(stream network.Stream) {
+		_ = stream.Reset()
+	}
+	fulls := make([]*nodebuilder.Node, 0, amountOfFulls)
+	for i := 0; i < amountOfFulls; i++ {
+		cfg := nodebuilder.DefaultConfig(node.Full)
+		setTimeInterval(cfg, testTimeout)
+		full := sw.NewNodeWithConfig(node.Full, cfg, replaceNDServer(cfg, ndHandler), replaceShareGetter())
+		fulls = append(fulls, full)
+	}
+
+	lnConfig := nodebuilder.DefaultConfig(node.Light)
+	lnConfig.Share.Discovery.PeersLimit = uint(amountOfFulls)
+	light := sw.NewNodeWithConfig(node.Light, lnConfig)
+
+	// start all nodes
+	require.NoError(t, bridge.Start(ctx))
+	require.NoError(t, startFullNodes(ctx, fulls...))
+	require.NoError(t, light.Start(ctx))
+
+	// wait for chain to fill up
+	require.NoError(t, <-fillDn)
+
+	// first 2 blocks are not filled with data
+	for i := 3; i < blocks; i++ {
+		h, err := bridge.HeaderServ.GetByHeight(ctx, uint64(i))
+		require.NoError(t, err)
+
+		if len(h.DAH.RowRoots) != bsize*2 {
+			// fill blocks does not always fill every block to the given block
+			// size - this check prevents trying to fetch shares for the parity
+			// namespace.
+			continue
+		}
+
+		reqCtx, cancel := context.WithTimeout(ctx, time.Second*5)
+
+		// ensure to fetch random namespace (not the reserved namespace)
+		namespace := h.DAH.RowRoots[1][:share.NamespaceSize]
+
+		expected, err := bridge.ShareServ.GetSharesByNamespace(reqCtx, h.DAH, namespace)
+		require.NoError(t, err)
+		require.True(t, len(expected[0].Shares) > 0)
+
+		// choose a random full to test
+		gotFull, err := fulls[len(fulls)/2].ShareServ.GetSharesByNamespace(reqCtx, h.DAH, namespace)
+		require.NoError(t, err)
+		require.True(t, len(gotFull[0].Shares) > 0)
+
+		gotLight, err := light.ShareServ.GetSharesByNamespace(reqCtx, h.DAH, namespace)
+		require.NoError(t, err)
+		require.True(t, len(gotLight[0].Shares) > 0)
+
+		require.Equal(t, expected, gotFull)
+		require.Equal(t, expected, gotLight)
+
+		cancel()
+	}
+}
+
+func startFullNodes(ctx context.Context, fulls ...*nodebuilder.Node) error {
+	for _, full := range fulls {
+		err := full.Start(ctx)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func replaceNDServer(cfg *nodebuilder.Config, handler network.StreamHandler) fx.Option {
+	return fx.Decorate(fx.Annotate(
+		func(
+			host host.Host,
+			store *eds.Store,
+			getter *getters.StoreGetter,
+			network p2p.Network,
+		) (*shrexnd.Server, error) {
+			cfg.Share.ShrExNDParams.WithNetworkID(network.String())
+			return shrexnd.NewServer(cfg.Share.ShrExNDParams, host, store, getter)
+		},
+		fx.OnStart(func(ctx context.Context, server *shrexnd.Server) error {
+			// replace handler for server
+			server.SetHandler(handler)
+			return server.Start(ctx)
+		}),
+		fx.OnStop(func(ctx context.Context, server *shrexnd.Server) error {
+			return server.Stop(ctx)
+		}),
+	))
+}
+
+func replaceShareGetter() fx.Option {
+	return fx.Decorate(fx.Annotate(
+		func(
+			host host.Host,
+			store *eds.Store,
+			storeGetter *getters.StoreGetter,
+			shrexGetter *getters.ShrexGetter,
+			network p2p.Network,
+		) share.Getter {
+			cascade := make([]share.Getter, 0, 2)
+			cascade = append(cascade, storeGetter)
+			cascade = append(cascade, getters.NewTeeGetter(shrexGetter, store))
+			return getters.NewCascadeGetter(cascade)
+		},
+	))
+}
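The server.go changes in the next diff are what make the replaceNDServer
override above work: NewServer now builds the default rate-limited handler and
stores it on the Server, SetHandler can replace it, and Start only registers
whichever handler the server currently holds. A minimal usage sketch of that
call order follows, wrapped in a hypothetical startNDServer helper (the helper
name and the wiring of params, host, store and getter are assumptions for
illustration; the override handler simply resets streams, like the "bad full"
nodes in the test above):

package sketch

import (
	"context"

	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/network"

	"github.com/celestiaorg/celestia-node/share"
	"github.com/celestiaorg/celestia-node/share/eds"
	"github.com/celestiaorg/celestia-node/share/p2p/shrexnd"
)

// startNDServer is a hypothetical helper illustrating the order of calls:
// NewServer builds the default rate-limited handler, SetHandler may replace
// it, and Start registers whichever handler the server currently holds.
func startNDServer(
	ctx context.Context,
	params *shrexnd.Parameters,
	h host.Host,
	store *eds.Store,
	getter share.Getter,
) (*shrexnd.Server, error) {
	srv, err := shrexnd.NewServer(params, h, store, getter)
	if err != nil {
		return nil, err
	}
	// Replace the default handler before Start, e.g. with one that rejects
	// every request by resetting the stream (what the swamp test does).
	srv.SetHandler(func(stream network.Stream) {
		_ = stream.Reset()
	})
	return srv, srv.Start(ctx)
}

Because handler registration happens only in Start, the fx.OnStart hook in
replaceNDServer can install its stub before the protocol is ever exposed on the
host.
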
diff --git a/share/p2p/shrexnd/server.go b/share/p2p/shrexnd/server.go
index d61e6b2e68..830a853946 100644
--- a/share/p2p/shrexnd/server.go
+++ b/share/p2p/shrexnd/server.go
@@ -29,8 +29,9 @@ type Server struct {
 	host       host.Host
 	protocolID protocol.ID
 
-	getter share.Getter
-	store  *eds.Store
+	handler network.StreamHandler
+	getter  share.Getter
+	store   *eds.Store
 
 	params     *Parameters
 	middleware *p2p.Middleware
@@ -52,18 +53,20 @@ func NewServer(params *Parameters, host host.Host, store *eds.Store, getter shar
 		middleware: p2p.NewMiddleware(params.ConcurrencyLimit),
 	}
 
-	return srv, nil
-}
-
-// Start starts the server
-func (srv *Server) Start(context.Context) error {
 	ctx, cancel := context.WithCancel(context.Background())
 	srv.cancel = cancel
 
 	handler := func(s network.Stream) {
 		srv.handleNamespacedData(ctx, s)
 	}
-	srv.host.SetStreamHandler(srv.protocolID, srv.middleware.RateLimitHandler(handler))
+	srv.handler = srv.middleware.RateLimitHandler(handler)
+
+	return srv, nil
+}
+
+// Start starts the server
+func (srv *Server) Start(context.Context) error {
+	srv.host.SetStreamHandler(srv.protocolID, srv.handler)
 	return nil
 }
 
@@ -74,6 +77,11 @@ func (srv *Server) Stop(context.Context) error {
 	return nil
 }
 
+// SetHandler sets server handler
+func (srv *Server) SetHandler(handler network.StreamHandler) {
+	srv.handler = handler
+}
+
 func (srv *Server) observeRateLimitedRequests() {
 	numRateLimited := srv.middleware.DrainCounter()
 	if numRateLimited > 0 {