Skip to content

Commit

Permalink
Merge pull request #71 from streamingfast/feature/firehose-connectweb
Browse files Browse the repository at this point in the history
switch firehose to connectweb server, enforce compression by default
  • Loading branch information
sduchesneau authored Jan 15, 2025
2 parents 0e7e8d0 + 8593d1f commit 15cfd60
Show file tree
Hide file tree
Showing 11 changed files with 213 additions and 609 deletions.
19 changes: 18 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,42 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s

## Unreleased

### Tools

* Enhanced `firecore tools print merged-blocks` with various small quality of life improvements:
- Now accepts a block range instead of a single start block.
- Passing a single block as the block range will print this single block alone.
- Block range is now optional, defaulting to run until there is no more files to read.
- It's possible to pass a merged blocks file directly, with or without an optional range.

### Firehose

> [!IMPORTANT]
> This release will reject firehose connections from clients that don't support GZIP or ZSTD compression. Use `--firehose-enforce-compression=false` to keep previous behavior, then check the logs for `incoming Substreams Blocks request` logs with the value `compressed: false` to track users who are not using compressed HTTP connections.

> [!IMPORTANT]
> This release removes the old `sf.firehose.v1` protocol (replaced by `sf.firehose.v2` in 2022, this should not affect any reasonably recent client)
* Add support for ConnectWeb firehose requests.
* Always use gzip compression on firehose requests for clients that support it (instead of always answering with the same compression as the request).

## v1.6.9

### Substreams

* Fix an issue preventing proper detection of gzip compression when multiple headers are set (ex: python grpc client)
* Add support for zstd compression (encoding) on server
* Fix an issue preventing some tier2 requests on last-stage from correctly generating stores. This could lead to some missing "backfilling" jobs and slower time to first block on reconnection.
* Fix a thread leak on cursor resolution resulting in bad counter for active connections

## v1.6.8

