Skip to content

Commit

Permalink
simplify waiting for log again and pass array pointer instead
Browse files Browse the repository at this point in the history
  • Loading branch information
sonroyaalmerol committed Jan 13, 2025
1 parent 6cdee38 commit 53e7060
Showing 1 changed file with 29 additions and 178 deletions.
207 changes: 29 additions & 178 deletions internal/backend/backup/jobrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"io"
"log"
"math"
"os"
"os/exec"
"path/filepath"
Expand All @@ -20,193 +21,39 @@ import (
"github.com/sonroyaalmerol/pbs-plus/internal/store"
"github.com/sonroyaalmerol/pbs-plus/internal/syslog"
"github.com/sonroyaalmerol/pbs-plus/internal/utils"
"golang.org/x/sys/unix"
)

func waitForLogFile(logFilePath string, maxWait time.Duration) (*os.File, error) {
// Validate inputs to prevent potential issues
if maxWait <= 0 || maxWait > 5*time.Minute {
return nil, fmt.Errorf("invalid timeout value: must be between 0 and 5 minutes")
}

if len(logFilePath) == 0 || len(logFilePath) > unix.PathMax {
return nil, fmt.Errorf("invalid path length")
}

// Try immediate open first with timeout
openChan := make(chan openResult, 1)
go func() {
file, err := os.OpenFile(logFilePath, os.O_APPEND|os.O_WRONLY, 0644)
openChan <- openResult{file: file, err: err}
}()

select {
case result := <-openChan:
if result.err == nil {
return result.file, nil
}
case <-time.After(100 * time.Millisecond):
// Initial open timed out, continue with waiting
}

// Ensure parent directory exists with timeout
dirPath := filepath.Dir(logFilePath)
mkdirChan := make(chan error, 1)
go func() {
mkdirChan <- os.MkdirAll(dirPath, 0755)
}()

select {
case err := <-mkdirChan:
if err != nil {
return nil, fmt.Errorf("failed to create log directory: %w", err)
}
case <-time.After(500 * time.Millisecond):
return nil, fmt.Errorf("timeout creating directory")
}

// Initialize inotify with rate limiting
fd, err := unix.InotifyInit1(unix.IN_CLOEXEC | unix.IN_NONBLOCK)
if err != nil {
return nil, fmt.Errorf("failed to initialize inotify: %w", err)
}
defer func() {
if fd > 0 {
unix.Close(fd)
}
}()

// Add watch for both the directory and file
wd, err := unix.InotifyAddWatch(fd, dirPath, unix.IN_CREATE|unix.IN_CLOSE_WRITE)
if err != nil {
return nil, fmt.Errorf("failed to add inotify watch: %w", err)
}
defer unix.InotifyRmWatch(fd, uint32(wd))

// Create epoll instance
epfd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC)
if err != nil {
return nil, fmt.Errorf("failed to create epoll: %w", err)
}
defer func() {
if epfd > 0 {
unix.Close(epfd)
}
}()

// Add inotify fd to epoll
event := unix.EpollEvent{
Events: unix.EPOLLIN,
Fd: int32(fd),
}
if err := unix.EpollCtl(epfd, unix.EPOLL_CTL_ADD, fd, &event); err != nil {
return nil, fmt.Errorf("failed to add fd to epoll: %w", err)
}

// Buffer pool for events to prevent repeated allocations
bufferPool := sync.Pool{
New: func() interface{} {
return make([]byte, 4096) // 4KB chunks
},
}

events := make([]unix.EpollEvent, 1)
deadline := time.Now().Add(maxWait)

// Rate limiting for file operations
rateLimiter := time.NewTicker(50 * time.Millisecond)
defer rateLimiter.Stop()

// Counter for number of attempts
attempts := 0
maxAttempts := 1000 // Prevent infinite loops
start := time.Now()

