Skip to content

Commit

Permalink
Merge pull request #148 from yandex/arcadia
Browse files Browse the repository at this point in the history
PR from branch users/ligreen/pandora-ammo-provider1
  • Loading branch information
ligreen authored Feb 21, 2022
2 parents fd5baf4 + a78cbef commit 7064cca
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 14 deletions.
39 changes: 39 additions & 0 deletions components/grpc/ammo/ammo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright (c) 2017 Yandex LLC. All rights reserved.
// Use of this source code is governed by a MPL 2.0
// license that can be found in the LICENSE file.
// Author: Vladimir Skipor <skipor@yandex-team.ru>

package ammo

type Ammo struct {
Tag string `json:"tag"`
Call string `json:"call"`
Metadata map[string]string `json:"metadata"`
Payload map[string]interface{} `json:"payload"`
id int
isInvalid bool
}

func (a *Ammo) Reset(tag string, call string, metadata map[string]string, payload map[string]interface{}) {
*a = Ammo{tag, call, metadata, payload, -1, false}
}

func (a *Ammo) SetID(id int) {
a.id = id
}

func (a *Ammo) ID() int {
return a.id
}

func (a *Ammo) Invalidate() {
a.isInvalid = true
}

func (a *Ammo) IsInvalid() bool {
return a.isInvalid
}

func (a *Ammo) IsValid() bool {
return !a.isInvalid
}
102 changes: 102 additions & 0 deletions components/grpc/ammo/grpcjson/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright (c) 2017 Yandex LLC. All rights reserved.
// Use of this source code is governed by a MPL 2.0
// license that can be found in the LICENSE file.
// Author: Vladimir Skipor <skipor@yandex-team.ru>

package grpcjson

import (
"bufio"
"context"

"github.com/pkg/errors"
"go.uber.org/zap"

"github.com/yandex/pandora/components/grpc/ammo"

jsoniter "github.com/json-iterator/go"
"github.com/spf13/afero"
)

func NewProvider(fs afero.Fs, conf Config) *Provider {
var p Provider
if conf.Source.Path != "" {
conf.File = conf.Source.Path
}
p = Provider{
Provider: ammo.NewProvider(fs, conf.File, p.start),
Config: conf,
}
return &p
}

type Provider struct {
ammo.Provider
Config
log *zap.Logger
}

type Source struct {
Type string
Path string
}

type Config struct {
File string //`validate:"required"`
// Limit limits total num of ammo. Unlimited if zero.
Limit int `validate:"min=0"`
// Passes limits ammo file passes. Unlimited if zero.
Passes int `validate:"min=0"`
ContinueOnError bool
//Maximum number of byte in an ammo. Default is bufio.MaxScanTokenSize
MaxAmmoSize int
Source Source `config:"source"`
}

func (p *Provider) start(ctx context.Context, ammoFile afero.File) error {
var ammoNum, passNum int
for {
passNum++
scanner := bufio.NewScanner(ammoFile)
if p.Config.MaxAmmoSize != 0 {
var buffer []byte
scanner.Buffer(buffer, p.Config.MaxAmmoSize)
}
for line := 1; scanner.Scan() && (p.Limit == 0 || ammoNum < p.Limit); line++ {
data := scanner.Bytes()
a, err := decodeAmmo(data, p.Pool.Get().(*ammo.Ammo))
if err != nil {
if p.Config.ContinueOnError {
a.Invalidate()
} else {
return errors.Wrapf(err, "failed to decode ammo at line: %v; data: %q", line, data)
}
}
ammoNum++
select {
case p.Sink <- a:
case <-ctx.Done():
return nil
}
}
if p.Passes != 0 && passNum >= p.Passes {
break
}
_, err := ammoFile.Seek(0, 0)
if err != nil {
return errors.Wrap(err, "Failed to seek ammo file")
}
}
return nil
}

