Add functions for attaching errors to spans
The "spancheck" linter reminds us to call "Span.RecordError" when
returning an error. Two functions help with that: "tracing.Check" and
"tracing.Escape".
cbandy committed Dec 2, 2024
1 parent c9c0526 commit 86efb44
Showing 9 changed files with 172 additions and 42 deletions.
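For orientation, a minimal sketch of how the two new helpers are meant to be used, based on the call sites changed below. The package path matches the linter configuration in this commit; the function names in the sketch are illustrative, not part of the commit.

package example

import (
	"context"

	"github.com/crunchydata/postgres-operator/internal/tracing"
)

// reconcileSomething mirrors the pattern applied throughout this commit:
// any error that escapes the function is recorded on the span first.
func reconcileSomething(ctx context.Context, doWork func(context.Context) error) error {
	ctx, span := tracing.Start(ctx, "reconcile-something")
	defer span.End()

	if err := doWork(ctx); err != nil {
		// Escape records err as an escaped exception event on the span
		// and returns err unchanged, so it can be returned directly.
		return tracing.Escape(span, err)
	}
	return nil
}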
3 changes: 3 additions & 0 deletions .golangci.yaml
@@ -100,8 +100,11 @@ linters-settings:
no-unaliased: true

spancheck:
checks: [end, record-error]
extra-start-span-signatures:
- 'github.com/crunchydata/postgres-operator/internal/tracing.Start:opentelemetry'
ignore-check-signatures:
- 'tracing.Escape'

issues:
exclude-generated: strict
@@ -133,7 +133,7 @@ func (r *CrunchyBridgeClusterReconciler) Reconcile(ctx context.Context, req ctrl
// NotFound cannot be fixed by requeuing so ignore it. During background
// deletion, we receive delete events from crunchybridgecluster's dependents after
// crunchybridgecluster is deleted.
return ctrl.Result{}, client.IgnoreNotFound(err)
return ctrl.Result{}, tracing.Escape(span, client.IgnoreNotFound(err))
}

// Get and validate connection secret for requests
@@ -152,12 +152,12 @@ func (r *CrunchyBridgeClusterReconciler) Reconcile(ctx context.Context, req ctrl
// is not being deleted.
if result, err := r.handleDelete(ctx, crunchybridgecluster, key); err != nil {
log.Error(err, "deleting")
return ctrl.Result{}, err
return ctrl.Result{}, tracing.Escape(span, err)
} else if result != nil {
if log := log.V(1); log.Enabled() {
log.Info("deleting", "result", fmt.Sprintf("%+v", *result))
}
return *result, err
return *result, tracing.Escape(span, err)
}

// Wonder if there's a better way to handle adding/checking/removing statuses
@@ -190,7 +190,7 @@ func (r *CrunchyBridgeClusterReconciler) Reconcile(ctx context.Context, req ctrl
// Check if a cluster with the same name already exists
controllerResult, err := r.handleDuplicateClusterName(ctx, key, team, crunchybridgecluster)
if err != nil || controllerResult != nil {
return *controllerResult, err
return *controllerResult, tracing.Escape(span, err)
}

// if we've gotten here then no cluster exists with that name and we're missing the ID, ergo, create cluster
@@ -204,26 +204,26 @@ func (r *CrunchyBridgeClusterReconciler) Reconcile(ctx context.Context, req ctrl
// Get Cluster
err = r.handleGetCluster(ctx, key, crunchybridgecluster)
if err != nil {
return ctrl.Result{}, err
return ctrl.Result{}, tracing.Escape(span, err)
}

// Get Cluster Status
err = r.handleGetClusterStatus(ctx, key, crunchybridgecluster)
if err != nil {
return ctrl.Result{}, err
return ctrl.Result{}, tracing.Escape(span, err)
}

// Get Cluster Upgrade
err = r.handleGetClusterUpgrade(ctx, key, crunchybridgecluster)
if err != nil {
return ctrl.Result{}, err
return ctrl.Result{}, tracing.Escape(span, err)
}

// Reconcile roles and their secrets
err = r.reconcilePostgresRoles(ctx, key, crunchybridgecluster)
if err != nil {
log.Error(err, "issue reconciling postgres user roles/secrets")
return ctrl.Result{}, err
return ctrl.Result{}, tracing.Escape(span, err)
}

// For now, we skip updating until the upgrade status is cleared.
1 change: 1 addition & 0 deletions internal/controller/pgupgrade/pgupgrade_controller.go
@@ -98,6 +98,7 @@ func (r *PGUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
ctx, span := tracing.Start(ctx, "reconcile-pgupgrade")
log := logging.FromContext(ctx)
defer span.End()
defer func(s tracing.Span) { _ = tracing.Escape(s, err) }(span)

// Retrieve the upgrade from the client cache, if it exists. A deferred
// function below will send any changes to its Status field.
21 changes: 10 additions & 11 deletions internal/controller/postgrescluster/controller.go
@@ -80,9 +80,8 @@ func (r *Reconciler) Reconcile(
// cluster is deleted.
if err = client.IgnoreNotFound(err); err != nil {
log.Error(err, "unable to fetch PostgresCluster")
span.RecordError(err)
}
return runtime.ErrorWithBackoff(err)
return runtime.ErrorWithBackoff(tracing.Escape(span, err))
}

// Set any defaults that may not have been stored in the API. No DeepCopy
@@ -107,9 +106,8 @@ func (r *Reconciler) Reconcile(
// Check for and handle deletion of cluster. Return early if it is being
// deleted or there was an error.
if result, err := r.handleDelete(ctx, cluster); err != nil {
span.RecordError(err)
log.Error(err, "deleting")
return runtime.ErrorWithBackoff(err)
return runtime.ErrorWithBackoff(tracing.Escape(span, err))

} else if result != nil {
if log := log.V(1); log.Enabled() {
@@ -130,7 +128,7 @@ func (r *Reconciler) Reconcile(
// specifically allow reconciliation if the cluster is shutdown to
// facilitate upgrades, otherwise return
if !initialize.FromPointer(cluster.Spec.Shutdown) {
return runtime.ErrorWithBackoff(err)
return runtime.ErrorWithBackoff(tracing.Escape(span, err))
}
}
// Issue Warning Event if postgres version is EOL according to PostgreSQL:
@@ -154,7 +152,7 @@ func (r *Reconciler) Reconcile(
path := field.NewPath("spec", "standby")
err := field.Invalid(path, cluster.Name, "Standby requires a host or repoName to be enabled")
r.Recorder.Event(cluster, corev1.EventTypeWarning, "InvalidStandbyConfiguration", err.Error())
return runtime.ErrorWithBackoff(err)
return runtime.ErrorWithBackoff(tracing.Escape(span, err))
}

var (
@@ -208,7 +206,7 @@

ObservedGeneration: cluster.GetGeneration(),
})
return runtime.ErrorWithBackoff(patchClusterStatus())
return runtime.ErrorWithBackoff(tracing.Escape(span, patchClusterStatus()))
} else {
meta.RemoveStatusCondition(&cluster.Status.Conditions, v1beta1.PostgresClusterProgressing)
}
@@ -228,7 +226,7 @@

ObservedGeneration: cluster.GetGeneration(),
})
return runtime.ErrorWithBackoff(patchClusterStatus())
return runtime.ErrorWithBackoff(tracing.Escape(span, patchClusterStatus()))
} else {
meta.RemoveStatusCondition(&cluster.Status.Conditions, v1beta1.PostgresClusterProgressing)
}
@@ -259,7 +257,8 @@ func (r *Reconciler) Reconcile(
// return is no longer needed, and reconciliation can proceed normally.
returnEarly, err := r.reconcileDirMoveJobs(ctx, cluster)
if err != nil || returnEarly {
return runtime.ErrorWithBackoff(errors.Join(err, patchClusterStatus()))
return runtime.ErrorWithBackoff(tracing.Escape(span,
errors.Join(err, patchClusterStatus())))
}
}
if err == nil {
@@ -309,7 +308,7 @@ func (r *Reconciler) Reconcile(
// can proceed normally.
returnEarly, err := r.reconcileDataSource(ctx, cluster, instances, clusterVolumes, rootCA, backupsSpecFound)
if err != nil || returnEarly {
return runtime.ErrorWithBackoff(errors.Join(err, patchClusterStatus()))
return runtime.ErrorWithBackoff(tracing.Escape(span, errors.Join(err, patchClusterStatus())))
}
}
if err == nil {
@@ -401,7 +400,7 @@ func (r *Reconciler) Reconcile(

log.V(1).Info("reconciled cluster")

return result, errors.Join(err, patchClusterStatus())
return result, tracing.Escape(span, errors.Join(err, patchClusterStatus()))
}

// deleteControlled safely deletes object when it is controlled by cluster.
38 changes: 19 additions & 19 deletions internal/controller/postgrescluster/instance.go
@@ -807,8 +807,7 @@ func (r *Reconciler) rolloutInstance(
err = errors.New("unable to switchover")
}

span.RecordError(err)
return err
return tracing.Escape(span, err)
}

// When the cluster has only one instance for failover, perform a series of
@@ -840,8 +839,7 @@
logging.FromContext(ctx).V(1).Info("attempted checkpoint",
"duration", elapsed, "stdout", stdout, "stderr", stderr)

span.RecordError(err)
return elapsed, err
return elapsed, tracing.Escape(span, err)
}

duration, err := checkpoint(ctx)
@@ -950,8 +948,7 @@ func (r *Reconciler) rolloutInstances(
}
}

span.RecordError(err)
return err
return tracing.Escape(span, err)
}

// scaleDownInstances removes extra instances from a cluster until it matches
@@ -1081,20 +1078,23 @@ func (r *Reconciler) scaleUpInstances(
// While there are fewer instances than specified, generate another empty one
// and append it.
for len(instances) < int(*set.Replicas) {
_, span := tracing.Start(ctx, "generate-instance-name")
next := naming.GenerateInstance(cluster, set)
// if there are any available instance names (as determined by observing any PVCs for the
// instance set that are not currently associated with an instance, e.g. in the event the
// instance STS was deleted), then reuse them instead of generating a new name
if len(availableInstanceNames) > 0 {
next.Name = availableInstanceNames[0]
availableInstanceNames = availableInstanceNames[1:]
} else {
for instanceNames.Has(next.Name) {
next = naming.GenerateInstance(cluster, set)
}
}
span.End()
next := func() metav1.ObjectMeta {
_, span := tracing.Start(ctx, "generate-instance-name")
defer span.End()
n := naming.GenerateInstance(cluster, set)
// if there are any available instance names (as determined by observing any PVCs for the
// instance set that are not currently associated with an instance, e.g. in the event the
// instance STS was deleted), then reuse them instead of generating a new name
if len(availableInstanceNames) > 0 {
n.Name = availableInstanceNames[0]
availableInstanceNames = availableInstanceNames[1:]
} else {
for instanceNames.Has(n.Name) {
n = naming.GenerateInstance(cluster, set)
}
}
return n
}()

instanceNames.Insert(next.Name)
instances = append(instances, &appsv1.StatefulSet{ObjectMeta: next})
4 changes: 2 additions & 2 deletions internal/controller/standalone_pgadmin/controller.go
@@ -93,7 +93,7 @@ func (r *PGAdminReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
// NotFound cannot be fixed by requeuing so ignore it. During background
// deletion, we receive delete events from pgadmin's dependents after
// pgadmin is deleted.
return ctrl.Result{}, client.IgnoreNotFound(err)
return ctrl.Result{}, tracing.Escape(span, client.IgnoreNotFound(err))
}

// Write any changes to the pgadmin status on the way out.
@@ -148,7 +148,7 @@ func (r *PGAdminReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
log.V(1).Info("Reconciled pgAdmin")
}

return ctrl.Result{}, err
return ctrl.Result{}, tracing.Escape(span, err)
}

// The owner reference created by controllerutil.SetControllerReference blocks
3 changes: 1 addition & 2 deletions internal/naming/dns.go
@@ -79,11 +79,10 @@ func KubernetesClusterDomain(ctx context.Context) string {
api := "kubernetes.default.svc"
cname, err := net.DefaultResolver.LookupCNAME(ctx, api)

if err == nil {
if tracing.Check(span, err) {
return strings.TrimPrefix(cname, api+".")
}

span.RecordError(err)
// The kubeadm default is "cluster.local" and is adequate when not running
// in an actual Kubernetes cluster.
return "cluster.local."
34 changes: 34 additions & 0 deletions internal/tracing/errors.go
@@ -0,0 +1,34 @@
// Copyright 2021 - 2024 Crunchy Data Solutions, Inc.
//
// SPDX-License-Identifier: Apache-2.0

package tracing

import (
semconv "go.opentelemetry.io/otel/semconv/v1.27.0"
"go.opentelemetry.io/otel/trace"
)

// Check returns true when err is nil. Otherwise, it adds err as an exception
// event on s and returns false. If you intend to return err, consider using
// [Escape] instead.
//
// See: https://opentelemetry.io/docs/specs/semconv/exceptions/exceptions-spans
func Check(s Span, err error) bool {
if err == nil {
return true
}
if s.IsRecording() {
s.RecordError(err)
}
return false
}

// Escape adds non-nil err as an escaped exception event on s and returns err.
// See: https://opentelemetry.io/docs/specs/semconv/exceptions/exceptions-spans
func Escape(s Span, err error) error {
if err != nil && s.IsRecording() {
s.RecordError(err, trace.WithAttributes(semconv.ExceptionEscaped(true)))
}
return err
}
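To contrast the two helpers added above: Check is for errors that are handled locally, as in the internal/naming/dns.go change, while Escape is for errors that are returned to the caller. A hypothetical sketch, extending the illustrative package near the top of this page (query and store are stand-ins, not part of this commit):

// lookupOrDefault handles its error locally, so it uses Check: a non-nil
// error becomes an exception event on the span and a fallback is returned.
func lookupOrDefault(ctx context.Context, query func(context.Context) (string, error)) string {
	ctx, span := tracing.Start(ctx, "lookup-value")
	defer span.End()

	if value, err := query(ctx); tracing.Check(span, err) {
		return value
	}
	return "default"
}

// saveValue propagates its error, so it uses Escape: a non-nil error is
// recorded with the exception.escaped=true attribute and returned unchanged.
func saveValue(ctx context.Context, store func(context.Context) error) error {
	ctx, span := tracing.Start(ctx, "save-value")
	defer span.End()

	return tracing.Escape(span, store(ctx))
}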
94 changes: 94 additions & 0 deletions internal/tracing/errors_test.go
@@ -0,0 +1,94 @@
// Copyright 2021 - 2024 Crunchy Data Solutions, Inc.
//
// SPDX-License-Identifier: Apache-2.0

package tracing

import (
"context"
"errors"
"testing"

"go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
semconv "go.opentelemetry.io/otel/semconv/v1.27.0"
"gotest.tools/v3/assert"
)

func TestCheck(t *testing.T) {
recorder := tracetest.NewSpanRecorder()
tracer := trace.NewTracerProvider(
trace.WithSpanProcessor(recorder),
).Tracer("")

{
_, span := tracer.Start(context.Background(), "")
assert.Assert(t, Check(span, nil))
span.End()

spans := recorder.Ended()
assert.Equal(t, len(spans), 1)
assert.Equal(t, len(spans[0].Events()), 0, "expected no events")
}

{
_, span := tracer.Start(context.Background(), "")
assert.Assert(t, !Check(span, errors.New("msg")))
span.End()

spans := recorder.Ended()
assert.Equal(t, len(spans), 2)
assert.Equal(t, len(spans[1].Events()), 1, "expected one event")

event := spans[1].Events()[0]
assert.Equal(t, event.Name, semconv.ExceptionEventName)

attrs := event.Attributes
assert.Equal(t, len(attrs), 2)
assert.Equal(t, string(attrs[0].Key), "exception.type")
assert.Equal(t, string(attrs[1].Key), "exception.message")
assert.Equal(t, attrs[0].Value.AsInterface(), "*errors.errorString")
assert.Equal(t, attrs[1].Value.AsInterface(), "msg")
}
}

func TestEscape(t *testing.T) {
recorder := tracetest.NewSpanRecorder()
tracer := trace.NewTracerProvider(
trace.WithSpanProcessor(recorder),
).Tracer("")

{
_, span := tracer.Start(context.Background(), "")
assert.NilError(t, Escape(span, nil))
span.End()

spans := recorder.Ended()
assert.Equal(t, len(spans), 1)
assert.Equal(t, len(spans[0].Events()), 0, "expected no events")
}

{
_, span := tracer.Start(context.Background(), "")
expected := errors.New("somesuch")
assert.Assert(t, errors.Is(Escape(span, expected), expected),
"expected to unwrap the original error")
span.End()

spans := recorder.Ended()
assert.Equal(t, len(spans), 2)
assert.Equal(t, len(spans[1].Events()), 1, "expected one event")

event := spans[1].Events()[0]
assert.Equal(t, event.Name, semconv.ExceptionEventName)

attrs := event.Attributes
assert.Equal(t, len(attrs), 3)
assert.Equal(t, string(attrs[0].Key), "exception.escaped")
assert.Equal(t, string(attrs[1].Key), "exception.type")
assert.Equal(t, string(attrs[2].Key), "exception.message")
assert.Equal(t, attrs[0].Value.AsInterface(), true)
assert.Equal(t, attrs[1].Value.AsInterface(), "*errors.errorString")
assert.Equal(t, attrs[2].Value.AsInterface(), "somesuch")
}
}