for {
if attempts >= maxAttempts {
return nil, fmt.Errorf("exceeded maximum number of attempts")
}
attempts++

// Check deadline
if time.Now().After(deadline) {
return nil, fmt.Errorf("timeout waiting for log file: %s", logFilePath)
}

// Calculate timeout for epoll
timeout := int(time.Until(deadline).Milliseconds())
if timeout <= 0 {
return nil, fmt.Errorf("timeout waiting for log file: %s", logFilePath)
}

// Wait for events with timeout
n, err := unix.EpollWait(epfd, events, timeout)
if err != nil {
if err == unix.EINTR {
continue
}
return nil, fmt.Errorf("epoll wait error: %w", err)
// Try to open the file with write permissions
file, err := os.OpenFile(logFilePath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644)
if err == nil {
return file, nil
}

if n == 0 {
continue // Timeout on epoll
// Check if we've exceeded the maximum wait time
if time.Since(start) > maxWait {
return nil, fmt.Errorf("timeout waiting for file %s to become writable: %v", logFilePath, err)
}

// Get buffer from pool
buffer := bufferPool.Get().([]byte)
eventProcessed := false

// Read events with timeout
readDone := make(chan error, 1)
go func() {
_, err := unix.Read(fd, buffer)
readDone <- err
}()

select {
case err := <-readDone:
if err != nil {
if err == unix.EAGAIN {
bufferPool.Put(buffer)
continue
// Check if the error is due to permissions or if the file doesn't exist
if os.IsPermission(err) {
// File exists but we don't have permission
stat, statErr := os.Stat(logFilePath)
if statErr == nil {
// Check if file is writable by current user
if stat.Mode().Perm()&0200 == 0 {
return nil, fmt.Errorf("file %s exists but is not writable: %v", logFilePath, err)
}
bufferPool.Put(buffer)
return nil, fmt.Errorf("error reading inotify events: %w", err)
}
eventProcessed = true
case <-time.After(100 * time.Millisecond):
bufferPool.Put(buffer)
continue
}

if !eventProcessed {
bufferPool.Put(buffer)
continue
}

// Rate limit our file open attempts
select {
case <-rateLimiter.C:
// Try to open the file
openChan := make(chan openResult, 1)
go func() {
file, err := os.OpenFile(logFilePath, os.O_APPEND|os.O_WRONLY, 0644)
openChan <- openResult{file: file, err: err}
}()

select {
case result := <-openChan:
if result.err == nil {
bufferPool.Put(buffer)
return result.file, nil
}
case <-time.After(100 * time.Millisecond):
// Timeout on file open, continue waiting
}
default:
// Skip this attempt if we're rate limited
}

bufferPool.Put(buffer)
// Sleep for a short duration before retrying
// Using exponential backoff with a maximum of 100ms
backoff := time.Duration(math.Min(100, math.Pow(2, float64(time.Since(start).Milliseconds()/100)))) * time.Millisecond
time.Sleep(backoff)
}
}

Expand Down Expand Up @@ -325,7 +172,7 @@ func RunBackup(job *store.Job, storeInstance *store.Store) (*store.Task, error)
logGlobalMu.Lock()
defer logGlobalMu.Unlock()

if err := updateJobStatus(job, task, storeInstance, logLines); err != nil {
if err := updateJobStatus(job, task, storeInstance, &logLines); err != nil {
log.Printf("RunBackup: failed to update job status: %v", err)
}

Expand Down Expand Up @@ -472,7 +319,7 @@ func collectLogs(stdout, stderr io.ReadCloser, logLines *[]string, logMu *sync.M
}
}

func updateJobStatus(job *store.Job, task *store.Task, storeInstance *store.Store, logLines []string) error {
func updateJobStatus(job *store.Job, task *store.Task, storeInstance *store.Store, logLines *[]string) error {
syslogger, err := syslog.InitializeLogger()
if err != nil {
return fmt.Errorf("failed to initialize logger: %w", err)
Expand Down Expand Up @@ -512,7 +359,11 @@ func updateJobStatus(job *store.Job, task *store.Task, storeInstance *store.Stor
return nil
}

func writeLogsToFile(upid string, logLines []string) error {
func writeLogsToFile(upid string, logLines *[]string) error {
if logLines == nil {
return fmt.Errorf("logLines is nil")
}

logFilePath := utils.GetTaskLogPath(upid)
logFile, err := waitForLogFile(logFilePath, 5*time.Second)
if err != nil {
Expand All @@ -531,7 +382,7 @@ func writeLogsToFile(upid string, logLines []string) error {
var errorString string
timestamp := time.Now().Format(time.RFC3339)

for _, logLine := range logLines {
for _, logLine := range *logLines {
if strings.Contains(logLine, "Error: upload failed:") {
errorString = strings.Replace(logLine, "Error:", "TASK ERROR:", 1)
hasError = true
Expand Down

0 comments on commit 53e7060

Please sign in to comment.