From 7c9dce4cf74fe080206e16af4b66d3f2ab74696b Mon Sep 17 00:00:00 2001 From: Will Sewell Date: Thu, 28 Mar 2024 20:49:28 +0000 Subject: [PATCH] Refactor healthcheck signalling between server and service (#5308) ## Which problem is this PR solving? Resolves #5307 ## Description of the changes - Simplifies the signalling of healthcheck status from the "servers" to the "service": instead of using 2 channels to feed healthcheck status back to the Service.HealthCheck, we just give the server components direct access to the Healthcheck which they can update directly. - This is possible because the Healthcheck package is threadsafe (uses `atomic.Value` for state). - This pattern is consistent with how the service's Healtcheck is passed directly to cmd/collector/app package. ## How was this change tested? - `make lint test` ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [ ] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `yarn lint` and `yarn test` --------- Signed-off-by: Will Sewell Signed-off-by: Yuri Shkuro Co-authored-by: Yuri Shkuro --- cmd/all-in-one/main.go | 7 +- cmd/internal/flags/service.go | 23 +---- cmd/internal/flags/service_test.go | 2 +- .../internal/extension/jaegerquery/server.go | 3 + cmd/query/app/server.go | 45 +++++----- cmd/query/app/server_test.go | 87 ++++--------------- cmd/query/app/token_propagation_test.go | 2 +- cmd/query/main.go | 8 +- cmd/remote-storage/app/server.go | 30 +++---- cmd/remote-storage/app/server_test.go | 34 +------- cmd/remote-storage/main.go | 8 +- 11 files changed, 63 insertions(+), 186 deletions(-) diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index 721757a6079..08ac4313dff 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -283,15 +283,10 @@ func startQuery( ) *queryApp.Server { spanReader = storageMetrics.NewReadMetricsDecorator(spanReader, metricsFactory) qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts) - server, err := queryApp.NewServer(svc.Logger, qs, metricsQueryService, qOpts, tm, jt) + server, err := queryApp.NewServer(svc.Logger, svc.HC(), qs, metricsQueryService, qOpts, tm, jt) if err != nil { svc.Logger.Fatal("Could not create jaeger-query", zap.Error(err)) } - go func() { - for s := range server.HealthCheckStatus() { - svc.SetHealthCheckStatus(s) - } - }() if err := server.Start(); err != nil { svc.Logger.Fatal("Could not start jaeger-query", zap.Error(err)) } diff --git a/cmd/internal/flags/service.go b/cmd/internal/flags/service.go index 291659573b8..622a960a4e2 100644 --- a/cmd/internal/flags/service.go +++ b/cmd/internal/flags/service.go @@ -50,20 +50,16 @@ type Service struct { MetricsFactory metrics.Factory signalsChannel chan os.Signal - - hcStatusChannel chan healthcheck.Status } // NewService creates a new Service. func NewService(adminPort int) *Service { signalsChannel := make(chan os.Signal, 1) - hcStatusChannel := make(chan healthcheck.Status) signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM) return &Service{ - Admin: NewAdminServer(ports.PortToHostPort(adminPort)), - signalsChannel: signalsChannel, - hcStatusChannel: hcStatusChannel, + Admin: NewAdminServer(ports.PortToHostPort(adminPort)), + signalsChannel: signalsChannel, } } @@ -79,11 +75,6 @@ func (s *Service) AddFlags(flagSet *flag.FlagSet) { s.Admin.AddFlags(flagSet) } -// SetHealthCheckStatus sets status of healthcheck -func (s *Service) SetHealthCheckStatus(status healthcheck.Status) { - s.hcStatusChannel <- status -} - // Start bootstraps the service and starts the admin server. func (s *Service) Start(v *viper.Viper) error { if err := TryLoadConfigFile(v); err != nil { @@ -143,15 +134,7 @@ func (s *Service) HC() *healthcheck.HealthCheck { func (s *Service) RunAndThen(shutdown func()) { s.HC().Ready() -statusLoop: - for { - select { - case status := <-s.hcStatusChannel: - s.HC().Set(status) - case <-s.signalsChannel: - break statusLoop - } - } + <-s.signalsChannel s.Logger.Info("Shutting down") s.HC().Set(healthcheck.Unavailable) diff --git a/cmd/internal/flags/service_test.go b/cmd/internal/flags/service_test.go index 4194b1eab1f..fbdd13fb3cf 100644 --- a/cmd/internal/flags/service_test.go +++ b/cmd/internal/flags/service_test.go @@ -93,7 +93,7 @@ func TestStartErrors(t *testing.T) { go s.RunAndThen(shutdown) waitForEqual(t, healthcheck.Ready, func() interface{} { return s.HC().Get() }) - s.SetHealthCheckStatus(healthcheck.Unavailable) + s.HC().Set(healthcheck.Unavailable) waitForEqual(t, healthcheck.Unavailable, func() interface{} { return s.HC().Get() }) s.signalsChannel <- os.Interrupt diff --git a/cmd/jaeger/internal/extension/jaegerquery/server.go b/cmd/jaeger/internal/extension/jaegerquery/server.go index bc83ba06ef6..91c282ca333 100644 --- a/cmd/jaeger/internal/extension/jaegerquery/server.go +++ b/cmd/jaeger/internal/extension/jaegerquery/server.go @@ -14,6 +14,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage" queryApp "github.com/jaegertracing/jaeger/cmd/query/app" "github.com/jaegertracing/jaeger/cmd/query/app/querysvc" + "github.com/jaegertracing/jaeger/pkg/healthcheck" "github.com/jaegertracing/jaeger/pkg/jtracer" "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/plugin/metrics/disabled" @@ -81,6 +82,8 @@ func (s *server) Start(ctx context.Context, host component.Host) error { //nolint s.server, err = queryApp.NewServer( s.logger, + // TODO propagate healthcheck updates up to the collector's runtime + healthcheck.New(), qs, metricsQueryService, s.makeQueryOptions(), diff --git a/cmd/query/app/server.go b/cmd/query/app/server.go index 9cdb6ec879d..7882055f65d 100644 --- a/cmd/query/app/server.go +++ b/cmd/query/app/server.go @@ -48,23 +48,23 @@ import ( // Server runs HTTP, Mux and a grpc server type Server struct { logger *zap.Logger + healthCheck *healthcheck.HealthCheck querySvc *querysvc.QueryService queryOptions *QueryOptions tracer *jtracer.JTracer // TODO make part of flags.Service - conn net.Listener - grpcConn net.Listener - httpConn net.Listener - cmuxServer cmux.CMux - grpcServer *grpc.Server - httpServer *http.Server - separatePorts bool - unavailableChannel chan healthcheck.Status + conn net.Listener + grpcConn net.Listener + httpConn net.Listener + cmuxServer cmux.CMux + grpcServer *grpc.Server + httpServer *http.Server + separatePorts bool } // NewServer creates and initializes Server -func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.Manager, tracer *jtracer.JTracer) (*Server, error) { +func NewServer(logger *zap.Logger, healthCheck *healthcheck.HealthCheck, querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.Manager, tracer *jtracer.JTracer) (*Server, error) { _, httpPort, err := net.SplitHostPort(options.HTTPHostPort) if err != nil { return nil, fmt.Errorf("invalid HTTP server host:port: %w", err) @@ -89,22 +89,17 @@ func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, metricsQuery } return &Server{ - logger: logger, - querySvc: querySvc, - queryOptions: options, - tracer: tracer, - grpcServer: grpcServer, - httpServer: httpServer, - separatePorts: grpcPort != httpPort, - unavailableChannel: make(chan healthcheck.Status), + logger: logger, + healthCheck: healthCheck, + querySvc: querySvc, + queryOptions: options, + tracer: tracer, + grpcServer: grpcServer, + httpServer: httpServer, + separatePorts: grpcPort != httpPort, }, nil } -// HealthCheckStatus returns health check status channel a client can subscribe to -func (s Server) HealthCheckStatus() chan healthcheck.Status { - return s.unavailableChannel -} - func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.Manager, logger *zap.Logger, tracer *jtracer.JTracer) (*grpc.Server, error) { var grpcOpts []grpc.ServerOption @@ -292,7 +287,7 @@ func (s *Server) Start() error { s.logger.Error("Could not start HTTP server", zap.Error(err)) } - s.unavailableChannel <- healthcheck.Unavailable + s.healthCheck.Set(healthcheck.Unavailable) }() // Start GRPC server concurrently @@ -302,7 +297,7 @@ func (s *Server) Start() error { if err := s.grpcServer.Serve(s.grpcConn); err != nil { s.logger.Error("Could not start GRPC server", zap.Error(err)) } - s.unavailableChannel <- healthcheck.Unavailable + s.healthCheck.Set(healthcheck.Unavailable) }() // Start cmux server concurrently. @@ -315,7 +310,7 @@ func (s *Server) Start() error { if err != nil && !strings.Contains(err.Error(), "use of closed network connection") { s.logger.Error("Could not start multiplexed server", zap.Error(err)) } - s.unavailableChannel <- healthcheck.Unavailable + s.healthCheck.Set(healthcheck.Unavailable) }() } diff --git a/cmd/query/app/server_test.go b/cmd/query/app/server_test.go index 43e0bcc53a4..8cd0b300bb8 100644 --- a/cmd/query/app/server_test.go +++ b/cmd/query/app/server_test.go @@ -20,7 +20,6 @@ import ( "fmt" "net" "net/http" - "sync" "testing" "time" @@ -67,7 +66,7 @@ func TestCreateTLSServerSinglePortError(t *testing.T) { ClientCAPath: testCertKeyLocation + "/example-CA-cert.pem", } - _, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, nil, + _, err := NewServer(zap.NewNop(), healthcheck.New(), &querysvc.QueryService{}, nil, &QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8080", TLSGRPC: tlsCfg, TLSHTTP: tlsCfg}, tenancy.NewManager(&tenancy.Options{}), jtracer.NoOp()) require.Error(t, err) @@ -81,7 +80,7 @@ func TestCreateTLSGrpcServerError(t *testing.T) { ClientCAPath: "invalid/path", } - _, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, nil, + _, err := NewServer(zap.NewNop(), healthcheck.New(), &querysvc.QueryService{}, nil, &QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8081", TLSGRPC: tlsCfg}, tenancy.NewManager(&tenancy.Options{}), jtracer.NoOp()) require.Error(t, err) @@ -95,7 +94,7 @@ func TestCreateTLSHttpServerError(t *testing.T) { ClientCAPath: "invalid/path", } - _, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, nil, + _, err := NewServer(zap.NewNop(), healthcheck.New(), &querysvc.QueryService{}, nil, &QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8081", TLSHTTP: tlsCfg}, tenancy.NewManager(&tenancy.Options{}), jtracer.NoOp()) require.Error(t, err) @@ -340,27 +339,12 @@ func TestServerHTTPTLS(t *testing.T) { spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil) querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{}) - server, err := NewServer(flagsSvc.Logger, querySvc, nil, - serverOptions, tenancy.NewManager(&tenancy.Options{}), + server, err := NewServer(flagsSvc.Logger, flagsSvc.HC(), querySvc, + nil, serverOptions, tenancy.NewManager(&tenancy.Options{}), jtracer.NoOp()) require.NoError(t, err) require.NoError(t, server.Start()) - var wg sync.WaitGroup - wg.Add(1) - once := sync.Once{} - - go func() { - for s := range server.HealthCheckStatus() { - flagsSvc.HC().Set(s) - if s == healthcheck.Unavailable { - once.Do(func() { - wg.Done() - }) - } - } - }() - var clientError error var clientClose func() error var clientTLSCfg *tls.Config @@ -423,7 +407,6 @@ func TestServerHTTPTLS(t *testing.T) { } } server.Close() - wg.Wait() assert.Equal(t, healthcheck.Unavailable, flagsSvc.HC().Get()) }) } @@ -502,27 +485,12 @@ func TestServerGRPCTLS(t *testing.T) { spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil) querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{}) - server, err := NewServer(flagsSvc.Logger, querySvc, nil, - serverOptions, tenancy.NewManager(&tenancy.Options{}), + server, err := NewServer(flagsSvc.Logger, flagsSvc.HC(), querySvc, + nil, serverOptions, tenancy.NewManager(&tenancy.Options{}), jtracer.NoOp()) require.NoError(t, err) require.NoError(t, server.Start()) - var wg sync.WaitGroup - wg.Add(1) - once := sync.Once{} - - go func() { - for s := range server.HealthCheckStatus() { - flagsSvc.HC().Set(s) - if s == healthcheck.Unavailable { - once.Do(func() { - wg.Done() - }) - } - } - }() - var clientError error var client *grpcClient @@ -549,14 +517,13 @@ func TestServerGRPCTLS(t *testing.T) { } require.NoError(t, client.conn.Close()) server.Close() - wg.Wait() assert.Equal(t, healthcheck.Unavailable, flagsSvc.HC().Get()) }) } } func TestServerBadHostPort(t *testing.T) { - _, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, nil, + _, err := NewServer(zap.NewNop(), healthcheck.New(), &querysvc.QueryService{}, nil, &QueryOptions{ HTTPHostPort: "8080", GRPCHostPort: "127.0.0.1:8081", @@ -568,7 +535,7 @@ func TestServerBadHostPort(t *testing.T) { jtracer.NoOp()) require.Error(t, err) - _, err = NewServer(zap.NewNop(), &querysvc.QueryService{}, nil, + _, err = NewServer(zap.NewNop(), healthcheck.New(), &querysvc.QueryService{}, nil, &QueryOptions{ HTTPHostPort: "127.0.0.1:8081", GRPCHostPort: "9123", @@ -600,6 +567,7 @@ func TestServerInUseHostPort(t *testing.T) { t.Run(tc.name, func(t *testing.T) { server, err := NewServer( zap.NewNop(), + healthcheck.New(), &querysvc.QueryService{}, nil, &QueryOptions{ @@ -637,7 +605,7 @@ func TestServerSinglePort(t *testing.T) { spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil) querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{}) - server, err := NewServer(flagsSvc.Logger, querySvc, nil, + server, err := NewServer(flagsSvc.Logger, flagsSvc.HC(), querySvc, nil, &QueryOptions{ GRPCHostPort: hostPort, HTTPHostPort: hostPort, @@ -650,23 +618,6 @@ func TestServerSinglePort(t *testing.T) { require.NoError(t, err) require.NoError(t, server.Start()) - var wg sync.WaitGroup - wg.Add(1) - once := sync.Once{} - - go func() { - for s := range server.HealthCheckStatus() { - flagsSvc.HC().Set(s) - if s == healthcheck.Unavailable { - once.Do(func() { - wg.Done() - }) - } - - } - wg.Done() - }() - client := newGRPCClient(t, hostPort) defer client.conn.Close() @@ -678,7 +629,6 @@ func TestServerSinglePort(t *testing.T) { assert.Equal(t, expectedServices, res.Services) server.Close() - wg.Wait() assert.Equal(t, healthcheck.Unavailable, flagsSvc.HC().Get()) } @@ -694,15 +644,11 @@ func TestServerGracefulExit(t *testing.T) { querySvc := &querysvc.QueryService{} tracer := jtracer.NoOp() - server, err := NewServer(flagsSvc.Logger, querySvc, nil, &QueryOptions{GRPCHostPort: hostPort, HTTPHostPort: hostPort}, + server, err := NewServer(flagsSvc.Logger, flagsSvc.HC(), querySvc, nil, + &QueryOptions{GRPCHostPort: hostPort, HTTPHostPort: hostPort}, tenancy.NewManager(&tenancy.Options{}), tracer) require.NoError(t, err) require.NoError(t, server.Start()) - go func() { - for s := range server.HealthCheckStatus() { - flagsSvc.HC().Set(s) - } - }() // Wait for servers to come up before we can call .Close() // TODO Find a way to wait only as long as necessary. Unconditional sleep slows down the tests. @@ -722,7 +668,7 @@ func TestServerHandlesPortZero(t *testing.T) { querySvc := &querysvc.QueryService{} tracer := jtracer.NoOp() - server, err := NewServer(flagsSvc.Logger, querySvc, nil, + server, err := NewServer(flagsSvc.Logger, flagsSvc.HC(), querySvc, nil, &QueryOptions{GRPCHostPort: ":0", HTTPHostPort: ":0"}, tenancy.NewManager(&tenancy.Options{}), tracer) @@ -783,9 +729,8 @@ func TestServerHTTPTenancy(t *testing.T) { dependencyReader := &depsmocks.Reader{} querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{}) - server, err := NewServer(zap.NewNop(), querySvc, nil, - serverOptions, tenancyMgr, - jtracer.NoOp()) + server, err := NewServer(zap.NewNop(), healthcheck.New(), querySvc, + nil, serverOptions, tenancyMgr, jtracer.NoOp()) require.NoError(t, err) require.NoError(t, server.Start()) diff --git a/cmd/query/app/token_propagation_test.go b/cmd/query/app/token_propagation_test.go index 9fd3aca71fa..89eb9491df4 100644 --- a/cmd/query/app/token_propagation_test.go +++ b/cmd/query/app/token_propagation_test.go @@ -90,7 +90,7 @@ func runQueryService(t *testing.T, esURL string) *Server { require.NoError(t, err) querySvc := querysvc.NewQueryService(spanReader, nil, querysvc.QueryServiceOptions{}) - server, err := NewServer(flagsSvc.Logger, querySvc, nil, + server, err := NewServer(flagsSvc.Logger, flagsSvc.HC(), querySvc, nil, &QueryOptions{ GRPCHostPort: ":0", HTTPHostPort: ":0", diff --git a/cmd/query/main.go b/cmd/query/main.go index ccc9ab81933..23310ee64b1 100644 --- a/cmd/query/main.go +++ b/cmd/query/main.go @@ -113,17 +113,11 @@ func main() { dependencyReader, *queryServiceOptions) tm := tenancy.NewManager(&queryOpts.Tenancy) - server, err := app.NewServer(svc.Logger, queryService, metricsQueryService, queryOpts, tm, jt) + server, err := app.NewServer(svc.Logger, svc.HC(), queryService, metricsQueryService, queryOpts, tm, jt) if err != nil { logger.Fatal("Failed to create server", zap.Error(err)) } - go func() { - for s := range server.HealthCheckStatus() { - svc.SetHealthCheckStatus(s) - } - }() - if err := server.Start(); err != nil { logger.Fatal("Could not start servers", zap.Error(err)) } diff --git a/cmd/remote-storage/app/server.go b/cmd/remote-storage/app/server.go index 008d4302285..ecc29f0ff9b 100644 --- a/cmd/remote-storage/app/server.go +++ b/cmd/remote-storage/app/server.go @@ -35,17 +35,17 @@ import ( // Server runs a gRPC server type Server struct { - logger *zap.Logger - opts *Options + logger *zap.Logger + healthcheck *healthcheck.HealthCheck + opts *Options - grpcConn net.Listener - grpcServer *grpc.Server - unavailableChannel chan healthcheck.Status // used to signal to admin server that gRPC server is unavailable - wg sync.WaitGroup + grpcConn net.Listener + grpcServer *grpc.Server + wg sync.WaitGroup } // NewServer creates and initializes Server. -func NewServer(options *Options, storageFactory storage.Factory, tm *tenancy.Manager, logger *zap.Logger) (*Server, error) { +func NewServer(options *Options, storageFactory storage.Factory, tm *tenancy.Manager, logger *zap.Logger, healthcheck *healthcheck.HealthCheck) (*Server, error) { handler, err := createGRPCHandler(storageFactory, logger) if err != nil { return nil, err @@ -57,10 +57,10 @@ func NewServer(options *Options, storageFactory storage.Factory, tm *tenancy.Man } return &Server{ - logger: logger, - opts: options, - grpcServer: grpcServer, - unavailableChannel: make(chan healthcheck.Status), + logger: logger, + healthcheck: healthcheck, + opts: options, + grpcServer: grpcServer, }, nil } @@ -96,11 +96,6 @@ func createGRPCHandler(f storage.Factory, logger *zap.Logger) (*shared.GRPCHandl return handler, nil } -// HealthCheckStatus returns health check status channel a client can subscribe to -func (s *Server) HealthCheckStatus() chan healthcheck.Status { - return s.unavailableChannel -} - func createGRPCServer(opts *Options, tm *tenancy.Manager, handler *shared.GRPCHandler, logger *zap.Logger) (*grpc.Server, error) { var grpcOpts []grpc.ServerOption @@ -140,7 +135,7 @@ func (s *Server) Start() error { if err := s.grpcServer.Serve(s.grpcConn); err != nil { s.logger.Error("GRPC server exited", zap.Error(err)) } - s.unavailableChannel <- healthcheck.Unavailable + s.healthcheck.Set(healthcheck.Unavailable) }() return nil @@ -152,6 +147,5 @@ func (s *Server) Close() error { s.grpcConn.Close() s.opts.TLSGRPC.Close() s.wg.Wait() - close(s.unavailableChannel) return nil } diff --git a/cmd/remote-storage/app/server_test.go b/cmd/remote-storage/app/server_test.go index 201949e8b2f..f36b13629d4 100644 --- a/cmd/remote-storage/app/server_test.go +++ b/cmd/remote-storage/app/server_test.go @@ -17,7 +17,6 @@ package app import ( "context" "errors" - "sync" "testing" "time" @@ -59,6 +58,7 @@ func TestNewServer_CreateStorageErrors(t *testing.T) { factory, tenancy.NewManager(&tenancy.Options{}), zap.NewNop(), + healthcheck.New(), ) } _, err := f() @@ -80,7 +80,6 @@ func TestNewServer_CreateStorageErrors(t *testing.T) { validateGRPCServer(t, s.grpcConn.Addr().String(), s.grpcServer) s.grpcConn.Close() // causes logged error - <-s.HealthCheckStatus() } func TestServerStart_BadPortErrors(t *testing.T) { @@ -130,6 +129,7 @@ func TestNewServer_TLSConfigError(t *testing.T) { storageMocks.factory, tenancy.NewManager(&tenancy.Options{}), zap.NewNop(), + healthcheck.New(), ) require.Error(t, err) assert.Contains(t, err.Error(), "invalid TLS config") @@ -337,23 +337,11 @@ func TestServerGRPCTLS(t *testing.T) { storageMocks.factory, tm, flagsSvc.Logger, + flagsSvc.HC(), ) require.NoError(t, err) require.NoError(t, server.Start()) - var wg sync.WaitGroup - wg.Add(1) - once := sync.Once{} - - go func() { - for s := range server.HealthCheckStatus() { - flagsSvc.HC().Set(s) - if s == healthcheck.Unavailable { - once.Do(wg.Done) - } - } - }() - var clientError error var client *grpcClient @@ -380,7 +368,6 @@ func TestServerGRPCTLS(t *testing.T) { } require.NoError(t, client.conn.Close()) server.Close() - wg.Wait() assert.Equal(t, healthcheck.Unavailable, flagsSvc.HC().Get()) }) } @@ -397,22 +384,10 @@ func TestServerHandlesPortZero(t *testing.T) { storageMocks.factory, tenancy.NewManager(&tenancy.Options{}), flagsSvc.Logger, + flagsSvc.HC(), ) require.NoError(t, err) - var wg sync.WaitGroup - wg.Add(1) - once := sync.Once{} - - go func() { - for s := range server.HealthCheckStatus() { - flagsSvc.HC().Set(s) - if s == healthcheck.Unavailable { - once.Do(wg.Done) - } - } - }() - require.NoError(t, server.Start()) const line = "Starting GRPC server" @@ -424,7 +399,6 @@ func TestServerHandlesPortZero(t *testing.T) { validateGRPCServer(t, hostPort, server.grpcServer) server.Close() - wg.Wait() assert.Equal(t, healthcheck.Unavailable, flagsSvc.HC().Get()) } diff --git a/cmd/remote-storage/main.go b/cmd/remote-storage/main.go index a4f7759a3aa..51166025f6a 100644 --- a/cmd/remote-storage/main.go +++ b/cmd/remote-storage/main.go @@ -77,17 +77,11 @@ func main() { } tm := tenancy.NewManager(&opts.Tenancy) - server, err := app.NewServer(opts, storageFactory, tm, svc.Logger) + server, err := app.NewServer(opts, storageFactory, tm, svc.Logger, svc.HC()) if err != nil { logger.Fatal("Failed to create server", zap.Error(err)) } - go func() { - for s := range server.HealthCheckStatus() { - svc.SetHealthCheckStatus(s) - } - }() - if err := server.Start(); err != nil { logger.Fatal("Could not start servers", zap.Error(err)) }