diff --git a/cmd/carbonapi/http/find_handlers.go b/cmd/carbonapi/http/find_handlers.go index 7b7349326..298cae2c4 100644 --- a/cmd/carbonapi/http/find_handlers.go +++ b/cmd/carbonapi/http/find_handlers.go @@ -214,9 +214,7 @@ func findHandler(w http.ResponseWriter, r *http.Request) { }() if !ok || !format.ValidFindFormat() { - http.Error(w, "unsupported format: "+formatRaw, http.StatusBadRequest) - accessLogDetails.HTTPCode = http.StatusBadRequest - accessLogDetails.Reason = "unsupported format: " + formatRaw + setError(w, &accessLogDetails, "unsupported format: "+formatRaw, http.StatusBadRequest, uid.String()) logAsError = true return } @@ -244,17 +242,15 @@ func findHandler(w http.ResponseWriter, r *http.Request) { if format == protoV3Format { body, err := io.ReadAll(r.Body) if err != nil { - accessLogDetails.HTTPCode = http.StatusBadRequest - accessLogDetails.Reason = "failed to parse message body: " + err.Error() - http.Error(w, "bad request (failed to parse format): "+err.Error(), http.StatusBadRequest) + setError(w, &accessLogDetails, "failed to parse message body: "+err.Error(), http.StatusBadRequest, uid.String()) + logAsError = true return } err = pv3Request.Unmarshal(body) if err != nil { - accessLogDetails.HTTPCode = http.StatusBadRequest - accessLogDetails.Reason = "failed to parse message body: " + err.Error() - http.Error(w, "bad request (failed to parse format): "+err.Error(), http.StatusBadRequest) + setError(w, &accessLogDetails, "failed to parse message body: "+err.Error(), http.StatusBadRequest, uid.String()) + logAsError = true return } } else { @@ -264,9 +260,7 @@ func findHandler(w http.ResponseWriter, r *http.Request) { } if len(pv3Request.Metrics) == 0 { - http.Error(w, "missing parameter `query`", http.StatusBadRequest) - accessLogDetails.HTTPCode = http.StatusBadRequest - accessLogDetails.Reason = "missing parameter `query`" + setError(w, &accessLogDetails, "missing parameter `query`", http.StatusBadRequest, uid.String()) logAsError = true return } @@ -289,9 +283,7 @@ func findHandler(w http.ResponseWriter, r *http.Request) { if returnCode < 300 { multiGlobs = &pbv3.MultiGlobResponse{Metrics: []pbv3.GlobResponse{}} } else { - http.Error(w, http.StatusText(returnCode), returnCode) - accessLogDetails.HTTPCode = int32(returnCode) - accessLogDetails.Reason = err.Error() + setError(w, &accessLogDetails, http.StatusText(returnCode), returnCode, uid.String()) // We don't want to log this as an error if it's something normal // Normal is everything that is >= 500. So if config.Config.NotFoundStatusCode is 500 - this will be // logged as error @@ -371,9 +363,7 @@ func findHandler(w http.ResponseWriter, r *http.Request) { } if err != nil { - http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) - accessLogDetails.HTTPCode = http.StatusInternalServerError - accessLogDetails.Reason = err.Error() + setError(w, &accessLogDetails, err.Error(), http.StatusInternalServerError, uid.String()) logAsError = true return } diff --git a/cmd/carbonapi/http/render_handler.go b/cmd/carbonapi/http/render_handler.go index 79b8a71cd..03f82a617 100644 --- a/cmd/carbonapi/http/render_handler.go +++ b/cmd/carbonapi/http/render_handler.go @@ -201,9 +201,7 @@ func renderHandler(w http.ResponseWriter, r *http.Request) { if format == protoV3Format { body, err := io.ReadAll(r.Body) if err != nil { - accessLogDetails.HTTPCode = http.StatusBadRequest - accessLogDetails.Reason = "failed to parse message body: " + err.Error() - http.Error(w, "bad request (failed to parse format): "+err.Error(), http.StatusBadRequest) + setError(w, accessLogDetails, "failed to parse message body: "+err.Error(), http.StatusBadRequest, uid.String()) return } @@ -211,9 +209,7 @@ func renderHandler(w http.ResponseWriter, r *http.Request) { err = pv3Request.Unmarshal(body) if err != nil { - accessLogDetails.HTTPCode = http.StatusBadRequest - accessLogDetails.Reason = "failed to parse message body: " + err.Error() - http.Error(w, "bad request (failed to parse format): "+err.Error(), http.StatusBadRequest) + setError(w, accessLogDetails, "failed to parse message body: "+err.Error(), http.StatusBadRequest, uid.String()) return } @@ -327,6 +323,9 @@ func renderHandler(w http.ResponseWriter, r *http.Request) { result, err := expr.FetchAndEvalExp(ctx, config.Config.Evaluator, exp, from32, until32, values) if err != nil { errors[target] = merry.Wrap(err) + // if config.Config.Upstreams.RequireSuccessAll { + // break + // } } results = append(results, result...) @@ -347,7 +346,7 @@ func renderHandler(w http.ResponseWriter, r *http.Request) { var body []byte returnCode := http.StatusOK - if len(results) == 0 { + if len(results) == 0 || (len(errors) > 0 && config.Config.Upstreams.RequireSuccessAll) { // Obtain error code from the errors // In case we have only "Not Found" errors, result should be 404 // Otherwise it should be 500 @@ -355,11 +354,11 @@ func renderHandler(w http.ResponseWriter, r *http.Request) { returnCode, errMsgs = helper.MergeHttpErrorMap(errors) logger.Debug("error response or no response", zap.Strings("error", errMsgs)) // Allow override status code for 404-not-found replies. - if returnCode == 404 { + if returnCode == http.StatusNotFound { returnCode = config.Config.NotFoundStatusCode } - if returnCode == 400 || returnCode == http.StatusForbidden || returnCode >= 500 { + if returnCode == http.StatusBadRequest || returnCode == http.StatusNotFound || returnCode == http.StatusForbidden || returnCode >= 500 { setError(w, accessLogDetails, strings.Join(errMsgs, ","), returnCode, uid.String()) logAsError = true return diff --git a/cmd/mockbackend/e2etesting.go b/cmd/mockbackend/e2etesting.go index 922246364..b6a0ebc9e 100644 --- a/cmd/mockbackend/e2etesting.go +++ b/cmd/mockbackend/e2etesting.go @@ -12,6 +12,7 @@ import ( "net/http" "net/url" "os" + "reflect" "strconv" "strings" "sync" @@ -48,11 +49,21 @@ type ExpectedResponse struct { } type ExpectedResult struct { - SHA256 []string `yaml:"sha256"` - Metrics []CarbonAPIResponse + SHA256 []string `yaml:"sha256"` + Metrics []RenderResponse + MetricsFind []MetricsFindResponse `json:"metricsFind" yaml:"metricsFind"` } -type CarbonAPIResponse struct { +type MetricsFindResponse struct { + AllowChildren int `json:"allowChildren" yaml:"allowChildren"` + Expandable int `json:"expandable" yaml:"expandable"` + Leaf int `json:"leaf" yaml:"leaf"` + Id string `json:"id" yaml:"id"` + Text string `json:"text" yaml:"text"` + Context map[string]string `json:"context" yaml:"context"` +} + +type RenderResponse struct { Target string `json:"target" yaml:"target"` Datapoints []Datapoint `json:"datapoints" yaml:"datapoints"` Tags map[string]string `json:"tags" yaml:"tags"` @@ -121,7 +132,7 @@ func (d *Datapoint) UnmarshalYAML(unmarshal func(interface{}) error) error { return nil } -func isMetricsEqual(m1, m2 CarbonAPIResponse) error { +func isRenderEqual(m1, m2 RenderResponse) error { if m1.Target != m2.Target { return fmt.Errorf("target mismatch, got '%v', expected '%v'", m1.Target, m2.Target) } @@ -158,7 +169,7 @@ func isMetricsEqual(m1, m2 CarbonAPIResponse) error { return nil } -func doTest(logger *zap.Logger, t *Query) []error { +func doTest(logger *zap.Logger, t *Query, verbose bool) []error { client := http.Client{} failures := make([]error, 0) d, err := time.ParseDuration(fmt.Sprintf("%v", t.Delay) + "s") @@ -201,14 +212,6 @@ func doTest(logger *zap.Logger, t *Query) []error { return failures } - if resp.StatusCode != t.ExpectedResponse.HttpCode { - failures = append(failures, merry2.Errorf("unexpected status code, got %v, expected %v", - resp.StatusCode, - t.ExpectedResponse.HttpCode, - ), - ) - } - contentType = resp.Header.Get("Content-Type") if t.ExpectedResponse.ContentType != contentType { failures = append(failures, @@ -226,6 +229,14 @@ func doTest(logger *zap.Logger, t *Query) []error { return failures } + if resp.StatusCode != t.ExpectedResponse.HttpCode { + failures = append(failures, merry2.Errorf("unexpected status code, got %v, expected %v", + resp.StatusCode, + t.ExpectedResponse.HttpCode, + ), + ) + } + // We don't need to actually check body of response if we expect any sort of error (4xx/5xx) if t.ExpectedResponse.HttpCode >= 300 { return failures @@ -245,45 +256,98 @@ func doTest(logger *zap.Logger, t *Query) []error { } if !sha256matched { encodedBody := base64.StdEncoding.EncodeToString(b) - failures = append(failures, merry2.Errorf("sha256 mismatch, got '%v', expected '%v', encodedBodyy: '%v'", hashStr, t.ExpectedResponse.ExpectedResults[0].SHA256, encodedBody)) + failures = append(failures, merry2.Errorf("sha256 mismatch, got '%v', expected '%v', encodedBody: '%v'", hashStr, t.ExpectedResponse.ExpectedResults[0].SHA256, encodedBody)) return failures } case "application/json": - res := make([]CarbonAPIResponse, 0, 1) - err := json.Unmarshal(b, &res) - if err != nil { - err = merry2.Prepend(err, "failed to parse response") - failures = append(failures, err) - return failures - } + if strings.HasPrefix(t.URL, "/metrics/find") { + res := make([]MetricsFindResponse, 0, 1) + err := json.Unmarshal(b, &res) + if err != nil { + err = merry2.Prepend(err, "failed to parse response") + failures = append(failures, err) + return failures + } - if len(t.ExpectedResponse.ExpectedResults) == 0 { - return failures - } + if len(t.ExpectedResponse.ExpectedResults) == 0 { + return failures + } - if len(res) != len(t.ExpectedResponse.ExpectedResults[0].Metrics) { - failures = append(failures, merry2.Errorf("unexpected amount of results, got %v, expected %v", - len(res), - len(t.ExpectedResponse.ExpectedResults[0].Metrics))) - return failures - } + if len(res) != len(t.ExpectedResponse.ExpectedResults[0].MetricsFind) { + failures = append(failures, merry2.Errorf("unexpected amount of metrics find, got %v, expected %v", + len(res), + len(t.ExpectedResponse.ExpectedResults[0].MetricsFind))) + if verbose { + for i := range t.ExpectedResponse.ExpectedResults[0].MetricsFind { + if len(res) > i || !reflect.DeepEqual(res[i], t.ExpectedResponse.ExpectedResults[0].MetricsFind[i]) { + err = fmt.Errorf("metrics find[%d] are not equal, got=`%+v`, expected=`%+v`", i, res[i], t.ExpectedResponse.ExpectedResults[0].MetricsFind[i]) + failures = append(failures, err) + } else { + err = fmt.Errorf("metrics find[%d] got unexpected=`%+v`", i, t.ExpectedResponse.ExpectedResults[0].MetricsFind[i]) + } + failures = append(failures, err) + } + } + return failures + } - for i := range res { - err := isMetricsEqual(res[i], t.ExpectedResponse.ExpectedResults[0].Metrics[i]) + for i := range res { + if !reflect.DeepEqual(res[i], t.ExpectedResponse.ExpectedResults[0].MetricsFind[i]) { + err = fmt.Errorf("metrics find[%d] are not equal, got=`%+v`, expected=`%+v`", i, res[i], t.ExpectedResponse.ExpectedResults[0].MetricsFind[i]) + failures = append(failures, err) + } + } + } else { + // render + res := make([]RenderResponse, 0, 1) + err := json.Unmarshal(b, &res) if err != nil { - err = merry2.Prependf(err, "metrics are not equal, got=`%+v`, expected=`%+v`", res[i], t.ExpectedResponse.ExpectedResults[0].Metrics[i]) + err = merry2.Prepend(err, "failed to parse response") failures = append(failures, err) + return failures + } + + if len(t.ExpectedResponse.ExpectedResults) == 0 { + return failures + } + + if len(res) != len(t.ExpectedResponse.ExpectedResults[0].Metrics) { + failures = append(failures, merry2.Errorf("unexpected amount of results, got %v, expected %v", + len(res), + len(t.ExpectedResponse.ExpectedResults[0].Metrics))) + if verbose { + for i := range t.ExpectedResponse.ExpectedResults[0].Metrics { + if len(res) > i || !reflect.DeepEqual(res[i], t.ExpectedResponse.ExpectedResults[0].Metrics[i]) { + err = fmt.Errorf("metrics[%d] are not equal, got=`%+v`, expected=`%+v`", i, res[i], t.ExpectedResponse.ExpectedResults[0].Metrics[i]) + failures = append(failures, err) + } else { + err = fmt.Errorf("metrics[%d] got unexpected=`%+v`", i, t.ExpectedResponse.ExpectedResults[0].Metrics[i]) + } + failures = append(failures, err) + } + } + return failures + } + + for i := range res { + err := isRenderEqual(res[i], t.ExpectedResponse.ExpectedResults[0].Metrics[i]) + if err != nil { + err = merry2.Prependf(err, "metrics are not equal, got=`%+v`, expected=`%+v`", res[i], t.ExpectedResponse.ExpectedResults[0].Metrics[i]) + failures = append(failures, err) + } } } default: - failures = append(failures, merry2.Errorf("unsupported content-type: got '%v'", contentType)) + if resp.StatusCode == http.StatusOK { + failures = append(failures, merry2.Errorf("unsupported content-type: got '%v'", contentType)) + } } return failures } -func e2eTest(logger *zap.Logger, noapp, breakOnError bool) bool { +func e2eTest(logger *zap.Logger, noapp, breakOnError, verbose bool) bool { failed := false logger.Info("will run test", zap.Any("config", cfg.Test), @@ -307,12 +371,12 @@ func e2eTest(logger *zap.Logger, noapp, breakOnError bool) bool { } for _, t := range cfg.Test.Queries { - failures := doTest(logger, &t) - + failures := doTest(logger, &t, verbose) if len(failures) != 0 { failed = true logger.Error("test failed", zap.Errors("failures", failures), + zap.String("url", t.URL), zap.String("type", t.Type), zap.String("body", t.Body), ) for _, v := range runningApps { if !v.IsRunning() { diff --git a/cmd/mockbackend/find.go b/cmd/mockbackend/find.go index 2c0c14592..74ba2729a 100644 --- a/cmd/mockbackend/find.go +++ b/cmd/mockbackend/find.go @@ -46,7 +46,7 @@ func (cfg *listener) findHandler(wr http.ResponseWriter, req *http.Request) { return } - query := req.Form["query"] + var query []string if format == protoV3Format { body, err := io.ReadAll(req.Body) @@ -56,12 +56,22 @@ func (cfg *listener) findHandler(wr http.ResponseWriter, req *http.Request) { ) http.Error(wr, "Bad request (unsupported format)", http.StatusBadRequest) + return } var pv3Request carbonapi_v3_pb.MultiGlobRequest _ = pv3Request.Unmarshal(body) query = pv3Request.Metrics + } else { + query = req.Form["query"] + } + + if len(query) == 0 { + logger.Error("Bad request (no query)") + http.Error(wr, "Bad request (no query)", + http.StatusBadRequest) + return } logger.Info("request details", @@ -72,23 +82,7 @@ func (cfg *listener) findHandler(wr http.ResponseWriter, req *http.Request) { Metrics: []carbonapi_v3_pb.GlobResponse{}, } - if query[0] != "*" { - for m := range cfg.Listener.Expressions { - globMatches := []carbonapi_v3_pb.GlobMatch{} - - for _, metric := range cfg.Expressions[m].Data { - globMatches = append(globMatches, carbonapi_v3_pb.GlobMatch{ - Path: metric.MetricName, - IsLeaf: true, - }) - } - multiGlobs.Metrics = append(multiGlobs.Metrics, - carbonapi_v3_pb.GlobResponse{ - Name: cfg.Expressions[m].PathExpression, - Matches: globMatches, - }) - } - } else { + if query[0] == "*" { returnMap := make(map[string]struct{}) for m := range cfg.Listener.Expressions { response := cfg.Expressions[m] @@ -115,6 +109,33 @@ func (cfg *listener) findHandler(wr http.ResponseWriter, req *http.Request) { Name: "*", Matches: globMatches, }) + } else { + for _, m := range query { + globMatches := []carbonapi_v3_pb.GlobMatch{} + if response, ok := cfg.Expressions[m]; ok { + if response.ReplyDelayMS > 0 { + delay := time.Duration(response.ReplyDelayMS) * time.Millisecond + time.Sleep(delay) + } + if response.Code != 0 && response.Code != http.StatusOK { + // return first error + http.Error(wr, http.StatusText(response.Code), response.Code) + return + } + + for _, metric := range cfg.Expressions[m].Data { + globMatches = append(globMatches, carbonapi_v3_pb.GlobMatch{ + Path: metric.MetricName, + IsLeaf: true, + }) + } + multiGlobs.Metrics = append(multiGlobs.Metrics, + carbonapi_v3_pb.GlobResponse{ + Name: cfg.Expressions[m].PathExpression, + Matches: globMatches, + }) + } + } } if cfg.Listener.ShuffleResults { diff --git a/cmd/mockbackend/http_common.go b/cmd/mockbackend/http_common.go index 0bb75a912..8e26ebd79 100644 --- a/cmd/mockbackend/http_common.go +++ b/cmd/mockbackend/http_common.go @@ -7,6 +7,7 @@ import ( ) type Response struct { + Code int `yaml:"code"` ReplyDelayMS int `yaml:"replyDelayMS"` PathExpression string `yaml:"pathExpression"` Data []Metric `yaml:"data"` diff --git a/cmd/mockbackend/main.go b/cmd/mockbackend/main.go index 3692083ea..ecfd95cbb 100644 --- a/cmd/mockbackend/main.go +++ b/cmd/mockbackend/main.go @@ -21,7 +21,7 @@ type MainConfig struct { type Listener struct { Address string `yaml:"address"` - Code int `yaml:"httpCode"` + Code int `yaml:"httpCode"` // global responce code ShuffleResults bool `yaml:"shuffleResults"` EmptyBody bool `yaml:"emptyBody"` Expressions map[string]Response `yaml:"expressions"` @@ -36,6 +36,7 @@ type listener struct { func main() { config := flag.String("config", "average.yaml", "yaml where it would be possible to get data") + verbose := flag.Bool("verbose", false, "verbose reporting") testonly := flag.Bool("testonly", false, "run only unit test") noapp := flag.Bool("noapp", false, "do not run application") test := flag.Bool("test", false, "run unit test if present") @@ -120,7 +121,7 @@ func main() { failed := false if cfg.Test != nil && (*test || *testonly) { - failed = e2eTest(logger, *noapp, *breakOnError) + failed = e2eTest(logger, *noapp, *breakOnError, *verbose) } if !*testonly { diff --git a/cmd/mockbackend/render.go b/cmd/mockbackend/render.go index 8d9ff469e..0b2b189a1 100644 --- a/cmd/mockbackend/render.go +++ b/cmd/mockbackend/render.go @@ -91,89 +91,96 @@ func (cfg *listener) renderHandler(wr http.ResponseWriter, req *http.Request) { Metrics: []carbonapi_v3_pb.FetchResponse{}, } - newCfg := Listener{ - Code: cfg.Code, - EmptyBody: cfg.EmptyBody, - Expressions: copyMap(cfg.Expressions), - } - + httpCode := http.StatusOK for _, target := range targets { - response, ok := newCfg.Expressions[target] - if !ok { - wr.WriteHeader(http.StatusNotFound) - _, _ = wr.Write([]byte("Not found")) - return - } - if response.ReplyDelayMS > 0 { - delay := time.Duration(response.ReplyDelayMS) * time.Millisecond - logger.Info("will add extra delay", - zap.Duration("delay", delay), - ) - time.Sleep(delay) - } - for _, m := range response.Data { - step := m.Step - if step == 0 { - step = 1 - } - startTime := m.StartTime - if startTime == 0 { - startTime = step - } - isAbsent := make([]bool, 0, len(m.Values)) - protov2Values := make([]float64, 0, len(m.Values)) - for i := range m.Values { - if math.IsNaN(m.Values[i]) { - isAbsent = append(isAbsent, true) - protov2Values = append(protov2Values, 0.0) - } else { - isAbsent = append(isAbsent, false) - protov2Values = append(protov2Values, m.Values[i]) - } + if response, ok := cfg.Expressions[target]; ok { + if response.ReplyDelayMS > 0 { + delay := time.Duration(response.ReplyDelayMS) * time.Millisecond + logger.Info("will add extra delay", + zap.Duration("delay", delay), + ) + time.Sleep(delay) } - fr2 := carbonapi_v2_pb.FetchResponse{ - Name: m.MetricName, - StartTime: int32(startTime), - StopTime: int32(startTime + step*len(protov2Values)), - StepTime: int32(step), - Values: protov2Values, - IsAbsent: isAbsent, + if response.Code > 0 && response.Code != http.StatusOK { + httpCode = response.Code } + if httpCode == http.StatusOK { + for _, m := range response.Data { + step := m.Step + if step == 0 { + step = 1 + } + startTime := m.StartTime + if startTime == 0 { + startTime = step + } + isAbsent := make([]bool, 0, len(m.Values)) + protov2Values := make([]float64, 0, len(m.Values)) + for i := range m.Values { + if math.IsNaN(m.Values[i]) { + isAbsent = append(isAbsent, true) + protov2Values = append(protov2Values, 0.0) + } else { + isAbsent = append(isAbsent, false) + protov2Values = append(protov2Values, m.Values[i]) + } + } + fr2 := carbonapi_v2_pb.FetchResponse{ + Name: m.MetricName, + StartTime: int32(startTime), + StopTime: int32(startTime + step*len(protov2Values)), + StepTime: int32(step), + Values: protov2Values, + IsAbsent: isAbsent, + } - fr3 := carbonapi_v3_pb.FetchResponse{ - Name: m.MetricName, - PathExpression: target, - ConsolidationFunc: "avg", - StartTime: int64(startTime), - StopTime: int64(startTime + step*len(m.Values)), - StepTime: int64(step), - XFilesFactor: 0, - HighPrecisionTimestamps: false, - Values: m.Values, - RequestStartTime: 1, - RequestStopTime: int64(startTime + step*len(m.Values)), - } + fr3 := carbonapi_v3_pb.FetchResponse{ + Name: m.MetricName, + PathExpression: target, + ConsolidationFunc: "avg", + StartTime: int64(startTime), + StopTime: int64(startTime + step*len(m.Values)), + StepTime: int64(step), + XFilesFactor: 0, + HighPrecisionTimestamps: false, + Values: m.Values, + RequestStartTime: 1, + RequestStopTime: int64(startTime + step*len(m.Values)), + } - multiv2.Metrics = append(multiv2.Metrics, fr2) - multiv3.Metrics = append(multiv3.Metrics, fr3) + multiv2.Metrics = append(multiv2.Metrics, fr2) + multiv3.Metrics = append(multiv3.Metrics, fr3) + } + } } } - if cfg.Listener.ShuffleResults { - rand.Shuffle(len(multiv2.Metrics), func(i, j int) { - multiv2.Metrics[i], multiv2.Metrics[j] = multiv2.Metrics[j], multiv2.Metrics[i] - }) - rand.Shuffle(len(multiv3.Metrics), func(i, j int) { - multiv3.Metrics[i], multiv3.Metrics[j] = multiv3.Metrics[j], multiv3.Metrics[i] - }) - } + if httpCode == http.StatusOK { + if len(multiv2.Metrics) == 0 { + wr.WriteHeader(http.StatusNotFound) + _, _ = wr.Write([]byte("Not found")) + return + } - contentType, d := cfg.marshalResponse(wr, logger, format, multiv3, multiv2) - if d == nil { - return + if cfg.Listener.ShuffleResults { + rand.Shuffle(len(multiv2.Metrics), func(i, j int) { + multiv2.Metrics[i], multiv2.Metrics[j] = multiv2.Metrics[j], multiv2.Metrics[i] + }) + rand.Shuffle(len(multiv3.Metrics), func(i, j int) { + multiv3.Metrics[i], multiv3.Metrics[j] = multiv3.Metrics[j], multiv3.Metrics[i] + }) + } + + contentType, d := cfg.marshalResponse(wr, logger, format, multiv3, multiv2) + if d == nil { + return + } + wr.Header().Set("Content-Type", contentType) + _, _ = wr.Write(d) + } else { + wr.WriteHeader(httpCode) + _, _ = wr.Write([]byte(http.StatusText(httpCode))) } - wr.Header().Set("Content-Type", contentType) - _, _ = wr.Write(d) } func (cfg *listener) marshalResponse(wr http.ResponseWriter, logger *zap.Logger, format responseFormat, multiv3 carbonapi_v3_pb.MultiFetchResponse, multiv2 carbonapi_v2_pb.MultiFetchResponse) (string, []byte) { diff --git a/cmd/mockbackend/testcases/find_error/find_error.yaml b/cmd/mockbackend/testcases/find_error/find_error.yaml new file mode 100644 index 000000000..af092e903 --- /dev/null +++ b/cmd/mockbackend/testcases/find_error/find_error.yaml @@ -0,0 +1,94 @@ +version: "v1" +test: + apps: + - name: "carbonapi" + binary: "./carbonapi" + args: + - "-config" + - "./cmd/mockbackend/testcases/render_error/carbonapi.yaml" + queries: + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/metrics/find?query=a&format=json" + expectedResponse: + httpCode: 200 + contentType: "application/json" + expectedResults: + - metricsFind: + - allowChildren: 0 + expandable: 0 + leaf: 1 + id: "a" + text: "a" + context: {} + + # empty + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/render/?target=b&format=json" + expectedResponse: + httpCode: 200 + contentType: "application/json" + + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/metrics/find?query=a&query=b&format=json" + expectedResponse: + httpCode: 200 + contentType: "application/json" + expectedResults: + - metricsFind: + - allowChildren: 0 + expandable: 0 + leaf: 1 + id: "a" + text: "a" + context: {} + + # timeout + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/metrics/find?query=c&format=json" + expectedResponse: + httpCode: 503 + contentType: "text/plain; charset=utf-8" + + # 503 + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/metrics/find?query=d&format=json" + expectedResponse: + httpCode: 503 + contentType: "text/plain; charset=utf-8" + + # 503, partial success + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/metrics/find?query=a&query=d&format=json" + expectedResponse: + httpCode: 503 + contentType: "text/plain; charset=utf-8" + +listeners: + - address: ":9070" + expressions: + "a": + pathExpression: "a" + data: + - metricName: "a" + values: [0,1,2,2,3] + + # timeout + "c": + pathExpression: "b" + emptyBody: true + code: 404 + replyDelayMS: 7000 + data: + - metricName: "c" + values: [0,1,2,2,3] + + "d": + pathExpression: "d" + emptyBody: true + code: 503 diff --git a/cmd/mockbackend/testcases/render_error/carbonapi.yaml b/cmd/mockbackend/testcases/render_error/carbonapi.yaml new file mode 100644 index 000000000..62841def6 --- /dev/null +++ b/cmd/mockbackend/testcases/render_error/carbonapi.yaml @@ -0,0 +1,57 @@ +listen: "localhost:8081" +expvar: + enabled: true + pprofEnabled: false + listen: "" +concurency: 1000 +notFoundStatusCode: 200 +cache: + type: "mem" + size_mb: 0 + defaultTimeoutSec: 60 +cpus: 0 +tz: "" +maxBatchSize: 0 +graphite: + host: "" + interval: "60s" + prefix: "carbon.api" + pattern: "{prefix}.{fqdn}" +idleConnections: 10 +pidFile: "" +upstreams: + buckets: 10 + timeouts: + find: "2s" + render: "5s" + connect: "200ms" + concurrencyLimitPerServer: 0 + keepAliveInterval: "30s" + maxIdleConnsPerHost: 100 + backendsv2: + backends: + - + groupName: "mock-001" + protocol: "auto" + lbMethod: "all" + maxTries: 1 + maxBatchSize: 0 + keepAliveInterval: "10s" + concurrencyLimit: 0 + forceAttemptHTTP2: true + maxIdleConnsPerHost: 1000 + timeouts: + find: "3s" + render: "5s" + connect: "200ms" + servers: + - "http://127.0.0.1:9070" + graphite09compat: false +expireDelaySec: 10 +logger: + - logger: "" + file: "stderr" + level: "debug" + encoding: "console" + encodingTime: "iso8601" + encodingDuration: "seconds" diff --git a/cmd/mockbackend/testcases/render_error/render_error.yaml b/cmd/mockbackend/testcases/render_error/render_error.yaml new file mode 100644 index 000000000..1280442e4 --- /dev/null +++ b/cmd/mockbackend/testcases/render_error/render_error.yaml @@ -0,0 +1,99 @@ +version: "v1" +test: + apps: + - name: "carbonapi" + binary: "./carbonapi" + args: + - "-config" + - "./cmd/mockbackend/testcases/render_error/carbonapi.yaml" + queries: + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/render/?target=a&format=json" + expectedResponse: + httpCode: 200 + contentType: "application/json" + expectedResults: + - metrics: + - target: "a" + datapoints: [[0,1],[1,2],[2,3],[2,4],[3,5]] + + # empty + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/render/?target=b&format=json" + expectedResponse: + httpCode: 200 + contentType: "application/json" + + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/render/?target=a&target=b&format=json" + expectedResponse: + httpCode: 200 + contentType: "application/json" + expectedResults: + - metrics: + - target: "a" + datapoints: [[0,1],[1,2],[2,3],[2,4],[3,5]] + + # timeout + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/render/?target=c&format=json" + expectedResponse: + httpCode: 503 + contentType: "text/plain; charset=utf-8" + + # 503 + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/render/?target=d&format=json" + expectedResponse: + httpCode: 503 + contentType: "text/plain; charset=utf-8" + + # partial success + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/render/?target=a&target=d&format=json" + expectedResponse: + httpCode: 200 + contentType: "application/json" + expectedResults: + - metrics: + - target: "a" + datapoints: [[0,1],[1,2],[2,3],[2,4],[3,5]] + + # partial success + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/render/?target=divideSeries(a,d)&format=json" + expectedResponse: + httpCode: 200 + contentType: "application/json" + expectedResults: + - metrics: + - target: "divideSeries(a,MISSING)" + datapoints: [[nan,1],[nan,2],[nan,3],[nan,4],[nan,5]] + +listeners: + - address: ":9070" + expressions: + "a": + pathExpression: "a" + data: + - metricName: "a" + values: [0,1,2,2,3] + + # timeout + "c": + pathExpression: "c" + emptyBody: true + code: 404 + replyDelayMS: 7000 + + "d": + pathExpression: "d" + emptyBody: true + code: 503 diff --git a/cmd/mockbackend/testcases/render_error_all/carbonapi.yaml b/cmd/mockbackend/testcases/render_error_all/carbonapi.yaml new file mode 100644 index 000000000..ada9b85ea --- /dev/null +++ b/cmd/mockbackend/testcases/render_error_all/carbonapi.yaml @@ -0,0 +1,58 @@ +listen: "localhost:8081" +expvar: + enabled: true + pprofEnabled: false + listen: "" +concurency: 1000 +notFoundStatusCode: 200 +cache: + type: "mem" + size_mb: 0 + defaultTimeoutSec: 60 +cpus: 0 +tz: "" +maxBatchSize: 0 +graphite: + host: "" + interval: "60s" + prefix: "carbon.api" + pattern: "{prefix}.{fqdn}" +idleConnections: 10 +pidFile: "" +upstreams: + requireSuccessAll: true + buckets: 10 + timeouts: + find: "2s" + render: "60s" + connect: "200ms" + concurrencyLimitPerServer: 0 + keepAliveInterval: "30s" + maxIdleConnsPerHost: 100 + backendsv2: + backends: + - + groupName: "mock-001" + protocol: "auto" + lbMethod: "all" + maxTries: 1 + maxBatchSize: 0 + keepAliveInterval: "10s" + concurrencyLimit: 0 + forceAttemptHTTP2: true + maxIdleConnsPerHost: 1000 + timeouts: + find: "600s" + render: "5s" + connect: "200ms" + servers: + - "http://127.0.0.1:9070" + graphite09compat: false +expireDelaySec: 10 +logger: + - logger: "" + file: "stderr" + level: "debug" + encoding: "console" + encodingTime: "iso8601" + encodingDuration: "seconds" diff --git a/cmd/mockbackend/testcases/render_error_all/render_error_all.yaml b/cmd/mockbackend/testcases/render_error_all/render_error_all.yaml new file mode 100644 index 000000000..0bc255177 --- /dev/null +++ b/cmd/mockbackend/testcases/render_error_all/render_error_all.yaml @@ -0,0 +1,91 @@ +version: "v1" +test: + apps: + - name: "carbonapi" + binary: "./carbonapi" + args: + - "-config" + - "./cmd/mockbackend/testcases/render_error_all/carbonapi.yaml" + queries: + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/render/?target=a&format=json" + expectedResponse: + httpCode: 200 + contentType: "application/json" + expectedResults: + - metrics: + - target: "a" + datapoints: [[0,1],[1,2],[2,3],[2,4],[3,5]] + + # empty + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/render/?target=b&format=json" + expectedResponse: + httpCode: 200 + contentType: "application/json" + + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/render/?target=a&target=b&format=json" + expectedResponse: + httpCode: 200 + contentType: "application/json" + expectedResults: + - metrics: + - target: "a" + datapoints: [[0,1],[1,2],[2,3],[2,4],[3,5]] + + # timeout + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/render/?target=c&format=json" + expectedResponse: + httpCode: 503 + contentType: "text/plain; charset=utf-8" + + # 503 + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/render/?target=d&format=json" + expectedResponse: + httpCode: 503 + contentType: "text/plain; charset=utf-8" + + # partial success + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/render/?target=a&target=d&format=json" + expectedResponse: + httpCode: 503 + contentType: "text/plain; charset=utf-8" + + # partial success, must fail, target d failed + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/render/?target=divideSeries(a,d)&format=json" + expectedResponse: + httpCode: 503 + contentType: "text/plain; charset=utf-8" + +listeners: + - address: ":9070" + expressions: + "a": + pathExpression: "a" + data: + - metricName: "a" + values: [0,1,2,2,3] + + # timeout + "c": + pathExpression: "c" + emptyBody: true + code: 404 + replyDelayMS: 7000 + + "d": + pathExpression: "d" + emptyBody: true + code: 503 diff --git a/cmd/mockbackend/testcases/render_error_all_rr/carbonapi.yaml b/cmd/mockbackend/testcases/render_error_all_rr/carbonapi.yaml new file mode 100644 index 000000000..6c7cf2e6e --- /dev/null +++ b/cmd/mockbackend/testcases/render_error_all_rr/carbonapi.yaml @@ -0,0 +1,59 @@ +listen: "localhost:8081" +expvar: + enabled: true + pprofEnabled: false + listen: "" +concurency: 1000 +notFoundStatusCode: 200 +cache: + type: "mem" + size_mb: 0 + defaultTimeoutSec: 60 +cpus: 0 +tz: "" +maxBatchSize: 0 +graphite: + host: "" + interval: "60s" + prefix: "carbon.api" + pattern: "{prefix}.{fqdn}" +idleConnections: 10 +pidFile: "" +upstreams: + requireSuccessAll: true + buckets: 10 + timeouts: + find: "2s" + render: "10s" + connect: "200ms" + concurrencyLimitPerServer: 0 + keepAliveInterval: "30s" + maxIdleConnsPerHost: 100 + backendsv2: + backends: + - + groupName: "mock-001" + protocol: "auto" + lbMethod: "rr" + maxTries: 2 + maxBatchSize: 0 + keepAliveInterval: "10s" + concurrencyLimit: 0 + forceAttemptHTTP2: true + maxIdleConnsPerHost: 1000 + timeouts: + find: "3s" + render: "5s" + connect: "200ms" + servers: + - "http://127.0.0.1:9070" + - "http://127.0.0.1:9071" + graphite09compat: false +expireDelaySec: 10 +logger: + - logger: "" + file: "stderr" + level: "debug" + encoding: "console" + encodingTime: "iso8601" + encodingDuration: "seconds" diff --git a/cmd/mockbackend/testcases/render_error_all_rr/render_error_all_rr.yaml b/cmd/mockbackend/testcases/render_error_all_rr/render_error_all_rr.yaml new file mode 100644 index 000000000..3e9b4c898 --- /dev/null +++ b/cmd/mockbackend/testcases/render_error_all_rr/render_error_all_rr.yaml @@ -0,0 +1,137 @@ +version: "v1" +test: + apps: + - name: "carbonapi" + binary: "./carbonapi" + args: + - "-config" + - "./cmd/mockbackend/testcases/render_error_all_rr/carbonapi.yaml" + queries: + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/render/?target=a&format=json" + expectedResponse: + httpCode: 200 + contentType: "application/json" + expectedResults: + - metrics: + - target: "a" + datapoints: [[0,1],[1,2],[2,3],[2,4],[3,5]] + + # empty + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/render/?target=b&format=json" + expectedResponse: + httpCode: 200 + contentType: "application/json" + + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/render/?target=a&target=b&format=json" + expectedResponse: + httpCode: 200 + contentType: "application/json" + expectedResults: + - metrics: + - target: "a" + datapoints: [[0,1],[1,2],[2,3],[2,4],[3,5]] + + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/render/?target=c&format=json" + expectedResponse: + httpCode: 200 + contentType: "application/json" + expectedResults: + - metrics: + - target: "c" + datapoints: [[0,1],[1,2],[2,3],[2,4],[4,5]] + + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/render/?target=a&target=b&target=c&format=json" + expectedResponse: + httpCode: 200 + contentType: "application/json" + expectedResults: + - metrics: + - target: "a" + datapoints: [[0,1],[1,2],[2,3],[2,4],[3,5]] + - target: "c" + datapoints: [[0,1],[1,2],[2,3],[2,4],[4,5]] + + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/render/?target=divideSeries(a,c)&format=json" + expectedResponse: + httpCode: 200 + contentType: "application/json" + expectedResults: + - metrics: + - target: "divideSeries(a,c)" + tags: {"name": "a"} + datapoints: [[NaN,1],[1,2],[1,3],[1,4],[0.75,5]] + + # 503 + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/render/?target=d&format=json" + expectedResponse: + httpCode: 503 + contentType: "text/plain; charset=utf-8" + + # partial success + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/render/?target=a&target=d&format=json" + expectedResponse: + httpCode: 503 + contentType: "text/plain; charset=utf-8" + + # partial success + # TODO: must fail, target d failed + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/render/?target=divideSeries(a,d)&format=json" + expectedResponse: + httpCode: 503 + contentType: "text/plain; charset=utf-8" + +listeners: + - address: ":9070" + expressions: + "a": + pathExpression: "a" + emptyBody: true + code: 503 + + # 503 + "c": + pathExpression: "c" + mptyBody: true + code: 503 + + "d": + pathExpression: "d" + emptyBody: true + code: 503 + + - address: ":9071" + expressions: + "a": + pathExpression: "a" + data: + - metricName: "a" + values: [0,1,2,2,3] + + "c": + pathExpression: "c" + data: + - metricName: "c" + values: [0,1,2,2,4] + + "d": + pathExpression: "d" + emptyBody: true + code: 503 \ No newline at end of file diff --git a/zipper/broadcast/broadcast_group.go b/zipper/broadcast/broadcast_group.go index 023b5949e..b475a8e24 100644 --- a/zipper/broadcast/broadcast_group.go +++ b/zipper/broadcast/broadcast_group.go @@ -30,6 +30,7 @@ type BroadcastGroup struct { doMultipleRequestsIfSplit bool tldCacheDisabled bool concurrencyLimit int + requireSuccessAll bool fetcher types.Fetcher pathCache pathcache.PathCache @@ -112,6 +113,12 @@ func WithDialer(dialer *net.Dialer) Option { } } +func WithSuccess(requireSuccessAll bool) Option { + return func(bg *BroadcastGroup) { + bg.requireSuccessAll = requireSuccessAll + } +} + func New(opts ...Option) (*BroadcastGroup, merry.Error) { bg := &BroadcastGroup{ limiter: limiter.NoopLimiter{}, @@ -152,7 +159,7 @@ func (bg *BroadcastGroup) SetDoMultipleRequestIfSplit(v bool) { } } -func NewBroadcastGroup(logger *zap.Logger, groupName string, doMultipleRequestsIfSplit bool, servers []types.BackendServer, expireDelaySec int32, concurrencyLimit, maxBatchSize int, timeouts types.Timeouts, tldCacheDisabled bool) (*BroadcastGroup, merry.Error) { +func NewBroadcastGroup(logger *zap.Logger, groupName string, doMultipleRequestsIfSplit bool, servers []types.BackendServer, expireDelaySec int32, concurrencyLimit, maxBatchSize int, timeouts types.Timeouts, tldCacheDisabled bool, requireSuccessAll bool) (*BroadcastGroup, merry.Error) { return New( WithLogger(logger), WithGroupName(groupName), @@ -163,6 +170,7 @@ func NewBroadcastGroup(logger *zap.Logger, groupName string, doMultipleRequestsI WithMaxMetricsPerRequest(maxBatchSize), WithTimeouts(timeouts), WithTLDCache(!tldCacheDisabled), + WithSuccess(requireSuccessAll), ) } @@ -299,12 +307,14 @@ func (bg *BroadcastGroup) doSingleFetch(ctx context.Context, logger *zap.Logger, // TODO(Civil): migrate limiter to merry requests, splitErr := bg.splitRequest(ctx, request, backend) - if len(requests) == 0 && splitErr != nil { - response := types.NewServerFetchResponse() - response.Server = backend.Name() - response.AddError(splitErr) - resCh <- response - return + if len(requests) == 0 { + if splitErr != nil { + response := types.NewServerFetchResponse() + response.Server = backend.Name() + response.AddError(splitErr) + resCh <- response + return + } } logger = logger.With(zap.String("backend_name", backend.Name())) @@ -502,7 +512,7 @@ func (bg *BroadcastGroup) Fetch(ctx context.Context, request *protov3.MultiFetch ) } - if len(result.Response.Metrics) == 0 { + if len(result.Response.Metrics) == 0 || (bg.requireSuccessAll && len(result.Err) > 0) { code, errors := helper.MergeHttpErrors(result.Err) if len(errors) > 0 { err := types.ErrFailedToFetch.WithHTTPCode(code).WithMessage(strings.Join(errors, "\n")) @@ -529,10 +539,22 @@ func (bg *BroadcastGroup) Fetch(ctx context.Context, request *protov3.MultiFetch ) var err merry.Error - if result.Err != nil && len(result.Err) > 0 { - err = types.ErrNonFatalErrors - for _, e := range result.Err { - err = err.WithCause(e) + if len(result.Err) > 0 { + if bg.requireSuccessAll { + code, errors := helper.MergeHttpErrors(result.Err) + if len(errors) > 0 { + err := types.ErrFailedToFetch.WithHTTPCode(code).WithMessage(strings.Join(errors, "\n")) + logger.Debug("errors while fetching data from backends", + zap.Int("httpCode", code), + zap.Strings("errors", errors), + ) + return nil, result.Stats, err + } + } else { + err = types.ErrNonFatalErrors + for _, e := range result.Err { + err = err.WithCause(e) + } } } @@ -605,25 +627,19 @@ func (bg *BroadcastGroup) Find(ctx context.Context, request *protov3.MultiGlobRe ) } - if len(result.Response.Metrics) == 0 { - nonNotFoundErrors := types.ReturnNonNotFoundError(result.Err) - if nonNotFoundErrors != nil { - err := types.ErrFailedToFetch.WithHTTPCode(500) - for _, e := range nonNotFoundErrors { - err = err.WithCause(e) - } - logger.Debug("non-404 errors while fetching data from backends", - zap.Any("errors", result.Err), + var err merry.Error + if len(result.Response.Metrics) == 0 || (bg.requireSuccessAll && len(result.Err) > 0) { + code, errors := helper.MergeHttpErrors(result.Err) + if len(errors) > 0 { + err = types.ErrFailedToFetch.WithHTTPCode(code).WithMessage(strings.Join(errors, "\n")) + logger.Debug("errors while fetching data from backends", + zap.Int("httpCode", code), + zap.Strings("errors", errors), ) - return &protov3.MultiGlobResponse{}, result.Stats, err + return nil, result.Stats, err } - - return &protov3.MultiGlobResponse{}, result.Stats, types.ErrNotFound.WithHTTPCode(404) - } - result.Stats.TotalMetricsCount = 0 - for _, x := range result.Response.Metrics { - result.Stats.TotalMetricsCount += uint64(len(x.Matches)) } + logger.Debug("got some find responses", zap.Int("backends_count", len(backends)), zap.Int("response_count", responseCount), @@ -632,7 +648,14 @@ func (bg *BroadcastGroup) Find(ctx context.Context, request *protov3.MultiGlobRe zap.Any("response", result.Response), ) - var err merry.Error + if len(result.Response.Metrics) == 0 { + return &protov3.MultiGlobResponse{}, result.Stats, types.ErrNotFound.WithHTTPCode(404) + } + result.Stats.TotalMetricsCount = 0 + for _, x := range result.Response.Metrics { + result.Stats.TotalMetricsCount += uint64(len(x.Matches)) + } + if result.Err != nil { err = types.ErrNonFatalErrors for _, e := range result.Err { @@ -705,7 +728,11 @@ func (bg *BroadcastGroup) Info(ctx context.Context, request *protov3.MultiMetric var err merry.Error if result.Err != nil { - err = types.ErrNonFatalErrors + if bg.requireSuccessAll { + err = types.ErrFailedToFetch + } else { + err = types.ErrNonFatalErrors + } for _, e := range result.Err { err = err.WithCause(e) } diff --git a/zipper/broadcast/broadcast_group_test.go b/zipper/broadcast/broadcast_group_test.go index 6125f5d88..0df33f9c4 100644 --- a/zipper/broadcast/broadcast_group_test.go +++ b/zipper/broadcast/broadcast_group_test.go @@ -68,7 +68,7 @@ func TestNewBroadcastGroup(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - b, err := NewBroadcastGroup(logger, tt.name, true, tt.servers, 60, 500, 100, timeouts, false) + b, err := NewBroadcastGroup(logger, tt.name, true, tt.servers, 60, 500, 100, timeouts, false, false) if !errorsAreEqual(err, tt.expectedErr) { t.Fatalf("unexpected error %v, expected %v", err, tt.expectedErr) } @@ -107,18 +107,7 @@ func TestProbeTLDs(t *testing.T) { } for _, tt := range tests { - b, err := New( - WithLogger(logger), - WithGroupName(tt.name), - WithSplitMultipleRequests(true), - WithBackends(tt.servers), - WithPathCache(60), - WithLimiter(500), - WithMaxMetricsPerRequest(100), - WithTimeouts(timeouts), - WithTLDCache(true), - ) - + b, err := NewBroadcastGroup(logger, tt.name, true, tt.servers, 60, 500, 100, timeouts, false, false) if err != nil { t.Fatalf("unexpected error %v", err) } diff --git a/zipper/config/config.go b/zipper/config/config.go index f096d22a2..3abf5ad42 100644 --- a/zipper/config/config.go +++ b/zipper/config/config.go @@ -25,6 +25,7 @@ type Config struct { FallbackMaxBatchSize int `mapstructure:"-"` MaxTries int `mapstructure:"maxTries"` DoMultipleRequestsIfSplit bool `mapstructure:"doMultipleRequestsIfSplit"` + RequireSuccessAll bool `mapstructure:"requireSuccessAll"` // require full success for upstreams queries (for multi-target query) ExpireDelaySec int32 TLDCacheDisabled bool `mapstructure:"tldCacheDisabled"` diff --git a/zipper/helper/requests.go b/zipper/helper/requests.go index 6b047a266..b31e99a13 100644 --- a/zipper/helper/requests.go +++ b/zipper/helper/requests.go @@ -98,6 +98,32 @@ func requestError(err error, server string) merry.Error { return types.ErrResponceError.WithValue("server", server) } +func HttpErrorCode(err merry.Error) (code int) { + if err == nil { + code = http.StatusOK + } else { + c := merry.RootCause(err) + if c == nil { + c = err + } + + code = merry.HTTPCode(err) + if code == http.StatusNotFound { + return + } else if code == http.StatusInternalServerError && merry.Is(c, parser.ErrInvalidArg) { + // check for invalid args, see applyByNode rewrite function + code = http.StatusBadRequest + } + + if code == http.StatusGatewayTimeout || code == http.StatusBadGateway || merry.Is(c, types.ErrFailedToFetch) { + // simplify code, one error type for communications errors, all we can retry + code = http.StatusServiceUnavailable + } + } + + return +} + func MergeHttpErrors(errors []merry.Error) (int, []string) { returnCode := http.StatusNotFound errMsgs := make([]string, 0) diff --git a/zipper/metadata/metadata.go b/zipper/metadata/metadata.go index 8057da53e..ac1f846cf 100644 --- a/zipper/metadata/metadata.go +++ b/zipper/metadata/metadata.go @@ -13,12 +13,12 @@ import ( type md struct { sync.RWMutex SupportedProtocols map[string]struct{} - ProtocolInits map[string]func(*zap.Logger, types.BackendV2, bool) (types.BackendServer, merry.Error) - ProtocolInitsWithLimiter map[string]func(*zap.Logger, types.BackendV2, bool, limiter.ServerLimiter) (types.BackendServer, merry.Error) + ProtocolInits map[string]func(*zap.Logger, types.BackendV2, bool, bool) (types.BackendServer, merry.Error) + ProtocolInitsWithLimiter map[string]func(*zap.Logger, types.BackendV2, bool, bool, limiter.ServerLimiter) (types.BackendServer, merry.Error) } var Metadata = md{ SupportedProtocols: make(map[string]struct{}), - ProtocolInits: make(map[string]func(*zap.Logger, types.BackendV2, bool) (types.BackendServer, merry.Error)), - ProtocolInitsWithLimiter: make(map[string]func(*zap.Logger, types.BackendV2, bool, limiter.ServerLimiter) (types.BackendServer, merry.Error)), + ProtocolInits: make(map[string]func(*zap.Logger, types.BackendV2, bool, bool) (types.BackendServer, merry.Error)), + ProtocolInitsWithLimiter: make(map[string]func(*zap.Logger, types.BackendV2, bool, bool, limiter.ServerLimiter) (types.BackendServer, merry.Error)), } diff --git a/zipper/protocols/auto/auto_group.go b/zipper/protocols/auto/auto_group.go index 686a0b53e..e10e62e11 100644 --- a/zipper/protocols/auto/auto_group.go +++ b/zipper/protocols/auto/auto_group.go @@ -35,7 +35,7 @@ type capabilityResponse struct { protocol string } -//_internal/capabilities/ +// _internal/capabilities/ func doQuery(ctx context.Context, logger *zap.Logger, groupName string, httpClient *http.Client, limiter limiter.ServerLimiter, server string, request types.Request, resChan chan<- capabilityResponse) { httpQuery := helper.NewHttpQuery(groupName, []string{server}, 1, limiter, httpClient, httpHeaders.ContentTypeCarbonAPIv3PB) rewrite, _ := url.Parse("http://127.0.0.1/_internal/capabilities/") @@ -136,11 +136,11 @@ type AutoGroup struct { groupName string } -func NewWithLimiter(logger *zap.Logger, config types.BackendV2, tldCacheDisabled bool, limiter limiter.ServerLimiter) (types.BackendServer, merry.Error) { +func NewWithLimiter(logger *zap.Logger, config types.BackendV2, tldCacheDisabled, requireSuccessAll bool, limiter limiter.ServerLimiter) (types.BackendServer, merry.Error) { return nil, merry.New("auto group doesn't support anything useful except for New") } -func New(logger *zap.Logger, config types.BackendV2, tldCacheDisabled bool) (types.BackendServer, merry.Error) { +func New(logger *zap.Logger, config types.BackendV2, tldCacheDisabled, requireSuccessAll bool) (types.BackendServer, merry.Error) { logger = logger.With(zap.String("type", "autoGroup"), zap.String("name", config.GroupName)) if config.ConcurrencyLimit == nil { @@ -176,7 +176,7 @@ func New(logger *zap.Logger, config types.BackendV2, tldCacheDisabled bool) (typ cfg := config cfg.GroupName = config.GroupName + "_" + proto cfg.Servers = servers - c, ePtr := backendInit(logger, cfg, tldCacheDisabled) + c, ePtr := backendInit(logger, cfg, tldCacheDisabled, requireSuccessAll) if ePtr != nil { return nil, ePtr } @@ -184,17 +184,8 @@ func New(logger *zap.Logger, config types.BackendV2, tldCacheDisabled bool) (typ backends = append(backends, c) } - return broadcast.New( - broadcast.WithLogger(logger), - broadcast.WithGroupName(config.GroupName+"_broadcast"), - broadcast.WithSplitMultipleRequests(config.DoMultipleRequestsIfSplit), - broadcast.WithBackends(backends), - broadcast.WithPathCache(600), - broadcast.WithLimiter(*config.ConcurrencyLimit), - broadcast.WithMaxMetricsPerRequest(*config.MaxBatchSize), - broadcast.WithTimeouts(*config.Timeouts), - broadcast.WithTLDCache(!tldCacheDisabled), - ) + return broadcast.NewBroadcastGroup(logger, config.GroupName+"_broadcast", config.DoMultipleRequestsIfSplit, backends, + 600, *config.ConcurrencyLimit, *config.MaxBatchSize, *config.Timeouts, tldCacheDisabled, requireSuccessAll) } func (c AutoGroup) MaxMetricsPerRequest() int { diff --git a/zipper/protocols/graphite/graphite_group.go b/zipper/protocols/graphite/graphite_group.go index 26854dc57..fc6d1a46a 100644 --- a/zipper/protocols/graphite/graphite_group.go +++ b/zipper/protocols/graphite/graphite_group.go @@ -55,7 +55,7 @@ func (g *GraphiteGroup) Children() []types.BackendServer { return []types.BackendServer{g} } -func NewWithLimiter(logger *zap.Logger, config types.BackendV2, tldCacheDisabled bool, limiter limiter.ServerLimiter) (types.BackendServer, merry.Error) { +func NewWithLimiter(logger *zap.Logger, config types.BackendV2, tldCacheDisabled, requireSuccessAll bool, limiter limiter.ServerLimiter) (types.BackendServer, merry.Error) { logger = logger.With(zap.String("type", "graphite"), zap.String("protocol", config.Protocol), zap.String("name", config.GroupName)) httpClient := helper.GetHTTPClient(logger, config) @@ -79,7 +79,7 @@ func NewWithLimiter(logger *zap.Logger, config types.BackendV2, tldCacheDisabled return c, nil } -func New(logger *zap.Logger, config types.BackendV2, tldCacheDisabled bool) (types.BackendServer, merry.Error) { +func New(logger *zap.Logger, config types.BackendV2, tldCacheDisabled, requireSuccessAll bool) (types.BackendServer, merry.Error) { if config.ConcurrencyLimit == nil { return nil, types.ErrConcurrencyLimitNotSet } @@ -88,7 +88,7 @@ func New(logger *zap.Logger, config types.BackendV2, tldCacheDisabled bool) (typ } limiter := limiter.NewServerLimiter([]string{config.GroupName}, *config.ConcurrencyLimit) - return NewWithLimiter(logger, config, tldCacheDisabled, limiter) + return NewWithLimiter(logger, config, tldCacheDisabled, requireSuccessAll, limiter) } func (c GraphiteGroup) MaxMetricsPerRequest() int { diff --git a/zipper/protocols/irondb/irondb_group.go b/zipper/protocols/irondb/irondb_group.go index 8dc2d4031..416809783 100644 --- a/zipper/protocols/irondb/irondb_group.go +++ b/zipper/protocols/irondb/irondb_group.go @@ -50,7 +50,7 @@ type IronDBGroup struct { graphitePrefix string } -func NewWithLimiter(logger *zap.Logger, config types.BackendV2, tldCacheDisabled bool, limiter limiter.ServerLimiter) (types.BackendServer, merry.Error) { +func NewWithLimiter(logger *zap.Logger, config types.BackendV2, tldCacheDisabled, requireSuccessAll bool, limiter limiter.ServerLimiter) (types.BackendServer, merry.Error) { logger = logger.With(zap.String("type", "irondb"), zap.String("protocol", config.Protocol), zap.String("name", config.GroupName)) logger.Warn("support for this backend protocol is experimental, use with caution") @@ -215,7 +215,7 @@ func NewWithLimiter(logger *zap.Logger, config types.BackendV2, tldCacheDisabled return c, nil } -func New(logger *zap.Logger, config types.BackendV2, tldCacheDisabled bool) (types.BackendServer, merry.Error) { +func New(logger *zap.Logger, config types.BackendV2, tldCacheDisabled, requireSuccessAll bool) (types.BackendServer, merry.Error) { if config.ConcurrencyLimit == nil { return nil, types.ErrConcurrencyLimitNotSet } @@ -224,7 +224,7 @@ func New(logger *zap.Logger, config types.BackendV2, tldCacheDisabled bool) (typ } l := limiter.NewServerLimiter([]string{config.GroupName}, *config.ConcurrencyLimit) - return NewWithLimiter(logger, config, tldCacheDisabled, l) + return NewWithLimiter(logger, config, tldCacheDisabled, requireSuccessAll, l) } func (c *IronDBGroup) Children() []types.BackendServer { diff --git a/zipper/protocols/prometheus/prometheus_group.go b/zipper/protocols/prometheus/prometheus_group.go index 130776b48..261d537da 100644 --- a/zipper/protocols/prometheus/prometheus_group.go +++ b/zipper/protocols/prometheus/prometheus_group.go @@ -77,7 +77,7 @@ type PrometheusGroup struct { httpQuery *helper.HttpQuery } -func NewWithLimiter(logger *zap.Logger, config types.BackendV2, tldCacheDisabled bool, limiter limiter.ServerLimiter) (types.BackendServer, merry.Error) { +func NewWithLimiter(logger *zap.Logger, config types.BackendV2, tldCacheDisabled, requireSuccessAll bool, limiter limiter.ServerLimiter) (types.BackendServer, merry.Error) { logger = logger.With(zap.String("type", "prometheus"), zap.String("protocol", config.Protocol), zap.String("name", config.GroupName)) logger.Warn("support for this backend protocol is experimental, use with caution") @@ -172,10 +172,10 @@ func NewWithLimiter(logger *zap.Logger, config types.BackendV2, tldCacheDisabled httpQuery := helper.NewHttpQuery(config.GroupName, config.Servers, *config.MaxTries, limiter, httpClient, httpHeaders.ContentTypeCarbonAPIv2PB) - return NewWithEverythingInitialized(logger, config, tldCacheDisabled, limiter, step, maxPointsPerQuery, forceMinStepInterval, delay, httpQuery, httpClient) + return NewWithEverythingInitialized(logger, config, tldCacheDisabled, requireSuccessAll, limiter, step, maxPointsPerQuery, forceMinStepInterval, delay, httpQuery, httpClient) } -func NewWithEverythingInitialized(logger *zap.Logger, config types.BackendV2, tldCacheDisabled bool, limiter limiter.ServerLimiter, step, maxPointsPerQuery int64, forceMinStepInterval time.Duration, delay StartDelay, httpQuery *helper.HttpQuery, httpClient *http.Client) (types.BackendServer, merry.Error) { +func NewWithEverythingInitialized(logger *zap.Logger, config types.BackendV2, tldCacheDisabled, requireSuccessAll bool, limiter limiter.ServerLimiter, step, maxPointsPerQuery int64, forceMinStepInterval time.Duration, delay StartDelay, httpQuery *helper.HttpQuery, httpClient *http.Client) (types.BackendServer, merry.Error) { c := &PrometheusGroup{ groupName: config.GroupName, servers: config.Servers, @@ -197,7 +197,7 @@ func NewWithEverythingInitialized(logger *zap.Logger, config types.BackendV2, tl return c, nil } -func New(logger *zap.Logger, config types.BackendV2, tldCacheDisabled bool) (types.BackendServer, merry.Error) { +func New(logger *zap.Logger, config types.BackendV2, tldCacheDisabled, requireSuccessAll bool) (types.BackendServer, merry.Error) { if config.ConcurrencyLimit == nil { return nil, types.ErrConcurrencyLimitNotSet } @@ -206,7 +206,7 @@ func New(logger *zap.Logger, config types.BackendV2, tldCacheDisabled bool) (typ } l := limiter.NewServerLimiter([]string{config.GroupName}, *config.ConcurrencyLimit) - return NewWithLimiter(logger, config, tldCacheDisabled, l) + return NewWithLimiter(logger, config, tldCacheDisabled, requireSuccessAll, l) } func (c *PrometheusGroup) Children() []types.BackendServer { diff --git a/zipper/protocols/v2/protobuf_group.go b/zipper/protocols/v2/protobuf_group.go index 083307f8b..065fd1a40 100644 --- a/zipper/protocols/v2/protobuf_group.go +++ b/zipper/protocols/v2/protobuf_group.go @@ -58,7 +58,7 @@ func (c *ClientProtoV2Group) Children() []types.BackendServer { return []types.BackendServer{c} } -func NewWithLimiter(logger *zap.Logger, config types.BackendV2, tldCacheDisabled bool, l limiter.ServerLimiter) (types.BackendServer, merry.Error) { +func NewWithLimiter(logger *zap.Logger, config types.BackendV2, tldCacheDisabled, requireSuccessAll bool, l limiter.ServerLimiter) (types.BackendServer, merry.Error) { logger = logger.With(zap.String("type", "protoV2Group"), zap.String("name", config.GroupName)) httpClient := helper.GetHTTPClient(logger, config) @@ -82,7 +82,7 @@ func NewWithLimiter(logger *zap.Logger, config types.BackendV2, tldCacheDisabled return c, nil } -func New(logger *zap.Logger, config types.BackendV2, tldCacheDisabled bool) (types.BackendServer, merry.Error) { +func New(logger *zap.Logger, config types.BackendV2, tldCacheDisabled, requireSuccessAll bool) (types.BackendServer, merry.Error) { if config.ConcurrencyLimit == nil { return nil, types.ErrConcurrencyLimitNotSet } @@ -91,7 +91,7 @@ func New(logger *zap.Logger, config types.BackendV2, tldCacheDisabled bool) (typ } limiter := limiter.NewServerLimiter(config.Servers, *config.ConcurrencyLimit) - return NewWithLimiter(logger, config, tldCacheDisabled, limiter) + return NewWithLimiter(logger, config, tldCacheDisabled, requireSuccessAll, limiter) } func (c ClientProtoV2Group) MaxMetricsPerRequest() int { diff --git a/zipper/protocols/v3/protobuf_group.go b/zipper/protocols/v3/protobuf_group.go index 033e35c4a..35a08677f 100644 --- a/zipper/protocols/v3/protobuf_group.go +++ b/zipper/protocols/v3/protobuf_group.go @@ -52,7 +52,7 @@ func (c *ClientProtoV3Group) Children() []types.BackendServer { return []types.BackendServer{c} } -func New(logger *zap.Logger, config types.BackendV2, tldCacheDisabled bool) (types.BackendServer, merry.Error) { +func New(logger *zap.Logger, config types.BackendV2, tldCacheDisabled, requireSuccessAll bool) (types.BackendServer, merry.Error) { if config.ConcurrencyLimit == nil { return nil, types.ErrConcurrencyLimitNotSet } @@ -61,10 +61,10 @@ func New(logger *zap.Logger, config types.BackendV2, tldCacheDisabled bool) (typ } l := limiter.NewServerLimiter(config.Servers, *config.ConcurrencyLimit) - return NewWithLimiter(logger, config, tldCacheDisabled, l) + return NewWithLimiter(logger, config, tldCacheDisabled, requireSuccessAll, l) } -func NewWithLimiter(logger *zap.Logger, config types.BackendV2, tldCacheDisabled bool, l limiter.ServerLimiter) (types.BackendServer, merry.Error) { +func NewWithLimiter(logger *zap.Logger, config types.BackendV2, tldCacheDisabled, requireSuccessAll bool, l limiter.ServerLimiter) (types.BackendServer, merry.Error) { logger = logger.With(zap.String("type", "protoV3Group"), zap.String("name", config.GroupName)) httpClient := helper.GetHTTPClient(logger, config) diff --git a/zipper/protocols/victoriametrics/victoriametrics_group.go b/zipper/protocols/victoriametrics/victoriametrics_group.go index 00031ebda..855a5a809 100644 --- a/zipper/protocols/victoriametrics/victoriametrics_group.go +++ b/zipper/protocols/victoriametrics/victoriametrics_group.go @@ -64,7 +64,7 @@ type VictoriaMetricsGroup struct { featureSet atomic.Value // *vmSupportedFeatures } -func NewWithLimiter(logger *zap.Logger, config types.BackendV2, tldCacheDisabled bool, limiter limiter.ServerLimiter) (types.BackendServer, merry.Error) { +func NewWithLimiter(logger *zap.Logger, config types.BackendV2, tldCacheDisabled, requireSuccessAll bool, limiter limiter.ServerLimiter) (types.BackendServer, merry.Error) { logger = logger.With(zap.String("type", "victoriametrics"), zap.String("protocol", config.Protocol), zap.String("name", config.GroupName)) httpClient := helper.GetHTTPClient(logger, config) @@ -225,7 +225,7 @@ func NewWithLimiter(logger *zap.Logger, config types.BackendV2, tldCacheDisabled } promLogger := logger.With(zap.String("subclass", "prometheus")) - c.BackendServer, _ = prometheus.NewWithEverythingInitialized(promLogger, config, tldCacheDisabled, limiter, step, maxPointsPerQuery, forceMinStepInterval, delay, httpQuery, httpClient) + c.BackendServer, _ = prometheus.NewWithEverythingInitialized(promLogger, config, tldCacheDisabled, requireSuccessAll, limiter, step, maxPointsPerQuery, forceMinStepInterval, delay, httpQuery, httpClient) c.updateFeatureSet(context.Background()) @@ -240,7 +240,7 @@ func NewWithLimiter(logger *zap.Logger, config types.BackendV2, tldCacheDisabled return c, nil } -func New(logger *zap.Logger, config types.BackendV2, tldCacheDisabled bool) (types.BackendServer, merry.Error) { +func New(logger *zap.Logger, config types.BackendV2, tldCacheDisabled, requireSuccessAll bool) (types.BackendServer, merry.Error) { if config.ConcurrencyLimit == nil { return nil, types.ErrConcurrencyLimitNotSet } @@ -249,5 +249,5 @@ func New(logger *zap.Logger, config types.BackendV2, tldCacheDisabled bool) (typ } l := limiter.NewServerLimiter([]string{config.GroupName}, *config.ConcurrencyLimit) - return NewWithLimiter(logger, config, tldCacheDisabled, l) + return NewWithLimiter(logger, config, tldCacheDisabled, requireSuccessAll, l) } diff --git a/zipper/types/response.go b/zipper/types/response.go index 3a3d6e3d3..9227fdce4 100644 --- a/zipper/types/response.go +++ b/zipper/types/response.go @@ -12,7 +12,7 @@ import ( ) // type Fetcher func(ctx context.Context, logger *zap.Logger, client types.BackendServer, reqs interface{}, resCh chan<- types.ServerFetchResponse) { -//type Fetcher func(ctx context.Context, logger *zap.Logger, client BackendServer, reqs interface{}, resCh chan ServerFetchResponse) { +// type Fetcher func(ctx context.Context, logger *zap.Logger, client BackendServer, reqs interface{}, resCh chan ServerFetchResponse) { type Fetcher func(ctx context.Context, logger *zap.Logger, client BackendServer, reqs interface{}, resCh chan ServerFetcherResponse) type ServerFetcherResponse interface { @@ -52,9 +52,11 @@ GATHER: select { case res := <-resCh: answeredServers[res.GetServer()] = struct{}{} - _ = result.MergeI(res) - responseCount++ - + if err := result.MergeI(res); err == nil { + responseCount++ + } else { + result.AddError(err) + } case <-ctx.Done(): err := ErrTimeoutExceeded.WithValue("timedout_backends", NoAnswerBackends(clients, answeredServers)) result.AddError(err) diff --git a/zipper/zipper.go b/zipper/zipper.go index 70d769b7a..b1c3a4187 100644 --- a/zipper/zipper.go +++ b/zipper/zipper.go @@ -46,7 +46,7 @@ type Zipper struct { logger *zap.Logger } -func createBackendsV2(logger *zap.Logger, backends types.BackendsV2, expireDelaySec int32, tldCacheDisabled bool) ([]types.BackendServer, merry.Error) { +func createBackendsV2(logger *zap.Logger, backends types.BackendsV2, expireDelaySec int32, tldCacheDisabled, requireSuccessAll bool) ([]types.BackendServer, merry.Error) { backendServers := make([]types.BackendServer, 0) var e merry.Error timeouts := backends.Timeouts @@ -110,7 +110,7 @@ func createBackendsV2(logger *zap.Logger, backends types.BackendsV2, expireDelay ) } if lbMethod == types.RoundRobinLB { - backendServer, e = backendInit(logger, backend, tldCacheDisabled) + backendServer, e = backendInit(logger, backend, tldCacheDisabled, requireSuccessAll) if e != nil { return nil, e } @@ -121,23 +121,15 @@ func createBackendsV2(logger *zap.Logger, backends types.BackendsV2, expireDelay for _, server := range backend.Servers { config.Servers = []string{server} config.GroupName = server - backendServer, e = backendInit(logger, config, tldCacheDisabled) + backendServer, e = backendInit(logger, config, tldCacheDisabled, requireSuccessAll) if e != nil { return nil, e } backendServers = append(backendServers, backendServer) } - backendServer, err = broadcast.New( - broadcast.WithLogger(logger), - broadcast.WithGroupName(backend.GroupName), - broadcast.WithSplitMultipleRequests(backend.DoMultipleRequestsIfSplit), - broadcast.WithBackends(backendServers), - broadcast.WithPathCache(expireDelaySec), - broadcast.WithLimiter(*backend.ConcurrencyLimit), - broadcast.WithMaxMetricsPerRequest(*backend.MaxBatchSize), - broadcast.WithTimeouts(timeouts), - broadcast.WithTLDCache(!tldCacheDisabled), + backendServer, err = broadcast.NewBroadcastGroup(logger, backend.GroupName, backend.DoMultipleRequestsIfSplit, backendServers, + expireDelaySec, *backend.ConcurrencyLimit, *backend.MaxBatchSize, timeouts, tldCacheDisabled, requireSuccessAll, ) if err != nil { return nil, merry.Wrap(err) @@ -154,25 +146,16 @@ func NewZipper(sender func(*types.Stats), cfg *config.Config, logger *zap.Logger cfg = config.SanitizeConfig(logger, *cfg) } - backends, err := createBackendsV2(logger, cfg.BackendsV2, int32(cfg.InternalRoutingCache.Seconds()), cfg.TLDCacheDisabled) + backends, err := createBackendsV2(logger, cfg.BackendsV2, int32(cfg.InternalRoutingCache.Seconds()), cfg.TLDCacheDisabled, cfg.RequireSuccessAll) if err != nil { logger.Fatal("errors while initialing zipper store backend", zap.Any("error", err), ) } - logger.Error("DEBUG ERROR LOGGGGG", zap.Any("cfg", cfg)) - broadcastGroup, err := broadcast.New( - broadcast.WithLogger(logger), - broadcast.WithGroupName("root"), - broadcast.WithSplitMultipleRequests(cfg.DoMultipleRequestsIfSplit), - broadcast.WithBackends(backends), - broadcast.WithPathCache(int32(cfg.InternalRoutingCache.Seconds())), - broadcast.WithLimiter(cfg.ConcurrencyLimitPerServer), - broadcast.WithMaxMetricsPerRequest(*cfg.MaxBatchSize), - broadcast.WithTimeouts(cfg.Timeouts), - broadcast.WithTLDCache(cfg.TLDCacheDisabled), + broadcastGroup, err := broadcast.NewBroadcastGroup(logger, "root", cfg.DoMultipleRequestsIfSplit, backends, + int32(cfg.InternalRoutingCache.Seconds()), cfg.ConcurrencyLimitPerServer, *cfg.MaxBatchSize, cfg.Timeouts, cfg.TLDCacheDisabled, cfg.RequireSuccessAll, ) if err != nil { logger.Fatal("error while initialing zipper store backend", @@ -284,9 +267,9 @@ func (z Zipper) FindProtoV3(ctx context.Context, request *protov3.MultiGlobReque if len(findResponse.Err) > 0 { var e merry.Error if len(findResponse.Err) == 1 { - e = findResponse.Err[0] + e = helper.HttpErrorByCode(findResponse.Err[0]) } else { - e = findResponse.Err[1].WithCause(findResponse.Err[0]) + e = helper.HttpErrorByCode(findResponse.Err[1].WithCause(findResponse.Err[0])) } logger.Debug("had errors while fetching result", zap.Any("errors", e),