Skip to content

Commit

Permalink
Merge pull request #56 from sonroyaalmerol/remove-passthrough-proxy
Browse files Browse the repository at this point in the history
Ensure RunBackup is async after task monitoring
  • Loading branch information
sonroyaalmerol authored Jan 12, 2025
2 parents 2dc658c + 2399e99 commit c3a48ca
Show file tree
Hide file tree
Showing 16 changed files with 1,171 additions and 1,226 deletions.
167 changes: 54 additions & 113 deletions internal/backend/backup/jobrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"io"
"log"
"os"
"os/exec"
"path/filepath"
Expand Down Expand Up @@ -280,31 +281,22 @@ func RunBackup(job *store.Job, storeInstance *store.Store) (*store.Task, error)
defer stdout.Close()
defer stderr.Close()

// Create task monitoring channel with buffer to prevent goroutine leak
taskChan := make(chan store.Task, 1)

// Start monitoring in background first
monitorCtx, monitorCancel := context.WithCancel(ctx)
defer monitorCancel()
monitorCtx, monitorCancel := context.WithTimeout(ctx, 20*time.Second)

var task *store.Task
var monitorErr error
monitorDone := make(chan struct{})
monitorInitializedChan := make(chan struct{})

readyChan := make(chan struct{})
go func() {
defer close(monitorDone)
task, monitorErr = monitorTask(monitorCtx, storeInstance, job, taskChan, monitorInitializedChan)
defer monitorCancel()
task, monitorErr = storeInstance.GetJobTask(monitorCtx, readyChan, job)
}()

select {
case <-monitorInitializedChan:
case <-monitorDone:
if monitorErr != nil {
return nil, fmt.Errorf("RunBackup: task monitoring failed: %w", monitorErr)
}
case <-readyChan:
case <-monitorCtx.Done():
return nil, fmt.Errorf("RunBackup: timeout waiting for task")
return nil, fmt.Errorf("RunBackup: task monitoring crashed -> %w", monitorErr)
}

// Now start the backup process
Expand All @@ -313,51 +305,42 @@ func RunBackup(job *store.Job, storeInstance *store.Store) (*store.Task, error)
return nil, fmt.Errorf("RunBackup: proxmox-backup-client start error (%s): %w", cmd.String(), err)
}

// Wait for either monitoring to complete or timeout
select {
case <-monitorDone:
if monitorErr != nil {
_ = cmd.Process.Kill()
return nil, fmt.Errorf("RunBackup: task monitoring failed: %w", monitorErr)
}
case <-monitorCtx.Done():
_ = cmd.Process.Kill()
return nil, fmt.Errorf("RunBackup: timeout waiting for task")
}

if task == nil {
_ = cmd.Process.Kill()
return nil, fmt.Errorf("RunBackup: no task created")
}

// Start collecting logs and wait for backup completion
var logLines []string
var logMu sync.Mutex
var logGlobalMu sync.Mutex

go collectLogs(ctx, stdout, stderr, &logLines, &logMu)

// Wait for backup completion
done := make(chan error, 1)
go func() {
done <- cmd.Wait()
logGlobalMu.Lock()
defer logGlobalMu.Unlock()

collectLogs(ctx, stdout, stderr, &logLines, &logMu)
}()

