From 85347fc1a5fb1331830185b5b2f691f00fc62627 Mon Sep 17 00:00:00 2001
From: dirkmc
Date: Wed, 31 Aug 2022 16:26:26 +0200
Subject: [PATCH] feat: transfer limiter web UI updates (#740)

---
 gql/resolver.go                   |  51 +++++++++----
 gql/resolver_transfers.go         |  57 ++++++++++++++-
 gql/schema.graphql                |  18 +++++
 react/src/DealTransfers.css       |  23 ++++++
 react/src/DealTransfers.js        | 117 +++++++++++++++++++++++++++---
 react/src/Deals.css               |   5 ++
 react/src/Deals.js                |  38 ++++++++++
 react/src/gql.js                  |  25 +++++++
 storagemarket/provider.go         |   4 -
 storagemarket/transfer_limiter.go |  63 ++++++++++++++++
 storagemarket/transfers.go        |  67 ++++++++++++-----
 11 files changed, 418 insertions(+), 50 deletions(-)
 create mode 100644 react/src/DealTransfers.css

diff --git a/gql/resolver.go b/gql/resolver.go
index 4884fda73..2cabdbbec 100644
--- a/gql/resolver.go
+++ b/gql/resolver.go
@@ -102,7 +102,7 @@ func (r *resolver) Deal(ctx context.Context, args struct{ ID graphql.ID }) (*dea
         return nil, err
     }
 
-    return newDealResolver(deal, r.dealsDB, r.logsDB, r.spApi), nil
+    return newDealResolver(deal, r.provider, r.dealsDB, r.logsDB, r.spApi), nil
 }
 
 type dealsArgs struct {
@@ -135,7 +135,7 @@ func (r *resolver) Deals(ctx context.Context, args dealsArgs) (*dealListResolver
 
     resolvers := make([]*dealResolver, 0, len(deals))
     for _, deal := range deals {
-        resolvers = append(resolvers, newDealResolver(&deal, r.dealsDB, r.logsDB, r.spApi))
+        resolvers = append(resolvers, newDealResolver(&deal, r.provider, r.dealsDB, r.logsDB, r.spApi))
     }
 
     return &dealListResolver{
@@ -168,7 +168,7 @@ func (r *resolver) DealUpdate(ctx context.Context, args struct{ ID graphql.ID })
     }
 
     net := make(chan *dealResolver, 1)
-    net <- newDealResolver(deal, r.dealsDB, r.logsDB, r.spApi)
+    net <- newDealResolver(deal, r.provider, r.dealsDB, r.logsDB, r.spApi)
 
     // Updates to deal state are broadcast on pubsub. Pipe these updates to the
     // client
@@ -180,7 +180,7 @@ func (r *resolver) DealUpdate(ctx context.Context, args struct{ ID graphql.ID })
         }
         return nil, fmt.Errorf("%s: subscribing to deal updates: %w", args.ID, err)
     }
