Skip to content

Commit

Permalink
Merge pull request #15 from viam-modules/RSDK-8798
Browse files Browse the repository at this point in the history
[RSDK-8798] - Async save command
  • Loading branch information
seanavery authored Sep 25, 2024
2 parents 8f51ff0 + a95f5f2 commit b9d2829
Show file tree
Hide file tree
Showing 6 changed files with 219 additions and 83 deletions.
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,20 @@ The save command retreives video from local storage and, uploads the clip to the
}
```

#### Async Save Request

The async save command performs the same operation as the save command, but does not wait for the operation to complete. Use this command when you want to save video slices that include the current in-progress video storage segment. It will wait for the current segment to finish recording before saving the video slice.

```json
{
"command": "save",
"from": <start_timestamp>, [required]
"to": <end_timestamp>, [required]
"metadata": <arbitrary_metadata_string>, [optional]
"async": true [optional]
}
```

#### Save Response
```json
{
Expand Down
68 changes: 53 additions & 15 deletions cam/cam.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
maxGRPCSize = 1024 * 1024 * 32 // bytes
deleterInterval = 10 // minutes
retryInterval = 1 // seconds
asyncTimeout = 60 // seconds
tempPath = "/tmp"
)

Expand Down Expand Up @@ -240,7 +241,6 @@ func newvideostore(
logger,
vs.storagePath,
vs.uploadPath,
vs.name.Name,
segmentSeconds,
)
if err != nil {
Expand Down Expand Up @@ -270,39 +270,54 @@ func (vs *videostore) DoCommand(_ context.Context, command map[string]interface{
// The response contains the name of the uploaded file.
case "save":
vs.logger.Debug("save command received")
from, to, metadata, err := validateSaveCommand(command)
from, to, metadata, async, err := validateSaveCommand(command)
if err != nil {
return nil, err
}
uploadFilePath, err := vs.conc.concat(from, to, metadata, vs.uploadPath)
if err != nil {
vs.logger.Error("failed to concat files ", err)
return nil, err
}
uploadFilePath := generateOutputFilePath(vs.name.Name, formatDateTimeToString(from), metadata, vs.uploadPath)
uploadFileName := filepath.Base(uploadFilePath)
return map[string]interface{}{
"command": "save",
"filename": uploadFileName,
}, nil
switch async {
case true:
vs.logger.Debug("running save command asynchronously")
vs.workers.Add(func(ctx context.Context) {
vs.asyncSave(ctx, from, to, uploadFilePath)
})
return map[string]interface{}{
"command": "save",
"filename": uploadFileName,
"status": "async",
}, nil
default:
err = vs.conc.concat(from, to, uploadFilePath)
if err != nil {
vs.logger.Error("failed to concat files ", err)
return nil, err
}
return map[string]interface{}{
"command": "save",
"filename": uploadFileName,
}, nil
}
case "fetch":
vs.logger.Debug("fetch command received")
from, to, err := validateFetchCommand(command)
if err != nil {
return nil, err
}
tmpFilePath, err := vs.conc.concat(from, to, "", tempPath)
fetchFilePath := generateOutputFilePath(vs.name.Name, formatDateTimeToString(from), "", tempPath)
err = vs.conc.concat(from, to, fetchFilePath)
if err != nil {
vs.logger.Error("failed to concat files ", err)
return nil, err
}
videoSize, err := getFileSize(tmpFilePath)
videoSize, err := getFileSize(fetchFilePath)
if err != nil {
return nil, err
}
if videoSize > maxGRPCSize {
return nil, errors.New("video file size exceeds max grpc size")
}
videoBytes, err := readVideoFile(tmpFilePath)
videoBytes, err := readVideoFile(fetchFilePath)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -377,6 +392,30 @@ func (vs *videostore) deleter(ctx context.Context) {
}
}

// asyncSave command will run the concat operation in the background.
// It waits for the segment duration before running to ensure the last segment
// is written to storage before concatenation.
// TODO: (seanp) Optimize this to immediately run as soon as the current segment is completed.
func (vs *videostore) asyncSave(ctx context.Context, from, to time.Time, path string) {
totalTimeout := time.Duration(asyncTimeout)*time.Second + vs.conc.segmentDur
ctx, cancel := context.WithTimeout(ctx, totalTimeout)
defer cancel()
timer := time.NewTimer(vs.conc.segmentDur)
defer timer.Stop()
select {
case <-timer.C:
vs.logger.Debugf("executing concat for %s", path)
err := vs.conc.concat(from, to, path)
if err != nil {
vs.logger.Error("failed to concat files ", err)
}
return
case <-ctx.Done():
vs.logger.Error("asyncSave operation cancelled or timed out")
return
}
}

// Close closes the video storage camera component.
func (vs *videostore) Close(ctx context.Context) error {
err := vs.stream.Close(ctx)
Expand All @@ -386,7 +425,6 @@ func (vs *videostore) Close(ctx context.Context) error {
vs.workers.Stop()
vs.enc.close()
vs.seg.close()
vs.conc.close()
return nil
}

Expand Down
132 changes: 72 additions & 60 deletions cam/concater.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,84 +15,83 @@ import (
"time"
"unsafe"

"github.com/google/uuid"
"go.viam.com/rdk/logging"
)

const (
conactTextFileName = "concat.txt"
conactTxtFilePattern = "concat_%s.txt"
concatTxtDir = "/tmp"
)

type concater struct {
logger logging.Logger
storagePath string
uploadPath string
camName string
segmentDur time.Duration
concatFile *os.File
}

func newConcater(
logger logging.Logger,
storagePath, uploadPath, camName string,
storagePath, uploadPath string,
segmentSeconds int,
) (*concater, error) {
concatPath := filepath.Join(getHomeDir(), ".viam", conactTextFileName)
logger.Debugf("concatPath: %s", concatPath)
concatFile, err := os.Create(concatPath)
if err != nil {
logger.Error("failed to create concat file", err)
return nil, err
}
return &concater{
c := &concater{
logger: logger,
storagePath: storagePath,
uploadPath: uploadPath,
concatFile: concatFile,
camName: camName,
segmentDur: time.Duration(segmentSeconds) * time.Second,
}, nil
}
err := c.cleanupConcatTxtFiles()
if err != nil {
c.logger.Error("failed to cleanup concat txt files", err)
}
return c, nil
}

// concat takes in from and to timestamps and concates the video files between them.
// returns the path to the concated video file.
func (c *concater) concat(from, to time.Time, metadata, path string) (string, error) {
func (c *concater) concat(from, to time.Time, path string) error {
// Find the storage files that match the concat query.
storageFiles, err := getSortedFiles(c.storagePath)
if err != nil {
c.logger.Error("failed to get sorted files", err)
return "", err
return err
}
if len(storageFiles) == 0 {
return "", errors.New("no video data in storage")
return errors.New("no video data in storage")
}
err = validateTimeRange(storageFiles, from, to)
if err != nil {
return "", err
return err
}
matchingFiles := matchStorageToRange(storageFiles, from, to, c.segmentDur)
if len(matchingFiles) == 0 {
return "", errors.New("no matching video data to save")
return errors.New("no matching video data to save")
}

// Clear the concat file and write the matching files list to it.
c.concatFile.Truncate(0)
c.concatFile.Seek(0, 0)
// Create a temporary file to store the list of files to concatenate.
concatFilePath := generateConcatFilePath()
concatTxtFile, err := os.Create(concatFilePath)
if err != nil {
return err
}
for _, file := range matchingFiles {
_, err := c.concatFile.WriteString(file + "\n")
_, err := concatTxtFile.WriteString(file + "\n")
if err != nil {
return "", err
return err
}
}

concatFilePath := C.CString(c.concatFile.Name())
concatStr := C.CString("concat")
concatFilePathCStr := C.CString(concatFilePath)
concatCStr := C.CString("concat")
defer func() {
C.free(unsafe.Pointer(concatFilePath))
C.free(unsafe.Pointer(concatStr))
C.free(unsafe.Pointer(concatFilePathCStr))
C.free(unsafe.Pointer(concatCStr))
}()
inputFormat := C.av_find_input_format(concatStr)
inputFormat := C.av_find_input_format(concatCStr)
if inputFormat == nil {
return "", errors.New("failed to find input format")
return errors.New("failed to find input format")
}

// Open the input format context with the concat demuxer. This block sets up
Expand All @@ -111,28 +110,20 @@ func (c *concater) concat(from, to time.Time, metadata, path string) (string, er
}()
ret := C.av_dict_set(&options, safeStr, safeValStr, 0)
if ret < 0 {
return "", fmt.Errorf("failed to set option: %s", ffmpegError(ret))
return fmt.Errorf("failed to set option: %s", ffmpegError(ret))
}
ret = C.avformat_open_input(&inputCtx, concatFilePath, inputFormat, &options)
ret = C.avformat_open_input(&inputCtx, concatFilePathCStr, inputFormat, &options)
if ret < 0 {
return "", fmt.Errorf("failed to open input format: %s", ffmpegError(ret))
return fmt.Errorf("failed to open input format: %s", ffmpegError(ret))
}
ret = C.avformat_find_stream_info(inputCtx, nil)
if ret < 0 {
return "", fmt.Errorf("failed to find stream info: %s", ffmpegError(ret))
return fmt.Errorf("failed to find stream info: %s", ffmpegError(ret))
}

// Open the output format context and write the header. This block sets up the
// output format context to write the concatenated video data to a new file.
var outputFilename string
fromStr := formatDateTimeToString(from)
if metadata == "" {
outputFilename = fmt.Sprintf("%s_%s.%s", c.camName, fromStr, defaultVideoFormat)
} else {
outputFilename = fmt.Sprintf("%s_%s_%s.%s", c.camName, fromStr, metadata, defaultVideoFormat)
}
outputPath := filepath.Join(path, outputFilename)
outputPathCStr := C.CString(outputPath)
outputPathCStr := C.CString(path)
var outputCtx *C.AVFormatContext
defer func() {
C.free(unsafe.Pointer(outputPathCStr))
Expand All @@ -142,7 +133,7 @@ func (c *concater) concat(from, to time.Time, metadata, path string) (string, er

ret = C.avformat_alloc_output_context2(&outputCtx, nil, nil, outputPathCStr)
if ret < 0 {
return "", fmt.Errorf("failed to allocate output context: %s", ffmpegError(ret))
return fmt.Errorf("failed to allocate output context: %s", ffmpegError(ret))
}

// Copy codec info from input to output context. This is necessary to ensure
Expand All @@ -153,11 +144,11 @@ func (c *concater) concat(from, to time.Time, metadata, path string) (string, er
uintptr(i)*unsafe.Sizeof(inputCtx.streams)))
outStream := C.avformat_new_stream(outputCtx, nil)
if outStream == nil {
return "", fmt.Errorf("failed to allocate stream")
return fmt.Errorf("failed to allocate stream")
}
ret := C.avcodec_parameters_copy(outStream.codecpar, inStream.codecpar)
if ret < 0 {
return "", fmt.Errorf("failed to copy codec parameters: %s", ffmpegError(ret))
return fmt.Errorf("failed to copy codec parameters: %s", ffmpegError(ret))
}
// Let ffmpeg handle the codec tag for us.
outStream.codecpar.codec_tag = 0
Expand All @@ -166,11 +157,11 @@ func (c *concater) concat(from, to time.Time, metadata, path string) (string, er
// Open the output file and write the header.
ret = C.avio_open(&outputCtx.pb, outputPathCStr, C.AVIO_FLAG_WRITE)
if ret < 0 {
return "", fmt.Errorf("failed to open output file: %s", ffmpegError(ret))
return fmt.Errorf("failed to open output file: %s", ffmpegError(ret))
}
ret = C.avformat_write_header(outputCtx, nil)
if ret < 0 {
return "", fmt.Errorf("failed to write header: %s", ffmpegError(ret))
return fmt.Errorf("failed to write header: %s", ffmpegError(ret))
}

// Adjust the PTS, DTS, and duration correctly for each packet.
Expand All @@ -186,7 +177,7 @@ func (c *concater) concat(from, to time.Time, metadata, path string) (string, er
}
// Any error other than EOF is a problem.
if ret < 0 {
return "", fmt.Errorf("failed to read frame: %s", ffmpegError(ret))
return fmt.Errorf("failed to read frame: %s", ffmpegError(ret))
}
// Can have multiple streams, so need to adjust each packet based on the
// stream it belongs to.
Expand All @@ -203,26 +194,47 @@ func (c *concater) concat(from, to time.Time, metadata, path string) (string, er
packet.pos = -1
ret = C.av_interleaved_write_frame(outputCtx, packet)
if ret < 0 {
return "", fmt.Errorf("failed to write frame: %s", ffmpegError(ret))
return fmt.Errorf("failed to write frame: %s", ffmpegError(ret))
}
}

// Write the trailer, close the output file, and free context memory.
ret = C.av_write_trailer(outputCtx)
if ret < 0 {
return "", fmt.Errorf("failed to write trailer: %s", ffmpegError(ret))
return fmt.Errorf("failed to write trailer: %s", ffmpegError(ret))
}

return outputPath, nil
// Delete tmp concat txt file
if err := os.Remove(concatFilePath); err != nil {
c.logger.Error("failed to remove concat file", err)
}

return nil
}

// close closes the concater and removes the concat file.
// Do not need to clean up FFmpeg resources as they are handled in the concat function.
func (c *concater) close() {
if err := c.concatFile.Close(); err != nil {
c.logger.Error("failed to close concat file", err)
// cleanupConcatTxtFiles cleans up the concat txt files in the tmp directory.
// This is precautionary to ensure that no dangling files are left behind if the
// module is closed during a concat operation.
func (c *concater) cleanupConcatTxtFiles() error {
pattern := fmt.Sprintf(conactTxtFilePattern, "*")
files, err := filepath.Glob(filepath.Join(concatTxtDir, pattern))
if err != nil {
c.logger.Error("failed to list files in /tmp", err)
return err
}
if err := os.Remove(c.concatFile.Name()); err != nil {
c.logger.Error("failed to remove concat file", err)
for _, file := range files {
if err := os.Remove(file); err != nil {
c.logger.Error("failed to remove file", err)
}
}
return nil
}

// generateConcatFilePath generates a unique file name for concat txt reference file.
// This allows multiple concats to be run concurrently without conflicts.
func generateConcatFilePath() string {
uniqueID := uuid.New().String()
fileName := fmt.Sprintf(conactTxtFilePattern, uniqueID)
filePath := filepath.Join(concatTxtDir, fileName)
return filePath
}
Loading

0 comments on commit b9d2829

Please sign in to comment.