Skip to content

Commit

Permalink
Client should now respect the maxBatchSize setting from the server (#94)
Browse files Browse the repository at this point in the history
Also added to log output how many requests used Bytestream vs CAS
  • Loading branch information
asafflesch authored Apr 9, 2024
1 parent 5d0b58d commit 4cbe22d
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 11 deletions.
7 changes: 6 additions & 1 deletion cmd/casload/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,19 @@ func main() {
}
defer cs.Close()

var finalMaxBatchBlobSize = *maxBatchBlobSize
if cs.maxBatchBlobSize > 0 && cs.maxBatchBlobSize < finalMaxBatchBlobSize {
finalMaxBatchBlobSize = cs.maxBatchBlobSize
}

actionContext := load.ActionContext{
InstanceName: instanceName,
CasClient: cs.casClient,
BytestreamClient: cs.bytestreamClient,
Ctx: ctx,
KnownDigests: make(map[string]bool),
WriteChunkSize: *writeChunkSize,
MaxBatchBlobSize: *maxBatchBlobSize,
MaxBatchBlobSize: finalMaxBatchBlobSize,
}

for _, action := range actions {
Expand Down
20 changes: 15 additions & 5 deletions pkg/load/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@ func (g *generateAction) String() string {
}

type generateResult struct {
startTime time.Time
endTime time.Time
success int
errors int
startTime time.Time
endTime time.Time
success int
errors int
byteStreamWrites int
blobWrites int
}

type generateWorkItem struct {
Expand Down Expand Up @@ -180,6 +182,12 @@ func (g *generateAction) RunAction(actionContext *ActionContext) error {
}
elapsedTimes[i] = r.elapsed

if int64(r.blobSize) < actionContext.MaxBatchBlobSize {
result.blobWrites += 1
} else {
result.byteStreamWrites += 1
}

if i%100 == 0 {
log.Debugf("progress: %d / %d", i, g.numRequests)
}
Expand All @@ -189,12 +197,14 @@ func (g *generateAction) RunAction(actionContext *ActionContext) error {

close(resultChan)

fmt.Printf("program: %s\n startTime: %s\n endTime: %s\n success: %d\n errors: %d\n",
fmt.Printf("program: %s\n startTime: %s\n endTime: %s\n success: %d\n errors: %d\n byteStreamWrites: %d\n blobWrites: %d\n",
g.String(),
result.startTime.String(),
result.endTime.String(),
result.success,
result.errors,
result.byteStreamWrites,
result.blobWrites,
)

stats.PrintTimingStats(elapsedTimes)
Expand Down
19 changes: 14 additions & 5 deletions pkg/load/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ func (self *readAction) String() string {
}

type readResult struct {
startTime time.Time
endTime time.Time
success int
errors int
startTime time.Time
endTime time.Time
success int
errors int
byteStreamReads int
blobReads int
}

type readWorkItem struct {
Expand Down Expand Up @@ -208,6 +210,11 @@ func (self *readAction) RunAction(actionContext *ActionContext) error {
}).Error("request error")
}
elapsedTimes[i] = r.elapsed
if int64(r.digest.SizeBytes) < actionContext.MaxBatchBlobSize {
result.blobReads += 1
} else {
result.byteStreamReads += 1
}

if i%100 == 0 {
log.Debugf("progress: %d / %d", i, len(workItems))
Expand All @@ -218,12 +225,14 @@ func (self *readAction) RunAction(actionContext *ActionContext) error {

close(resultChan)

fmt.Printf("program: %s\n startTime: %s\n endTime: %s\n success: %d\n errors: %d\n",
fmt.Printf("program: %s\n startTime: %s\n endTime: %s\n success: %d\n errors: %d\n byteStreamReads: %d\n blobReads: %d\n",
self.String(),
result.startTime.String(),
result.endTime.String(),
result.success,
result.errors,
result.byteStreamReads,
result.blobReads,
)

stats.PrintTimingStats(elapsedTimes)
Expand Down

0 comments on commit 4cbe22d

Please sign in to comment.