Skip to content

Commit

Permalink
feat: transfer limiter web UI updates (#740)
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc authored Aug 31, 2022
1 parent badb603 commit 85347fc
Show file tree
Hide file tree
Showing 11 changed files with 418 additions and 50 deletions.
51 changes: 37 additions & 14 deletions gql/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (r *resolver) Deal(ctx context.Context, args struct{ ID graphql.ID }) (*dea
return nil, err
}

return newDealResolver(deal, r.dealsDB, r.logsDB, r.spApi), nil
return newDealResolver(deal, r.provider, r.dealsDB, r.logsDB, r.spApi), nil
}

type dealsArgs struct {
Expand Down Expand Up @@ -135,7 +135,7 @@ func (r *resolver) Deals(ctx context.Context, args dealsArgs) (*dealListResolver

resolvers := make([]*dealResolver, 0, len(deals))
for _, deal := range deals {
resolvers = append(resolvers, newDealResolver(&deal, r.dealsDB, r.logsDB, r.spApi))
resolvers = append(resolvers, newDealResolver(&deal, r.provider, r.dealsDB, r.logsDB, r.spApi))
}

return &dealListResolver{
Expand Down Expand Up @@ -168,7 +168,7 @@ func (r *resolver) DealUpdate(ctx context.Context, args struct{ ID graphql.ID })
}

net := make(chan *dealResolver, 1)
net <- newDealResolver(deal, r.dealsDB, r.logsDB, r.spApi)
net <- newDealResolver(deal, r.provider, r.dealsDB, r.logsDB, r.spApi)

// Updates to deal state are broadcast on pubsub. Pipe these updates to the
// client
Expand All @@ -180,7 +180,7 @@ func (r *resolver) DealUpdate(ctx context.Context, args struct{ ID graphql.ID })
}
return nil, fmt.Errorf("%s: subscribing to deal updates: %w", args.ID, err)
}
sub := &subLastUpdate{sub: dealUpdatesSub, dealsDB: r.dealsDB, logsDB: r.logsDB, spApi: r.spApi}
sub := &subLastUpdate{sub: dealUpdatesSub, provider: r.provider, dealsDB: r.dealsDB, logsDB: r.logsDB, spApi: r.spApi}
go func() {
sub.Pipe(ctx, net) // blocks until connection is closed
close(net)
Expand Down Expand Up @@ -219,7 +219,7 @@ func (r *resolver) DealNew(ctx context.Context) (<-chan *dealNewResolver, error)
case evti := <-sub.Out():
// Pipe the deal to the new deal channel
di := evti.(types.ProviderDealState)
rsv := newDealResolver(&di, r.dealsDB, r.logsDB, r.spApi)
rsv := newDealResolver(&di, r.provider, r.dealsDB, r.logsDB, r.spApi)
totalCount, err := r.dealsDB.Count(ctx, "")
if err != nil {
log.Errorf("getting total deal count: %w", err)
Expand Down Expand Up @@ -330,15 +330,17 @@ func (r *resolver) dealList(ctx context.Context, query string, cursor *graphql.I

type dealResolver struct {
types.ProviderDealState
provider *storagemarket.Provider
transferred uint64
dealsDB *db.DealsDB
logsDB *db.LogsDB
spApi sealingpipeline.API
}

func newDealResolver(deal *types.ProviderDealState, dealsDB *db.DealsDB, logsDB *db.LogsDB, spApi sealingpipeline.API) *dealResolver {
func newDealResolver(deal *types.ProviderDealState, provider *storagemarket.Provider, dealsDB *db.DealsDB, logsDB *db.LogsDB, spApi sealingpipeline.API) *dealResolver {
return &dealResolver{
ProviderDealState: *deal,
provider: provider,
transferred: uint64(deal.NBytesReceived),
dealsDB: dealsDB,
logsDB: logsDB,
Expand Down Expand Up @@ -502,13 +504,17 @@ func (dr *dealResolver) message(ctx context.Context, checkpoint dealcheckpoints.
if dr.IsOffline {
return "Awaiting Offline Data Import"
}
switch dr.transferred {
case 0:
switch {
case dr.transferred == 0 && !dr.provider.IsTransferStalled(dr.DealUuid):
return "Transfer Queued"
case 100:
case dr.transferred == 100:
return "Transfer Complete"
default:
pct := (100 * dr.transferred) / dr.ProviderDealState.Transfer.Size
isStalled := dr.provider.IsTransferStalled(dr.DealUuid)
if isStalled {
return fmt.Sprintf("Transfer stalled at %d%% ", pct)
}
return fmt.Sprintf("Transferring %d%%", pct)
}
case dealcheckpoints.Transferred:
Expand All @@ -533,6 +539,22 @@ func (dr *dealResolver) message(ctx context.Context, checkpoint dealcheckpoints.
return checkpoint.String()
}

func (dr *dealResolver) TransferSamples() []*transferPoint {
points := dr.provider.Transfer(dr.DealUuid)
pts := make([]*transferPoint, 0, len(points))
for _, pt := range points {
pts = append(pts, &transferPoint{
At: graphql.Time{Time: pt.At},
Bytes: gqltypes.Uint64(pt.Bytes),
})
}
return pts
}

func (dr *dealResolver) IsTransferStalled() bool {
return dr.provider.IsTransferStalled(dr.DealUuid)
}

func (dr *dealResolver) sealingState(ctx context.Context) string {
si, err := dr.spApi.SectorsStatus(ctx, dr.SectorID, false)
if err != nil {
Expand Down Expand Up @@ -594,10 +616,11 @@ func toUuid(id graphql.ID) (uuid.UUID, error) {
}

type subLastUpdate struct {
sub event.Subscription
dealsDB *db.DealsDB
logsDB *db.LogsDB
spApi sealingpipeline.API
sub event.Subscription
provider *storagemarket.Provider
dealsDB *db.DealsDB
logsDB *db.LogsDB
spApi sealingpipeline.API
}

func (s *subLastUpdate) Pipe(ctx context.Context, net chan *dealResolver) {
Expand Down Expand Up @@ -636,7 +659,7 @@ func (s *subLastUpdate) Pipe(ctx context.Context, net chan *dealResolver) {
loop:
for {
di := lastUpdate.(types.ProviderDealState)
rsv := newDealResolver(&di, s.dealsDB, s.logsDB, s.spApi)
rsv := newDealResolver(&di, s.provider, s.dealsDB, s.logsDB, s.spApi)

select {
case <-ctx.Done():
Expand Down
57 changes: 53 additions & 4 deletions gql/resolver_transfers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"time"

gqltypes "github.com/filecoin-project/boost/gql/types"
"github.com/filecoin-project/boost/storagemarket"
"github.com/google/uuid"
"github.com/graph-gophers/graphql-go"
)

Expand All @@ -15,15 +17,62 @@ type transferPoint struct {
}

// query: transfers: [TransferPoint]
func (r *resolver) Transfers(_ context.Context) ([]*transferPoint, error) {
deals := r.provider.Transfers()
func (r *resolver) Transfers(_ context.Context) []*transferPoint {
return r.getTransferSamples(r.provider.Transfers(), nil)
}

type transferStats struct {
HttpMaxConcurrentDownloads int32
Stats []*hostTransferStats
}

type hostTransferStats struct {
Host string
Total int32
Started int32
Stalled int32
TransferSamples []*transferPoint
}

// query: transferStats: TransferStats
func (r *resolver) TransferStats(_ context.Context) *transferStats {
transfersByDeal := r.provider.Transfers()
stats := r.provider.TransferStats()
gqlStats := make([]*hostTransferStats, 0, len(stats))
for _, s := range stats {
gqlStats = append(gqlStats, &hostTransferStats{
Host: s.Host,
Total: int32(s.Total),
Started: int32(s.Started),
Stalled: int32(s.Stalled),
TransferSamples: r.getTransferSamples(transfersByDeal, s.DealUuids),
})
}
return &transferStats{
HttpMaxConcurrentDownloads: int32(r.cfg.Dealmaking.HttpTransferMaxConcurrentDownloads),
Stats: gqlStats,
}
}

func (r *resolver) getTransferSamples(deals map[uuid.UUID][]storagemarket.TransferPoint, filter []uuid.UUID) []*transferPoint {
// If filter is nil, include all deals
if filter == nil {
for dealUuid := range deals {
filter = append(filter, dealUuid)
}
}

// We have
// dealUUID -> [At: <time>, Transferred: <bytes>, At: <time>, Transferred: <bytes>, ...]
// Convert this to
// <time> -> <transferred per second>
totalAt := make(map[time.Time]uint64)
for _, points := range deals {
for _, dealUuid := range filter {
points, ok := deals[dealUuid]
if !ok {
continue
}

var prev uint64
first := true
for _, pt := range points {
Expand Down Expand Up @@ -53,5 +102,5 @@ func (r *resolver) Transfers(_ context.Context) ([]*transferPoint, error) {
return pts[i].At.Before(pts[j].At.Time)
})

return pts, nil
return pts
}
18 changes: 18 additions & 0 deletions gql/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ type Deal {
PublishCid: String!
IsOffline: Boolean!
Transfer: TransferParams!
TransferSamples: [TransferPoint]!
IsTransferStalled: Boolean!
Checkpoint: String!
CheckpointAt: Time!
Err: String!
Expand Down Expand Up @@ -287,6 +289,19 @@ type TransferPoint {
Bytes: Uint64!
}

type HostStats {
Host: String!
Total: Int!
Started: Int!
Stalled: Int!
TransferSamples: [TransferPoint]!
}

type TransferStats {
HttpMaxConcurrentDownloads: Int!
Stats: [HostStats]!
}

type MpoolMessage {
From: String!
To: String!
Expand Down Expand Up @@ -377,6 +392,9 @@ type RootQuery {
"""Get ongoing transfers"""
transfers: [TransferPoint]!

"""Get stats about queued / active transfers"""
transferStats: TransferStats!

"""Get local messages in the mpool"""
mpool(local: Boolean!): [MpoolMessage]!

Expand Down
23 changes: 23 additions & 0 deletions react/src/DealTransfers.css
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
.transfer-stats {
margin: 1em;
}

.transfer-stats .config {
margin: 2em 0 1em 0;
}

.transfer-stats table th, .transfer-stats table td {
padding: 0.5em 1em;
}

.transfer-stats table th {
text-align: left;
}

.transfer-stats table td {
text-align: right;
}

.transfer-stats table td.transfer-rate {
color: #999999;
}
Loading

0 comments on commit 85347fc

Please sign in to comment.