diff --git a/pkg/thanos/thanos.go b/pkg/thanos/thanos.go index 38c6f0d..8e37d97 100644 --- a/pkg/thanos/thanos.go +++ b/pkg/thanos/thanos.go @@ -11,10 +11,37 @@ type PartialResponseRoundTripper struct { Allow bool } +// AdditionalHeadersRoundTripper adds a new RoundTripper to the chain that sets additional static headers. +type AdditionalHeadersRoundTripper struct { + http.RoundTripper + Headers map[string][]string +} + // RoundTrip implements the RoundTripper interface. func (t *PartialResponseRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { - q := req.URL.Query() + r2 := new(http.Request) + *r2 = *req + q := r2.URL.Query() q.Set("partial_response", strconv.FormatBool(t.Allow)) - req.URL.RawQuery = q.Encode() + r2.URL.RawQuery = q.Encode() + req = r2 return t.RoundTripper.RoundTrip(req) } + +// RoundTrip implements the http.RoundTripper interface. +func (a *AdditionalHeadersRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { + // The specification of http.RoundTripper says that it shouldn't mutate + // the request so make a copy of req.Header since this is all that is + // modified. + r2 := new(http.Request) + *r2 = *r + r2.Header = make(http.Header) + for k, s := range r.Header { + r2.Header[k] = s + } + for k, s := range a.Headers { + r2.Header[k] = s + } + r = r2 + return a.RoundTripper.RoundTrip(r) +} diff --git a/pkg/thanos/thanos_test.go b/pkg/thanos/thanos_test.go index cf23c09..963e4df 100644 --- a/pkg/thanos/thanos_test.go +++ b/pkg/thanos/thanos_test.go @@ -53,6 +53,53 @@ func TestPartialResponseRoundTripper_X(t *testing.T) { } } +func TestAdditionalHeadersResponseRoundTripper_X(t *testing.T) { + testCases := []struct { + url string + headers map[string][]string + }{ + { + url: "https://thanos.io", + headers: map[string][]string{ + "X-Test-Header": []string{"foobar"}, + }, + }, + { + url: "https://thanos.io?testly=blub", + headers: map[string][]string{}, + }, + { + url: "https://thanos.io", + headers: map[string][]string{ + "X-Test-One": []string{"one"}, + "X-Test-Two": []string{"two"}, + }, + }, + { + url: "https://thanos.io?testly=blub", + headers: map[string][]string{ + "X-Test-One": []string{"one", "two", "three"}, + "X-Test-Two": []string{"two"}, + }, + }, + } + for _, tC := range testCases { + t.Run(fmt.Sprintf("headers %v, url %s", tC.headers, tC.url), func(t *testing.T) { + rt := AdditionalHeadersRoundTripper{ + RoundTripper: roundTripFunc(func(r *http.Request) (*http.Response, error) { + for k, s := range tC.headers { + require.Equal(t, r.Header.Values(k), s) + } + return nil, errors.New("not implemented") + }), + Headers: tC.headers, + } + + _, _ = rt.RoundTrip(httptest.NewRequest("GET", tC.url, nil)) + }) + } +} + type roundTripFunc func(r *http.Request) (*http.Response, error) func (s roundTripFunc) RoundTrip(r *http.Request) (*http.Response, error) { diff --git a/report_command.go b/report_command.go index 08836b7..b0148d3 100644 --- a/report_command.go +++ b/report_command.go @@ -24,6 +24,7 @@ type reportCommand struct { PromQueryTimeout time.Duration ThanosAllowPartialResponses bool + OrgId string } var reportCommandName = "report" @@ -48,6 +49,8 @@ func newReportCommand() *cli.Command { EnvVars: envVars("PROM_QUERY_TIMEOUT"), Destination: &command.PromQueryTimeout, Required: false}, &cli.BoolFlag{Name: "thanos-allow-partial-responses", Usage: "Allows partial responses from Thanos. Can be helpful when querying a Thanos cluster with lost data.", EnvVars: envVars("THANOS_ALLOW_PARTIAL_RESPONSES"), Destination: &command.ThanosAllowPartialResponses, Required: false, DefaultText: "false"}, + &cli.StringFlag{Name: "org-id", Usage: "Sets the X-Scope-OrgID header to this value on requests to Prometheus", Value: "", + EnvVars: envVars("ORG_ID"), Destination: &command.OrgId, Required: false, DefaultText: "empty"}, }, } } @@ -62,7 +65,7 @@ func (cmd *reportCommand) execute(cliCtx *cli.Context) error { ctx := cliCtx.Context log := AppLogger(ctx).WithName(reportCommandName) - promClient, err := newPrometheusAPIClient(cmd.PrometheusURL, cmd.ThanosAllowPartialResponses) + promClient, err := newPrometheusAPIClient(cmd.PrometheusURL, cmd.ThanosAllowPartialResponses, cmd.OrgId) if err != nil { return fmt.Errorf("could not create prometheus client: %w", err) } @@ -128,13 +131,26 @@ func (cmd *reportCommand) runReport(ctx context.Context, db *sqlx.DB, promClient return tx.Commit() } -func newPrometheusAPIClient(promURL string, thanosAllowPartialResponses bool) (apiv1.API, error) { +func newPrometheusAPIClient(promURL string, thanosAllowPartialResponses bool, orgId string) (apiv1.API, error) { + rt := api.DefaultRoundTripper + rt = &thanos.PartialResponseRoundTripper{ + RoundTripper: rt, + Allow: thanosAllowPartialResponses, + } + + if orgId != "" { + rt = &thanos.AdditionalHeadersRoundTripper{ + RoundTripper: rt, + Headers: map[string][]string{ + "X-Scope-OrgID": []string{orgId}, + }, + } + } + client, err := api.NewClient(api.Config{ - Address: promURL, - RoundTripper: &thanos.PartialResponseRoundTripper{ - RoundTripper: api.DefaultRoundTripper, - Allow: thanosAllowPartialResponses, - }, + Address: promURL, + RoundTripper: rt, }) + return apiv1.NewAPI(client), err } diff --git a/tenantmapping_command.go b/tenantmapping_command.go index 73c2948..8e03b4d 100644 --- a/tenantmapping_command.go +++ b/tenantmapping_command.go @@ -25,6 +25,7 @@ type tmapCommand struct { AdditionalMetricSelector string ThanosAllowPartialResponses bool + OrgId string } var tenantmappingCommandName = "tenantmapping" @@ -51,6 +52,8 @@ func newTmapCommand() *cli.Command { EnvVars: envVars("DRY_RUN"), Destination: &command.DryRun, Required: false, DefaultText: "false"}, &cli.StringFlag{Name: "additional-metric-selector", Usage: "Allows further specifying which metrics to choose. Example: --additional-metric-selector='namespace=\"testing\"'", EnvVars: envVars("ADDITIONAL_METRIC_SELECTOR"), Destination: &command.AdditionalMetricSelector, Required: false, DefaultText: "false"}, + &cli.StringFlag{Name: "org-id", Usage: "Sets the X-Scope-OrgID header to this value on requests to Prometheus", Value: "", + EnvVars: envVars("ORG_ID"), Destination: &command.OrgId, Required: false, DefaultText: "empty"}, }, } } @@ -67,7 +70,7 @@ func (cmd *tmapCommand) execute(cliCtx *cli.Context) error { // We really need to fix the inane dance around the AppLogger which needs custom plumbing and can't be used from packages because of import cycles. ctx = logr.NewContext(ctx, log) - promClient, err := newPrometheusAPIClient(cmd.PrometheusURL, cmd.ThanosAllowPartialResponses) + promClient, err := newPrometheusAPIClient(cmd.PrometheusURL, cmd.ThanosAllowPartialResponses, cmd.OrgId) if err != nil { return fmt.Errorf("could not create prometheus client: %w", err) }