Skip to content

Commit

Permalink
fix: measure bitswap ttfb from after we get candidates back
Browse files Browse the repository at this point in the history
Fixes: #333
  • Loading branch information
rvagg committed Sep 21, 2023
1 parent a7c7002 commit 8f92fcc
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 58 deletions.
48 changes: 32 additions & 16 deletions pkg/retriever/bitswapretriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,21 +166,6 @@ func (br *bitswapRetrieval) RetrieveFromAsyncCandidates(ayncCandidates types.Inb
})
}

totalWritten := atomic.Uint64{}
blockCount := atomic.Uint64{}
bytesWrittenCb := func(bytesWritten uint64) {
// record first byte received
if totalWritten.Load() == 0 {
br.events(events.FirstByte(br.clock.Now(), br.request.RetrievalID, bitswapCandidate, br.clock.Since(startTime), multicodec.TransportBitswap))
}
totalWritten.Add(bytesWritten)
blockCount.Add(1)
// reset the timer
if bytesWritten > 0 && lastBytesReceivedTimer != nil {
lastBytesReceivedTimer.Reset(br.cfg.BlockTimeout)
}
}

// setup providers for this retrieval
hasCandidates, nextCandidates, err := ayncCandidates.Next(retrievalCtx)
if !hasCandidates || err != nil {
Expand All @@ -189,7 +174,8 @@ func (br *bitswapRetrieval) RetrieveFromAsyncCandidates(ayncCandidates types.Inb
return nil, nil
}

br.events(events.StartedRetrieval(br.clock.Now(), br.request.RetrievalID, bitswapCandidate, multicodec.TransportBitswap))
firstCandidatesTime := br.clock.Now()
br.events(events.StartedRetrieval(firstCandidatesTime, br.request.RetrievalID, bitswapCandidate, multicodec.TransportBitswap))

// set initial providers, then start a goroutine to add more as they come in
br.routing.AddProviders(br.request.RetrievalID, nextCandidates)
Expand All @@ -209,6 +195,35 @@ func (br *bitswapRetrieval) RetrieveFromAsyncCandidates(ayncCandidates types.Inb
}
}()

totalWritten := atomic.Uint64{}
blockCount := atomic.Uint64{}
ttfb := atomic.Int64{}
bytesWrittenCb := func(bytesWritten uint64) {
// Record first byte received, this is on a per-protocol basis for Bitswap,
// individual providers (currently) don't get one of these so we take the
// duration from the time we started collecting candidates.
// If we end up taking responsibility for dialing peers, and end up with
// first-byte events per peer, we could move the first-byte duration up to
// the post-connect time to match http and graphsync (alternatively)
if totalWritten.Load() == 0 {
ttfbD := br.clock.Since(firstCandidatesTime)
ttfb.Store(int64(ttfbD))
br.events(events.FirstByte(
br.clock.Now(),
br.request.RetrievalID,
bitswapCandidate,
ttfbD,
multicodec.TransportBitswap,
))
}
totalWritten.Add(bytesWritten)
blockCount.Add(1)
// reset the timer
if bytesWritten > 0 && lastBytesReceivedTimer != nil {
lastBytesReceivedTimer.Reset(br.cfg.BlockTimeout)
}
}

// set up the storage system, including the preloader if configured
var preloader preload.Loader
traversalLinkSys := br.request.LinkSystem
Expand Down Expand Up @@ -300,6 +315,7 @@ func (br *bitswapRetrieval) RetrieveFromAsyncCandidates(ayncCandidates types.Inb
TotalPayment: big.Zero(),
NumPayments: 0,
AskPrice: big.Zero(),
TimeToFirstByte: time.Duration(ttfb.Load()),
}, nil
}

