Skip to content

Commit

Permalink
wait for context done
Browse files Browse the repository at this point in the history
  • Loading branch information
sonroyaalmerol committed Nov 18, 2024
1 parent a40bff6 commit 93d0e67
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 23 deletions.
25 changes: 14 additions & 11 deletions internal/backend/backup/jobrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,30 +118,30 @@ func RunBackup(job *store.Job, storeInstance *store.Store, waitChan chan struct{
defer cancel()

go func() {
taskChan, err = storeInstance.GetMostRecentTask(watchCtx, job)
err = storeInstance.GetMostRecentTask(watchCtx, taskChan, job)
if err != nil {
log.Printf("RunBackup: unable to monitor tasks folder -> %v\n", err)
return
}
}()

err = cmd.Start()
if err != nil {
if agentMount != nil {
agentMount.Unmount()
}
return nil, fmt.Errorf("RunBackup: proxmox-backup-client start error (%s) -> %w", cmd.String(), err)
}

var task *store.Task
go func() {
taskC := <-taskChan
log.Printf("Task received: %s\n", taskC.UPID)
task = &taskC

close(taskChan)
cancel()
}()

err = cmd.Start()
if err != nil {
if agentMount != nil {
agentMount.Unmount()
}
return nil, fmt.Errorf("RunBackup: proxmox-backup-client start error (%s) -> %w", cmd.String(), err)
}

for {
line, err := logBuffer.ReadString('\n')
if err != nil && line != "" {
Expand All @@ -155,8 +155,11 @@ func RunBackup(job *store.Job, storeInstance *store.Store, waitChan chan struct{
time.Sleep(time.Millisecond * 100)
}

log.Printf("Waiting for task: %s\n", job.ID)
<-watchCtx.Done()

if task == nil {
return nil, fmt.Errorf("RunBackup: task not found -> %w", err)
return nil, fmt.Errorf("RunBackup: task not found")
}

job.LastRunUpid = &task.UPID
Expand Down
23 changes: 11 additions & 12 deletions internal/store/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func encodeToHexEscapes(input string) string {
return encoded.String()
}

func (storeInstance *Store) GetMostRecentTask(ctx context.Context, job *Job) (chan Task, error) {
func (storeInstance *Store) GetMostRecentTask(ctx context.Context, taskChan chan Task, job *Job) error {
tasksParentPath := "/var/log/proxmox-backup/tasks"
hostname, err := os.Hostname()
if err != nil {
Expand All @@ -70,11 +70,11 @@ func (storeInstance *Store) GetMostRecentTask(ctx context.Context, job *Job) (ch

target, err := storeInstance.GetTarget(job.Target)
if err != nil {
return nil, fmt.Errorf("GetMostRecentTask -> %w", err)
return fmt.Errorf("GetMostRecentTask -> %w", err)
}

if target == nil {
return nil, fmt.Errorf("GetMostRecentTask: Target '%s' does not exist.", job.Target)
return fmt.Errorf("GetMostRecentTask: Target '%s' does not exist.", job.Target)
}

isAgent := strings.HasPrefix(target.Path, "agent://")
Expand All @@ -86,12 +86,12 @@ func (storeInstance *Store) GetMostRecentTask(ctx context.Context, job *Job) (ch

watcher, err := fsnotify.NewWatcher()
if err != nil {
return nil, fmt.Errorf("failed to create watcher: %w", err)
return fmt.Errorf("failed to create watcher: %w", err)
}

err = watcher.Add(tasksParentPath)
if err != nil {
return nil, fmt.Errorf("failed to add folder to watcher: %w", err)
return fmt.Errorf("failed to add folder to watcher: %w", err)
}

err = filepath.Walk(tasksParentPath, func(path string, info os.FileInfo, err error) error {
Expand All @@ -108,11 +108,9 @@ func (storeInstance *Store) GetMostRecentTask(ctx context.Context, job *Job) (ch
return nil
})
if err != nil {
return nil, fmt.Errorf("failed to walk folder: %w", err)
return fmt.Errorf("failed to walk folder: %w", err)
}

returnChan := make(chan Task)

go func() {
defer watcher.Close()
for {
Expand All @@ -131,12 +129,13 @@ func (storeInstance *Store) GetMostRecentTask(ctx context.Context, job *Job) (ch
fileName := filepath.Base(event.Name)
colonSplit := strings.Split(fileName, ":")
actualUpid := colonSplit[:9]

newTask, err := storeInstance.GetTaskByUPID(strings.Join(actualUpid, ":") + ":")
if err != nil {
log.Printf("GetMostRecentTask: error getting tasks: %v\n", err)
return
log.Printf("GetMostRecentTask: error getting task: %v\n", err)
continue
}
returnChan <- *newTask
taskChan <- *newTask
return
}
}
Expand All @@ -151,7 +150,7 @@ func (storeInstance *Store) GetMostRecentTask(ctx context.Context, job *Job) (ch
}
}()

return returnChan, nil
return nil
}

func (storeInstance *Store) GetTaskByUPID(upid string) (*Task, error) {
Expand Down

0 comments on commit 93d0e67

Please sign in to comment.