From a5fd22383d56b42b1974336a74756b78122bd826 Mon Sep 17 00:00:00 2001 From: Saransh Shankar <103821431+Wise-Wizard@users.noreply.github.com> Date: Wed, 10 Jul 2024 00:53:55 +0530 Subject: [PATCH] Implement Telemetery struct for V1 Components Initialization (#5695) **Which problem is this PR solving?** This PR addresses part of issue [#5633](https://github.com/jaegertracing/jaeger/issues/5633) **Description of the changes** This is a Draft PR to achieve Observability Parity between V1 and V2 components by creating a unified telemetry container to pass observability clients to V1 components. **How was this change tested?** The changes were tested by running the following command: ```bash make test ``` **Checklist** - [x] I have read [CONTRIBUTING_GUIDELINES.md](https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md) - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - `for jaeger: make lint test` - `for jaeger-ui: yarn lint` and `yarn test` --------- Signed-off-by: Wise-Wizard Signed-off-by: Saransh Shankar <103821431+Wise-Wizard@users.noreply.github.com> Co-authored-by: Yuri Shkuro --- cmd/all-in-one/main.go | 18 ++-- .../internal/extension/jaegerquery/server.go | 35 +++--- cmd/query/app/apiv3/http_gateway.go | 6 +- cmd/query/app/apiv3/http_gateway_test.go | 2 +- cmd/query/app/handler_options.go | 4 +- cmd/query/app/http_handler.go | 7 +- cmd/query/app/http_handler_test.go | 2 +- cmd/query/app/server.go | 100 +++++++++--------- cmd/query/app/server_test.go | 74 +++++++------ cmd/query/app/token_propagation_test.go | 10 +- cmd/query/main.go | 8 +- pkg/telemetery/settings.go | 40 +++++++ pkg/telemetery/settings_test.go | 78 ++++++++++++++ 13 files changed, 268 insertions(+), 116 deletions(-) create mode 100644 pkg/telemetery/settings.go create mode 100644 pkg/telemetery/settings_test.go diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go 
index 185c74a74ab..a051573e5a6 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -43,6 +43,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/jtracer" "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/pkg/telemetery" "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/pkg/version" metricsPlugin "github.com/jaegertracing/jaeger/plugin/metrics" @@ -193,12 +194,17 @@ by default uses only in-memory database.`, logger.Fatal("Could not create collector proxy", zap.Error(err)) } agent := startAgent(cp, aOpts, logger, agentMetricsFactory) - + telset := telemetery.Setting{ + Logger: svc.Logger, + TracerProvider: tracer.OTEL, + Metrics: queryMetricsFactory, + ReportStatus: telemetery.HCAdapter(svc.HC()), + } // query querySrv := startQuery( svc, qOpts, qOpts.BuildQueryServiceOptions(storageFactory, logger), spanReader, dependencyReader, metricsQueryService, - queryMetricsFactory, tm, tracer, + tm, telset, ) svc.RunAndThen(func() { @@ -273,13 +279,13 @@ func startQuery( spanReader spanstore.Reader, depReader dependencystore.Reader, metricsQueryService querysvc.MetricsQueryService, - metricsFactory metrics.Factory, tm *tenancy.Manager, - jt *jtracer.JTracer, + telset telemetery.Setting, ) *queryApp.Server { - spanReader = storageMetrics.NewReadMetricsDecorator(spanReader, metricsFactory) + spanReader = storageMetrics.NewReadMetricsDecorator(spanReader, telset.Metrics) qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts) - server, err := queryApp.NewServer(svc.Logger, svc.HC(), qs, metricsQueryService, qOpts, tm, jt) + + server, err := queryApp.NewServer(qs, metricsQueryService, qOpts, tm, telset) if err != nil { svc.Logger.Fatal("Could not create jaeger-query", zap.Error(err)) } diff --git a/cmd/jaeger/internal/extension/jaegerquery/server.go b/cmd/jaeger/internal/extension/jaegerquery/server.go index dc9fc29323d..a512c8ed07f 100644 --- 
a/cmd/jaeger/internal/extension/jaegerquery/server.go +++ b/cmd/jaeger/internal/extension/jaegerquery/server.go @@ -10,13 +10,12 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/extension" - "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage" queryApp "github.com/jaegertracing/jaeger/cmd/query/app" "github.com/jaegertracing/jaeger/cmd/query/app/querysvc" - "github.com/jaegertracing/jaeger/pkg/healthcheck" "github.com/jaegertracing/jaeger/pkg/jtracer" + "github.com/jaegertracing/jaeger/pkg/telemetery" "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/plugin/metrics/disabled" "github.com/jaegertracing/jaeger/ports" @@ -28,16 +27,16 @@ var ( ) type server struct { - config *Config - logger *zap.Logger - server *queryApp.Server - jtracer *jtracer.JTracer + config *Config + server *queryApp.Server + telset component.TelemetrySettings + closeTracer func(ctx context.Context) error } func newServer(config *Config, otel component.TelemetrySettings) *server { return &server{ config: config, - logger: otel.Logger, + telset: otel, } } @@ -75,22 +74,26 @@ func (s *server) Start(_ context.Context, host component.Host) error { // TODO OTel-collector does not initialize the tracer currently // https://github.com/open-telemetry/opentelemetry-collector/issues/7532 //nolint - s.jtracer, err = jtracer.New("jaeger") + tracerProvider, err := jtracer.New("jaeger") if err != nil { return fmt.Errorf("could not initialize a tracer: %w", err) } + s.closeTracer = tracerProvider.Close + telset := telemetery.Setting{ + Logger: s.telset.Logger, + TracerProvider: tracerProvider.OTEL, + ReportStatus: s.telset.ReportStatus, + } // TODO contextcheck linter complains about next line that context is not passed. It is not wrong. 
//nolint s.server, err = queryApp.NewServer( - s.logger, // TODO propagate healthcheck updates up to the collector's runtime - healthcheck.New(), qs, metricsQueryService, s.makeQueryOptions(), tm, - s.jtracer, + telset, ) if err != nil { return fmt.Errorf("could not create jaeger-query: %w", err) @@ -105,7 +108,7 @@ func (s *server) Start(_ context.Context, host component.Host) error { func (s *server) addArchiveStorage(opts *querysvc.QueryServiceOptions, host component.Host) error { if s.config.TraceStorageArchive == "" { - s.logger.Info("Archive storage not configured") + s.telset.Logger.Info("Archive storage not configured") return nil } @@ -114,8 +117,8 @@ func (s *server) addArchiveStorage(opts *querysvc.QueryServiceOptions, host comp return fmt.Errorf("cannot find archive storage factory: %w", err) } - if !opts.InitArchiveStorage(f, s.logger) { - s.logger.Info("Archive storage not initialized") + if !opts.InitArchiveStorage(f, s.telset.Logger) { + s.telset.Logger.Info("Archive storage not initialized") } return nil } @@ -135,8 +138,8 @@ func (s *server) Shutdown(ctx context.Context) error { if s.server != nil { errs = append(errs, s.server.Close()) } - if s.jtracer != nil { - errs = append(errs, s.jtracer.Close(ctx)) + if s.closeTracer != nil { + errs = append(errs, s.closeTracer(ctx)) } return errors.Join(errs...) 
} diff --git a/cmd/query/app/apiv3/http_gateway.go b/cmd/query/app/apiv3/http_gateway.go index 8aca4a511a2..3b0884c7f12 100644 --- a/cmd/query/app/apiv3/http_gateway.go +++ b/cmd/query/app/apiv3/http_gateway.go @@ -17,12 +17,12 @@ import ( "github.com/gorilla/mux" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/query/app/internal/api_v3" "github.com/jaegertracing/jaeger/cmd/query/app/querysvc" "github.com/jaegertracing/jaeger/model" - "github.com/jaegertracing/jaeger/pkg/jtracer" "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/storage/spanstore" ) @@ -48,7 +48,7 @@ type HTTPGateway struct { QueryService *querysvc.QueryService TenancyMgr *tenancy.Manager Logger *zap.Logger - Tracer *jtracer.JTracer + Tracer trace.TracerProvider } // RegisterRoutes registers HTTP endpoints for APIv3 into provided mux. @@ -75,7 +75,7 @@ func (h *HTTPGateway) addRoute( traceMiddleware := otelhttp.NewHandler( otelhttp.WithRouteTag(route, handler), route, - otelhttp.WithTracerProvider(h.Tracer.OTEL)) + otelhttp.WithTracerProvider(h.Tracer)) return router.HandleFunc(route, traceMiddleware.ServeHTTP) } diff --git a/cmd/query/app/apiv3/http_gateway_test.go b/cmd/query/app/apiv3/http_gateway_test.go index 6cd62ba492b..d747bb799ff 100644 --- a/cmd/query/app/apiv3/http_gateway_test.go +++ b/cmd/query/app/apiv3/http_gateway_test.go @@ -46,7 +46,7 @@ func setupHTTPGatewayNoServer( QueryService: q, TenancyMgr: tenancy.NewManager(&tenancyOptions), Logger: zap.NewNop(), - Tracer: jtracer.NoOp(), + Tracer: jtracer.NoOp().OTEL, } gw.router = &mux.Router{} diff --git a/cmd/query/app/handler_options.go b/cmd/query/app/handler_options.go index 853a41a2222..354f6a5fc91 100644 --- a/cmd/query/app/handler_options.go +++ b/cmd/query/app/handler_options.go @@ -18,10 +18,10 @@ package app import ( "time" + 
"go.opentelemetry.io/otel/trace" "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/query/app/querysvc" - "github.com/jaegertracing/jaeger/pkg/jtracer" ) // HandlerOption is a function that sets some option on the APIHandler @@ -62,7 +62,7 @@ func (handlerOptions) QueryLookbackDuration(queryLookbackDuration time.Duration) } // Tracer creates a HandlerOption that passes the tracer to the handler -func (handlerOptions) Tracer(tracer *jtracer.JTracer) HandlerOption { +func (handlerOptions) Tracer(tracer trace.TracerProvider) HandlerOption { return func(apiHandler *APIHandler) { apiHandler.tracer = tracer } diff --git a/cmd/query/app/http_handler.go b/cmd/query/app/http_handler.go index 5f217118610..58ecb875e33 100644 --- a/cmd/query/app/http_handler.go +++ b/cmd/query/app/http_handler.go @@ -30,6 +30,7 @@ import ( "github.com/gorilla/mux" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/query/app/querysvc" @@ -90,7 +91,7 @@ type APIHandler struct { basePath string apiPrefix string logger *zap.Logger - tracer *jtracer.JTracer + tracer trace.TracerProvider } // NewAPIHandler returns an APIHandler @@ -114,7 +115,7 @@ func NewAPIHandler(queryService *querysvc.QueryService, tm *tenancy.Manager, opt aH.logger = zap.NewNop() } if aH.tracer == nil { - aH.tracer = jtracer.NoOp() + aH.tracer = jtracer.NoOp().OTEL } return aH } @@ -151,7 +152,7 @@ func (aH *APIHandler) handleFunc( traceMiddleware := otelhttp.NewHandler( otelhttp.WithRouteTag(route, traceResponseHandler(handler)), route, - otelhttp.WithTracerProvider(aH.tracer.OTEL)) + otelhttp.WithTracerProvider(aH.tracer)) return router.HandleFunc(route, traceMiddleware.ServeHTTP) } diff --git a/cmd/query/app/http_handler_test.go b/cmd/query/app/http_handler_test.go index ec84b37e55a..3034f6595d0 100644 --- a/cmd/query/app/http_handler_test.go +++ 
b/cmd/query/app/http_handler_test.go @@ -321,7 +321,7 @@ func TestGetTrace(t *testing.T) { jTracer := jtracer.JTracer{OTEL: tracerProvider} defer tracerProvider.Shutdown(context.Background()) - ts := initializeTestServer(HandlerOptions.Tracer(&jTracer)) + ts := initializeTestServer(HandlerOptions.Tracer(jTracer.OTEL)) defer ts.server.Close() ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), model.NewTraceID(0, 0x123456abc)). diff --git a/cmd/query/app/server.go b/cmd/query/app/server.go index c2debad540a..6e5261beadc 100644 --- a/cmd/query/app/server.go +++ b/cmd/query/app/server.go @@ -26,6 +26,7 @@ import ( "github.com/gorilla/handlers" "github.com/soheilhy/cmux" + "go.opentelemetry.io/collector/component" "go.uber.org/zap" "go.uber.org/zap/zapcore" "google.golang.org/grpc" @@ -38,10 +39,9 @@ import ( "github.com/jaegertracing/jaeger/cmd/query/app/internal/api_v3" "github.com/jaegertracing/jaeger/cmd/query/app/querysvc" "github.com/jaegertracing/jaeger/pkg/bearertoken" - "github.com/jaegertracing/jaeger/pkg/healthcheck" - "github.com/jaegertracing/jaeger/pkg/jtracer" "github.com/jaegertracing/jaeger/pkg/netutils" "github.com/jaegertracing/jaeger/pkg/recoveryhandler" + "github.com/jaegertracing/jaeger/pkg/telemetery" "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/proto-gen/api_v2" "github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics" @@ -49,13 +49,9 @@ import ( // Server runs HTTP, Mux and a grpc server type Server struct { - logger *zap.Logger - healthCheck *healthcheck.HealthCheck querySvc *querysvc.QueryService queryOptions *QueryOptions - tracer *jtracer.JTracer // TODO make part of flags.Service - conn net.Listener grpcConn net.Listener httpConn net.Listener @@ -64,10 +60,16 @@ type Server struct { httpServer *httpServer separatePorts bool bgFinished sync.WaitGroup + telemetery.Setting } // NewServer creates and initializes Server -func NewServer(logger *zap.Logger, healthCheck 
*healthcheck.HealthCheck, querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.Manager, tracer *jtracer.JTracer) (*Server, error) { +func NewServer(querySvc *querysvc.QueryService, + metricsQuerySvc querysvc.MetricsQueryService, + options *QueryOptions, + tm *tenancy.Manager, + telset telemetery.Setting, +) (*Server, error) { _, httpPort, err := net.SplitHostPort(options.HTTPHostPort) if err != nil { return nil, fmt.Errorf("invalid HTTP server host:port: %w", err) @@ -81,33 +83,31 @@ func NewServer(logger *zap.Logger, healthCheck *healthcheck.HealthCheck, querySv return nil, errors.New("server with TLS enabled can not use same host ports for gRPC and HTTP. Use dedicated HTTP and gRPC host ports instead") } - grpcServer, err := createGRPCServer(querySvc, metricsQuerySvc, options, tm, logger, tracer) + grpcServer, err := createGRPCServer(querySvc, metricsQuerySvc, options, tm, telset) if err != nil { return nil, err } - httpServer, err := createHTTPServer(querySvc, metricsQuerySvc, options, tm, tracer, logger) + httpServer, err := createHTTPServer(querySvc, metricsQuerySvc, options, tm, telset) if err != nil { return nil, err } return &Server{ - logger: logger, - healthCheck: healthCheck, querySvc: querySvc, queryOptions: options, - tracer: tracer, grpcServer: grpcServer, httpServer: httpServer, separatePorts: grpcPort != httpPort, + Setting: telset, }, nil } -func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.Manager, logger *zap.Logger, tracer *jtracer.JTracer) (*grpc.Server, error) { +func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.Manager, telset telemetery.Setting) (*grpc.Server, error) { var grpcOpts []grpc.ServerOption if options.TLSGRPC.Enabled { - tlsCfg, err := options.TLSGRPC.Config(logger) + tlsCfg, err := 
options.TLSGRPC.Config(telset.Logger) if err != nil { return nil, err } @@ -127,8 +127,7 @@ func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc. reflection.Register(server) handler := NewGRPCHandler(querySvc, metricsQuerySvc, GRPCHandlerOptions{ - Logger: logger, - Tracer: tracer, + Logger: telset.Logger, }) healthServer := health.NewServer() @@ -156,12 +155,11 @@ func createHTTPServer( metricsQuerySvc querysvc.MetricsQueryService, queryOpts *QueryOptions, tm *tenancy.Manager, - tracer *jtracer.JTracer, - logger *zap.Logger, + telset telemetery.Setting, ) (*httpServer, error) { apiHandlerOptions := []HandlerOption{ - HandlerOptions.Logger(logger), - HandlerOptions.Tracer(tracer), + HandlerOptions.Logger(telset.Logger), + HandlerOptions.Tracer(telset.TracerProvider), HandlerOptions.MetricsQueryService(metricsQuerySvc), } @@ -177,20 +175,20 @@ func createHTTPServer( (&apiv3.HTTPGateway{ QueryService: querySvc, TenancyMgr: tm, - Logger: logger, - Tracer: tracer, + Logger: telset.Logger, + Tracer: telset.TracerProvider, }).RegisterRoutes(r) apiHandler.RegisterRoutes(r) var handler http.Handler = r handler = additionalHeadersHandler(handler, queryOpts.AdditionalHeaders) if queryOpts.BearerTokenPropagation { - handler = bearertoken.PropagationHandler(logger, handler) + handler = bearertoken.PropagationHandler(telset.Logger, handler) } handler = handlers.CompressHandler(handler) - recoveryHandler := recoveryhandler.NewRecoveryHandler(logger, true) + recoveryHandler := recoveryhandler.NewRecoveryHandler(telset.Logger, true) - errorLog, _ := zap.NewStdLogAt(logger, zapcore.ErrorLevel) + errorLog, _ := zap.NewStdLogAt(telset.Logger, zapcore.ErrorLevel) server := &httpServer{ Server: &http.Server{ Handler: recoveryHandler(handler), @@ -200,14 +198,14 @@ func createHTTPServer( } if queryOpts.TLSHTTP.Enabled { - tlsCfg, err := queryOpts.TLSHTTP.Config(logger) // This checks if the certificates are correctly provided + tlsCfg, err := 
queryOpts.TLSHTTP.Config(telset.Logger) // This checks if the certificates are correctly provided if err != nil { return nil, err } server.TLSConfig = tlsCfg } - server.staticHandlerCloser = RegisterStaticHandler(r, logger, queryOpts, querySvc.GetCapabilities()) + server.staticHandlerCloser = RegisterStaticHandler(r, telset.Logger, queryOpts, querySvc.GetCapabilities()) return server, nil } @@ -232,7 +230,7 @@ func (s *Server) initListener() (cmux.CMux, error) { if err != nil { return nil, err } - s.logger.Info( + s.Logger.Info( "Query server started", zap.String("http_addr", s.httpConn.Addr().String()), zap.String("grpc_addr", s.grpcConn.Addr().String()), @@ -253,7 +251,7 @@ func (s *Server) initListener() (cmux.CMux, error) { tcpPort = port } - s.logger.Info( + s.Logger.Info( "Query server started", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HTTPHostPort)) @@ -297,7 +295,8 @@ func (s *Server) Start() error { s.bgFinished.Add(1) go func() { - s.logger.Info("Starting HTTP server", zap.Int("port", httpPort), zap.String("addr", s.queryOptions.HTTPHostPort)) + defer s.bgFinished.Done() + s.Logger.Info("Starting HTTP server", zap.Int("port", httpPort), zap.String("addr", s.queryOptions.HTTPHostPort)) var err error if s.queryOptions.TLSHTTP.Enabled { err = s.httpServer.ServeTLS(s.httpConn, "", "") @@ -305,44 +304,45 @@ func (s *Server) Start() error { err = s.httpServer.Serve(s.httpConn) } if err != nil && !errors.Is(err, http.ErrServerClosed) && !errors.Is(err, cmux.ErrListenerClosed) && !errors.Is(err, cmux.ErrServerClosed) { - s.logger.Error("Could not start HTTP server", zap.Error(err)) + s.Logger.Error("Could not start HTTP server", zap.Error(err)) + s.ReportStatus(component.NewFatalErrorEvent(err)) + return } - s.logger.Info("HTTP server stopped", zap.Int("port", httpPort), zap.String("addr", s.queryOptions.HTTPHostPort)) - s.healthCheck.Set(healthcheck.Unavailable) - s.bgFinished.Done() + s.Logger.Info("HTTP server stopped", zap.Int("port", 
httpPort), zap.String("addr", s.queryOptions.HTTPHostPort)) }() // Start GRPC server concurrently s.bgFinished.Add(1) go func() { - s.logger.Info("Starting GRPC server", zap.Int("port", grpcPort), zap.String("addr", s.queryOptions.GRPCHostPort)) + defer s.bgFinished.Done() + s.Logger.Info("Starting GRPC server", zap.Int("port", grpcPort), zap.String("addr", s.queryOptions.GRPCHostPort)) err := s.grpcServer.Serve(s.grpcConn) if err != nil && !errors.Is(err, cmux.ErrListenerClosed) && !errors.Is(err, cmux.ErrServerClosed) { - s.logger.Error("Could not start GRPC server", zap.Error(err)) + s.Logger.Error("Could not start GRPC server", zap.Error(err)) + s.ReportStatus(component.NewFatalErrorEvent(err)) + return } - s.logger.Info("GRPC server stopped", zap.Int("port", grpcPort), zap.String("addr", s.queryOptions.GRPCHostPort)) - s.healthCheck.Set(healthcheck.Unavailable) - s.bgFinished.Done() + s.Logger.Info("GRPC server stopped", zap.Int("port", grpcPort), zap.String("addr", s.queryOptions.GRPCHostPort)) }() // Start cmux server concurrently. if !s.separatePorts { s.bgFinished.Add(1) go func() { - s.logger.Info("Starting CMUX server", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HTTPHostPort)) + defer s.bgFinished.Done() + s.Logger.Info("Starting CMUX server", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HTTPHostPort)) err := cmuxServer.Serve() // TODO: find a way to avoid string comparison. Even though cmux has ErrServerClosed, it's not returned here. 
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") { - s.logger.Error("Could not start multiplexed server", zap.Error(err)) + s.Logger.Error("Could not start multiplexed server", zap.Error(err)) + s.ReportStatus(component.NewFatalErrorEvent(err)) + return } - s.logger.Info("CMUX server stopped", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HTTPHostPort)) - s.healthCheck.Set(healthcheck.Unavailable) - s.bgFinished.Done() + s.Logger.Info("CMUX server stopped", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HTTPHostPort)) }() } - return nil } @@ -353,20 +353,20 @@ func (s *Server) Close() error { s.queryOptions.TLSHTTP.Close(), } - s.logger.Info("Closing HTTP server") + s.Logger.Info("Closing HTTP server") if err := s.httpServer.Close(); err != nil { errs = append(errs, fmt.Errorf("failed to close HTTP server: %w", err)) } - s.logger.Info("Stopping gRPC server") + s.Logger.Info("Stopping gRPC server") s.grpcServer.Stop() if !s.separatePorts { - s.logger.Info("Closing CMux server") + s.Logger.Info("Closing CMux server") s.cmuxServer.Close() } - s.bgFinished.Wait() - s.logger.Info("Server stopped") + + s.Logger.Info("Server stopped") return errors.Join(errs...) 
} diff --git a/cmd/query/app/server_test.go b/cmd/query/app/server_test.go index b556aecd87f..e2d5c839365 100644 --- a/cmd/query/app/server_test.go +++ b/cmd/query/app/server_test.go @@ -40,6 +40,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/config/tlscfg" "github.com/jaegertracing/jaeger/pkg/healthcheck" "github.com/jaegertracing/jaeger/pkg/jtracer" + "github.com/jaegertracing/jaeger/pkg/telemetery" "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/ports" "github.com/jaegertracing/jaeger/proto-gen/api_v2" @@ -49,6 +50,14 @@ import ( var testCertKeyLocation = "../../../pkg/config/tlscfg/testdata" +func initTelSet(logger *zap.Logger, tracerProvider *jtracer.JTracer, hc *healthcheck.HealthCheck) telemetery.Setting { + return telemetery.Setting{ + Logger: logger, + TracerProvider: tracerProvider.OTEL, + ReportStatus: telemetery.HCAdapter(hc), + } +} + func TestServerError(t *testing.T) { srv := &Server{ queryOptions: &QueryOptions{ @@ -66,10 +75,10 @@ func TestCreateTLSServerSinglePortError(t *testing.T) { KeyPath: testCertKeyLocation + "/example-server-key.pem", ClientCAPath: testCertKeyLocation + "/example-CA-cert.pem", } - - _, err := NewServer(zaptest.NewLogger(t), healthcheck.New(), &querysvc.QueryService{}, nil, + telset := initTelSet(zaptest.NewLogger(t), jtracer.NoOp(), healthcheck.New()) + _, err := NewServer(&querysvc.QueryService{}, nil, &QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8080", TLSGRPC: tlsCfg, TLSHTTP: tlsCfg}, - tenancy.NewManager(&tenancy.Options{}), jtracer.NoOp()) + tenancy.NewManager(&tenancy.Options{}), telset) require.Error(t, err) } @@ -80,10 +89,10 @@ func TestCreateTLSGrpcServerError(t *testing.T) { KeyPath: "invalid/path", ClientCAPath: "invalid/path", } - - _, err := NewServer(zaptest.NewLogger(t), healthcheck.New(), &querysvc.QueryService{}, nil, + telset := initTelSet(zaptest.NewLogger(t), jtracer.NoOp(), healthcheck.New()) + _, err := NewServer(&querysvc.QueryService{}, nil, 
&QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8081", TLSGRPC: tlsCfg}, - tenancy.NewManager(&tenancy.Options{}), jtracer.NoOp()) + tenancy.NewManager(&tenancy.Options{}), telset) require.Error(t, err) } @@ -94,10 +103,10 @@ func TestCreateTLSHttpServerError(t *testing.T) { KeyPath: "invalid/path", ClientCAPath: "invalid/path", } - - _, err := NewServer(zaptest.NewLogger(t), healthcheck.New(), &querysvc.QueryService{}, nil, + telset := initTelSet(zaptest.NewLogger(t), jtracer.NoOp(), healthcheck.New()) + _, err := NewServer(&querysvc.QueryService{}, nil, &QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8081", TLSHTTP: tlsCfg}, - tenancy.NewManager(&tenancy.Options{}), jtracer.NoOp()) + tenancy.NewManager(&tenancy.Options{}), telset) require.Error(t, err) } @@ -354,11 +363,11 @@ func TestServerHTTPTLS(t *testing.T) { } flagsSvc := flags.NewService(ports.QueryAdminHTTP) flagsSvc.Logger = zaptest.NewLogger(t) - + telset := initTelSet(flagsSvc.Logger, jtracer.NoOp(), flagsSvc.HC()) querySvc := makeQuerySvc() - server, err := NewServer(flagsSvc.Logger, flagsSvc.HC(), querySvc.qs, + server, err := NewServer(querySvc.qs, nil, serverOptions, tenancy.NewManager(&tenancy.Options{}), - jtracer.NoOp()) + telset) require.NoError(t, err) require.NoError(t, server.Start()) t.Cleanup(func() { @@ -493,9 +502,10 @@ func TestServerGRPCTLS(t *testing.T) { flagsSvc.Logger = zaptest.NewLogger(t) querySvc := makeQuerySvc() - server, err := NewServer(flagsSvc.Logger, flagsSvc.HC(), querySvc.qs, + telset := initTelSet(flagsSvc.Logger, jtracer.NoOp(), flagsSvc.HC()) + server, err := NewServer(querySvc.qs, nil, serverOptions, tenancy.NewManager(&tenancy.Options{}), - jtracer.NoOp()) + telset) require.NoError(t, err) require.NoError(t, server.Start()) t.Cleanup(func() { @@ -535,7 +545,8 @@ func TestServerGRPCTLS(t *testing.T) { } func TestServerBadHostPort(t *testing.T) { - _, err := NewServer(zaptest.NewLogger(t), healthcheck.New(), &querysvc.QueryService{}, nil, + telset := 
initTelSet(zaptest.NewLogger(t), jtracer.NoOp(), healthcheck.New()) + _, err := NewServer(&querysvc.QueryService{}, nil, &QueryOptions{ HTTPHostPort: "8080", // bad string, not :port GRPCHostPort: "127.0.0.1:8081", @@ -544,10 +555,10 @@ func TestServerBadHostPort(t *testing.T) { }, }, tenancy.NewManager(&tenancy.Options{}), - jtracer.NoOp()) + telset) require.Error(t, err) - _, err = NewServer(zaptest.NewLogger(t), healthcheck.New(), &querysvc.QueryService{}, nil, + _, err = NewServer(&querysvc.QueryService{}, nil, &QueryOptions{ HTTPHostPort: "127.0.0.1:8081", GRPCHostPort: "9123", // bad string, not :port @@ -556,7 +567,7 @@ func TestServerBadHostPort(t *testing.T) { }, }, tenancy.NewManager(&tenancy.Options{}), - jtracer.NoOp()) + telset) require.Error(t, err) } @@ -566,7 +577,7 @@ func TestServerInUseHostPort(t *testing.T) { conn, err := net.Listen("tcp", availableHostPort) require.NoError(t, err) defer func() { require.NoError(t, conn.Close()) }() - + telset := initTelSet(zaptest.NewLogger(t), jtracer.NoOp(), healthcheck.New()) testCases := []struct { name string httpHostPort string @@ -578,8 +589,6 @@ func TestServerInUseHostPort(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { server, err := NewServer( - zaptest.NewLogger(t), - healthcheck.New(), &querysvc.QueryService{}, nil, &QueryOptions{ @@ -590,7 +599,7 @@ func TestServerInUseHostPort(t *testing.T) { }, }, tenancy.NewManager(&tenancy.Options{}), - jtracer.NoOp(), + telset, ) require.NoError(t, err) require.Error(t, server.Start()) @@ -604,7 +613,8 @@ func TestServerSinglePort(t *testing.T) { flagsSvc.Logger = zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) hostPort := ports.GetAddressFromCLIOptions(ports.QueryHTTP, "") querySvc := makeQuerySvc() - server, err := NewServer(flagsSvc.Logger, flagsSvc.HC(), querySvc.qs, nil, + telset := initTelSet(flagsSvc.Logger, jtracer.NoOp(), flagsSvc.HC()) + server, err := NewServer(querySvc.qs, nil, &QueryOptions{ 
GRPCHostPort: hostPort, HTTPHostPort: hostPort, @@ -613,7 +623,7 @@ func TestServerSinglePort(t *testing.T) { }, }, tenancy.NewManager(&tenancy.Options{}), - jtracer.NoOp()) + telset) require.NoError(t, err) require.NoError(t, server.Start()) t.Cleanup(func() { @@ -644,9 +654,10 @@ func TestServerGracefulExit(t *testing.T) { hostPort := ports.PortToHostPort(ports.QueryAdminHTTP) querySvc := makeQuerySvc() - server, err := NewServer(flagsSvc.Logger, flagsSvc.HC(), querySvc.qs, nil, + telset := initTelSet(flagsSvc.Logger, jtracer.NoOp(), flagsSvc.HC()) + server, err := NewServer(querySvc.qs, nil, &QueryOptions{GRPCHostPort: hostPort, HTTPHostPort: hostPort}, - tenancy.NewManager(&tenancy.Options{}), jtracer.NoOp()) + tenancy.NewManager(&tenancy.Options{}), telset) require.NoError(t, err) require.NoError(t, server.Start()) @@ -676,11 +687,11 @@ func TestServerHandlesPortZero(t *testing.T) { flagsSvc.Logger = zap.New(zapCore) querySvc := &querysvc.QueryService{} - tracer := jtracer.NoOp() - server, err := NewServer(flagsSvc.Logger, flagsSvc.HC(), querySvc, nil, + telset := initTelSet(flagsSvc.Logger, jtracer.NoOp(), flagsSvc.HC()) + server, err := NewServer(querySvc, nil, &QueryOptions{GRPCHostPort: ":0", HTTPHostPort: ":0"}, tenancy.NewManager(&tenancy.Options{}), - tracer) + telset) require.NoError(t, err) require.NoError(t, server.Start()) defer server.Close() @@ -735,8 +746,9 @@ func TestServerHTTPTenancy(t *testing.T) { tenancyMgr := tenancy.NewManager(&serverOptions.Tenancy) querySvc := makeQuerySvc() querySvc.spanReader.On("FindTraces", mock.Anything, mock.Anything).Return([]*model.Trace{mockTrace}, nil).Once() - server, err := NewServer(zaptest.NewLogger(t), healthcheck.New(), querySvc.qs, - nil, serverOptions, tenancyMgr, jtracer.NoOp()) + telset := initTelSet(zaptest.NewLogger(t), jtracer.NoOp(), healthcheck.New()) + server, err := NewServer(querySvc.qs, + nil, serverOptions, tenancyMgr, telset) require.NoError(t, err) require.NoError(t, server.Start()) 
t.Cleanup(func() { diff --git a/cmd/query/app/token_propagation_test.go b/cmd/query/app/token_propagation_test.go index 1e9a0ebfea4..a474c1188cc 100644 --- a/cmd/query/app/token_propagation_test.go +++ b/cmd/query/app/token_propagation_test.go @@ -32,6 +32,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/jtracer" "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/pkg/telemetery" "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/plugin/storage/es" "github.com/jaegertracing/jaeger/ports" @@ -91,7 +92,12 @@ func runQueryService(t *testing.T, esURL string) *Server { require.NoError(t, err) querySvc := querysvc.NewQueryService(spanReader, nil, querysvc.QueryServiceOptions{}) - server, err := NewServer(flagsSvc.Logger, flagsSvc.HC(), querySvc, nil, + telset := telemetery.Setting{ + Logger: flagsSvc.Logger, + TracerProvider: jtracer.NoOp().OTEL, + ReportStatus: telemetery.HCAdapter(flagsSvc.HC()), + } + server, err := NewServer(querySvc, nil, &QueryOptions{ GRPCHostPort: ":0", HTTPHostPort: ":0", @@ -100,7 +106,7 @@ func runQueryService(t *testing.T, esURL string) *Server { }, }, tenancy.NewManager(&tenancy.Options{}), - jtracer.NoOp(), + telset, ) require.NoError(t, err) require.NoError(t, server.Start()) diff --git a/cmd/query/main.go b/cmd/query/main.go index 42f698669f0..36c54c63197 100644 --- a/cmd/query/main.go +++ b/cmd/query/main.go @@ -37,6 +37,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/jtracer" "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/pkg/telemetery" "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/pkg/version" metricsPlugin "github.com/jaegertracing/jaeger/plugin/metrics" @@ -113,7 +114,12 @@ func main() { dependencyReader, *queryServiceOptions) tm := tenancy.NewManager(&queryOpts.Tenancy) - server, err := app.NewServer(svc.Logger, 
svc.HC(), queryService, metricsQueryService, queryOpts, tm, jt) + telset := telemetery.Setting{ + Logger: logger, + TracerProvider: jt.OTEL, + ReportStatus: telemetery.HCAdapter(svc.HC()), + } + server, err := app.NewServer(queryService, metricsQueryService, queryOpts, tm, telset) if err != nil { logger.Fatal("Failed to create server", zap.Error(err)) } diff --git a/pkg/telemetery/settings.go b/pkg/telemetery/settings.go new file mode 100644 index 00000000000..1cb9551a6b9 --- /dev/null +++ b/pkg/telemetery/settings.go @@ -0,0 +1,40 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package telemetery + +import ( + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/pkg/healthcheck" + "github.com/jaegertracing/jaeger/pkg/metrics" +) + +type Setting struct { + Logger *zap.Logger + TracerProvider trace.TracerProvider + Metrics metrics.Factory + ReportStatus func(*component.StatusEvent) +} + +func HCAdapter(hc *healthcheck.HealthCheck) func(*component.StatusEvent) { + return func(event *component.StatusEvent) { + var hcStatus healthcheck.Status + switch event.Status() { + case component.StatusOK: + hcStatus = healthcheck.Ready + case component.StatusStarting, + component.StatusRecoverableError, + component.StatusPermanentError, + component.StatusNone, + component.StatusStopping, + component.StatusStopped: + hcStatus = healthcheck.Unavailable + case component.StatusFatalError: + hcStatus = healthcheck.Broken + } + hc.Set(hcStatus) + } +} diff --git a/pkg/telemetery/settings_test.go b/pkg/telemetery/settings_test.go new file mode 100644 index 00000000000..bb82b61e57d --- /dev/null +++ b/pkg/telemetery/settings_test.go @@ -0,0 +1,78 @@ +// Copyright (c) 2024 The Jaeger Authors. 
+// SPDX-License-Identifier: Apache-2.0 + +package telemetery_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/component" + + "github.com/jaegertracing/jaeger/pkg/healthcheck" + "github.com/jaegertracing/jaeger/pkg/telemetery" + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +func TestHCAdapter(t *testing.T) { + tests := []struct { + name string + status component.Status + expectedHC healthcheck.Status + }{ + { + name: "StatusOK", + status: component.StatusOK, + expectedHC: healthcheck.Ready, + }, + { + name: "StatusStarting", + status: component.StatusStarting, + expectedHC: healthcheck.Unavailable, + }, + { + name: "StatusRecoverableError", + status: component.StatusRecoverableError, + expectedHC: healthcheck.Unavailable, + }, + { + name: "StatusPermanentError", + status: component.StatusPermanentError, + expectedHC: healthcheck.Unavailable, + }, + { + name: "StatusNone", + status: component.StatusNone, + expectedHC: healthcheck.Unavailable, + }, + { + name: "StatusStopping", + status: component.StatusStopping, + expectedHC: healthcheck.Unavailable, + }, + { + name: "StatusStopped", + status: component.StatusStopped, + expectedHC: healthcheck.Unavailable, + }, + { + name: "StatusFatalError", + status: component.StatusFatalError, + expectedHC: healthcheck.Broken, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + hc := healthcheck.New() + hcAdapter := telemetery.HCAdapter(hc) + event := component.NewStatusEvent(tt.status) + hcAdapter(event) + assert.Equal(t, tt.expectedHC, hc.Get()) + }) + } +} + +func TestMain(m *testing.M) { + testutils.VerifyGoLeaks(m) +}