Skip to content

Commit

Permalink
feat: add listener callback to socket-io client subscription functions
Browse files Browse the repository at this point in the history
  • Loading branch information
zone117x committed Jan 5, 2024
1 parent 5050763 commit cc54153
Show file tree
Hide file tree
Showing 7 changed files with 2,640 additions and 2,155 deletions.
4,630 changes: 2,565 additions & 2,065 deletions client/package-lock.json

Large diffs are not rendered by default.

27 changes: 14 additions & 13 deletions client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,27 +43,28 @@
"cross-fetch": "3.1.5",
"eventemitter3": "4.0.7",
"jsonrpc-lite": "2.2.0",
"socket.io-client": "4.6.1",
"ws": "7.5.6"
"socket.io-client": "4.7.3",
"ws": "8.16.0"
},
"devDependencies": {
"@apidevtools/swagger-cli": "4.0.4",
"@openapitools/openapi-generator-cli": "2.4.21",
"@stacks/eslint-config": "1.2.0",
"@stacks/prettier-config": "0.0.7",
"@typescript-eslint/eslint-plugin": "4.33.0",
"@typescript-eslint/parser": "4.33.0",
"@stacks/eslint-config": "2.0.0",
"@stacks/prettier-config": "0.0.10",
"@types/node": "20.10.6",
"@typescript-eslint/eslint-plugin": "6.17.0",
"@typescript-eslint/parser": "6.17.0",
"concurrently": "7.6.0",
"eslint": "7.32.0",
"eslint-config-prettier": "8.3.0",
"eslint-plugin-prettier": "3.4.1",
"eslint": "8.56.0",
"eslint-config-prettier": "9.1.0",
"eslint-plugin-prettier": "5.1.2",
"http-server": "14.0.0",
"microbundle": "0.13.3",
"prettier": "2.8.6",
"prettier": "3.1.1",
"rimraf": "5.0.0",
"shx": "0.3.3",
"ts-node": "9.1.1",
"typedoc": "0.23.10",
"typescript": "4.6.2"
"ts-node": "10.9.2",
"typedoc": "0.25.6",
"typescript": "5.3.3"
}
}
79 changes: 43 additions & 36 deletions client/src/socket-io/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ import {
ClientToServerMessages,
Topic,
ServerToClientMessages,
MempoolTransaction,
Transaction,
Block,
Microblock,
AddressTransactionWithTransfers,
NftEvent,
AddressStxBalanceResponse,
} from '@stacks/stacks-blockchain-api-types';
import { BASE_PATH } from '../generated/runtime';

Expand Down Expand Up @@ -49,7 +56,7 @@ export class StacksApiSocketClient {
return new StacksApiSocketClient(socket);
}

