Skip to content

Commit

Permalink
feat(nodebuilder): Invoke traces from fx (celestiaorg#2477)
Browse files Browse the repository at this point in the history
  • Loading branch information
vgonkivs authored Jul 14, 2023
1 parent b59c21c commit 9c40a7f
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 39 deletions.
35 changes: 4 additions & 31 deletions cmd/flags_misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,11 @@ import (
otelpyroscope "github.com/pyroscope-io/otel-profiling-go"
"github.com/spf13/cobra"
flag "github.com/spf13/pflag"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/sdk/resource"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.11.0"
"go.opentelemetry.io/otel/trace"

"github.com/celestiaorg/celestia-node/logs"
"github.com/celestiaorg/celestia-node/nodebuilder"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
modp2p "github.com/celestiaorg/celestia-node/nodebuilder/p2p"
)

Expand Down Expand Up @@ -201,7 +194,6 @@ func ParseMiscFlags(ctx context.Context, cmd *cobra.Command) (context.Context, e
}

if ok {
var tp trace.TracerProvider
opts := []otlptracehttp.Option{
otlptracehttp.WithCompression(otlptracehttp.GzipCompression),
otlptracehttp.WithEndpoint(cmd.Flag(tracingEndpointFlag).Value.String()),
Expand All @@ -212,31 +204,13 @@ func ParseMiscFlags(ctx context.Context, cmd *cobra.Command) (context.Context, e
opts = append(opts, otlptracehttp.WithInsecure())
}

client := otlptracehttp.NewClient(opts...)
exporter, err := otlptrace.New(ctx, client)
if err != nil {
return ctx, fmt.Errorf("creating OTLP trace exporter: %w", err)
}

tp = tracesdk.NewTracerProvider(
tracesdk.WithSampler(tracesdk.AlwaysSample()),
// Always be sure to batch in production.
tracesdk.WithBatcher(exporter),
// Record information about this application in a Resource.
tracesdk.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String(fmt.Sprintf("Celestia-%s", NodeType(ctx).String())),
// TODO(@Wondertan): Versioning: semconv.ServiceVersionKey
)),
)

pyroOpts := make([]otelpyroscope.Option, 0)
ok, err = cmd.Flags().GetBool(pyroscopeTracing)
if err != nil {
panic(err)
}
if ok {
tp = otelpyroscope.NewTracerProvider(
tp,
pyroOpts = append(pyroOpts,
otelpyroscope.WithAppName("celestia.da-node"),
otelpyroscope.WithPyroscopeURL(cmd.Flag(pyroscopeEndpoint).Value.String()),
otelpyroscope.WithRootSpanOnly(true),
Expand All @@ -245,8 +219,7 @@ func ParseMiscFlags(ctx context.Context, cmd *cobra.Command) (context.Context, e
otelpyroscope.WithProfileBaselineURL(true),
)
}

otel.SetTracerProvider(tp)
ctx = WithNodeOptions(ctx, nodebuilder.WithTraces(opts, pyroOpts))
}

ok, err = cmd.Flags().GetBool(metricsFlag)
Expand All @@ -265,7 +238,7 @@ func ParseMiscFlags(ctx context.Context, cmd *cobra.Command) (context.Context, e
opts = append(opts, otlpmetrichttp.WithInsecure())
}

ctx = WithNodeOptions(ctx, nodebuilder.WithMetrics(opts, NodeType(ctx), node.GetBuildInfo()))
ctx = WithNodeOptions(ctx, nodebuilder.WithMetrics(opts, NodeType(ctx)))
}

ok, err = cmd.Flags().GetBool(p2pMetrics)
Expand Down
1 change: 0 additions & 1 deletion nodebuilder/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ func TestLifecycle_WithMetrics(t *testing.T) {
otlpmetrichttp.WithInsecure(),
},
tt.tp,
&node.BuildInfo{},
),
)
require.NotNil(t, node)
Expand Down
58 changes: 51 additions & 7 deletions nodebuilder/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,16 @@ import (

"github.com/libp2p/go-libp2p/core/peer"
"github.com/pyroscope-io/client/pyroscope"
otelpyroscope "github.com/pyroscope-io/otel-profiling-go"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
sdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.11.0"
"go.opentelemetry.io/otel/trace"
"go.uber.org/fx"

"github.com/celestiaorg/go-fraud"
Expand Down Expand Up @@ -62,10 +67,9 @@ func WithPyroscope(endpoint string, nodeType node.Type) fx.Option {
}

// WithMetrics enables metrics exporting for the node.
func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type, buildInfo *node.BuildInfo) fx.Option {
func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type) fx.Option {
baseComponents := fx.Options(
fx.Supply(metricOpts),
fx.Supply(buildInfo),
fx.Invoke(initializeMetrics),
fx.Invoke(state.WithMetrics),
fx.Invoke(fraud.WithMetrics),
Expand Down Expand Up @@ -105,13 +109,55 @@ func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type, buildIn
return opts
}

func WithTraces(opts []otlptracehttp.Option, pyroOpts []otelpyroscope.Option) fx.Option {
options := fx.Options(
fx.Supply(opts),
fx.Supply(pyroOpts),
fx.Invoke(initializeTraces),
)
return options
}

func initializeTraces(
ctx context.Context,
nodeType node.Type,
peerID peer.ID,
network p2p.Network,
opts []otlptracehttp.Option,
pyroOpts []otelpyroscope.Option,
) error {
client := otlptracehttp.NewClient(opts...)
exporter, err := otlptrace.New(ctx, client)
if err != nil {
return fmt.Errorf("creating OTLP trace exporter: %w", err)
}

var tp trace.TracerProvider
tp = tracesdk.NewTracerProvider(
tracesdk.WithSampler(tracesdk.AlwaysSample()),
// Always be sure to batch in production.
tracesdk.WithBatcher(exporter),
// Record information about this application in a Resource.
tracesdk.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNamespaceKey.String(nodeType.String()),
semconv.ServiceNameKey.String(fmt.Sprintf("%s/%s", network.String(), peerID.String()))),
))

if len(pyroOpts) > 0 {
tp = otelpyroscope.NewTracerProvider(tp, pyroOpts...)
}
otel.SetTracerProvider(tp)
return nil
}

// initializeMetrics initializes the global meter provider.
func initializeMetrics(
ctx context.Context,
lc fx.Lifecycle,
peerID peer.ID,
nodeType node.Type,
buildInfo *node.BuildInfo,
network p2p.Network,
opts []otlpmetrichttp.Option,
) error {
exp, err := otlpmetrichttp.New(ctx, opts...)
Expand All @@ -123,10 +169,8 @@ func initializeMetrics(
sdk.WithReader(sdk.NewPeriodicReader(exp, sdk.WithTimeout(2*time.Second))),
sdk.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNamespaceKey.String(fmt.Sprintf("Celestia-%s", nodeType.String())),
semconv.ServiceNameKey.String(fmt.Sprintf("semver-%s", buildInfo.SemanticVersion)),
semconv.ServiceInstanceIDKey.String(peerID.String()),
)))
semconv.ServiceNamespaceKey.String(nodeType.String()),
semconv.ServiceNameKey.String(fmt.Sprintf("%s/%s", network.String(), peerID.String())))))
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return provider.Shutdown(ctx)
Expand Down

0 comments on commit 9c40a7f

Please sign in to comment.