Skip to content

Commit

Permalink
improve log stream channel handling (#72)
Browse files Browse the repository at this point in the history
* handle closed log streaming channels

* improve channel handling

* fix tail and follow behaviour

* move defer

* use separate waitGroups

* rename to previous

* move func
  • Loading branch information
elenz97 authored Mar 7, 2024
1 parent c06d586 commit c166810
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 34 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
/mittnitectl
/dist
examples/mittnite.d/local.hcl
cmd/mittnitectl/mittnitectl
63 changes: 45 additions & 18 deletions pkg/proc/basejob.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os"
"os/exec"
"path"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -85,20 +86,20 @@ func (job *baseJob) StreamStdOut(ctx context.Context, outChan chan []byte, errCh
if len(job.Config.Stdout) == 0 {
return
}
job.readStdFile(ctx, job.Config.Stdout, outChan, errChan, follow, tailLen)
job.readStdFile(ctx, &job.stdOutWg, job.Config.Stdout, outChan, errChan, follow, tailLen)
}

func (job *baseJob) StreamStdErr(ctx context.Context, outChan chan []byte, errChan chan error, follow bool, tailLen int) {
if len(job.Config.Stderr) == 0 {
return
}
job.readStdFile(ctx, job.Config.Stderr, outChan, errChan, follow, tailLen)
job.readStdFile(ctx, &job.stdErrWg, job.Config.Stderr, outChan, errChan, follow, tailLen)
}

func (job *baseJob) StreamStdOutAndStdErr(ctx context.Context, outChan chan []byte, errChan chan error, follow bool, tailLen int) {
job.StreamStdOut(ctx, outChan, errChan, follow, tailLen)
func (job *baseJob) StreamStdOutAndStdErr(ctx context.Context, outChan chan []byte, stdOutErrChan, stdErrErrChan chan error, follow bool, tailLen int) {
job.StreamStdOut(ctx, outChan, stdOutErrChan, follow, tailLen)
if job.Config.Stdout != job.Config.Stderr {
job.StreamStdErr(ctx, outChan, errChan, follow, tailLen)
job.StreamStdErr(ctx, outChan, stdErrErrChan, follow, tailLen)
}
}

Expand Down Expand Up @@ -196,27 +197,43 @@ func (job *baseJob) closeStdFiles() {
}
}

func (job *baseJob) readStdFile(ctx context.Context, filePath string, outChan chan []byte, errChan chan error, follow bool, tailLen int) {
func (job *baseJob) readStdFile(ctx context.Context, wg *sync.WaitGroup, filePath string, outChan chan []byte, errChan chan error, follow bool, tailLen int) {
stdFile, err := os.OpenFile(filePath, os.O_RDONLY, 0o666)
if err != nil {
errChan <- err
return
}

defer stdFile.Close()
seekTail(tailLen, stdFile, outChan)

seekTail(ctx, wg, tailLen, stdFile, outChan)

read := func() {
scanner := bufio.NewScanner(stdFile)

for scanner.Scan() {
line := scanner.Bytes()
outChan <- line
select {
case <-ctx.Done():
return
case outChan <- scanner.Bytes():
default:
if follow {
continue
}
return
}
}

if err := scanner.Err(); err != nil {
errChan <- err
return
select {
case <-ctx.Done():
return
case errChan <- err:
default:
return
}
}
}

for {
select {
default:
Expand All @@ -225,13 +242,18 @@ func (job *baseJob) readStdFile(ctx context.Context, filePath string, outChan ch
errChan <- io.EOF
return
}

continue
case <-ctx.Done():
return
}
}
}

func seekTail(lines int, stdFile *os.File, outChan chan []byte) {
func seekTail(ctx context.Context, wg *sync.WaitGroup, lines int, stdFile *os.File, outChan chan []byte) {
wg.Add(1)
defer wg.Done()

if lines < 0 {
return
}
Expand All @@ -251,12 +273,17 @@ func seekTail(lines int, stdFile *os.File, outChan chan []byte) {
tailBuffer.PushBack(line)
}
for tailBuffer.Len() > 0 {
item := tailBuffer.Front()
line, ok := item.Value.([]byte)
if ok {
outChan <- line
select {
case <-ctx.Done():
return
default:
item := tailBuffer.Front()
line, ok := item.Value.([]byte)
if ok {
outChan <- line
}
tailBuffer.Remove(item)
}
tailBuffer.Remove(item)
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/proc/job_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (job *CommonJob) Watch() {
for p := range job.watchingFiles {
_, err := os.Stat(p)
if os.IsNotExist(err) {
log.Infof("file %s changed, signalling process %s", p, job.Config.Name)
log.Infof("file %s not found, signalling process %s", p, job.Config.Name)
delete(job.watchingFiles, p)
signal = true
}
Expand Down
42 changes: 27 additions & 15 deletions pkg/proc/runner_api_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"net/http"
"strconv"
"strings"
"sync"
"time"
)

Expand Down Expand Up @@ -150,11 +151,13 @@ func (r *Runner) apiV1JobLogs(writer http.ResponseWriter, req *http.Request) {

streamCtx, cancel := context.WithCancel(context.Background())
outChan := make(chan []byte)
errChan := make(chan error)
stdOutErrChan := make(chan error)
stdErrErrChan := make(chan error)
defer func() {
cancel()
close(outChan)
close(errChan)
close(stdOutErrChan)
close(stdErrErrChan)
}()

// handle client disconnects
Expand All @@ -170,18 +173,12 @@ func (r *Runner) apiV1JobLogs(writer http.ResponseWriter, req *http.Request) {
tailLen = -1
}

go job.StreamStdOutAndStdErr(streamCtx, outChan, errChan, follow, tailLen)
go job.StreamStdOutAndStdErr(streamCtx, outChan, stdOutErrChan, stdErrErrChan, follow, tailLen)

for {
select {
case logLine := <-outChan:
if err := conn.WriteMessage(websocket.TextMessage, logLine); err != nil {
break
}

case err = <-errChan:
if errors.Is(err, io.EOF) {
err = conn.WriteControl(
handleErr := func(err error, wg *sync.WaitGroup) {
if errors.Is(err, io.EOF) {
if !follow {
err := conn.WriteControl(
websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseNormalClosure, "EOF"),
time.Now().Add(time.Second),
Expand All @@ -190,9 +187,24 @@ func (r *Runner) apiV1JobLogs(writer http.ResponseWriter, req *http.Request) {
return
}
}
return
} else {
log.WithField("job.name", job.Config.Name).
Error(fmt.Sprintf("error during logs streaming: %s", err.Error()))
break
Error(fmt.Sprintf("error while streaming logs from stdout: %s", err.Error()))
}
}

for {
select {
case logLine := <-outChan:
if err := conn.WriteMessage(websocket.TextMessage, logLine); err != nil {
break
}

case err := <-stdOutErrChan:
handleErr(err, &job.stdOutWg)
case err := <-stdErrErrChan:
handleErr(err, &job.stdErrWg)

case <-streamCtx.Done():
return
Expand Down
2 changes: 2 additions & 0 deletions pkg/proc/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ type baseJob struct {

ctx context.Context
interrupt context.CancelFunc
stdErrWg sync.WaitGroup
stdOutWg sync.WaitGroup

cmd *exec.Cmd
restart bool
Expand Down

0 comments on commit c166810

Please sign in to comment.