From 7a40a71aedcd5f64d7df2a3c2a74b024bd287e54 Mon Sep 17 00:00:00 2001 From: Ashutosh Narkar Date: Wed, 5 Apr 2023 18:30:55 -0700 Subject: [PATCH] Include trace and span identifier in decision logs Currently OPA's decision logs do not include the trace and span identifier associated with a given request handled by the server. This information if available can be helpful to correlate logs and trace data. This change updates the decision log format to now include the trace and span identifier if present. Fixes: #5230 Signed-off-by: Ashutosh Narkar --- docs/content/management-decision-logs.md | 36 +-- plugins/logs/plugin.go | 4 + server/buffer.go | 2 + server/server.go | 7 + .../distributedtracing_test.go | 223 ++++++++++++++++++ 5 files changed, 255 insertions(+), 17 deletions(-) diff --git a/docs/content/management-decision-logs.md b/docs/content/management-decision-logs.md index ebc414399d..6ea27ad7f9 100644 --- a/docs/content/management-decision-logs.md +++ b/docs/content/management-decision-logs.md @@ -60,23 +60,25 @@ represents a policy decision returned by OPA. Decision log updates contain the following fields: -| Field | Type | Description | -| --- | --- | --- | -| `[_].labels` | `object` | Set of key-value pairs that uniquely identify the OPA instance. | -| `[_].decision_id` | `string` | Unique identifier generated for each decision for traceability. | -| `[_].bundles` | `object` | Set of key-value pairs describing the bundles which contained policy used to produce the decision. | -| `[_].bundles[_].revision` | `string` | Revision of the bundle at the time of evaluation. | -| `[_].path` | `string` | Hierarchical policy decision path, e.g., `/http/example/authz/allow`. Receivers should tolerate slash-prefixed paths. | -| `[_].query` | `string` | Ad-hoc Rego query received by Query API. | -| `[_].input` | `any` | Input data provided in the policy query. | -| `[_].result` | `any` | Policy decision returned to the client, e.g., `true` or `false`. | -| `[_].requested_by` | `string` | Identifier for client that executed policy query, e.g., the client address. | -| `[_].timestamp` | `string` | RFC3999 timestamp of policy decision. | -| `[_].metrics` | `object` | Key-value pairs of [performance metrics](../rest-api#performance-metrics). | -| `[_].erased` | `array[string]` | Set of JSON Pointers specifying fields in the event that were erased. | -| `[_].masked` | `array[string]` | Set of JSON Pointers specifying fields in the event that were masked. | -| `[_].nd_builtin_cache` | `object` | Key-value pairs of non-deterministic builtin names, paired with objects specifying the input/output mappings for each unique invocation of that builtin during policy evaluation. Intended for use in debugging and decision replay. Receivers will need to decode the JSON using Rego's JSON decoders. | -| `[_].req_id` | `number` | Incremental request identifier, and unique only to the OPA instance, for the request that started the policy query. The attribute value is the same as the value present in others logs (request, response, and print) and could be used to correlate them all. This attribute will be included just when OPA runtime is initialized in server mode and the log level is equal to or greater than info. | +| Field | Type | Description | +|---------------------------| --- |--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `[_].labels` | `object` | Set of key-value pairs that uniquely identify the OPA instance. | +| `[_].decision_id` | `string` | Unique identifier generated for each decision for traceability. | +| `[_].trace_id` | `string` | Unique identifier of a trace generated for each incoming request for traceability. This is a hex string representation compliant with the W3C trace-context specification. See more at https://www.w3.org/TR/trace-context/#trace-id. | +| `[_].span_id` | `string` | Unique identifier of a span in a trace to assist traceability. This is a hex string representation compliant with the W3C trace-context specification. See more at https://www.w3.org/TR/trace-context/#parent-id. | +| `[_].bundles` | `object` | Set of key-value pairs describing the bundles which contained policy used to produce the decision. | +| `[_].bundles[_].revision` | `string` | Revision of the bundle at the time of evaluation. | +| `[_].path` | `string` | Hierarchical policy decision path, e.g., `/http/example/authz/allow`. Receivers should tolerate slash-prefixed paths. | +| `[_].query` | `string` | Ad-hoc Rego query received by Query API. | +| `[_].input` | `any` | Input data provided in the policy query. | +| `[_].result` | `any` | Policy decision returned to the client, e.g., `true` or `false`. | +| `[_].requested_by` | `string` | Identifier for client that executed policy query, e.g., the client address. | +| `[_].timestamp` | `string` | RFC3999 timestamp of policy decision. | +| `[_].metrics` | `object` | Key-value pairs of [performance metrics](../rest-api#performance-metrics). | +| `[_].erased` | `array[string]` | Set of JSON Pointers specifying fields in the event that were erased. | +| `[_].masked` | `array[string]` | Set of JSON Pointers specifying fields in the event that were masked. | +| `[_].nd_builtin_cache` | `object` | Key-value pairs of non-deterministic builtin names, paired with objects specifying the input/output mappings for each unique invocation of that builtin during policy evaluation. Intended for use in debugging and decision replay. Receivers will need to decode the JSON using Rego's JSON decoders. | +| `[_].req_id` | `number` | Incremental request identifier, and unique only to the OPA instance, for the request that started the policy query. The attribute value is the same as the value present in others logs (request, response, and print) and could be used to correlate them all. This attribute will be included just when OPA runtime is initialized in server mode and the log level is equal to or greater than info. | If the decision log was successfully uploaded to the remote service, it should respond with an HTTP 2xx status. If the service responds with a non-2xx status, OPA will requeue the last chunk containing decision log events and upload it diff --git a/plugins/logs/plugin.go b/plugins/logs/plugin.go index 9a9c9cfdd7..661906cc34 100644 --- a/plugins/logs/plugin.go +++ b/plugins/logs/plugin.go @@ -47,6 +47,8 @@ type Logger interface { type EventV1 struct { Labels map[string]string `json:"labels"` DecisionID string `json:"decision_id"` + TraceID string `json:"trace_id,omitempty"` + SpanID string `json:"span_id,omitempty"` Revision string `json:"revision,omitempty"` // Deprecated: Use Bundles instead Bundles map[string]BundleInfoV1 `json:"bundles,omitempty"` Path string `json:"path,omitempty"` @@ -597,6 +599,8 @@ func (p *Plugin) Log(ctx context.Context, decision *server.Info) error { event := EventV1{ Labels: p.manager.Labels(), DecisionID: decision.DecisionID, + TraceID: decision.TraceID, + SpanID: decision.SpanID, Revision: decision.Revision, Bundles: bundles, Path: decision.Path, diff --git a/server/buffer.go b/server/buffer.go index ec58dbab41..634cdd006e 100644 --- a/server/buffer.go +++ b/server/buffer.go @@ -19,6 +19,8 @@ type Info struct { Revision string // Deprecated: Use `Bundles` instead Bundles map[string]BundleInfo DecisionID string + TraceID string + SpanID string RemoteAddr string Query string Path string diff --git a/server/server.go b/server/server.go index 15a4c8a6ca..8129e9a456 100644 --- a/server/server.go +++ b/server/server.go @@ -1519,6 +1519,7 @@ func (s *Server) v1DataGet(w http.ResponseWriter, r *http.Request) { return } } + err = logger.Log(ctx, txn, urlPath, "", goInput, input, nil, ndbCache, nil, m) if err != nil { writer.ErrorAuto(w, err) @@ -3011,6 +3012,12 @@ func (l decisionLogger) Log(ctx context.Context, txn storage.Transaction, path s info.NDBuiltinCache = &x } + sctx := trace.SpanFromContext(ctx).SpanContext() + if sctx.IsValid() { + info.TraceID = sctx.TraceID().String() + info.SpanID = sctx.SpanID().String() + } + if l.logger != nil { if err := l.logger(ctx, info); err != nil { return fmt.Errorf("decision_logs: %w", err) diff --git a/test/e2e/distributedtracing/distributedtracing_test.go b/test/e2e/distributedtracing/distributedtracing_test.go index 4c0b60f0c2..6fea8c77a2 100644 --- a/test/e2e/distributedtracing/distributedtracing_test.go +++ b/test/e2e/distributedtracing/distributedtracing_test.go @@ -13,6 +13,8 @@ import ( "strings" "testing" + "github.com/open-policy-agent/opa/logging/test" + "github.com/open-policy-agent/opa/runtime" "github.com/open-policy-agent/opa/server" "github.com/open-policy-agent/opa/test/e2e" "github.com/open-policy-agent/opa/tracing" @@ -119,6 +121,101 @@ func TestServerSpan(t *testing.T) { }) } +func TestServerSpanWithDecisionLogging(t *testing.T) { + // setup + spanExp := tracetest.NewInMemoryExporter() + options := tracing.NewOptions( + otelhttp.WithTracerProvider(trace.NewTracerProvider(trace.WithSpanProcessor(trace.NewSimpleSpanProcessor(spanExp)))), + ) + + testServerParams := e2e.NewAPIServerTestParams() + testServerParams.ConfigOverrides = []string{ + "decision_logs.console=true", + } + + // Ensure decisions are logged regardless of regular log level + testServerParams.Logging = runtime.LoggingConfig{Level: "error"} + consoleLogger := test.New() + testServerParams.ConsoleLogger = consoleLogger + + testServerParams.DistributedTracingOpts = options + + e2e.WithRuntime(t, e2e.TestRuntimeOpts{}, testServerParams, func(rt *e2e.TestRuntime) { + + spanExp.Reset() + rt.ConsoleLogger = consoleLogger + + mr, err := http.Post(rt.URL()+"/v1/data", "application/json", nil) + if err != nil { + t.Fatal(err) + } + defer mr.Body.Close() + + if mr.StatusCode != http.StatusOK { + t.Fatalf("expected status %v but got %v", http.StatusOK, mr.StatusCode) + } + + spans := spanExp.GetSpans() + if got, expected := len(spans), 1; got != expected { + t.Fatalf("got %d span(s), expected %d", got, expected) + } + if !spans[0].SpanContext.IsValid() { + t.Fatalf("invalid span created: %#v", spans[0].SpanContext) + } + + if got, expected := spans[0].SpanKind.String(), "server"; got != expected { + t.Fatalf("Expected span kind to be %q but got %q", expected, got) + } + + var entry test.LogEntry + var found bool + + for _, entry = range rt.ConsoleLogger.Entries() { + if entry.Message == "Decision Log" { + found = true + } + } + + if !found { + t.Fatalf("Did not find 'Decision Log' event in captured log entries") + } + + // Check for some important fields + expectedFields := map[string]*struct { + found bool + match func(*testing.T, string) + }{ + "labels": {}, + "decision_id": {}, + "trace_id": {}, + "span_id": {}, + "result": {}, + "timestamp": {}, + "type": {match: func(t *testing.T, actual string) { + if actual != "openpolicyagent.org/decision_logs" { + t.Fatalf("Expected field 'type' to be 'openpolicyagent.org/decision_logs'") + } + }}, + } + + // Ensure expected fields exist + for fieldName, rawField := range entry.Fields { + if fd, ok := expectedFields[fieldName]; ok { + if fieldValue, ok := rawField.(string); ok && fd.match != nil { + fd.match(t, fieldValue) + } + fd.found = true + } + } + + for field, fd := range expectedFields { + if !fd.found { + t.Errorf("Missing expected field in decision log: %s\n\nEntry: %+v\n\n", field, entry) + } + } + }) +} + // TestClientSpan asserts that for all handlers that end up evaluating policies, the // http.send calls will emit the proper spans related to the incoming requests. // @@ -330,6 +427,132 @@ func TestClientSpan(t *testing.T) { }) } +func TestClientSpanWithDecisionLogging(t *testing.T) { + // setup + spanExp := tracetest.NewInMemoryExporter() + options := tracing.NewOptions( + otelhttp.WithTracerProvider(trace.NewTracerProvider(trace.WithSpanProcessor(trace.NewSimpleSpanProcessor(spanExp)))), + ) + + testServerParams := e2e.NewAPIServerTestParams() + testServerParams.ConfigOverrides = []string{ + "decision_logs.console=true", + } + + // Ensure decisions are logged regardless of regular log level + testServerParams.Logging = runtime.LoggingConfig{Level: "error"} + consoleLogger := test.New() + testServerParams.ConsoleLogger = consoleLogger + + testServerParams.DistributedTracingOpts = options + + e2e.WithRuntime(t, e2e.TestRuntimeOpts{}, testServerParams, func(rt *e2e.TestRuntime) { + + spanExp.Reset() + rt.ConsoleLogger = consoleLogger + + policy := ` + package test + + response := http.send({"method": "get", "url": "%s/health"}) + ` + + policy = fmt.Sprintf(policy, testRuntime.URL()) + err := rt.UploadPolicy(t.Name(), strings.NewReader(policy)) + if err != nil { + t.Fatal(err) + } + + mr, err := http.Post(rt.URL()+"/v1/data/test", "application/json", nil) + if err != nil { + t.Fatal(err) + } + defer mr.Body.Close() + + if mr.StatusCode != http.StatusOK { + t.Fatalf("expected status %v but got %v", http.StatusOK, mr.StatusCode) + } + + spans := spanExp.GetSpans() + // Ordered by span emission, which is the reverse of the processing + // code flow: + // 3 = GET /health (HTTP server handler) + // + http.send (HTTP client instrumentation) + // + GET /v1/data/test (HTTP server handler) + if got, expected := len(spans), 3; got != expected { + t.Fatalf("got %d span(s), expected %d", got, expected) + } + + if !spans[1].SpanContext.IsValid() { + t.Fatalf("invalid span created: %#v", spans[1].SpanContext) + } + if got, expected := spans[1].SpanKind.String(), "client"; got != expected { + t.Fatalf("Expected span kind to be %q but got %q", expected, got) + } + + parentTraceID := spans[2].SpanContext.TraceID() + parentSpanID := spans[2].SpanContext.SpanID() + if got, expected := spans[1].Parent.SpanID(), parentSpanID; got != expected { + t.Errorf("expected span to be child of %v, got parent %v", expected, got) + } + + var entry test.LogEntry + var found bool + + for _, entry = range rt.ConsoleLogger.Entries() { + if entry.Message == "Decision Log" { + found = true + } + } + + if !found { + t.Fatalf("Did not find 'Decision Log' event in captured log entries") + } + + // Check for some important fields + expectedFields := map[string]*struct { + found bool + match func(*testing.T, string) + }{ + "labels": {}, + "decision_id": {}, + "trace_id": {match: func(t *testing.T, actual string) { + if actual != parentTraceID.String() { + t.Fatalf("Expected field 'trace_id' to be %v", parentTraceID.String()) + } + }}, + "span_id": {match: func(t *testing.T, actual string) { + if actual != parentSpanID.String() { + t.Fatalf("Expected field 'span_id' to be %v", parentSpanID.String()) + } + }}, + "result": {}, + "timestamp": {}, + "type": {match: func(t *testing.T, actual string) { + if actual != "openpolicyagent.org/decision_logs" { + t.Fatalf("Expected field 'type' to be 'openpolicyagent.org/decision_logs'") + } + }}, + } + + // Ensure expected fields exist + for fieldName, rawField := range entry.Fields { + if fd, ok := expectedFields[fieldName]; ok { + if fieldValue, ok := rawField.(string); ok && fd.match != nil { + fd.match(t, fieldValue) + } + fd.found = true + } + } + + for field, fd := range expectedFields { + if !fd.found { + t.Errorf("Missing expected field in decision log: %s\n\nEntry: %+v\n\n", field, entry) + } + } + }) +} + func TestServerSpanWithSystemAuthzPolicy(t *testing.T) { // setup