Skip to content

Commit

Permalink
make sure to shutdown parallel writes for dump-cloud
Browse files Browse the repository at this point in the history
  • Loading branch information
DocSavage committed Aug 4, 2024
1 parent eb1f60f commit b0c933b
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions datatype/keyvalue/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package keyvalue
import (
"context"
"fmt"
"sync"

"github.com/janelia-flyem/dvid/datastore"
"github.com/janelia-flyem/dvid/dvid"
Expand Down Expand Up @@ -82,21 +83,25 @@ func (d *Data) dumpCloud(cmd datastore.Request, reply *datastore.Response) error
defer bucket.Close()

// Start goroutines to write key-value pairs to cloud storage.
wg := new(sync.WaitGroup)
ch := make(chan *storage.TKeyValue, 1000)
for i := 0; i < workers; i++ {
go d.writeWorker(bucket, ch)
wg.Add(1)
go d.writeWorker(wg, bucket, ch)
}

// Send all key-values to the writer goroutines.
d.sendKVsToWriters(versionID, ch)
close(ch)
wg.Wait()

reply.Output = []byte(fmt.Sprintf("Dumped all key-values to cloud %q for keyvalue %q, uuid %s\n",
refStr, d.DataName(), uuidStr))
return nil
}

// writeWorker writes key-value pairs to cloud storage.
func (d *Data) writeWorker(bucket *blob.Bucket, ch chan *storage.TKeyValue) {
func (d *Data) writeWorker(wg *sync.WaitGroup, bucket *blob.Bucket, ch chan *storage.TKeyValue) {
for kv := range ch {
key, err := DecodeTKey(kv.K)
if err != nil {
Expand All @@ -116,6 +121,7 @@ func (d *Data) writeWorker(bucket *blob.Bucket, ch chan *storage.TKeyValue) {
dvid.Errorf("Error closing after key %q to cloud bucket: %v\n", key, err)
}
}
wg.Done()
}

// iterate through all key-value pairs and send them to the writer channel.
Expand Down

0 comments on commit b0c933b

Please sign in to comment.