Skip to content

Commit

Permalink
PR feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
  • Loading branch information
nguyer committed Feb 26, 2024
1 parent 9bf89be commit 0b76326
Show file tree
Hide file tree
Showing 34 changed files with 143 additions and 88 deletions.
1 change: 0 additions & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,3 @@ ETHCONNECT_URL=http://127.0.0.1:5102
ETHCONNECT_INSTANCE=/contracts/erc1155
ETHCONNECT_TOPIC=token
CONTRACT_ADDRESS=
AUTO_INIT=true
2 changes: 1 addition & 1 deletion src/app.module.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/event-stream/event-stream.interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/event-stream/event-stream.module.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/event-stream/event-stream.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
22 changes: 15 additions & 7 deletions src/event-stream/event-stream.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -22,7 +22,7 @@ import WebSocket from 'ws';
import { FFRequestIDHeader } from '../request-context/constants';
import { Context, newContext } from '../request-context/request-context.decorator';
import { IAbiMethod } from '../tokens/tokens.interfaces';
import { getHttpRequestOptions, getWebsocketOptions } from '../utils';
import { eventStreamName, getHttpRequestOptions, getWebsocketOptions } from '../utils';
import {
Event,
EventBatch,
Expand Down Expand Up @@ -68,7 +68,7 @@ export class EventStreamSocket {
} else {
this.logger.log('Event stream websocket connected');
}
this.produce({ type: 'listen', topic: `${this.topic}/${this.namespace}` });
this.produce({ type: 'listen', topic: `${eventStreamName(this.topic, this.namespace)}` });
this.produce({ type: 'listenreplies' });
this.ping();
})
Expand All @@ -84,7 +84,7 @@ export class EventStreamSocket {
}
})
.on('message', (message: string) => {
this.logger.debug(`WS => ${message}`);
this.logger.verbose(`WS => ${message}`);
this.handleMessage(JSON.parse(message));
})
.on('pong', () => {
Expand All @@ -111,11 +111,19 @@ export class EventStreamSocket {
}

ack(batchNumber: number | undefined) {
this.produce({ type: 'ack', topic: `${this.topic}/${this.namespace}`, batchNumber });
this.produce({
type: 'ack',
topic: `${eventStreamName(this.topic, this.namespace)}`,
batchNumber,
});
}

nack(batchNumber: number | undefined) {
this.produce({ type: 'nack', topic: `${this.topic}/${this.namespace}`, batchNumber });
this.produce({
type: 'nack',
topic: `${eventStreamName(this.topic, this.namespace)}`,
batchNumber,
});
}

close() {
Expand Down Expand Up @@ -345,7 +353,7 @@ export class EventStreamService {
handleEvents: (events: EventBatch) => void,
handleReceipt: (receipt: EventStreamReply) => void,
) {
const name = `${topic}/${namespace}`;
const name = eventStreamName(topic, namespace);
await this.createOrUpdateStream(newContext(), name, topic);

return new EventStreamSocket(
Expand Down
66 changes: 39 additions & 27 deletions src/eventstream-proxy/eventstream-proxy.base.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -15,12 +15,12 @@
// limitations under the License.

import { Logger } from '@nestjs/common';
import { MessageBody, SubscribeMessage } from '@nestjs/websockets';
import { v4 as uuidv4 } from 'uuid';
import { Context, newContext } from '../request-context/request-context.decorator';
import { EventBatch, EventStreamReply } from '../event-stream/event-stream.interfaces';
import { EventStreamService, EventStreamSocket } from '../event-stream/event-stream.service';
import {
WebSocketAck,
WebSocketActionBase,
WebSocketEventsBase,
WebSocketEx,
Expand Down Expand Up @@ -49,7 +49,8 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {

private connectListeners: ConnectionListener[] = [];
private eventListeners: EventListener[] = [];
private awaitingAck: WebSocketMessageWithId[] = [];
// Map of client IDs to all the messages for which we are awaiting an ack
private awaitingAck: Map<string, WebSocketMessageWithId[]> = new Map();
private subscriptionNames = new Map<string, string>();
private queue = Promise.resolve();

Expand All @@ -75,6 +76,9 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
const startAction = action as WebSocketStart;
this.startListening(client, startAction.namespace);
break;
case 'ack':
const ackAction = action as WebSocketAck;
this.handleAck(client, ackAction);
}
});
}
Expand Down Expand Up @@ -134,12 +138,13 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
// Nack any messages that are inflight for that namespace
const nackedMessageIds: Set<string> = new Set();
this.awaitingAck
.filter(msg => msg.namespace === namespace)
?.get(client.id)
?.filter(msg => msg.namespace === namespace)
.map(msg => {
this.namespaceEventStreamSocket.get(namespace)?.nack(msg.batchNumber);
nackedMessageIds.add(msg.id);
});
this.awaitingAck = this.awaitingAck.filter(msg => nackedMessageIds.has(msg.id));
this.awaitingAck.delete(client.id);

// If all clients for this namespace have disconnected, also close the connection to EVMConnect
if (clientSet.size == 0) {
Expand Down Expand Up @@ -189,8 +194,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
},
batchNumber: batch.batchNumber,
};
this.awaitingAck.push(message);
this.send(namespace, JSON.stringify(message));
this.send(namespace, message);
}

private async getSubscriptionName(ctx: Context, subId: string) {
Expand All @@ -211,41 +215,49 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
return undefined;
}

@SubscribeMessage('ack')
handleAck(@MessageBody() data: AckMessageData) {
handleAck(client: WebSocketEx, data: AckMessageData) {
if (data.id === undefined) {
this.logger.error('Received malformed ack');
return;
}

const inflight = this.awaitingAck.find(msg => msg.id === data.id);
this.logger.log(`Received ack ${data.id} inflight=${!!inflight}`);
if (this.namespaceEventStreamSocket !== undefined && inflight !== undefined) {
this.awaitingAck = this.awaitingAck.filter(msg => msg.id !== data.id);
if (
// If nothing is left awaiting an ack - then we clearly need to ack
this.awaitingAck.length === 0 ||
// Or if we have a batch number associated with this ID, then we can only ack if there
// are no other messages in-flight with the same batch number.
(inflight.batchNumber !== undefined &&
!this.awaitingAck.find(msg => msg.batchNumber === inflight.batchNumber))
) {
this.logger.log(`In-flight batch complete (batchNumber=${inflight.batchNumber})`);
this.namespaceEventStreamSocket.get(inflight.namespace)?.ack(inflight.batchNumber);
let awaitingAck = this.awaitingAck.get(client.id);

if (awaitingAck) {
const inflight = awaitingAck.find(msg => msg.id === data.id);
this.logger.log(`Received ack ${data.id} inflight=${!!inflight}`);
if (this.namespaceEventStreamSocket !== undefined && inflight !== undefined) {
// Remove the acked message id from the queue
awaitingAck = awaitingAck.filter(msg => msg.id !== data.id);
this.awaitingAck.set(client.id, awaitingAck);
if (
// If nothing is left awaiting an ack - then we clearly need to ack
awaitingAck.length === 0 ||
// Or if we have a batch number associated with this ID, then we can only ack if there
// are no other messages in-flight with the same batch number.
(inflight.batchNumber !== undefined &&
!awaitingAck.filter(msg => msg.batchNumber === inflight.batchNumber))
) {
this.logger.log(`In-flight batch complete (batchNumber=${inflight.batchNumber})`);
this.namespaceEventStreamSocket.get(inflight.namespace)?.ack(inflight.batchNumber);
}
}
} else {
this.logger.warn(`Received unrecognized ack from client ${client.id} for message ${data.id}`);
}
}

send(namespace, payload: string) {
send(namespace, payload: WebSocketMessageWithId) {
const clients = this.namespaceClients.get(namespace);
if (clients) {
// Randomly select a connected client for this namespace to distribute load
const selected = Math.floor(Math.random() * clients.size);
let i = 0;
for (let client of clients.keys()) {
for (const client of clients.keys()) {
if (i++ == selected) {
this.logger.debug(`WS <= ${payload}`);
client.send(payload);
this.awaitingAck.get(client.id)?.push(payload);
this.logger.verbose(`WS <= ${payload}`);
client.send(JSON.stringify(payload));
return;
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/eventstream-proxy/eventstream-proxy.gateway.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/eventstream-proxy/eventstream-proxy.gateway.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/eventstream-proxy/eventstream-proxy.interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/eventstream-proxy/eventstream-proxy.module.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/health/health.controller.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/health/health.module.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/main.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/request-context/request-id.middleware.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/request-logging.interceptor.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/request-logging.interceptor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/tokens/abimapper.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/tokens/blockchain.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/tokens/erc1155.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/tokens/erc165.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/tokens/tokens.controller.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
7 changes: 5 additions & 2 deletions src/tokens/tokens.controller.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -48,7 +48,10 @@ import { TokensService } from './tokens.service';

@Controller()
export class TokensController {
constructor(private service: TokensService, private blockchain: BlockchainConnectorService) {}
constructor(
private service: TokensService,
private blockchain: BlockchainConnectorService,
) {}

@Post('init')
@HttpCode(204)
Expand Down
2 changes: 1 addition & 1 deletion src/tokens/tokens.interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/tokens/tokens.listener.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/tokens/tokens.module.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/tokens/tokens.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
Loading

0 comments on commit 0b76326

Please sign in to comment.