Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Geczy committed Jul 1, 2024
1 parent 8445038 commit c5638f3
Show file tree
Hide file tree
Showing 13 changed files with 596 additions and 316 deletions.
Binary file modified bun.lockb
Binary file not shown.
3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
"@twurple/api": "6.0.9",
"@twurple/auth": "6.0.9",
"@twurple/chat": "6.0.9",
"@twurple/eventsub-base": "6.0.9",
"@twurple/eventsub-http": "6.0.9",
"@twurple/eventsub-base": "^7.1.0",
"i18next": "^23.11.5",
"i18next-fs-backend": "^2.3.1",
"socket.io": "^4.7.5",
Expand Down
8 changes: 2 additions & 6 deletions packages/dota/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,8 @@
},
"dependencies": {
"@node-steam/id": "^1.2.0",
"@supabase/supabase-js": "^2.43.4",
"@twurple/api": "6.0.9",
"@twurple/auth": "6.0.9",
"@twurple/chat": "6.0.9",
"@twurple/ebs-helper": "^7.1.0",
"@twurple/eventsub-base": "6.0.9",
"@twurple/eventsub-http": "6.0.9",
"@twurple/eventsub-ws": "^7.1.0",
"@types/long": "^5.0.0",
"@types/lru-cache": "^7.10.10",
"axios": "1.2.0-alpha.1",
Expand All @@ -37,6 +32,7 @@
"express-body-parser-error-handler": "^1.0.7",
"i18next": "^23.11.5",
"i18next-fs-backend": "^2.3.1",
"lodash.debounce": "^4.0.8",
"lodash.isequal": "^4.5.0",
"lru-cache": "^10.2.2",
"mongodb": "^6.7.0",
Expand Down
7 changes: 5 additions & 2 deletions packages/dota/src/db/watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import findUser from '../dota/lib/connectedStreamers.js'
import { didTellUser, gsiHandlers } from '../dota/lib/consts.js'
import { getRankDetail } from '../dota/lib/ranks.js'
import { DBSettings, getValueOrDefault } from '../settings.js'
import { twitchChat } from '../twitch/chatClient'
import { chatClient } from '../twitch/chatClient.js'
import { twitchChat } from '../twitch/index.js'
import { toggleDotabod } from '../twitch/toggleDotabod.js'
import { logger } from '../utils/logger.js'
import getDBUser from './getDBUser.js'
Expand Down Expand Up @@ -41,8 +41,11 @@ class SetupSupabase {
}

toggleHandler = async (userId: string, enable: boolean) => {
if (this.IS_DEV && !this.DEV_CHANNELIDS.includes(userId)) return
if (!this.IS_DEV && this.DEV_CHANNELIDS.includes(userId)) return

const client = await getDBUser({ token: userId })
if (!client || !this.shouldHandleDevChannel(client.name)) return
if (!client) return

toggleDotabod(userId, enable, client.name, client.locale)
}
Expand Down
1 change: 1 addition & 0 deletions packages/dota/src/dota/lib/consts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,5 @@ export const pendingCheckAuth = new Map<string, boolean>()
export const lookingupToken = new Map<string, boolean>()

// Const holding if we told the user to get new scopes
// We'll lose this every reboot (6 hours) so no need to cleanup
export const didTellUser = new Set<string>()
142 changes: 24 additions & 118 deletions packages/dota/src/twitch/chatClient.ts
Original file line number Diff line number Diff line change
@@ -1,133 +1,39 @@
import { io } from 'socket.io-client'

import { getAppToken } from '@twurple/auth'
import { twitchChat } from '.'
import { findUserByName } from '../dota/lib/connectedStreamers'
import { isDev } from '../dota/lib/consts.js'
import { eventsubSocket } from './eventSubSocket'
import { getTwitchHeaders } from './lib/getTwitchHeaders'

// Our docker chat forwarder instance
export const twitchChat = io(`ws://${process.env.HOST_TWITCH_CHAT}:5005`)
// Constants
const prefix = isDev ? '[DEV] ' : ''
const headers = await getTwitchHeaders()

const appToken = await getAppToken(process.env.TWITCH_CLIENT_ID!, process.env.TWITCH_CLIENT_SECRET!)

const twitchHeaders = {
'Client-Id': process.env.TWITCH_CLIENT_ID!,
Authorization: `Bearer ${appToken?.accessToken}`,
Accept: 'application/json',
'Accept-Encoding': 'gzip',
}

const conduitsReq = await fetch('https://api.twitch.tv/helix/eventsub/conduits', {
method: 'GET',
headers: twitchHeaders,
})
const { data } = await conduitsReq.json()
const conduitId = data[0]?.id || (await createConduit())
console.log({ conduitId })

async function createConduit() {
console.log('Creating conduit')
const createReq = await fetch('https://api.twitch.tv/helix/eventsub/conduits', {
method: 'POST',
headers: {
...twitchHeaders,
'Content-Type': 'application/json',
},
body: JSON.stringify({ shard_count: 1 }),
})
const { data } = await createReq.json()
const { id } = data[0]
return id
}

function getShards() {
return fetch(`https://api.twitch.tv/helix/eventsub/conduits/shards?conduit_id=${conduitId}`, {
method: 'GET',
headers: {
...twitchHeaders,
},
})
}

// The Conduit exists
// lets spawn a WebSocket and assign this socket to a shard
// if we are a ID of auto then the shard ID is forced to 0 if we created...
const mySocket = new eventsubSocket({})
mySocket.on('connected', async (session_id) => {
console.log(`Socket has connected ${session_id} for ${conduitId}`)

const body = {
conduit_id: conduitId,
shards: [
{
id: 0,
transport: {
method: 'websocket',
session_id,
},
},
],
}
// connect the socket to the conduit on the stated shard ID
const conduitUpdate = await fetch('https://api.twitch.tv/helix/eventsub/conduits/shards', {
method: 'PATCH',
headers: {
...twitchHeaders,
'Content-Type': 'application/json',
},
body: JSON.stringify(body),
})
if (conduitUpdate.status != 202) {
console.error(await conduitUpdate.text(), 'Failed to assign socket to shard')
// console.error(
// `Failed to assign socket to shard ${conduitUpdate.status}//${await conduitUpdate.text()}`,
// );
return
} else {
console.log('Socket assigned to shard')
}
// check for errors
const { data, errors } = await conduitUpdate.json()
if (errors && errors.length > 0) {
console.error(`Failed to udpate the shard`)
console.error(errors)
} else {
console.log('Shard Updated')
}
})

mySocket.on('error', (error) => {
console.error('Socket Error', error)
})

mySocket.on('notification', (message) => {
console.log('Socket Message', message)
})

// Chat client object
export const chatClient = {
join: (channel: string) => {
twitchChat.emit('join', channel)
},
part: (channel: string) => {
twitchChat.emit('part', channel)
},
say: (channel: string, text: string) => {
if (isDev) console.log({ channel, text })
const user = findUserByName(channel.toLowerCase().replace('#', ''))
const hasNewestScopes = user?.Account?.scope?.includes('channel:bot')
if (hasNewestScopes) {
const newPrefix = prefix ? `${prefix}[NEW-API]` : prefix
void fetch('https://api.twitch.tv/helix/chat/messages', {
method: 'POST',
headers: twitchHeaders,
body: JSON.stringify({
broadcaster_id: user?.Account?.providerAccountId,
sender_id: process.env.TWITCH_BOT_PROVIDERID,
message: `${newPrefix}${text}`,
}),
})
return
say: async (channel: string, text: string) => {
// New API is in beta, so only dev enabled for now
if (isDev) {
const user = findUserByName(channel.toLowerCase().replace('#', ''))
const hasNewestScopes = user?.Account?.scope?.includes('channel:bot')

if (hasNewestScopes) {
const newPrefix = prefix ? `${prefix}[NEW-API] ` : prefix
void fetch('https://api.twitch.tv/helix/chat/messages', {
method: 'POST',
headers: { ...headers, 'Content-Type': 'application/json' },
body: JSON.stringify({
broadcaster_id: user?.Account?.providerAccountId,
sender_id: process.env.TWITCH_BOT_PROVIDERID,
message: `${newPrefix}${text}`,
}),
})
return
}
}

twitchChat.emit('say', channel, `${prefix}${text}`)
Expand Down
156 changes: 156 additions & 0 deletions packages/dota/src/twitch/chatClientv2.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
import { getAppToken } from '@twurple/auth'
import type { EventSubChannelChatMessageEventData } from '@twurple/eventsub-base/lib/events/EventSubChannelChatMessageEvent.external'
import type { EventSubWsPacket } from '@twurple/eventsub-ws/lib/EventSubWsPacket.external'
import { logger } from '../utils/logger'
import { EventsubSocket } from './eventSubSocket'

// Constants
const headers = await getTwitchHeaders()

// Function to get Twitch headers
export async function getTwitchHeaders(): Promise<Record<string, string>> {
const appToken = await getAppToken(
process.env.TWITCH_CLIENT_ID || '',
process.env.TWITCH_CLIENT_SECRET || '',
)

return {
'Client-Id': process.env.TWITCH_CLIENT_ID || '',
Authorization: `Bearer ${appToken?.accessToken}`,
Accept: 'application/json',
'Accept-Encoding': 'gzip',
}
}

// Function to fetch conduit ID
async function fetchConduitId(): Promise<string> {
const conduitsReq = await fetch('https://api.twitch.tv/helix/eventsub/conduits', {
method: 'GET',
headers,
})

const { data } = await conduitsReq.json()
return data[0]?.id || createConduit()
}

// Function to create a new conduit
async function createConduit(): Promise<string> {
logger.info('Creating conduit')
const createReq = await fetch('https://api.twitch.tv/helix/eventsub/conduits', {
method: 'POST',
headers: {
...headers,
'Content-Type': 'application/json',
},
body: JSON.stringify({ shard_count: 1 }),
})

const { data } = await createReq.json()
return data[0].id
}

// Function to update conduit shard
async function updateConduitShard(session_id: string, conduitId: string) {
const body = {
conduit_id: conduitId,
shards: [
{
id: 0,
transport: {
method: 'websocket',
session_id,
},
},
],
}

const conduitUpdate = await fetch('https://api.twitch.tv/helix/eventsub/conduits/shards', {
method: 'PATCH',
headers: {
...headers,
'Content-Type': 'application/json',
},
body: JSON.stringify(body),
})

if (conduitUpdate.status !== 202) {
logger.error('Failed to assign socket to shard', { reason: await conduitUpdate.text() })
return
}

logger.info('Socket assigned to shard')
const { errors } = await conduitUpdate.json()
if (errors && errors.length > 0) {
logger.error('Failed to update the shard', { errors })
} else {
logger.info('Shard Updated')
}
}

// TODO: Move this to twitch-events package
const subscribeToUserUpdate = async (conduit_id: string, broadcaster_user_id: string) => {
const body = {
type: 'channel.chat.message',
version: '1',
condition: {
user_id: '843245458', // bot dotabod
broadcaster_user_id: '32474777', // TL
},
transport: {
method: 'conduit',
conduit_id,
},
}
const subscribeReq = await fetch('https://api.twitch.tv/helix/eventsub/subscriptions', {
method: 'POST',
headers: {
...headers,
'Content-Type': 'application/json',
},
body: JSON.stringify(body),
})

if (subscribeReq.status !== 202) {
console.error(
`Failed to subscribe to channel.chat.message ${
subscribeReq.status
} ${await subscribeReq.text()}`,
)
return
}
console.log('Subscribed to channel.chat.message')
}

// TODO: Move this to twitch-chat package
// Initialize WebSocket and handle events
async function initializeSocket() {
const conduitId = await fetchConduitId()
logger.info('Conduit ID', { conduitId })

const mySocket = new EventsubSocket()
mySocket.on('connected', async (session_id: string) => {
logger.info(`Socket has connected ${session_id} for ${conduitId}`)
await updateConduitShard(session_id, conduitId)
})

mySocket.on('error', (error: Error) => {
logger.error('Socket Error', { error })
})

mySocket.on('notification', (message: EventSubWsPacket) => {
if (
'subscription' in message.payload &&
'event' in message.payload &&
message.payload.subscription.type === 'channel.chat.message'
) {
const {
chatter_user_login,
message: { text },
} = message.payload.event as unknown as EventSubChannelChatMessageEventData
logger.info('Socket Message', { chatter_user_login, text })
}
logger.info('Socket Message', { message: message.payload })
})
}

initializeSocket()
Loading

0 comments on commit c5638f3

Please sign in to comment.