Skip to content

Commit

Permalink
Refactor healthcheck signalling between server and service (jaegertracing#5308)
Browse files Browse the repository at this point in the history

## Which problem is this PR solving?
Resolves jaegertracing#5307

## Description of the changes
- Simplifies the signalling of healthcheck status from the "servers" to
the "service": instead of using 2 channels to feed healthcheck status
back to the Service.HealthCheck, we give the server components direct
access to the Healthcheck, which they can then update themselves.
- This is possible because the Healthcheck package is threadsafe (uses
`atomic.Value` for state).
- This pattern is consistent with how the service's Healthcheck is passed
directly to the cmd/collector/app package.

## How was this change tested?
- `make lint test`

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [ ] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: Will Sewell <willsewell@monzo.com>
Signed-off-by: Yuri Shkuro <github@ysh.us>
Co-authored-by: Yuri Shkuro <github@ysh.us>
  • Loading branch information
Will Sewell and yurishkuro authored Mar 28, 2024
1 parent 0be0cb9 commit 7c9dce4
Show file tree
Hide file tree
Showing 11 changed files with 63 additions and 186 deletions.
7 changes: 1 addition & 6 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,15 +283,10 @@ func startQuery(
) *queryApp.Server {
spanReader = storageMetrics.NewReadMetricsDecorator(spanReader, metricsFactory)
qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts)
server, err := queryApp.NewServer(svc.Logger, qs, metricsQueryService, qOpts, tm, jt)
server, err := queryApp.NewServer(svc.Logger, svc.HC(), qs, metricsQueryService, qOpts, tm, jt)
if err != nil {
svc.Logger.Fatal("Could not create jaeger-query", zap.Error(err))
}
go func() {
for s := range server.HealthCheckStatus() {
svc.SetHealthCheckStatus(s)
}
}()
if err := server.Start(); err != nil {
svc.Logger.Fatal("Could not start jaeger-query", zap.Error(err))
}
Expand Down
23 changes: 3 additions & 20 deletions cmd/internal/flags/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,16 @@ type Service struct {
MetricsFactory metrics.Factory

signalsChannel chan os.Signal

hcStatusChannel chan healthcheck.Status
}

// NewService creates a new Service.
func NewService(adminPort int) *Service {
signalsChannel := make(chan os.Signal, 1)
hcStatusChannel := make(chan healthcheck.Status)
signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM)

return &Service{
Admin: NewAdminServer(ports.PortToHostPort(adminPort)),
signalsChannel: signalsChannel,
hcStatusChannel: hcStatusChannel,
Admin: NewAdminServer(ports.PortToHostPort(adminPort)),
signalsChannel: signalsChannel,
}
}

Expand All @@ -79,11 +75,6 @@ func (s *Service) AddFlags(flagSet *flag.FlagSet) {
s.Admin.AddFlags(flagSet)
}

// SetHealthCheckStatus sets status of healthcheck
func (s *Service) SetHealthCheckStatus(status healthcheck.Status) {
s.hcStatusChannel <- status
}

