From 587c5c3f25805df2157af6c216146e304cffed73 Mon Sep 17 00:00:00 2001 From: Aleksandr Razumov Date: Wed, 13 Sep 2023 20:53:49 +0300 Subject: [PATCH] feat: apply optimizations --- .github/workflows/ci.yml | 6 ++--- go.mod | 3 ++- go.sum | 3 +++ tail.go | 54 ++++++++++++++++++++++++++-------------- 4 files changed, 44 insertions(+), 22 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4fed6de..6477e89 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -20,15 +20,15 @@ jobs: runner: - ubuntu-latest go: - - 1.18 + - 1.21.x include: - arch: 386 runner: ubuntu-latest - go: 1.18 + go: 1.21.x - arch: amd64 runner: ubuntu-latest flags: "-race" - go: 1.18 + go: 1.21.x steps: - name: Checkout code uses: actions/checkout@v4 diff --git a/go.mod b/go.mod index 8a7defa..100b616 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,12 @@ module github.com/go-faster/tail -go 1.18 +go 1.21 require ( github.com/fsnotify/fsnotify v1.6.0 github.com/go-faster/errors v0.6.1 github.com/stretchr/testify v1.8.4 + go.uber.org/atomic v1.11.0 go.uber.org/zap v1.25.0 golang.org/x/sync v0.3.0 ) diff --git a/go.sum b/go.sum index 9614e55..a6153d7 100644 --- a/go.sum +++ b/go.sum @@ -10,7 +10,10 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= +go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.25.0 h1:4Hvk6GtkucQ790dqmj7l1eEnRdKm3k3ZUrUMS2d5+5c= diff --git a/tail.go b/tail.go index 57a3277..0225b16 100644 --- a/tail.go +++ b/tail.go @@ -11,6 +11,7 @@ import ( "github.com/fsnotify/fsnotify" "github.com/go-faster/errors" + "go.uber.org/atomic" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) @@ -101,6 +102,7 @@ type Tailer struct { name string file *os.File reader *bufio.Reader + proxy *offsetProxy watcher *watcher lg *zap.Logger } @@ -132,17 +134,20 @@ func File(filename string, cfg Config) *Tailer { } } -// offset returns the file's current offset and error if any. -// -// NB: can be not accurate -// TODO(ernado): what does it mean exactly? -func (t *Tailer) offset() (offset int64, err error) { - offset, err = t.file.Seek(0, io.SeekCurrent) - if err != nil { - return offset, errors.Wrap(err, "seek") - } - offset -= int64(t.reader.Buffered()) - return offset, nil +type offsetProxy struct { + Reader io.Reader + Offset int64 +} + +func (o *offsetProxy) Read(p []byte) (n int, err error) { + n, err = o.Reader.Read(p) + o.Offset += int64(n) + return n, err +} + +// offset returns the file's current offset. +func (t *Tailer) offset() int64 { + return t.proxy.Offset - int64(t.reader.Buffered()) } func (t *Tailer) closeFile() { @@ -174,6 +179,14 @@ func (t *Tailer) openFile(ctx context.Context) error { } return errors.Wrap(err, "open") } + offset, err := t.file.Seek(0, io.SeekCurrent) + if err != nil { + return errors.Wrap(err, "seek") + } + t.proxy = &offsetProxy{ + Reader: t.file, + Offset: offset, + } return nil } } @@ -228,17 +241,20 @@ func (t *Tailer) Tail(ctx context.Context, h Handler) error { Data: make([]byte, 0, t.cfg.BufferSize), } + // Reduce lock contention. + var done atomic.Bool + go func() { + <-ctx.Done() + done.Store(true) + }() + for { - if err := ctx.Err(); err != nil { + if done.Load() { return ctx.Err() } // Grab the offset in case we need to back up in the event of a half-line. - offset, err := t.offset() - if err != nil { - return errors.Wrap(err, "offset") - } - + offset := t.offset() line.Offset = offset if e := t.lg.Check(zapcore.DebugLevel, "Offset"); e != nil { e.Write(zap.Int64("offset", offset)) @@ -332,4 +348,6 @@ func (t *Tailer) waitForChanges(ctx context.Context, pos int64) error { return nil } -func (t *Tailer) resetReader() { t.reader = bufio.NewReaderSize(t.file, t.cfg.BufferSize) } +func (t *Tailer) resetReader() { + t.reader = bufio.NewReaderSize(t.proxy, t.cfg.BufferSize) +}