Skip to content

Commit

Permalink
Merge pull request #601 from wonderflow/fix1
Browse files Browse the repository at this point in the history
1. change kodo retention default to 0 and give option
  • Loading branch information
wonderflow authored Jul 12, 2018
2 parents a88b397 + 8889362 commit c36c8b7
Show file tree
Hide file tree
Showing 18 changed files with 218 additions and 84 deletions.
10 changes: 8 additions & 2 deletions sender/fault_tolerant.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,18 +333,24 @@ func (ft *FtSender) trySendDatas(datas []Data, failSleep int, isRetry bool) (bac
func (ft *FtSender) handleSendError(err error, datas []Data) (retDatasContext []*datasContext) {
failCtx := new(datasContext)
var binaryUnpack bool
var errMessage string
se, succ := err.(*reqerr.SendError)
if !succ {
// 如果不是SendError 默认所有的数据都发送失败
log.Infof("Runner[%v] Sender[%v] error type is not *SendError! reSend all datas by default", ft.runnerName, ft.innerSender.Name())
errMessage = "error type is not *SendError! reSend all datas by default"
failCtx.Datas = datas
} else {
failCtx.Datas = ConvertDatas(se.GetFailDatas())
if se.ErrorType == reqerr.TypeBinaryUnpack {
binaryUnpack = true
errMessage = "error type is binaryUnpack, will divid to 2 parts and retry"
} else if se.ErrorType == reqerr.TypeSchemaFreeRetry {
errMessage = "maybe this is because of server schema cache, will send all data again"
} else {
errMessage = "error type is default, will send all data again"
}
}
log.Errorf("Runner[%v] Sender[%v] cannot write points: %v, failDatas size: %v, binaryUnpack: %v", ft.runnerName, ft.innerSender.Name(), err, len(failCtx.Datas), binaryUnpack)
log.Errorf("Runner[%v] Sender[%v] cannot write points: %v, failDatas size: %v, %s", ft.runnerName, ft.innerSender.Name(), err, len(failCtx.Datas), errMessage)
log.Debugf("Runner[%v] Sender[%v] failed datas [[%v]]", ft.runnerName, ft.innerSender.Name(), failCtx.Datas)
if binaryUnpack {
lens := len(failCtx.Datas) / 2
Expand Down
50 changes: 46 additions & 4 deletions sender/fault_tolerant/fault_tolerant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestFtSender(t *testing.T) {
assert.NoError(t, err)
datas := []Data{
{"ab": "abcccc"},
{"ab": "E18111:BackupQueue.Depth"},
{"ab": "E18110:BackupQueue.Depth"},
}
err = fts.Send(datas)
se, ok := err.(*StatsError)
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestFtSender(t *testing.T) {
defer os.RemoveAll(ftTestDir3)
assert.NoError(t, err)
datas3 := []Data{
{"ab": "E18111:"},
{"ab": "E18110:"},
}
err = fts3.Send(datas3)
se, ok = err.(*StatsError)
Expand Down Expand Up @@ -150,12 +150,12 @@ func TestFtMemorySender(t *testing.T) {
assert.NoError(t, err)
datas := []Data{
{"ab": "abcccc"},
{"ab": "E18111:BackupQueue.Depth"},
{"ab": "E18110:BackupQueue.Depth"},
}
err = fts.Send(datas)
se, ok := err.(*StatsError)
if !ok {
t.Fatal("ft send return error should .(*SendError)")
t.Fatal("ft send return error should .(*StatsError)")
}
assert.NoError(t, se.ErrorDetail)
time.Sleep(10 * time.Second)
Expand Down Expand Up @@ -471,3 +471,45 @@ func Test_SplitData(t *testing.T) {
assert.Equal(t, len(maxData), len(strings.Join(valArray, "")))
assert.Equal(t, 1, len(valArray))
}

func TestTypeSchemaRetry(t *testing.T) {
_, pt := mock_pandora.NewMockPandoraWithPrefix("/v2")
pandoraSenderConfig := conf.MapConf{
"name": "p",
"pandora_region": "nb",
"pandora_host": "http://127.0.0.1:" + pt,
"pandora_schema": "ab",
"pandora_auto_create": "ab *s",
"pandora_schema_free": "true",
"pandora_ak": "ak",
"pandora_sk": "sk",
"pandora_schema_update_interval": "1",
"pandora_gzip": "false",

"sender_type": "pandora",
}
pandoraSenderConfig["pandora_repo_name"] = "TestTypeSchemaRetry"
s, err := pandora.NewSender(pandoraSenderConfig)
if err != nil {
t.Fatal(err)
}
mp := conf.MapConf{}
mp[sender.KeyFtSaveLogPath] = fttestdir
mp[sender.KeyFtStrategy] = sender.KeyFtStrategyBackupOnly
defer os.RemoveAll(fttestdir)
fts, err := sender.NewFtSender(s, mp, fttestdir)
assert.NoError(t, err)
datas := []Data{
{"ab": "abcccc"},
{"ab": "E18111:"},
}
err = fts.Send(datas)
_, ok := err.(*StatsError)
if !ok {
t.Fatal("ft send return error should .(*StatsError)")
}
time.Sleep(5 * time.Second)
if fts.BackupQueue.Depth() != 1 {
t.Error("Ft send error exp 1 but got ", fts.BackupQueue.Depth())
}
}
20 changes: 14 additions & 6 deletions sender/mock_pandora/mock_pandora.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,27 +144,35 @@ func (s *mock_pandora) PostRepos_Data() echo.HandlerFunc {
s.BodyMux.Lock()
defer s.BodyMux.Unlock()
s.Body = strings.Join(sep, " ")
if strings.Contains(s.Body, "E18111:BackupQueue.Depth") {
return c.JSON(http.StatusNotFound, NewErrorResponse(errors.New("E18111 mock_pandora error")))

if strings.Contains(s.Body, "E18111:") {
c.Response().Header().Set(ContentTypeHeader, ApplicationJson)
c.Response().WriteHeader(http.StatusNotFound)

return jsoniter.NewEncoder(c.Response()).Encode(map[string]string{"error": "E18111: One or more field keys do not resident in the specified repo schema: no such field key <this is mock pandora error>"})
}

if strings.Contains(s.Body, "E18110:BackupQueue.Depth") {
return c.JSON(http.StatusNotFound, NewErrorResponse(errors.New("E18110: mock_pandora error")))
}

if strings.Contains(s.Body, "E18111") {
if strings.Contains(s.Body, "E18110") {
log.Println("get datas: ", s.Body)
c.Response().Header().Set(ContentTypeHeader, ApplicationJson)
c.Response().WriteHeader(http.StatusNotFound)
return jsoniter.NewEncoder(c.Response()).Encode(map[string]string{"error": "E18111 mock_pandora error"})
return jsoniter.NewEncoder(c.Response()).Encode(map[string]string{"error": "E18110: mock_pandora error"})
}

if len(s.Body) > DefaultMaxBatchSize {
c.Response().Header().Set(ContentTypeHeader, ApplicationJson)
c.Response().WriteHeader(http.StatusNotFound)
return jsoniter.NewEncoder(c.Response()).Encode(map[string]string{"error": "E18005 mock_pandora error"})
return jsoniter.NewEncoder(c.Response()).Encode(map[string]string{"error": "E18005: mock_pandora error"})
}

if strings.Contains(s.Body, "typeBinaryUnpack") && !strings.Contains(s.Body, KeyPandoraStash) {
c.Response().Header().Set(ContentTypeHeader, ApplicationJson)
c.Response().WriteHeader(http.StatusBadRequest)
return jsoniter.NewEncoder(c.Response()).Encode(map[string]string{"error": "E18111 mock_pandora error"})
return jsoniter.NewEncoder(c.Response()).Encode(map[string]string{"error": "E18110: mock_pandora error"})
}
s.PostDataNum++
return nil
Expand Down
7 changes: 5 additions & 2 deletions sender/pandora/pandora.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ type PandoraOption struct {
kodoRotateStrategy string
kodoRotateInterval int
kodoRotateSize int
kodoFileRetention int

forceMicrosecond bool
forceDataConvert bool
Expand Down Expand Up @@ -201,6 +202,7 @@ func NewSender(conf conf.MapConf) (pandoraSender sender.Sender, err error) {
kodoRotateSize, _ := conf.GetIntOr(sender.KeyPandoraKodoRotateSize, 500*1024)
kodoRotateSize = kodoRotateSize * 1024
kodoRotateInterval, _ := conf.GetIntOr(sender.KeyPandoraKodoRotateInterval, 10*60)
kodoFileRetention, _ := conf.GetIntOr(sender.KeyPandoraKodoFileRetention, 0)

forceconvert, _ := conf.GetBoolOr(sender.KeyForceDataConvert, false)
ignoreInvalidField, _ := conf.GetBoolOr(sender.KeyIgnoreInvalidField, true)
Expand Down Expand Up @@ -274,6 +276,7 @@ func NewSender(conf conf.MapConf) (pandoraSender sender.Sender, err error) {
kodoRotateStrategy: kodoRotateStrategy,
kodoRotateInterval: kodoRotateInterval,
kodoRotateSize: kodoRotateSize,
kodoFileRetention: kodoFileRetention,

forceMicrosecond: forceMicrosecond,
forceDataConvert: forceconvert,
Expand Down Expand Up @@ -511,7 +514,7 @@ func newPandoraSender(opt *PandoraOption) (s *Sender, err error) {
},
ToKODO: s.opt.enableKodo,
AutoExportToKODOInput: pipeline.AutoExportToKODOInput{
Retention: 30,
Retention: s.opt.kodoFileRetention,
RepoName: s.opt.repoName,
BucketName: s.opt.bucketName,
Email: s.opt.email,
Expand Down Expand Up @@ -1035,7 +1038,7 @@ func (s *Sender) schemaFreeSend(datas []Data) (se error) {
},
ToKODO: s.opt.enableKodo,
AutoExportToKODOInput: pipeline.AutoExportToKODOInput{
Retention: 30,
Retention: s.opt.kodoFileRetention,
RepoName: s.opt.repoName,
BucketName: s.opt.bucketName,
Email: s.opt.email,
Expand Down
17 changes: 14 additions & 3 deletions sender/rest_senders_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sender

import (
. "github.com/qiniu/logkit/utils/models"
"github.com/qiniu/pandora-go-sdk/base/config"
)

// ModeUsages 用途说明
Expand Down Expand Up @@ -161,7 +162,7 @@ var ModeKeyOptions = map[string][]Option{
{
KeyName: KeyPandoraHost,
ChooseOnly: false,
Default: "https://pipeline.qiniuapi.com",
Default: config.DefaultPipelineEndpoint,
DefaultNoUse: false,
Description: "大数据平台域名(pandora_host)",
Advance: true,
Expand Down Expand Up @@ -228,7 +229,7 @@ var ModeKeyOptions = map[string][]Option{
{
KeyName: KeyPandoraLogDBHost,
ChooseOnly: false,
Default: "https://insight.qiniuapi.com",
Default: config.DefaultLogDBEndpoint,
DefaultNoUse: false,
Description: "日志分析域名[私有部署才修改](pandora_logdb_host)",
Advance: true,
Expand Down Expand Up @@ -283,7 +284,7 @@ var ModeKeyOptions = map[string][]Option{
{
KeyName: KeyPandoraTSDBHost,
ChooseOnly: false,
Default: "https://tsdb.qiniuapi.com",
Default: config.DefaultTSDBEndpoint,
DefaultNoUse: false,
Description: "时序数据库域名[私有部署才修改](pandora_tsdb_host)",
Advance: true,
Expand Down Expand Up @@ -385,6 +386,16 @@ var ModeKeyOptions = map[string][]Option{
AdvanceDepend: KeyPandoraEnableKodo,
Advance: true,
},
{
KeyName: KeyPandoraKodoFileRetention,
ChooseOnly: false,
Default: "0",
DefaultNoUse: false,
Description: "云存储文件保存时间(pandora_kodo_file_retention)(单位天,0为永久保存)",
ToolTip: "导出到云存储的文件保存时间,数字表示存的天数,0为永久保存",
AdvanceDepend: KeyPandoraEnableKodo,
Advance: true,
},
{
KeyName: KeyPandoraGzip,
Element: Radio,
Expand Down
1 change: 1 addition & 0 deletions sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const (
KeyPandoraKodoRotateStrategy = "pandora_kodo_rotate_strategy"
KeyPandoraKodoRotateInterval = "pandora_kodo_rotate_interval"
KeyPandoraKodoRotateSize = "pandora_kodo_rotate_size"
KeyPandoraKodoFileRetention = "pandora_kodo_file_retention"

KeyPandoraEmail = "qiniu_email"

Expand Down
6 changes: 3 additions & 3 deletions vendor/github.com/qiniu/pandora-go-sdk/base/config/config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions vendor/github.com/qiniu/pandora-go-sdk/base/const.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions vendor/github.com/qiniu/pandora-go-sdk/base/reqerr/error.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions vendor/github.com/qiniu/pandora-go-sdk/logdb/error.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 32 additions & 1 deletion vendor/github.com/qiniu/pandora-go-sdk/pipeline/api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion vendor/github.com/qiniu/pandora-go-sdk/pipeline/error.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit c36c8b7

Please sign in to comment.