Include trace and span identifier in decision logs
Currently, OPA's decision logs do not include the trace
and span identifiers associated with a given request
handled by the server. When available, this information
helps correlate logs with trace data.

This change updates the decision log format to
include the trace and span identifiers when present.

Fixes: open-policy-agent#5230

Signed-off-by: Ashutosh Narkar <anarkar4387@gmail.com>
ashutosh-narkar committed Apr 6, 2023
1 parent 39a252d commit 7a40a71
Showing 5 changed files with 255 additions and 17 deletions.
36 changes: 19 additions & 17 deletions docs/content/management-decision-logs.md
@@ -60,23 +60,25 @@ represents a policy decision returned by OPA.

Decision log updates contain the following fields:

| Field | Type | Description |
| --- | --- | --- |
| `[_].labels` | `object` | Set of key-value pairs that uniquely identify the OPA instance. |
| `[_].decision_id` | `string` | Unique identifier generated for each decision for traceability. |
| `[_].bundles` | `object` | Set of key-value pairs describing the bundles which contained policy used to produce the decision. |
| `[_].bundles[_].revision` | `string` | Revision of the bundle at the time of evaluation. |
| `[_].path` | `string` | Hierarchical policy decision path, e.g., `/http/example/authz/allow`. Receivers should tolerate slash-prefixed paths. |
| `[_].query` | `string` | Ad-hoc Rego query received by Query API. |
| `[_].input` | `any` | Input data provided in the policy query. |
| `[_].result` | `any` | Policy decision returned to the client, e.g., `true` or `false`. |
| `[_].requested_by` | `string` | Identifier for client that executed policy query, e.g., the client address. |
| `[_].timestamp` | `string` | RFC3339 timestamp of policy decision. |
| `[_].metrics` | `object` | Key-value pairs of [performance metrics](../rest-api#performance-metrics). |
| `[_].erased` | `array[string]` | Set of JSON Pointers specifying fields in the event that were erased. |
| `[_].masked` | `array[string]` | Set of JSON Pointers specifying fields in the event that were masked. |
| `[_].nd_builtin_cache` | `object` | Key-value pairs of non-deterministic builtin names, paired with objects specifying the input/output mappings for each unique invocation of that builtin during policy evaluation. Intended for use in debugging and decision replay. Receivers will need to decode the JSON using Rego's JSON decoders. |
| `[_].req_id` | `number` | Incremental request identifier, unique only to the OPA instance, for the request that started the policy query. The value matches the one present in other logs (request, response, and print) and can be used to correlate them all. This attribute is included only when the OPA runtime is initialized in server mode and the log level is equal to or greater than info. |
| Field | Type | Description |
|---------------------------| --- |--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `[_].labels` | `object` | Set of key-value pairs that uniquely identify the OPA instance. |
| `[_].decision_id` | `string` | Unique identifier generated for each decision for traceability. |
| `[_].trace_id` | `string` | Unique identifier of a trace generated for each incoming request for traceability. This is a hex string representation compliant with the W3C trace-context specification. See more at https://www.w3.org/TR/trace-context/#trace-id. |
| `[_].span_id` | `string` | Unique identifier of a span in a trace to assist traceability. This is a hex string representation compliant with the W3C trace-context specification. See more at https://www.w3.org/TR/trace-context/#parent-id. |
| `[_].bundles` | `object` | Set of key-value pairs describing the bundles which contained policy used to produce the decision. |
| `[_].bundles[_].revision` | `string` | Revision of the bundle at the time of evaluation. |
| `[_].path` | `string` | Hierarchical policy decision path, e.g., `/http/example/authz/allow`. Receivers should tolerate slash-prefixed paths. |
| `[_].query` | `string` | Ad-hoc Rego query received by Query API. |
| `[_].input` | `any` | Input data provided in the policy query. |
| `[_].result` | `any` | Policy decision returned to the client, e.g., `true` or `false`. |
| `[_].requested_by` | `string` | Identifier for client that executed policy query, e.g., the client address. |
| `[_].timestamp` | `string` | RFC3339 timestamp of policy decision. |
| `[_].metrics` | `object` | Key-value pairs of [performance metrics](../rest-api#performance-metrics). |
| `[_].erased` | `array[string]` | Set of JSON Pointers specifying fields in the event that were erased. |
| `[_].masked` | `array[string]` | Set of JSON Pointers specifying fields in the event that were masked. |
| `[_].nd_builtin_cache` | `object` | Key-value pairs of non-deterministic builtin names, paired with objects specifying the input/output mappings for each unique invocation of that builtin during policy evaluation. Intended for use in debugging and decision replay. Receivers will need to decode the JSON using Rego's JSON decoders. |
| `[_].req_id` | `number` | Incremental request identifier, unique only to the OPA instance, for the request that started the policy query. The value matches the one present in other logs (request, response, and print) and can be used to correlate them all. This attribute is included only when the OPA runtime is initialized in server mode and the log level is equal to or greater than info. |
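
The `trace_id` and `span_id` rows above point at the W3C trace-context format, where a trace ID is 16 bytes rendered as 32 lowercase hex characters, a span (parent) ID is 8 bytes rendered as 16, and an all-zero value is invalid. A receiver that wants to sanity-check these fields could do something like the following minimal sketch. The helper names `isLowerHex`, `ValidTraceID`, and `ValidSpanID` are hypothetical and are not part of OPA.

```go
package main

import (
	"fmt"
	"strings"
)

// isLowerHex reports whether s is exactly n lowercase hex digits and is not
// all zeros (the W3C trace-context spec treats all-zero IDs as invalid).
// Hypothetical helper for illustration only.
func isLowerHex(s string, n int) bool {
	if len(s) != n {
		return false
	}
	for _, c := range s {
		if !strings.ContainsRune("0123456789abcdef", c) {
			return false
		}
	}
	return strings.Trim(s, "0") != ""
}

// ValidTraceID checks a decision log trace_id: 16 bytes as 32 hex characters.
func ValidTraceID(s string) bool { return isLowerHex(s, 32) }

// ValidSpanID checks a decision log span_id: 8 bytes as 16 hex characters.
func ValidSpanID(s string) bool { return isLowerHex(s, 16) }

func main() {
	// Sample IDs as used in the W3C trace-context specification examples.
	fmt.Println(ValidTraceID("4bf92f3577b34da6a3ce929d0e0e4736"))
	fmt.Println(ValidSpanID("00f067aa0ba902b7"))
	// An all-zero trace ID is rejected.
	fmt.Println(ValidTraceID("00000000000000000000000000000000"))
}
```

Because both fields are optional in the log event, a receiver would presumably run such a check only when the field is present.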

If the decision log was successfully uploaded to the remote service, it should respond with an HTTP 2xx status. If the
service responds with a non-2xx status, OPA will requeue the last chunk containing decision log events and upload it
4 changes: 4 additions & 0 deletions plugins/logs/plugin.go
@@ -47,6 +47,8 @@ type Logger interface {
type EventV1 struct {
Labels map[string]string `json:"labels"`
DecisionID string `json:"decision_id"`
TraceID string `json:"trace_id,omitempty"`
SpanID string `json:"span_id,omitempty"`
Revision string `json:"revision,omitempty"` // Deprecated: Use Bundles instead
Bundles map[string]BundleInfoV1 `json:"bundles,omitempty"`
Path string `json:"path,omitempty"`
@@ -597,6 +599,8 @@ func (p *Plugin) Log(ctx context.Context, decision *server.Info) error {
event := EventV1{
Labels: p.manager.Labels(),
DecisionID: decision.DecisionID,
TraceID: decision.TraceID,
SpanID: decision.SpanID,
Revision: decision.Revision,
Bundles: bundles,
Path: decision.Path,
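
Since `TraceID` and `SpanID` are tagged `omitempty`, events logged without a valid span context should serialize exactly as before, with no `trace_id` or `span_id` keys. The sketch below illustrates that behavior using a hypothetical, trimmed-down `event` type as a stand-in for `EventV1`. Only the three fields shown are modeled and `encode` is an illustrative helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// event is a hypothetical, reduced stand-in for EventV1 that keeps only
// the fields needed to show how omitempty affects the output.
type event struct {
	DecisionID string `json:"decision_id"`
	TraceID    string `json:"trace_id,omitempty"`
	SpanID     string `json:"span_id,omitempty"`
}

// encode marshals an event to its JSON string form.
func encode(e event) string {
	b, err := json.Marshal(e)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	// No tracing: the trace_id and span_id keys are omitted entirely.
	fmt.Println(encode(event{DecisionID: "d1"}))

	// With tracing: both identifiers appear next to decision_id.
	fmt.Println(encode(event{
		DecisionID: "d1",
		TraceID:    "4bf92f3577b34da6a3ce929d0e0e4736",
		SpanID:     "00f067aa0ba902b7",
	}))
}
```

Consumers that decode decision logs into a fixed schema should therefore treat both keys as optional.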
2 changes: 2 additions & 0 deletions server/buffer.go
@@ -19,6 +19,8 @@ type Info struct {
Revision string // Deprecated: Use `Bundles` instead
Bundles map[string]BundleInfo
DecisionID string
TraceID string
SpanID string
RemoteAddr string
Query string
Path string
7 changes: 7 additions & 0 deletions server/server.go
@@ -1519,6 +1519,7 @@ func (s *Server) v1DataGet(w http.ResponseWriter, r *http.Request) {
return
}
}

err = logger.Log(ctx, txn, urlPath, "", goInput, input, nil, ndbCache, nil, m)
if err != nil {
writer.ErrorAuto(w, err)
@@ -3011,6 +3012,12 @@ func (l decisionLogger) Log(ctx context.Context, txn storage.Transaction, path s
info.NDBuiltinCache = &x
}

sctx := trace.SpanFromContext(ctx).SpanContext()
if sctx.IsValid() {
info.TraceID = sctx.TraceID().String()
info.SpanID = sctx.SpanID().String()
}

if l.logger != nil {
if err := l.logger(ctx, info); err != nil {
return fmt.Errorf("decision_logs: %w", err)
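
The hunk above copies the identifiers into `info` only when the span context is valid, so requests without tracing leave both fields as empty strings. The following standard-library sketch mimics that guard. The `spanContext` and `info` types here are hypothetical stand-ins, not the OpenTelemetry or OPA types, and the sketch assumes that validity means both IDs are non-zero and that the string form is the lowercase hex encoding of the raw bytes.

```go
package main

import (
	"encoding/hex"
	"fmt"
)

// spanContext is a hypothetical stand-in for a tracing span context that
// carries a 16-byte trace ID and an 8-byte span ID.
type spanContext struct {
	traceID [16]byte
	spanID  [8]byte
}

// isValid assumes a span context is valid only if both IDs are non-zero.
func (sc spanContext) isValid() bool {
	return sc.traceID != [16]byte{} && sc.spanID != [8]byte{}
}

// info is a reduced stand-in for server.Info with only the two new fields.
type info struct {
	TraceID string
	SpanID  string
}

// fill copies hex-encoded IDs into info only when the context is valid,
// otherwise it leaves both fields empty.
func fill(sc spanContext) info {
	var i info
	if sc.isValid() {
		i.TraceID = hex.EncodeToString(sc.traceID[:])
		i.SpanID = hex.EncodeToString(sc.spanID[:])
	}
	return i
}

func main() {
	// Zero-value context: nothing is copied.
	fmt.Printf("%+v\n", fill(spanContext{}))

	// Non-zero IDs: both fields are populated as hex strings.
	sc := spanContext{traceID: [16]byte{0x4b, 0xf9}, spanID: [8]byte{0x00, 0xf0}}
	fmt.Printf("%+v\n", fill(sc))
}
```

Guarding on validity, rather than always copying the values, keeps all-zero identifiers out of the decision log.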
223 changes: 223 additions & 0 deletions test/e2e/distributedtracing/distributedtracing_test.go
@@ -13,6 +13,8 @@ import (
"strings"
"testing"

"github.com/open-policy-agent/opa/logging/test"
"github.com/open-policy-agent/opa/runtime"
"github.com/open-policy-agent/opa/server"
"github.com/open-policy-agent/opa/test/e2e"
"github.com/open-policy-agent/opa/tracing"
@@ -119,6 +121,101 @@ func TestServerSpan(t *testing.T) {
})
}

func TestServerSpanWithDecisionLogging(t *testing.T) {
// setup
spanExp := tracetest.NewInMemoryExporter()
options := tracing.NewOptions(
otelhttp.WithTracerProvider(trace.NewTracerProvider(trace.WithSpanProcessor(trace.NewSimpleSpanProcessor(spanExp)))),
)

testServerParams := e2e.NewAPIServerTestParams()
testServerParams.ConfigOverrides = []string{
"decision_logs.console=true",
}

// Ensure decisions are logged regardless of regular log level
testServerParams.Logging = runtime.LoggingConfig{Level: "error"}
consoleLogger := test.New()
testServerParams.ConsoleLogger = consoleLogger

testServerParams.DistributedTracingOpts = options

e2e.WithRuntime(t, e2e.TestRuntimeOpts{}, testServerParams, func(rt *e2e.TestRuntime) {

spanExp.Reset()
rt.ConsoleLogger = consoleLogger

mr, err := http.Post(rt.URL()+"/v1/data", "application/json", nil)
if err != nil {
t.Fatal(err)
}
defer mr.Body.Close()

if mr.StatusCode != http.StatusOK {
t.Fatalf("expected status %v but got %v", http.StatusOK, mr.StatusCode)
}

spans := spanExp.GetSpans()
if got, expected := len(spans), 1; got != expected {
t.Fatalf("got %d span(s), expected %d", got, expected)
}
if !spans[0].SpanContext.IsValid() {
t.Fatalf("invalid span created: %#v", spans[0].SpanContext)
}

if got, expected := spans[0].SpanKind.String(), "server"; got != expected {
t.Fatalf("Expected span kind to be %q but got %q", expected, got)
}

var entry test.LogEntry
var found bool

for _, entry = range rt.ConsoleLogger.Entries() {
if entry.Message == "Decision Log" {
found = true
break // stop here so entry still refers to the decision log event
}
}

if !found {
t.Fatalf("Did not find 'Decision Log' event in captured log entries")
}

// Check for some important fields
expectedFields := map[string]*struct {
found bool
match func(*testing.T, string)
}{
"labels": {},
"decision_id": {},
"trace_id": {},
"span_id": {},
"result": {},
"timestamp": {},
"type": {match: func(t *testing.T, actual string) {
if actual != "openpolicyagent.org/decision_logs" {
t.Fatalf("Expected field 'type' to be 'openpolicyagent.org/decision_logs'")
}
}},
}

// Ensure expected fields exist
for fieldName, rawField := range entry.Fields {
if fd, ok := expectedFields[fieldName]; ok {
if fieldValue, ok := rawField.(string); ok && fd.match != nil {
fd.match(t, fieldValue)
}
fd.found = true
}
}

for field, fd := range expectedFields {
if !fd.found {
t.Errorf("Missing expected field in decision log: %s\n\nEntry: %+v\n\n", field, entry)
}
}
})
}

// TestClientSpan asserts that for all handlers that end up evaluating policies, the
// http.send calls will emit the proper spans related to the incoming requests.
//
@@ -330,6 +427,132 @@ func TestClientSpan(t *testing.T) {
})
}

func TestClientSpanWithDecisionLogging(t *testing.T) {
// setup
spanExp := tracetest.NewInMemoryExporter()
options := tracing.NewOptions(
otelhttp.WithTracerProvider(trace.NewTracerProvider(trace.WithSpanProcessor(trace.NewSimpleSpanProcessor(spanExp)))),
)

testServerParams := e2e.NewAPIServerTestParams()
testServerParams.ConfigOverrides = []string{
"decision_logs.console=true",
}

// Ensure decisions are logged regardless of regular log level
testServerParams.Logging = runtime.LoggingConfig{Level: "error"}
consoleLogger := test.New()
testServerParams.ConsoleLogger = consoleLogger

testServerParams.DistributedTracingOpts = options

e2e.WithRuntime(t, e2e.TestRuntimeOpts{}, testServerParams, func(rt *e2e.TestRuntime) {

spanExp.Reset()
rt.ConsoleLogger = consoleLogger

policy := `
package test
response := http.send({"method": "get", "url": "%s/health"})
`

policy = fmt.Sprintf(policy, testRuntime.URL())
err := rt.UploadPolicy(t.Name(), strings.NewReader(policy))
if err != nil {
t.Fatal(err)
}

mr, err := http.Post(rt.URL()+"/v1/data/test", "application/json", nil)
if err != nil {
t.Fatal(err)
}
defer mr.Body.Close()

if mr.StatusCode != http.StatusOK {
t.Fatalf("expected status %v but got %v", http.StatusOK, mr.StatusCode)
}

spans := spanExp.GetSpans()
// Ordered by span emission, which is the reverse of the processing
// code flow:
// 3 = GET /health (HTTP server handler)
// + http.send (HTTP client instrumentation)
// + GET /v1/data/test (HTTP server handler)
if got, expected := len(spans), 3; got != expected {
t.Fatalf("got %d span(s), expected %d", got, expected)
}

if !spans[1].SpanContext.IsValid() {
t.Fatalf("invalid span created: %#v", spans[1].SpanContext)
}
if got, expected := spans[1].SpanKind.String(), "client"; got != expected {
t.Fatalf("Expected span kind to be %q but got %q", expected, got)
}

parentTraceID := spans[2].SpanContext.TraceID()
parentSpanID := spans[2].SpanContext.SpanID()
if got, expected := spans[1].Parent.SpanID(), parentSpanID; got != expected {
t.Errorf("expected span to be child of %v, got parent %v", expected, got)
}

var entry test.LogEntry
var found bool

for _, entry = range rt.ConsoleLogger.Entries() {
if entry.Message == "Decision Log" {
found = true
break // stop here so entry still refers to the decision log event
}
}

if !found {
t.Fatalf("Did not find 'Decision Log' event in captured log entries")
}

// Check for some important fields
expectedFields := map[string]*struct {
found bool
match func(*testing.T, string)
}{
"labels": {},
"decision_id": {},
"trace_id": {match: func(t *testing.T, actual string) {
if actual != parentTraceID.String() {
t.Fatalf("Expected field 'trace_id' to be %v", parentTraceID.String())
}
}},
"span_id": {match: func(t *testing.T, actual string) {
if actual != parentSpanID.String() {
t.Fatalf("Expected field 'span_id' to be %v", parentSpanID.String())
}
}},
"result": {},
"timestamp": {},
"type": {match: func(t *testing.T, actual string) {
if actual != "openpolicyagent.org/decision_logs" {
t.Fatalf("Expected field 'type' to be 'openpolicyagent.org/decision_logs'")
}
}},
}

// Ensure expected fields exist
for fieldName, rawField := range entry.Fields {
if fd, ok := expectedFields[fieldName]; ok {
if fieldValue, ok := rawField.(string); ok && fd.match != nil {
fd.match(t, fieldValue)
}
fd.found = true
}
}

for field, fd := range expectedFields {
if !fd.found {
t.Errorf("Missing expected field in decision log: %s\n\nEntry: %+v\n\n", field, entry)
}
}
})
}

func TestServerSpanWithSystemAuthzPolicy(t *testing.T) {

// setup