Skip to content

Commit

Permalink
Added control knob for substreams tier1 max active requests
Browse files Browse the repository at this point in the history
  • Loading branch information
maoueh committed Jan 16, 2025
1 parent 65096c9 commit adf9fd6
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 23 deletions.
18 changes: 18 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,24 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s
* Add support for ConnectWeb firehose requests.
* Always use gzip compression on firehose requests for clients that support it (instead of always answering with the same compression as the request).

### Substreams

- The `substreams-tier1` app now has two new configuration flags named respectively `substreams-tier1-active-requests-soft-limit` and `substreams-tier1-active-requests-hard-limit`
helping better load balance active requests across a pool of `tier1` instances.

The `substreams-tier1-active-requests-soft-limit` limits the number of client active requests that a tier1 accepts before starting
to be report itself as 'unready' within the health check endpoint. A limit of 0 or less means no limit.

This is useful to load balance active requests more easily across a pool of tier1 instance. When the instance reaches the soft
limit, it will start to be unready from the load balancer standpoint. The load balancer in return will remove it from the list
of available instances, and new connections will be routed to remaining clients, spreading the load.

The `substreams-tier1-active-requests-hard-limit` limits the number of client active requests that a tier1 accepts before
rejecting incoming gRPC requests with 'Unavailable' code and setting itself as unready. A limit of 0 or less means no limit.

This is useful to prevent the tier1 from being overwhelmed by too many requests, most client auto-reconnects on 'Unavailable' code
so they should end up on another tier1 instance, assuming you have proper auto-scaling of the number of instances available.

## v1.6.9

