diff --git a/pkg/source/file/watch.go b/pkg/source/file/watch.go index 47be0e12f..51c2e6ff3 100644 --- a/pkg/source/file/watch.go +++ b/pkg/source/file/watch.go @@ -45,6 +45,7 @@ type jobEvent struct { opt Operation job *Job newFilename string + newFileSize int64 } type Watcher struct { @@ -212,11 +213,10 @@ func (w *Watcher) reportMetric(job *Job) { PipelineName: job.task.pipelineName, SourceName: job.task.sourceName, }, - FileName: job.filename, - Offset: job.endOffset, - LineNumber: job.currentLineNumber, - Lines: job.currentLines, - // FileSize: fileSize, + FileName: job.filename, + Offset: job.endOffset, + LineNumber: job.currentLineNumber, + Lines: job.currentLines, SourceFields: job.task.sourceFields, } job.currentLines = 0 @@ -249,23 +249,47 @@ func (w *Watcher) eventBus(e jobEvent) { } // only care about zombie job write event watchJobId := job.WatchUid() - if existJob, ok := w.zombieJobs[watchJobId]; ok { - err, fdOpen := existJob.Active() - if fdOpen { - w.currentOpenFds++ - } - if err != nil { - log.Error("active job fileName(%s) fail: %s", filename, err) - if existJob.Release() { - w.currentOpenFds-- + existJob, ok := w.zombieJobs[watchJobId] + if !ok { + return + } + + // check whether the file size is less than the offset in the job + filesize := e.newFileSize + currentOffset := job.endOffset + if filesize < currentOffset { + // maybe the file is truncated + log.Info("filesize: %d, currentOffset: %d", filesize, currentOffset) + existRegistry := w.findExistJobRegistry(job) + existAckOffset := existRegistry.Offset + if existAckOffset > filesize+int64(len(job.GetEncodeLineEnd())) { + log.Warn("the job(jobUid:%s) fileName(%s) existRegistry(%+v) ackOffset is larger than file size(%d), the file was truncate", job.Uid(), filename, existRegistry, filesize) + // file was truncated, need to reinitialize the job + job.Delete() + if w.isZombieJob(job) { + w.finalizeJob(job) } return } - existJob.Read() - // zombie job change to active, so without os notify - w.removeOsNotify(existJob.filename) - delete(w.zombieJobs, watchJobId) } + + err, fdOpen := existJob.Active() + if fdOpen { + w.currentOpenFds++ + } + if err != nil { + log.Error("active job fileName(%s) fail: %s", filename, err) + if existJob.Release() { + w.currentOpenFds-- + } + return + } + existJob.Read() + // zombie job change to active, so without os notify + w.removeOsNotify(existJob.filename) + log.Debug("job fileName(%s) change to active", filename) + delete(w.zombieJobs, watchJobId) + case CREATE: if w.currentOpenFds >= w.config.MaxOpenFds { log.Error("maxCollectFiles reached. fileName(%s) will be ignore", filename) @@ -557,12 +581,7 @@ func (w *Watcher) scanActiveJob() { } } -// check zombie job: -// 0. final status -// 1. remove -// 2. fd hold timeout,release fd -// 3. write -// 4. truncated file +// check zombie job func (w *Watcher) scanZombieJob() { for _, job := range w.zombieJobs { if job.IsDelete() { @@ -571,8 +590,6 @@ func (w *Watcher) scanZombieJob() { } filename := job.filename stat, err := os.Stat(filename) - //var stat os.FileInfo - //var err error var checkRemove = func() bool { if err != nil { if os.IsNotExist(err) { @@ -621,8 +638,9 @@ func (w *Watcher) scanZombieJob() { job.nextOffset = 0 job.currentLineNumber = 0 w.eventBus(jobEvent{ - opt: WRITE, - job: job, + opt: WRITE, + job: job, + newFileSize: size, }) continue } @@ -648,8 +666,9 @@ func (w *Watcher) scanZombieJob() { size := stat.Size() if size > job.nextOffset && !job.task.config.IsIgnoreOlder(stat) { w.eventBus(jobEvent{ - opt: WRITE, - job: job, + opt: WRITE, + job: job, + newFileSize: size, }) continue } @@ -713,7 +732,6 @@ func (w *Watcher) run() { case job := <-w.zombieJobChan: w.decideZombieJob(job) case e := <-osEvents: - // log.Info("os event: %v", e) w.osNotify(e) case <-scanFileTicker.C: w.scan() @@ -802,7 +820,6 @@ func (w *Watcher) decideZombieJob(job *Job) { } func (w *Watcher) osNotify(e fsnotify.Event) { - log.Debug("received os notify: %+v", e) if e.Op == fsnotify.Chmod { // File writing will also be received. Ignore it. Only check whether you have read permission when the file job is activated (job. Active()) return @@ -819,6 +836,7 @@ func (w *Watcher) osNotify(e fsnotify.Event) { if e.Op == fsnotify.Rename { return } + log.Debug("received os notify: %+v", e) fileName := e.Name if ignoreSystemFile(fileName) { @@ -856,8 +874,9 @@ func (w *Watcher) osNotify(e fsnotify.Event) { for _, existJob := range w.allJobs { if existJob.Uid() == jobUid { w.eventBus(jobEvent{ - opt: WRITE, - job: existJob, + opt: WRITE, + job: existJob, + newFileSize: stat.Size(), }) } } @@ -1080,7 +1099,7 @@ func (w *Watcher) handleRemoveJobs(jobs ...*Job) { JobUid: jt.Uid(), Filename: jt.filename, } - log.Info("try to delete registry(%+v) because CleanWhenRemoved. deleteTime: %s", r, jt.deleteTime.Load().(time.Time).Format(persistence.TimeFormatPattern)) + log.Info("try to delete registry(%+v). deleteTime: %s", r, jt.deleteTime.Load().(time.Time).Format(persistence.TimeFormatPattern)) w.dbHandler.HandleOpt(persistence.DbOpt{ R: r, OptType: persistence.DeleteByJobUidOpt, diff --git a/pkg/util/persistence/driver/sqlite.go b/pkg/util/persistence/driver/sqlite.go index c481a479f..9c1a44083 100644 --- a/pkg/util/persistence/driver/sqlite.go +++ b/pkg/util/persistence/driver/sqlite.go @@ -162,11 +162,10 @@ func (e *Engine) DeleteBy(jobUid string, sourceName string, pipelineName string) if err != nil { resErr = errors.WithMessagef(err, "%s stmt exec fail", e.String()) } - affected, err := result.RowsAffected() + _, err = result.RowsAffected() if err != nil { resErr = errors.WithMessagef(err, "%s get result fail", e.String()) } - log.Info("delete registry(jobUid:%s, sourceName:%s, pipelineName:%s). affected: %d", jobUid, sourceName, pipelineName, affected) }) return resErr } diff --git a/pkg/util/persistence/persistence.go b/pkg/util/persistence/persistence.go index bcb55c9bd..2e55515af 100644 --- a/pkg/util/persistence/persistence.go +++ b/pkg/util/persistence/persistence.go @@ -258,16 +258,15 @@ func (d *DbHandler) updateFileName(registries []reg.Registry) { func (d *DbHandler) upsertOffsetByJobWatchId(r reg.Registry) { r.CollectTime = time2text(time.Now()) r.Version = api.VERSION - rs := []reg.Registry{r} or := d.FindBy(r.JobUid, r.SourceName, r.PipelineName) if or.JobUid != "" { // update r.Id = or.Id - d.updateRegistry(rs) + d.updateRegistry([]reg.Registry{r}) } else { // insert - d.insertRegistry(rs) + d.insertRegistry([]reg.Registry{r}) } } @@ -276,7 +275,7 @@ func (d *DbHandler) delete(r reg.Registry) { log.Error("%s fail to delete registry %s : %s", d.String(), r.Key(), err) return } - log.Info("delete registry %s because db.cleanInactiveTimeout(%dh) reached. file: %s", r.Key(), d.config.CleanInactiveTimeout/time.Hour, r.Filename) + log.Info("delete registry %s. file: %s", r.Key(), r.Filename) } func (d *DbHandler) deleteRemoved(rs []reg.Registry) { @@ -300,6 +299,7 @@ func (d *DbHandler) cleanData() { t := text2time(collectTime) if time.Since(t) >= d.config.CleanInactiveTimeout { // delete + log.Info("clean inactive registry: %s because CleanInactiveTimeout(%dh) reached ", r.Key(), d.config.CleanInactiveTimeout/time.Hour) d.delete(r) } }