Skip to content

Commit

Permalink
initial work, missing v1proxy and local
Browse files Browse the repository at this point in the history
  • Loading branch information
sduchesneau committed Jan 14, 2025
1 parent f370f93 commit b7c6625
Show file tree
Hide file tree
Showing 6 changed files with 249 additions and 199 deletions.
19 changes: 8 additions & 11 deletions firehose/server/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"time"

connect "connectrpc.com/connect"
"github.com/streamingfast/bstream"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/bstream/stream"
Expand All @@ -19,7 +20,6 @@ import (
pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
Expand Down Expand Up @@ -74,8 +74,8 @@ func (s *Server) Block(ctx context.Context, request *pbfirehose.SingleBlockReque
return resp, nil
}

func (s *Server) Blocks(request *pbfirehose.Request, streamSrv pbfirehose.Stream_BlocksServer) error {
ctx := streamSrv.Context()
// Blocks(context.Context, *connect.Request[v2.Request], *connect.ServerStream[v2.Response]) error
func (s *Server) Blocks(ctx context.Context, request *connect.Request[pbfirehose.Request], streamSrv *connect.ServerStream[pbfirehose.Response]) error {
metrics.RequestCounter.Inc()

logger := logging.Logger(ctx, s.logger)
Expand All @@ -102,10 +102,7 @@ func (s *Server) Blocks(request *pbfirehose.Request, streamSrv pbfirehose.Stream
hostname = "unknown"
logger.Warn("cannot determine hostname, using 'unknown'", zap.Error(err))
}
md := metadata.New(map[string]string{"hostname": hostname})
if err := streamSrv.SendHeader(md); err != nil {
logger.Warn("cannot send metadata header", zap.Error(err))
}
streamSrv.ResponseHeader().Add("hostname", hostname)
}

var blockCount uint64
Expand All @@ -123,7 +120,7 @@ func (s *Server) Blocks(request *pbfirehose.Request, streamSrv pbfirehose.Stream
obj = block.Payload
}

protoStep, skip := stepToProto(step, request.FinalBlocksOnly)
protoStep, skip := stepToProto(step, request.Msg.FinalBlocksOnly)
if skip {
return nil
}
Expand Down Expand Up @@ -175,7 +172,7 @@ func (s *Server) Blocks(request *pbfirehose.Request, streamSrv pbfirehose.Stream
return nil
})

if len(request.Transforms) > 0 && s.transformRegistry == nil {
if len(request.Msg.Transforms) > 0 && s.transformRegistry == nil {
return status.Errorf(codes.Unimplemented, "no transforms registry configured within this instance")
}

Expand Down Expand Up @@ -209,11 +206,11 @@ func (s *Server) Blocks(request *pbfirehose.Request, streamSrv pbfirehose.Stream
})
}

ctx = s.initFunc(ctx, request)
ctx = s.initFunc(ctx, request.Msg)
str, err := s.streamFactory.New(
ctx,
handlerFunc,
request,
request.Msg,
logger,
stream.WithLiveSourceHandlerMiddleware(liveSourceMiddlewareHandler),
stream.WithFileSourceHandlerMiddleware(fileSourceMiddlewareHandler),
Expand Down
31 changes: 16 additions & 15 deletions firehose/server/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,19 @@ func (p *BlocksPipe) Recv() (*pbfirehose.Response, error) {
}
}

func (s *Server) BlocksFromLocal(ctx context.Context, req *pbfirehose.Request) pbfirehose.Stream_BlocksClient {
cctx, cancel := context.WithCancel(ctx)

pipe := &BlocksPipe{
ctx: cctx,
pipeChan: make(chan *pbfirehose.Response),
}
go func() {
err := s.Blocks(req, pipe)
pipe.err = err
cancel()
}()

return pipe
}
//func (s *Server) BlocksFromLocal(ctx context.Context, req *pbfirehose.Request) pbfirehose.Stream_BlocksClient {
// cctx, cancel := context.WithCancel(ctx)
//
// pipe := &BlocksPipe{
// ctx: cctx,
// pipeChan: make(chan *pbfirehose.Response),
// }
// go func() {
// err := s.Blocks(ctx, &connect.Request[pbfirehose.Request]{*req}, pipe)
// pipe.err = err
// cancel()
// }()
//
// return pipe
//}
//
100 changes: 67 additions & 33 deletions firehose/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,27 @@ package server

