Skip to content

Commit

Permalink
Merge pull request #126 from filecoin-project/feat/option-to-disable-…
Browse files Browse the repository at this point in the history
…graphsync

Global option to disable graphsync retrievals
  • Loading branch information
hannahhoward authored Mar 1, 2023
2 parents 150349b + 4698b96 commit 634af2b
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 27 deletions.
5 changes: 5 additions & 0 deletions cmd/lassie/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ var daemonFlags = []cli.Flag{
FlagExposeMetrics,
FlagVerbose,
FlagVeryVerbose,
FlagDisableGraphsync,
}

var daemonCmd = &cli.Command{
Expand All @@ -93,6 +94,7 @@ func daemonCommand(cctx *cli.Context) error {
libp2pHighWater := cctx.Int("libp2p-conns-highwater")
exposeMetrics := cctx.Bool("expose-metrics")
concurrentSPRetrievals := cctx.Uint("concurrent-sp-retrievals")
disableGraphsync := cctx.Bool("disable-graphsync")
lassieOpts := []lassie.LassieOption{lassie.WithProviderTimeout(20 * time.Second)}
if libp2pHighWater != 0 || libp2pLowWater != 0 {
connManager, err := connmgr.NewConnManager(libp2pLowWater, libp2pHighWater)
Expand All @@ -105,6 +107,9 @@ func daemonCommand(cctx *cli.Context) error {
lassie.WithConcurrentSPRetrievals(concurrentSPRetrievals),
)
}
if disableGraphsync {
lassieOpts = append(lassieOpts, lassie.WithGraphsyncDisabled())
}
// create a lassie instance
lassie, err := lassie.NewLassie(cctx.Context, lassieOpts...)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion cmd/lassie/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ var fetchCmd = &cli.Command{
FlagEventRecorderUrl,
FlagVerbose,
FlagVeryVerbose,
FlagDisableGraphsync,
},
}

Expand Down Expand Up @@ -104,7 +105,10 @@ func Fetch(c *cli.Context) error {
finderOpt := lassie.WithFinder(retriever.NewDirectCandidateFinder(host, fetchProviderAddrInfos))
opts = append(opts, finderOpt)
}

disableGraphsync := c.Bool("disable-graphsync")
if disableGraphsync {
opts = append(opts, lassie.WithGraphsyncDisabled())
}
lassie, err := lassie.NewLassie(c.Context, opts...)
if err != nil {
return err
Expand Down
7 changes: 7 additions & 0 deletions cmd/lassie/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,10 @@ var FlagExposeMetrics = &cli.BoolFlag{
Usage: "expose metrics at /metrics",
EnvVars: []string{"LASSIE_EXPOSE_METRICS"},
}

// FlagDisableGraphsync turns off all retrievals over the graphsync protocol
var FlagDisableGraphsync = &cli.BoolFlag{
Name: "disable-graphsync",
Usage: "turn off graphsync retrievals",
EnvVars: []string{"LASSIE_DISABLE_GRAPHSYNC"},
}
69 changes: 47 additions & 22 deletions pkg/internal/itest/http_fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ func TestHttpFetch(t *testing.T) {
name string
graphsyncRemotes int
bitswapRemotes int
disableGraphsync bool
expectFail bool
modifyHttpConfig func(httpserver.HttpServerConfig) httpserver.HttpServerConfig
generate func(*testing.T, io.Reader, []testpeer.TestPeer) []unixfs.DirEntry
paths []string
Expand Down Expand Up @@ -483,6 +485,18 @@ func TestHttpFetch(t *testing.T) {
}
},
},
{
name: "two separate, parallel graphsync retrievals, with graphsync disabled",
graphsyncRemotes: 2,
disableGraphsync: true,
expectFail: true,
generate: func(t *testing.T, rndReader io.Reader, remotes []testpeer.TestPeer) []unixfs.DirEntry {
return []unixfs.DirEntry{
unixfs.GenerateFile(t, &remotes[0].LinkSystem, rndReader, 4<<20),
unixfs.GenerateDirectory(t, &remotes[1].LinkSystem, rndReader, 16<<20, false),
}
},
},
{
name: "parallel, separate graphsync and bitswap retrievals",
graphsyncRemotes: 1,
Expand Down Expand Up @@ -525,11 +539,16 @@ func TestHttpFetch(t *testing.T) {

// Setup a new lassie
req := require.New(t)
lassie, err := lassie.NewLassie(
ctx,
lassie.WithProviderTimeout(20*time.Second),
opts := []lassie.LassieOption{lassie.WithProviderTimeout(20 * time.Second),
lassie.WithHost(mrn.Self),
lassie.WithFinder(mrn.Finder),
}
if testCase.disableGraphsync {
opts = append(opts, lassie.WithGraphsyncDisabled())
}
lassie, err := lassie.NewLassie(
ctx,
opts...,
)
req.NoError(err)

Expand Down Expand Up @@ -581,29 +600,35 @@ func TestHttpFetch(t *testing.T) {
}
}

// wait for graphsync retrievals to finish on the remotes
var wg sync.WaitGroup
wg.Add(len(finishedChans))
for _, finishedChan := range finishedChans {
go func(finishedChan chan []datatransfer.Event) {
mocknet.WaitForFinish(ctx, t, finishedChan, 1*time.Second)
wg.Done()
}(finishedChan)
if !testCase.disableGraphsync {
// wait for graphsync retrievals to finish on the remotes
var wg sync.WaitGroup
wg.Add(len(finishedChans))
for _, finishedChan := range finishedChans {
go func(finishedChan chan []datatransfer.Event) {
mocknet.WaitForFinish(ctx, t, finishedChan, 1*time.Second)
wg.Done()
}(finishedChan)
}
wg.Wait()
}
wg.Wait()

for i, resp := range responses {
req.Equal(http.StatusOK, resp.StatusCode)
body, err := io.ReadAll(resp.Body)
req.NoError(err)
resp.Body.Close()
req.NoError(err)

if testCase.validateBodies != nil && testCase.validateBodies[i] != nil {
testCase.validateBodies[i](t, srcData[i], body)
if testCase.expectFail {
req.Equal(http.StatusGatewayTimeout, resp.StatusCode)
} else {
gotDir := unixfs.CarToDirEntry(t, bytes.NewReader(body), srcData[i].Root, true)
unixfs.CompareDirEntries(t, srcData[i], gotDir)
req.Equal(http.StatusOK, resp.StatusCode)
body, err := io.ReadAll(resp.Body)
req.NoError(err)
resp.Body.Close()
req.NoError(err)

if testCase.validateBodies != nil && testCase.validateBodies[i] != nil {
testCase.validateBodies[i](t, srcData[i], body)
} else {
gotDir := unixfs.CarToDirEntry(t, bytes.NewReader(body), srcData[i].Root, true)
unixfs.CompareDirEntries(t, srcData[i], gotDir)
}
}
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/lassie/lassie.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type LassieConfig struct {
ConcurrentSPRetrievals uint
GlobalTimeout time.Duration
Libp2pOptions []libp2p.Option
DisableGraphsync bool
}

type LassieOption func(cfg *LassieConfig)
Expand Down Expand Up @@ -80,6 +81,7 @@ func NewLassieWithConfig(ctx context.Context, cfg *LassieConfig) (*Lassie, error
RetrievalTimeout: cfg.ProviderTimeout,
MaxConcurrentRetrievals: cfg.ConcurrentSPRetrievals,
},
DisableGraphsync: cfg.DisableGraphsync,
}

retriever, err := retriever.NewRetriever(ctx, retrieverCfg, retrievalClient, cfg.Finder, bitswapRetriever)
Expand Down Expand Up @@ -143,6 +145,12 @@ func WithConcurrentSPRetrievals(maxConcurrentSPRtreievals uint) LassieOption {
}
}

func WithGraphsyncDisabled() LassieOption {
return func(cfg *LassieConfig) {
cfg.DisableGraphsync = true
}
}

func (l *Lassie) Fetch(ctx context.Context, request types.RetrievalRequest) (*types.RetrievalStats, error) {
var cancel context.CancelFunc
if l.cfg.GlobalTimeout != time.Duration(0) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/retriever/coordinators/race.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package coordinators

import (
"context"
"errors"
"sync"

"github.com/filecoin-project/lassie/pkg/types"
Expand Down Expand Up @@ -34,6 +35,9 @@ func Race(ctx context.Context, retrievalCalls []types.CandidateRetrievalCall) (*
func collectResults(ctx context.Context, resultChan <-chan types.RetrievalResult, totalRetrievers int) (*types.RetrievalStats, error) {
var totalErr error
finishedCount := 0
if totalRetrievers == 0 {
return nil, errors.New("no eligible retrievers")
}
for {
select {
case result := <-resultChan:
Expand Down
13 changes: 9 additions & 4 deletions pkg/retriever/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type RetrieverConfig struct {
DefaultMinerConfig MinerConfig
MinerConfigs map[peer.ID]MinerConfig
PaidRetrievals bool
DisableGraphsync bool
}

func (cfg *RetrieverConfig) getMinerConfig(peer peer.ID) MinerConfig {
Expand Down Expand Up @@ -97,20 +98,24 @@ func NewRetriever(
eventManager: events.NewEventManager(ctx),
spTracker: newSpTracker(nil),
}
candidateRetrievers := []types.CandidateRetriever{
&GraphSyncRetriever{
candidateRetrievers := []types.CandidateRetriever{}
protocols := []multicodec.Code{}
if !config.DisableGraphsync {
candidateRetrievers = append(candidateRetrievers, &GraphSyncRetriever{
GetStorageProviderTimeout: retriever.getStorageProviderTimeout,
IsAcceptableQueryResponse: retriever.isAcceptableQueryResponse,
Client: client,
},
})
protocols = append(protocols, multicodec.TransportGraphsyncFilecoinv1)
}
if bitswapRetriever != nil {
candidateRetrievers = append(candidateRetrievers, bitswapRetriever)
protocols = append(protocols, multicodec.TransportBitswap)
}
retriever.executor = combinators.RetrieverWithCandidateFinder{
CandidateFinder: NewAssignableCandidateFinder(candidateFinder, retriever.isAcceptableStorageProvider),
CandidateRetriever: combinators.SplitRetriever{
CandidateSplitter: NewProtocolSplitter([]multicodec.Code{multicodec.TransportGraphsyncFilecoinv1, multicodec.TransportBitswap}),
CandidateSplitter: NewProtocolSplitter(protocols),
CandidateRetrievers: candidateRetrievers,
CoordinationKind: types.RaceCoordination,
},
Expand Down

0 comments on commit 634af2b

Please sign in to comment.