Skip to content

Commit

Permalink
expr: clean from direct use config zipper/limiter
Browse files Browse the repository at this point in the history
  • Loading branch information
msaf1980 committed Dec 18, 2023
1 parent ea3cf74 commit bd29447
Show file tree
Hide file tree
Showing 220 changed files with 220 additions and 402 deletions.
10 changes: 8 additions & 2 deletions cmd/carbonapi/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import (
"time"

"github.com/go-graphite/carbonapi/cache"
"github.com/go-graphite/carbonapi/cmd/carbonapi/interfaces"
"github.com/go-graphite/carbonapi/expr"
"github.com/go-graphite/carbonapi/limiter"
"github.com/go-graphite/carbonapi/pkg/tlsconfig"
zipperCfg "github.com/go-graphite/carbonapi/zipper/config"
zipper "github.com/go-graphite/carbonapi/zipper/interfaces"
zipperTypes "github.com/go-graphite/carbonapi/zipper/types"

"github.com/lomik/zapwriter"
Expand Down Expand Up @@ -115,7 +116,7 @@ type ConfigType struct {
DefaultTimeZone *time.Location `mapstructure:"-" json:"-"`

// ZipperInstance is API entry to carbonzipper
ZipperInstance interfaces.CarbonZipper `mapstructure:"-" json:"-"`
ZipperInstance zipper.CarbonZipper `mapstructure:"-" json:"-"`

// Limiter limits concurrent zipper requests
Limiter limiter.SimpleLimiter `mapstructure:"-" json:"-"`
Expand All @@ -131,6 +132,11 @@ func (c ConfigType) String() string {
}
}

// Init init carbonapi packages for use with config
func (c *ConfigType) Init() {
expr.Init(c.Limiter, c.ZipperInstance)
}

var Config = ConfigType{
ExtrapolateExperiment: false,
Buckets: 10,
Expand Down
1 change: 1 addition & 0 deletions cmd/carbonapi/http/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func init() {
config.SetUpConfigUpstreams(logger)
config.SetUpConfig(logger, "(test)")
config.Config.ZipperInstance = newMockCarbonZipper()
config.Config.Init()
emptyStringList := make([]string, 0)
InitHandlers(emptyStringList, emptyStringList)
}
Expand Down
1 change: 1 addition & 0 deletions cmd/carbonapi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func main() {
}

config.Config.ZipperInstance = newZipper(carbonapiHttp.ZipperStats, &config.Config.Upstreams, config.Config.IgnoreClientTimeout, zapwriter.Logger("zipper"))
config.Config.Init()

wg := sync.WaitGroup{}
serve := func(listen config.Listener, handler http.Handler) {
Expand Down
31 changes: 23 additions & 8 deletions expr/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,31 @@ import (
"github.com/ansel1/merry"
pb "github.com/go-graphite/protocol/carbonapi_v3_pb"

"github.com/go-graphite/carbonapi/cmd/carbonapi/config"
_ "github.com/go-graphite/carbonapi/expr/functions"
"github.com/go-graphite/carbonapi/expr/helper"
"github.com/go-graphite/carbonapi/expr/metadata"
"github.com/go-graphite/carbonapi/expr/types"
"github.com/go-graphite/carbonapi/limiter"
"github.com/go-graphite/carbonapi/pkg/parser"
utilctx "github.com/go-graphite/carbonapi/util/ctx"
zipper "github.com/go-graphite/carbonapi/zipper/interfaces"
)

type evaluator struct{}
type evaluator struct {
limiter limiter.SimpleLimiter
zipper zipper.CarbonZipper
}

func (eval evaluator) Fetch(ctx context.Context, exprs []parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) (map[parser.MetricRequest][]*types.MetricData, error) {
if err := config.Config.Limiter.Enter(ctx); err != nil {
if eval.zipper == nil {
// TODO: may be return error or not check. But for use expr in external applications without CarbonZipper implementation
return values, nil
}

if err := eval.limiter.Enter(ctx); err != nil {
return nil, err
}
defer config.Config.Limiter.Leave()
defer eval.limiter.Leave()

multiFetchRequest := pb.MultiFetchRequest{}
metricRequestCache := make(map[string]parser.MetricRequest)
Expand Down Expand Up @@ -74,7 +83,7 @@ func (eval evaluator) Fetch(ctx context.Context, exprs []parser.Expr, from, unti
}

if len(multiFetchRequest.Metrics) > 0 {
metrics, _, err := config.Config.ZipperInstance.Render(ctx, multiFetchRequest)
metrics, _, err := eval.zipper.Render(ctx, multiFetchRequest)
// If we had only partial result, we want to do our best to actually do our job
if err != nil && merry.HTTPCode(err) >= 400 && !haveFallbackSeries {
return nil, err
Expand All @@ -97,7 +106,7 @@ func (eval evaluator) Fetch(ctx context.Context, exprs []parser.Expr, from, unti
targetValues[m] = values[m]
}

if config.Config.ZipperInstance.ScaleToCommonStep() {
if eval.zipper.ScaleToCommonStep() {
targetValues = helper.ScaleValuesToCommonStep(targetValues)
}

Expand Down Expand Up @@ -131,10 +140,16 @@ func (eval evaluator) Eval(ctx context.Context, exp parser.Expr, from, until int
return EvalExpr(ctx, exp, from, until, values)
}

var _evaluator = evaluator{}
var _evaluator = &evaluator{}

// Init call on configure phase, if need limiter or/and custom zipper (for refetch/etc)
func Init(limiter limiter.SimpleLimiter, zipper zipper.CarbonZipper) {
_evaluator.limiter = limiter
_evaluator.zipper = zipper
}

func init() {
helper.SetEvaluator(_evaluator)
// helper.SetEvaluator(_evaluator)
metadata.SetEvaluator(_evaluator)
}

Expand Down
2 changes: 1 addition & 1 deletion expr/functions/absolute/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func New(configFile string) []interfaces.FunctionMetadata {
}

func (f *absolute) Do(ctx context.Context, e parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) ([]*types.MetricData, error) {
return helper.ForEachSeriesDo(ctx, e, from, until, values, func(a *types.MetricData, r *types.MetricData) *types.MetricData {
return helper.ForEachSeriesDo(ctx, f.GetEvaluator(), e, from, until, values, func(a *types.MetricData, r *types.MetricData) *types.MetricData {
for i, v := range a.Values {
if math.IsNaN(a.Values[i]) {
r.Values[i] = math.NaN()
Expand Down
2 changes: 0 additions & 2 deletions expr/functions/absolute/function_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"testing"
"time"

"github.com/go-graphite/carbonapi/expr/helper"
"github.com/go-graphite/carbonapi/expr/metadata"
"github.com/go-graphite/carbonapi/expr/types"
"github.com/go-graphite/carbonapi/pkg/parser"
Expand All @@ -15,7 +14,6 @@ func init() {
md := New("")
evaluator := th.EvaluatorFromFunc(md[0].F)
metadata.SetEvaluator(evaluator)
helper.SetEvaluator(evaluator)
for _, m := range md {
metadata.RegisterFunction(m.Name, m.F)
}
Expand Down
4 changes: 2 additions & 2 deletions expr/functions/aggregate/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (f *aggregate) Do(ctx context.Context, e parser.Expr, from, until int64, va
if e.Target() == "aggregate" {
return nil, err
} else {
args, err = helper.GetSeriesArgsAndRemoveNonExisting(ctx, e, from, until, values)
args, err = helper.GetSeriesArgsAndRemoveNonExisting(ctx, f.GetEvaluator(), e, from, until, values)
if err != nil {
return nil, err
}
Expand All @@ -61,7 +61,7 @@ func (f *aggregate) Do(ctx context.Context, e parser.Expr, from, until int64, va
xFilesFactor = -1 // xFilesFactor is not used by the ...Series functions
}
} else {
args, err = helper.GetSeriesArg(ctx, e.Arg(0), from, until, values)
args, err = helper.GetSeriesArg(ctx, f.GetEvaluator(), e.Arg(0), from, until, values)
if err != nil {
return nil, err
}
Expand Down
2 changes: 0 additions & 2 deletions expr/functions/aggregate/function_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

fconfig "github.com/go-graphite/carbonapi/expr/functions/config"
"github.com/go-graphite/carbonapi/expr/helper"
"github.com/go-graphite/carbonapi/expr/metadata"
"github.com/go-graphite/carbonapi/expr/types"
"github.com/go-graphite/carbonapi/pkg/parser"
Expand All @@ -19,7 +18,6 @@ func init() {
md := New("")
evaluator := th.EvaluatorFromFunc(md[0].F)
metadata.SetEvaluator(evaluator)
helper.SetEvaluator(evaluator)
for _, m := range md {
metadata.RegisterFunction(m.Name, m.F)
}
Expand Down
2 changes: 1 addition & 1 deletion expr/functions/aggregateLine/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func New(configFile string) []interfaces.FunctionMetadata {

// aggregateLine(*seriesLists)
func (f *aggregateLine) Do(ctx context.Context, e parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) ([]*types.MetricData, error) {
args, err := helper.GetSeriesArg(ctx, e.Arg(0), from, until, values)
args, err := helper.GetSeriesArg(ctx, f.GetEvaluator(), e.Arg(0), from, until, values)
if err != nil {
return nil, err
}
Expand Down
2 changes: 0 additions & 2 deletions expr/functions/aggregateLine/function_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"testing"
"time"

"github.com/go-graphite/carbonapi/expr/helper"
"github.com/go-graphite/carbonapi/expr/metadata"
"github.com/go-graphite/carbonapi/expr/types"
"github.com/go-graphite/carbonapi/pkg/parser"
Expand All @@ -17,7 +16,6 @@ func init() {
md := New("")
evaluator := th.EvaluatorFromFunc(md[0].F)
metadata.SetEvaluator(evaluator)
helper.SetEvaluator(evaluator)
for _, m := range md {
metadata.RegisterFunction(m.Name, m.F)
}
Expand Down
4 changes: 2 additions & 2 deletions expr/functions/aggregateSeriesLists/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ func (f *aggregateSeriesLists) Do(ctx context.Context, e parser.Expr, from, unti
return nil, parser.ErrMissingArgument
}

seriesList1, err := helper.GetSeriesArg(ctx, e.Arg(0), from, until, values)
seriesList1, err := helper.GetSeriesArg(ctx, f.GetEvaluator(), e.Arg(0), from, until, values)
if err != nil {
return nil, err
}
seriesList2, err := helper.GetSeriesArg(ctx, e.Arg(1), from, until, values)
seriesList2, err := helper.GetSeriesArg(ctx, f.GetEvaluator(), e.Arg(1), from, until, values)
if err != nil {
return nil, err
}
Expand Down
9 changes: 4 additions & 5 deletions expr/functions/aggregateSeriesLists/function_test.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
package aggregateSeriesLists

import (
"github.com/go-graphite/carbonapi/expr/helper"
"math"
"testing"
"time"

"github.com/go-graphite/carbonapi/expr/metadata"
"github.com/go-graphite/carbonapi/expr/types"
"github.com/go-graphite/carbonapi/pkg/parser"
th "github.com/go-graphite/carbonapi/tests"
"math"
"testing"
"time"
)

func init() {
md := New("")
evaluator := th.EvaluatorFromFunc(md[0].F)
metadata.SetEvaluator(evaluator)
helper.SetEvaluator(evaluator)
for _, m := range md {
metadata.RegisterFunction(m.Name, m.F)
}
Expand Down
2 changes: 1 addition & 1 deletion expr/functions/aggregateWithWildcards/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (f *aggregateWithWildcards) Do(ctx context.Context, e parser.Expr, from, un
return nil, parser.ErrMissingArgument
}

args, err := helper.GetSeriesArg(ctx, e.Arg(0), from, until, values)
args, err := helper.GetSeriesArg(ctx, f.GetEvaluator(), e.Arg(0), from, until, values)
if err != nil {
return nil, err
}
Expand Down
2 changes: 0 additions & 2 deletions expr/functions/aggregateWithWildcards/function_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"testing"
"time"

"github.com/go-graphite/carbonapi/expr/helper"
"github.com/go-graphite/carbonapi/expr/metadata"
"github.com/go-graphite/carbonapi/expr/types"
"github.com/go-graphite/carbonapi/pkg/parser"
Expand All @@ -17,7 +16,6 @@ func init() {
md := New("")
evaluator := th.EvaluatorFromFunc(md[0].F)
metadata.SetEvaluator(evaluator)
helper.SetEvaluator(evaluator)
for _, m := range md {
metadata.RegisterFunction(m.Name, m.F)
}
Expand Down
2 changes: 1 addition & 1 deletion expr/functions/alias/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (f *alias) Do(ctx context.Context, e parser.Expr, from, until int64, values
return nil, parser.ErrMissingArgument
}

args, err := helper.GetSeriesArg(ctx, e.Arg(0), from, until, values)
args, err := helper.GetSeriesArg(ctx, f.GetEvaluator(), e.Arg(0), from, until, values)
if err != nil {
return nil, err
}
Expand Down
2 changes: 0 additions & 2 deletions expr/functions/alias/function_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"testing"
"time"

"github.com/go-graphite/carbonapi/expr/helper"
"github.com/go-graphite/carbonapi/expr/metadata"
"github.com/go-graphite/carbonapi/expr/types"
"github.com/go-graphite/carbonapi/pkg/parser"
Expand All @@ -16,7 +15,6 @@ func init() {
md := New("")
evaluator := th.EvaluatorFromFunc(md[0].F)
metadata.SetEvaluator(evaluator)
helper.SetEvaluator(evaluator)
for _, m := range md {
metadata.RegisterFunction(m.Name, m.F)
}
Expand Down
2 changes: 1 addition & 1 deletion expr/functions/aliasByBase64/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func New(configFile string) []interfaces.FunctionMetadata {
}

func (f *aliasByBase64) Do(ctx context.Context, e parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) ([]*types.MetricData, error) {
args, err := helper.GetSeriesArg(ctx, e.Arg(0), from, until, values)
args, err := helper.GetSeriesArg(ctx, f.GetEvaluator(), e.Arg(0), from, until, values)
if err != nil {
return nil, err
}
Expand Down
2 changes: 0 additions & 2 deletions expr/functions/aliasByBase64/function_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"testing"
"time"

"github.com/go-graphite/carbonapi/expr/helper"
"github.com/go-graphite/carbonapi/expr/metadata"
"github.com/go-graphite/carbonapi/expr/types"
"github.com/go-graphite/carbonapi/pkg/parser"
Expand All @@ -16,7 +15,6 @@ func init() {
md := New("")
evaluator := th.EvaluatorFromFunc(md[0].F)
metadata.SetEvaluator(evaluator)
helper.SetEvaluator(evaluator)
for _, m := range md {
metadata.RegisterFunction(m.Name, m.F)
}
Expand Down
2 changes: 1 addition & 1 deletion expr/functions/aliasByMetric/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func New(configFile string) []interfaces.FunctionMetadata {
}

func (f *aliasByMetric) Do(ctx context.Context, e parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) ([]*types.MetricData, error) {
return helper.ForEachSeriesDo1(ctx, e, from, until, values, func(a *types.MetricData) *types.MetricData {
return helper.ForEachSeriesDo1(ctx, f.GetEvaluator(), e, from, until, values, func(a *types.MetricData) *types.MetricData {
metric := types.ExtractNameTag(a.Name)
part := strings.Split(metric, ".")
name := part[len(part)-1]
Expand Down
2 changes: 0 additions & 2 deletions expr/functions/aliasByMetric/function_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"testing"
"time"

"github.com/go-graphite/carbonapi/expr/helper"
"github.com/go-graphite/carbonapi/expr/metadata"
"github.com/go-graphite/carbonapi/expr/types"
"github.com/go-graphite/carbonapi/pkg/parser"
Expand All @@ -16,7 +15,6 @@ func init() {
md := New("")
evaluator := th.EvaluatorFromFunc(md[0].F)
metadata.SetEvaluator(evaluator)
helper.SetEvaluator(evaluator)
for _, m := range md {
metadata.RegisterFunction(m.Name, m.F)
}
Expand Down
2 changes: 1 addition & 1 deletion expr/functions/aliasByNode/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (f *aliasByNode) Do(ctx context.Context, e parser.Expr, from, until int64,
return nil, parser.ErrMissingArgument
}

args, err := helper.GetSeriesArg(ctx, e.Arg(0), from, until, values)
args, err := helper.GetSeriesArg(ctx, f.GetEvaluator(), e.Arg(0), from, until, values)
if err != nil {
return nil, err
}
Expand Down
2 changes: 0 additions & 2 deletions expr/functions/aliasByNode/function_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"testing"
"time"

"github.com/go-graphite/carbonapi/expr/helper"
"github.com/go-graphite/carbonapi/expr/metadata"
"github.com/go-graphite/carbonapi/expr/types"
"github.com/go-graphite/carbonapi/pkg/parser"
Expand Down Expand Up @@ -42,7 +41,6 @@ func init() {

evaluator := th.EvaluatorFromFuncWithMetadata(metadata.FunctionMD.Functions)
metadata.SetEvaluator(evaluator)
helper.SetEvaluator(evaluator)
}

func TestAliasByNode(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion expr/functions/aliasByPostgres/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (f *aliasByPostgres) Do(ctx context.Context, e parser.Expr, from, until int
}

logger := zapwriter.Logger("functionInit").With(zap.String("function", "aliasByPostgres"))
args, err := helper.GetSeriesArg(ctx, e.Arg(0), from, until, values)
args, err := helper.GetSeriesArg(ctx, f.GetEvaluator(), e.Arg(0), from, until, values)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion expr/functions/aliasByRedis/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func New(configFile string) []interfaces.FunctionMetadata {
}

func (f *aliasByRedis) Do(ctx context.Context, e parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) ([]*types.MetricData, error) {
args, err := helper.GetSeriesArg(ctx, e.Arg(0), from, until, values)
args, err := helper.GetSeriesArg(ctx, f.GetEvaluator(), e.Arg(0), from, until, values)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit bd29447

Please sign in to comment.