diff --git a/cam/cam.go b/cam/cam.go index 8d4dda3..0612332 100644 --- a/cam/cam.go +++ b/cam/cam.go @@ -36,6 +36,7 @@ const ( maxGRPCSize = 1024 * 1024 * 32 // bytes deleterInterval = 10 // minutes retryInterval = 1 // seconds + asyncTimeout = 60 // seconds tempPath = "/tmp" ) @@ -275,20 +276,12 @@ func (vs *videostore) DoCommand(_ context.Context, command map[string]interface{ } uploadFilePath := generateOutputFilePath(vs.name.Name, formatDateTimeToString(from), metadata, vs.uploadPath) uploadFileName := filepath.Base(uploadFilePath) - // Async save command will run the concat operation in the background. - // It waits for the segment duration before running to ensure the last segment - // is written to storage before concatenation. - // TODO: (seanp) Optimize this to immediately run as soon as the current segment is completed. switch async { case true: vs.logger.Debug("running save command asynchronously") - go func() { - time.Sleep(vs.conc.segmentDur) - err = vs.conc.concat(from, to, uploadFilePath) - if err != nil { - vs.logger.Error("failed to concat files ", err) - } - }() + vs.workers.Add(func(ctx context.Context) { + vs.asyncSave(ctx, from, to, uploadFilePath) + }) return map[string]interface{}{ "command": "save", "filename": uploadFileName, @@ -399,6 +392,30 @@ func (vs *videostore) deleter(ctx context.Context) { } } +// asyncSave command will run the concat operation in the background. +// It waits for the segment duration before running to ensure the last segment +// is written to storage before concatenation. +// TODO: (seanp) Optimize this to immediately run as soon as the current segment is completed. +func (vs *videostore) asyncSave(ctx context.Context, from, to time.Time, path string) { + totalTimeout := time.Duration(asyncTimeout)*time.Second + vs.conc.segmentDur + ctx, cancel := context.WithTimeout(ctx, totalTimeout) + defer cancel() + timer := time.NewTimer(vs.conc.segmentDur) + defer timer.Stop() + select { + case <-timer.C: + vs.logger.Debugf("running async save command for %s", path) + err := vs.conc.concat(from, to, path) + if err != nil { + vs.logger.Error("failed to concat files ", err) + } + return + case <-ctx.Done(): + vs.logger.Error("AsyncSave operation cancelled or timed out") + return + } +} + // Close closes the video storage camera component. func (vs *videostore) Close(ctx context.Context) error { err := vs.stream.Close(ctx)