Skip to content

Commit

Permalink
Use workers for asyncs save
Browse files Browse the repository at this point in the history
  • Loading branch information
seanavery committed Sep 24, 2024
1 parent 242a187 commit e5f926e
Showing 1 changed file with 28 additions and 11 deletions.
39 changes: 28 additions & 11 deletions cam/cam.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
maxGRPCSize = 1024 * 1024 * 32 // bytes
deleterInterval = 10 // minutes
retryInterval = 1 // seconds
asyncTimeout = 60 // seconds
tempPath = "/tmp"
)

Expand Down Expand Up @@ -275,20 +276,12 @@ func (vs *videostore) DoCommand(_ context.Context, command map[string]interface{
}
uploadFilePath := generateOutputFilePath(vs.name.Name, formatDateTimeToString(from), metadata, vs.uploadPath)
uploadFileName := filepath.Base(uploadFilePath)
// Async save command will run the concat operation in the background.
// It waits for the segment duration before running to ensure the last segment
// is written to storage before concatenation.
// TODO: (seanp) Optimize this to immediately run as soon as the current segment is completed.
switch async {
case true:
vs.logger.Debug("running save command asynchronously")
go func() {
time.Sleep(vs.conc.segmentDur)
err = vs.conc.concat(from, to, uploadFilePath)
if err != nil {
vs.logger.Error("failed to concat files ", err)
}
}()
vs.workers.Add(func(ctx context.Context) {
vs.asyncSave(ctx, from, to, uploadFilePath)
})
return map[string]interface{}{
"command": "save",
"filename": uploadFileName,
Expand Down Expand Up @@ -399,6 +392,30 @@ func (vs *videostore) deleter(ctx context.Context) {
}
}

// asyncSave command will run the concat operation in the background.
// It waits for the segment duration before running to ensure the last segment
// is written to storage before concatenation.
// TODO: (seanp) Optimize this to immediately run as soon as the current segment is completed.
func (vs *videostore) asyncSave(ctx context.Context, from, to time.Time, path string) {
totalTimeout := time.Duration(asyncTimeout)*time.Second + vs.conc.segmentDur
ctx, cancel := context.WithTimeout(ctx, totalTimeout)
defer cancel()
timer := time.NewTimer(vs.conc.segmentDur)
defer timer.Stop()
select {
case <-timer.C:
vs.logger.Debugf("running async save command for %s", path)
err := vs.conc.concat(from, to, path)
if err != nil {
vs.logger.Error("failed to concat files ", err)
}
return
case <-ctx.Done():
vs.logger.Error("AsyncSave operation cancelled or timed out")
return
}
}

// Close closes the video storage camera component.
func (vs *videostore) Close(ctx context.Context) error {
err := vs.stream.Close(ctx)
Expand Down

0 comments on commit e5f926e

Please sign in to comment.