diff --git a/internal/backend/backup/jobrun.go b/internal/backend/backup/jobrun.go index e5d9b25..55d1511 100644 --- a/internal/backend/backup/jobrun.go +++ b/internal/backend/backup/jobrun.go @@ -67,7 +67,6 @@ func RunBackup(job *store.Job, storeInstance *store.Store, waitChan chan struct{ srcPath = filepath.Join(srcPath, job.Subpath) taskChan := make(chan store.Task) - logChan := make(chan string, 100) watchCtx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -156,12 +155,25 @@ func RunBackup(job *store.Job, storeInstance *store.Store, waitChan chan struct{ return nil, fmt.Errorf("RunBackup: error creating stderr pipe -> %w", err) } + clientLogs := []string{} + hasError := false + clientLogCtx, clientLogCancel := context.WithCancel(context.Background()) + go func() { - defer close(logChan) + defer clientLogCancel() + readers := io.MultiReader(cmdStdout, cmdStderr) scanner := bufio.NewScanner(readers) for scanner.Scan() { - logChan <- scanner.Text() + formattedTime := time.Now().Format(time.RFC3339) + logLine := scanner.Text() + if strings.Contains(logLine, "Error: upload failed:") { + logLine = strings.Replace(logLine, "Error:", "TASK ERROR:", 1) + hasError = true + return + } + + clientLogs = append(clientLogs, fmt.Sprintf("%s: %s", formattedTime, logLine)) } if err := scanner.Err(); err != nil { log.Printf("Log reader error: %v", err) @@ -199,76 +211,11 @@ func RunBackup(job *store.Job, storeInstance *store.Store, waitChan chan struct{ return nil, fmt.Errorf("RunBackup: task not found") } - logCtx, logCtxCancel := context.WithCancel(context.Background()) - go func(ctx context.Context, task *store.Task) { - logFilePath := utils.GetTaskLogPath(task.UPID) - - logFile, err := os.OpenFile(logFilePath, os.O_APPEND|os.O_WRONLY, 0644) - if err != nil { - log.Printf("Log file for task %s does not exist or cannot be opened: %v", task.UPID, err) - return - } - defer logFile.Close() - writer := bufio.NewWriter(logFile) - - taskError := "" - hasLogs := false - - _, err = writer.WriteString("--- proxmox-backup-client log starts here ---\n") - if err != nil { - log.Printf("Failed to write logs for task %s: %v", task.UPID, err) - return - } - writer.Flush() - - for { - formattedTime := time.Now().Format(time.RFC3339) - - select { - case <-ctx.Done(): - if taskError == "" && hasLogs { - _, err := writer.WriteString(formattedTime + ": TASK OK") - if err != nil { - log.Printf("Failed to write logs for task %s: %v", task.UPID, err) - return - } - } else if taskError != "" { - _, err := writer.WriteString(formattedTime + ": " + taskError) - if err != nil { - log.Printf("Failed to write logs for task %s: %v", task.UPID, err) - return - } - } - writer.Flush() - return - case logLine, ok := <-logChan: - if !ok { - continue - } - - hasLogs = true - - if strings.Contains(logLine, "Error: upload failed:") { - taskError = strings.Replace(logLine, "Error:", "TASK ERROR:", 1) - continue - } - - _, err := writer.WriteString(formattedTime + ": " + logLine + "\n") - if err != nil { - log.Printf("Failed to write logs for task %s: %v", task.UPID, err) - return - } - writer.Flush() - } - } - }(logCtx, task) - go func(currJob *store.Job, currTask *store.Task) { defer func() { if waitChan != nil { close(waitChan) } - logCtxCancel() }() syslogger, err := syslog.InitializeLogger() @@ -311,6 +258,38 @@ func RunBackup(job *store.Job, storeInstance *store.Store, waitChan chan struct{ syslogger.Errorf("RunBackup (goroutine): unable to update job -> %v", err) return } + + syslogger.Info("Waiting for client logging goroutine to finish...") + <-clientLogCtx.Done() + + logFilePath := utils.GetTaskLogPath(currTask.UPID) + + logFile, err := os.OpenFile(logFilePath, os.O_APPEND|os.O_WRONLY, 0644) + if err != nil { + syslogger.Errorf("Log file for task %s does not exist or cannot be opened: %v", task.UPID, err) + return + } + defer logFile.Close() + writer := bufio.NewWriter(logFile) + + _, err = writer.WriteString("--- proxmox-backup-client log starts here ---\n") + if err != nil { + syslogger.Errorf("Failed to write logs for task %s: %v", task.UPID, err) + return + } + + if !hasError { + formattedTime := time.Now().Format(time.RFC3339) + clientLogs = append(clientLogs, fmt.Sprintf("%s: TASK OK", formattedTime)) + } + + _, err = writer.WriteString(strings.Join(clientLogs, "\n")) + if err != nil { + syslogger.Errorf("Failed to write logs for task %s: %v", task.UPID, err) + return + } + + writer.Flush() }(job, task) job.LastRunUpid = &task.UPID