Skip to content

Commit

Permalink
Merge pull request #602 from redHJ/rawdata
Browse files Browse the repository at this point in the history
1. close reader 2. raw data with mock meta
  • Loading branch information
wonderflow committed Jul 13, 2018
2 parents c36c8b7 + 0889b6b commit 7bdb842
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 9 deletions.
21 changes: 19 additions & 2 deletions mgr/dataflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@ import (
"errors"
"fmt"
"io"
"os"
"path"
"strconv"
"strings"
"time"

"github.com/json-iterator/go"

"github.com/qiniu/log"
"github.com/qiniu/pandora-go-sdk/base/reqerr"

"github.com/qiniu/logkit/conf"
Expand All @@ -20,19 +25,31 @@ import (
. "github.com/qiniu/logkit/utils/models"
)

const DefaultTryTimes = 3
const (
DefaultTryTimes = 3
MetaTmp = "meta_tmp/"
)

// RawData 从 reader 模块中根据 type 获取字符串形式的样例日志
func RawData(readerConfig conf.MapConf) (string, error) {
if readerConfig == nil {
return "", fmt.Errorf("reader config cannot be empty")
}

runnerName, _ := readerConfig.GetString(GlobalKeyName)
configMetaPath := runnerName + "_" + Hash(strconv.FormatInt(time.Now().Unix(), 10))
metaPath := path.Join(MetaTmp, configMetaPath)
log.Debugf("Runner[%v] Using %s as default metaPath", runnerName, metaPath)
readerConfig[reader.KeyMetaPath] = metaPath

rd, err := reader.NewReader(readerConfig, true)
if err != nil {
return "", err
}
defer rd.Close()
defer func() {
rd.Close()
os.RemoveAll(metaPath)
}()

var rawData string

Expand Down
10 changes: 5 additions & 5 deletions mgr/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,16 +232,16 @@ func NewLogExportRunner(rc RunnerConfig, cleanChan chan<- cleaner.CleanSignal, r
if err != nil {
return nil, err
}
defer func() {
if err != nil && rd != nil {
rd.Close()
}
}()
if len(rc.CleanerConfig) > 0 {
rd, err = rr.NewReaderWithMeta(rc.ReaderConfig, meta, false)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
rd.Close()
}
}()
cl, err = cleaner.NewCleaner(rc.CleanerConfig, meta, cleanChan, meta.LogPath())
if err != nil {
return nil, err
Expand Down
7 changes: 5 additions & 2 deletions reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,12 +265,15 @@ func (reg *Registry) NewReaderWithMeta(conf conf.MapConf, meta *Meta, errDirectR

reader, err = constructor(meta, conf)
if err != nil {
return
return nil, err
}
if headPattern != "" {
err = reader.SetMode(ReadModeHeadPatternString, headPattern)
if err != nil {
return nil, err
}
}
return
return reader, nil
}

func NewFileDirReader(meta *Meta, conf conf.MapConf) (reader Reader, err error) {
Expand Down

0 comments on commit 7bdb842

Please sign in to comment.