Skip to content

Commit

Permalink
Improvements to reduce deadlocks (#589)
Browse files Browse the repository at this point in the history
  • Loading branch information
pushchris authored Jan 5, 2025
1 parent a5a82c8 commit e8d2cd9
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
exports.up = async function(knex) {
await knex.schema.alterTable('journey_user_step', function(table) {
table.index(['journey_id', 'type', 'delay_until'])
table.dropIndex(['type', 'delay_until'])
})
}

exports.down = async function(knex) {
await knex.schema.alterTable('journey_user_step', function(table) {
table.index(['type', 'delay_until'])
table.dropIndex(['journey_id', 'type', 'delay_until'])
})
}
7 changes: 0 additions & 7 deletions apps/platform/src/lists/ListEvaluateUserJob.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import App from '../app'
import { cacheIncr } from '../config/redis'
import { Job } from '../queue'
import { JobPriority } from '../queue/Job'
import { getUser } from '../users/UserRepository'
import { DynamicList } from './List'
import { CacheKeys, cleanupList, evaluateUserList, getList } from './ListService'
Expand All @@ -17,12 +16,6 @@ interface ListEvaluateUserParams {
export default class ListEvaluateUserJob extends Job {
static $name = 'list_evaluate_user_job'

options = {
delay: 0,
attempts: 3,
priority: JobPriority.low,
}

static from(params: ListEvaluateUserParams): ListEvaluateUserJob {
return new this(params)
}
Expand Down
15 changes: 14 additions & 1 deletion apps/platform/src/utilities/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,22 @@ export const chunk = async <T>(
modifier: (result: any) => T = (result) => result,
) => {
const chunker = new Chunker(callback, size)
const handler = async (result: any, retries = 3) => {
try {
await chunker.add(modifier(result))
} catch (error: any) {

// In the case of deadlocks, retry the operation
if (['ER_LOCK_WAIT_TIMEOUT', 'ER_LOCK_DEADLOCK'].includes(error.code) && retries > 0) {
setTimeout(() => handler(result, retries - 1), 250)
} else {
throw error
}
}
}
await query.stream(async function(stream) {
for await (const result of stream) {
await chunker.add(modifier(result))
await handler(result)
}
})
await chunker.flush()
Expand Down

0 comments on commit e8d2cd9

Please sign in to comment.