import (
"context"
"net/http"
"net/url"
"strings"
"sync"
"time"

connect_go "connectrpc.com/connect"
_ "github.com/mostynb/go-grpc-compression/zstd"
"github.com/streamingfast/bstream/transform"
"github.com/streamingfast/dauth"
dauthgrpc "github.com/streamingfast/dauth/middleware/grpc"
dgrpcserver "github.com/streamingfast/dgrpc/server"
"github.com/streamingfast/dgrpc/server/factory"
connectweb "github.com/streamingfast/dgrpc/server/connectrpc"
"github.com/streamingfast/dmetering"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/firehose"
"github.com/streamingfast/firehose-core/firehose/info"
"github.com/streamingfast/firehose-core/firehose/rate"
"github.com/streamingfast/firehose-core/metering"
pbfirehoseV1 "github.com/streamingfast/pbgo/sf/firehose/v1"
fhconnect "github.com/streamingfast/pbgo/sf/firehose/v2/pbfirehoseconnect"

pbfirehoseV2 "github.com/streamingfast/pbgo/sf/firehose/v2"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel"
Expand All @@ -44,7 +47,7 @@ type Server struct {
}

type wrappedServer struct {
dgrpcserver.Server
*connectweb.ConnectWebServer
listenAddr string
}

Expand Down Expand Up @@ -86,7 +89,35 @@ func New(

tracerProvider := otel.GetTracerProvider()

var servers []*wrappedServer
// for _, addr := range strings.Split(listenAddr, ",") {
// if infoServer != nil {
// infoHandlerGetter := func(opts ...connect_go.HandlerOption) (string, http.Handler) {
// out, outh := fhconnect.NewEndpointInfoHandler(infoServer, opts...)
// return out, outh
// }
// handlerGetters = append(handlerGetters, infoHandlerGetter)
// }
//
// options = append(options, dgrpcserver.WithPermissiveCORS())
// srv := connectweb.New(handlerGetters, options...)
// servers = append(servers, srv)
// cleanAddr := strings.ReplaceAll(addr, "*", "")
// go func() {
// srv.Launch(cleanAddr)
// done <- struct{}{}
// }()
// }

s := &Server{
transformRegistry: transformRegistry,
blockGetter: blockGetter,
streamFactory: streamFactory,
initFunc: initFunc,
postHookFunc: postHookFunc,
logger: logger,
servers: []*wrappedServer{},
}

for _, addr := range strings.Split(listenAddr, ",") {
options := []dgrpcserver.Option{
dgrpcserver.WithLogger(logger),
Expand All @@ -96,6 +127,7 @@ func New(
dgrpcserver.WithGRPCServerOptions(grpc.MaxRecvMsgSize(25 * 1024 * 1024)),
dgrpcserver.WithPostUnaryInterceptor(dauthgrpc.UnaryAuthChecker(authenticator, logger)),
dgrpcserver.WithPostStreamInterceptor(dauthgrpc.StreamAuthChecker(authenticator, logger)),
dgrpcserver.WithConnectPermissiveCORS(),
}

if serviceDiscoveryURL != nil {
Expand All @@ -104,42 +136,43 @@ func New(

if strings.Contains(addr, "*") {
options = append(options, dgrpcserver.WithInsecureServer())
addr = strings.ReplaceAll(addr, "*", "")
} else {
options = append(options, dgrpcserver.WithPlainTextServer())
}

srv := factory.ServerFromOptions(options...)
//if blockGetter != nil {
// pbfirehoseV2.RegisterFetchServer(gs, s)
//}
//pbfirehoseV2.RegisterEndpointInfoServer(gs, infoServer)
//pbfirehoseV2.RegisterStreamServer(gs, s)
//pbfirehoseV1.RegisterStreamServer(gs, NewFirehoseProxyV1ToV2(s)) // compatibility with firehose

servers = append(servers, &wrappedServer{
Server: srv,
listenAddr: addr,
streamHandlerGetter := func(opts ...connect_go.HandlerOption) (string, http.Handler) {
return fhconnect.NewStreamHandler(s, opts...)
}
handlerGetters := []connectweb.HandlerGetter{streamHandlerGetter}
// FIXME: info server
// if infoServer != nil {
// infoHandlerGetter := func(opts ...connect_go.HandlerOption) (string, http.Handler) {
// out, outh := fhconnect.NewEndpointInfoHandler(infoServer, opts...)
// return out, outh
// }
// handlerGetters = append(handlerGetters, infoHandlerGetter)
// }
// srv := connectweb.New(handlerGetters, options...)
// servers = append(servers, srv)

cleanAddr := strings.ReplaceAll(addr, "*", "")

srv := connectweb.New(handlerGetters, options...)

s.servers = append(s.servers, &wrappedServer{
ConnectWebServer: srv,
listenAddr: cleanAddr,
})

}

s := &Server{
servers: servers,
transformRegistry: transformRegistry,
blockGetter: blockGetter,
streamFactory: streamFactory,
initFunc: initFunc,
postHookFunc: postHookFunc,
logger: logger,
}

logger.Info("registering grpc services")
for _, srv := range servers {
srv.RegisterService(func(gs grpc.ServiceRegistrar) {
if blockGetter != nil {
pbfirehoseV2.RegisterFetchServer(gs, s)
}
pbfirehoseV2.RegisterEndpointInfoServer(gs, infoServer)
pbfirehoseV2.RegisterStreamServer(gs, s)
pbfirehoseV1.RegisterStreamServer(gs, NewFirehoseProxyV1ToV2(s)) // compatibility with firehose
})
}

for _, opt := range opts {
opt(s)
}
Expand All @@ -154,8 +187,9 @@ func (s *Server) OnTerminated(f func(error)) {
}

func (s *Server) Shutdown(timeout time.Duration) {
// FIXME we need to implement the timeout here
for _, server := range s.servers {
server.Shutdown(timeout)
server.Shutdown(nil)
}
}

Expand All @@ -166,7 +200,7 @@ func (s *Server) Launch() {
go func() {
server.Launch(server.listenAddr)
for _, srv := range s.servers {
srv.Shutdown(0) // immediately shutdown all other servers when one terminates, in case a single one failed
srv.Shutdown(nil) // immediately shutdown all other servers when one terminates, in case a single one failed
}
wg.Done()
}()
Expand Down
Loading

0 comments on commit b7c6625

Please sign in to comment.