Skip to content

Commit

Permalink
Lock transaction fix (#349)
Browse files Browse the repository at this point in the history
* Fix bug where singleTransaction and erorrs cause an unlocked lock

* Use a runCommand wrapper instead of setup and teardown (for locking)

We might be able to avoid all the weird promise-callback assignment too.

Co-authored-by: Misha Kaletsky <mmkal@users.noreply.github.com>
  • Loading branch information
mmkal and mmkal authored Aug 12, 2021
1 parent 52be1ed commit 715df09
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 22 deletions.
50 changes: 36 additions & 14 deletions packages/migrator/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export class SlonikMigrator extends umzug.Umzug<SlonikMigratorContext> {
parent: slonikMigratorOptions.slonik,
sql,
commit: null as never, // commit function is added later by storage setup.
connection: null as never, // commit function is added later by storage setup.
connection: null as never, // connection function is added later by storage setup.
}),
migrations: () => ({
glob: [this.migrationsGlob(), {cwd: path.resolve(slonikMigratorOptions.migrationsPath)}],
Expand Down Expand Up @@ -137,6 +137,38 @@ export class SlonikMigrator extends umzug.Umzug<SlonikMigratorContext> {
`)
}

async runCommand<T>(command: string, cb: ({context}: {context: SlonikMigratorContext}) => Promise<T>) {
if (command === 'up' || command === 'down') {
return super.runCommand(command, async ({context}) => {
return context.parent.connect(async conn => {
const logger = this.slonikMigratorOptions.logger
const timeout = setTimeout(
() =>
logger?.info({
message: `Waiting for lock. This may mean another process is simultaneously running migrations. You may want to issue a command like "set lock_timeout = '10s'" if this happens frequently. Othrewise, this command may wait until the process is killed.`,
}),
1000,
)
await conn.any(context.sql`select pg_advisory_lock(${this.advisoryLockId()})`)

try {
clearTimeout(timeout)
const result = await cb({context})
return result
} finally {
await conn.any(context.sql`select pg_advisory_unlock(${this.advisoryLockId()})`).catch(error => {
this.slonikMigratorOptions.logger?.error({
message: `Failed to unlock. This is expected if the lock acquisition timed out. Otherwise, you may need to run "select pg_advisory_unlock(${this.advisoryLockId()})" manually`,
originalError: error,
})
})
}
})
})
}
return super.runCommand(command, cb)
}

protected async setup({context}: {context: SlonikMigratorContext}) {
let settle!: Function
const settledPromise = new Promise(resolve => (settle = resolve))
Expand All @@ -159,13 +191,7 @@ export class SlonikMigrator extends umzug.Umzug<SlonikMigratorContext> {
return connectionPromise
}

const logger = this.slonikMigratorOptions.logger

try {
const timeout = setTimeout(() => logger?.info({message: `Waiting for lock...`} as any), 1000)
await context.connection.any(context.sql`select pg_advisory_lock(${this.advisoryLockId()})`)
clearTimeout(timeout)

await this.getOrCreateMigrationsTable(context)
} catch (e) {
await context.commit()
Expand All @@ -174,14 +200,10 @@ export class SlonikMigrator extends umzug.Umzug<SlonikMigratorContext> {
}

protected async teardown({context}: {context: SlonikMigratorContext}) {
await context.connection.query(context.sql`select pg_advisory_unlock(${this.advisoryLockId()})`).catch(error => {
this.slonikMigratorOptions.logger?.error({
message: `Failed to unlock. This is expected if the lock acquisition timed out.`,
originalError: error,
})
})
await context.commit()
// Note: the unlock command needs to be done using the parent connection, since `teardown` can be called when an error is thrown.
// When `singleTransaction: true` this means unlock is _never_ called.
context.connection = null as never
await context.commit()
}

protected hash(name: string) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
drop table locking_test_table;
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create table locking_test_table(id int primary key);
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
this is a syntax error
2 changes: 1 addition & 1 deletion packages/migrator/test/generated/run/migrations/04.four.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {Migration} from '../../../..'
import {Migration} from '../../../../src'

export const up: Migration = ({context: {connection, sql}}) => connection.query(sql`create table migration_test_4(id int)`)
export const down: Migration = ({context: {connection, sql}}) => connection.query(sql`drop table migration_test_4`)
41 changes: 40 additions & 1 deletion packages/migrator/test/locking.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,39 @@ describe('locking', () => {

expect(await migrator().executed().then(names)).toEqual(['m1.sql', 'm2.sql'])
})

test(`singleTransaction doesn't prevent unlock`, async () => {
const baseDir = path.join(__dirname, 'generated', helper.schemaName, 'singelTransaction')
const syncer = fsSyncer(baseDir, {
migrations: {
'm1.sql': 'create table locking_test_table(id int primary key);',
'm2.sql': 'this is a syntax error',
down: {
'm1.sql': 'drop table locking_test_table;',
'm2.sql': '',
},
},
})
syncer.sync()

const migrator = new SlonikMigrator({
slonik: helper.pool,
migrationsPath: path.join(syncer.baseDir, 'migrations'),
migrationTableName: 'locking_transaction_migrations',
singleTransaction: true,
logger: helper.mockLogger,
})

expect(await migrator.pending().then(names)).toEqual(['m1.sql', 'm2.sql'])

await expect(migrator.up()).rejects.toThrowErrorMatchingInlineSnapshot(
`"Migration m2.sql (up) failed: Original error: syntax error at or near \\"this\\""`,
)

expect(helper.mockLogger.error).not.toHaveBeenCalled()

expect(await migrator.pending().then(names)).toEqual(['m1.sql', 'm2.sql'])
})
})