> [!NOTE]
> This release will reject substreams connections from clients that don't support GZIP compression. Use `--substreams-tier1-enforce-compression=false` to keep previous behavior, then check the logs for `incoming Substreams Blocks request` logs with the value `compressed: false` to track users who are not using compressed HTTP connections.
* Substreams: add `--substreams-tier1-enforce-compression` to reject connections from clients that do not support GZIP compression
* Substreams performance: reduced the number of `mallocs` (patching some third-party libraries)
Expand Down
5 changes: 5 additions & 0 deletions cmd/apps/firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func RegisterFirehoseApp[B firecore.Block](chain *firecore.Chain[B], rootLog *za
cmd.Flags().String("firehose-discovery-service-url", "", "Url to configure the gRPC discovery service") //traffic-director://xds?vpc_network=vpc-global&use_xds_reds=true
cmd.Flags().Int("firehose-rate-limit-bucket-size", -1, "Rate limit bucket size (default: no rate limit)")
cmd.Flags().Duration("firehose-rate-limit-bucket-fill-rate", 10*time.Second, "Rate limit bucket refill rate (default: 10s)")
cmd.Flags().Bool("firehose-enforce-compression", true, "Reject any request that does not accept gzip or zstd encoding in their GRPC/Connect header")

return nil
},
Expand Down Expand Up @@ -80,6 +81,10 @@ func RegisterFirehoseApp[B firecore.Block](chain *firecore.Chain[B], rootLog *za

var serverOptions []server.Option

if viper.GetBool("firehose-enforce-compression") {
serverOptions = append(serverOptions, server.WithEnforceCompression(true))
}

limiterSize := viper.GetInt("firehose-rate-limit-bucket-size")
limiterRefillRate := viper.GetDuration("firehose-rate-limit-bucket-fill-rate")
if limiterSize > 0 {
Expand Down
2 changes: 1 addition & 1 deletion cmd/apps/substreams_tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func RegisterSubstreamsTier1App[B firecore.Block](chain *firecore.Chain[B], root
cmd.Flags().String("substreams-tier1-discovery-service-url", "", "URL to configure the grpc discovery service, used for communication with tier2") //traffic-director://xds?vpc_network=vpc-global&use_xds_reds=true
cmd.Flags().Bool("substreams-tier1-subrequests-insecure", false, "Connect to tier2 without checking certificate validity")
cmd.Flags().Bool("substreams-tier1-subrequests-plaintext", true, "Connect to tier2 without client in plaintext mode")
cmd.Flags().Bool("substreams-tier1-enforce-compression", true, "Reject any request that does not accept gzip encoding in their GRPC/Connect header")
cmd.Flags().Bool("substreams-tier1-enforce-compression", true, "Reject any request that does not accept gzip or zstd encoding in their GRPC/Connect header")
cmd.Flags().Int("substreams-tier1-max-subrequests", 4, "number of parallel subrequests that the tier1 can make to the tier2 per request")
cmd.Flags().String("substreams-tier1-block-type", "", "Block type to use for the substreams tier1 (Ex: sf.ethereum.type.v2.Block)")

Expand Down
53 changes: 39 additions & 14 deletions firehose/server/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ import (
"errors"
"fmt"
"math/rand"
"net/http"
"os"
"strings"
"time"

connect "connectrpc.com/connect"
"github.com/streamingfast/bstream"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/bstream/stream"
Expand All @@ -19,16 +22,15 @@ import (
pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
)

func (s *Server) Block(ctx context.Context, request *pbfirehose.SingleBlockRequest) (*pbfirehose.SingleBlockResponse, error) {
func (s *Server) Block(ctx context.Context, request *connect.Request[pbfirehose.SingleBlockRequest]) (*connect.Response[pbfirehose.SingleBlockResponse], error) {
var blockNum uint64
var blockHash string
switch ref := request.Reference.(type) {
switch ref := request.Msg.Reference.(type) {
case *pbfirehose.SingleBlockRequest_BlockHashAndNumber_:
blockNum = ref.BlockHashAndNumber.Num
blockHash = ref.BlockHashAndNumber.Hash
Expand Down Expand Up @@ -71,15 +73,22 @@ func (s *Server) Block(ctx context.Context, request *pbfirehose.SingleBlockReque
auth := dauth.FromContext(ctx)
metering.Send(ctx, meter, auth.UserID(), auth.APIKeyID(), auth.RealIP(), auth.Meta(), "sf.firehose.v2.Firehose/Block", resp)

return resp, nil
return connect.NewResponse(resp), nil
}

func (s *Server) Blocks(request *pbfirehose.Request, streamSrv pbfirehose.Stream_BlocksServer) error {
ctx := streamSrv.Context()
// Blocks(context.Context, *connect.Request[v2.Request], *connect.ServerStream[v2.Response]) error
func (s *Server) Blocks(ctx context.Context, request *connect.Request[pbfirehose.Request], streamSrv *connect.ServerStream[pbfirehose.Response]) error {
metrics.RequestCounter.Inc()

logger := logging.Logger(ctx, s.logger)

if !matchHeader(request.Header(), acceptedCompressionValues) {
if s.enforceCompression {
return status.Error(codes.InvalidArgument, "client does not support compression")
}
logger.Info("client does not support compression")
}

if s.rateLimiter != nil {
rlCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
Expand All @@ -102,10 +111,7 @@ func (s *Server) Blocks(request *pbfirehose.Request, streamSrv pbfirehose.Stream
hostname = "unknown"
logger.Warn("cannot determine hostname, using 'unknown'", zap.Error(err))
}
md := metadata.New(map[string]string{"hostname": hostname})
if err := streamSrv.SendHeader(md); err != nil {
logger.Warn("cannot send metadata header", zap.Error(err))
}
streamSrv.ResponseHeader().Add("hostname", hostname)
}

var blockCount uint64
Expand All @@ -123,7 +129,7 @@ func (s *Server) Blocks(request *pbfirehose.Request, streamSrv pbfirehose.Stream
obj = block.Payload
}

protoStep, skip := stepToProto(step, request.FinalBlocksOnly)
protoStep, skip := stepToProto(step, request.Msg.FinalBlocksOnly)
if skip {
return nil
}
Expand Down Expand Up @@ -175,7 +181,7 @@ func (s *Server) Blocks(request *pbfirehose.Request, streamSrv pbfirehose.Stream
return nil
})

if len(request.Transforms) > 0 && s.transformRegistry == nil {
if len(request.Msg.Transforms) > 0 && s.transformRegistry == nil {
return status.Errorf(codes.Unimplemented, "no transforms registry configured within this instance")
}

Expand Down Expand Up @@ -209,11 +215,11 @@ func (s *Server) Blocks(request *pbfirehose.Request, streamSrv pbfirehose.Stream
})
}

ctx = s.initFunc(ctx, request)
ctx = s.initFunc(ctx, request.Msg)
str, err := s.streamFactory.New(
ctx,
handlerFunc,
request,
request.Msg,
logger,
stream.WithLiveSourceHandlerMiddleware(liveSourceMiddlewareHandler),
stream.WithFileSourceHandlerMiddleware(fileSourceMiddlewareHandler),
Expand Down Expand Up @@ -294,3 +300,22 @@ func stepToProto(step bstream.StepType, finalBlocksOnly bool) (outStep pbfirehos
}
return 0, true // simply skip irreversible or stalled here
}

// must be lowercase
var compressionHeader = map[string]bool{"grpc-accept-encoding": true, "connect-accept-encoding": true}
var acceptedCompressionValues = map[string]bool{"gzip": true, "zstd": true}

func matchHeader(headers http.Header, expected map[string]bool) bool {
for k, v := range headers {
if compressionHeader[strings.ToLower(k)] {
for _, vv := range v {
for _, vvv := range strings.Split(vv, ",") {
if expected[strings.TrimSpace(strings.ToLower(vvv))] {
return true
}
}
}
}
}
return false
}
77 changes: 0 additions & 77 deletions firehose/server/local.go

This file was deleted.

Loading

0 comments on commit 15cfd60

Please sign in to comment.