diff --git a/README.md b/README.md index 1605de0..bc128e1 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,7 @@ simple token support, and may provide a starting point for developing production contracts that can be used with this connector. To be usable by this connector, an ERC1155 contract should do all of the following: + 1. Conform to [IERC1155MixedFungible](samples/solidity/contracts/IERC1155MixedFungible.sol). 2. Group tokens into clear fungible and non-fungible pools by partitioning the token ID space via the split bit implementation detailed in the comments in [ERC1155MixedFungible](samples/solidity/contracts/ERC1155MixedFungible.sol). @@ -32,8 +33,9 @@ are additional methods used by the token connector to guess at the contract ABI but is the preferred method for most use cases. To leverage this capability in a running FireFly environment, you must: + 1. [Upload the token contract ABI to FireFly](https://hyperledger.github.io/firefly/tutorials/custom_contracts/ethereum.html) -as a contract interface. + as a contract interface. 2. Include the `interface` parameter when [creating the pool on FireFly](https://hyperledger.github.io/firefly/tutorials/tokens). This will cause FireFly to parse the interface and provide ABI details @@ -94,8 +96,8 @@ All approvals are global and will apply to all tokens across _all_ pools on a pa The following APIs are not part of the fftokens standard, but are exposed under `/api/v1`: -* `GET /balance` - Get token balance -* `GET /receipt/:id` - Get receipt for a previous request +- `GET /balance` - Get token balance +- `GET /receipt/:id` - Get receipt for a previous request ## Running the service @@ -143,3 +145,29 @@ $ npm run lint # formatting $ npm run format ``` + +## Blockchain retry behaviour + +Most short-term outages should be handled by the blockchain connector. For example if the blockchain node returns `HTTP 429` due to rate limiting +it is the blockchain connector's responsibility to use appropriate back-off retries to attempt to make the required blockchain call successfully. + +There are cases where the token connector may need to perform its own back-off retry for a blockchain action. For example if the blockchain connector +microservice has crashed and is in the process of restarting just as the token connector is trying to query an NFT token URI to enrich a token event, if +the token connector doesn't perform a retry then the event will be returned without the token URI populated. + +The token connector has configurable retry behaviour for all blockchain related calls. By default the connector will perform up to 15 retries with a back-off +interval between each one. The default first retry interval is 100ms and doubles up to a maximum of 10s per retry interval. Retries are only performed where +the error returned from the REST call matches a configurable regular expression retry condition. The default retry condition is `.*ECONN.*` which ensures +retries take place for common TCP errors such as `ECONNRESET` and `ECONNREFUSED`. + +The configurable retry settings are: + +- `RETRY_BACKOFF_FACTOR` (default `2`) +- `RETRY_BACKOFF_LIMIT_MS` (default `10000`) +- `RETRY_BACKOFF_INITIAL_MS` (default `100`) +- `RETRY_CONDITION` (default `.*ECONN.*`) +- `RETRY_MAX_ATTEMPTS` (default `15`) + +Setting `RETRY_CONDITION` to `""` disables retries. Setting `RETRY_MAX_ATTEMPTS` to `-1` causes it to retry indefinitely. + +Note, the token connector will make a total of `RETRY_MAX_ATTEMPTS` + 1 calls for a given retryable call (1 original attempt and `RETRY_MAX_ATTEMPTS` retries) diff --git a/src/main.ts b/src/main.ts index 6313190..9455d74 100644 --- a/src/main.ts +++ b/src/main.ts @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -32,7 +32,7 @@ import { TokenTransferEvent, } from './tokens/tokens.interfaces'; import { EventStreamService } from './event-stream/event-stream.service'; -import { BlockchainConnectorService } from './tokens/blockchain.service'; +import { BlockchainConnectorService, RetryConfiguration } from './tokens/blockchain.service'; import { requestIDMiddleware } from './request-context/request-id.middleware'; import { newContext } from './request-context/request-context.decorator'; @@ -81,6 +81,15 @@ async function bootstrap() { const contractAddress = config.get('CONTRACT_ADDRESS', ''); const passthroughHeaderString = config.get('PASSTHROUGH_HEADERS', ''); + // Configuration for blockchain call retries + const blockchainRetryCfg: RetryConfiguration = { + retryBackOffFactor: config.get('RETRY_BACKOFF_FACTOR', 2), + retryBackOffLimit: config.get('RETRY_BACKOFF_LIMIT_MS', 10000), + retryBackOffInitial: config.get('RETRY_BACKOFF_INITIAL_MS', 100), + retryCondition: config.get('RETRY_CONDITION', '.*ECONN.*'), + retriesMax: config.get('RETRY_MAX_ATTEMPTS', 15), + }; + const passthroughHeaders: string[] = []; for (const h of passthroughHeaderString.split(',')) { passthroughHeaders.push(h.toLowerCase()); @@ -90,7 +99,7 @@ async function bootstrap() { app.get(TokensService).configure(ethConnectUrl, instancePath, topic, contractAddress); app .get(BlockchainConnectorService) - .configure(ethConnectUrl, username, password, passthroughHeaders); + .configure(ethConnectUrl, username, password, passthroughHeaders, blockchainRetryCfg); if (autoInit.toLowerCase() !== 'false') { await app.get(TokensService).init(newContext()); diff --git a/src/tokens/blockchain.service.ts b/src/tokens/blockchain.service.ts index dcff090..66f8202 100644 --- a/src/tokens/blockchain.service.ts +++ b/src/tokens/blockchain.service.ts @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -38,6 +38,14 @@ import { const sendTransactionHeader = 'SendTransaction'; const queryHeader = 'Query'; +export interface RetryConfiguration { + retryBackOffFactor: number; + retryBackOffLimit: number; + retryBackOffInitial: number; + retryCondition: string; + retriesMax: number; +} + @Injectable() export class BlockchainConnectorService { private readonly logger = new Logger(BlockchainConnectorService.name); @@ -47,13 +55,22 @@ export class BlockchainConnectorService { password: string; passthroughHeaders: string[]; + retryConfiguration: RetryConfiguration; + constructor(private http: HttpService) {} - configure(baseUrl: string, username: string, password: string, passthroughHeaders: string[]) { + configure( + baseUrl: string, + username: string, + password: string, + passthroughHeaders: string[], + retryConfiguration: RetryConfiguration, + ) { this.baseUrl = baseUrl; this.username = username; this.password = password; this.passthroughHeaders = passthroughHeaders; + this.retryConfiguration = retryConfiguration; } private requestOptions(ctx: Context): AxiosRequestConfig { @@ -85,22 +102,77 @@ export class BlockchainConnectorService { }); } + // Check if retry condition matches the err that's been hit + private matchesRetryCondition(err: any): boolean { + return ( + this.retryConfiguration.retryCondition != '' && + `${err}`.match(this.retryConfiguration.retryCondition) !== null + ); + } + + // Delay by the appropriate amount of time given the iteration the caller is in + private async backoffDelay(iteration: number) { + const delay = Math.min( + this.retryConfiguration.retryBackOffInitial * + Math.pow(this.retryConfiguration.retryBackOffFactor, iteration), + this.retryConfiguration.retryBackOffLimit, + ); + await new Promise(resolve => setTimeout(resolve, delay)); + } + + // Generic helper function that makes a given blockchain function retryable + // by using synchronous back-off delays for cases where the function returns + // an error which matches the configured retry condition + private async retryableCall( + blockchainFunction: () => Promise>, + ): Promise> { + let retries = 0; + for ( + ; + this.retryConfiguration.retriesMax == -1 || retries <= this.retryConfiguration.retriesMax; + this.retryConfiguration.retriesMax == -1 || retries++ // Don't inc 'retries' if 'retriesMax' if set to -1 (infinite retries) + ) { + try { + return await blockchainFunction(); + } catch (e) { + if (this.matchesRetryCondition(e)) { + this.logger.debug(`Retry condition matched for error ${e}`); + // Wait for a backed-off delay before trying again + await this.backoffDelay(retries); + } else { + // Whatever the error was it's not one we will retry for + throw e; + } + } + } + + throw new InternalServerErrorException( + `Call to blockchain connector failed after ${retries} attempts`, + ); + } + async getContractInfo(ctx: Context, url: string) { const response = await this.wrapError( - lastValueFrom(this.http.get(url, this.requestOptions(ctx))), + this.retryableCall( + async (): Promise> => { + return lastValueFrom(this.http.get(url, this.requestOptions(ctx))); + }, + ), ); return response.data; } async query(ctx: Context, to: string, method?: IAbiMethod, params?: any[]) { const response = await this.wrapError( - lastValueFrom( - this.http.post( - this.baseUrl, - { headers: { type: queryHeader }, to, method, params }, - this.requestOptions(ctx), - ), - ), + this.retryableCall(async (): Promise> => { + return lastValueFrom( + this.http.post( + this.baseUrl, + { headers: { type: queryHeader }, to, method, params }, + this.requestOptions(ctx), + ), + ); + }), ); return response.data; } @@ -114,18 +186,22 @@ export class BlockchainConnectorService { params?: any[], ) { const response = await this.wrapError( - lastValueFrom( - this.http.post( - this.baseUrl, - { - headers: { id, type: sendTransactionHeader }, - from, - to, - method, - params, - }, - this.requestOptions(ctx), - ), + this.retryableCall( + async (): Promise> => { + return lastValueFrom( + this.http.post( + this.baseUrl, + { + headers: { id, type: sendTransactionHeader }, + from, + to, + method, + params, + }, + this.requestOptions(ctx), + ), + ); + }, ), ); return response.data; @@ -133,12 +209,14 @@ export class BlockchainConnectorService { async getReceipt(ctx: Context, id: string): Promise { const response = await this.wrapError( - lastValueFrom( - this.http.get(new URL(`/reply/${id}`, this.baseUrl).href, { - validateStatus: status => status < 300 || status === 404, - ...this.requestOptions(ctx), - }), - ), + this.retryableCall(async (): Promise> => { + return lastValueFrom( + this.http.get(new URL(`/reply/${id}`, this.baseUrl).href, { + validateStatus: status => status < 300 || status === 404, + ...this.requestOptions(ctx), + }), + ); + }), ); if (response.status === 404) { throw new NotFoundException(); diff --git a/src/tokens/tokens.service.spec.ts b/src/tokens/tokens.service.spec.ts index d5bd523..688a96a 100644 --- a/src/tokens/tokens.service.spec.ts +++ b/src/tokens/tokens.service.spec.ts @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -18,20 +18,55 @@ import { HttpService } from '@nestjs/axios'; import { Test, TestingModule } from '@nestjs/testing'; import { EventStreamService } from '../event-stream/event-stream.service'; import { EventStreamProxyGateway } from '../eventstream-proxy/eventstream-proxy.gateway'; +import { AxiosResponse } from '@nestjs/terminus/dist/health-indicator/http/axios.interfaces'; +import { Observer } from 'rxjs'; import { AbiMapperService } from './abimapper.service'; -import { BlockchainConnectorService } from './blockchain.service'; +import { BlockchainConnectorService, RetryConfiguration } from './blockchain.service'; import { TokensService } from './tokens.service'; import { newContext } from '../request-context/request-context.decorator'; +import { EthConnectReturn } from './tokens.interfaces'; + +const BASE_URL = 'http://eth'; + +class FakeObservable { + constructor(public data: T) {} + subscribe(observer?: Partial>>) { + observer?.next && + observer?.next({ + status: 200, + statusText: 'OK', + headers: {}, + config: {}, + data: this.data, + }); + observer?.complete && observer?.complete(); + } +} describe('TokensService', () => { + let http: { + post: ReturnType; + }; let service: TokensService; let eventStream: { addListener: ReturnType; getStreams: ReturnType; getSubscriptions: ReturnType; }; + let blockchain: BlockchainConnectorService; + + const mockECONNErrors = (count: number) => { + for (let i = 0; i < count; i++) { + http.post.mockImplementationOnce(() => { + throw new Error('connect ECONNREFUSED 10.1.2.3'); + }); + } + }; beforeEach(async () => { + http = { + post: jest.fn(), + }; eventStream = { addListener: jest.fn(), getStreams: jest.fn(), @@ -56,9 +91,22 @@ describe('TokensService', () => { useValue: { addListener: jest.fn() }, }, ], - }).compile(); + }) + .overrideProvider(HttpService) + .useValue(http) + .compile(); + + const blockchainRetryCfg: RetryConfiguration = { + retryBackOffFactor: 2, + retryBackOffLimit: 500, + retryBackOffInitial: 50, + retryCondition: '.*ECONN.*', + retriesMax: 15, + }; service = module.get(TokensService); + blockchain = module.get(BlockchainConnectorService); + blockchain.configure(BASE_URL, '', '', [], blockchainRetryCfg); }); it('should be defined', () => { @@ -107,4 +155,37 @@ describe('TokensService', () => { expect(await service.migrationCheck(newContext())).toBe(false); }); }); + + describe('Query token URI', () => { + it('should get the token URI', async () => { + const ctx = newContext(); + + http.post.mockReturnValueOnce( + new FakeObservable({ + output: 'ff://my/nft/uri', + }), + ); + + const val = await blockchain.query(ctx, '', undefined, undefined); + + expect(val.output).toBe('ff://my/nft/uri'); + expect(http.post).toHaveBeenCalledTimes(1); // Expect call to work first time + }); + + it('should get the token URI after 6 ECONNREFUSED retries', async () => { + const ctx = newContext(); + + mockECONNErrors(6); + http.post.mockReturnValueOnce( + new FakeObservable({ + output: 'ff://my/nft/uri', + }), + ); + + const val = await blockchain.query(ctx, '', undefined, undefined); + + expect(val.output).toBe('ff://my/nft/uri'); + expect(http.post).toHaveBeenCalledTimes(7); // Expect 6 ECONN errors, then final call OK = 7 POSTs + }); + }); }); diff --git a/test/app.e2e-context.ts b/test/app.e2e-context.ts index fb6a200..3380980 100644 --- a/test/app.e2e-context.ts +++ b/test/app.e2e-context.ts @@ -11,7 +11,7 @@ import { EventStreamReply, EventBatch } from '../src/event-stream/event-stream.i import { EventStreamService } from '../src/event-stream/event-stream.service'; import { EventStreamProxyGateway } from '../src/eventstream-proxy/eventstream-proxy.gateway'; import { TokensService } from '../src/tokens/tokens.service'; -import { BlockchainConnectorService } from '../src/tokens/blockchain.service'; +import { BlockchainConnectorService, RetryConfiguration } from '../src/tokens/blockchain.service'; import { requestIDMiddleware } from '../src/request-context/request-id.middleware'; export const BASE_URL = 'http://eth'; @@ -72,9 +72,17 @@ export class TestContext { this.app.use(requestIDMiddleware); await this.app.init(); + let blockchainRetryCfg: RetryConfiguration = { + retryBackOffFactor: 2, + retryBackOffLimit: 500, + retryBackOffInitial: 50, + retryCondition: '.*ECONN.*', + retriesMax: 15, + }; + this.app.get(EventStreamProxyGateway).configure('url', TOPIC); this.app.get(TokensService).configure(BASE_URL, INSTANCE_PATH, TOPIC, CONTRACT_ADDRESS); - this.app.get(BlockchainConnectorService).configure(BASE_URL, '', '', []); + this.app.get(BlockchainConnectorService).configure(BASE_URL, '', '', [], blockchainRetryCfg); (this.app.getHttpServer() as Server).listen(); this.server = request(this.app.getHttpServer());