Skip to content

Commit

Permalink
Merge pull request #8 from BuoyantIO/siggy/terminate-limit
Browse files Browse the repository at this point in the history
Introduce terminate-after flag, graceful shutdown
  • Loading branch information
siggy authored May 22, 2018
2 parents b876676 + 4891aef commit ab87349
Show file tree
Hide file tree
Showing 12 changed files with 168 additions and 34 deletions.
12 changes: 10 additions & 2 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
## v0.0.2

* Introduce `terminate-after` flag, which instructs the process to shutdown
after a specified number of requests.
* Introduce graceful shutdown. Upon receiving a shutdown message via SIGTERM, or
via `terminate-after`, call shutdown on each server, allowing requests to
drain.

## v0.0.1

bb 0.0.1 is the first public release of bb

* This release supports HTTP 1.1 and gRPC.
* Available strategies are: broadcast channel, point-to-point channel, terminus, and HTTP egress
* Allows users tio define a percentage of requests that should fail and a duration to wait for
* Allows users tio define a percentage of requests that should fail and a duration to wait for
before processing requests.
* This release has been tested locally on Mac OS and on both Google Kubernetes Engine and
* This release has been tested locally on Mac OS and on both Google Kubernetes Engine and
Minikube.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ A test run using the Docker CLI should return usage information and confirm that
--id string identifier for this container
--log-level string log level, must be one of: panic, fatal, error, warn, info, debug (default "debug")
--percent-failure int percentage of requests that this service will automatically fail
--sleep-in-millis int amount of milliseconds to wait before actually start processing as request
--sleep-in-millis int amount of milliseconds to wait before actually start processing a request
--terminate-after int terminate the process after this many requests

Use "bb [command] --help" for more information about a command.

