Skip to content

Commit

Permalink
Merge branch 'main' into crowd-linux
Browse files Browse the repository at this point in the history
  • Loading branch information
epipav committed Oct 20, 2023
2 parents 6d67b34 + 1b21c9f commit b8dc26c
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ export class OrganizationRepository extends RepositoryBase<OrganizationRepositor
FROM organizations o
LEFT JOIN member_data md ON o.id = md."organizationId"
LEFT JOIN identities i ON o.id = i."organizationId"
LEFT JOIN to_merge_data tmd on o.id = tmd."organizationId"
LEFT JOIN no_merge_data nmd on o.id = nmd."organizationId"
WHERE o.id IN ($(ids:csv))
AND o."deletedAt" IS NULL
AND (md."organizationId" IS NOT NULL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,27 @@ import axios from 'axios'
import { DiscordApiMember, DiscordGetMembersInput, DiscordGetMembersOutput } from '../types'
import { IProcessStreamContext } from '../../../types'
import { getRateLimiter } from './handleRateLimit'
import { handleDiscordError } from './errorHandler'

async function getMembers(
input: DiscordGetMembersInput,
ctx: IProcessStreamContext,
): Promise<DiscordGetMembersOutput> {
const rateLimiter = getRateLimiter(ctx)
try {
let url = `https://discord.com/api/v10/guilds/${input.guildId}/members?limit=${input.perPage}`
if (input.page !== undefined && input.page !== '') {
url += `&after=${input.page}`
}
const config = {
method: 'get',
url,
headers: {
Authorization: input.token,
},
}

let url = `https://discord.com/api/v10/guilds/${input.guildId}/members?limit=${input.perPage}`
if (input.page !== undefined && input.page !== '') {
url += `&after=${input.page}`
}

const config = {
method: 'get',
url,
headers: {
Authorization: input.token,
},
}
try {
await rateLimiter.checkRateLimit('getMembers')
await rateLimiter.incrementRateLimit()
const response = await axios(config)
Expand All @@ -35,19 +37,10 @@ async function getMembers(
timeUntilReset,
}
} catch (err) {
if (err.response.status === 429) {
ctx.log.warn(
`Rate limit exceeded in Get Members. Wait value in header is ${err.response.headers['x-ratelimit-reset-after']}`,
)
return {
records: [],
nextPage: input.page,
limit: 0,
timeUntilReset: err.response.headers['x-ratelimit-reset-after'],
}
const newErr = handleDiscordError(err, config, { input }, ctx)
if (newErr) {
throw newErr
}
ctx.log.error({ err, input }, 'Error while getting members from Discord')
throw err
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,29 @@ import axios from 'axios'
import { DiscordApiMessage, DiscordParsedReponse, DiscordGetMessagesInput } from '../types'
import { IProcessStreamContext } from '../../../types'
import { getRateLimiter } from './handleRateLimit'
import { handleDiscordError } from './errorHandler'

async function getMessages(
input: DiscordGetMessagesInput,
ctx: IProcessStreamContext,
showError = true,
showErrors = true,
): Promise<DiscordParsedReponse> {
const rateLimiter = getRateLimiter(ctx)
try {
let url = `https://discord.com/api/v10/channels/${input.channelId}/messages?limit=${input.perPage}`
if (input.page !== undefined && input.page !== '') {
url += `&before=${input.page}`
}
const config = {
method: 'get',
url,
headers: {
Authorization: input.token,
},
}

let url = `https://discord.com/api/v10/channels/${input.channelId}/messages?limit=${input.perPage}`
if (input.page !== undefined && input.page !== '') {
url += `&before=${input.page}`
}

const config = {
method: 'get',
url,
headers: {
Authorization: input.token,
},
}

try {
await rateLimiter.checkRateLimit('getMessages')
await rateLimiter.incrementRateLimit()
const response = await axios(config)
Expand All @@ -37,27 +40,18 @@ async function getMessages(
timeUntilReset,
}
} catch (err) {
if (err.response.status === 429) {
ctx.log.warn(
`Rate limit exceeded in Get Messages. Wait value in header is ${err.response.headers['x-ratelimit-reset-after']}`,
)
return {
records: [],
nextPage: input.page,
limit: 0,
timeUntilReset: err.response.headers['x-ratelimit-reset-after'],
}
}
if (!showError) {
if (!showErrors) {
return {
records: [],
nextPage: '',
limit: 0,
timeUntilReset: 0,
}
}
ctx.log.error({ err, input }, 'Error while getting messages from Discord')
throw err
const newErr = handleDiscordError(err, config, { input }, ctx)
if (newErr) {
throw newErr
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { IProcessStreamContext } from '../../../types'

const DISCORD_RATE_LIMIT = 50
const DISCORD_RATE_LIMIT_TIME = 1
const REDIS_KEY = 'discord-request-count'
const DISCORD_RATE_LIMIT = 100000
const DISCORD_RATE_LIMIT_TIME = 100 // 100 seconds
const REDIS_KEY = 'discord-ratelimits-requests-count'

export const getRateLimiter = (ctx: IProcessStreamContext) => {
return ctx.getRateLimiter(DISCORD_RATE_LIMIT, DISCORD_RATE_LIMIT_TIME, REDIS_KEY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import processWebhookStream from './processWebhookStream'
const descriptor: IIntegrationDescriptor = {
type: PlatformType.DISCORD,
memberAttributes: DISCORD_MEMBER_ATTRIBUTES,
checkEvery: 4 * 60, // 4 hours
checkEvery: 12 * 60, // 12 hours
generateStreams,
processStream,
processData,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ const processMembersStream: ProcessStreamHandler = async (ctx) => {
guildId: data.guildId,
token: getDiscordToken(ctx),
page: data.page,
perPage: 100,
perPage: 1000,
},
ctx,
)
Expand Down
8 changes: 6 additions & 2 deletions services/libs/redis/src/rateLimiter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@ export class RateLimiter implements IRateLimiter {
const requestCount = value === null ? 0 : parseInt(value)
const canMakeRequest = requestCount < this.maxRequests

if (requestCount === 0) {
await this.cache.set(this.counterKey, '0', this.timeWindowSeconds)
}

if (!canMakeRequest) {
const sleepTime = this.timeWindowSeconds + Math.floor(Math.random() * this.maxRequests)
const sleepTime = this.timeWindowSeconds + Math.floor(Math.random() * this.timeWindowSeconds)
throw new RateLimitError(sleepTime, endpoint)
}
}

public async incrementRateLimit() {
await this.cache.increment(this.counterKey, 1, this.timeWindowSeconds)
await this.cache.increment(this.counterKey, 1)
}
}

Expand Down

0 comments on commit b8dc26c

Please sign in to comment.