# fix(jobs): try to handle knex pool exhaustion (#2771)
## Describe your changes

- Handle unhandledRejection and uncaughtException
This is something we need to do globally, but it's an opportunity to start. I'm not entirely sure the error we're seeing is caused by an uncaught error, but we'll see. We don't properly log errors anywhere right now because we don't have a working Sentry and our Datadog error logging is shaky.

- Add proper health checks
It turns out /health is only called on the web service, so our current health check (which was not doing much) was useless anyway. The new check queries almost nothing and times out quickly.

- Remove the transaction from the Delete Syncs cron
This was the only place with an exotic way of using transactions. Honestly, I'm not convinced the transaction was causing an issue, but since it was useless I'm removing it.
bodinsamuel authored Sep 26, 2024
1 parent 7c8a114 commit d695be3
Showing 7 changed files with 91 additions and 51 deletions.
### packages/database/lib/getConfig.ts (4 additions & 2 deletions)

```diff
@@ -18,8 +18,10 @@ export function getDbConfig({ timeoutMs }: { timeoutMs: number }): Knex.Config {
         statement_timeout: timeoutMs
     },
     pool: {
-        min: parseInt(process.env['NANGO_DB_POOL_MIN'] || '2'),
-        max: parseInt(process.env['NANGO_DB_POOL_MAX'] || '30')
+        min: parseInt(process.env['NANGO_DB_POOL_MIN'] || '0'),
+        max: parseInt(process.env['NANGO_DB_POOL_MAX'] || '30'),
+        acquireTimeoutMillis: 20000,
+        createTimeoutMillis: 10000
     },
     // SearchPath needs the current db and public because extension can only be installed once per DB
     searchPath: [defaultSchema, 'public', ...additionalSchemas]
```
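With `min: 0` the pool can drain to zero connections when idle, and the two new timeouts bound how long a caller waits on the pool before erroring instead of hanging. A minimal sketch of what these settings do, not part of the commit (the `NANGO_DATABASE_URL` env var and the tiny `max` are assumptions for illustration):

```ts
// A minimal sketch, not from the commit: with a deliberately tiny pool,
// the third query below fails fast with an acquire timeout instead of
// hanging once both connections are busy.
import knex from 'knex';

const db = knex({
    client: 'pg',
    connection: process.env['NANGO_DATABASE_URL'], // assumed env var, for illustration only
    pool: {
        min: 0, // let an idle pool drain to zero connections
        max: 2, // tiny on purpose, so exhaustion is easy to reproduce
        acquireTimeoutMillis: 20000, // stop waiting for a free connection after 20s
        createTimeoutMillis: 10000 // stop waiting on a new connection after 10s
    }
});

async function main() {
    // Occupy both connections, then ask for a third.
    const busy = Promise.all([db.raw('SELECT pg_sleep(30)'), db.raw('SELECT pg_sleep(30)')]);
    try {
        await db.raw('SELECT 1');
    } catch (err) {
        console.error('failed fast instead of hanging:', err);
    }
    await busy;
    await db.destroy();
}

void main();
```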
### packages/jobs/lib/app.ts (44 additions & 8 deletions)

```diff
@@ -6,6 +6,7 @@ import { deleteSyncsData } from './crons/deleteSyncsData.js';
 import { getLogger, stringifyError } from '@nangohq/utils';
 import { timeoutLogsOperations } from './crons/timeoutLogsOperations.js';
 import { envs } from './env.js';
+import db from '@nangohq/database';
 
 const logger = getLogger('Jobs');
 
@@ -16,20 +17,55 @@ try {
     logger.info(`🚀 service ready at http://localhost:${port}`);
     const processor = new Processor(orchestratorUrl);
 
+    // We are using a setTimeout because we don't want overlapping setInterval if the DB is down
+    let healthCheck: NodeJS.Timeout | undefined;
+    const check = async () => {
+        try {
+            await db.knex.raw('SELECT 1').timeout(1000);
+            healthCheck = setTimeout(check);
+        } catch (err) {
+            logger.error('HealthCheck failed...', err);
+            void close();
+        }
+    };
+    void check();
+
+    const close = async () => {
+        clearTimeout(healthCheck);
+        processor.stop();
+        await db.knex.destroy();
+        srv.close(() => {
+            process.exit();
+        });
+    };
+
+    process.on('SIGINT', () => {
+        logger.info('Received SIGINT...');
+        void close();
+    });
+
+    process.on('SIGTERM', () => {
+        logger.info('Received SIGTERM...');
+        void close();
+    });
+
+    process.on('unhandledRejection', (reason) => {
+        logger.error('Received unhandledRejection...', reason);
+        process.exitCode = 1;
+        void close();
+    });
+
+    process.on('uncaughtException', (e) => {
+        logger.error('Received uncaughtException...', e);
+        // not closing on purpose
+    });
+
     processor.start();
 
     // Register recurring tasks
     cronAutoIdleDemo();
     deleteSyncsData();
     timeoutLogsOperations();
-
-    // handle SIGTERM
-    process.on('SIGTERM', () => {
-        processor.stop();
-        srv.close(() => {
-            process.exit(0);
-        });
-    });
 } catch (err) {
     logger.error(stringifyError(err));
     process.exit(1);
```
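The health check above re-arms a `setTimeout` instead of using `setInterval`, so a check that stalls while the DB is down can never overlap the next one. A standalone sketch of that pattern; the 10s delay and the `ping`/`onFailure` callbacks are assumptions of this sketch, not values from the commit:

```ts
// Standalone sketch of the re-armed setTimeout pattern used in app.ts.
type Ping = () => Promise<void>;

export function startHealthLoop(ping: Ping, onFailure: () => void, delayMs = 10_000): () => void {
    let timer: NodeJS.Timeout | undefined;
    let stopped = false;

    const check = async () => {
        try {
            await ping(); // e.g. db.knex.raw('SELECT 1').timeout(1000)
            if (!stopped) {
                // Re-arm only after the current check finished, so checks never overlap.
                timer = setTimeout(() => void check(), delayMs);
            }
        } catch (err) {
            console.error('HealthCheck failed...', err);
            onFailure();
        }
    };
    void check();

    // Returned stopper mirrors the clearTimeout(healthCheck) call in close().
    return () => {
        stopped = true;
        clearTimeout(timer);
    };
}
```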
### packages/jobs/lib/crons/deleteSyncsData.ts (34 additions & 38 deletions)

```diff
@@ -20,6 +20,8 @@ export function deleteSyncsData(): void {
         const start = Date.now();
         try {
             await exec();
+
+            logger.info('[deleteSyncs] ✅ done');
         } catch (err: unknown) {
             const e = new Error('failed_to_hard_delete_syncs_data', { cause: err instanceof Error ? err.message : err });
             errorManager.report(e, { source: ErrorSourceEnum.PLATFORM }, tracer);
@@ -31,49 +33,43 @@
 export async function exec(): Promise<void> {
     logger.info('[deleteSyncs] starting');
 
-    await db.knex.transaction(async (trx) => {
-        // Because it's slow and create deadlocks
-        // we need to acquire a Lock that prevents any other duplicate cron to execute the same thing
-        const { rows } = await trx.raw<{ rows: { delete_syncs: boolean }[] }>(`SELECT pg_try_advisory_xact_lock(?) as delete_syncs`, [123456789]);
-        if (!rows || rows.length <= 0 || !rows[0]!.delete_syncs) {
-            logger.info(`[deleteSyncs] could not acquire lock, skipping`);
-            return;
-        }
-
-        // NB: we are not using trx again, we only care about the lock
+    // Because it's slow and create deadlocks
+    // we need to acquire a Lock that prevents any other duplicate cron to execute the same thing
+    const { rows } = await db.knex.raw<{ rows: { delete_syncs: boolean }[] }>(`SELECT pg_try_advisory_xact_lock(?) as delete_syncs`, [123456789]);
+    if (!rows || rows.length <= 0 || !rows[0]!.delete_syncs) {
+        logger.info(`[deleteSyncs] could not acquire lock, skipping`);
+        return;
+    }
 
-        const syncs = await findRecentlyDeletedSync();
+    const syncs = await findRecentlyDeletedSync();
 
-        const orchestrator = new Orchestrator(orchestratorClient);
+    const orchestrator = new Orchestrator(orchestratorClient);
 
-        for (const sync of syncs) {
-            logger.info(`[deleteSyncs] deleting syncId: ${sync.id}`);
+    for (const sync of syncs) {
+        logger.info(`[deleteSyncs] deleting syncId: ${sync.id}`);
 
-            // Soft delete jobs
-            let countJobs = 0;
-            do {
-                countJobs = await softDeleteJobs({ syncId: sync.id, limit: limitJobs });
-                logger.info(`[deleteSyncs] soft deleted ${countJobs} jobs`);
-                metrics.increment(metrics.Types.JOBS_DELETE_SYNCS_DATA_JOBS, countJobs);
-            } while (countJobs >= limitJobs);
+        // Soft delete jobs
+        let countJobs = 0;
+        do {
+            countJobs = await softDeleteJobs({ syncId: sync.id, limit: limitJobs });
+            logger.info(`[deleteSyncs] soft deleted ${countJobs} jobs`);
+            metrics.increment(metrics.Types.JOBS_DELETE_SYNCS_DATA_JOBS, countJobs);
+        } while (countJobs >= limitJobs);
 
-            // -----
-            // Soft delete schedules
-            const resSchedule = await orchestrator.deleteSync({ syncId: sync.id, environmentId: sync.environmentId });
-            const deletedScheduleCount = resSchedule.isErr() ? 1 : 0;
-            logger.info(`[deleteSyncs] soft deleted ${deletedScheduleCount} schedules`);
-            metrics.increment(metrics.Types.JOBS_DELETE_SYNCS_DATA_SCHEDULES, deletedScheduleCount);
+        // -----
+        // Soft delete schedules
+        const resSchedule = await orchestrator.deleteSync({ syncId: sync.id, environmentId: sync.environmentId });
+        const deletedScheduleCount = resSchedule.isErr() ? 1 : 0;
+        logger.info(`[deleteSyncs] soft deleted ${deletedScheduleCount} schedules`);
+        metrics.increment(metrics.Types.JOBS_DELETE_SYNCS_DATA_SCHEDULES, deletedScheduleCount);
 
-            // ----
-            // hard delete records
-            let deletedRecords = 0;
-            for (const model of sync.models) {
-                const res = await records.deleteRecordsBySyncId({ connectionId: sync.connectionId, model, syncId: sync.id, limit: limitRecords });
-                deletedRecords += res.totalDeletedRecords;
-            }
-            metrics.increment(metrics.Types.JOBS_DELETE_SYNCS_DATA_RECORDS, deletedRecords);
-        }
-    });
-
-    logger.info('[deleteSyncs] ✅ done');
+        // ----
+        // hard delete records
+        let deletedRecords = 0;
+        for (const model of sync.models) {
+            const res = await records.deleteRecordsBySyncId({ connectionId: sync.connectionId, model, syncId: sync.id, limit: limitRecords });
+            deletedRecords += res.totalDeletedRecords;
+        }
+        metrics.increment(metrics.Types.JOBS_DELETE_SYNCS_DATA_RECORDS, deletedRecords);
+    }
 }
```
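One thing to keep in mind when touching this cron: `pg_try_advisory_xact_lock` is transaction-scoped, so its lifetime follows whatever transaction the statement runs in. A hedged alternative sketch, not part of the commit, using a session-level `pg_try_advisory_lock` pinned to a single pooled connection (knex's `client.acquireConnection()`/`releaseConnection()` are assumed here and should be verified against your knex version):

```ts
// Hedged sketch, not from the commit: session-level advisory locks are held
// per connection, so lock and unlock must run on the same pooled connection.
import type { Knex } from 'knex';

const LOCK_KEY = 123456789; // same arbitrary key the cron uses

export async function runExclusively(db: Knex, work: () => Promise<void>): Promise<boolean> {
    const conn = await db.client.acquireConnection(); // assumed knex helper
    try {
        const { rows } = await db
            .raw<{ rows: { locked: boolean }[] }>(`SELECT pg_try_advisory_lock(?) as locked`, [LOCK_KEY])
            .connection(conn);
        if (!rows?.[0]?.locked) {
            return false; // another instance holds the lock: skip this run
        }
        try {
            await work();
            return true;
        } finally {
            // Session locks are not auto-released until the connection closes.
            await db.raw(`SELECT pg_advisory_unlock(?)`, [LOCK_KEY]).connection(conn);
        }
    } finally {
        await db.client.releaseConnection(conn);
    }
}
```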
### packages/jobs/lib/routes/getHealth.ts (4 additions & 1 deletion)

```diff
@@ -14,5 +14,8 @@ export const routeHandler: RouteHandler<Health> = {
     path,
     method,
     validate: (_req, _res, next) => next(), // No extra validation needed
-    handler: (_req, res) => res.status(200).json({ status: 'ok' })
+    handler: (_req, res) => {
+        // Not used in Render
+        res.status(200).json({ status: 'ok' });
+    }
 };
```
### packages/jobs/lib/tracer.ts (3 additions & 0 deletions)

```diff
@@ -6,6 +6,9 @@ tracer.init({
 tracer.use('pg', {
     service: (params: { database: string }) => `postgres-${params.database}`
 });
+tracer.use('express', {
+    enabled: true
+});
 tracer.use('elasticsearch', {
     service: 'nango-elasticsearch'
 });
```
### packages/shared/lib/services/sync/job.service.ts (1 addition & 1 deletion)

```diff
@@ -100,7 +100,7 @@ export const updateLatestJobSyncStatus = async (sync_id: string, status: SyncSta
  * @desc grab any existing results and add them to the current
  */
 export const updateSyncJobResult = async (id: number, result: SyncResultByModel, model: string): Promise<SyncJob> => {
-    return db.knex.transaction(async (trx) => {
+    return await db.knex.transaction(async (trx) => {
         const { result: existingResult } = await trx.from<SyncJob>(SYNC_JOB_TABLE).select('result').forUpdate().where({ id }).first();
 
         if (!existingResult || Object.keys(existingResult).length === 0) {
```
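The only change in this file is `return` to `return await`. Inside an async function the two differ whenever a `try/catch` or `finally` wraps the return: without `await`, a rejection settles after the function has already left the block. A small sketch with hypothetical helpers:

```ts
// Sketch with hypothetical helpers: `return p` lets a rejection escape the
// surrounding try/catch, while `return await p` keeps it catchable (and keeps
// this frame in the async stack trace).
async function fails(): Promise<never> {
    throw new Error('boom');
}

async function withoutAwait(): Promise<void> {
    try {
        return fails(); // rejection settles after we've left the try/catch
    } catch {
        console.log('never reached');
    }
}

async function withAwait(): Promise<void> {
    try {
        return await fails(); // rejection is awaited here, so it is caught
    } catch {
        console.log('caught: boom');
    }
}

void withoutAwait().catch(() => console.log('the caller has to catch it instead'));
void withAwait();
```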
### packages/utils/lib/telemetry/metrics.ts (1 addition & 1 deletion)

```diff
@@ -82,7 +82,7 @@ export function time<T, E, F extends (...args: E[]) => Promise<T>>(metricName: T
     };
 
     // This function should handle both async/sync function
-    // So it's try/catch regular execution and use .then() for async
+    // So it try/catch regular execution and use .then() for async
     // @ts-expect-error can't fix this
     return function wrapped(...args: any) {
         const start = process.hrtime();
```
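The reworded comment describes a wrapper that times both sync and async functions: a `try/catch` covers the synchronous call, and a thenable result is timed when it settles. A simplified sketch of that pattern, not the actual implementation (it uses `.finally()` where the comment says `.then()`):

```ts
// Simplified sketch of the sync/async timing pattern, not the real `time()`.
export function timed<T extends (...args: any[]) => any>(fn: T, record: (ms: number) => void): T {
    return function wrapped(this: unknown, ...args: any[]) {
        const start = process.hrtime.bigint();
        const done = () => record(Number(process.hrtime.bigint() - start) / 1e6);
        try {
            const result = fn.apply(this, args);
            if (result && typeof result.then === 'function') {
                return result.finally(done); // async: record when the promise settles
            }
            done(); // sync: record immediately
            return result;
        } catch (err) {
            done(); // a synchronous throw still gets timed
            throw err;
        }
    } as T;
}
```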