Expand Down
3 changes: 2 additions & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ func init() {
RootCmd.PersistentFlags().IntVar(&config.GRPCServerPort, "grpc-server-port", -1, "port to bind a gRPC server to")
RootCmd.PersistentFlags().IntVar(&config.H1ServerPort, "h1-server-port", -1, "port to bind a HTTP 1.1 server to")
RootCmd.PersistentFlags().IntVar(&config.PercentageFailedRequests, "percent-failure", 0, "percentage of requests that this service will automatically fail")
RootCmd.PersistentFlags().IntVar(&config.SleepInMillis, "sleep-in-millis", 0, "amount of milliseconds to wait before actually start processing as request")
RootCmd.PersistentFlags().IntVar(&config.SleepInMillis, "sleep-in-millis", 0, "amount of milliseconds to wait before actually start processing a request")
RootCmd.PersistentFlags().IntVar(&config.TerminateAfter, "terminate-after", 0, "terminate the process after this many requests")
RootCmd.PersistentFlags().BoolVar(&config.FireAndForget, "fire-and-forget", false, "do not wait for a response when contacting downstream services.")
RootCmd.PersistentFlags().StringSliceVar(&config.GRPCDownstreamServers, "grpc-downstream-server", []string{}, "list of servers (hostname:port) to send messages to using gRPC, can be repeated")
RootCmd.PersistentFlags().StringSliceVar(&config.H1DownstreamServers, "h1-downstream-server", []string{}, "list of servers (protocol://hostname:port) to send messages to using HTTP 1.1, can be repeated")
Expand Down
17 changes: 13 additions & 4 deletions cmd/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@ func buildClients(config *service.Config) ([]service.Client, error) {

func newService(config *service.Config, strategyName string) (*service.Service, error) {

handler := &service.RequestHandler{
Config: config,
}
handler := service.NewRequestHandler(config)

servers, err := buildServers(config, handler)
if err != nil {
Expand Down Expand Up @@ -95,7 +93,18 @@ func newService(config *service.Config, strategyName string) (*service.Service,
stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
log.Infof("Service [%s] is ready and waiting for incoming connections", config.ID)
<-stop

select {
case <-stop:
log.Infof("Stopping service [%s] due to interrupt", config.ID)
case <-handler.Stopping():
log.Infof("Stopping service [%s] due to handler", config.ID)
}

for _, server := range servers {
server.Shutdown()
}

return service, nil
}

Expand Down
8 changes: 8 additions & 0 deletions protocols/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
)

type theGrpcServer struct {
grpcServer *grpc.Server
port int
serviceHandler *service.RequestHandler
}
Expand All @@ -20,6 +21,12 @@ func (s *theGrpcServer) GetID() string {
return fmt.Sprintf("grpc-%d", s.port)
}

func (s *theGrpcServer) Shutdown() error {
log.Infof("Shutting down [%s]", s.GetID())
s.grpcServer.GracefulStop()
return nil
}

func (s *theGrpcServer) TheFunction(ctx context.Context, req *pb.TheRequest) (*pb.TheResponse, error) {
resp, err := s.serviceHandler.Handle(ctx, req)
log.Infof("Received gRPC request [%s] [%s] Returning response [%+v]", req.RequestUID, req, resp)
Expand Down Expand Up @@ -59,6 +66,7 @@ func NewGrpcServerIfConfigured(config *service.Config, serviceHandler *service.R
grpcServer := grpc.NewServer()

theGrpcServer := &theGrpcServer{
grpcServer: grpcServer,
port: grpcServerPort,
serviceHandler: serviceHandler,
}
Expand Down
8 changes: 6 additions & 2 deletions protocols/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ func TestTheGrpcServer(t *testing.T) {
theResponseToReturn: expectedProtoResponse,
}

grpcServer := theGrpcServer{serviceHandler: &service.RequestHandler{Config: &service.Config{}, Strategy: strategy}}
requestHandler := service.NewRequestHandler(&service.Config{})
requestHandler.Strategy = strategy
grpcServer := theGrpcServer{serviceHandler: requestHandler}

actualProtoResponse, err := grpcServer.TheFunction(context.TODO(), expectedProtoRequest)
if err != nil {
Expand Down Expand Up @@ -51,7 +53,9 @@ func TestTheGrpcServer(t *testing.T) {
theErrorToReturn: expectedError,
}

grpcServer := theGrpcServer{serviceHandler: &service.RequestHandler{Config: &service.Config{}, Strategy: strategy}}
requestHandler := service.NewRequestHandler(&service.Config{})
requestHandler.Strategy = strategy
grpcServer := theGrpcServer{serviceHandler: requestHandler}

_, actualError := grpcServer.TheFunction(context.TODO(), expectedProtoRequest)
if actualError == nil {
Expand Down
25 changes: 18 additions & 7 deletions protocols/http.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package protocols

import (
"context"
"errors"
"fmt"
"io"
Expand All @@ -19,7 +20,8 @@ import (
var marshaller = &jsonpb.Marshaler{}

type theHTTPServer struct {
port int
httpServer *http.Server
port int
}

type httpHandler struct {
Expand All @@ -30,6 +32,11 @@ func (s *theHTTPServer) GetID() string {
return fmt.Sprintf("h1-%d", s.port)
}

func (s *theHTTPServer) Shutdown() error {
log.Infof("Shutting down [%s]", s.GetID())
return s.httpServer.Shutdown(context.Background())
}

func (h *httpHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
var protoReq pb.TheRequest

Expand All @@ -41,7 +48,7 @@ func (h *httpHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
protoReq = r
} else {
newRequestUID := newRequestUID("http", h.serviceHandler.Config)
newRequestUID := newRequestUID("http", h.serviceHandler.ConfigID())
log.Infof("Received request with empty body, assigning new request UID [%s] to it", newRequestUID)
protoReq = pb.TheRequest{
RequestUID: newRequestUID,
Expand Down Expand Up @@ -90,8 +97,8 @@ func (c *httpClient) Send(req *pb.TheRequest) (*pb.TheResponse, error) {
return &protoResp, err
}

func newRequestUID(inboundType string, config *service.Config) string {
return fmt.Sprintf("in:%s-sid:%s-%d", inboundType, config.ID, time.Now().Nanosecond())
func newRequestUID(inboundType string, configID string) string {
return fmt.Sprintf("in:%s-sid:%s-%d", inboundType, configID, time.Now().Nanosecond())
}

func marshallProtobufToJSON(msg proto.Message) (string, error) {
Expand Down Expand Up @@ -153,14 +160,18 @@ func NewHTTPServerIfConfigured(config *service.Config, serviceHandler *service.R
return nil, nil
}

handler := newHTTPHandler(serviceHandler)
srv := &http.Server{
Addr: fmt.Sprintf(":%d", config.H1ServerPort),
Handler: newHTTPHandler(serviceHandler),
}
go func() {
log.Infof("HTTP 1.1 server listening on port [%d]", config.H1ServerPort)
http.ListenAndServe(fmt.Sprintf(":%d", config.H1ServerPort), handler)
srv.ListenAndServe()
}()

return &theHTTPServer{
port: config.H1ServerPort,
port: config.H1ServerPort,
httpServer: srv,
}, nil
}

Expand Down
24 changes: 18 additions & 6 deletions protocols/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ func TestTheHTTPServer(t *testing.T) {
theResponseToReturn: expectedProtoResponse,
}

handler := newHTTPHandler(&service.RequestHandler{Strategy: strategy, Config: &service.Config{}})
requestHandler := service.NewRequestHandler(&service.Config{})
requestHandler.Strategy = strategy
handler := newHTTPHandler(requestHandler)
theServer := httptest.NewServer(handler)
defer theServer.Close()

Expand Down Expand Up @@ -68,7 +70,9 @@ func TestTheHTTPServer(t *testing.T) {
theResponseToReturn: expectedProtoResponse,
}

handler := newHTTPHandler(&service.RequestHandler{Config: &service.Config{}, Strategy: strategy})
requestHandler := service.NewRequestHandler(&service.Config{})
requestHandler.Strategy = strategy
handler := newHTTPHandler(requestHandler)
theServer := httptest.NewServer(handler)
defer theServer.Close()

Expand Down Expand Up @@ -109,7 +113,9 @@ func TestTheHTTPServer(t *testing.T) {
t.Run("returns a 500 if payload is not the expected protobuf as json", func(t *testing.T) {
strategy := &stubStrategy{}

handler := newHTTPHandler(&service.RequestHandler{Config: &service.Config{}, Strategy: strategy})
requestHandler := service.NewRequestHandler(&service.Config{})
requestHandler.Strategy = strategy
handler := newHTTPHandler(requestHandler)
theServer := httptest.NewServer(handler)
defer theServer.Close()

Expand Down Expand Up @@ -146,7 +152,9 @@ func TestTheHTTPServer(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}

handler := newHTTPHandler(&service.RequestHandler{Config: &service.Config{}, Strategy: strategy})
requestHandler := service.NewRequestHandler(&service.Config{})
requestHandler.Strategy = strategy
handler := newHTTPHandler(requestHandler)
theServer := httptest.NewServer(handler)
defer theServer.Close()

Expand Down Expand Up @@ -194,7 +202,9 @@ func TestHTTPClient(t *testing.T) {
theResponseToReturn: expectedProtoResponse,
}

handler := newHTTPHandler(&service.RequestHandler{Config: &service.Config{}, Strategy: strategy})
requestHandler := service.NewRequestHandler(&service.Config{})
requestHandler.Strategy = strategy
handler := newHTTPHandler(requestHandler)
theServer := httptest.NewServer(handler)
defer theServer.Close()

Expand Down Expand Up @@ -228,7 +238,9 @@ func TestHTTPClient(t *testing.T) {
theErrorToReturn: errors.New("this error was injected by [terminus-grpc:-1-h1:9090]"),
}

handler := newHTTPHandler(&service.RequestHandler{Config: &service.Config{}, Strategy: strategy})
requestHandler := service.NewRequestHandler(&service.Config{})
requestHandler.Strategy = strategy
handler := newHTTPHandler(requestHandler)
theServer := httptest.NewServer(handler)
defer theServer.Close()

Expand Down
58 changes: 53 additions & 5 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Config struct {
H1DownstreamServers []string
PercentageFailedRequests int
SleepInMillis int
TerminateAfter int
FireAndForget bool
DownstreamConnectionTimeout time.Duration
ExtraArguments map[string]string
Expand Down Expand Up @@ -60,25 +61,72 @@ func MakeFireAndForget(client Client) Client {
// Server is an abstraction representing each server made available to receive inbound connections.
type Server interface {
GetID() string
Shutdown() error
}

// Strategy is the algorithm applied by this service when it receives requests (c.f. http://wiki.c2.com/?StrategyPattern)
type Strategy interface {
Do(context.Context, *pb.TheRequest) (*pb.TheResponse, error)
}

//
// TODO: move RequestHandler into its own file
//

// RequestHandler is a protocol-independent request/response handler interface
type RequestHandler struct {
Config *Config
Strategy Strategy
Strategy Strategy // public due to circular dependency between server and strategy

config *Config
stopCh chan struct{}
requestCount int
counterCh chan struct{}
}

// requestCounter approximates an atomic read/write counter via channels
func (h *RequestHandler) requestCounter() {
for range h.counterCh {
h.requestCount++
if h.requestCount == h.config.TerminateAfter {
log.Infof("TerminateAfter limit hit (%d), stopping [%s]", h.config.TerminateAfter, h.config.ID)
h.stopCh <- struct{}{}
}
}
}

func NewRequestHandler(config *Config) *RequestHandler {
h := &RequestHandler{
config: config,
stopCh: make(chan struct{}),
requestCount: 0,
counterCh: make(chan struct{}),
}

if h.config.TerminateAfter != 0 {
go h.requestCounter()
}

return h
}

func (h *RequestHandler) ConfigID() string {
return h.config.ID
}

func (h *RequestHandler) Stopping() <-chan struct{} {
return h.stopCh
}

// Handle takes in a request, processes it accordingly to its Strategy, an returns the response.
func (h *RequestHandler) Handle(ctx context.Context, req *pb.TheRequest) (*pb.TheResponse, error) {
sleepForConfiguredTime(h)

if shouldFailThisRequest(h) {
return nil, fmt.Errorf("this error was injected by [%s]", h.Config.ID)
return nil, fmt.Errorf("this error was injected by [%s]", h.config.ID)
}

if h.config.TerminateAfter != 0 {
h.counterCh <- struct{}{}
}

reqID := req.RequestUID
Expand All @@ -91,11 +139,11 @@ func (h *RequestHandler) Handle(ctx context.Context, req *pb.TheRequest) (*pb.Th
}

func sleepForConfiguredTime(h *RequestHandler) {
time.Sleep(time.Duration(int64(h.Config.SleepInMillis)) * time.Millisecond)
time.Sleep(time.Duration(int64(h.config.SleepInMillis)) * time.Millisecond)
}

func shouldFailThisRequest(h *RequestHandler) bool {
perc := h.Config.PercentageFailedRequests
perc := h.config.PercentageFailedRequests
rnd := rand.Intn(100)
return rnd < perc
}
Expand Down
Loading

0 comments on commit ab87349

Please sign in to comment.