gridstore: make sure http response writes aren't concurrent
DocSavage committed Nov 29, 2024
1 parent 835339c commit 1dd86eb
Showing 3 changed files with 99 additions and 46 deletions.
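Background for the change: Go's http.ResponseWriter is not safe for concurrent writes, so block fetches that previously wrote to the response from multiple goroutines are now funneled through a single writer goroutine fed by a channel. Below is a minimal, standalone sketch of that single-writer pattern; the block type and the fmt output standing in for the HTTP response are illustrative only, not the DVID code itself.

package main

import (
	"fmt"
	"sync"
)

// block is a stand-in for storage.Block: a chunk coordinate plus its raw bytes.
type block struct {
	coord [3]int32
	value []byte
}

func main() {
	writeCh := make(chan *block, 100)

	// Single writer goroutine: the only place the "response" is written, so
	// fetchers can run concurrently without racing on the writer.
	var writeWg sync.WaitGroup
	writeWg.Add(1)
	go func() {
		defer writeWg.Done()
		for b := range writeCh {
			fmt.Printf("writing block %v (%d bytes)\n", b.coord, len(b.value))
		}
	}()

	// Concurrent producers standing in for parallel block fetches.
	var fetchWg sync.WaitGroup
	for i := int32(0); i < 4; i++ {
		fetchWg.Add(1)
		go func(i int32) {
			defer fetchWg.Done()
			writeCh <- &block{coord: [3]int32{i, 0, 0}, value: make([]byte, 8)}
		}(i)
	}

	fetchWg.Wait()
	close(writeCh) // no more blocks: let the writer drain and exit
	writeWg.Wait()
}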
125 changes: 91 additions & 34 deletions datatype/imageblk/imageblk.go
@@ -1902,6 +1902,70 @@ func (d *Data) SendSerializedBlock(w http.ResponseWriter, x, y, z int32, v []byt
return nil
}

// SendGridBlocks writes the given blocks from a grid store to the ResponseWriter using the given compression.
func (d *Data) SendGridBlocks(w http.ResponseWriter, blockCoords []dvid.ChunkPoint3d, gridStore storage.GridStoreGetter, compression string) error {
// Set up a single goroutine for writing to http.ResponseWriter since only one writer at a time is allowed.
writeCh := make(chan *storage.Block, 1000)
var writeWg sync.WaitGroup
writeWg.Add(1)
go func() {
defer writeWg.Done()
for b := range writeCh {
if err := d.SendUnserializedBlock(w, b.Coord[0], b.Coord[1], b.Coord[2], b.Value, compression); err != nil {
dvid.Errorf("Error writing block %s to http.ResponseWriter: %v\n", b.Coord, err)
return
}
}
}()

// Retrieve each block and hand it to the single writer goroutine.
var getErr error
for _, blockCoord := range blockCoords {
value, err := gridStore.GridGet(d.ScaleLevel, blockCoord)
if err != nil {
dvid.Infof("gridStore GET on scale %d, chunk %s had err: %v", d.ScaleLevel, blockCoord, err)
getErr = err
break
}
if value == nil {
dvid.Infof("gridStore GET on scale %d, chunk %s had nil value\n", d.ScaleLevel, blockCoord)
break
}
writeCh <- &storage.Block{Coord: blockCoord, Value: value}
}
close(writeCh)
writeWg.Wait()
return getErr
}

// SendGridVolume writes blocks to the writer with given compression.
func (d *Data) SendGridVolume(w http.ResponseWriter, subvol *dvid.Subvolume, gridStore storage.GridStoreGetter, compression string) error {
// Set up a single goroutine for writing to http.ResponseWriter since only one writer at a time is allowed.
writeCh := make(chan *storage.Block, 1000)
var writeWg sync.WaitGroup
writeWg.Add(1)
go func() {
defer writeWg.Done()
for b := range writeCh {
if err := d.SendUnserializedBlock(w, b.Coord[0], b.Coord[1], b.Coord[2], b.Value, compression); err != nil {
dvid.Errorf("Error writing block %s to http.ResponseWriter: %v\n", b.Coord, err)
return
}
}
}()

// Retrieve blocks in parallel and funnel them to the single writer goroutine.
ordered := false
minBlock, maxBlock, err := subvol.BoundingChunks(d.BlockSize())
if err != nil {
close(writeCh)
return err
}
err = gridStore.GridGetVolume(d.ScaleLevel, minBlock, maxBlock, ordered, func(b *storage.Block) error {
if b.Value != nil {
writeCh <- b
}
return nil
})
close(writeCh)
writeWg.Wait()
return err
}

// SendBlocksSpecific writes data to the blocks specified -- best for non-ordered backend
func (d *Data) SendBlocksSpecific(ctx *datastore.VersionedCtx, w http.ResponseWriter, compression string, blockstring string, isprefetch bool) (numBlocks int, err error) {
w.Header().Set("Content-type", "application/octet-stream")
@@ -1913,7 +1977,7 @@ func (d *Data) SendBlocksSpecific(ctx *datastore.VersionedCtx, w http.ResponseWr
timedLog := dvid.NewTimeLog()
defer timedLog.Infof("SendBlocks Specific ")

// extract query string
// extract query string into block coordinates
if blockstring == "" {
return
}
@@ -1923,10 +1987,26 @@ func (d *Data) SendBlocksSpecific(ctx *datastore.VersionedCtx, w http.ResponseWr
return
}
numBlocks = len(coordarray) / 3
blockCoords := make([]dvid.ChunkPoint3d, numBlocks)
for i := 0; i < len(coordarray); i += 3 {
var xloc, yloc, zloc int
xloc, err = strconv.Atoi(coordarray[i])
if err != nil {
return
}
yloc, err = strconv.Atoi(coordarray[i+1])
if err != nil {
return
}
zloc, err = strconv.Atoi(coordarray[i+2])
if err != nil {
return
}
blockCoords[i/3] = dvid.ChunkPoint3d{int32(xloc), int32(yloc), int32(zloc)}
}

// make a finished queue
finishedRequests := make(chan error, len(coordarray)/3)
var mutex sync.Mutex

// get store for data
var gridStore storage.GridStoreGetter
@@ -1935,8 +2015,13 @@ func (d *Data) SendBlocksSpecific(ctx *datastore.VersionedCtx, w http.ResponseWr
if err != nil {
return
}
if gridStore != nil {
err = d.SendGridBlocks(w, blockCoords, gridStore, compression)
return
}

// iterate through each block and query
mutex := &sync.Mutex{}
for i := 0; i < len(coordarray); i += 3 {
var xloc, yloc, zloc int
xloc, err = strconv.Atoi(coordarray[i])
@@ -1962,22 +2047,6 @@ func (d *Data) SendBlocksSpecific(ctx *datastore.VersionedCtx, w http.ResponseWr
chunkPt := dvid.ChunkPoint3d{xloc, yloc, zloc}

var value []byte
if gridStore != nil {
value, err = gridStore.GridGet(d.ScaleLevel, chunkPt)
if err != nil {
dvid.Infof("gridStore GET on scale %d, chunk %s had err: %v", d.ScaleLevel, chunkPt, err)
err = nil
return
}
if value == nil {
dvid.Infof("gridStore GET on scale %d, chunk %s had nil value\n", d.ScaleLevel, chunkPt)
return
}
mutex.Lock()
defer mutex.Unlock()
d.SendUnserializedBlock(w, xloc, yloc, zloc, value, compression)
return
}
idx := dvid.IndexZYX(chunkPt)
key := NewTKey(&idx)
value, err = kvDB.Get(ctx, key)
@@ -2065,19 +2134,7 @@ func (d *Data) SendBlocks(ctx *datastore.VersionedCtx, w http.ResponseWriter, su
}

if gridStore != nil {
ordered := false
minBlock, maxBlock, err := subvol.BoundingChunks(d.BlockSize())
if err != nil {
return err
}
return gridStore.GridGetVolume(d.ScaleLevel, minBlock, maxBlock, ordered, &storage.BlockOp{}, func(b *storage.Block) error {
if b.Value != nil {
if err := d.SendUnserializedBlock(w, b.Coord[0], b.Coord[1], b.Coord[2], b.Value, compression); err != nil {
return err
}
}
return nil
})
return d.SendGridVolume(w, subvol, gridStore, compression)
}

okv := okvDB.(storage.BufferableOps)
Expand Down Expand Up @@ -2124,7 +2181,7 @@ func (d *Data) SendBlocks(ctx *datastore.VersionedCtx, w http.ResponseWriter, su
})

if err != nil {
return fmt.Errorf("Unable to GET data %s: %v", ctx, err)
return fmt.Errorf("unable to GET data %s: %v", ctx, err)
}
} else {
tkeys := make([]storage.TKey, 0)
@@ -2158,7 +2215,7 @@ func (d *Data) SendBlocks(ctx *datastore.VersionedCtx, w http.ResponseWriter, su
})

if err != nil {
return fmt.Errorf("Unable to GET data %s: %v", ctx, err)
return fmt.Errorf("unable to GET data %s: %v", ctx, err)
}
}
}
@@ -2169,7 +2226,7 @@ func (d *Data) SendBlocks(ctx *datastore.VersionedCtx, w http.ResponseWriter, su
err = okv.(storage.RequestBuffer).Flush()

if err != nil {
return fmt.Errorf("Unable to GET data %s: %v", ctx, err)
return fmt.Errorf("unable to GET data %s: %v", ctx, err)

}
}
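With the two helpers above, a caller only chooses between the specific-block path and the subvolume path; the single-writer channel is an internal detail. A hypothetical caller is sketched below, assuming it lives in the imageblk package so the dvid, storage, and net/http imports are already in scope; handleGridRequest is an illustrative name, not an actual DVID function.

// Sketch only: route a request to the appropriate grid helper.
func (d *Data) handleGridRequest(w http.ResponseWriter, subvol *dvid.Subvolume, blockCoords []dvid.ChunkPoint3d, gridStore storage.GridStoreGetter, compression string) error {
	w.Header().Set("Content-type", "application/octet-stream")
	if len(blockCoords) > 0 {
		// Explicit block list, as in SendBlocksSpecific above.
		return d.SendGridBlocks(w, blockCoords, gridStore, compression)
	}
	// Otherwise stream every block intersecting the subvolume, as in SendBlocks.
	return d.SendGridVolume(w, subvol, gridStore, compression)
}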
18 changes: 7 additions & 11 deletions storage/ngprecomputed/ngprecomputed.go
@@ -797,19 +797,19 @@ func (ng *ngStore) GridGet(scaleLevel int, blockCoord dvid.ChunkPoint3d) (val []

// GridGetVolume calls the given function with the results of retrieved block data in an ordered or
// unordered fashion. Missing blocks in the subvolume are not processed.
func (ng *ngStore) GridGetVolume(scaleLevel int, minBlock, maxBlock dvid.ChunkPoint3d, ordered bool, op *storage.BlockOp, f storage.BlockFunc) error {
func (ng *ngStore) GridGetVolume(scaleLevel int, minBlock, maxBlock dvid.ChunkPoint3d, ordered bool, f storage.BlockFunc) error {
if ordered {
return fmt.Errorf("ordered retrieval not implemented at this time")
}
ch := make(chan dvid.ChunkPoint3d)
blockCoordCh := make(chan dvid.ChunkPoint3d)

// Start concurrent processing routines to read each block and then pass it to given function.
// Start concurrent processing routines to read each block and then pass it to a single writer function.
concurrency := 10
wg := new(sync.WaitGroup)
wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
go func() {
for blockCoord := range ch {
for blockCoord := range blockCoordCh {
val, err := ng.GridGet(scaleLevel, blockCoord)
if err != nil {
dvid.Errorf("unable to get block %s in GridGetVolume: %v\n", blockCoord, err)
@@ -818,11 +818,8 @@ func (ng *ngStore) GridGetVolume(scaleLevel int, minBlock, maxBlock dvid.ChunkPo
if val == nil {
continue
}
if op != nil && op.Wg != nil {
op.Wg.Add(1)
}
block := &storage.Block{
BlockOp: op,
BlockOp: nil,
Coord: blockCoord,
Value: val,
}
@@ -838,12 +835,11 @@ func (ng *ngStore) GridGetVolume(scaleLevel int, minBlock, maxBlock dvid.ChunkPo
for z := minBlock.Value(2); z <= maxBlock.Value(2); z++ {
for y := minBlock.Value(1); y <= maxBlock.Value(1); y++ {
for x := minBlock.Value(0); x <= maxBlock.Value(0); x++ {
ch <- dvid.ChunkPoint3d{x, y, z}
blockCoordCh <- dvid.ChunkPoint3d{x, y, z}
}
}
}

close(ch)
close(blockCoordCh)
wg.Wait()
return nil
}
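Note that GridGetVolume invokes the callback from several goroutines at once (concurrency of 10 above), so the callback must be concurrency-safe: forward into a channel, as SendGridVolume does, or guard shared state with a lock. A small caller sketch under that assumption follows; collectBlocks is illustrative and assumes a package that imports dvid, storage, and sync.

// Sketch: gather all non-nil blocks in a bounding box into a slice.
// The callback runs concurrently, so the slice is guarded by a mutex.
func collectBlocks(gs storage.GridStoreGetter, scale int, minBlock, maxBlock dvid.ChunkPoint3d) ([]*storage.Block, error) {
	var (
		mu     sync.Mutex
		blocks []*storage.Block
	)
	err := gs.GridGetVolume(scale, minBlock, maxBlock, false, func(b *storage.Block) error {
		mu.Lock()
		blocks = append(blocks, b)
		mu.Unlock()
		return nil
	})
	return blocks, err
}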
2 changes: 1 addition & 1 deletion storage/storage.go
@@ -324,5 +324,5 @@ type GridProps struct {
type GridStoreGetter interface {
GridProperties(scaleLevel int) (GridProps, error)
GridGet(scaleLevel int, blockCoord dvid.ChunkPoint3d) ([]byte, error)
GridGetVolume(scaleLevel int, minBlock, maxBlock dvid.ChunkPoint3d, ordered bool, op *BlockOp, f BlockFunc) error
GridGetVolume(scaleLevel int, minBlock, maxBlock dvid.ChunkPoint3d, ordered bool, f BlockFunc) error
}
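Any store exposing these three methods can back the grid code paths above. As one example of what an implementation must provide, here is a sketch of an in-memory fake for tests; the type, its field layout, and keying a map by dvid.ChunkPoint3d are assumptions for illustration, and GridProps fields are left at their zero values since they are not shown in this diff.

// fakeGridStore is a hypothetical in-memory GridStoreGetter for tests,
// assumed to live in a package that imports dvid and storage.
type fakeGridStore struct {
	blocks map[dvid.ChunkPoint3d][]byte // a single scale level only
}

func (f *fakeGridStore) GridProperties(scaleLevel int) (storage.GridProps, error) {
	return storage.GridProps{}, nil
}

func (f *fakeGridStore) GridGet(scaleLevel int, blockCoord dvid.ChunkPoint3d) ([]byte, error) {
	return f.blocks[blockCoord], nil // nil for missing blocks, matching how callers above handle them
}

func (f *fakeGridStore) GridGetVolume(scaleLevel int, minBlock, maxBlock dvid.ChunkPoint3d, ordered bool, fn storage.BlockFunc) error {
	// Ordered retrieval is ignored in this fake; blocks are visited in z/y/x order.
	for z := minBlock.Value(2); z <= maxBlock.Value(2); z++ {
		for y := minBlock.Value(1); y <= maxBlock.Value(1); y++ {
			for x := minBlock.Value(0); x <= maxBlock.Value(0); x++ {
				coord := dvid.ChunkPoint3d{x, y, z}
				if val := f.blocks[coord]; val != nil {
					if err := fn(&storage.Block{Coord: coord, Value: val}); err != nil {
						return err
					}
				}
			}
		}
	}
	return nil
}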
