diff --git a/components/grpc/ammo/ammo.go b/components/grpc/ammo/ammo.go new file mode 100644 index 000000000..3defba11a --- /dev/null +++ b/components/grpc/ammo/ammo.go @@ -0,0 +1,39 @@ +// Copyright (c) 2017 Yandex LLC. All rights reserved. +// Use of this source code is governed by a MPL 2.0 +// license that can be found in the LICENSE file. +// Author: Vladimir Skipor + +package ammo + +type Ammo struct { + Tag string `json:"tag"` + Call string `json:"call"` + Metadata map[string]string `json:"metadata"` + Payload map[string]interface{} `json:"payload"` + id int + isInvalid bool +} + +func (a *Ammo) Reset(tag string, call string, metadata map[string]string, payload map[string]interface{}) { + *a = Ammo{tag, call, metadata, payload, -1, false} +} + +func (a *Ammo) SetID(id int) { + a.id = id +} + +func (a *Ammo) ID() int { + return a.id +} + +func (a *Ammo) Invalidate() { + a.isInvalid = true +} + +func (a *Ammo) IsInvalid() bool { + return a.isInvalid +} + +func (a *Ammo) IsValid() bool { + return !a.isInvalid +} diff --git a/components/grpc/ammo/grpcjson/provider.go b/components/grpc/ammo/grpcjson/provider.go new file mode 100644 index 000000000..79c3e5dcd --- /dev/null +++ b/components/grpc/ammo/grpcjson/provider.go @@ -0,0 +1,102 @@ +// Copyright (c) 2017 Yandex LLC. All rights reserved. +// Use of this source code is governed by a MPL 2.0 +// license that can be found in the LICENSE file. +// Author: Vladimir Skipor + +package grpcjson + +import ( + "bufio" + "context" + + "github.com/pkg/errors" + "go.uber.org/zap" + + "github.com/yandex/pandora/components/grpc/ammo" + + jsoniter "github.com/json-iterator/go" + "github.com/spf13/afero" +) + +func NewProvider(fs afero.Fs, conf Config) *Provider { + var p Provider + if conf.Source.Path != "" { + conf.File = conf.Source.Path + } + p = Provider{ + Provider: ammo.NewProvider(fs, conf.File, p.start), + Config: conf, + } + return &p +} + +type Provider struct { + ammo.Provider + Config + log *zap.Logger +} + +type Source struct { + Type string + Path string +} + +type Config struct { + File string //`validate:"required"` + // Limit limits total num of ammo. Unlimited if zero. + Limit int `validate:"min=0"` + // Passes limits ammo file passes. Unlimited if zero. + Passes int `validate:"min=0"` + ContinueOnError bool + //Maximum number of byte in an ammo. Default is bufio.MaxScanTokenSize + MaxAmmoSize int + Source Source `config:"source"` +} + +func (p *Provider) start(ctx context.Context, ammoFile afero.File) error { + var ammoNum, passNum int + for { + passNum++ + scanner := bufio.NewScanner(ammoFile) + if p.Config.MaxAmmoSize != 0 { + var buffer []byte + scanner.Buffer(buffer, p.Config.MaxAmmoSize) + } + for line := 1; scanner.Scan() && (p.Limit == 0 || ammoNum < p.Limit); line++ { + data := scanner.Bytes() + a, err := decodeAmmo(data, p.Pool.Get().(*ammo.Ammo)) + if err != nil { + if p.Config.ContinueOnError { + a.Invalidate() + } else { + return errors.Wrapf(err, "failed to decode ammo at line: %v; data: %q", line, data) + } + } + ammoNum++ + select { + case p.Sink <- a: + case <-ctx.Done(): + return nil + } + } + if p.Passes != 0 && passNum >= p.Passes { + break + } + _, err := ammoFile.Seek(0, 0) + if err != nil { + return errors.Wrap(err, "Failed to seek ammo file") + } + } + return nil +} + +func decodeAmmo(jsonDoc []byte, am *ammo.Ammo) (*ammo.Ammo, error) { + var ammo ammo.Ammo + err := jsoniter.Unmarshal(jsonDoc, &ammo) + if err != nil { + return am, errors.WithStack(err) + } + + am.Reset(ammo.Tag, ammo.Call, ammo.Metadata, ammo.Payload) + return am, nil +} diff --git a/components/grpc/ammo/provider.go b/components/grpc/ammo/provider.go new file mode 100644 index 000000000..e390b1947 --- /dev/null +++ b/components/grpc/ammo/provider.go @@ -0,0 +1,63 @@ +// Copyright (c) 2017 Yandex LLC. All rights reserved. +// Use of this source code is governed by a MPL 2.0 +// license that can be found in the LICENSE file. +// Author: Vladimir Skipor + +package ammo + +import ( + "context" + "sync" + + "github.com/pkg/errors" + "github.com/spf13/afero" + "go.uber.org/atomic" + + "github.com/yandex/pandora/core" +) + +func NewProvider(fs afero.Fs, fileName string, start func(ctx context.Context, file afero.File) error) Provider { + return Provider{ + fs: fs, + fileName: fileName, + start: start, + Sink: make(chan *Ammo, 128), + Pool: sync.Pool{New: func() interface{} { return &Ammo{} }}, + Close: func() {}, + } +} + +type Provider struct { + fs afero.Fs + fileName string + start func(ctx context.Context, file afero.File) error + Sink chan *Ammo + Pool sync.Pool + idCounter atomic.Int64 + Close func() + core.ProviderDeps +} + +func (p *Provider) Acquire() (core.Ammo, bool) { + ammo, ok := <-p.Sink + if ok { + ammo.SetID(int(p.idCounter.Inc() - 1)) + } + return ammo, ok +} + +func (p *Provider) Release(a core.Ammo) { + p.Pool.Put(a) +} + +func (p *Provider) Run(ctx context.Context, deps core.ProviderDeps) error { + defer p.Close() + p.ProviderDeps = deps + defer close(p.Sink) + file, err := p.fs.Open(p.fileName) + if err != nil { + return errors.Wrap(err, "failed to open ammo file") + } + defer file.Close() + return p.start(ctx, file) +} diff --git a/components/grpc/core.go b/components/grpc/core.go index cb12d543b..5fa5a9340 100644 --- a/components/grpc/core.go +++ b/components/grpc/core.go @@ -13,6 +13,7 @@ import ( "google.golang.org/grpc/credentials" reflectpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha" + "github.com/yandex/pandora/components/grpc/ammo" "github.com/yandex/pandora/core" "github.com/yandex/pandora/core/aggregator/netsample" @@ -26,13 +27,6 @@ import ( "google.golang.org/grpc/status" ) -type Ammo struct { - Tag string `json:"tag"` - Call string `json:"call"` - Metadata map[string]string `json:"metadata"` - Payload map[string]interface{} `json:"payload"` -} - type Sample struct { URL string ShootTimeSeconds float64 @@ -122,12 +116,12 @@ func (g *Gun) Bind(aggr core.Aggregator, deps core.GunDeps) error { return nil } -func (g *Gun) Shoot(ammo core.Ammo) { - customAmmo := ammo.(*Ammo) +func (g *Gun) Shoot(am core.Ammo) { + customAmmo := am.(*ammo.Ammo) g.shoot(customAmmo) } -func (g *Gun) shoot(ammo *Ammo) { +func (g *Gun) shoot(ammo *ammo.Ammo) { code := 0 sample := netsample.Acquire(ammo.Tag) diff --git a/components/grpc/import/import.go b/components/grpc/import/import.go index 9271e4281..77c362ac3 100644 --- a/components/grpc/import/import.go +++ b/components/grpc/import/import.go @@ -6,14 +6,18 @@ package example import ( + "github.com/spf13/afero" "github.com/yandex/pandora/components/grpc" + "github.com/yandex/pandora/components/grpc/ammo/grpcjson" "github.com/yandex/pandora/core" - coreimport "github.com/yandex/pandora/core/import" "github.com/yandex/pandora/core/register" ) -func Import() { - coreimport.RegisterCustomJSONProvider("grpc/json", func() core.Ammo { return &grpc.Ammo{} }) +func Import(fs afero.Fs) { + + register.Provider("grpc/json", func(conf grpcjson.Config) core.Provider { + return grpcjson.NewProvider(fs, conf) + }) register.Gun("grpc", grpc.NewGun, func() grpc.GunConfig { return grpc.GunConfig{ diff --git a/main.go b/main.go index 744067b8b..bcad8b157 100644 --- a/main.go +++ b/main.go @@ -22,7 +22,7 @@ func main() { coreimport.Import(fs) phttp.Import(fs) example.Import() - grpc.Import() + grpc.Import(fs) cli.Run() }