-    sub := &subLastUpdate{sub: dealUpdatesSub, dealsDB: r.dealsDB, logsDB: r.logsDB, spApi: r.spApi}
+    sub := &subLastUpdate{sub: dealUpdatesSub, provider: r.provider, dealsDB: r.dealsDB, logsDB: r.logsDB, spApi: r.spApi}
     go func() {
         sub.Pipe(ctx, net) // blocks until connection is closed
         close(net)
@@ -219,7 +219,7 @@ func (r *resolver) DealNew(ctx context.Context) (<-chan *dealNewResolver, error)
             case evti := <-sub.Out():
                 // Pipe the deal to the new deal channel
                 di := evti.(types.ProviderDealState)
-                rsv := newDealResolver(&di, r.dealsDB, r.logsDB, r.spApi)
+                rsv := newDealResolver(&di, r.provider, r.dealsDB, r.logsDB, r.spApi)
                 totalCount, err := r.dealsDB.Count(ctx, "")
                 if err != nil {
                     log.Errorf("getting total deal count: %w", err)
@@ -330,15 +330,17 @@ func (r *resolver) dealList(ctx context.Context, query string, cursor *graphql.I
 
 type dealResolver struct {
     types.ProviderDealState
+    provider    *storagemarket.Provider
     transferred uint64
     dealsDB     *db.DealsDB
     logsDB      *db.LogsDB
     spApi       sealingpipeline.API
 }
 
-func newDealResolver(deal *types.ProviderDealState, dealsDB *db.DealsDB, logsDB *db.LogsDB, spApi sealingpipeline.API) *dealResolver {
+func newDealResolver(deal *types.ProviderDealState, provider *storagemarket.Provider, dealsDB *db.DealsDB, logsDB *db.LogsDB, spApi sealingpipeline.API) *dealResolver {
     return &dealResolver{
         ProviderDealState: *deal,
+        provider:          provider,
         transferred:       uint64(deal.NBytesReceived),
         dealsDB:           dealsDB,
         logsDB:            logsDB,
@@ -502,13 +504,17 @@ func (dr *dealResolver) message(ctx context.Context, checkpoint dealcheckpoints.
         if dr.IsOffline {
             return "Awaiting Offline Data Import"
         }
-        switch dr.transferred {
-        case 0:
+        switch {
+        case dr.transferred == 0 && !dr.provider.IsTransferStalled(dr.DealUuid):
             return "Transfer Queued"
-        case 100:
+        case dr.transferred == 100:
             return "Transfer Complete"
         default:
             pct := (100 * dr.transferred) / dr.ProviderDealState.Transfer.Size
+            isStalled := dr.provider.IsTransferStalled(dr.DealUuid)
+            if isStalled {
+                return fmt.Sprintf("Transfer stalled at %d%% ", pct)
+            }
             return fmt.Sprintf("Transferring %d%%", pct)
         }
     case dealcheckpoints.Transferred:
@@ -533,6 +539,22 @@ func (dr *dealResolver) message(ctx context.Context, checkpoint dealcheckpoints.
     return checkpoint.String()
 }
 
+func (dr *dealResolver) TransferSamples() []*transferPoint {
+    points := dr.provider.Transfer(dr.DealUuid)
+    pts := make([]*transferPoint, 0, len(points))
+    for _, pt := range points {
+        pts = append(pts, &transferPoint{
+            At:    graphql.Time{Time: pt.At},
+            Bytes: gqltypes.Uint64(pt.Bytes),
+        })
+    }
+    return pts
+}
+
+func (dr *dealResolver) IsTransferStalled() bool {
+    return dr.provider.IsTransferStalled(dr.DealUuid)
+}
+
 func (dr *dealResolver) sealingState(ctx context.Context) string {
     si, err := dr.spApi.SectorsStatus(ctx, dr.SectorID, false)
     if err != nil {
@@ -594,10 +616,11 @@ func toUuid(id graphql.ID) (uuid.UUID, error) {
 }
 
 type subLastUpdate struct {
-    sub     event.Subscription
-    dealsDB *db.DealsDB
-    logsDB  *db.LogsDB
-    spApi   sealingpipeline.API
+    sub      event.Subscription
+    provider *storagemarket.Provider
+    dealsDB  *db.DealsDB
+    logsDB   *db.LogsDB
+    spApi    sealingpipeline.API
 }
 
 func (s *subLastUpdate) Pipe(ctx context.Context, net chan *dealResolver) {
@@ -636,7 +659,7 @@ func (s *subLastUpdate) Pipe(ctx context.Context, net chan *dealResolver) {
 loop:
     for {
         di := lastUpdate.(types.ProviderDealState)
-        rsv := newDealResolver(&di, s.dealsDB, s.logsDB, s.spApi)
+        rsv := newDealResolver(&di, s.provider, s.dealsDB, s.logsDB, s.spApi)
 
         select {
         case <-ctx.Done():
diff --git a/gql/resolver_transfers.go b/gql/resolver_transfers.go
index a4c7091a3..e7a20f306 100644
--- a/gql/resolver_transfers.go
+++ b/gql/resolver_transfers.go
@@ -6,6 +6,8 @@ import (
     "time"
 
     gqltypes "github.com/filecoin-project/boost/gql/types"
+    "github.com/filecoin-project/boost/storagemarket"
+    "github.com/google/uuid"
     "github.com/graph-gophers/graphql-go"
 )
 
@@ -15,15 +17,62 @@ type transferPoint struct {
 }
 
 // query: transfers: [TransferPoint]
-func (r *resolver) Transfers(_ context.Context) ([]*transferPoint, error) {
-    deals := r.provider.Transfers()
+func (r *resolver) Transfers(_ context.Context) []*transferPoint {
+    return r.getTransferSamples(r.provider.Transfers(), nil)
+}
+
+type transferStats struct {
+    HttpMaxConcurrentDownloads int32
+    Stats                      []*hostTransferStats
+}
+
+type hostTransferStats struct {
+    Host            string
+    Total           int32
+    Started         int32
+    Stalled         int32
+    TransferSamples []*transferPoint
+}
+
+// query: transferStats: TransferStats
+func (r *resolver) TransferStats(_ context.Context) *transferStats {
+    transfersByDeal := r.provider.Transfers()
+    stats := r.provider.TransferStats()
+    gqlStats := make([]*hostTransferStats, 0, len(stats))
+    for _, s := range stats {
+        gqlStats = append(gqlStats, &hostTransferStats{
+            Host:            s.Host,
+            Total:           int32(s.Total),
+            Started:         int32(s.Started),
+            Stalled:         int32(s.Stalled),
+            TransferSamples: r.getTransferSamples(transfersByDeal, s.DealUuids),
+        })
+    }
+    return &transferStats{
+        HttpMaxConcurrentDownloads: int32(r.cfg.Dealmaking.HttpTransferMaxConcurrentDownloads),
+        Stats:                      gqlStats,
+    }
+}
+
+func (r *resolver) getTransferSamples(deals map[uuid.UUID][]storagemarket.TransferPoint, filter []uuid.UUID) []*transferPoint {
+    // If filter is nil, include all deals
+    if filter == nil {
+        for dealUuid := range deals {
+            filter = append(filter, dealUuid)
+        }
+    }
 
     // We have
     // dealUUID -> [At: