Skip to content

Commit

Permalink
add event poller
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjah committed Aug 21, 2024
1 parent fb88e5a commit 7d416d6
Show file tree
Hide file tree
Showing 14 changed files with 220 additions and 7 deletions.
1 change: 1 addition & 0 deletions .eslintrc.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ module.exports = {
enforceConst: true,
detectObjects: true,
ignoreEnums: true,
ignore: [0,1]
},
],
'@typescript-eslint/ban-tslint-comment': 'off',
Expand Down
1 change: 1 addition & 0 deletions open_rpc/massa.api.json
Original file line number Diff line number Diff line change
Expand Up @@ -2682,6 +2682,7 @@
"current_cycle_time",
"next_cycle_time",
"network_stats",
"last_slot",
"next_slot",
"node_id",
"pool_stats",
Expand Down
9 changes: 8 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@
"ts-node": "^10.9.1",
"typedoc": "^0.25.7",
"typescript": "^5.4.5",
"typescript-eslint": "^7.8.0"
"typescript-eslint": "^7.8.0",
"wait-for-expect": "^3.0.2"
},
"optionalDependencies": {
"bufferutil": "^4.0.7",
Expand Down
1 change: 1 addition & 0 deletions src/client/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export const NB_THREADS = 32
1 change: 1 addition & 0 deletions src/client/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from './types'
export * from './jsonRPCClient'
export * from './publicAPI'
export * from './constants'
9 changes: 7 additions & 2 deletions src/client/publicAPI.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ export class PublicAPI {
}

async getAddressInfo(address: string): Promise<AddressInfo> {
return this.getMultipleAddressInfo([address]).then((r) => r[FIRST])
return this.getMultipleAddressInfo([address]).then((r) => r[0])
}

async getBalance(address: string, final = true): Promise<Mas.Mas> {
Expand All @@ -160,7 +160,7 @@ export class PublicAPI {
return withRetry(
() => this.connector.get_addresses_bytecode([addressFilter]),
this.options.retry!
).then((r) => r[FIRST])
).then((r) => r[0])
}

async executeMultipleGetAddressesBytecode(
Expand Down Expand Up @@ -375,6 +375,11 @@ export class PublicAPI {
return status.last_slot.period
}

async getCurrentSlot(): Promise<Slot> {
const { last_slot } = await this.status()
return last_slot
}

private static convertOperationInput(
data: SendOperationInput
): OperationInput {
Expand Down
7 changes: 6 additions & 1 deletion src/client/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { EventExecutionContext, Slot } from '../generated/client'
import { EventExecutionContext } from '../generated/client'

export enum Transport {
WebSocket = 'websocket',
Expand All @@ -8,6 +8,11 @@ export enum Transport {
PostMessageIframe = 'postmessageiframe',
}

export type Slot = {
period: number
thread: number
}

export type TransportOptions = {
path?: string
protocol?: string
Expand Down
119 changes: 119 additions & 0 deletions src/events/eventsPoller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import { EventEmitter } from 'events'
import { Provider } from '../provider'
import { EventFilter, NB_THREADS, SCEvent, Slot } from '../client'

/** Smart Contracts Event Poller */
export const ON_MASSA_EVENT_DATA = 'ON_MASSA_EVENT'
export const ON_MASSA_EVENT_ERROR = 'ON_MASSA_ERROR'

export const DEFAULT_POLL_INTERVAL_MS = 1000

// get the next slot
function nextSlot(prevSlot: Slot): Slot {
const slot = prevSlot
if (slot.thread < NB_THREADS - 1) {
slot.thread++
} else {
slot.thread = 0
slot.period++
}
return slot
}

/**
* The EventPoller class provides a convenient way to poll events from the Massa network.
*/
export class EventPoller extends EventEmitter {
private intervalId: NodeJS.Timeout
private lastSlot: Slot

/**
* Constructor of the EventPoller object.
*
* @param provider - The provider to use for polling.
* @param eventsFilter - The filter to use for the events.
* @param pollIntervalMs - The interval in milliseconds to poll for events.
*/
public constructor(
private readonly provider: Provider,
private readonly eventsFilter: EventFilter,
private readonly pollIntervalMs: number
) {
super()
}

private poll = async (): Promise<void> => {
try {
// get all events using the filter.
if (this.lastSlot) {
this.eventsFilter.start = nextSlot(this.lastSlot)
}
const events = await this.provider.getEvents(this.eventsFilter)

if (events.length) {
this.emit(ON_MASSA_EVENT_DATA, events)
this.lastSlot = events[events.length - 1].context.slot
}
} catch (ex) {
this.emit(ON_MASSA_EVENT_ERROR, ex)
}

// reset the interval.
this.intervalId = setTimeout(this.poll, this.pollIntervalMs)
}

/**
* Stops polling for events.
*/
private stop = (): void => {
if (this.intervalId?.hasRef()) {
clearInterval(this.intervalId)
}
}

/**
* Starts polling for events.
*/
private start(): void {
this.stop()
this.intervalId = setInterval(this.poll, this.pollIntervalMs)
}

/**
* Starts polling for events and returns the stopPolling function.
*
* @param provider - The provider to use for polling.
* @param eventsFilter - The filter to use for the events.
* @param onData - The callback function to call when new events are found.
* @param onError - The callback function to call when an error occurs.
* @param pollIntervalMs - The interval in milliseconds to poll for events. Default is 1000Ms.
*
* @returns An object containing the stopPolling function.
*/
// eslint-disable-next-line max-params
public static start(
provider: Provider,
eventsFilter: EventFilter,
onData?: (data: SCEvent[]) => void,
onError?: (err: Error) => void,
pollIntervalMs = DEFAULT_POLL_INTERVAL_MS
): { stopPolling: () => void } {
const eventPoller = new EventPoller(provider, eventsFilter, pollIntervalMs)
if (onData) {
eventPoller.on(ON_MASSA_EVENT_DATA, (data: SCEvent[]) => {
onData(data)
})
}
if (onError) {
eventPoller.on(ON_MASSA_EVENT_ERROR, (e) => {
onError(e)
})
}

eventPoller.start()

return {
stopPolling: eventPoller.stop,
}
}
}
1 change: 1 addition & 0 deletions src/events/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './eventsPoller'
4 changes: 2 additions & 2 deletions src/generated/client.ts

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ export * from './utils'
export * from './operation'
export * from './provider'
export * from './contracts-wrappers'
export * from './events'
1 change: 1 addition & 0 deletions test/integration/MRC20.spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { EventPoller } from '../../src'
import { MRC20 } from '../../src/contracts-wrappers'
import { provider } from './setup'

Expand Down
69 changes: 69 additions & 0 deletions test/integration/events.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import { EventFilter, EventPoller, SCEvent } from '../../src'
import { MRC20 } from '../../src/contracts-wrappers'
import { provider } from './setup'
import waitForExpect from 'wait-for-expect'

const USDC = 'AS12k8viVmqPtRuXzCm6rKXjLgpQWqbuMjc37YHhB452KSUUb9FgL'

let usdcContract: MRC20

describe('SC Event tests', () => {
beforeAll(async () => {
usdcContract = new MRC20(provider, USDC)
})

test('poll transfer event from caller and contract addr', async () => {
const amount = 1_000n
const currentSlot = await provider.client.getCurrentSlot()
const operation = await usdcContract.transfer(
'AU1wN8rn4SkwYSTDF3dHFY4U28KtsqKL1NnEjDZhHnHEy6cEQm53',
amount
)
await operation.waitSpeculativeExecution()

let events: SCEvent[] = []

const filter = {
smartContractAddress: USDC,
callerAddress: provider.address,
start: currentSlot,
}

const { stopPolling } = EventPoller.start(provider, filter, (data) => {
events = data
})

await waitForExpect(() => {
expect(events.length).toEqual(1)
expect(events[0].data).toEqual('TRANSFER SUCCESS')
})
stopPolling()
})

test('poll transfer event from operationId', async () => {
const amount = 1_000n
const currentSlot = await provider.client.getCurrentSlot()
const operation = await usdcContract.transfer(
'AU1wN8rn4SkwYSTDF3dHFY4U28KtsqKL1NnEjDZhHnHEy6cEQm53',
amount
)
await operation.waitSpeculativeExecution()

let events: SCEvent[] = []

const filter: EventFilter = {
operationId: operation.id,
start: currentSlot,
}

const { stopPolling } = EventPoller.start(provider, filter, (data) => {
events = data
})

await waitForExpect(() => {
expect(events.length).toEqual(1)
expect(events[0].data).toEqual('TRANSFER SUCCESS')
})
stopPolling()
})
})

0 comments on commit 7d416d6

Please sign in to comment.