diff --git a/cmd/remote-storage/app/server.go b/cmd/remote-storage/app/server.go index a79187ca471..58b72445720 100644 --- a/cmd/remote-storage/app/server.go +++ b/cmd/remote-storage/app/server.go @@ -19,6 +19,7 @@ import ( "net" "sync" + "go.opentelemetry.io/collector/component" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -26,7 +27,7 @@ import ( "google.golang.org/grpc/reflection" "github.com/jaegertracing/jaeger/cmd/query/app/querysvc" - "github.com/jaegertracing/jaeger/pkg/healthcheck" + "github.com/jaegertracing/jaeger/pkg/telemetery" "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/plugin/storage/grpc/shared" "github.com/jaegertracing/jaeger/storage" @@ -36,32 +37,30 @@ import ( // Server runs a gRPC server type Server struct { - logger *zap.Logger - healthcheck *healthcheck.HealthCheck - opts *Options + opts *Options grpcConn net.Listener grpcServer *grpc.Server wg sync.WaitGroup + telemetery.Setting } // NewServer creates and initializes Server. -func NewServer(options *Options, storageFactory storage.Factory, tm *tenancy.Manager, logger *zap.Logger, healthcheck *healthcheck.HealthCheck) (*Server, error) { - handler, err := createGRPCHandler(storageFactory, logger) +func NewServer(options *Options, storageFactory storage.Factory, tm *tenancy.Manager, telset telemetery.Setting) (*Server, error) { + handler, err := createGRPCHandler(storageFactory, telset.Logger) if err != nil { return nil, err } - grpcServer, err := createGRPCServer(options, tm, handler, logger) + grpcServer, err := createGRPCServer(options, tm, handler, telset.Logger) if err != nil { return nil, err } return &Server{ - logger: logger, - healthcheck: healthcheck, - opts: options, - grpcServer: grpcServer, + opts: options, + grpcServer: grpcServer, + Setting: telset, }, nil } @@ -129,15 +128,15 @@ func (s *Server) Start() error { if err != nil { return err } - s.logger.Info("Starting GRPC server", zap.Stringer("addr", listener.Addr())) + s.Logger.Info("Starting GRPC server", zap.Stringer("addr", listener.Addr())) s.grpcConn = listener s.wg.Add(1) go func() { defer s.wg.Done() if err := s.grpcServer.Serve(s.grpcConn); err != nil { - s.logger.Error("GRPC server exited", zap.Error(err)) + s.Logger.Error("GRPC server exited", zap.Error(err)) + s.ReportStatus(component.NewFatalErrorEvent(err)) } - s.healthcheck.Set(healthcheck.Unavailable) }() return nil @@ -149,5 +148,6 @@ func (s *Server) Close() error { s.grpcConn.Close() s.opts.TLSGRPC.Close() s.wg.Wait() + s.ReportStatus(component.NewStatusEvent(component.StatusStopped)) return nil } diff --git a/cmd/remote-storage/app/server_test.go b/cmd/remote-storage/app/server_test.go index 29324de7d05..7b056ef6728 100644 --- a/cmd/remote-storage/app/server_test.go +++ b/cmd/remote-storage/app/server_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" "go.uber.org/zap" "go.uber.org/zap/zaptest/observer" "google.golang.org/grpc" @@ -33,6 +34,7 @@ import ( "github.com/jaegertracing/jaeger/internal/grpctest" "github.com/jaegertracing/jaeger/pkg/config/tlscfg" "github.com/jaegertracing/jaeger/pkg/healthcheck" + "github.com/jaegertracing/jaeger/pkg/telemetery" "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/ports" "github.com/jaegertracing/jaeger/proto-gen/storage_v1" @@ -51,14 +53,16 @@ func TestNewServer_CreateStorageErrors(t *testing.T) { factory.On("CreateSpanWriter").Return(nil, nil) factory.On("CreateDependencyReader").Return(nil, errors.New("no deps")).Once() factory.On("CreateDependencyReader").Return(nil, nil) - + telset := telemetery.Setting{ + Logger: zap.NewNop(), + ReportStatus: func(*component.StatusEvent) {}, + } f := func() (*Server, error) { return NewServer( &Options{GRPCHostPort: ":0"}, factory, tenancy.NewManager(&tenancy.Options{}), - zap.NewNop(), - healthcheck.New(), + telset, ) } _, err := f() @@ -123,13 +127,16 @@ func TestNewServer_TLSConfigError(t *testing.T) { KeyPath: "invalid/path", ClientCAPath: "invalid/path", } + telset := telemetery.Setting{ + Logger: zap.NewNop(), + ReportStatus: telemetery.HCAdapter(healthcheck.New()), + } storageMocks := newStorageMocks() _, err := NewServer( &Options{GRPCHostPort: ":8081", TLSGRPC: tlsCfg}, storageMocks.factory, tenancy.NewManager(&tenancy.Options{}), - zap.NewNop(), - healthcheck.New(), + telset, ) require.Error(t, err) assert.Contains(t, err.Error(), "invalid TLS config") @@ -334,12 +341,15 @@ func TestServerGRPCTLS(t *testing.T) { storageMocks.reader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil) tm := tenancy.NewManager(&tenancy.Options{Enabled: true}) + telset := telemetery.Setting{ + Logger: flagsSvc.Logger, + ReportStatus: telemetery.HCAdapter(flagsSvc.HC()), + } server, err := NewServer( serverOptions, storageMocks.factory, tm, - flagsSvc.Logger, - flagsSvc.HC(), + telset, ) require.NoError(t, err) require.NoError(t, server.Start()) @@ -380,13 +390,15 @@ func TestServerHandlesPortZero(t *testing.T) { zapCore, logs := observer.New(zap.InfoLevel) flagsSvc.Logger = zap.New(zapCore) storageMocks := newStorageMocks() - + telset := telemetery.Setting{ + Logger: flagsSvc.Logger, + ReportStatus: telemetery.HCAdapter(flagsSvc.HC()), + } server, err := NewServer( &Options{GRPCHostPort: ":0"}, storageMocks.factory, tenancy.NewManager(&tenancy.Options{}), - flagsSvc.Logger, - flagsSvc.HC(), + telset, ) require.NoError(t, err) diff --git a/cmd/remote-storage/main.go b/cmd/remote-storage/main.go index f194b491e6e..a810deab86e 100644 --- a/cmd/remote-storage/main.go +++ b/cmd/remote-storage/main.go @@ -32,6 +32,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/remote-storage/app" "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/pkg/telemetery" "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/pkg/version" "github.com/jaegertracing/jaeger/plugin/storage" @@ -77,7 +78,11 @@ func main() { } tm := tenancy.NewManager(&opts.Tenancy) - server, err := app.NewServer(opts, storageFactory, tm, svc.Logger, svc.HC()) + telset := telemetery.Setting{ + Logger: svc.Logger, + ReportStatus: telemetery.HCAdapter(svc.HC()), + } + server, err := app.NewServer(opts, storageFactory, tm, telset) if err != nil { logger.Fatal("Failed to create server", zap.Error(err)) } diff --git a/plugin/storage/integration/remote_memory_storage.go b/plugin/storage/integration/remote_memory_storage.go index 1bd2dd7b3e5..7ac8d90551f 100644 --- a/plugin/storage/integration/remote_memory_storage.go +++ b/plugin/storage/integration/remote_memory_storage.go @@ -20,6 +20,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/healthcheck" "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/pkg/telemetery" "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/plugin/storage" "github.com/jaegertracing/jaeger/ports" @@ -47,7 +48,11 @@ func StartNewRemoteMemoryStorage(t *testing.T) *RemoteMemoryStorage { require.NoError(t, storageFactory.Initialize(metrics.NullFactory, logger)) t.Logf("Starting in-process remote storage server on %s", opts.GRPCHostPort) - server, err := app.NewServer(opts, storageFactory, tm, logger, healthcheck.New()) + telset := telemetery.Setting{ + Logger: logger, + ReportStatus: telemetery.HCAdapter(healthcheck.New()), + } + server, err := app.NewServer(opts, storageFactory, tm, telset) require.NoError(t, err) require.NoError(t, server.Start())