### Substreams
Expand Down
41 changes: 30 additions & 11 deletions cmd/apps/substreams_tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/streamingfast/cli"
"github.com/streamingfast/dauth"
discoveryservice "github.com/streamingfast/dgrpc/server/discovery-service"
firecore "github.com/streamingfast/firehose-core"
Expand Down Expand Up @@ -52,7 +53,21 @@ func RegisterSubstreamsTier1App[B firecore.Block](chain *firecore.Chain[B], root
cmd.Flags().Bool("substreams-tier1-enforce-compression", true, "Reject any request that does not accept gzip or zstd encoding in their GRPC/Connect header")
cmd.Flags().Int("substreams-tier1-max-subrequests", 4, "number of parallel subrequests that the tier1 can make to the tier2 per request")
cmd.Flags().String("substreams-tier1-block-type", "", "Block type to use for the substreams tier1 (Ex: sf.ethereum.type.v2.Block)")

cmd.Flags().Int("substreams-tier1-active-requests-soft-limit", 0, cli.FlagDescription(`
The number of client active requests that a tier1 accepts before starting to be report itself as 'unready' within the health
check endpoint. A limit of 0 or less means no limit.
This is useful to load balance active requests more easily across a pool of tier1 instance. When the instance reaches the soft
limit, it will start to be unready from the load balancer standpoint. The load balancer in return will remove it from the list
of available instances, and new connections will be routed to remaining clients, spreading the load.
`))
cmd.Flags().Int("substreams-tier1-active-requests-hard-limit", 0, cli.FlagDescription(`
The maximum number of client active requests that a tier1 accepts before rejecting incoming gRPC requests with 'Unavailable' code
and setting itself as unready. A limit of 0 or less means no limit.
This is useful to prevent the tier1 from being overwhelmed by too many requests, most client auto-reconnects on 'Unavailable' code
so they should end up on another tier1 instance, assuming you have proper auto-scaling of the number of instances available.
`))
// all substreams
registerCommonSubstreamsFlags(cmd)
return nil
Expand Down Expand Up @@ -86,6 +101,8 @@ func RegisterSubstreamsTier1App[B firecore.Block](chain *firecore.Chain[B], root
subrequestsInsecure := viper.GetBool("substreams-tier1-subrequests-insecure")
subrequestsPlaintext := viper.GetBool("substreams-tier1-subrequests-plaintext")
maxSubrequests := viper.GetUint64("substreams-tier1-max-subrequests")
activeRequestsSoftLimit := viper.GetInt("substreams-tier1-active-requests-soft-limit")
activeRequestsHardLimit := viper.GetInt("substreams-tier1-active-requests-hard-limit")

var blockType string
if chain.DefaultBlockType != "" {
Expand Down Expand Up @@ -137,16 +154,18 @@ func RegisterSubstreamsTier1App[B firecore.Block](chain *firecore.Chain[B], root
BlockStreamAddr: blockstreamAddr,
TmpDir: tmpDir,

StateStoreURL: stateStoreURL,
StateStoreDefaultTag: stateStoreDefaultTag,
StateBundleSize: stateBundleSize,
MaxSubrequests: maxSubrequests,
SubrequestsEndpoint: subrequestsEndpoint,
SubrequestsInsecure: subrequestsInsecure,
SubrequestsPlaintext: subrequestsPlaintext,
BlockType: blockType,
WASMExtensions: wasmExtensions,
BlockExecutionTimeout: executionTimeout,
StateStoreURL: stateStoreURL,
StateStoreDefaultTag: stateStoreDefaultTag,
StateBundleSize: stateBundleSize,
MaxSubrequests: maxSubrequests,
SubrequestsEndpoint: subrequestsEndpoint,
ActiveRequestsSoftLimit: activeRequestsSoftLimit,
ActiveRequestsHardLimit: activeRequestsHardLimit,
SubrequestsInsecure: subrequestsInsecure,
SubrequestsPlaintext: subrequestsPlaintext,
BlockType: blockType,
WASMExtensions: wasmExtensions,
BlockExecutionTimeout: executionTimeout,

Tracing: tracing,

Expand Down
4 changes: 3 additions & 1 deletion devel/standard/standard.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ start:
- merger
- relayer
- firehose
- substreams-tier1
- substreams-tier2
flags:
advertise-block-id-encoding: "hex"
advertise-chain-name: "acme-dummy-blockchain"
Expand All @@ -20,4 +22,4 @@ start:
--store-dir="{node-data-dir}"
--block-rate=120
--genesis-height=0
--genesis-block-burst=1000
--genesis-block-burst=100
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ require (
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.15.0
github.com/streamingfast/bstream v0.0.2-0.20250114192704-6a23c67c0b4d
github.com/streamingfast/cli v0.0.4-0.20241119021815-815afa473375
github.com/streamingfast/cli v0.0.4-0.20250116003948-fbf66c930cce
github.com/streamingfast/dauth v0.0.0-20240222213226-519afc16cf84
github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c
github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1
github.com/streamingfast/dgrpc v0.0.0-20250109212433-ae21a7f7a01a
github.com/streamingfast/dgrpc v0.0.0-20250115215805-6f4ad2be7eef
github.com/streamingfast/dhammer v0.0.0-20230125192823-c34bbd561bd4
github.com/streamingfast/dmetering v0.0.0-20241101155221-489f5a9d9139
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545
Expand All @@ -31,7 +31,7 @@ require (
github.com/streamingfast/payment-gateway v0.0.0-20240426151444-581e930c76e2
github.com/streamingfast/pbgo v0.0.6-0.20250114182320-0b43084f4000
github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0
github.com/streamingfast/substreams v1.11.4-0.20250113142113-36c2750be692
github.com/streamingfast/substreams v1.11.4-0.20250116174758-7b0afb88692e
github.com/stretchr/testify v1.9.0
github.com/test-go/testify v1.1.4
go.uber.org/multierr v1.10.0
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2138,16 +2138,16 @@ github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag
github.com/stoewer/go-strcase v1.3.0/go.mod h1:fAH5hQ5pehh+j3nZfvwdk2RgEgQjAoM8wodgtPmh1xo=
github.com/streamingfast/bstream v0.0.2-0.20250114192704-6a23c67c0b4d h1:5cGG1t9rwbAwXeTq9epU7hm6cBsC2V8DM2jVCIN6JSo=
github.com/streamingfast/bstream v0.0.2-0.20250114192704-6a23c67c0b4d/go.mod h1:n5wy+Vmwp4xbjXO7B81MAkAgjnf1vJ/lI2y6hWWyFbg=
github.com/streamingfast/cli v0.0.4-0.20241119021815-815afa473375 h1:nwuFSEJtQfqTuN62WvysfAtDT4qqwQ6ghFX0i2VY1fY=
github.com/streamingfast/cli v0.0.4-0.20241119021815-815afa473375/go.mod h1:qOksW3DPhHVYBo8dcYxS7K3Q09wlcOChSdopeOjLWng=
github.com/streamingfast/cli v0.0.4-0.20250116003948-fbf66c930cce h1:RWla1PaRrlDf/MOwVoN/dJhIM/dXa9O4rmKZkv9T5bg=
github.com/streamingfast/cli v0.0.4-0.20250116003948-fbf66c930cce/go.mod h1:qOksW3DPhHVYBo8dcYxS7K3Q09wlcOChSdopeOjLWng=
github.com/streamingfast/dauth v0.0.0-20240222213226-519afc16cf84 h1:yCvuNcwQ21J4Ua6YrAmHDBx3bjK04y+ssEYBe65BXRU=
github.com/streamingfast/dauth v0.0.0-20240222213226-519afc16cf84/go.mod h1:cwfI5vaMd+CiwZIL0H0JdP5UDWCZOVFz/ex3L0+o/j4=
github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c h1:6WjE2yInE+5jnI7cmCcxOiGZiEs2FQm9Zsg2a9Ivp0Q=
github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c/go.mod h1:dbfiy9ORrL8c6ldSq+L0H9pg8TOqqu/FsghsgUEWK54=
github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1 h1:xJB7rXnOHLesosMjfwWsEL2i/40mFSkzenEb3M0qTyM=
github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1/go.mod h1:QSm/AfaDsE0k1xBYi0lW580YJ/WDV/FKZI628tkZR0Y=
github.com/streamingfast/dgrpc v0.0.0-20250109212433-ae21a7f7a01a h1:yrxCZ7Py0FdMu80cWPv/EpDvBLyumPlfhehD7iJ5VJM=
github.com/streamingfast/dgrpc v0.0.0-20250109212433-ae21a7f7a01a/go.mod h1:bxRfCxRKQ0ZH2BGi6UcYdlH0nkj8yERm3kpP1jPLQLY=
github.com/streamingfast/dgrpc v0.0.0-20250115215805-6f4ad2be7eef h1:He9qXjmnDtxVrJcHAOfFiWFA6An48zTezpU5iMnNHuY=
github.com/streamingfast/dgrpc v0.0.0-20250115215805-6f4ad2be7eef/go.mod h1:bxRfCxRKQ0ZH2BGi6UcYdlH0nkj8yERm3kpP1jPLQLY=
github.com/streamingfast/dhammer v0.0.0-20230125192823-c34bbd561bd4 h1:HKi8AIkLBzxZWmbCRUo1RxoOLK33iXO6gZprfsE9rf4=
github.com/streamingfast/dhammer v0.0.0-20230125192823-c34bbd561bd4/go.mod h1:ehPytv7E4rI65iLcrwTes4rNGGqPPiugnH+20nDQyp4=
github.com/streamingfast/dmetering v0.0.0-20241101155221-489f5a9d9139 h1:a22XzjeY7n9Xv+0yJMV2pzuPptALtOu6jdg69pOwuO4=
Expand Down Expand Up @@ -2181,8 +2181,8 @@ github.com/streamingfast/shutter v1.5.0 h1:NpzDYzj0HVpSiDJVO/FFSL6QIK/YKOxY0gJAt
github.com/streamingfast/shutter v1.5.0/go.mod h1:B/T6efqdeMGbGwjzPS1ToXzYZI4kDzI5/u4I+7qbjY8=
github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0 h1:Y15G1Z4fpEdm2b+/70owI7TLuXadlqBtGM7rk4Hxrzk=
github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0/go.mod h1:/Rnz2TJvaShjUct0scZ9kKV2Jr9/+KBAoWy4UMYxgv4=
github.com/streamingfast/substreams v1.11.4-0.20250113142113-36c2750be692 h1:YrIj24iHkkdhzWHVNqdjdL76BqEOs9PxMjW8ejbEGnk=
github.com/streamingfast/substreams v1.11.4-0.20250113142113-36c2750be692/go.mod h1:gl4g6eqMV3tAvir2J+3tY/JfXwm3TThHe7VL53glywE=
github.com/streamingfast/substreams v1.11.4-0.20250116174758-7b0afb88692e h1:9pk6d5QKvVLMl5TXSXKb8b0VMmEVh6e3kca200yIuk8=
github.com/streamingfast/substreams v1.11.4-0.20250116174758-7b0afb88692e/go.mod h1:Dgbt37alWqMyahFQ4rdhX8iFLZHn2qD8TBhcP3NIuW8=
github.com/streamingfast/wazero v0.0.0-20241202185309-91287c3640ed h1:LU6/c376zP1cMAo9L6rFLyjo0W7RU+hIh7BegH8Zo5M=
github.com/streamingfast/wazero v0.0.0-20241202185309-91287c3640ed/go.mod h1:yAI0XTsMBhREkM/YDAK/zNou3GoiAce1P6+rp/wQhjs=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
2 changes: 1 addition & 1 deletion test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestIntegration(t *testing.T) {
meteringServer.Run()
}()

clientConfig := client.NewSubstreamsClientConfig("localhost:9003", "", 0, false, true)
clientConfig := client.NewSubstreamsClientConfig("localhost:9003", "", 0, false, true, "substreams-test")
substreamsClient, _, _, _, err := client.NewSubstreamsClient(clientConfig)
require.NoError(t, err)

Expand Down
4 changes: 3 additions & 1 deletion utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ func MakeDirs(directories []string) error {
}

// MustReplaceDataDir replaces `{data-dir}` from within the `in` received argument by the
// `dataDir` argument
// `dataDir` argument.
//
// MustReplaceDataDir("/tmp/data", "{data-dir}/subdir") == "/tmp/data/subdir"
func MustReplaceDataDir(dataDir, in string) string {
d, err := filepath.Abs(dataDir)
if err != nil {
Expand Down

0 comments on commit adf9fd6

Please sign in to comment.