Skip to content

Commit

Permalink
customize scheduler in rocket server
Browse files Browse the repository at this point in the history
Summary:
customize scheduler in rocket server

This way numWorkers can still be customized in some way

Reviewed By: codyohl, echistyakov

Differential Revision: D65621858

fbshipit-source-id: af07b9682986e9da95a37b651f55eb759de70fd5
  • Loading branch information
awalterschulze authored and facebook-github-bot committed Nov 8, 2024
1 parent 6ff5dab commit 3776256
Showing 1 changed file with 16 additions and 1 deletion.
17 changes: 16 additions & 1 deletion third-party/thrift/src/thrift/lib/go/thrift/rocket_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"net"

"github.com/jjeffcaii/reactor-go/scheduler"
rsocket "github.com/rsocket/rsocket-go"
"github.com/rsocket/rsocket-go/core/transport"
"github.com/rsocket/rsocket-go/payload"
Expand Down Expand Up @@ -80,10 +81,24 @@ func (s *rocketServer) ServeContext(ctx context.Context) error {
transporter := func(context.Context) (transport.ServerTransport, error) {
return newRocketServerTransport(s.listener, s.connContext, s.proc, s.transportID, s.log), nil
}
r := rsocket.Receive().Acceptor(s.acceptor).Transport(transporter)
r := rsocket.Receive().
Scheduler(s.requestScheduler(), s.responeScheduler()).
Acceptor(s.acceptor).
Transport(transporter)
return r.Serve(ctx)
}

func (s *rocketServer) requestScheduler() scheduler.Scheduler {
if s.numWorkers == GoroutinePerRequest {
return scheduler.Elastic()
}
return scheduler.NewElastic(s.numWorkers)
}

func (s *rocketServer) responeScheduler() scheduler.Scheduler {
return scheduler.Elastic()
}

func (s *rocketServer) acceptor(ctx context.Context, setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
if err := checkRequestSetupMetadata8(setup); err != nil {
return nil, err
Expand Down

0 comments on commit 3776256

Please sign in to comment.