handleSubscription(topic: Topic, subscribe = false) {
handleSubscription(topic: Topic, subscribe = false, listener?: (...args: any[]) => void) {
const subscriptions = new Set(this.socket.io.opts.query?.subscriptions.split(',') ?? []);
if (subscribe) {
this.socket.emit('subscribe', topic, error => {
Expand All @@ -67,81 +74,93 @@ export class StacksApiSocketClient {
this.socket.io.opts.query.subscriptions = Array.from(subscriptions).join(',');
return {
unsubscribe: () => {
if (listener) {
this.socket.off(topic, listener);
}
this.handleSubscription(topic, false);
},
};
}

subscribeBlocks() {
return this.handleSubscription('block', true);
subscribeBlocks(listener?: (tx: Block) => void) {
if (listener) this.socket.on('block', listener);
return this.handleSubscription('block', true, listener);
}

unsubscribeBlocks() {
this.handleSubscription('block', false);
}

subscribeMicroblocks() {
return this.handleSubscription('microblock', true);
subscribeMicroblocks(listener?: (tx: Microblock) => void) {
if (listener) this.socket.on('microblock', listener);
return this.handleSubscription('microblock', true, listener);
}

unsubscribeMicroblocks() {
this.handleSubscription('microblock', false);
}

subscribeMempool() {
return this.handleSubscription('mempool', true);
subscribeMempool(listener?: (tx: MempoolTransaction) => void) {
if (listener) this.socket.on('mempool', listener);
return this.handleSubscription('mempool', true, listener);
}

unsubscribeMempool() {
this.handleSubscription('mempool', false);
}

subscribeAddressTransactions(address: string) {
return this.handleSubscription(`address-transaction:${address}` as const, true);
subscribeAddressTransactions(address: string, listener?: (address: string, tx: AddressTransactionWithTransfers) => void) {
if (listener) this.socket.on(`address-transaction:${address}`, listener);
return this.handleSubscription(`address-transaction:${address}`, true, listener);
}

unsubscribeAddressTransactions(address: string) {
this.handleSubscription(`address-transaction:${address}` as const, false);
this.handleSubscription(`address-transaction:${address}`, false);
}

subscribeAddressStxBalance(address: string) {
return this.handleSubscription(`address-stx-balance:${address}` as const, true);
subscribeAddressStxBalance(address: string, listener?: (address: string, stxBalance: AddressStxBalanceResponse) => void) {
if (listener) this.socket.on(`address-stx-balance:${address}`, listener);
return this.handleSubscription(`address-stx-balance:${address}`, true, listener);
}

unsubscribeAddressStxBalance(address: string) {
this.handleSubscription(`address-stx-balance:${address}` as const, false);
this.handleSubscription(`address-stx-balance:${address}`, false);
}

subscribeTransaction(txId: string) {
return this.handleSubscription(`transaction:${txId}` as const, true);
subscribeTransaction(txId: string, listener?: (tx: MempoolTransaction | Transaction) => void) {
if (listener) this.socket.on(`transaction:${txId}`, listener);
return this.handleSubscription(`transaction:${txId}`, true, listener);
}

unsubscribeTransaction(txId: string) {
this.handleSubscription(`transaction:${txId}` as const, false);
this.handleSubscription(`transaction:${txId}`, false);
}

subscribeNftEvent() {
return this.handleSubscription('nft-event', true);
subscribeNftEvent(listener?: (event: NftEvent) => void) {
if (listener) this.socket.on('nft-event', listener);
return this.handleSubscription('nft-event', true, listener);
}

unsubscribeNftEvent() {
this.handleSubscription('nft-event', false);
}

subscribeNftAssetEvent(assetIdentifier: string, value: string) {
return this.handleSubscription(`nft-asset-event:${assetIdentifier}+${value}` as const, true);
subscribeNftAssetEvent(assetIdentifier: string, value: string, listener?: (assetIdentifier: string, value: string, event: NftEvent) => void) {
if (listener) this.socket.on(`nft-asset-event:${assetIdentifier}+${value}`, listener);
return this.handleSubscription(`nft-asset-event:${assetIdentifier}+${value}`, true, listener);
}

unsubscribeNftAssetEvent(assetIdentifier: string, value: string) {
this.handleSubscription(`nft-asset-event:${assetIdentifier}+${value}` as const, false);
this.handleSubscription(`nft-asset-event:${assetIdentifier}+${value}`, false);
}

subscribeNftCollectionEvent(assetIdentifier: string) {
return this.handleSubscription(`nft-collection-event:${assetIdentifier}` as const, true);
subscribeNftCollectionEvent(assetIdentifier: string, listener?: (assetIdentifier: string, event: NftEvent) => void) {
if (listener) this.socket.on(`nft-collection-event:${assetIdentifier}`, listener);
return this.handleSubscription(`nft-collection-event:${assetIdentifier}`, true, listener);
}

unsubscribeNftCollectionEvent(assetIdentifier: string) {
this.handleSubscription(`nft-collection-event:${assetIdentifier}` as const, false);
this.handleSubscription(`nft-collection-event:${assetIdentifier}`, false);
}

logEvents() {
Expand All @@ -151,18 +170,6 @@ export class StacksApiSocketClient {
this.socket.on('block', block => console.log('block', block));
this.socket.on('microblock', microblock => console.log('microblock', microblock));
this.socket.on('mempool', tx => console.log('mempool', tx));
this.socket.on('address-transaction', (address, data) =>
console.log('address-transaction', address, data)
);
this.socket.on('address-stx-balance', (address, data) =>
console.log('address-stx-balance', address, data)
);
this.socket.on('nft-event', event => console.log('nft-event', event));
this.socket.on('nft-asset-event', (assetIdentifier, value, event) =>
console.log('nft-asset-event', assetIdentifier, value, event)
);
this.socket.on('nft-collection-event', (assetIdentifier, event) =>
console.log('nft-collection-event', assetIdentifier, event)
);
}
}
2 changes: 1 addition & 1 deletion client/tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"compilerOptions": {
"target": "es2017",
"target": "es2020",
"module": "commonjs",
"moduleResolution": "node",
"declaration": true,
Expand Down
17 changes: 1 addition & 16 deletions docs/socket-io/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,10 @@ export interface ServerToClientMessages<isSender extends boolean = false> {
block: (block: Block, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
microblock: (microblock: Microblock, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
mempool: (transaction: MempoolTransaction, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
transaction: (transaction: Transaction | MempoolTransaction, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;

// @ts-ignore scheduled for support in TS v4.3 https://github.com/microsoft/TypeScript/pull/26797
[key: 'nft-event']: (event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
'nft-event': (event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;

// @ts-ignore scheduled for support in TS v4.3 https://github.com/microsoft/TypeScript/pull/26797
[key: TransactionTopic]: (transaction: Transaction | MempoolTransaction, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
[key: NftAssetEventTopic]: (assetIdentifier: string, value: string, event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
'nft-asset-event': (assetIdentifier: string, value: string, event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;

// @ts-ignore scheduled for support in TS v4.3 https://github.com/microsoft/TypeScript/pull/26797
[key: NftCollectionEventTopic]: (assetIdentifier: string, event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
'nft-collection-event': (assetIdentifier: string, event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;

// @ts-ignore scheduled for support in TS v4.3 https://github.com/microsoft/TypeScript/pull/26797
[key: AddressTransactionTopic]: (address: string, stxBalance: AddressTransactionWithTransfers, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
'address-transaction': (address: string, tx: AddressTransactionWithTransfers, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;

// @ts-ignore scheduled for support in TS v4.3 https://github.com/microsoft/TypeScript/pull/26797
[key: AddressStxBalanceTopic]: (address: string, stxBalance: AddressStxBalanceResponse, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
'address-stx-balance': (address: string, stxBalance: AddressStxBalanceResponse, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
}
36 changes: 14 additions & 22 deletions src/api/routes/ws/channels/socket-io-channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ import {
AddressStxBalanceTopic,
AddressTransactionTopic,
ClientToServerMessages,
NftAssetEventTopic,
NftCollectionEventTopic,
ServerToClientMessages,
Topic,
TransactionTopic,
} from 'docs/socket-io';
import * as http from 'http';
import { Server as SocketIOServer } from 'socket.io';
Expand Down Expand Up @@ -63,8 +66,8 @@ export class SocketIOChannel extends WebSocketChannel {
const topics = [...[subscriptions]].flat().flatMap(r => r.split(','));
for (const topic of topics) {
this.prometheus?.subscribe(socket, topic);
await socket.join(topic);
}
await socket.join(topics);
}
socket.on('disconnect', reason => {
logger.debug(`disconnected ${socket.id}: ${reason}`, component);
Expand Down Expand Up @@ -170,15 +173,7 @@ export class SocketIOChannel extends WebSocketChannel {
if (!this.io) {
return;
}
const sockets = [];
const socketIds = await this.io.to(room).allSockets();
for (const id of socketIds) {
const socket = this.io.sockets.sockets.get(id);
if (socket) {
sockets.push(socket);
}
}
return sockets;
return await this.io.to(room).fetchSockets();
}

send<P extends keyof WebSocketPayload>(
Expand Down Expand Up @@ -225,9 +220,10 @@ export class SocketIOChannel extends WebSocketChannel {
case 'transaction': {
const [tx] = args as ListenerType<WebSocketPayload['transaction']>;
this.prometheus?.sendEvent('transaction');
void this.getTopicSockets(`transaction:${tx.tx_id}`).then(sockets =>
const topic: TransactionTopic = `transaction:${tx.tx_id}`;
void this.getTopicSockets(topic).then(sockets =>
sockets?.forEach(socket =>
socket.timeout(timeout).emit('transaction', tx, _ => socket.disconnect(true))
socket.timeout(timeout).emit(topic, tx, _ => socket.disconnect(true))
)
);
break;
Expand All @@ -247,11 +243,12 @@ export class SocketIOChannel extends WebSocketChannel {
WebSocketPayload['nftAssetEvent']
>;
this.prometheus?.sendEvent('nft-asset-event');
void this.getTopicSockets(`nft-event`).then(sockets =>
const topic: NftAssetEventTopic = `nft-asset-event:${assetIdentifier}+${value}`;
void this.getTopicSockets(topic).then(sockets =>
sockets?.forEach(socket =>
socket
.timeout(timeout)
.emit('nft-asset-event', assetIdentifier, value, event, _ => socket.disconnect(true))
.emit(topic, assetIdentifier, value, event, _ => socket.disconnect(true))
)
);
break;
Expand All @@ -261,11 +258,12 @@ export class SocketIOChannel extends WebSocketChannel {
WebSocketPayload['nftCollectionEvent']
>;
this.prometheus?.sendEvent('nft-collection-event');
void this.getTopicSockets(`nft-event`).then(sockets =>
const topic: NftCollectionEventTopic = `nft-collection-event:${assetIdentifier}`;
void this.getTopicSockets(topic).then(sockets =>
sockets?.forEach(socket =>
socket
.timeout(timeout)
.emit('nft-collection-event', assetIdentifier, event, _ => socket.disconnect(true))
.emit(topic, assetIdentifier, event, _ => socket.disconnect(true))
)
);
break;
Expand All @@ -276,9 +274,6 @@ export class SocketIOChannel extends WebSocketChannel {
this.prometheus?.sendEvent('address-transaction');
void this.getTopicSockets(topic).then(sockets =>
sockets?.forEach(socket => {
socket
.timeout(timeout)
.emit('address-transaction', principal, tx, _ => socket.disconnect(true));
socket.timeout(timeout).emit(topic, principal, tx, _ => socket.disconnect(true));
})
);
Expand All @@ -290,9 +285,6 @@ export class SocketIOChannel extends WebSocketChannel {
this.prometheus?.sendEvent('address-stx-balance');
void this.getTopicSockets(topic).then(sockets =>
sockets?.forEach(socket => {
socket
.timeout(timeout)
.emit('address-stx-balance', principal, balance, _ => socket.disconnect(true));
socket.timeout(timeout).emit(topic, principal, balance, _ => socket.disconnect(true));
})
);
Expand Down
4 changes: 2 additions & 2 deletions src/tests/socket-io-tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,12 @@ describe('socket-io', () => {
socket.on(`nft-event`, event => {
nftEventWaiters[event.event_index].finish(event);
});
socket.on(`nft-asset-event`, (assetIdentifier, value, event) => {
socket.on(`nft-asset-event:${crashPunks}+${valueHex1}`, (assetIdentifier, value, event) => {
if (assetIdentifier == crashPunks && value == valueHex1) {
crashPunksWaiter.finish(event);
}
});
socket.on(`nft-collection-event`, (assetIdentifier, event) => {
socket.on(`nft-collection-event:${wastelandApes}`, (assetIdentifier, event) => {
if (assetIdentifier == wastelandApes) {
if (event.event_index == 2) {
apeWaiters[0].finish(event);
Expand Down

0 comments on commit cc54153

Please sign in to comment.