diff --git a/go.mod b/go.mod
index 0d26924835..63f6879041 100644
--- a/go.mod
+++ b/go.mod
@@ -202,7 +202,7 @@ require (
 	github.com/ipfs/bbloom v0.0.4 // indirect
 	github.com/ipfs/go-ipfs-blockstore v1.3.1 // indirect
 	github.com/ipfs/go-ipfs-delay v0.0.1 // indirect
-	github.com/ipfs/go-ipfs-ds-help v1.1.0 // indirect
+	github.com/ipfs/go-ipfs-ds-help v1.1.0
 	github.com/ipfs/go-ipfs-exchange-interface v0.2.1 // indirect
 	github.com/ipfs/go-ipfs-exchange-offline v0.3.0 // indirect
 	github.com/ipfs/go-ipfs-pq v0.0.3 // indirect
diff --git a/nodebuilder/tests/reconstruct_test.go b/nodebuilder/tests/reconstruct_test.go
index 8b1490272c..d8640c5249 100644
--- a/nodebuilder/tests/reconstruct_test.go
+++ b/nodebuilder/tests/reconstruct_test.go
@@ -14,6 +14,7 @@ import (
 	"github.com/libp2p/go-libp2p/core/event"
 	"github.com/libp2p/go-libp2p/core/host"
 	"github.com/libp2p/go-libp2p/core/peer"
+	ma "github.com/multiformats/go-multiaddr"
 	"github.com/stretchr/testify/require"
 	"golang.org/x/sync/errgroup"
 
@@ -81,6 +82,156 @@ func TestFullReconstructFromBridge(t *testing.T) {
 	require.NoError(t, errg.Wait())
 }
 
+/*
+Test-Case: Full Nodes reconstruct blocks from each other, after unsuccessfully syncing the complete
+block from LN subnetworks. Analog to TestShareAvailable_DisconnectedFullNodes.
+*/
+func TestFullReconstructFromFulls(t *testing.T) {
+	light.DefaultSampleAmount = 10 // s
+	const (
+		blocks = 10
+		btime  = time.Millisecond * 300
+		bsize  = 8  // k
+		lnodes = 12 // c - total number of nodes on two subnetworks
+	)
+
+	ctx, cancel := context.WithTimeout(context.Background(), swamp.DefaultTestTimeout)
+	t.Cleanup(cancel)
+
+	sw := swamp.NewSwamp(t, swamp.WithBlockTime(btime))
+	fillDn := swamp.FillBlocks(ctx, sw.ClientContext, sw.Accounts, bsize, blocks)
+
+	const defaultTimeInterval = time.Second * 5
+	bridge := sw.NewBridgeNode()
+
+	sw.SetBootstrapper(t, bridge)
+	require.NoError(t, bridge.Start(ctx))
+
+	// TODO: This is required to avoid flakes coming from unfinished retry
+	// mechanism for the same peer in go-header
+	_, err := bridge.HeaderServ.WaitForHeight(ctx, uint64(blocks))
+	require.NoError(t, err)
+
+	lights1 := make([]*nodebuilder.Node, lnodes/2)
+	lights2 := make([]*nodebuilder.Node, lnodes/2)
+	subs := make([]event.Subscription, lnodes)
+	errg, errCtx := errgroup.WithContext(ctx)
+	for i := 0; i < lnodes/2; i++ {
+		i := i
+		errg.Go(func() error {
+			lnConfig := nodebuilder.DefaultConfig(node.Light)
+			setTimeInterval(lnConfig, defaultTimeInterval)
+			light := sw.NewNodeWithConfig(node.Light, lnConfig)
+			sub, err := light.Host.EventBus().Subscribe(&event.EvtPeerIdentificationCompleted{})
+			if err != nil {
+				return err
+			}
+			subs[i] = sub
+			lights1[i] = light
+			return light.Start(errCtx)
+		})
+		errg.Go(func() error {
+			lnConfig := nodebuilder.DefaultConfig(node.Light)
+			setTimeInterval(lnConfig, defaultTimeInterval)
+			light := sw.NewNodeWithConfig(node.Light, lnConfig)
+			sub, err := light.Host.EventBus().Subscribe(&event.EvtPeerIdentificationCompleted{})
+			if err != nil {
+				return err
+			}
+			subs[(lnodes/2)+i] = sub
+			lights2[i] = light
+			return light.Start(errCtx)
+		})
+	}
+
+	require.NoError(t, errg.Wait())
+
+	for i := 0; i < lnodes; i++ {
+		select {
+		case <-ctx.Done():
+			t.Fatal("peer was not found")
+		case <-subs[i].Out():
+			require.NoError(t, subs[i].Close())
+			continue
+		}
+	}
+
+	// Remove bootstrappers to prevent FNs from connecting to bridge
+	sw.Bootstrappers = []ma.Multiaddr{}
+	// Use light nodes from respective subnetworks as bootstrappers to prevent connection to bridge
+	lnBootstrapper1, err := peer.AddrInfoToP2pAddrs(host.InfoFromHost(lights1[0].Host))
+	require.NoError(t, err)
+	lnBootstrapper2, err := peer.AddrInfoToP2pAddrs(host.InfoFromHost(lights2[0].Host))
+	require.NoError(t, err)
+
+	cfg := nodebuilder.DefaultConfig(node.Full)
+	setTimeInterval(cfg, defaultTimeInterval)
+	cfg.Share.UseShareExchange = false
+	cfg.Share.Discovery.PeersLimit = 0
+	cfg.Header.TrustedPeers = []string{lnBootstrapper1[0].String()}
+	full1 := sw.NewNodeWithConfig(node.Full, cfg)
+	cfg.Header.TrustedPeers = []string{lnBootstrapper2[0].String()}
+	full2 := sw.NewNodeWithConfig(node.Full, cfg)
+	require.NoError(t, full1.Start(ctx))
+	require.NoError(t, full2.Start(ctx))
+
+	// Form topology
+	for i := 0; i < lnodes/2; i++ {
+		// Separate light nodes into two subnetworks
+		for j := 0; j < lnodes/2; j++ {
+			sw.Disconnect(t, lights1[i], lights2[j])
+			if i != j {
+				sw.Connect(t, lights1[i], lights1[j])
+				sw.Connect(t, lights2[i], lights2[j])
+			}
+		}
+
+		sw.Connect(t, full1, lights1[i])
+		sw.Disconnect(t, full1, lights2[i])
+
+		sw.Connect(t, full2, lights2[i])
+		sw.Disconnect(t, full2, lights1[i])
+	}
+
+	// Ensure the fulls are not connected to each other or to the bridge
+	sw.Disconnect(t, full1, full2)
+	sw.Disconnect(t, full1, bridge)
+	sw.Disconnect(t, full2, bridge)
+
+	h, err := full1.HeaderServ.WaitForHeight(ctx, uint64(10+blocks-1))
+	require.NoError(t, err)
+
+	// Ensure that the full nodes cannot reconstruct before being connected to each other
+	ctxErr, cancelErr := context.WithTimeout(ctx, time.Second*30)
+	errg, errCtx = errgroup.WithContext(ctxErr)
+	errg.Go(func() error {
+		return full1.ShareServ.SharesAvailable(errCtx, h.DAH)
+	})
+	errg.Go(func() error {
+		return full2.ShareServ.SharesAvailable(errCtx, h.DAH)
+	})
+	require.Error(t, errg.Wait())
+	cancelErr()
+
+	// Reconnect FNs
+	sw.Connect(t, full1, full2)
+
+	errg, bctx := errgroup.WithContext(ctx)
+	for i := 10; i < blocks+11; i++ {
+		h, err := full1.HeaderServ.WaitForHeight(bctx, uint64(i))
+		require.NoError(t, err)
+		errg.Go(func() error {
+			return full1.ShareServ.SharesAvailable(bctx, h.DAH)
+		})
+		errg.Go(func() error {
+			return full2.ShareServ.SharesAvailable(bctx, h.DAH)
+		})
+	}
+
+	require.NoError(t, <-fillDn)
+	require.NoError(t, errg.Wait())
+}
+
 /*
 Test-Case: Full Node reconstructs blocks only from Light Nodes
 Pre-Reqs:
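Note on the test above: the two LN subnetworks are kept disconnected so that neither full node can reconstruct alone, and the `i := i` statements inside the spawn loop are deliberate shadowing, not a typo. A minimal standalone sketch (not from this PR) of the pre-Go 1.22 closure-capture pitfall that shadowing avoids:

```go
package main

import "fmt"

func main() {
	out := make(chan int, 3)
	for i := 0; i < 3; i++ {
		i := i // without this shadow (pre-Go 1.22), all goroutines share one `i`
		go func() { out <- i }()
	}
	for n := 0; n < 3; n++ {
		fmt.Println(<-out) // 0, 1, 2 in some order
	}
}
```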
diff --git a/share/eds/blockstore.go b/share/eds/blockstore.go
index 9fc9d7f2f8..22ef28f821 100644
--- a/share/eds/blockstore.go
+++ b/share/eds/blockstore.go
@@ -10,12 +10,15 @@ import (
 	blocks "github.com/ipfs/go-block-format"
 	"github.com/ipfs/go-cid"
 	"github.com/ipfs/go-datastore"
+	"github.com/ipfs/go-datastore/namespace"
+	dshelp "github.com/ipfs/go-ipfs-ds-help"
 	ipld "github.com/ipfs/go-ipld-format"
 )
 
 var _ bstore.Blockstore = (*blockstore)(nil)
 
 var (
+	blockstoreCacheKey      = datastore.NewKey("bs-cache")
 	errUnsupportedOperation = errors.New("unsupported operation")
 )
 
@@ -30,29 +33,42 @@ var (
 type blockstore struct {
 	store *Store
 	cache *blockstoreCache
+	ds    datastore.Batching
 }
 
-func newBlockstore(store *Store, cache *blockstoreCache) *blockstore {
+func newBlockstore(store *Store, cache *blockstoreCache, ds datastore.Batching) *blockstore {
 	return &blockstore{
 		store: store,
 		cache: cache,
+		ds:    namespace.Wrap(ds, blockstoreCacheKey),
 	}
 }
 
 func (bs *blockstore) Has(ctx context.Context, cid cid.Cid) (bool, error) {
 	keys, err := bs.store.dgstr.ShardsContainingMultihash(ctx, cid.Hash())
-	if errors.Is(err, ErrNotFound) {
-		return false, nil
+	if errors.Is(err, ErrNotFound) || errors.Is(err, ErrNotFoundInIndex) {
+		// key wasn't found in top level blockstore, but could be in datastore while being reconstructed
+		dsHas, dsErr := bs.ds.Has(ctx, dshelp.MultihashToDsKey(cid.Hash()))
+		if dsErr != nil {
+			return false, nil
+		}
+		return dsHas, nil
 	}
 	if err != nil {
-		return false, fmt.Errorf("failed to find shards containing multihash: %w", err)
+		return false, err
 	}
+
 	return len(keys) > 0, nil
 }
 
 func (bs *blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
 	blockstr, err := bs.getReadOnlyBlockstore(ctx, cid)
-	if errors.Is(err, ErrNotFound) {
+	if errors.Is(err, ErrNotFound) || errors.Is(err, ErrNotFoundInIndex) {
+		k := dshelp.MultihashToDsKey(cid.Hash())
+		blockData, err := bs.ds.Get(ctx, k)
+		if err == nil {
+			return blocks.NewBlockWithCid(blockData, cid)
+		}
 		// nmt's GetNode expects an ipld.ErrNotFound when a cid is not found.
 		return nil, ipld.ErrNotFound{Cid: cid}
 	}
@@ -65,7 +81,12 @@ func (bs *blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error
 
 func (bs *blockstore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
 	blockstr, err := bs.getReadOnlyBlockstore(ctx, cid)
-	if errors.Is(err, ErrNotFound) {
+	if errors.Is(err, ErrNotFound) || errors.Is(err, ErrNotFoundInIndex) {
+		k := dshelp.MultihashToDsKey(cid.Hash())
+		size, err := bs.ds.GetSize(ctx, k)
+		if err == nil {
+			return size, nil
+		}
 		// nmt's GetSize expects an ipld.ErrNotFound when a cid is not found.
 		return 0, ipld.ErrNotFound{Cid: cid}
 	}
@@ -75,27 +96,35 @@ func (bs *blockstore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
 	return blockstr.GetSize(ctx, cid)
 }
 
-// DeleteBlock is a noop on the EDS blockstore that returns an errUnsupportedOperation when called.
-func (bs *blockstore) DeleteBlock(context.Context, cid.Cid) error {
-	return errUnsupportedOperation
+func (bs *blockstore) DeleteBlock(ctx context.Context, cid cid.Cid) error {
+	k := dshelp.MultihashToDsKey(cid.Hash())
+	return bs.ds.Delete(ctx, k)
 }
 
-// Put is a noop on the EDS blockstore, but it does not return an error because it is called by
-// bitswap. For clarification, an implementation of Put does not make sense in this context because
-// it is unclear which CAR file the block should be written to.
-//
-// TODO: throw errUnsupportedOperation after issue #1440
-func (bs *blockstore) Put(context.Context, blocks.Block) error {
-	return nil
+func (bs *blockstore) Put(ctx context.Context, blk blocks.Block) error {
+	k := dshelp.MultihashToDsKey(blk.Cid().Hash())
+	// note: we leave duplicate resolution to the underlying datastore
+	return bs.ds.Put(ctx, k, blk.RawData())
 }
 
-// PutMany is a noop on the EDS blockstore, but it does not return an error because it is called by
-// bitswap. For clarification, an implementation of PutMany does not make sense in this context
-// because it is unclear which CAR file the blocks should be written to.
-//
-// TODO: throw errUnsupportedOperation after issue #1440
-func (bs *blockstore) PutMany(context.Context, []blocks.Block) error {
-	return nil
+func (bs *blockstore) PutMany(ctx context.Context, blocks []blocks.Block) error {
+	if len(blocks) == 1 {
+		// performance fast-path
+		return bs.Put(ctx, blocks[0])
+	}
+
+	t, err := bs.ds.Batch(ctx)
+	if err != nil {
+		return err
+	}
+	for _, b := range blocks {
+		k := dshelp.MultihashToDsKey(b.Cid().Hash())
+		err = t.Put(ctx, k, b.RawData())
+		if err != nil {
+			return err
+		}
+	}
+	return t.Commit(ctx)
 }
 
 // AllKeysChan is a noop on the EDS blockstore because the keys are not stored in a single CAR file.
@@ -112,7 +141,7 @@ func (bs *blockstore) HashOnRead(bool) {
 // getReadOnlyBlockstore finds the underlying blockstore of the shard that contains the given CID.
 func (bs *blockstore) getReadOnlyBlockstore(ctx context.Context, cid cid.Cid) (dagstore.ReadBlockstore, error) {
 	keys, err := bs.store.dgstr.ShardsContainingMultihash(ctx, cid.Hash())
-	if errors.Is(err, datastore.ErrNotFound) {
+	if errors.Is(err, datastore.ErrNotFound) || errors.Is(err, ErrNotFoundInIndex) {
 		return nil, ErrNotFound
 	}
 	if err != nil {
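Note: the blockstore changes above replace the `Put`/`PutMany` no-ops with writes to a datastore namespaced under `bs-cache`, and the read paths (`Has`/`Get`/`GetSize`) fall back to that namespace when the shard index misses. A self-contained sketch of that read-through pattern with the same keying scheme; `fallbackGet` is an illustrative name, not part of the PR:

```go
package main

import (
	"context"
	"fmt"

	blocks "github.com/ipfs/go-block-format"
	"github.com/ipfs/go-cid"
	"github.com/ipfs/go-datastore"
	"github.com/ipfs/go-datastore/namespace"
	dssync "github.com/ipfs/go-datastore/sync"
	dshelp "github.com/ipfs/go-ipfs-ds-help"
)

// fallbackGet mirrors what blockstore.Get now does on an index miss: key by
// multihash and read the raw block bytes back out of the namespaced datastore.
func fallbackGet(ctx context.Context, ds datastore.Datastore, c cid.Cid) (blocks.Block, error) {
	data, err := ds.Get(ctx, dshelp.MultihashToDsKey(c.Hash()))
	if err != nil {
		return nil, err
	}
	return blocks.NewBlockWithCid(data, c)
}

func main() {
	ctx := context.Background()
	base := dssync.MutexWrap(datastore.NewMapDatastore())
	// same namespacing as the PR's newBlockstore
	ds := namespace.Wrap(base, datastore.NewKey("bs-cache"))

	blk := blocks.NewBlock([]byte("share data"))
	// what the new Put does: store raw block bytes under the multihash key
	if err := ds.Put(ctx, dshelp.MultihashToDsKey(blk.Cid().Hash()), blk.RawData()); err != nil {
		panic(err)
	}

	got, err := fallbackGet(ctx, ds, blk.Cid())
	fmt.Println(got.Cid() == blk.Cid(), err) // true <nil>
}
```

This is why bitswap-received blocks now survive during reconstruction: they land in the datastore via `Put` even though no CAR shard contains them yet.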
diff --git a/share/eds/inverted_index.go b/share/eds/inverted_index.go
index 81df5d92ef..8b9dcb5d95 100644
--- a/share/eds/inverted_index.go
+++ b/share/eds/inverted_index.go
@@ -2,6 +2,7 @@ package eds
 
 import (
 	"context"
+	"errors"
 	"fmt"
 
 	"github.com/filecoin-project/dagstore/index"
@@ -14,6 +15,9 @@ import (
 
 const invertedIndexPath = "/inverted_index/"
 
+// ErrNotFoundInIndex is returned instead of ErrNotFound if the multihash doesn't exist in the index
+var ErrNotFoundInIndex = fmt.Errorf("does not exist in index")
+
 // simpleInvertedIndex is an inverted index that only stores a single shard key per multihash. Its
 // implementation is modified from the default upstream implementation in dagstore/index.
 type simpleInvertedIndex struct {
@@ -76,7 +80,7 @@ func (s *simpleInvertedIndex) GetShardsForMultihash(ctx context.Context, mh mult
 	key := ds.NewKey(string(mh))
 	sbz, err := s.ds.Get(ctx, key)
 	if err != nil {
-		return nil, fmt.Errorf("failed to lookup index for mh %s, err: %w", mh, err)
+		return nil, errors.Join(ErrNotFoundInIndex, err)
 	}
 
 	return []shard.Key{shard.KeyFromString(string(sbz))}, nil
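Note: `GetShardsForMultihash` now tags every lookup failure with `ErrNotFoundInIndex` via `errors.Join`, so callers can branch with `errors.Is` while the underlying datastore error stays in the chain. A minimal sketch of that behavior (Go 1.20+):

```go
package main

import (
	"errors"
	"fmt"
)

var ErrNotFoundInIndex = fmt.Errorf("does not exist in index")

func main() {
	dsErr := errors.New("datastore: key not found")
	err := errors.Join(ErrNotFoundInIndex, dsErr)

	fmt.Println(errors.Is(err, ErrNotFoundInIndex)) // true: blockstore takes its fallback path
	fmt.Println(errors.Is(err, dsErr))              // true: original cause stays in the chain
	fmt.Println(err)                                // both messages, newline-separated
}
```

One consequence visible in the diff: the join happens for any `Get` failure, not only a not-found error, so a transient datastore error also surfaces as `ErrNotFoundInIndex` and sends the blockstore down its fallback path.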
diff --git a/share/eds/retriever.go b/share/eds/retriever.go
index a870e07e22..e7837ae23a 100644
--- a/share/eds/retriever.go
+++ b/share/eds/retriever.go
@@ -122,6 +122,7 @@ type retrievalSession struct {
 // newSession creates a new retrieval session and kicks off requesting process.
 func (r *Retriever) newSession(ctx context.Context, dah *da.DataAvailabilityHeader) (*retrievalSession, error) {
 	size := len(dah.RowRoots)
+
 	treeFn := func(_ rsmt2d.Axis, index uint) rsmt2d.Tree {
 		// use proofs adder if provided, to cache collected proofs while recomputing the eds
 		var opts []nmt.Option
@@ -152,6 +153,7 @@ func (r *Retriever) newSession(ctx context.Context, dah *da.DataAvailabilityHead
 	for i := range ses.squareCellsLks {
 		ses.squareCellsLks[i] = make([]sync.Mutex, size)
 	}
+
 	go ses.request(ctx)
 	return ses, nil
 }
diff --git a/share/eds/store.go b/share/eds/store.go
index fd679e2591..24a96c9fe4 100644
--- a/share/eds/store.go
+++ b/share/eds/store.go
@@ -119,7 +119,7 @@ func NewStore(basepath string, ds datastore.Batching) (*Store, error) {
 		mounts: r,
 		cache:  cache,
 	}
-	store.bs = newBlockstore(store, cache)
+	store.bs = newBlockstore(store, cache, ds)
 	return store, nil
 }
 
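Note: with `NewStore` now threading its datastore into `newBlockstore`, the dagstore and the blockstore fallback share one backing store. A hedged wiring sketch, assuming the `Start` lifecycle shown in the tests below; the basepath and in-memory datastore are illustrative:

```go
package main

import (
	"context"

	"github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"

	"github.com/celestiaorg/celestia-node/share/eds"
)

func main() {
	ctx := context.Background()
	ds := dssync.MutexWrap(datastore.NewMapDatastore())

	// the same datastore now backs both the dagstore and the blockstore's
	// "bs-cache" namespace
	store, err := eds.NewStore("/tmp/eds-store", ds) // basepath is illustrative
	if err != nil {
		panic(err)
	}
	if err := store.Start(ctx); err != nil {
		panic(err)
	}
	defer store.Stop(ctx) //nolint:errcheck

	_ = store.Blockstore() // CAR-backed reads with datastore fallback
}
```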
diff --git a/share/getters/getter_test.go b/share/getters/getter_test.go
index 5cc73aa7cb..ae90074625 100644
--- a/share/getters/getter_test.go
+++ b/share/getters/getter_test.go
@@ -163,7 +163,8 @@ func TestIPLDGetter(t *testing.T) {
 	err = edsStore.Start(ctx)
 	require.NoError(t, err)
 
-	bserv := bsrv.New(edsStore.Blockstore(), offline.Exchange(edsStore.Blockstore()))
+	bStore := edsStore.Blockstore()
+	bserv := bsrv.New(bStore, offline.Exchange(bStore))
 	sg := NewIPLDGetter(bserv)
 
 	t.Run("GetShare", func(t *testing.T) {
@@ -200,6 +201,12 @@ func TestIPLDGetter(t *testing.T) {
 		retrievedEDS, err := sg.GetEDS(ctx, &dah)
 		require.NoError(t, err)
 		assert.True(t, randEds.Equals(retrievedEDS))
+
+		// Ensure blocks still exist after cleanup
+		colRoots, _ := retrievedEDS.ColRoots()
+		has, err := bStore.Has(ctx, ipld.MustCidFromNamespacedSha256(colRoots[0]))
+		assert.NoError(t, err)
+		assert.True(t, has)
 	})
 
 	t.Run("GetSharesByNamespace", func(t *testing.T) {
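Note: the new assertion pins down the user-visible property of this PR: blocks written through the blockstore's `Put` path survive independently of any CAR shard, so `Has` succeeds even after retrieval cleanup. A stand-in sketch of that property using the vanilla go-ipfs-blockstore over a map datastore (the EDS blockstore type itself is unexported and only reachable via `Store.Blockstore()`):

```go
package main

import (
	"context"
	"fmt"

	blocks "github.com/ipfs/go-block-format"
	"github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
	blockstore "github.com/ipfs/go-ipfs-blockstore"
)

func main() {
	ctx := context.Background()
	// stand-in for the EDS blockstore: any Blockstore with a datastore behind it
	bs := blockstore.NewBlockstore(dssync.MutexWrap(datastore.NewMapDatastore()))

	blk := blocks.NewBlock([]byte("namespaced share"))
	if err := bs.Put(ctx, blk); err != nil {
		panic(err)
	}

	has, err := bs.Has(ctx, blk.Cid())
	fmt.Println(has, err) // true <nil>
}
```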