Skip to content

Commit

Permalink
Merge pull request #57 from sonroyaalmerol/remove-passthrough-proxy
Browse files Browse the repository at this point in the history
fix unmounting on RunBackup function end
  • Loading branch information
sonroyaalmerol authored Jan 12, 2025
2 parents c3a48ca + b599994 commit c3ffad7
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 55 deletions.
71 changes: 17 additions & 54 deletions internal/backend/backup/jobrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,6 @@ func RunBackup(job *store.Job, storeInstance *store.Store) (*store.Task, error)
}
defer backupMutex.Unlock()

// Create a context with timeout for the entire operation
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if storeInstance.APIToken == nil {
return nil, fmt.Errorf("RunBackup: api token is required")
}
Expand All @@ -252,23 +248,16 @@ func RunBackup(job *store.Job, storeInstance *store.Store) (*store.Task, error)

var agentMount *mount.AgentMount
if isAgent {
if agentMount, err = mountAgent(ctx, storeInstance, target); err != nil {
if agentMount, err = mountAgent(storeInstance, target); err != nil {
return nil, err
}
defer func() {
defer cancel()
if agentMount != nil {
agentMount.Unmount()
agentMount.CloseSFTP()
}
}()
srcPath = agentMount.Path
}

srcPath = filepath.Join(srcPath, job.Subpath)

// Prepare backup command
cmd, err := prepareBackupCommand(ctx, job, storeInstance, srcPath, isAgent)
cmd, err := prepareBackupCommand(job, storeInstance, srcPath, isAgent)
if err != nil {
return nil, err
}
Expand All @@ -282,7 +271,7 @@ func RunBackup(job *store.Job, storeInstance *store.Store) (*store.Task, error)
defer stderr.Close()

// Start monitoring in background first
monitorCtx, monitorCancel := context.WithTimeout(ctx, 20*time.Second)
monitorCtx, monitorCancel := context.WithTimeout(context.Background(), 20*time.Second)

var task *store.Task
var monitorErr error
Expand Down Expand Up @@ -314,7 +303,7 @@ func RunBackup(job *store.Job, storeInstance *store.Store) (*store.Task, error)
logGlobalMu.Lock()
defer logGlobalMu.Unlock()

collectLogs(ctx, stdout, stderr, &logLines, &logMu)
collectLogs(stdout, stderr, &logLines, &logMu)
}()

// Wait for either monitoring to complete or timeout
Expand All @@ -339,36 +328,26 @@ func RunBackup(job *store.Job, storeInstance *store.Store) (*store.Task, error)
if err := updateJobStatus(job, task, storeInstance, logLines); err != nil {
log.Printf("RunBackup: failed to update job status: %v", err)
}

if agentMount != nil {
agentMount.Unmount()
agentMount.CloseSFTP()
}
}()

return task, nil
}

func mountAgent(ctx context.Context, storeInstance *store.Store, target *store.Target) (*mount.AgentMount, error) {
func mountAgent(storeInstance *store.Store, target *store.Target) (*mount.AgentMount, error) {
agentMount, err := mount.Mount(storeInstance, target)
if err != nil {
return nil, fmt.Errorf("RunBackup: mount initialization error: %w", err)
}

// Use context with timeout for mount wait
mountDone := make(chan error, 1)
go func() {
mountDone <- agentMount.Cmd.Wait()
}()

select {
case err := <-mountDone:
if err != nil {
return nil, fmt.Errorf("RunBackup: mount wait error: %w", err)
}
case <-ctx.Done():
return nil, fmt.Errorf("RunBackup: mount timeout: %w", ctx.Err())
}

return agentMount, nil
}

func prepareBackupCommand(ctx context.Context, job *store.Job, storeInstance *store.Store, srcPath string, isAgent bool) (*exec.Cmd, error) {
func prepareBackupCommand(job *store.Job, storeInstance *store.Store, srcPath string, isAgent bool) (*exec.Cmd, error) {
if srcPath == "" {
return nil, fmt.Errorf("RunBackup: source path is required")
}
Expand All @@ -388,7 +367,7 @@ func prepareBackupCommand(ctx context.Context, job *store.Job, storeInstance *st
return nil, fmt.Errorf("RunBackup: failed to build command arguments")
}

cmd := exec.CommandContext(ctx, "/usr/bin/prlimit", cmdArgs...)
cmd := exec.Command("/usr/bin/prlimit", cmdArgs...)
cmd.Env = buildCommandEnv(storeInstance)

return cmd, nil
Expand Down Expand Up @@ -482,30 +461,14 @@ func setupCommandPipes(cmd *exec.Cmd) (io.ReadCloser, io.ReadCloser, error) {
return stdout, stderr, nil
}

func collectLogs(ctx context.Context, stdout, stderr io.ReadCloser, logLines *[]string, logMu *sync.Mutex) {
func collectLogs(stdout, stderr io.ReadCloser, logLines *[]string, logMu *sync.Mutex) {
reader := bufio.NewScanner(io.MultiReader(stdout, stderr))
reader.Buffer(make([]byte, 0, 64*1024), 1024*1024) // Increased buffer size

done := make(chan struct{})
go func() {
defer close(done)
for reader.Scan() {
select {
case <-ctx.Done():
return
default:
logMu.Lock()
*logLines = append(*logLines, reader.Text())
logMu.Unlock()
}
}
}()

select {
case <-ctx.Done():
return
case <-done:
return
for reader.Scan() {
logMu.Lock()
*logLines = append(*logLines, reader.Text())
logMu.Unlock()
}
}

Expand Down
2 changes: 1 addition & 1 deletion internal/backend/mount/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func Mount(storeInstance *store.Store, target *store.Target) (*AgentMount, error

agentMount.Cmd = mnt

err = mnt.Start()
err = mnt.Run()
if err != nil {
agentMount.Unmount()
agentMount.CloseSFTP()
Expand Down

0 comments on commit c3ffad7

Please sign in to comment.