diff --git a/internal/backend/backup/jobrun.go b/internal/backend/backup/jobrun.go index 173a5dc..e4a326b 100644 --- a/internal/backend/backup/jobrun.go +++ b/internal/backend/backup/jobrun.go @@ -68,6 +68,7 @@ func RunBackup(job *store.Job, storeInstance *store.Store, waitChan chan struct{ srcPath = filepath.Join(srcPath, job.Subpath) taskChan := make(chan store.Task) + logChan := make(chan string, 100) watchCtx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -158,25 +159,12 @@ func RunBackup(job *store.Job, storeInstance *store.Store, waitChan chan struct{ return nil, fmt.Errorf("RunBackup: error creating stderr pipe -> %w", err) } - clientLogs := []string{} - hasError := false - clientLogCtx, clientLogCancel := context.WithCancel(context.Background()) - go func() { - defer clientLogCancel() - + defer close(logChan) readers := io.MultiReader(cmdStdout, cmdStderr) scanner := bufio.NewScanner(readers) for scanner.Scan() { - formattedTime := time.Now().Format(time.RFC3339) - logLine := scanner.Text() - if strings.Contains(logLine, "Error: upload failed:") { - logLine = strings.Replace(logLine, "Error:", "TASK ERROR:", 1) - hasError = true - return - } - - clientLogs = append(clientLogs, fmt.Sprintf("%s: %s", formattedTime, logLine)) + logChan <- scanner.Text() } if err := scanner.Err(); err != nil { log.Printf("Log reader error: %v", err) @@ -214,11 +202,76 @@ func RunBackup(job *store.Job, storeInstance *store.Store, waitChan chan struct{ return nil, fmt.Errorf("RunBackup: task not found") } + logCtx, logCtxCancel := context.WithCancel(context.Background()) + go func(ctx context.Context, task *store.Task) { + logFilePath := utils.GetTaskLogPath(task.UPID) + + logFile, err := os.OpenFile(logFilePath, os.O_APPEND|os.O_WRONLY, 0644) + if err != nil { + log.Printf("Log file for task %s does not exist or cannot be opened: %v", task.UPID, err) + return + } + defer logFile.Close() + writer := bufio.NewWriter(logFile) + + taskError := "" + hasLogs := false + + _, err = writer.WriteString("--- proxmox-backup-client log starts here ---\n") + if err != nil { + log.Printf("Failed to write logs for task %s: %v", task.UPID, err) + return + } + writer.Flush() + + for { + formattedTime := time.Now().Format(time.RFC3339) + + select { + case <-ctx.Done(): + if taskError == "" && hasLogs { + _, err := writer.WriteString(formattedTime + ": TASK OK") + if err != nil { + log.Printf("Failed to write logs for task %s: %v", task.UPID, err) + return + } + } else if taskError != "" { + _, err := writer.WriteString(formattedTime + ": " + taskError) + if err != nil { + log.Printf("Failed to write logs for task %s: %v", task.UPID, err) + return + } + } + writer.Flush() + return + case logLine, ok := <-logChan: + if !ok { + continue + } + + hasLogs = true + + if strings.Contains(logLine, "Error: upload failed:") { + taskError = strings.Replace(logLine, "Error:", "TASK ERROR:", 1) + continue + } + + _, err := writer.WriteString(formattedTime + ": " + logLine + "\n") + if err != nil { + log.Printf("Failed to write logs for task %s: %v", task.UPID, err) + return + } + writer.Flush() + } + } + }(logCtx, task) + go func(currJob *store.Job, currTask *store.Task) { defer func() { if waitChan != nil { close(waitChan) } + logCtxCancel() }() syslogger, err := syslog.InitializeLogger() @@ -261,38 +314,6 @@ func RunBackup(job *store.Job, storeInstance *store.Store, waitChan chan struct{ syslogger.Errorf("RunBackup (goroutine): unable to update job -> %v", err) return } - - syslogger.Info("Waiting for client logging goroutine to finish...") - <-clientLogCtx.Done() - - logFilePath := utils.GetTaskLogPath(currTask.UPID) - - logFile, err := os.OpenFile(logFilePath, os.O_APPEND|os.O_WRONLY, 0644) - if err != nil { - syslogger.Errorf("Log file for task %s does not exist or cannot be opened: %v", task.UPID, err) - return - } - defer logFile.Close() - writer := bufio.NewWriter(logFile) - - _, err = writer.WriteString("--- proxmox-backup-client log starts here ---\n") - if err != nil { - syslogger.Errorf("Failed to write logs for task %s: %v", task.UPID, err) - return - } - - if !hasError { - formattedTime := time.Now().Format(time.RFC3339) - clientLogs = append(clientLogs, fmt.Sprintf("%s: TASK OK", formattedTime)) - } - - _, err = writer.WriteString(strings.Join(clientLogs, "\n")) - if err != nil { - syslogger.Errorf("Failed to write logs for task %s: %v", task.UPID, err) - return - } - - writer.Flush() }(job, task) job.LastRunUpid = &task.UPID