Skip to content

Commit

Permalink
Configurable subscription limit
Browse files Browse the repository at this point in the history
  • Loading branch information
wizeguyy committed Oct 7, 2024
1 parent 956f474 commit f74880b
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 19 deletions.
2 changes: 1 addition & 1 deletion cmd/utils/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func makeFullNode(p2p quai.NetworkingAPI, nodeLocation common.Location, slicesRu
// The second return value is the full node instance, which may be nil if the
// node is running as a light client.
func RegisterQuaiService(stack *node.Node, p2p quai.NetworkingAPI, cfg quaiconfig.Config, nodeCtx int, currentExpansionNumber uint8, startingExpansionNumber uint64, genesisBlock *types.WorkObject, logger *log.Logger) (quaiapi.Backend, error) {
backend, err := quai.New(stack, p2p, &cfg, nodeCtx, currentExpansionNumber, startingExpansionNumber, genesisBlock, logger)
backend, err := quai.New(stack, p2p, &cfg, nodeCtx, currentExpansionNumber, startingExpansionNumber, genesisBlock, logger, viper.GetInt(WSMaxSubsFlag.Name))
if err != nil {
Fatalf("Failed to register the Quai service: %v", err)
}
Expand Down
7 changes: 7 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ var RPCFlags = []Flag{
HTTPPortStartFlag,
WSEnabledFlag,
WSListenAddrFlag,
WSMaxSubsFlag,
WSApiFlag,
WSAllowedOriginsFlag,
WSPathPrefixFlag,
Expand Down Expand Up @@ -605,6 +606,12 @@ var (
Usage: "WS-RPC server listening interface" + generateEnvDoc(c_RPCFlagPrefix+"ws-addr"),
}

WSMaxSubsFlag = Flag{
Name: c_RPCFlagPrefix + "ws-max-subs",
Value: 1000,
Usage: "maximum concurrent subscribers to the WS-RPC server",
}

WSApiFlag = Flag{
Name: c_RPCFlagPrefix + "ws-api",
Value: "",
Expand Down
10 changes: 6 additions & 4 deletions quai/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,13 @@ type Quai struct {

lock sync.RWMutex // Protects the variadic fields (e.g. gas price and etherbase)

logger *log.Logger
logger *log.Logger
maxWsSubs int
}

// New creates a new Quai object (including the
// initialisation of the common Quai object)
func New(stack *node.Node, p2p NetworkingAPI, config *quaiconfig.Config, nodeCtx int, currentExpansionNumber uint8, startingExpansionNumber uint64, genesisBlock *types.WorkObject, logger *log.Logger) (*Quai, error) {
func New(stack *node.Node, p2p NetworkingAPI, config *quaiconfig.Config, nodeCtx int, currentExpansionNumber uint8, startingExpansionNumber uint64, genesisBlock *types.WorkObject, logger *log.Logger, maxWsSubs int) (*Quai, error) {
// Ensure configuration values are compatible and sane
if config.Miner.GasPrice == nil || config.Miner.GasPrice.Cmp(common.Big0) <= 0 {
logger.WithFields(log.Fields{
Expand Down Expand Up @@ -160,6 +161,7 @@ func New(stack *node.Node, p2p NetworkingAPI, config *quaiconfig.Config, nodeCtx
primaryCoinbase: config.Miner.PrimaryCoinbase,
bloomRequests: make(chan chan *bloombits.Retrieval),
logger: logger,
maxWsSubs: maxWsSubs,
}

// Copy the chainConfig
Expand Down Expand Up @@ -293,12 +295,12 @@ func (s *Quai) APIs() []rpc.API {
}, {
Namespace: "eth",
Version: "1.0",
Service: filters.NewPublicFilterAPI(s.APIBackend, 5*time.Minute),
Service: filters.NewPublicFilterAPI(s.APIBackend, 5*time.Minute, s.maxWsSubs),
Public: true,
}, {
Namespace: "quai",
Version: "1.0",
Service: filters.NewPublicFilterAPI(s.APIBackend, 5*time.Minute),
Service: filters.NewPublicFilterAPI(s.APIBackend, 5*time.Minute, s.maxWsSubs),
Public: true,
}, {
Namespace: "admin",
Expand Down
56 changes: 42 additions & 14 deletions quai/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,24 +55,28 @@ type filter struct {
// PublicFilterAPI offers support to create and manage filters. This will allow external clients to retrieve various
// information related to the Quai protocol such as blocks, transactions and logs.
type PublicFilterAPI struct {
backend Backend
mux *event.TypeMux
quit chan struct{}
chainDb ethdb.Database
events *EventSystem
filtersMu sync.Mutex
filters map[rpc.ID]*filter
timeout time.Duration
backend Backend
mux *event.TypeMux
quit chan struct{}
chainDb ethdb.Database
events *EventSystem
filtersMu sync.Mutex
filters map[rpc.ID]*filter
timeout time.Duration
subscriptionLimit int
activeSubscriptions int
}

// NewPublicFilterAPI returns a new PublicFilterAPI instance.
func NewPublicFilterAPI(backend Backend, timeout time.Duration) *PublicFilterAPI {
func NewPublicFilterAPI(backend Backend, timeout time.Duration, subscriptionLimit int) *PublicFilterAPI {
api := &PublicFilterAPI{
backend: backend,
chainDb: backend.ChainDb(),
events: NewEventSystem(backend),
filters: make(map[rpc.ID]*filter),
timeout: timeout,
backend: backend,
chainDb: backend.ChainDb(),
events: NewEventSystem(backend),
filters: make(map[rpc.ID]*filter),
timeout: timeout,
subscriptionLimit: subscriptionLimit,
activeSubscriptions: 0,
}
go api.timeoutLoop(timeout)

Expand Down Expand Up @@ -166,6 +170,10 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
// NewPendingTransactions creates a subscription that is triggered each time a transaction
// enters the transaction pool and was signed from one of the transactions this nodes manages.
func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) {
if api.activeSubscriptions >= api.subscriptionLimit {
return &rpc.Subscription{}, errors.New("too many subscribers")
}

notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
Expand All @@ -181,7 +189,9 @@ func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Su
"stacktrace": string(debug.Stack()),
}).Fatal("Go-Quai Panicked")
}
api.activeSubscriptions -= 1
}()
api.activeSubscriptions += 1
txHashes := make(chan []common.Hash, 128)
pendingTxSub := api.events.SubscribePendingTxs(txHashes)

Expand Down Expand Up @@ -251,6 +261,10 @@ func (api *PublicFilterAPI) NewBlockFilter() rpc.ID {

// NewHeads send a notification each time a new (header) block is appended to the chain.
func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
if api.activeSubscriptions >= api.subscriptionLimit {
return &rpc.Subscription{}, errors.New("too many subscribers")
}

notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
Expand All @@ -266,7 +280,9 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er
"stacktrace": string(debug.Stack()),
}).Fatal("Go-Quai Panicked")
}
api.activeSubscriptions -= 1
}()
api.activeSubscriptions += 1
headers := make(chan *types.WorkObject)
headersSub := api.events.SubscribeNewHeads(headers)

Expand All @@ -291,6 +307,10 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er

// Accesses send a notification each time the specified address is accessed
func (api *PublicFilterAPI) Accesses(ctx context.Context, addr common.Address) (*rpc.Subscription, error) {
if api.activeSubscriptions >= api.subscriptionLimit {
return &rpc.Subscription{}, errors.New("too many subscribers")
}

notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
Expand All @@ -306,7 +326,9 @@ func (api *PublicFilterAPI) Accesses(ctx context.Context, addr common.Address) (
"stacktrace": string(debug.Stack()),
}).Fatal("Go-Quai Panicked")
}
api.activeSubscriptions -= 1
}()
api.activeSubscriptions += 1
headers := make(chan *types.WorkObject)
headersSub := api.events.SubscribeNewHeads(headers)

Expand Down Expand Up @@ -348,6 +370,10 @@ func (api *PublicFilterAPI) Accesses(ctx context.Context, addr common.Address) (

// Logs creates a subscription that fires for all new log that match the given filter criteria.
func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) {
if api.activeSubscriptions >= api.subscriptionLimit {
return &rpc.Subscription{}, errors.New("too many subscribers")
}

notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
Expand All @@ -371,7 +397,9 @@ func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc
"stacktrace": string(debug.Stack()),
}).Fatal("Go-Quai Panicked")
}
api.activeSubscriptions -= 1
}()
api.activeSubscriptions += 1
for {
select {
case logs := <-matchedLogs:
Expand Down

0 comments on commit f74880b

Please sign in to comment.