Expand Down
90 changes: 48 additions & 42 deletions pkg/retriever/bitswapretriever_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,22 +95,24 @@ func TestBitswapRetriever(t *testing.T) {
},
expectedStats: map[cid.Cid]*types.RetrievalStats{
cid1: {
RootCid: cid1,
Size: sizeOf(tbc1.AllBlocks()),
Blocks: 100,
Duration: remoteBlockDuration * 100,
AverageSpeed: uint64(float64(sizeOf(tbc1.AllBlocks())) / (remoteBlockDuration * 100).Seconds()),
TotalPayment: big.Zero(),
AskPrice: big.Zero(),
RootCid: cid1,
Size: sizeOf(tbc1.AllBlocks()),
Blocks: 100,
Duration: remoteBlockDuration * 100,
AverageSpeed: uint64(float64(sizeOf(tbc1.AllBlocks())) / (remoteBlockDuration * 100).Seconds()),
TotalPayment: big.Zero(),
AskPrice: big.Zero(),
TimeToFirstByte: remoteBlockDuration,
},
cid2: {
RootCid: cid2,
Size: sizeOf(tbc2.AllBlocks()),
Blocks: 100,
Duration: remoteBlockDuration * 100,
AverageSpeed: uint64(float64(sizeOf(tbc2.AllBlocks())) / (remoteBlockDuration * 100).Seconds()),
TotalPayment: big.Zero(),
AskPrice: big.Zero(),
RootCid: cid2,
Size: sizeOf(tbc2.AllBlocks()),
Blocks: 100,
Duration: remoteBlockDuration * 100,
AverageSpeed: uint64(float64(sizeOf(tbc2.AllBlocks())) / (remoteBlockDuration * 100).Seconds()),
TotalPayment: big.Zero(),
AskPrice: big.Zero(),
TimeToFirstByte: remoteBlockDuration,
},
},
},
Expand Down Expand Up @@ -141,22 +143,24 @@ func TestBitswapRetriever(t *testing.T) {
},
expectedStats: map[cid.Cid]*types.RetrievalStats{
cid1: {
RootCid: cid1,
Size: sizeOf(tbc1.Blocks(50, 100)),
Blocks: 50,
Duration: remoteBlockDuration * 50,
AverageSpeed: uint64(float64(sizeOf(tbc1.Blocks(50, 100))) / (remoteBlockDuration * 50).Seconds()),
TotalPayment: big.Zero(),
AskPrice: big.Zero(),
RootCid: cid1,
Size: sizeOf(tbc1.Blocks(50, 100)),
Blocks: 50,
Duration: remoteBlockDuration * 50,
AverageSpeed: uint64(float64(sizeOf(tbc1.Blocks(50, 100))) / (remoteBlockDuration * 50).Seconds()),
TotalPayment: big.Zero(),
AskPrice: big.Zero(),
TimeToFirstByte: remoteBlockDuration,
},
cid2: {
RootCid: cid2,
Size: sizeOf(append(tbc2.Blocks(25, 45), tbc2.Blocks(75, 100)...)),
Blocks: 45,
Duration: remoteBlockDuration * 45,
AverageSpeed: uint64(float64(sizeOf(append(tbc2.Blocks(25, 45), tbc2.Blocks(75, 100)...))) / (remoteBlockDuration * 45).Seconds()),
TotalPayment: big.Zero(),
AskPrice: big.Zero(),
RootCid: cid2,
Size: sizeOf(append(tbc2.Blocks(25, 45), tbc2.Blocks(75, 100)...)),
Blocks: 45,
Duration: remoteBlockDuration * 45,
AverageSpeed: uint64(float64(sizeOf(append(tbc2.Blocks(25, 45), tbc2.Blocks(75, 100)...))) / (remoteBlockDuration * 45).Seconds()),
TotalPayment: big.Zero(),
AskPrice: big.Zero(),
TimeToFirstByte: remoteBlockDuration,
},
},
},
Expand Down Expand Up @@ -185,22 +189,24 @@ func TestBitswapRetriever(t *testing.T) {
},
expectedStats: map[cid.Cid]*types.RetrievalStats{
cid1: {
RootCid: cid1,
Size: sizeOf(tbc1.AllBlocks()[:5]),
Blocks: 5,
Duration: remoteBlockDuration * 5,
AverageSpeed: uint64(float64(sizeOf(tbc1.AllBlocks()[:5])) / (remoteBlockDuration * 5).Seconds()),
TotalPayment: big.Zero(),
AskPrice: big.Zero(),
RootCid: cid1,
Size: sizeOf(tbc1.AllBlocks()[:5]),
Blocks: 5,
Duration: remoteBlockDuration * 5,
AverageSpeed: uint64(float64(sizeOf(tbc1.AllBlocks()[:5])) / (remoteBlockDuration * 5).Seconds()),
TotalPayment: big.Zero(),
AskPrice: big.Zero(),
TimeToFirstByte: remoteBlockDuration,
},
cid2: {
RootCid: cid2,
Size: sizeOf(tbc2.AllBlocks()[:5]),
Blocks: 5,
Duration: remoteBlockDuration * 5,
AverageSpeed: uint64(float64(sizeOf(tbc2.AllBlocks()[:5])) / (remoteBlockDuration * 5).Seconds()),
TotalPayment: big.Zero(),
AskPrice: big.Zero(),
RootCid: cid2,
Size: sizeOf(tbc2.AllBlocks()[:5]),
Blocks: 5,
Duration: remoteBlockDuration * 5,
AverageSpeed: uint64(float64(sizeOf(tbc2.AllBlocks()[:5])) / (remoteBlockDuration * 5).Seconds()),
TotalPayment: big.Zero(),
AskPrice: big.Zero(),
TimeToFirstByte: remoteBlockDuration,
},
},
},
Expand Down

0 comments on commit 8f92fcc

Please sign in to comment.