Skip to content

Commit

Permalink
add pipelining to rocket server
Browse files Browse the repository at this point in the history
Summary: add pipelining to rocket server

Reviewed By: codyohl

Differential Revision: D65622276

fbshipit-source-id: e80d8b394bdc44a5978d7c059d0373b0be3ba5c8
  • Loading branch information
awalterschulze authored and facebook-github-bot committed Nov 8, 2024
1 parent 3776256 commit 6406f2c
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 8 deletions.
64 changes: 63 additions & 1 deletion third-party/thrift/src/thrift/lib/go/thrift/rocket_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
rsocket "github.com/rsocket/rsocket-go"
"github.com/rsocket/rsocket-go/core/transport"
"github.com/rsocket/rsocket-go/payload"
"github.com/rsocket/rsocket-go/rx/mono"

"github.com/facebook/fbthrift/thrift/lib/go/thrift/stats"
)
Expand Down Expand Up @@ -108,10 +109,71 @@ func (s *rocketServer) acceptor(ctx context.Context, setup payload.SetupPayload,
return nil, err
}
sendingSocket.MetadataPush(serverMetadataPush)
socket := newRocketServerSocket(ctx, s.proc, s.log)
socket := newRocketServerSocket(ctx, s.proc, s.pipeliningEnabled, s.log)
return rsocket.NewAbstractSocket(
rsocket.MetadataPush(socket.metadataPush),
rsocket.RequestResponse(socket.requestResonse),
rsocket.FireAndForget(socket.fireAndForget),
), nil
}

type rocketServerSocket struct {
ctx context.Context
proc Processor
pipeliningEnabled bool
log func(format string, args ...interface{})
}

func newRocketServerSocket(ctx context.Context, proc Processor, pipeliningEnabled bool, log func(format string, args ...interface{})) *rocketServerSocket {
return &rocketServerSocket{ctx: ctx, proc: proc, pipeliningEnabled: pipeliningEnabled, log: log}
}

func (s *rocketServerSocket) metadataPush(msg payload.Payload) {
_ = decodeClientMetadataPush(msg)
// This is usually something like transportMetadata = map[deciding_accessors:IP=...], but we do not handle it.
}

func (s *rocketServerSocket) requestResonse(msg payload.Payload) mono.Mono {
request, err := decodeRequestPayload(msg)
if err != nil {
return mono.Error(err)
}
protocol, err := newProtocolBufferFromRequest(request)
if err != nil {
return mono.Error(err)
}
if s.pipeliningEnabled {
return mono.FromFunc(func(context.Context) (payload.Payload, error) {
if err := process(s.ctx, s.proc, protocol); err != nil {
return nil, err
}
return encodeResponsePayload(protocol.name, protocol.messageType, protocol.getRequestHeaders(), request.Zstd(), protocol.Bytes())
})
}
if err := process(s.ctx, s.proc, protocol); err != nil {
return mono.Error(err)
}
response, err := encodeResponsePayload(protocol.name, protocol.messageType, protocol.getRequestHeaders(), request.Zstd(), protocol.Bytes())
if err != nil {
return mono.Error(err)
}
return mono.Just(response)
}

func (s *rocketServerSocket) fireAndForget(msg payload.Payload) {
request, err := decodeRequestPayload(msg)
if err != nil {
s.log("rocketServer fireAndForget decode request payload error: %v", err)
return
}
protocol, err := newProtocolBufferFromRequest(request)
if err != nil {
s.log("rocketServer fireAndForget error creating protocol: %v", err)
return
}
// TODO: support pipelining
if err := process(s.ctx, s.proc, protocol); err != nil {
s.log("rocketServer fireAndForget process error: %v", err)
return
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,30 +75,30 @@ func (s *rocketSimpleServer) acceptor(ctx context.Context, setup payload.SetupPa
return nil, err
}
sendingSocket.MetadataPush(serverMetadataPush)
socket := newRocketServerSocket(ctx, s.proc, s.log)
socket := newRocketSimpleServerSocket(ctx, s.proc, s.log)
return rsocket.NewAbstractSocket(
rsocket.MetadataPush(socket.metadataPush),
rsocket.RequestResponse(socket.requestResonse),
rsocket.FireAndForget(socket.fireAndForget),
), nil
}

type rocketServerSocket struct {
type rocketSimpleServerSocket struct {
ctx context.Context
proc Processor
log func(format string, args ...interface{})
}

func newRocketServerSocket(ctx context.Context, proc Processor, log func(format string, args ...interface{})) *rocketServerSocket {
return &rocketServerSocket{ctx: ctx, proc: proc, log: log}
func newRocketSimpleServerSocket(ctx context.Context, proc Processor, log func(format string, args ...interface{})) *rocketSimpleServerSocket {
return &rocketSimpleServerSocket{ctx: ctx, proc: proc, log: log}
}

func (s *rocketServerSocket) metadataPush(msg payload.Payload) {
func (s *rocketSimpleServerSocket) metadataPush(msg payload.Payload) {
_ = decodeClientMetadataPush(msg)
// This is usually something like transportMetadata = map[deciding_accessors:IP=...], but we do not handle it.
}

func (s *rocketServerSocket) requestResonse(msg payload.Payload) mono.Mono {
func (s *rocketSimpleServerSocket) requestResonse(msg payload.Payload) mono.Mono {
request, err := decodeRequestPayload(msg)
if err != nil {
return mono.Error(err)
Expand All @@ -117,7 +117,7 @@ func (s *rocketServerSocket) requestResonse(msg payload.Payload) mono.Mono {
return mono.Just(response)
}

func (s *rocketServerSocket) fireAndForget(msg payload.Payload) {
func (s *rocketSimpleServerSocket) fireAndForget(msg payload.Payload) {
request, err := decodeRequestPayload(msg)
if err != nil {
s.log("rocketServer fireAndForget decode request payload error: %v", err)
Expand Down

0 comments on commit 6406f2c

Please sign in to comment.