Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for x scope org id header in loki source api #1805

1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Main (unreleased)

- Add the function `path_join` to the stdlib. (@wildum)
- Add support to `loki.source.syslog` for the RFC3164 format ("BSD syslog"). (@sushain97)
- Add support to `loki.source.api` to be able to extract the tenant from the HTTP `X-Scope-OrgID` header (@QuentinBisson)
- (_Experimental_) Add a `loki.secretfilter` component to redact secrets from collected logs.

### Enhancements
Expand Down
6 changes: 6 additions & 0 deletions docs/sources/reference/components/loki/loki.source.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ loki.source.api "loki_push_api" {
}
```

### Technical details

`loki.source.api` filters out all labels that start with `__` like `__tenant_id__`.

If you need to be able to set the tenant id, you should either make sure the `X-Scope-OrgID` header or use the `loki.process` component.
QuentinBisson marked this conversation as resolved.
Show resolved Hide resolved

<!-- START GENERATED COMPATIBLE COMPONENTS -->

## Compatible components
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/go-kit/log"
"github.com/gorilla/mux"
"github.com/grafana/dskit/tenant"
"github.com/grafana/dskit/user"
"github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/logproto"
util_log "github.com/grafana/loki/v3/pkg/util/log"
Expand All @@ -22,6 +23,7 @@ import (
promql_parser "github.com/prometheus/prometheus/promql/parser"

"github.com/grafana/alloy/internal/component/common/loki"
"github.com/grafana/alloy/internal/component/common/loki/client"
fnet "github.com/grafana/alloy/internal/component/common/net"
frelabel "github.com/grafana/alloy/internal/component/common/relabel"
"github.com/grafana/alloy/internal/runtime/logging/level"
Expand Down Expand Up @@ -64,21 +66,38 @@ func (s *PushAPIServer) Run() error {
level.Info(s.logger).Log("msg", "starting push API server")

err := s.server.MountAndRun(func(router *mux.Router) {

// Extract the tenant ID from the request and add it to the context.
tenantHeaderExtractor := func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, ctx, _ := user.ExtractOrgIDFromHTTPRequest(r)
next.ServeHTTP(w, r.WithContext(ctx))
})
}

// This redirecting is so we can avoid breaking changes where we originally implemented it with
// the loki prefix.
router.Path("/api/v1/push").Methods("POST").Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r.URL.Path = "/loki/api/v1/push"
r.RequestURI = "/loki/api/v1/push"
s.handleLoki(w, r)
}))
router.Path("/api/v1/raw").Methods("POST").Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r.URL.Path = "/loki/api/v1/raw"
r.RequestURI = "/loki/api/v1/raw"
s.handlePlaintext(w, r)
}))
router.Path("/api/v1/push").Methods("POST").Handler(
tenantHeaderExtractor(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r.URL.Path = "/loki/api/v1/push"
r.RequestURI = "/loki/api/v1/push"
s.handleLoki(w, r)
}),
),
)
router.Path("/api/v1/raw").Methods("POST").Handler(
tenantHeaderExtractor(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r.URL.Path = "/loki/api/v1/raw"
r.RequestURI = "/loki/api/v1/raw"
s.handlePlaintext(w, r)
}),
),
)
router.Path("/ready").Methods("GET").Handler(http.HandlerFunc(s.ready))
router.Path("/loki/api/v1/push").Methods("POST").Handler(http.HandlerFunc(s.handleLoki))
router.Path("/loki/api/v1/raw").Methods("POST").Handler(http.HandlerFunc(s.handlePlaintext))
router.Path("/loki/api/v1/push").Methods("POST").Handler(tenantHeaderExtractor(http.HandlerFunc(s.handleLoki)))
router.Path("/loki/api/v1/raw").Methods("POST").Handler(tenantHeaderExtractor(http.HandlerFunc(s.handlePlaintext)))
})
return err
}
Expand Down Expand Up @@ -137,10 +156,10 @@ func (s *PushAPIServer) getRelabelRules() []*relabel.Config {
// Only the HTTP handler functions are copied to allow for Alloy-specific server configuration and lifecycle management.
func (s *PushAPIServer) handleLoki(w http.ResponseWriter, r *http.Request) {
logger := util_log.WithContext(r.Context(), util_log.Logger)
userID, _ := tenant.TenantID(r.Context())
tenantID, _ := tenant.TenantID(r.Context())
req, err := push.ParseRequest(
logger,
userID,
tenantID,
r,
nil, // tenants retention
nil, // limits
Expand Down Expand Up @@ -190,6 +209,11 @@ func (s *PushAPIServer) handleLoki(w http.ResponseWriter, r *http.Request) {
filtered[model.LabelName(processed[i].Name)] = model.LabelValue(processed[i].Value)
}

// Add tenant ID to the filtered labels if it is set
if tenantID != "" {
filtered[model.LabelName(client.ReservedLabelTenantID)] = model.LabelValue(tenantID)
}

for _, entry := range stream.Entries {
e := loki.Entry{
Labels: filtered.Clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,98 @@ regex = "dropme"
pt.Shutdown()
}

func TestLokiPushTargetWithXScopeOrgIDHeader(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
pt, port, eh := createPushServer(t, logger)

pt.SetLabels(model.LabelSet{
"pushserver": "pushserver1",
"dropme": "label",
})
pt.SetKeepTimestamp(true)

relabelRule := frelabel.Config{}
relabelStr := `
action = "labeldrop"
regex = "dropme"
`
err := syntax.Unmarshal([]byte(relabelStr), &relabelRule)
require.NoError(t, err)
pt.SetRelabelRules(frelabel.Rules{&relabelRule})

// Build a client to send logs
serverURL := flagext.URLValue{}
err = serverURL.Set("http://" + localhost + ":" + strconv.Itoa(port) + "/api/v1/push")
require.NoError(t, err)

ccfg := client.Config{
URL: serverURL,
Timeout: 1 * time.Second,
BatchWait: 1 * time.Second,
BatchSize: 100 * 1024,
Headers: map[string]string{
"X-Scope-OrgID": "tenant1",
},
}
m := client.NewMetrics(prometheus.DefaultRegisterer)
pc, err := client.New(m, ccfg, 0, 0, false, logger)
require.NoError(t, err)
defer pc.Stop()

// Send some logs
labels := model.LabelSet{
"stream": "stream1",
"__anotherdroplabel": "dropme",
}
for i := 0; i < 100; i++ {
pc.Chan() <- loki.Entry{
Labels: labels,
Entry: logproto.Entry{
Timestamp: time.Unix(int64(i), 0),
Line: "line" + strconv.Itoa(i),
StructuredMetadata: push.LabelsAdapter{
{Name: "i", Value: strconv.Itoa(i)},
{Name: "anotherMetaData", Value: "val"},
},
},
}
}

// Wait for them to appear in the test handler
countdown := 10000
for len(eh.Received()) != 100 && countdown > 0 {
time.Sleep(1 * time.Millisecond)
countdown--
}

// Make sure we didn't timeout
require.Equal(t, 100, len(eh.Received()))

// Verify labels
expectedLabels := model.LabelSet{
"pushserver": "pushserver1",
"stream": "stream1",
"__tenant_id__": "tenant1",
}

expectedStructuredMetadata := push.LabelsAdapter{
{Name: "i", Value: strconv.Itoa(0)},
{Name: "anotherMetaData", Value: "val"},
}

// Spot check the first value in the result to make sure relabel rules were applied properly
require.Equal(t, expectedLabels, eh.Received()[0].Labels)

// Spot check the first value in the result to make sure structured metadata was received properly
require.Equal(t, expectedStructuredMetadata, eh.Received()[0].StructuredMetadata)

// With keep timestamp enabled, verify timestamp
require.Equal(t, time.Unix(99, 0).Unix(), eh.Received()[99].Timestamp.Unix())

pt.Shutdown()
}

func TestPlaintextPushTarget(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
Expand Down Expand Up @@ -267,6 +359,85 @@ func TestPlaintextPushTarget(t *testing.T) {
pt.Shutdown()
}

func TestPlaintextPushTargetWithXScopeOrgIDHeader(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
//Create PushAPIServerOld
eh := fake.NewClient(func() {})
defer eh.Stop()

// Get a randomly available port by open and closing a TCP socket
addr, err := net.ResolveTCPAddr("tcp", localhost+":0")
require.NoError(t, err)
l, err := net.ListenTCP("tcp", addr)
require.NoError(t, err)
port := l.Addr().(*net.TCPAddr).Port
err = l.Close()
require.NoError(t, err)

serverConfig := &fnet.ServerConfig{
HTTP: &fnet.HTTPConfig{
ListenAddress: localhost,
ListenPort: port,
},
GRPC: &fnet.GRPCConfig{ListenPort: getFreePort(t)},
}

pt, err := NewPushAPIServer(logger, serverConfig, eh, prometheus.NewRegistry())
require.NoError(t, err)

err = pt.Run()
require.NoError(t, err)

pt.SetLabels(model.LabelSet{
"pushserver": "pushserver2",
"keepme": "label",
})
pt.SetKeepTimestamp(true)

// Send some logs
ts := time.Now()
body := new(bytes.Buffer)
client := &http.Client{}
for i := 0; i < 100; i++ {
body.WriteString("line" + strconv.Itoa(i))
url := fmt.Sprintf("http://%s:%d/api/v1/raw", localhost, port)

// Create a new request
req, err := http.NewRequest("POST", url, body)
require.NoError(t, err)
req.Header.Add("Content-Type", "text/json")
req.Header.Add("X-Scope-OrgID", "tenant1")
resp, err := client.Do(req)
require.NoError(t, err)
defer resp.Body.Close()
body.Reset()
}

// Wait for them to appear in the test handler
countdown := 10000
for len(eh.Received()) != 100 && countdown > 0 {
time.Sleep(1 * time.Millisecond)
countdown--
}

// Make sure we didn't timeout
require.Equal(t, 100, len(eh.Received()))

// Verify labels
expectedLabels := model.LabelSet{
"pushserver": "pushserver2",
"keepme": "label",
}
// Spot check the first value in the result to make sure relabel rules were applied properly
require.Equal(t, expectedLabels, eh.Received()[0].Labels)

// Timestamp is always set in the handler, we expect received timestamps to be slightly higher than the timestamp when we started sending logs.
require.GreaterOrEqual(t, eh.Received()[99].Timestamp.Unix(), ts.Unix())

pt.Shutdown()
}

func TestReady(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
Expand Down