Skip to content

Commit

Permalink
Merge pull request #3 from omnia-network/release/0.3.2
Browse files Browse the repository at this point in the history
release/0.3.2
  • Loading branch information
ilbertt authored Dec 15, 2023
2 parents 2b46b15 + eac3528 commit 6395248
Show file tree
Hide file tree
Showing 11 changed files with 364 additions and 119 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "ic-websocket-js",
"version": "0.3.1",
"version": "0.3.2",
"description": "IC WebSocket on the Internet Computer",
"license": "MIT",
"repository": {
Expand Down
303 changes: 234 additions & 69 deletions src/ic-websocket.test.ts

Large diffs are not rendered by default.

64 changes: 55 additions & 9 deletions src/ic-websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { IDL } from "@dfinity/candid";
import { Principal } from "@dfinity/principal";
import {
CanisterAckMessageContent,
CanisterCloseMessageContent,
CanisterWsMessageArguments,
ClientKeepAliveMessageContent,
ClientKey,
Expand Down Expand Up @@ -36,10 +37,15 @@ import {
import { WsAgent } from "./agent";

/**
* The default expiration time for receiving an ack message from the canister after sending a message.
* It's **3/2 times** the canister's default send ack period.
* The default interval (in milliseconds) at which the canister sends an ack message.
*/
const DEFAULT_ACK_MESSAGE_TIMEOUT_MS = 450_000;
const DEFAULT_ACK_MESSAGE_INTERVAL_MS = 300_000;
/**
* The maximum communication latency allowed between the client and the canister (same as in the canister).
*
* Used to determine the ack message timeout.
*/
export const COMMUNICATION_LATENCY_BOUND_MS = 30_000;

/**
* Interface to create a new IcWebSocketConfig. For a simple configuration, use {@link createWsConfig}.
Expand All @@ -62,12 +68,12 @@ export interface IcWebSocketConfig<S extends _WS_CANISTER_SERVICE> {
*/
networkUrl: string;
/**
* The expiration (in milliseconds) time for receiving an ack message from the canister after sending a message.
* If the ack message is not received within this time, the connection will be closed.
* This parameter should always me **3/2 times or more** the canister's send ack period.
* @default 450_000 (7.5 minutes = 3/2 default send ack period on the canister)
* The interval (in milliseconds) at which the canister sends an ack message.
* This parameter must be **equal** to the canister's send ack interval.
*
* @default 300_000 (default send ack period on the canister)
*/
ackMessageTimeout?: number;
ackMessageIntervalMs?: number;
/**
* The maximum age of the certificate received from the canister, in minutes. You won't likely need to set this parameter. Used in tests.
*
Expand Down Expand Up @@ -104,6 +110,7 @@ export default class IcWebSocket<
private _clientKey: ClientKey;
private _gatewayPrincipal: Principal | null = null;
private _maxCertificateAgeInMinutes = 5;
private _openTimeout: NodeJS.Timeout | null = null;

onclose: ((this: IcWebSocket<S, ApplicationMessageType>, ev: CloseEvent) => any) | null = null;
onerror: ((this: IcWebSocket<S, ApplicationMessageType>, ev: ErrorEvent) => any) | null = null;
Expand Down Expand Up @@ -174,7 +181,7 @@ export default class IcWebSocket<
});

this._ackMessagesQueue = new AckMessagesQueue({
expirationMs: config.ackMessageTimeout || DEFAULT_ACK_MESSAGE_TIMEOUT_MS,
expirationMs: (config.ackMessageIntervalMs || DEFAULT_ACK_MESSAGE_INTERVAL_MS) + COMMUNICATION_LATENCY_BOUND_MS,
timeoutExpiredCallback: this._onAckMessageTimeout.bind(this),
});

Expand Down Expand Up @@ -226,6 +233,27 @@ export default class IcWebSocket<
this._incomingMessagesQueue.addAndProcess(event.data);
}

private _startOpenTimeout() {
// the timeout is double the maximum allowed network latency,
// because opening the connection involves a message sent by the client and one by the canister
this._openTimeout = setTimeout(() => {
if (!this._isConnectionEstablished) {
logger.error("[onWsOpen] Error: Open timeout expired before receiving the open message");
this._callOnErrorCallback(new Error("Open timeout expired before receiving the open message"));
this._wsInstance.close(4000, "Open connection timeout");
}

this._openTimeout = null;
}, 2 * COMMUNICATION_LATENCY_BOUND_MS);
}

private _cancelOpenTimeout() {
if (this._openTimeout) {
clearTimeout(this._openTimeout);
this._openTimeout = null;
}
}

private async _handleHandshakeMessage(handshakeMessage: GatewayHandshakeMessage): Promise<boolean> {
// at this point, we're sure that the gateway_principal is valid
// because the isGatewayHandshakeMessage function checks it
Expand All @@ -234,6 +262,8 @@ export default class IcWebSocket<

try {
await this._sendOpenMessage();

this._startOpenTimeout();
} catch (error) {
logger.error("[onWsMessage] Handshake message error:", error);
// if a handshake message fails, we can't continue
Expand Down Expand Up @@ -335,12 +365,17 @@ export default class IcWebSocket<
}

this._isConnectionEstablished = true;
this._cancelOpenTimeout();

this._callOnOpenCallback();

this._outgoingMessagesQueue.enableAndProcess();
} else if ("AckMessage" in serviceMessage) {
await this._handleAckMessageFromCanister(serviceMessage.AckMessage);
} else if ("CloseMessage" in serviceMessage) {
await this._handleCloseMessageFromCanister(serviceMessage.CloseMessage);
// we don't have to process any further message (there shouldn't be any anyway)
return false;
} else {
throw new Error("Invalid service message from canister");
}
Expand Down Expand Up @@ -369,6 +404,17 @@ export default class IcWebSocket<
await this._sendKeepAliveMessage();
}

private async _handleCloseMessageFromCanister(content: CanisterCloseMessageContent): Promise<void> {
if ("ClosedByApplication" in content.reason) {
logger.debug("[onWsMessage] Received close message from canister. Reason: ClosedByApplication");
this._wsInstance.close(4001, "ClosedByApplication");
} else {
logger.error("[onWsMessage] Received close message from canister. Reason:", content.reason);
this._callOnErrorCallback(new Error(`Received close message from canister. Reason: ${content.reason}`));
this._wsInstance.close(4000, "Received close message from canister");
}
}

private async _sendKeepAliveMessage(): Promise<void> {
const keepAliveMessageContent: ClientKeepAliveMessageContent = {
last_incoming_sequence_num: this._incomingSequenceNum - BigInt(1),
Expand Down
24 changes: 24 additions & 0 deletions src/idl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,26 @@ export type CanisterAckMessageContent = {
export type ClientKeepAliveMessageContent = {
'last_incoming_sequence_num': bigint,
};
export type CloseMessageReason = {
WrongSequenceNumber: null,
} | {
InvalidServiceMessage: null,
} | {
KeepAliveTimeout: null,
} | {
ClosedByApplication: null
};
export type CanisterCloseMessageContent = {
reason: CloseMessageReason,
};
export type WebsocketServiceMessageContent = {
OpenMessage: CanisterOpenMessageContent,
} | {
AckMessage: CanisterAckMessageContent,
} | {
KeepAliveMessage: ClientKeepAliveMessageContent,
} | {
CloseMessage: CanisterCloseMessageContent,
};

const CanisterOpenMessageContentIdl = IDL.Record({
Expand All @@ -95,10 +109,20 @@ const CanisterAckMessageContentIdl = IDL.Record({
const ClientKeepAliveMessageContentIdl = IDL.Record({
'last_incoming_sequence_num': IDL.Nat64,
});
const CloseMessageReasonIdl = IDL.Variant({
'WrongSequenceNumber': IDL.Null,
'InvalidServiceMessage': IDL.Null,
'KeepAliveTimeout': IDL.Null,
'ClosedByApplication': IDL.Null,
});
const CanisterCloseMessageContentIdl = IDL.Record({
'reason': CloseMessageReasonIdl,
})
const WebsocketServiceMessageContentIdl = IDL.Variant({
'OpenMessage': CanisterOpenMessageContentIdl,
'AckMessage': CanisterAckMessageContentIdl,
'KeepAliveMessage': ClientKeepAliveMessageContentIdl,
'CloseMessage': CanisterCloseMessageContentIdl,
});

export const decodeWebsocketServiceMessageContent = (bytes: Uint8Array): WebsocketServiceMessageContent => {
Expand Down
12 changes: 6 additions & 6 deletions src/queues.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ describe("BaseQueue", () => {

beforeEach(() => {
queue = new BaseQueue({
itemCallback: (message: string) => true,
itemCallback: (_: string) => true,
});
});

Expand Down Expand Up @@ -121,6 +121,7 @@ describe("AckMessagesQueue", () => {
const expirationMs = 1000;

beforeEach(() => {
jest.useFakeTimers();
queue = new AckMessagesQueue({
expirationMs,
timeoutExpiredCallback: jest.fn(),
Expand All @@ -140,7 +141,7 @@ describe("AckMessagesQueue", () => {
});

it("should call the timeoutExpiredCallback for expired items when not receiving any ack", () => {
jest.useFakeTimers().setSystemTime(Date.now() + expirationMs + 1);
jest.setSystemTime(Date.now() + expirationMs + 1);
queue.add(BigInt(1));
jest.advanceTimersByTime(expirationMs + 1);
expect(queue.last()).toBeNull();
Expand Down Expand Up @@ -170,19 +171,18 @@ describe("AckMessagesQueue", () => {

it("should call the timeoutExpiredCallback for expired items when receiving the ack", () => {
queue.add(BigInt(1));
jest.useFakeTimers().setSystemTime(Date.now() + expirationMs + 1);
queue.add(BigInt(2));
queue.add(BigInt(3));
jest.setSystemTime(Date.now() + expirationMs + 1);
queue.ack(BigInt(1));
jest.advanceTimersByTime(expirationMs + 1);
expect(queue.last()).toBeNull();
expect(queue["_timeoutExpiredCallback"]).toHaveBeenCalledWith([BigInt(2)]);
expect(queue["_timeoutExpiredCallback"]).toHaveBeenCalledWith([BigInt(2), BigInt(3)]);
});

it("should call the timeoutExpiredCallback for all expired items after not receiving the ack", () => {
queue.add(BigInt(1));
queue.add(BigInt(2));
queue.add(BigInt(3));
jest.useFakeTimers();
queue.ack(BigInt(1));
jest.advanceTimersByTime(expirationMs);
expect(queue.last()).toBeNull();
Expand Down
11 changes: 4 additions & 7 deletions src/queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,10 @@ export class AckMessagesQueue {
}

// for the remaining items in the queue, check if they have expired
// if yes, call the callback for the first expired item
for (const item of this._queue) {
if (Date.now() - item.addedAt >= this._expirationMs) {
// if it has expired and is still in the queue,
// it means it has not been acked, so we call the callback
return this._onTimeoutExpired([item]);
}
// if yes, call the callback for the expired items
const expiredItems = this._queue.filter((item) => Date.now() - item.addedAt >= this._expirationMs);
if (expiredItems.length > 0) {
return this._onTimeoutExpired(expiredItems);
}

this._restartLastAckTimeout();
Expand Down
6 changes: 2 additions & 4 deletions src/test/clients.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ import { Principal } from "@dfinity/principal";

export const canisterId = Principal.fromText("bnz7o-iuaaa-aaaaa-qaaaa-cai");

// Principal: "pmisz-prtlk-b6oe6-bj4fl-6l5fy-h7c2h-so6i7-jiz2h-bgto7-piqfr-7ae"
// const client1Seed = "rabbit fun moral twin food kangaroo egg among adjust pottery measure seek";
export const client1Key: ClientKey = {
client_principal: Principal.fromText("pmisz-prtlk-b6oe6-bj4fl-6l5fy-h7c2h-so6i7-jiz2h-bgto7-piqfr-7ae"),
client_nonce: BigInt("5768810803147064100"),
client_principal: Principal.fromText("kj67s-b5v2y-ahlkr-kmume-xbow6-zwbtj-j4j3m-ae46e-qqrcu-uxiby-yae"),
client_nonce: BigInt("385892949151814926"),
};
5 changes: 4 additions & 1 deletion src/test/constants.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import { fromHex } from "@dfinity/agent";
import { Principal } from "@dfinity/principal";

export const GATEWAY_PRINCIPAL = Principal.fromText("i3gux-m3hwt-5mh2w-t7wwm-fwx5j-6z6ht-hxguo-t4rfw-qp24z-g5ivt-2qe");
export const GATEWAY_PRINCIPAL = Principal.fromText("sqdfl-mr4km-2hfjy-gajqo-xqvh7-hf4mf-nra4i-3it6l-neaw4-soolw-tae");

export const LOCAL_REPLICA_ROOT_KEY = fromHex("d9d9f7a66e69635f6170695f76657273696f6e66302e31382e3068726f6f745f6b65795885308182301d060d2b0601040182dc7c0503010201060c2b0601040182dc7c050302010361008005229d89a17c6f9ec403a4b1a8aa103fc48055046c95f1e60ee2fbfb0bb23ab21617a93f48b99b1199ac89008cf3cf0a83e9da35f5cf27d0d51535ceff89c43ee236c31c3a7865cc6b333194ad3f7155b2931a7ffec2066777dffb20f277ca6c696d706c5f76657273696f6e65302e382e3069696d706c5f68617368784064613931633732316637386462393433346561336630303437383939383836346439313731346538626561363862333963633736326662306263383937313662757265706c6963615f6865616c74685f737461747573676865616c746879706365727469666965645f68656967687418d4");
Loading

0 comments on commit 6395248

Please sign in to comment.