diff --git a/datatype/keyvalue/rpc.go b/datatype/keyvalue/rpc.go index a4045209..f0faf81b 100644 --- a/datatype/keyvalue/rpc.go +++ b/datatype/keyvalue/rpc.go @@ -3,6 +3,7 @@ package keyvalue import ( "context" "fmt" + "sync" "github.com/janelia-flyem/dvid/datastore" "github.com/janelia-flyem/dvid/dvid" @@ -82,13 +83,17 @@ func (d *Data) dumpCloud(cmd datastore.Request, reply *datastore.Response) error defer bucket.Close() // Start goroutines to write key-value pairs to cloud storage. + wg := new(sync.WaitGroup) ch := make(chan *storage.TKeyValue, 1000) for i := 0; i < workers; i++ { - go d.writeWorker(bucket, ch) + wg.Add(1) + go d.writeWorker(wg, bucket, ch) } // Send all key-values to the writer goroutines. d.sendKVsToWriters(versionID, ch) + close(ch) + wg.Wait() reply.Output = []byte(fmt.Sprintf("Dumped all key-values to cloud %q for keyvalue %q, uuid %s\n", refStr, d.DataName(), uuidStr)) @@ -96,7 +101,7 @@ func (d *Data) dumpCloud(cmd datastore.Request, reply *datastore.Response) error } // writeWorker writes key-value pairs to cloud storage. -func (d *Data) writeWorker(bucket *blob.Bucket, ch chan *storage.TKeyValue) { +func (d *Data) writeWorker(wg *sync.WaitGroup, bucket *blob.Bucket, ch chan *storage.TKeyValue) { for kv := range ch { key, err := DecodeTKey(kv.K) if err != nil { @@ -116,6 +121,7 @@ func (d *Data) writeWorker(bucket *blob.Bucket, ch chan *storage.TKeyValue) { dvid.Errorf("Error closing after key %q to cloud bucket: %v\n", key, err) } } + wg.Done() } // iterate through all key-value pairs and send them to the writer channel.