diff --git a/bun.lockb b/bun.lockb index da0cfc56..901b02fd 100644 Binary files a/bun.lockb and b/bun.lockb differ diff --git a/package.json b/package.json index 88d34a7b..13f6faab 100644 --- a/package.json +++ b/package.json @@ -17,8 +17,7 @@ "@twurple/api": "6.0.9", "@twurple/auth": "6.0.9", "@twurple/chat": "6.0.9", - "@twurple/eventsub-base": "6.0.9", - "@twurple/eventsub-http": "6.0.9", + "@twurple/eventsub-base": "^7.1.0", "i18next": "^23.11.5", "i18next-fs-backend": "^2.3.1", "socket.io": "^4.7.5", diff --git a/packages/dota/package.json b/packages/dota/package.json index 9636a078..c7fea7fb 100644 --- a/packages/dota/package.json +++ b/packages/dota/package.json @@ -17,13 +17,8 @@ }, "dependencies": { "@node-steam/id": "^1.2.0", - "@supabase/supabase-js": "^2.43.4", - "@twurple/api": "6.0.9", - "@twurple/auth": "6.0.9", - "@twurple/chat": "6.0.9", "@twurple/ebs-helper": "^7.1.0", - "@twurple/eventsub-base": "6.0.9", - "@twurple/eventsub-http": "6.0.9", + "@twurple/eventsub-ws": "^7.1.0", "@types/long": "^5.0.0", "@types/lru-cache": "^7.10.10", "axios": "1.2.0-alpha.1", @@ -37,6 +32,7 @@ "express-body-parser-error-handler": "^1.0.7", "i18next": "^23.11.5", "i18next-fs-backend": "^2.3.1", + "lodash.debounce": "^4.0.8", "lodash.isequal": "^4.5.0", "lru-cache": "^10.2.2", "mongodb": "^6.7.0", diff --git a/packages/dota/src/db/watcher.ts b/packages/dota/src/db/watcher.ts index 2ac301f2..015fe7f8 100644 --- a/packages/dota/src/db/watcher.ts +++ b/packages/dota/src/db/watcher.ts @@ -6,8 +6,8 @@ import findUser from '../dota/lib/connectedStreamers.js' import { didTellUser, gsiHandlers } from '../dota/lib/consts.js' import { getRankDetail } from '../dota/lib/ranks.js' import { DBSettings, getValueOrDefault } from '../settings.js' -import { twitchChat } from '../twitch/chatClient' import { chatClient } from '../twitch/chatClient.js' +import { twitchChat } from '../twitch/index.js' import { toggleDotabod } from '../twitch/toggleDotabod.js' import { logger } from '../utils/logger.js' import getDBUser from './getDBUser.js' @@ -41,8 +41,11 @@ class SetupSupabase { } toggleHandler = async (userId: string, enable: boolean) => { + if (this.IS_DEV && !this.DEV_CHANNELIDS.includes(userId)) return + if (!this.IS_DEV && this.DEV_CHANNELIDS.includes(userId)) return + const client = await getDBUser({ token: userId }) - if (!client || !this.shouldHandleDevChannel(client.name)) return + if (!client) return toggleDotabod(userId, enable, client.name, client.locale) } @@ -79,6 +82,13 @@ class SetupSupabase { const newObj = payload.new const oldObj = payload.old + if (newObj.scope !== oldObj.scope) { + const client = findUser(newObj.userId) + if (client?.Account) { + client.Account.scope = newObj.scope + } + } + // The frontend will set it to false when they relogin // Which allows us to update the authProvider object if (newObj.requires_refresh === false && oldObj.requires_refresh === true) { @@ -109,34 +119,20 @@ class SetupSupabase { client.stream_online = newObj.stream_online // They go offline - if (!client.stream_online && oldObj.stream_online) { + if (!newObj.stream_online && oldObj.stream_online) { return } // They come online if (client.stream_online && !oldObj.stream_online) { - const connectedUser = gsiHandlers.get(client.token) - if (!connectedUser) return - - const betsEnabled = getValueOrDefault(DBSettings.bets, client.settings) const ONE_DAY_IN_MS = 86_400_000 // 1 day in ms const dayAgo = new Date(Date.now() - ONE_DAY_IN_MS).toISOString() const hasNewestScopes = client.Account?.scope?.includes('channel:bot') - if (!hasNewestScopes && !didTellUser.has(client.name.toLowerCase())) { - twitchChat.emit( - 'say', - client.name.toLowerCase(), - t('refreshToken', { - lng: client.locale, - channel: `@${client.name.toLowerCase().replace('#', '')}`, - }), - ) - didTellUser.add(client.name.toLowerCase()) - return - } + const requiresRefresh = client.Account?.requires_refresh + if ((!hasNewestScopes || requiresRefresh) && !didTellUser.has(client.name)) { + didTellUser.add(client.name) - if (connectedUser.client.Account?.requires_refresh && betsEnabled) { const { data, error } = await supabase .from('bets') .select('created_at') @@ -146,15 +142,15 @@ class SetupSupabase { if (data?.length && !error) { logger.info('[WATCHER USER] Sending refresh token messsage', { - name: connectedUser.client.name, - twitchId: connectedUser.client.Account.providerAccountId, - token: connectedUser.client.token, + name: client.name, + twitchId: client.Account?.providerAccountId, + token: client.token, }) chatClient.say( - connectedUser.client.name, + client.name, t('refreshToken', { - lng: connectedUser.client.locale, - channel: connectedUser.client.name, + lng: client.locale, + channel: client.name, }), ) } @@ -167,13 +163,6 @@ class SetupSupabase { client.stream_start_date = newObj.stream_start_date } - async function handler() { - if (!client) return - - const deets = await getRankDetail(newObj.mmr, client.steam32Id) - server.io.to(client.token).emit('update-medal', deets) - } - // dont overwrite with 0 because we use this variable to track currently logged in mmr if (newObj.mmr !== 0 && client.mmr !== newObj.mmr && oldObj.mmr !== newObj.mmr) { client.mmr = newObj.mmr @@ -184,7 +173,8 @@ class SetupSupabase { mmr: newObj.mmr, }) try { - void handler() + const deets = await getRankDetail(newObj.mmr, client.steam32Id) + server.io.to(client.token).emit('update-medal', deets) } catch (e) { logger.error('Error in watcher postgres update', { e }) } diff --git a/packages/dota/src/dota/lib/consts.ts b/packages/dota/src/dota/lib/consts.ts index 326c8d97..40cb1191 100644 --- a/packages/dota/src/dota/lib/consts.ts +++ b/packages/dota/src/dota/lib/consts.ts @@ -112,4 +112,5 @@ export const pendingCheckAuth = new Map() export const lookingupToken = new Map() // Const holding if we told the user to get new scopes +// We'll lose this every reboot (6 hours) so no need to cleanup export const didTellUser = new Set() diff --git a/packages/dota/src/twitch/chatClient.ts b/packages/dota/src/twitch/chatClient.ts index 94a962e6..6b2ef451 100644 --- a/packages/dota/src/twitch/chatClient.ts +++ b/packages/dota/src/twitch/chatClient.ts @@ -1,110 +1,13 @@ -import { io } from 'socket.io-client' - -import { getAppToken } from '@twurple/auth' +import { twitchChat } from '.' import { findUserByName } from '../dota/lib/connectedStreamers' import { isDev } from '../dota/lib/consts.js' -import { eventsubSocket } from './eventSubSocket' +import { getTwitchHeaders } from './lib/getTwitchHeaders' -// Our docker chat forwarder instance -export const twitchChat = io(`ws://${process.env.HOST_TWITCH_CHAT}:5005`) +// Constants const prefix = isDev ? '[DEV] ' : '' +const headers = await getTwitchHeaders() -const appToken = await getAppToken(process.env.TWITCH_CLIENT_ID!, process.env.TWITCH_CLIENT_SECRET!) - -const twitchHeaders = { - 'Client-Id': process.env.TWITCH_CLIENT_ID!, - Authorization: `Bearer ${appToken?.accessToken}`, - Accept: 'application/json', - 'Accept-Encoding': 'gzip', -} - -const conduitsReq = await fetch('https://api.twitch.tv/helix/eventsub/conduits', { - method: 'GET', - headers: twitchHeaders, -}) -const { data } = await conduitsReq.json() -const conduitId = data[0]?.id || (await createConduit()) -console.log({ conduitId }) - -async function createConduit() { - console.log('Creating conduit') - const createReq = await fetch('https://api.twitch.tv/helix/eventsub/conduits', { - method: 'POST', - headers: { - ...twitchHeaders, - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ shard_count: 1 }), - }) - const { data } = await createReq.json() - const { id } = data[0] - return id -} - -function getShards() { - return fetch(`https://api.twitch.tv/helix/eventsub/conduits/shards?conduit_id=${conduitId}`, { - method: 'GET', - headers: { - ...twitchHeaders, - }, - }) -} - -// The Conduit exists -// lets spawn a WebSocket and assign this socket to a shard -// if we are a ID of auto then the shard ID is forced to 0 if we created... -const mySocket = new eventsubSocket({}) -mySocket.on('connected', async (session_id) => { - console.log(`Socket has connected ${session_id} for ${conduitId}`) - - const body = { - conduit_id: conduitId, - shards: [ - { - id: 0, - transport: { - method: 'websocket', - session_id, - }, - }, - ], - } - // connect the socket to the conduit on the stated shard ID - const conduitUpdate = await fetch('https://api.twitch.tv/helix/eventsub/conduits/shards', { - method: 'PATCH', - headers: { - ...twitchHeaders, - 'Content-Type': 'application/json', - }, - body: JSON.stringify(body), - }) - if (conduitUpdate.status != 202) { - console.error(await conduitUpdate.text(), 'Failed to assign socket to shard') - // console.error( - // `Failed to assign socket to shard ${conduitUpdate.status}//${await conduitUpdate.text()}`, - // ); - return - } else { - console.log('Socket assigned to shard') - } - // check for errors - const { data, errors } = await conduitUpdate.json() - if (errors && errors.length > 0) { - console.error(`Failed to udpate the shard`) - console.error(errors) - } else { - console.log('Shard Updated') - } -}) - -mySocket.on('error', (error) => { - console.error('Socket Error', error) -}) - -mySocket.on('notification', (message) => { - console.log('Socket Message', message) -}) - +// Chat client object export const chatClient = { join: (channel: string) => { twitchChat.emit('join', channel) @@ -112,22 +15,25 @@ export const chatClient = { part: (channel: string) => { twitchChat.emit('part', channel) }, - say: (channel: string, text: string) => { - if (isDev) console.log({ channel, text }) - const user = findUserByName(channel.toLowerCase().replace('#', '')) - const hasNewestScopes = user?.Account?.scope?.includes('channel:bot') - if (hasNewestScopes) { - const newPrefix = prefix ? `${prefix}[NEW-API]` : prefix - void fetch('https://api.twitch.tv/helix/chat/messages', { - method: 'POST', - headers: twitchHeaders, - body: JSON.stringify({ - broadcaster_id: user?.Account?.providerAccountId, - sender_id: process.env.TWITCH_BOT_PROVIDERID, - message: `${newPrefix}${text}`, - }), - }) - return + say: async (channel: string, text: string) => { + // New API is in beta, so only dev enabled for now + if (isDev) { + const user = findUserByName(channel.toLowerCase().replace('#', '')) + const hasNewestScopes = user?.Account?.scope?.includes('channel:bot') + + if (hasNewestScopes) { + const newPrefix = prefix ? `${prefix}[NEW-API] ` : prefix + void fetch('https://api.twitch.tv/helix/chat/messages', { + method: 'POST', + headers: { ...headers, 'Content-Type': 'application/json' }, + body: JSON.stringify({ + broadcaster_id: user?.Account?.providerAccountId, + sender_id: process.env.TWITCH_BOT_PROVIDERID, + message: `${newPrefix}${text}`, + }), + }) + return + } } twitchChat.emit('say', channel, `${prefix}${text}`) diff --git a/packages/dota/src/twitch/chatClientv2.ts b/packages/dota/src/twitch/chatClientv2.ts new file mode 100644 index 00000000..c3183587 --- /dev/null +++ b/packages/dota/src/twitch/chatClientv2.ts @@ -0,0 +1,156 @@ +import { getAppToken } from '@twurple/auth' +import type { EventSubChannelChatMessageEventData } from '@twurple/eventsub-base/lib/events/EventSubChannelChatMessageEvent.external' +import type { EventSubWsPacket } from '@twurple/eventsub-ws/lib/EventSubWsPacket.external' +import { logger } from '../utils/logger' +import { EventsubSocket } from './eventSubSocket' + +// Constants +const headers = await getTwitchHeaders() + +// Function to get Twitch headers +export async function getTwitchHeaders(): Promise> { + const appToken = await getAppToken( + process.env.TWITCH_CLIENT_ID || '', + process.env.TWITCH_CLIENT_SECRET || '', + ) + + return { + 'Client-Id': process.env.TWITCH_CLIENT_ID || '', + Authorization: `Bearer ${appToken?.accessToken}`, + Accept: 'application/json', + 'Accept-Encoding': 'gzip', + } +} + +// Function to fetch conduit ID +async function fetchConduitId(): Promise { + const conduitsReq = await fetch('https://api.twitch.tv/helix/eventsub/conduits', { + method: 'GET', + headers, + }) + + const { data } = await conduitsReq.json() + return data[0]?.id || createConduit() +} + +// Function to create a new conduit +async function createConduit(): Promise { + logger.info('Creating conduit') + const createReq = await fetch('https://api.twitch.tv/helix/eventsub/conduits', { + method: 'POST', + headers: { + ...headers, + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ shard_count: 1 }), + }) + + const { data } = await createReq.json() + return data[0].id +} + +// Function to update conduit shard +async function updateConduitShard(session_id: string, conduitId: string) { + const body = { + conduit_id: conduitId, + shards: [ + { + id: 0, + transport: { + method: 'websocket', + session_id, + }, + }, + ], + } + + const conduitUpdate = await fetch('https://api.twitch.tv/helix/eventsub/conduits/shards', { + method: 'PATCH', + headers: { + ...headers, + 'Content-Type': 'application/json', + }, + body: JSON.stringify(body), + }) + + if (conduitUpdate.status !== 202) { + logger.error('Failed to assign socket to shard', { reason: await conduitUpdate.text() }) + return + } + + logger.info('Socket assigned to shard') + const { errors } = await conduitUpdate.json() + if (errors && errors.length > 0) { + logger.error('Failed to update the shard', { errors }) + } else { + logger.info('Shard Updated') + } +} + +// TODO: Move this to twitch-events package +const subscribeToUserUpdate = async (conduit_id: string, broadcaster_user_id: string) => { + const body = { + type: 'channel.chat.message', + version: '1', + condition: { + user_id: '843245458', // bot dotabod + broadcaster_user_id: '32474777', // TL + }, + transport: { + method: 'conduit', + conduit_id, + }, + } + const subscribeReq = await fetch('https://api.twitch.tv/helix/eventsub/subscriptions', { + method: 'POST', + headers: { + ...headers, + 'Content-Type': 'application/json', + }, + body: JSON.stringify(body), + }) + + if (subscribeReq.status !== 202) { + console.error( + `Failed to subscribe to channel.chat.message ${ + subscribeReq.status + } ${await subscribeReq.text()}`, + ) + return + } + console.log('Subscribed to channel.chat.message') +} + +// TODO: Move this to twitch-chat package +// Initialize WebSocket and handle events +async function initializeSocket() { + const conduitId = await fetchConduitId() + logger.info('Conduit ID', { conduitId }) + + const mySocket = new EventsubSocket() + mySocket.on('connected', async (session_id: string) => { + logger.info(`Socket has connected ${session_id} for ${conduitId}`) + await updateConduitShard(session_id, conduitId) + }) + + mySocket.on('error', (error: Error) => { + logger.error('Socket Error', { error }) + }) + + mySocket.on('notification', (message: EventSubWsPacket) => { + if ( + 'subscription' in message.payload && + 'event' in message.payload && + message.payload.subscription.type === 'channel.chat.message' + ) { + const { + chatter_user_login, + message: { text }, + } = message.payload.event as unknown as EventSubChannelChatMessageEventData + logger.info('Socket Message', { chatter_user_login, text }) + } + logger.info('Socket Message', { message: message.payload }) + }) +} + +initializeSocket() diff --git a/packages/dota/src/twitch/eventSubSocket.ts b/packages/dota/src/twitch/eventSubSocket.ts index 88063e70..59e32591 100644 --- a/packages/dota/src/twitch/eventSubSocket.ts +++ b/packages/dota/src/twitch/eventSubSocket.ts @@ -1,9 +1,20 @@ import { EventEmitter } from 'node:events' import WebSocket from 'ws' -export class eventsubSocket extends EventEmitter { - counter = 0 - closeCodes = { +type EventsubSocketOptions = { + url?: string + connect?: boolean + silenceReconnect?: boolean + disableAutoReconnect?: boolean +} + +type CloseCodeDescription = { + [code: number]: string +} + +export class EventsubSocket extends EventEmitter { + private counter = 0 + private readonly closeCodes: CloseCodeDescription = { 4000: 'Internal Server Error', 4001: 'Client sent inbound traffic', 4002: 'Client failed ping-pong', @@ -13,211 +24,166 @@ export class eventsubSocket extends EventEmitter { 4006: 'Network error', 4007: 'Invalid Reconnect', } + private mainUrl: string + private silenceReconnect: boolean + private disableAutoReconnect: boolean + private backoff = 0 + private backoffStack = 100 + private eventsub!: WebSocket & { + twitch_websocket_id?: string + counter?: number + is_reconnecting?: boolean + } + private silenceHandler?: NodeJS.Timeout + private silenceTime = 10 constructor({ url = 'wss://eventsub.wss.twitch.tv/ws', connect = true, silenceReconnect = true, disableAutoReconnect = false, - }) { + }: EventsubSocketOptions = {}) { super() - + this.mainUrl = url this.silenceReconnect = silenceReconnect this.disableAutoReconnect = disableAutoReconnect - this.mainUrl = url if (connect) { this.connect() } } - mainUrl = 'wss://eventsub.wss.twitch.tv/ws' - //mainUrl = "ws://127.0.0.1:8080/ws"; - backoff = 0 - backoffStack = 100 - - connect(url, is_reconnect) { - this.eventsub = {} + private connect(url = this.mainUrl, isReconnect = false): void { this.counter++ + this.eventsub = new WebSocket(url) as WebSocket & { counter?: number } + this.eventsub.counter = this.counter - url = url ? url : this.mainUrl - is_reconnect = is_reconnect ? is_reconnect : false + this.eventsub.addEventListener('open', this.handleOpen.bind(this)) + this.eventsub.addEventListener('close', (close) => this.handleClose(close, isReconnect)) + this.eventsub.addEventListener('error', this.handleError.bind(this)) + this.eventsub.addEventListener('message', this.handleMessage.bind(this)) + } - console.debug(`Connecting to ${url}`) - // this overrites and kills the old reference - this.eventsub = new WebSocket(url) - this.eventsub.counter = this.counter + private handleOpen(): void { + this.backoff = 0 + console.debug('Opened Connection to Twitch') + } - this.eventsub.addEventListener('open', () => { - this.backoff = 0 - console.debug(`Opened Connection to Twitch`) - }) - - // https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/close_event - // https://github.com/Luka967/websocket-close-codes - this.eventsub.addEventListener('close', (close) => { - // forward the close event - this.emit('close', close) - - console.debug( - `${this.eventsub.twitch_websocket_id}/${this.eventsub.counter} Connection Closed: ${close.code} Reason - ${this.closeCodes[close.code]}`, - ) - - // 4000 well damn - // 4001 we should never get... - // 4002 make a new socket - if (close.code == 4003) { - console.debug( - 'Did not subscribe to anything, the client should decide to reconnect (when it is ready)', - ) - return - } - if (close.code == 4004) { - // this is the old connection dying - // we should of made a new connection to the new socket - console.debug('Old Connection is 4004-ing') - return - } - // 4005 make a new socket - // 4006 make a new socket - // 4007 make a new socket as we screwed up the reconnect? - - // anything else we should auto reconnect - // but only if the user wants - if (this.disableAutoReconnect) { - return - } + private handleClose(close: WebSocket.CloseEvent, isReconnect: boolean): void { + this.emit('close', close) + console.debug( + `Connection Closed: ${close.code} Reason - ${this.closeCodes[close.code] || 'Unknown'}`, + ) + + if (close.code === 4003) { + console.debug('Client should decide to reconnect when it is ready') + return + } - //console.debug(`for ${this.eventsub.counter} making new`); + if (close.code === 4004) { + console.debug('Old Connection is 4004-ing') + return + } + + if (!this.disableAutoReconnect) { this.backoff++ - console.debug('retry in', this.backoff * this.backoffStack) - setTimeout(() => { - this.connect() - }, this.backoff * this.backoffStack) - }) - // https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/error_event - this.eventsub.addEventListener('error', (err) => { - //console.debug(err); - console.debug( - `${this.eventsub.twitch_websocket_id}/${this.eventsub.counter} Connection Error`, - ) - }) - // https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/message_event - this.eventsub.addEventListener('message', (message) => { - //console.debug('Message'); - //console.debug(this.eventsub.counter, message); - let { data } = message - data = JSON.parse(data) - - const { metadata, payload } = data - const { message_id, message_type, message_timestamp } = metadata - //console.debug(`Recv ${message_id} - ${message_type}`); - - switch (message_type) { - case 'session_welcome': - const { session } = payload - const { id, keepalive_timeout_seconds } = session - - console.debug(`${this.eventsub.counter} This is Socket ID ${id}`) - this.eventsub.twitch_websocket_id = id - - console.debug( - `${this.eventsub.counter} This socket declared silence as ${keepalive_timeout_seconds} seconds`, - ) - - // is this a reconnect? - if (is_reconnect) { - // we carried subscriptions over - this.emit('reconnected', id) - } else { - // now you would spawn your topics - this.emit('connected', id) - } - - this.silence(keepalive_timeout_seconds) - - break - case 'session_keepalive': - //console.debug(`Recv KeepAlive - ${message_type}`); - this.emit('session_keepalive') - this.silence() - break - - case 'notification': - //console.debug('notification', metadata, payload); - const { subscription } = payload - const { type } = subscription - - // chat.message is NOISY - if (type != 'channel.chat.message') { - console.debug( - `${this.eventsub.twitch_websocket_id}/${this.eventsub.counter} Recv notification ${type}`, - ) - } - - this.emit('notification', { metadata, payload }) - this.emit(type, { metadata, payload }) - this.silence() - - break - - case 'session_reconnect': - this.eventsub.is_reconnecting = true - - const { reconnect_url } = payload.session - - console.debug( - `${this.eventsub.twitch_websocket_id}/${this.eventsub.counter} Reconnect request ${reconnect_url}`, - ) - - this.emit('session_reconnect', reconnect_url) - // stash old socket? - //this.eventsub_dying = this.eventsub; - //this.eventsub_dying.dying = true; - // make new socket - this.connect(reconnect_url, true) - - break - case 'websocket_disconnect': - console.debug(`${this.eventsub.counter} Recv Disconnect`) - console.debug('websocket_disconnect', payload) - - break - - case 'revocation': - console.debug(`${this.eventsub.counter} Recv Topic Revocation`) - console.debug('revocation', payload) - this.emit('revocation', { metadata, payload }) - break - - default: - console.debug(`${this.eventsub.counter} unexpected`, metadata, payload) - break - } - }) + console.debug('Retrying connection in', this.backoff * this.backoffStack) + setTimeout(() => this.connect(this.mainUrl, true), this.backoff * this.backoffStack) + } } - trigger() { - // this function lets you test the disconnect on send method - this.eventsub.send('cat') + private handleError(err: WebSocket.ErrorEvent): void { + console.debug('Connection Error', err) } - close() { - this.eventsub.close() + + private handleMessage(message: WebSocket.MessageEvent): void { + const data = JSON.parse(message.data as string) + const { metadata, payload } = data + const { message_type } = metadata + + switch (message_type) { + case 'session_welcome': + this.handleSessionWelcome(payload, message.isReconnect) + break + case 'session_keepalive': + this.emit('session_keepalive') + this.silence() + break + case 'notification': + this.handleNotification(metadata, payload) + break + case 'session_reconnect': + this.handleSessionReconnect(payload) + break + case 'websocket_disconnect': + console.debug('Received Disconnect', payload) + break + case 'revocation': + console.debug('Received Topic Revocation', payload) + this.emit('revocation', { metadata, payload }) + break + default: + console.debug('Unexpected message type', metadata, payload) + break + } + } + + private handleSessionWelcome(payload: any, isReconnect: boolean): void { + const { session } = payload + const { id, keepalive_timeout_seconds } = session + + this.eventsub.twitch_websocket_id = id + console.debug(`This is Socket ID ${id}`) + console.debug(`Silence timeout set to ${keepalive_timeout_seconds} seconds`) + + if (isReconnect) { + this.emit('reconnected', id) + } else { + this.emit('connected', id) + } + + this.silence(keepalive_timeout_seconds) } - silenceHandler = false - silenceTime = 10 // default per docs is 10 so set that as a good default - silence(keepalive_timeout_seconds) { + private handleNotification(metadata: any, payload: any): void { + const { type } = payload.subscription + + if (type !== 'channel.chat.message') { + console.debug(`Received notification ${type}`) + } + + this.emit('notification', { metadata, payload }) + this.emit(type, { metadata, payload }) + this.silence() + } + + private handleSessionReconnect(payload: any): void { + this.eventsub.is_reconnecting = true + const { reconnect_url } = payload.session + + console.debug(`Reconnect request to ${reconnect_url}`) + this.emit('session_reconnect', reconnect_url) + this.connect(reconnect_url, true) + } + + private silence(keepalive_timeout_seconds?: number): void { if (keepalive_timeout_seconds) { - this.silenceTime = keepalive_timeout_seconds - this.silenceTime++ // add a little window as it's too anal + this.silenceTime = keepalive_timeout_seconds + 1 } clearTimeout(this.silenceHandler) this.silenceHandler = setTimeout(() => { - this.emit('session_silenced') // -> self reconnecting + this.emit('session_silenced') if (this.silenceReconnect) { - this.close() // close it and let it self loop + this.close() } }, this.silenceTime * 1000) } + + public trigger(): void { + this.eventsub.send('cat') + } + + public close(): void { + this.eventsub.close() + } } diff --git a/packages/dota/src/twitch/index.ts b/packages/dota/src/twitch/index.ts index 51f1e118..19607b03 100644 --- a/packages/dota/src/twitch/index.ts +++ b/packages/dota/src/twitch/index.ts @@ -3,13 +3,16 @@ import './commandLoader.js' import type { ChatUser } from '@twurple/chat' import { t } from 'i18next' +import { io } from 'socket.io-client' import getDBUser from '../db/getDBUser.js' import { plebMode } from '../dota/lib/consts.js' import { DBSettings, getValueOrDefault } from '../settings.js' import { logger } from '../utils/logger.js' -import { chatClient, twitchChat } from './chatClient.js' +import { chatClient } from './chatClient.js' import commandHandler from './lib/CommandHandler.js' +export const twitchChat = io(`ws://${process.env.HOST_TWITCH_CHAT}:5005`) + logger.info("Starting 'twitch' package") twitchChat.on('connect', () => { diff --git a/packages/dota/src/twitch/lib/cleanupSubscriptions.ts b/packages/dota/src/twitch/lib/cleanupSubscriptions.ts new file mode 100644 index 00000000..f43558dd --- /dev/null +++ b/packages/dota/src/twitch/lib/cleanupSubscriptions.ts @@ -0,0 +1,244 @@ +import debounce from 'lodash.debounce' +import supabase from '../../db/supabase.js' +import { getTwitchHeaders } from './getTwitchHeaders.js' +// Constants +const headers = await getTwitchHeaders() + +function sleep(ms: number) { + return new Promise((resolve) => setTimeout(resolve, ms)) +} + +async function fetchSubscriptionsWithStatus(status?: string, cursor?: string): Promise { + const url = new URL('https://api.twitch.tv/helix/eventsub/subscriptions') + if (status) url.searchParams.append('status', status) + if (cursor) { + url.searchParams.append('after', cursor) + } + + const response = await fetch(url.toString(), { method: 'GET', headers }) + console.log(response.headers.get('ratelimit-remaining')) + if (response.status !== 200) { + throw new Error( + `Failed to fetch subscriptions with status ${status}: ${ + response.status + } // ${await response.text()}`, + ) + } + return response.json() +} + +// Assuming this counter is defined outside of the deleteSubscription function +let fetchRequestCounter = 0 +const maxRequestsBeforePause = 800 +const pauseDuration = 65000 // 65 seconds in milliseconds + +async function deleteSubscription(id: string) { + let retryDelay = 60 // Start with a 60 second delay + const maxRetries = 5 // Maximum number of retries + let attempt = 0 // Current attempt + + while (attempt < maxRetries) { + // Check if we need to pause before making the next request + if (fetchRequestCounter % maxRequestsBeforePause === 0 && fetchRequestCounter !== 0) { + console.log(`Pausing for ${pauseDuration / 1000} seconds to avoid rate limit...`) + await sleep(pauseDuration) // Wait for 65 seconds + } + + const response = await fetch(`https://api.twitch.tv/helix/eventsub/subscriptions?id=${id}`, { + method: 'DELETE', + headers, + }) + + fetchRequestCounter++ // Increment the request counter after each fetch + + const rateLimitRemaining = Number.parseInt( + response.headers.get('ratelimit-remaining') || '0', + 10, + ) + const rateLimitReset = + Number.parseInt(response.headers.get('ratelimit-reset') || '0', 10) * 1000 // Convert to milliseconds + const currentTime = Date.now() + + if (response.ok) { + console.log('Delete rate limit:', rateLimitRemaining) + return response // Exit function if request was successful + } + + if (rateLimitRemaining === 0 && currentTime < rateLimitReset) { + // Calculate wait time until rate limit reset, plus a small buffer + const waitTime = rateLimitReset - currentTime + 100 // Adding a 100ms buffer + console.log(`Rate limit exceeded. Waiting for ${waitTime}ms`) + await sleep(waitTime) // Wait until rate limit is reset + attempt++ // Increment attempt counter + retryDelay *= 1.2 // Exponential back-off + } else { + // If the request failed for reasons other than rate limit, throw an error + throw new Error( + `Failed to delete subscription: ${response.status} // ${await response.text()}`, + ) + } + } + + throw new Error('Exceeded maximum retry attempts for deleteSubscription') +} + +const requireRefreshAccounts: Set = new Set() +const debouncedUpdateAccounts = debounce(async () => { + const twitchIds = Array.from(requireRefreshAccounts) + if (twitchIds.length === 0) return + + console.log("Batch updating accounts' requires_refresh to true", { total: twitchIds.length }) + + // Function to split the twitchIds array into chunks of 50 + const chunkSize = 50 + const chunks = [] + for (let i = 0; i < twitchIds.length; i += chunkSize) { + chunks.push(twitchIds.slice(i, i + chunkSize)) + } + + // Process each chunk + for (const chunk of chunks) { + const { error } = await supabase + .from('accounts') + .update({ requires_refresh: true }) + .in('providerAccountId', chunk) + .eq('provider', 'twitch') + + if (error) { + console.error('Failed to update accounts', error) + // Consider how to handle partial failures - retry logic, logging, etc. + } + } + + // Clear the set after all updates + requireRefreshAccounts.clear() +}, 500) // Adjust debounce time as needed + +async function updateAccountRequiresRefresh(twitchId: string) { + if (requireRefreshAccounts.has(twitchId)) return + requireRefreshAccounts.add(twitchId) + + // Call the debounced function + return debouncedUpdateAccounts() +} + +async function markRevokedAuthorizationsAsRequiresRefresh(singleLoop = false) { + const statuses = ['user_removed', 'authorization_revoked'] + const updatePromises = new Map() + + for (const status of statuses) { + let cursor: string | undefined + do { + try { + const data = await fetchSubscriptionsWithStatus(status, cursor) + console.log( + `Found subscriptions with status ${status} that require deletion:`, + data.data.length, + ) + + for (const sub of data.data) { + const twitchId = sub.condition.broadcaster_user_id + await deleteSubscription(sub.id) + + if (!updatePromises.has(twitchId)) { + updatePromises.set(twitchId, updateAccountRequiresRefresh(twitchId)) + } + } + + await Promise.all(Array.from(updatePromises.values())) + cursor = data.pagination.cursor + + if (singleLoop) break + } catch (error) { + console.error(error) + break + } + } while (cursor) + + if (singleLoop) break + } +} + +async function getCountOfSubscriptionsWithStatus(status?: string): Promise { + try { + const data = await fetchSubscriptionsWithStatus(status) + console.log(data?.total_cost) + return data.data.length + } catch (error) { + console.error(error) + return 0 + } +} + +async function fetchSubscriptionsWithCostAboveZero(singleLoop = false): Promise { + const url = new URL('https://api.twitch.tv/helix/eventsub/subscriptions') + let allBroadcasterUserIds: string[] = [] + let cursor: string | null = null + const subsetsOfBroadcasterUserIds: string[][] = [] // Track subsets + const subIdMap: Record = {} // Map providerAccountId to sub ids + + console.log("Retrieving subscriptions with 'cost' above zero...") + + let pageCount = 0 + do { + console.log('Fetching page:', pageCount) + if (cursor) { + url.searchParams.set('after', cursor) + } + + const response = await fetch(url.toString(), { method: 'GET', headers }) + if (response.status !== 200) { + throw new Error( + `Failed to fetch subscriptions: ${response.status} // ${await response.text()}`, + ) + } + + const data = await response.json() + const subscriptionsWithCostAboveZero = data.data.filter((sub: any) => sub.cost > 0) + const broadcasterUserIds = subscriptionsWithCostAboveZero.map( + (sub: any) => sub.condition.broadcaster_user_id || sub.condition.user_id, + ) + allBroadcasterUserIds = allBroadcasterUserIds.concat(broadcasterUserIds) + + subsetsOfBroadcasterUserIds.push(broadcasterUserIds) + + // Save sub ids to subIdMap + subscriptionsWithCostAboveZero.forEach((sub: any) => { + const broadcasterUserId = sub.condition.broadcaster_user_id || sub.condition.user_id + if (!subIdMap[broadcasterUserId]) { + subIdMap[broadcasterUserId] = [] + } + subIdMap[broadcasterUserId].push(sub.id) + }) + + pageCount++ + if (singleLoop) break + cursor = data.pagination.cursor + } while (cursor) + + console.log("Found subscriptions with 'cost' above zero:", allBroadcasterUserIds.length) + + const updateRefreshPromises: Promise[] = [] + const deleteSubPromises: Promise[] = [] + + for (const providerAccountId of allBroadcasterUserIds) { + if (providerAccountId) { + updateRefreshPromises.push(updateAccountRequiresRefresh(providerAccountId)) + const subIdsToDelete = subIdMap[providerAccountId] || [] + subIdsToDelete.forEach((subId) => { + deleteSubPromises.push(deleteSubscription(subId)) + }) + } + } + + console.log('Deleting: ', deleteSubPromises.length) + console.log('Updating: ', updateRefreshPromises.length) + + await Promise.all(updateRefreshPromises) + await Promise.all(deleteSubPromises) + + return allBroadcasterUserIds +} + +// await fetchSubscriptionsWithCostAboveZero() +// await getCountOfSubscriptionsWithStatus() diff --git a/packages/dota/src/twitch/lib/getTwitchHeaders.ts b/packages/dota/src/twitch/lib/getTwitchHeaders.ts new file mode 100644 index 00000000..84b2b929 --- /dev/null +++ b/packages/dota/src/twitch/lib/getTwitchHeaders.ts @@ -0,0 +1,17 @@ +import { getAppToken } from '@twurple/auth' + +// Function to get Twitch headers + +export async function getTwitchHeaders(): Promise> { + const appToken = await getAppToken( + process.env.TWITCH_CLIENT_ID || '', + process.env.TWITCH_CLIENT_SECRET || '', + ) + + return { + 'Client-Id': process.env.TWITCH_CLIENT_ID || '', + Authorization: `Bearer ${appToken?.accessToken}`, + Accept: 'application/json', + 'Accept-Encoding': 'gzip', + } +} diff --git a/packages/twitch-chat/package.json b/packages/twitch-chat/package.json index 9c2ecbae..aebde34e 100644 --- a/packages/twitch-chat/package.json +++ b/packages/twitch-chat/package.json @@ -14,12 +14,6 @@ "build": "bun build ./src/index.ts --target=bun --outfile=./dist/index.js" }, "dependencies": { - "@supabase/supabase-js": "^2.43.4", - "@twurple/api": "6.0.9", - "@twurple/auth": "6.0.9", - "@twurple/chat": "6.0.9", - "@twurple/eventsub-base": "6.0.9", - "@twurple/eventsub-http": "6.0.9", "i18next": "^23.11.5", "i18next-fs-backend": "^2.3.1", "socket.io": "^4.7.5", diff --git a/packages/twitch-events/package.json b/packages/twitch-events/package.json index 63327ccd..2b6caad1 100644 --- a/packages/twitch-events/package.json +++ b/packages/twitch-events/package.json @@ -14,11 +14,6 @@ "build": "bun build ./src/index.ts --target=bun --outfile=./dist/index.js" }, "dependencies": { - "@supabase/supabase-js": "^2.43.4", - "@twurple/api": "6.0.9", - "@twurple/auth": "6.0.9", - "@twurple/chat": "6.0.9", - "@twurple/eventsub-base": "6.0.9", "@twurple/eventsub-http": "6.0.9", "express": "^4.19.2", "express-body-parser-error-handler": "^1.0.7", diff --git a/packages/twitch-events/src/SubscribeEvents.ts b/packages/twitch-events/src/SubscribeEvents.ts index 646e84db..20596097 100644 --- a/packages/twitch-events/src/SubscribeEvents.ts +++ b/packages/twitch-events/src/SubscribeEvents.ts @@ -3,6 +3,7 @@ import { transformBetData } from './twitch/events/transformers/transformBetData. import { transformPollData } from './twitch/events/transformers/transformPollData.js' import { offlineEvent } from './twitch/lib/offlineEvent.js' import { onlineEvent } from './twitch/lib/onlineEvent.js' +import { revokeEvent } from './twitch/lib/revokeEvent.js' import { updateUserEvent } from './twitch/lib/updateUserEvent.js' import { DOTABOD_EVENTS_ROOM, eventsIOConnected, socketIo } from './utils/socketUtils.js' @@ -17,6 +18,7 @@ export const SubscribeEvents = (accountIds: string[]) => { const promises: any[] = [] accountIds.forEach((userId) => { try { + promises.push(listener.onUserAuthorizationRevoke(userId, revokeEvent)) promises.push(listener.onStreamOnline(userId, onlineEvent)) promises.push(listener.onStreamOffline(userId, offlineEvent)) promises.push(listener.onUserUpdate(userId, updateUserEvent)) diff --git a/packages/twitch-events/src/twitch/lib/revokeEvent.ts b/packages/twitch-events/src/twitch/lib/revokeEvent.ts new file mode 100644 index 00000000..97b724b0 --- /dev/null +++ b/packages/twitch-events/src/twitch/lib/revokeEvent.ts @@ -0,0 +1,174 @@ +import { getAppToken } from '@twurple/auth' +import type { EventSubUserAuthorizationRevokeEvent } from '@twurple/eventsub-base' +import supabase from '../../db/supabase' + +async function disableChannel(broadcasterId: string) { + const { data: user } = await supabase + .from('accounts') + .select('userId') + .eq('providerAccountId', broadcasterId) + .single() + + if (!user) { + console.log('twitch-events Failed to find user', broadcasterId) + return + } + + const { data: settings } = await supabase + .from('settings') + .select('key, value') + .eq('userId', user?.userId) + + if (!settings) { + console.log('twitch-events Failed to find settings', broadcasterId) + return + } + + if (settings.find((s) => s.key === 'commandDisable' && s.value === true)) { + console.log('twitch-events User already disabled', broadcasterId) + return + } + + console.log('twitch-events Disabling user', broadcasterId) + await supabase.from('settings').upsert( + { + userId: user.userId, + key: 'commandDisable', + value: true, + }, + { + onConflict: 'userId, key', + }, + ) +} + +export async function getTwitchHeaders(): Promise> { + const appToken = await getAppToken( + process.env.TWITCH_CLIENT_ID || '', + process.env.TWITCH_CLIENT_SECRET || '', + ) + + return { + 'Client-Id': process.env.TWITCH_CLIENT_ID || '', + Authorization: `Bearer ${appToken?.accessToken}`, + Accept: 'application/json', + 'Accept-Encoding': 'gzip', + } +} + +const headers = await getTwitchHeaders() + +export async function fetchSubscriptions(providerId: string, cursor?: string): Promise { + const url = new URL('https://api.twitch.tv/helix/eventsub/subscriptions') + url.searchParams.append('user_id', providerId) + if (cursor) { + url.searchParams.append('after', cursor) + } + + const response = await fetch(url.toString(), { method: 'GET', headers }) + if (response.status !== 200) { + throw new Error( + `Failed to fetch subscriptions with providerId ${providerId}: ${ + response.status + } // ${await response.text()}`, + ) + } + return response.json() +} + +function sleep(ms: number) { + return new Promise((resolve) => setTimeout(resolve, ms)) +} + +// Assuming this counter is defined outside of the deleteSubscription function +let fetchRequestCounter = 0 +const maxRequestsBeforePause = 800 +const pauseDuration = 65000 // 65 seconds in milliseconds + +async function deleteSubscription(id: string) { + let retryDelay = 60 // Start with a 60 second delay + const maxRetries = 5 // Maximum number of retries + let attempt = 0 // Current attempt + + while (attempt < maxRetries) { + // Check if we need to pause before making the next request + if (fetchRequestCounter % maxRequestsBeforePause === 0 && fetchRequestCounter !== 0) { + console.log(`Pausing for ${pauseDuration / 1000} seconds to avoid rate limit...`) + await sleep(pauseDuration) // Wait for 65 seconds + } + + const response = await fetch(`https://api.twitch.tv/helix/eventsub/subscriptions?id=${id}`, { + method: 'DELETE', + headers, + }) + + fetchRequestCounter++ // Increment the request counter after each fetch + + const rateLimitRemaining = Number.parseInt( + response.headers.get('ratelimit-remaining') || '0', + 10, + ) + const rateLimitReset = + Number.parseInt(response.headers.get('ratelimit-reset') || '0', 10) * 1000 // Convert to milliseconds + const currentTime = Date.now() + + if (response.ok) { + console.log('Delete rate limit:', rateLimitRemaining) + return response // Exit function if request was successful + } + + if (rateLimitRemaining === 0 && currentTime < rateLimitReset) { + // Calculate wait time until rate limit reset, plus a small buffer + const waitTime = rateLimitReset - currentTime + 100 // Adding a 100ms buffer + console.log(`Rate limit exceeded. Waiting for ${waitTime}ms`) + await sleep(waitTime) // Wait until rate limit is reset + attempt++ // Increment attempt counter + retryDelay *= 1.2 // Exponential back-off + } else { + // If the request failed for reasons other than rate limit, throw an error + throw new Error( + `Failed to delete subscription: ${response.status} // ${await response.text()}`, + ) + } + } + + throw new Error('Exceeded maximum retry attempts for deleteSubscription') +} + +async function deleteAllSubscriptionsForProvider(providerId: string): Promise { + let cursor: string | undefined + do { + // Fetch subscriptions for the given provider ID + const data = await fetchSubscriptions(providerId, cursor) + const subscriptionsForProvider = data.data.filter( + (sub: any) => sub.condition.broadcaster_user_id === providerId, + ) + + console.log('Found subscriptions', subscriptionsForProvider.length) + + // Delete each subscription found for the provider + for (const sub of subscriptionsForProvider) { + await deleteSubscription(sub.id) + } + + // Update cursor for next page of subscriptions, if any + cursor = data.pagination?.cursor + } while (cursor) // Continue until there are no more pages + + console.log(`All subscriptions deleted for provider ID: ${providerId}`) +} + +export async function revokeEvent(data: EventSubUserAuthorizationRevokeEvent) { + console.log(`${data.userId} just revoked`) + + await deleteAllSubscriptionsForProvider(data.userId) + + await supabase + .from('accounts') + .update({ + requires_refresh: true, + }) + .eq('providerAccountId', data.userId) + + await disableChannel(data.userId) +}