func decodeAmmo(jsonDoc []byte, am *ammo.Ammo) (*ammo.Ammo, error) {
var ammo ammo.Ammo
err := jsoniter.Unmarshal(jsonDoc, &ammo)
if err != nil {
return am, errors.WithStack(err)
}

am.Reset(ammo.Tag, ammo.Call, ammo.Metadata, ammo.Payload)
return am, nil
}
63 changes: 63 additions & 0 deletions components/grpc/ammo/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) 2017 Yandex LLC. All rights reserved.
// Use of this source code is governed by a MPL 2.0
// license that can be found in the LICENSE file.
// Author: Vladimir Skipor <skipor@yandex-team.ru>

package ammo

import (
"context"
"sync"

"github.com/pkg/errors"
"github.com/spf13/afero"
"go.uber.org/atomic"

"github.com/yandex/pandora/core"
)

func NewProvider(fs afero.Fs, fileName string, start func(ctx context.Context, file afero.File) error) Provider {
return Provider{
fs: fs,
fileName: fileName,
start: start,
Sink: make(chan *Ammo, 128),
Pool: sync.Pool{New: func() interface{} { return &Ammo{} }},
Close: func() {},
}
}

type Provider struct {
fs afero.Fs
fileName string
start func(ctx context.Context, file afero.File) error
Sink chan *Ammo
Pool sync.Pool
idCounter atomic.Int64
Close func()
core.ProviderDeps
}

func (p *Provider) Acquire() (core.Ammo, bool) {
ammo, ok := <-p.Sink
if ok {
ammo.SetID(int(p.idCounter.Inc() - 1))
}
return ammo, ok
}

func (p *Provider) Release(a core.Ammo) {
p.Pool.Put(a)
}

func (p *Provider) Run(ctx context.Context, deps core.ProviderDeps) error {
defer p.Close()
p.ProviderDeps = deps
defer close(p.Sink)
file, err := p.fs.Open(p.fileName)
if err != nil {
return errors.Wrap(err, "failed to open ammo file")
}
defer file.Close()
return p.start(ctx, file)
}
14 changes: 4 additions & 10 deletions components/grpc/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"google.golang.org/grpc/credentials"
reflectpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"

"github.com/yandex/pandora/components/grpc/ammo"
"github.com/yandex/pandora/core"
"github.com/yandex/pandora/core/aggregator/netsample"

Expand All @@ -26,13 +27,6 @@ import (
"google.golang.org/grpc/status"
)

type Ammo struct {
Tag string `json:"tag"`
Call string `json:"call"`
Metadata map[string]string `json:"metadata"`
Payload map[string]interface{} `json:"payload"`
}

type Sample struct {
URL string
ShootTimeSeconds float64
Expand Down Expand Up @@ -122,12 +116,12 @@ func (g *Gun) Bind(aggr core.Aggregator, deps core.GunDeps) error {
return nil
}

func (g *Gun) Shoot(ammo core.Ammo) {
customAmmo := ammo.(*Ammo)
func (g *Gun) Shoot(am core.Ammo) {
customAmmo := am.(*ammo.Ammo)
g.shoot(customAmmo)
}

func (g *Gun) shoot(ammo *Ammo) {
func (g *Gun) shoot(ammo *ammo.Ammo) {

code := 0
sample := netsample.Acquire(ammo.Tag)
Expand Down
10 changes: 7 additions & 3 deletions components/grpc/import/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,18 @@
package example

import (
"github.com/spf13/afero"
"github.com/yandex/pandora/components/grpc"
"github.com/yandex/pandora/components/grpc/ammo/grpcjson"
"github.com/yandex/pandora/core"
coreimport "github.com/yandex/pandora/core/import"
"github.com/yandex/pandora/core/register"
)

func Import() {
coreimport.RegisterCustomJSONProvider("grpc/json", func() core.Ammo { return &grpc.Ammo{} })
func Import(fs afero.Fs) {

register.Provider("grpc/json", func(conf grpcjson.Config) core.Provider {
return grpcjson.NewProvider(fs, conf)
})

register.Gun("grpc", grpc.NewGun, func() grpc.GunConfig {
return grpc.GunConfig{
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func main() {
coreimport.Import(fs)
phttp.Import(fs)
example.Import()
grpc.Import()
grpc.Import(fs)

cli.Run()
}

0 comments on commit 7064cca

Please sign in to comment.