Skip to content

Commit

Permalink
Merge branch 'new-twitch-eventsubs' of https://github.com/dotabod/bac…
Browse files Browse the repository at this point in the history
…kend into new-twitch-eventsubs
  • Loading branch information
Geczy committed Jul 7, 2024
2 parents fa7f94d + 62715d4 commit 6b54119
Show file tree
Hide file tree
Showing 15 changed files with 791 additions and 348 deletions.
Binary file modified bun.lockb
Binary file not shown.
3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
"@twurple/api": "6.0.9",
"@twurple/auth": "6.0.9",
"@twurple/chat": "6.0.9",
"@twurple/eventsub-base": "6.0.9",
"@twurple/eventsub-http": "6.0.9",
"@twurple/eventsub-base": "^7.1.0",
"i18next": "^23.11.5",
"i18next-fs-backend": "^2.3.1",
"socket.io": "^4.7.5",
Expand Down
8 changes: 2 additions & 6 deletions packages/dota/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,8 @@
},
"dependencies": {
"@node-steam/id": "^1.2.0",
"@supabase/supabase-js": "^2.43.4",
"@twurple/api": "6.0.9",
"@twurple/auth": "6.0.9",
"@twurple/chat": "6.0.9",
"@twurple/ebs-helper": "^7.1.0",
"@twurple/eventsub-base": "6.0.9",
"@twurple/eventsub-http": "6.0.9",
"@twurple/eventsub-ws": "^7.1.0",
"@types/long": "^5.0.0",
"@types/lru-cache": "^7.10.10",
"axios": "1.2.0-alpha.1",
Expand All @@ -37,6 +32,7 @@
"express-body-parser-error-handler": "^1.0.7",
"i18next": "^23.11.5",
"i18next-fs-backend": "^2.3.1",
"lodash.debounce": "^4.0.8",
"lodash.isequal": "^4.5.0",
"lru-cache": "^10.2.2",
"mongodb": "^6.7.0",
Expand Down
58 changes: 24 additions & 34 deletions packages/dota/src/db/watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import findUser from '../dota/lib/connectedStreamers.js'
import { didTellUser, gsiHandlers } from '../dota/lib/consts.js'
import { getRankDetail } from '../dota/lib/ranks.js'
import { DBSettings, getValueOrDefault } from '../settings.js'
import { twitchChat } from '../twitch/chatClient'
import { chatClient } from '../twitch/chatClient.js'
import { twitchChat } from '../twitch/index.js'
import { toggleDotabod } from '../twitch/toggleDotabod.js'
import { logger } from '../utils/logger.js'
import getDBUser from './getDBUser.js'
Expand Down Expand Up @@ -41,8 +41,11 @@ class SetupSupabase {
}

toggleHandler = async (userId: string, enable: boolean) => {
if (this.IS_DEV && !this.DEV_CHANNELIDS.includes(userId)) return
if (!this.IS_DEV && this.DEV_CHANNELIDS.includes(userId)) return

const client = await getDBUser({ token: userId })
if (!client || !this.shouldHandleDevChannel(client.name)) return
if (!client) return

toggleDotabod(userId, enable, client.name, client.locale)
}
Expand Down Expand Up @@ -79,6 +82,13 @@ class SetupSupabase {
const newObj = payload.new
const oldObj = payload.old

if (newObj.scope !== oldObj.scope) {
const client = findUser(newObj.userId)
if (client?.Account) {
client.Account.scope = newObj.scope
}
}

// The frontend will set it to false when they relogin
// Which allows us to update the authProvider object
if (newObj.requires_refresh === false && oldObj.requires_refresh === true) {
Expand Down Expand Up @@ -109,34 +119,20 @@ class SetupSupabase {
client.stream_online = newObj.stream_online

// They go offline
if (!client.stream_online && oldObj.stream_online) {
if (!newObj.stream_online && oldObj.stream_online) {
return
}

// They come online
if (client.stream_online && !oldObj.stream_online) {
const connectedUser = gsiHandlers.get(client.token)
if (!connectedUser) return

const betsEnabled = getValueOrDefault(DBSettings.bets, client.settings)
const ONE_DAY_IN_MS = 86_400_000 // 1 day in ms
const dayAgo = new Date(Date.now() - ONE_DAY_IN_MS).toISOString()

const hasNewestScopes = client.Account?.scope?.includes('channel:bot')
if (!hasNewestScopes && !didTellUser.has(client.name.toLowerCase())) {
twitchChat.emit(
'say',
client.name.toLowerCase(),
t('refreshToken', {
lng: client.locale,
channel: `@${client.name.toLowerCase().replace('#', '')}`,
}),
)
didTellUser.add(client.name.toLowerCase())
return
}
const requiresRefresh = client.Account?.requires_refresh
if ((!hasNewestScopes || requiresRefresh) && !didTellUser.has(client.name)) {
didTellUser.add(client.name)

if (connectedUser.client.Account?.requires_refresh && betsEnabled) {
const { data, error } = await supabase
.from('bets')
.select('created_at')
Expand All @@ -146,15 +142,15 @@ class SetupSupabase {

if (data?.length && !error) {
logger.info('[WATCHER USER] Sending refresh token messsage', {
name: connectedUser.client.name,
twitchId: connectedUser.client.Account.providerAccountId,
token: connectedUser.client.token,
name: client.name,
twitchId: client.Account?.providerAccountId,
token: client.token,
})
chatClient.say(
connectedUser.client.name,
client.name,
t('refreshToken', {
lng: connectedUser.client.locale,
channel: connectedUser.client.name,
lng: client.locale,
channel: client.name,
}),
)
}
Expand All @@ -167,13 +163,6 @@ class SetupSupabase {
client.stream_start_date = newObj.stream_start_date
}

async function handler() {
if (!client) return

const deets = await getRankDetail(newObj.mmr, client.steam32Id)
server.io.to(client.token).emit('update-medal', deets)
}

// dont overwrite with 0 because we use this variable to track currently logged in mmr
if (newObj.mmr !== 0 && client.mmr !== newObj.mmr && oldObj.mmr !== newObj.mmr) {
client.mmr = newObj.mmr
Expand All @@ -184,7 +173,8 @@ class SetupSupabase {
mmr: newObj.mmr,
})
try {
void handler()
const deets = await getRankDetail(newObj.mmr, client.steam32Id)
server.io.to(client.token).emit('update-medal', deets)
} catch (e) {
logger.error('Error in watcher postgres update', { e })
}
Expand Down
1 change: 1 addition & 0 deletions packages/dota/src/dota/lib/consts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,5 @@ export const pendingCheckAuth = new Map<string, boolean>()
export const lookingupToken = new Map<string, boolean>()

// Const holding if we told the user to get new scopes
// We'll lose this every reboot (6 hours) so no need to cleanup
export const didTellUser = new Set<string>()
142 changes: 24 additions & 118 deletions packages/dota/src/twitch/chatClient.ts
Original file line number Diff line number Diff line change
@@ -1,133 +1,39 @@
import { io } from 'socket.io-client'

import { getAppToken } from '@twurple/auth'
import { twitchChat } from '.'
import { findUserByName } from '../dota/lib/connectedStreamers'
import { isDev } from '../dota/lib/consts.js'
import { eventsubSocket } from './eventSubSocket'
import { getTwitchHeaders } from './lib/getTwitchHeaders'

// Our docker chat forwarder instance
export const twitchChat = io(`ws://${process.env.HOST_TWITCH_CHAT}:5005`)
// Constants
const prefix = isDev ? '[DEV] ' : ''
const headers = await getTwitchHeaders()

const appToken = await getAppToken(process.env.TWITCH_CLIENT_ID!, process.env.TWITCH_CLIENT_SECRET!)

const twitchHeaders = {
'Client-Id': process.env.TWITCH_CLIENT_ID!,
Authorization: `Bearer ${appToken?.accessToken}`,
Accept: 'application/json',
'Accept-Encoding': 'gzip',
}

const conduitsReq = await fetch('https://api.twitch.tv/helix/eventsub/conduits', {
method: 'GET',
headers: twitchHeaders,
})
const { data } = await conduitsReq.json()
const conduitId = data[0]?.id || (await createConduit())
console.log({ conduitId })

async function createConduit() {
console.log('Creating conduit')
const createReq = await fetch('https://api.twitch.tv/helix/eventsub/conduits', {
method: 'POST',
headers: {
...twitchHeaders,
'Content-Type': 'application/json',
},
body: JSON.stringify({ shard_count: 1 }),
})
const { data } = await createReq.json()
const { id } = data[0]
return id
}

function getShards() {
return fetch(`https://api.twitch.tv/helix/eventsub/conduits/shards?conduit_id=${conduitId}`, {
method: 'GET',
headers: {
...twitchHeaders,
},
})
}

// The Conduit exists
// lets spawn a WebSocket and assign this socket to a shard
// if we are a ID of auto then the shard ID is forced to 0 if we created...
const mySocket = new eventsubSocket({})
mySocket.on('connected', async (session_id) => {
console.log(`Socket has connected ${session_id} for ${conduitId}`)

const body = {
conduit_id: conduitId,
shards: [
{
id: 0,
transport: {
method: 'websocket',
session_id,
},
},
],
}
// connect the socket to the conduit on the stated shard ID
const conduitUpdate = await fetch('https://api.twitch.tv/helix/eventsub/conduits/shards', {
method: 'PATCH',
headers: {
...twitchHeaders,
'Content-Type': 'application/json',
},
body: JSON.stringify(body),
})
if (conduitUpdate.status != 202) {
console.error(await conduitUpdate.text(), 'Failed to assign socket to shard')
// console.error(
// `Failed to assign socket to shard ${conduitUpdate.status}//${await conduitUpdate.text()}`,
// );
return
} else {
console.log('Socket assigned to shard')
}
// check for errors
const { data, errors } = await conduitUpdate.json()
if (errors && errors.length > 0) {
console.error(`Failed to udpate the shard`)
console.error(errors)
} else {
console.log('Shard Updated')
}
})

mySocket.on('error', (error) => {
console.error('Socket Error', error)
})

mySocket.on('notification', (message) => {
console.log('Socket Message', message)
})

// Chat client object
export const chatClient = {
join: (channel: string) => {
twitchChat.emit('join', channel)
},
part: (channel: string) => {
twitchChat.emit('part', channel)
},
say: (channel: string, text: string) => {
if (isDev) console.log({ channel, text })
const user = findUserByName(channel.toLowerCase().replace('#', ''))
const hasNewestScopes = user?.Account?.scope?.includes('channel:bot')
if (hasNewestScopes) {
const newPrefix = prefix ? `${prefix}[NEW-API]` : prefix
void fetch('https://api.twitch.tv/helix/chat/messages', {
method: 'POST',
headers: twitchHeaders,
body: JSON.stringify({
broadcaster_id: user?.Account?.providerAccountId,
sender_id: process.env.TWITCH_BOT_PROVIDERID,
message: `${newPrefix}${text}`,
}),
})
return
say: async (channel: string, text: string) => {
// New API is in beta, so only dev enabled for now
if (isDev) {
const user = findUserByName(channel.toLowerCase().replace('#', ''))
const hasNewestScopes = user?.Account?.scope?.includes('channel:bot')

if (hasNewestScopes) {
const newPrefix = prefix ? `${prefix}[NEW-API] ` : prefix
void fetch('https://api.twitch.tv/helix/chat/messages', {
method: 'POST',
headers: { ...headers, 'Content-Type': 'application/json' },
body: JSON.stringify({
broadcaster_id: user?.Account?.providerAccountId,
sender_id: process.env.TWITCH_BOT_PROVIDERID,
message: `${newPrefix}${text}`,
}),
})
return
}
}

twitchChat.emit('say', channel, `${prefix}${text}`)
Expand Down
Loading

0 comments on commit 6b54119

Please sign in to comment.