Skip to content

Commit

Permalink
Merge pull request #52 from sonroyaalmerol/remove-passthrough-proxy
Browse files Browse the repository at this point in the history
fix task monitoring
  • Loading branch information
sonroyaalmerol authored Jan 11, 2025
2 parents 6fb9036 + a984ac2 commit 0c8ba4f
Showing 1 changed file with 90 additions and 60 deletions.
150 changes: 90 additions & 60 deletions internal/backend/backup/jobrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"fmt"
"io"
"log"
"os"
"os/exec"
"path/filepath"
Expand Down Expand Up @@ -216,13 +215,15 @@ type openResult struct {
}

func RunBackup(job *store.Job, storeInstance *store.Store) (*store.Task, error) {
// Create a cancellable context for the entire operation (no deadline is set here)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if storeInstance.APIToken == nil {
return nil, fmt.Errorf("RunBackup: api token is required")
}

// Validate and setup target
target, err := storeInstance.GetTarget(job.Target)
if err != nil {
return nil, fmt.Errorf("RunBackup: failed to get target: %w", err)
Expand Down Expand Up @@ -254,21 +255,84 @@ func RunBackup(job *store.Job, storeInstance *store.Store) (*store.Task, error)

srcPath = filepath.Join(srcPath, job.Subpath)

// Create task monitoring channel with buffer to prevent goroutine leak
taskChan := make(chan store.Task, 1)

task, err := monitorTask(ctx, storeInstance, job, taskChan)
// Prepare backup command
cmd, err := prepareBackupCommand(ctx, job, storeInstance, srcPath, isAgent)
if err != nil {
return nil, err
}

cmd, err := prepareBackupCommand(ctx, job, storeInstance, srcPath, isAgent)
// Setup command pipes
stdout, stderr, err := setupCommandPipes(cmd)
if err != nil {
return nil, err
}
defer stdout.Close()
defer stderr.Close()

if err := executeBackup(ctx, cmd, job, task, storeInstance); err != nil {
return nil, err
// Start the backup process first
if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("RunBackup: proxmox-backup-client start error (%s): %w", cmd.String(), err)
}

// Create task monitoring channel with buffer to prevent goroutine leak
taskChan := make(chan store.Task, 1)

// Start monitoring in background
monitorCtx, monitorCancel := context.WithTimeout(ctx, 20*time.Second)
defer monitorCancel()

var task *store.Task
var monitorErr error

monitorDone := make(chan struct{})
go func() {
defer close(monitorDone)
task, monitorErr = monitorTask(monitorCtx, storeInstance, job, taskChan)
}()

// Wait for either monitoring to complete or timeout
select {
case <-monitorDone:
if monitorErr != nil {
_ = cmd.Process.Kill()
return nil, fmt.Errorf("RunBackup: task monitoring failed: %w", monitorErr)
}
case <-monitorCtx.Done():
_ = cmd.Process.Kill()
return nil, fmt.Errorf("RunBackup: timeout waiting for task")
}

if task == nil {
_ = cmd.Process.Kill()
return nil, fmt.Errorf("RunBackup: no task created")
}

// Start collecting logs and wait for backup completion
var logLines []string
var logMu sync.Mutex

go collectLogs(ctx, stdout, stderr, &logLines, &logMu)

// Wait for backup completion
done := make(chan error, 1)
go func() {
done <- cmd.Wait()
}()

// Wait for completion or context cancellation
select {
case err := <-done:
if err != nil {
return task, fmt.Errorf("RunBackup: backup execution failed: %w", err)
}
case <-ctx.Done():
_ = cmd.Process.Kill()
return task, fmt.Errorf("RunBackup: backup timeout: %w", ctx.Err())
}

// Update job status
if err := updateJobStatus(job, task, storeInstance, logLines); err != nil {
return task, fmt.Errorf("RunBackup: failed to update job status: %w", err)
}

return task, nil
Expand Down Expand Up @@ -310,26 +374,37 @@ func monitorTask(ctx context.Context, storeInstance *store.Store, job *store.Job
}
defer backupMutex.Unlock()

errChan := make(chan error, 1)
resultChan := make(chan *store.Task, 1)

monitorCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()

// Start task monitoring
go func() {
if err := storeInstance.GetJobTask(monitorCtx, taskChan, job); err != nil {
log.Printf("RunBackup: unable to monitor tasks folder: %v", err)
cancel() // Ensure cleanup on error
errChan <- fmt.Errorf("unable to monitor tasks folder: %w", err)
return
}
}()

// Wait for task with timeout
select {
case taskResult := <-taskChan:
if monitorCtx.Err() != nil {
return nil, fmt.Errorf("context cancelled while receiving task")
errChan <- fmt.Errorf("context cancelled while receiving task")
return nil, <-errChan
}
return &taskResult, nil
resultChan <- &taskResult
case err := <-errChan:
return nil, err
case <-monitorCtx.Done():
return nil, fmt.Errorf("RunBackup: timeout waiting for task: %w", monitorCtx.Err())
return nil, fmt.Errorf("timeout waiting for task: %w", ctx.Err())
}

select {
case result := <-resultChan:
return result, nil
case err := <-errChan:
return nil, err
}
}

