Skip to content

Commit

Permalink
Merge pull request #587 from onflow/jordan/hf-access-get-events-by-he…
Browse files Browse the repository at this point in the history
…ight-range-limit

Access RPC: GetEvents Performance Improvements
  • Loading branch information
jordanschalm authored Apr 5, 2021
2 parents d5baed1 + b253059 commit a9b3abf
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 20 deletions.
1 change: 1 addition & 0 deletions cmd/access/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func main() {
flags.StringVarP(&rpcConf.HistoricalAccessAddrs, "historical-access-addr", "", "", "comma separated rpc addresses for historical access nodes")
flags.DurationVar(&rpcConf.CollectionClientTimeout, "collection-client-timeout", 3*time.Second, "grpc client timeout for a collection node")
flags.DurationVar(&rpcConf.ExecutionClientTimeout, "execution-client-timeout", 3*time.Second, "grpc client timeout for an execution node")
flags.UintVar(&rpcConf.MaxHeightRange, "rpc-max-height-range", backend.DefaultMaxHeightRange, "maximum size for height range requests")
flags.StringSliceVar(&rpcConf.PreferredExecutionNodeIDs, "preferred-execution-node-ids", nil, "comma separated list of execution nodes ids to choose from when making an upstream call e.g. b4a4dbdcd443d...,fb386a6a... etc.")
flags.StringSliceVar(&rpcConf.FixedExecutionNodeIDs, "fixed-execution-node-ids", nil, "comma separated list of execution nodes ids to choose from when making an upstream call if no matching preferred execution id is found e.g. b4a4dbdcd443d...,fb386a6a... etc.")
flags.BoolVar(&logTxTimeToFinalized, "log-tx-time-to-finalized", false, "log transaction time to finalized")
Expand Down
3 changes: 3 additions & 0 deletions engine/access/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func (suite *Suite) RunTest(
suite.metrics,
nil,
false,
backend.DefaultMaxHeightRange,
nil,
nil,
suite.log,
Expand Down Expand Up @@ -286,6 +287,7 @@ func (suite *Suite) TestSendTransactionToRandomCollectionNode() {
metrics,
connFactory, // passing in the connection factory
false,
backend.DefaultMaxHeightRange,
nil,
nil,
suite.log,
Expand Down Expand Up @@ -518,6 +520,7 @@ func (suite *Suite) TestExecuteScript() {
suite.metrics,
connFactory,
false,
backend.DefaultMaxHeightRange,
nil,
nil,
suite.log,
Expand Down
7 changes: 6 additions & 1 deletion engine/access/rpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import (
// maxExecutionNodesCnt is the max number of execution nodes that will be contacted to complete an execution api request
const maxExecutionNodesCnt = 3

// DefaultMaxHeightRange is the default maximum size of range requests.
const DefaultMaxHeightRange = 250

var preferredENIdentifiers flow.IdentifierList
var fixedENIdentifiers flow.IdentifierList

Expand Down Expand Up @@ -68,6 +71,7 @@ func New(
transactionMetrics module.TransactionMetrics,
connFactory ConnectionFactory,
retryEnabled bool,
maxHeightRange uint,
preferredExecutionNodeIDs []string,
fixedExecutionNodeIDs []string,
log zerolog.Logger,
Expand Down Expand Up @@ -108,10 +112,11 @@ func New(
backendEvents: backendEvents{
staticExecutionRPC: executionRPC,
state: state,
blocks: blocks,
headers: headers,
executionReceipts: executionReceipts,
connFactory: connFactory,
log: log,
maxHeightRange: maxHeightRange,
},
backendBlockHeaders: backendBlockHeaders{
headers: headers,
Expand Down
16 changes: 11 additions & 5 deletions engine/access/rpc/backend/backend_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ import (

type backendEvents struct {
staticExecutionRPC execproto.ExecutionAPIClient
blocks storage.Blocks
headers storage.Headers
executionReceipts storage.ExecutionReceipts
state protocol.State
connFactory ConnectionFactory
log zerolog.Logger
maxHeightRange uint
}

// GetEventsForHeightRange retrieves events for all sealed blocks between the start block height and
Expand Down Expand Up @@ -60,12 +61,12 @@ func (b *backendEvents) GetEventsForHeightRange(
blockHeaders := make([]*flow.Header, 0)

for i := startHeight; i <= endHeight; i++ {
block, err := b.blocks.ByHeight(i)
header, err := b.headers.ByHeight(i)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get events: %v", err)
}

blockHeaders = append(blockHeaders, block.Header)
blockHeaders = append(blockHeaders, header)
}

return b.getBlockEventsFromExecutionNode(ctx, blockHeaders, eventType)
Expand All @@ -81,12 +82,12 @@ func (b *backendEvents) GetEventsForBlockIDs(
// find the block headers for all the block IDs
blockHeaders := make([]*flow.Header, 0)
for _, blockID := range blockIDs {
block, err := b.blocks.ByID(blockID)
header, err := b.headers.ByBlockID(blockID)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get events: %v", err)
}

blockHeaders = append(blockHeaders, block.Header)
blockHeaders = append(blockHeaders, header)
}

// forward the request to the execution node
Expand All @@ -109,6 +110,11 @@ func (b *backendEvents) getBlockEventsFromExecutionNode(
return []flow.BlockEvents{}, nil
}

// limit height range queries
if uint(len(blockIDs)) > b.maxHeightRange {
return nil, fmt.Errorf("requested block range (%d) exceeded maximum (%d)", len(blockIDs), b.maxHeightRange)
}

req := execproto.GetEventsForBlockIDsRequest{
Type: eventType,
BlockIds: convert.IdentifiersToMessages(blockIDs),
Expand Down
74 changes: 62 additions & 12 deletions engine/access/rpc/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func (suite *Suite) TestPing() {
metrics.NewNoopCollector(),
nil,
false,
DefaultMaxHeightRange,
nil,
nil,
suite.log,
Expand All @@ -110,6 +111,7 @@ func (suite *Suite) TestGetLatestFinalizedBlockHeader() {
metrics.NewNoopCollector(),
nil,
false,
DefaultMaxHeightRange,
nil,
nil,
suite.log,
Expand Down Expand Up @@ -141,6 +143,7 @@ func (suite *Suite) TestGetLatestProtocolStateSnapshot() {
metrics.NewNoopCollector(),
nil,
false,
100,
nil,
nil,
suite.log,
Expand Down Expand Up @@ -173,6 +176,7 @@ func (suite *Suite) TestGetLatestSealedBlockHeader() {
metrics.NewNoopCollector(),
nil,
false,
DefaultMaxHeightRange,
nil,
nil,
suite.log,
Expand Down Expand Up @@ -210,6 +214,7 @@ func (suite *Suite) TestGetTransaction() {
metrics.NewNoopCollector(),
nil,
false,
DefaultMaxHeightRange,
nil,
nil,
suite.log,
Expand Down Expand Up @@ -243,6 +248,7 @@ func (suite *Suite) TestGetCollection() {
metrics.NewNoopCollector(),
nil,
false,
DefaultMaxHeightRange,
nil,
nil,
suite.log,
Expand Down Expand Up @@ -325,6 +331,7 @@ func (suite *Suite) TestTransactionStatusTransition() {
metrics.NewNoopCollector(),
connFactory,
false,
DefaultMaxHeightRange,
nil,
nil,
suite.log,
Expand Down Expand Up @@ -443,6 +450,7 @@ func (suite *Suite) TestTransactionExpiredStatusTransition() {
metrics.NewNoopCollector(),
nil,
false,
DefaultMaxHeightRange,
nil,
nil,
suite.log,
Expand Down Expand Up @@ -603,6 +611,7 @@ func (suite *Suite) TestTransactionPendingToFinalizedStatusTransition() {
metrics.NewNoopCollector(),
connFactory,
false,
100,
nil,
nil,
suite.log,
Expand Down Expand Up @@ -656,6 +665,7 @@ func (suite *Suite) TestTransactionResultUnknown() {
metrics.NewNoopCollector(),
nil,
false,
DefaultMaxHeightRange,
nil,
nil,
suite.log,
Expand Down Expand Up @@ -695,6 +705,7 @@ func (suite *Suite) TestGetLatestFinalizedBlock() {
metrics.NewNoopCollector(),
nil,
false,
DefaultMaxHeightRange,
nil,
nil,
suite.log,
Expand Down Expand Up @@ -726,9 +737,9 @@ func (suite *Suite) TestGetEventsForBlockIDs() {

for i := 0; i < n; i++ {
b := unittest.BlockFixture()
suite.blocks.
On("ByID", b.ID()).
Return(&b, nil).Twice()
suite.headers.
On("ByBlockID", b.ID()).
Return(b.Header, nil).Twice()

headers[i] = b.Header

Expand Down Expand Up @@ -810,13 +821,14 @@ func (suite *Suite) TestGetEventsForBlockIDs() {
suite.state,
suite.execClient, // pass the default client
nil, nil,
suite.blocks,
nil, nil, nil,
nil,
suite.headers, nil, nil,
receipts,
suite.chainID,
metrics.NewNoopCollector(),
nil,
false,
DefaultMaxHeightRange,
nil,
nil,
suite.log,
Expand All @@ -835,13 +847,14 @@ func (suite *Suite) TestGetEventsForBlockIDs() {
suite.state,
nil,
nil, nil,
suite.blocks,
nil, nil, nil,
nil,
suite.headers, nil, nil,
suite.receipts,
suite.chainID,
metrics.NewNoopCollector(),
connFactory, // the connection factory should be used to get the execution node client
false,
DefaultMaxHeightRange,
nil,
validENIDs.Strings(), // set the fixed EN Identifiers to the generated execution IDs
suite.log,
Expand All @@ -861,13 +874,14 @@ func (suite *Suite) TestGetEventsForBlockIDs() {
suite.state,
nil, // no default client, hence the receipts storage should be looked up
nil, nil,
suite.blocks,
nil, nil, nil,
nil,
suite.headers, nil, nil,
suite.receipts,
suite.chainID,
metrics.NewNoopCollector(),
connFactory, // the connection factory should be used to get the execution node client
false,
DefaultMaxHeightRange,
nil,
validENIDs.Strings(),
suite.log,
Expand Down Expand Up @@ -903,9 +917,9 @@ func (suite *Suite) TestGetEventsForHeightRange() {
for i := min; i <= max; i++ {
b := unittest.BlockFixture()

suite.blocks.
suite.headers.
On("ByHeight", i).
Return(&b, nil).Once()
Return(b.Header, nil).Once()

headers = append(headers, b.Header)
}
Expand Down Expand Up @@ -964,12 +978,13 @@ func (suite *Suite) TestGetEventsForHeightRange() {
suite.Run("invalid request max height < min height", func() {
backend := New(
suite.state,
nil, nil, nil, nil, nil, nil, nil,
nil, nil, nil, nil, suite.headers, nil, nil,
suite.receipts,
suite.chainID,
metrics.NewNoopCollector(),
nil,
false,
DefaultMaxHeightRange,
nil,
nil,
suite.log,
Expand Down Expand Up @@ -1003,6 +1018,7 @@ func (suite *Suite) TestGetEventsForHeightRange() {
metrics.NewNoopCollector(),
nil,
false,
DefaultMaxHeightRange,
nil,
nil,
suite.log,
Expand Down Expand Up @@ -1035,6 +1051,7 @@ func (suite *Suite) TestGetEventsForHeightRange() {
metrics.NewNoopCollector(),
nil,
false,
DefaultMaxHeightRange,
nil,
nil,
suite.log,
Expand All @@ -1047,6 +1064,35 @@ func (suite *Suite) TestGetEventsForHeightRange() {
suite.Require().Equal(expectedResp, actualResp)
})

// set max height range to 1 and request range of 2
suite.Run("invalid request exceeding max height range", func() {
headHeight = maxHeight - 1
setupHeadHeight(headHeight)
blockHeaders = setupStorage(minHeight, headHeight)

// create handler
backend := New(
suite.state,
suite.execClient,
nil, nil,
suite.blocks,
suite.headers,
nil, nil,
suite.receipts,
suite.chainID,
metrics.NewNoopCollector(),
nil,
false,
1, // set maximum range to 1
nil,
nil,
suite.log,
)

_, err := backend.GetEventsForHeightRange(ctx, string(flow.EventAccountCreated), minHeight, minHeight+1)
suite.Require().Error(err)
})

suite.Run("invalid request last_sealed_block_height < min height", func() {

// set sealed height to one less than the request start height
Expand All @@ -1069,6 +1115,7 @@ func (suite *Suite) TestGetEventsForHeightRange() {
metrics.NewNoopCollector(),
nil,
false,
DefaultMaxHeightRange,
nil,
nil,
suite.log,
Expand Down Expand Up @@ -1137,6 +1184,7 @@ func (suite *Suite) TestGetAccount() {
metrics.NewNoopCollector(),
connFactory,
false,
DefaultMaxHeightRange,
nil,
nil,
suite.log,
Expand Down Expand Up @@ -1207,6 +1255,7 @@ func (suite *Suite) TestGetAccountAtBlockHeight() {
metrics.NewNoopCollector(),
nil,
false,
DefaultMaxHeightRange,
nil,
nil,
suite.log,
Expand Down Expand Up @@ -1234,6 +1283,7 @@ func (suite *Suite) TestGetNetworkParameters() {
metrics.NewNoopCollector(),
nil,
false,
DefaultMaxHeightRange,
nil,
nil,
suite.log,
Expand Down
2 changes: 2 additions & 0 deletions engine/access/rpc/backend/historical_access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func (suite *Suite) TestHistoricalTransactionResult() {
metrics.NewNoopCollector(),
nil,
false,
DefaultMaxHeightRange,
nil,
nil,
suite.log,
Expand Down Expand Up @@ -107,6 +108,7 @@ func (suite *Suite) TestHistoricalTransaction() {
metrics.NewNoopCollector(),
nil,
false,
DefaultMaxHeightRange,
nil,
nil,
suite.log,
Expand Down
Loading

0 comments on commit a9b3abf

Please sign in to comment.