Skip to content

Commit

Permalink
Merge pull request #670 from Unknwon/daemon-reader-start-sql
Browse files Browse the repository at this point in the history
reader/sql: improve status check and implement DaemonReader
  • Loading branch information
wonderflow authored Jul 30, 2018
2 parents bcace8b + 8378a54 commit bf1d119
Show file tree
Hide file tree
Showing 23 changed files with 1,912 additions and 1,464 deletions.
22 changes: 21 additions & 1 deletion reader/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"time"

"github.com/json-iterator/go"
Expand Down Expand Up @@ -71,7 +72,8 @@ type Meta struct {
RunnerName string
extrainfo map[string]string

subMetas map[string]*Meta //对于tailx模式的情况会有嵌套的meta
subMetaLock sync.RWMutex
subMetas map[string]*Meta //对于tailx模式的情况会有嵌套的meta
}

func getValidDir(dir string) (realPath string, err error) {
Expand Down Expand Up @@ -195,6 +197,9 @@ func NewMetaWithConf(conf conf.MapConf) (meta *Meta, err error) {
}

func (m *Meta) AddSubMeta(key string, meta *Meta) error {
m.subMetaLock.Lock()
defer m.subMetaLock.Unlock()

if m.subMetas == nil {
m.subMetas = make(map[string]*Meta)
}
Expand All @@ -206,6 +211,9 @@ func (m *Meta) AddSubMeta(key string, meta *Meta) error {
}

func (m *Meta) RemoveSubMeta(key string) {
m.subMetaLock.Lock()
defer m.subMetaLock.Unlock()

delete(m.subMetas, key)
}

Expand Down Expand Up @@ -547,7 +555,11 @@ func (m *Meta) GetDoneFiles() ([]File, error) {
if err != nil {
return nil, err
}

//submeta
m.subMetaLock.RLock()
defer m.subMetaLock.RUnlock()

for _, mv := range m.subMetas {
newfiles, err := mv.GetDoneFiles()
if err != nil {
Expand Down Expand Up @@ -588,6 +600,10 @@ func (m *Meta) SetEncodingWay(e string) {
if e != "UTF-8" {
m.encodingWay = e
}

m.subMetaLock.RLock()
defer m.subMetaLock.RUnlock()

for _, mv := range m.subMetas {
mv.SetEncodingWay(e)
}
Expand Down Expand Up @@ -644,6 +660,10 @@ func (m *Meta) Reset() error {
}
}
}

m.subMetaLock.RLock()
defer m.subMetaLock.RUnlock()

for key, mv := range m.subMetas {
err := mv.Reset()
if err != nil {
Expand Down
Loading

0 comments on commit bf1d119

Please sign in to comment.