Expand Down Expand Up @@ -359,51 +434,6 @@ func prepareBackupCommand(ctx context.Context, job *store.Job, storeInstance *st
return cmd, nil
}

// executeBackup runs an already-prepared backup command to completion and
// records the outcome.
//
// It obtains the command's output pipes via setupCommandPipes, starts the
// process, hands both pipes to collectLogs in a background goroutine, and then
// blocks until either the process exits or ctx is cancelled. When the process
// exits without error, the collected log lines are passed to updateJobStatus
// together with job, task and storeInstance.
//
// A non-nil error is returned when the pipes cannot be set up, the process
// fails to start, cmd.Wait reports an error, ctx is cancelled before the
// process exits, or updateJobStatus fails.
func executeBackup(ctx context.Context, cmd *exec.Cmd, job *store.Job, task *store.Task, storeInstance *store.Store) error {
	var logLines []string
	var logMu sync.Mutex

	stdout, stderr, err := setupCommandPipes(cmd)
	if err != nil {
		return err
	}
	defer stdout.Close()
	defer stderr.Close()

	if err := cmd.Start(); err != nil {
		return fmt.Errorf("RunBackup: proxmox-backup-client start error (%s): %w", cmd.String(), err)
	}

	// The log collector gets its own cancellable context so it is told to stop
	// on every return path of this function, not only when ctx is cancelled.
	logCtx, logCancel := context.WithCancel(ctx)
	defer logCancel()

	go collectLogs(logCtx, stdout, stderr, &logLines, &logMu)

	// Buffered so the waiting goroutine can always deliver its result and
	// exit, even if this function has already returned via ctx.Done().
	done := make(chan error, 1)
	go func() {
		done <- cmd.Wait()
	}()

	// Wait for completion or context cancellation.
	select {
	case err := <-done:
		if err != nil {
			return fmt.Errorf("RunBackup: backup execution failed: %w", err)
		}
	case <-ctx.Done():
		// Do not leave the child running after we give up on it. Kill can
		// fail if the process has already exited; that is harmless here and
		// the cancellation cause is the error worth reporting. The goroutine
		// above still reaps the process through cmd.Wait.
		_ = cmd.Process.Kill()
		return fmt.Errorf("RunBackup: backup timeout: %w", ctx.Err())
	}

	// collectLogs was given logMu alongside logLines and may still be
	// appending in its goroutine, so take a snapshot under the lock instead
	// of reading the slice unsynchronised.
	// NOTE(review): assumes collectLogs holds logMu while appending — confirm.
	logMu.Lock()
	collected := append([]string(nil), logLines...)
	logMu.Unlock()

	// Update job status.
	if err := updateJobStatus(job, task, storeInstance, collected); err != nil {
		return fmt.Errorf("RunBackup: failed to update job status: %w", err)
	}

	return nil
}

func getBackupId(isAgent bool, targetName string) (string, error) {
if !isAgent {
hostname, err := os.Hostname()
Expand Down

0 comments on commit 0c8ba4f

Please sign in to comment.