// Wait for completion or context cancellation
// Wait for either monitoring to complete or timeout
select {
case err := <-done:
if err != nil {
return task, fmt.Errorf("RunBackup: backup execution failed: %w", err)
case <-monitorCtx.Done():
if task == nil {
_ = cmd.Process.Kill()
return nil, fmt.Errorf("RunBackup: no task created")
}
case <-ctx.Done():
_ = cmd.Process.Kill()
return task, fmt.Errorf("RunBackup: backup timeout: %w", ctx.Err())
}

// Update job status
if err := updateJobStatus(job, task, storeInstance, logLines); err != nil {
if err := updateJobStatus(job, task, storeInstance, nil); err != nil {
return task, fmt.Errorf("RunBackup: failed to update job status: %w", err)
}

go func() {
_ = cmd.Wait()

logGlobalMu.Lock()
defer logGlobalMu.Unlock()

if err := updateJobStatus(job, task, storeInstance, logLines); err != nil {
log.Printf("RunBackup: failed to update job status: %v", err)
}
}()

return task, nil
}

Expand Down Expand Up @@ -385,20 +368,6 @@ func mountAgent(ctx context.Context, storeInstance *store.Store, target *store.T
return agentMount, nil
}

func monitorTask(ctx context.Context, storeInstance *store.Store, job *store.Job, taskChan chan store.Task, readyChan chan struct{}) (*store.Task, error) {
if err := storeInstance.GetJobTask(ctx, taskChan, readyChan, job); err != nil {
return nil, fmt.Errorf("failed to start task monitoring: %w", err)
}

// Wait for task or context cancellation
select {
case taskResult := <-taskChan:
return &taskResult, nil
case <-ctx.Done():
return nil, fmt.Errorf("timeout waiting for task: %w", ctx.Err())
}
}

func prepareBackupCommand(ctx context.Context, job *store.Job, storeInstance *store.Store, srcPath string, isAgent bool) (*exec.Cmd, error) {
if srcPath == "" {
return nil, fmt.Errorf("RunBackup: source path is required")
Expand Down Expand Up @@ -541,68 +510,40 @@ func collectLogs(ctx context.Context, stdout, stderr io.ReadCloser, logLines *[]
}

func updateJobStatus(job *store.Job, task *store.Task, storeInstance *store.Store, logLines []string) error {
errChan := make(chan error, 1)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

go func() {
syslogger, err := syslog.InitializeLogger()
if err != nil {
errChan <- fmt.Errorf("failed to initialize logger: %w", err)
return
}
syslogger, err := syslog.InitializeLogger()
if err != nil {
return fmt.Errorf("failed to initialize logger: %w", err)
}

if logLines != nil {
// Write logs to file
if err := writeLogsToFile(task.UPID, logLines); err != nil {
syslogger.Errorf("Failed to write logs: %v", err)
errChan <- err
return
}

// Update task status
taskFound, err := storeInstance.GetTaskByUPID(task.UPID)
if err != nil {
syslogger.Errorf("Unable to get task by UPID: %v", err)
errChan <- err
return
}

// Update job status
latestJob, err := storeInstance.GetJob(job.ID)
if err != nil {
syslogger.Errorf("Unable to get job: %v", err)
errChan <- err
return
return err
}
}

latestJob.LastRunState = &taskFound.Status
latestJob.LastRunEndtime = &taskFound.EndTime

if err := storeInstance.UpdateJob(*latestJob); err != nil {
syslogger.Errorf("Unable to update job: %v", err)
errChan <- err
return
}

errChan <- nil
}()

// Update immediate job status
job.LastRunUpid = &task.UPID
job.LastRunState = &task.Status
// Update task status
taskFound, err := storeInstance.GetTaskByUPID(task.UPID)
if err != nil {
syslogger.Errorf("Unable to get task by UPID: %v", err)
return err
}

if err := storeInstance.UpdateJob(*job); err != nil {
return fmt.Errorf("unable to update job: %w", err)
// Update job status
latestJob, err := storeInstance.GetJob(job.ID)
if err != nil {
syslogger.Errorf("Unable to get job: %v", err)
return err
}

// Wait for background updates to complete with timeout
select {
case err := <-errChan:
if err != nil {
return fmt.Errorf("background job update failed: %w", err)
}
case <-ctx.Done():
return fmt.Errorf("job status update timed out: %w", ctx.Err())
latestJob.LastRunUpid = &taskFound.UPID
latestJob.LastRunState = &taskFound.Status
latestJob.LastRunEndtime = &taskFound.EndTime

if err := storeInstance.UpdateJob(*latestJob); err != nil {
syslogger.Errorf("Unable to update job: %v", err)
return err
}

return nil
Expand Down
72 changes: 36 additions & 36 deletions internal/proxy/views/d2d_backup/disk_backup.js
Original file line number Diff line number Diff line change
@@ -1,41 +1,41 @@
Ext.define('PBS.D2DManagement', {
extend: 'Ext.tab.Panel',
alias: 'widget.pbsD2DManagement',
Ext.define("PBS.D2DManagement", {
extend: "Ext.tab.Panel",
alias: "widget.pbsD2DManagement",

title: 'Disk Backup',
title: "Disk Backup",

tools: [],
tools: [],

border: true,
defaults: {
border: false,
xtype: 'panel',
},
border: true,
defaults: {
border: false,
xtype: "panel",
},

items: [
{
xtype: 'pbsDiskBackupJobView',
title: gettext('Backup Jobs'),
itemId: 'd2d-backup-jobs',
iconCls: 'fa fa-floppy-o',
},
{
xtype: 'pbsDiskTargetPanel',
title: 'Targets',
itemId: 'targets',
iconCls: 'fa fa-desktop',
},
{
xtype: 'pbsDiskExclusionPanel',
title: 'Global Exclusions',
itemId: 'exclusions',
iconCls: 'fa fa-ban',
},
{
xtype: 'pbsDiskPartialFilePanel',
title: 'Verify File Sizes',
itemId: 'partial-files',
iconCls: 'fa fa-file',
},
],
items: [
{
xtype: "pbsDiskBackupJobView",
title: gettext("Backup Jobs"),
itemId: "d2d-backup-jobs",
iconCls: "fa fa-floppy-o",
},
{
xtype: "pbsDiskTargetPanel",
title: "Targets",
itemId: "targets",
iconCls: "fa fa-desktop",
},
{
xtype: "pbsDiskExclusionPanel",
title: "Global Exclusions",
itemId: "exclusions",
iconCls: "fa fa-ban",
},
{
xtype: "pbsDiskPartialFilePanel",
title: "Verify File Sizes",
itemId: "partial-files",
iconCls: "fa fa-file",
},
],
});
62 changes: 33 additions & 29 deletions internal/proxy/views/d2d_backup/models.js
Original file line number Diff line number Diff line change
@@ -1,36 +1,40 @@
Ext.define('pbs-disk-backup-job-status', {
extend: 'Ext.data.Model',
fields: [
'id', 'store', 'target', 'subpath', 'schedule', 'comment', 'duration',
'next-run', 'last-run-upid', 'last-run-state', 'last-run-endtime', 'rawexclusions',
],
idProperty: 'id',
proxy: {
type: 'proxmox',
url: pbsPlusBaseUrl + '/api2/json/d2d/backup',
},
Ext.define("pbs-disk-backup-job-status", {
extend: "Ext.data.Model",
fields: [
"id",
"store",
"target",
"subpath",
"schedule",
"comment",
"duration",
"next-run",
"last-run-upid",
"last-run-state",
"last-run-endtime",
"rawexclusions",
],
idProperty: "id",
proxy: {
type: "proxmox",
url: pbsPlusBaseUrl + "/api2/json/d2d/backup",
},
});

Ext.define('pbs-model-targets', {
extend: 'Ext.data.Model',
fields: [
'name', 'path',
],
idProperty: 'name',
Ext.define("pbs-model-targets", {
extend: "Ext.data.Model",
fields: ["name", "path"],
idProperty: "name",
});

Ext.define('pbs-model-exclusions', {
extend: 'Ext.data.Model',
fields: [
'path', 'comment',
],
idProperty: 'path',
Ext.define("pbs-model-exclusions", {
extend: "Ext.data.Model",
fields: ["path", "comment"],
idProperty: "path",
});

Ext.define('pbs-model-partial-files', {
extend: 'Ext.data.Model',
fields: [
'path', 'comment',
],
idProperty: 'path',
Ext.define("pbs-model-partial-files", {
extend: "Ext.data.Model",
fields: ["path", "comment"],
idProperty: "path",
});
Loading

0 comments on commit c3a48ca

Please sign in to comment.