Skip to content

Commit

Permalink
Added integration test for max subscription per connection limitation
Browse files Browse the repository at this point in the history
  • Loading branch information
UlyanaAndrukhiv committed Jan 20, 2025
1 parent 5c41d9b commit 1d54095
Showing 1 changed file with 54 additions and 4 deletions.
58 changes: 54 additions & 4 deletions integration/tests/access/cohort4/websocket_subscriptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/onflow/flow-go/engine/access/rest/util"
"github.com/onflow/flow-go/engine/access/rest/websockets/data_providers"
"github.com/onflow/flow-go/engine/access/rest/websockets/models"
"github.com/onflow/flow-go/engine/access/rpc/backend"
"github.com/onflow/flow-go/engine/common/rpc/convert"
"github.com/onflow/flow-go/integration/testnet"
"github.com/onflow/flow-go/integration/tests/access/common"
Expand All @@ -37,6 +36,7 @@ import (
)

const InactivityTimeout = 20
const MaxSubscriptionsPerConnection = 5

func TestWebsocketSubscription(t *testing.T) {
suite.Run(t, new(WebsocketSubscriptionSuite))
Expand Down Expand Up @@ -80,9 +80,9 @@ func (s *WebsocketSubscriptionSuite) SetupTest() {
testnet.WithAdditionalFlagf("--execution-data-dir=%s", testnet.DefaultExecutionDataServiceDir),
testnet.WithAdditionalFlag("--execution-data-retry-delay=1s"),
testnet.WithAdditionalFlag("--execution-data-indexing-enabled=true"),
testnet.WithAdditionalFlagf("--tx-result-query-mode=%s", backend.IndexQueryModeExecutionNodesOnly),
testnet.WithAdditionalFlagf("--execution-state-dir=%s", testnet.DefaultExecutionStateDir),
testnet.WithAdditionalFlagf("--websocket-inactivity-timeout=%ds", InactivityTimeout),
testnet.WithAdditionalFlagf("--websocket-max-subscriptions-per-connection=%d", MaxSubscriptionsPerConnection),
testnet.WithMetricsServer(),
)

Expand Down Expand Up @@ -193,7 +193,7 @@ func (s *WebsocketSubscriptionSuite) TestInactivityTracker() {

s.Require().Equal(1, len(baseResponses))
subscribeResponse := baseResponses[0]
s.validateBaseMessageResponse(subscriptionRequest.SubscriptionID, baseResponses[0])
s.validateBaseMessageResponse(subscriptionRequest.SubscriptionID, subscribeResponse)

// Step 3: Unsubscribe from the topic
unsubscribeRequest := models.UnsubscribeMessageRequest{
Expand All @@ -218,6 +218,53 @@ func (s *WebsocketSubscriptionSuite) TestInactivityTracker() {
})
}

// TestMaxSubscriptionsPerConnection validates the behavior of the WebSocket server
// when the number of subscriptions exceeds the configured maximum limit.
//
// Expected behavior:
// - For the first `MaxSubscriptionsPerConnection` requests, the server should respond with successful subscription messages.
// - On exceeding the subscription limit, the server should return an error response with a message.

func (s *WebsocketSubscriptionSuite) TestMaxSubscriptionsPerConnection() {
websocketsUrl := getWebsocketsUrl(s.restAccessAddress)
wsClient, err := common.GetWSClient(s.ctx, websocketsUrl)
s.Require().NoError(err)

defer func() { s.Require().NoError(wsClient.Close()) }()

blocksSubscriptionArguments := models.Arguments{"block_status": parser.Finalized}
// Expected error message when exceeding the maximum subscription limit.
expectedErrorMessage := fmt.Sprintf("maximum number of subscription reached: %d", MaxSubscriptionsPerConnection)

// Loop to send subscription requests, including one request exceeding the limit.
for i := 1; i <= MaxSubscriptionsPerConnection+1; i++ {
// Create a subscription message request with a unique ID.
subscriptionToBlocksRequest := s.subscribeMessageRequest(
strconv.Itoa(i),
data_providers.BlocksTopic,
blocksSubscriptionArguments,
)

// send blocks subscription message
err := wsClient.WriteJSON(subscriptionToBlocksRequest)
s.Require().NoError(err, "failed to send subscription message")

// Receive response
_, baseResponses, _ := s.listenWebSocketResponses(wsClient, 2*time.Second, subscriptionToBlocksRequest.SubscriptionID)
s.Require().Equal(1, len(baseResponses))
subscribeResponse := baseResponses[0]

if i <= MaxSubscriptionsPerConnection {
// Validate successful subscription response.
s.validateBaseMessageResponse(subscriptionToBlocksRequest.SubscriptionID, subscribeResponse)
} else {
// Validate error response for exceeding the subscription limit.
//s.Require().Equal(models.SubscribeAction, subscribeResponse.Action)
s.Require().Equal(expectedErrorMessage, subscribeResponse.Error.Message)
}
}
}

// monitorInactivity monitors the WebSocket connection for inactivity.
func monitorInactivity(t *testing.T, client *websocket.Conn, timeout time.Duration) time.Duration {
start := time.Now()
Expand Down Expand Up @@ -999,7 +1046,10 @@ func (s *WebsocketSubscriptionSuite) listenWebSocketResponses(
}

// validateBaseMessageResponse validates the properties of a success BaseMessageResponse.
func (s *WebsocketSubscriptionSuite) validateBaseMessageResponse(expectedSubscriptionID string, actualResponse models.BaseMessageResponse) {
func (s *WebsocketSubscriptionSuite) validateBaseMessageResponse(
expectedSubscriptionID string,
actualResponse models.BaseMessageResponse,
) {
s.Require().Equal(expectedSubscriptionID, actualResponse.SubscriptionID)
s.Require().Equal(0, actualResponse.Error.Code)
s.Require().Empty(actualResponse.Error.Message)
Expand Down

0 comments on commit 1d54095

Please sign in to comment.