describe('concurrency', () => {
Expand Down Expand Up @@ -133,7 +166,7 @@ describe('concurrency', () => {
slonik: helper.pool,
migrationsPath: path.join(syncer.baseDir, 'migrations'),
migrationTableName: 'migrations',
logger: undefined,
logger: helper.mockLogger,
})

const [m1, m2] = [migrator(), migrator()]
Expand All @@ -156,5 +189,11 @@ describe('concurrency', () => {
expect(m2MigratingSpy).not.toHaveBeenCalled()

expect(await migrator().executed().then(names)).toEqual(['m1.sql', 'm2.sql'])

expect(helper.mockLogger.info).toHaveBeenCalledWith({
message: expect.stringContaining(
`Waiting for lock. This may mean another process is simultaneously running migrations. You may want to issue a command like "set lock_timeout = '10s'" if this happens frequently. Othrewise, this command may wait until the process is killed.`,
),
})
})
})
9 changes: 4 additions & 5 deletions packages/migrator/test/pool-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,11 @@ export const getPoolHelper = (params: {__filename: string; config?: ClientConfig
await pool.query(sql`create schema ${schemaIdentifier}`)
})

const mockLog = jest.fn()
const mockLogger = {
debug: mockLog,
info: mockLog,
warn: mockLog,
error: mockLog,
debug: jest.fn(),
info: jest.fn(),
warn: jest.fn(),
error: jest.fn(),
}

/** Get the names from a list of migrations. Useful for light assertions */
Expand Down
2 changes: 1 addition & 1 deletion packages/migrator/test/script-migrations.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ describe('run sql, js and ts migrations', () => {
module.exports.down = ({context: {connection, sql}}) => connection.query(sql\`drop table migration_test_3\`)
`,
'04.four.ts': dedent`
import {Migration} from '../../../..'
import {Migration} from '../../../../src'
export const up: Migration = ({context: {connection, sql}}) => connection.query(sql\`create table migration_test_4(id int)\`)
export const down: Migration = ({context: {connection, sql}}) => connection.query(sql\`drop table migration_test_4\`)
Expand Down

0 comments on commit 715df09

Please sign in to comment.