// Start bootstraps the service and starts the admin server.
func (s *Service) Start(v *viper.Viper) error {
if err := TryLoadConfigFile(v); err != nil {
Expand Down Expand Up @@ -143,15 +134,7 @@ func (s *Service) HC() *healthcheck.HealthCheck {
func (s *Service) RunAndThen(shutdown func()) {
s.HC().Ready()

statusLoop:
for {
select {
case status := <-s.hcStatusChannel:
s.HC().Set(status)
case <-s.signalsChannel:
break statusLoop
}
}
<-s.signalsChannel

s.Logger.Info("Shutting down")
s.HC().Set(healthcheck.Unavailable)
Expand Down
2 changes: 1 addition & 1 deletion cmd/internal/flags/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestStartErrors(t *testing.T) {
go s.RunAndThen(shutdown)

waitForEqual(t, healthcheck.Ready, func() interface{} { return s.HC().Get() })
s.SetHealthCheckStatus(healthcheck.Unavailable)
s.HC().Set(healthcheck.Unavailable)
waitForEqual(t, healthcheck.Unavailable, func() interface{} { return s.HC().Get() })

s.signalsChannel <- os.Interrupt
Expand Down
3 changes: 3 additions & 0 deletions cmd/jaeger/internal/extension/jaegerquery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage"
queryApp "github.com/jaegertracing/jaeger/cmd/query/app"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/jtracer"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/plugin/metrics/disabled"
Expand Down Expand Up @@ -81,6 +82,8 @@ func (s *server) Start(ctx context.Context, host component.Host) error {
//nolint
s.server, err = queryApp.NewServer(
s.logger,
// TODO propagate healthcheck updates up to the collector's runtime
healthcheck.New(),
qs,
metricsQueryService,
s.makeQueryOptions(),
Expand Down
45 changes: 20 additions & 25 deletions cmd/query/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,23 +48,23 @@ import (
// Server runs HTTP, Mux and a grpc server
type Server struct {
logger *zap.Logger
healthCheck *healthcheck.HealthCheck
querySvc *querysvc.QueryService
queryOptions *QueryOptions

tracer *jtracer.JTracer // TODO make part of flags.Service

conn net.Listener
grpcConn net.Listener
httpConn net.Listener
cmuxServer cmux.CMux
grpcServer *grpc.Server
httpServer *http.Server
separatePorts bool
unavailableChannel chan healthcheck.Status
conn net.Listener
grpcConn net.Listener
httpConn net.Listener
cmuxServer cmux.CMux
grpcServer *grpc.Server
httpServer *http.Server
separatePorts bool
}

// NewServer creates and initializes Server
func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.Manager, tracer *jtracer.JTracer) (*Server, error) {
func NewServer(logger *zap.Logger, healthCheck *healthcheck.HealthCheck, querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.Manager, tracer *jtracer.JTracer) (*Server, error) {
_, httpPort, err := net.SplitHostPort(options.HTTPHostPort)
if err != nil {
return nil, fmt.Errorf("invalid HTTP server host:port: %w", err)
Expand All @@ -89,22 +89,17 @@ func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, metricsQuery
}

return &Server{
logger: logger,
querySvc: querySvc,
queryOptions: options,
tracer: tracer,
grpcServer: grpcServer,
httpServer: httpServer,
separatePorts: grpcPort != httpPort,
unavailableChannel: make(chan healthcheck.Status),
logger: logger,
healthCheck: healthCheck,
querySvc: querySvc,
queryOptions: options,
tracer: tracer,
grpcServer: grpcServer,
httpServer: httpServer,
separatePorts: grpcPort != httpPort,
}, nil
}

// HealthCheckStatus returns health check status channel a client can subscribe to
func (s Server) HealthCheckStatus() chan healthcheck.Status {
return s.unavailableChannel
}

func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.Manager, logger *zap.Logger, tracer *jtracer.JTracer) (*grpc.Server, error) {
var grpcOpts []grpc.ServerOption

Expand Down Expand Up @@ -292,7 +287,7 @@ func (s *Server) Start() error {
s.logger.Error("Could not start HTTP server", zap.Error(err))
}

s.unavailableChannel <- healthcheck.Unavailable
s.healthCheck.Set(healthcheck.Unavailable)
}()

// Start GRPC server concurrently
Expand All @@ -302,7 +297,7 @@ func (s *Server) Start() error {
if err := s.grpcServer.Serve(s.grpcConn); err != nil {
s.logger.Error("Could not start GRPC server", zap.Error(err))
}
s.unavailableChannel <- healthcheck.Unavailable
s.healthCheck.Set(healthcheck.Unavailable)
}()

// Start cmux server concurrently.
Expand All @@ -315,7 +310,7 @@ func (s *Server) Start() error {
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
s.logger.Error("Could not start multiplexed server", zap.Error(err))
}
s.unavailableChannel <- healthcheck.Unavailable
s.healthCheck.Set(healthcheck.Unavailable)
}()
}

Expand Down
Loading

0 comments on commit 7c9dce4

Please sign in to comment.