diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index c165f550..157392b9 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -166,13 +166,13 @@ jobs: - name: Upload Mac Installer if: matrix.runs-on == 'macos-latest' - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: cadt-mac-installer path: ${{ github.workspace }}/build-scripts/macos/target/ready-to-upload - name: Upload artifacts - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: ${{ matrix.artifact-name }} path: ${{ github.workspace }}/dist @@ -213,7 +213,7 @@ jobs: sudo cp ./node_modules/sqlite3/build/Release/node_sqlite3.node ./dist/ - name: Upload artifacts - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: cadt-linux-arm64 path: ${{ github.workspace }}/dist @@ -236,7 +236,7 @@ jobs: uses: actions/checkout@v4 - name: Download Linux artifacts - uses: actions/download-artifact@v3 + uses: actions/download-artifact@v4 with: name: ${{ matrix.name }} path: ${{ matrix.name }} @@ -265,7 +265,7 @@ jobs: dpkg-deb --build --root-owner-group "deb/$CLI_DEB_BASE" - name: Upload deb - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: ${{ matrix.name }}-deb path: ${{ github.workspace }}/deb/*.deb @@ -277,31 +277,31 @@ jobs: - debs steps: - name: Download Windows artifacts - uses: actions/download-artifact@v3 + uses: actions/download-artifact@v4 with: name: cadt-windows-x64 path: cadt-windows-x64 - name: Download MacOS artifacts - uses: actions/download-artifact@v3 + uses: actions/download-artifact@v4 with: name: cadt-mac-installer path: cadt-mac-installer - name: Download Linux artifacts - uses: actions/download-artifact@v3 + uses: actions/download-artifact@v4 with: name: cadt-linux-x64 path: cadt-linux-x64 - name: Download Linux x64 deb - uses: actions/download-artifact@v3 + uses: actions/download-artifact@v4 with: name: cadt-linux-x64-deb path: cadt-linux-x64-deb - name: Download Linux arm64 deb - uses: actions/download-artifact@v3 + uses: actions/download-artifact@v4 with: name: cadt-linux-arm64-deb path: cadt-linux-arm64-deb @@ -318,7 +318,7 @@ jobs: zip -r cadt-linux-x64-${{ steps.tag-name.outputs.TAGNAME }}.zip cadt-linux-x64 - name: Release - uses: softprops/action-gh-release@v2.0.9 + uses: softprops/action-gh-release@v2.1.0 with: files: | cadt-windows-x64-${{ steps.tag-name.outputs.TAGNAME }}.zip diff --git a/src/models/audit/audit.model.js b/src/models/audit/audit.model.js index e10057df..5e758530 100644 --- a/src/models/audit/audit.model.js +++ b/src/models/audit/audit.model.js @@ -7,6 +7,7 @@ import { AuditMirror } from './audit.model.mirror'; import ModelTypes from './audit.modeltypes.cjs'; import findDuplicateIssuancesSql from './sql/find-duplicate-issuances.sql.js'; import { Organization } from '../organizations/index.js'; +import { waitForSyncRegistriesTransaction } from '../../utils/model-utils.js'; class Audit extends Model { static async create(values, options) { @@ -107,4 +108,8 @@ Audit.init(ModelTypes, { updatedAt: true, }); +Audit.addHook('beforeFind', async () => { + await waitForSyncRegistriesTransaction(); +}); + export { Audit }; diff --git a/src/tasks/sync-registries.js b/src/tasks/sync-registries.js index 095bd440..b5d9db9c 100644 --- a/src/tasks/sync-registries.js +++ b/src/tasks/sync-registries.js @@ -1,7 +1,6 @@ import _ from 'lodash'; import { SimpleIntervalJob, Task } from 'toad-scheduler'; -import { Mutex } from 'async-mutex'; import { Organization, Audit, ModelKeys, Staging, Meta } from '../models'; import datalayer from '../datalayer'; import { @@ -22,15 +21,18 @@ import { migrateToNewSync, generateGenerationIndex, } from '../utils/sync-migration-utils'; +import { + processingSyncRegistriesTransactionMutex, + syncRegistriesTaskMutex, +} from '../utils/model-utils.js'; dotenv.config(); -const mutex = new Mutex(); const CONFIG = getConfig().APP; const task = new Task('sync-registries', async () => { - if (!mutex.isLocked()) { - logger.debug('running sync registries task'); - const releaseMutex = await mutex.acquire(); + logger.debug('sync registries task invoked'); + if (!syncRegistriesTaskMutex.isLocked()) { + const releaseSyncTaskMutex = await syncRegistriesTaskMutex.acquire(); try { const hasMigratedToNewSyncMethod = await Meta.findOne({ where: { metaKey: 'migratedToNewSync' }, @@ -61,7 +63,7 @@ const task = new Task('sync-registries', async () => { ); } } finally { - releaseMutex(); + releaseSyncTaskMutex(); } } }); @@ -87,18 +89,65 @@ const processJob = async () => { }); for (const organization of organizations) { - await syncOrganizationAudit(organization); + if (CONFIG.USE_SIMULATOR || process.env.NODE_ENV === 'test') { + await syncOrganizationAudit(organization); + } else { + const mostRecentOrgAuditRecord = await Audit.findOne({ + where: { + orgUid: organization.orgUid, + }, + order: [['createdAt', 'DESC']], + limit: 1, + raw: true, + }); + + // verify that the latest organization root hash is up to date with the audit records. attempt correction. + if (mostRecentOrgAuditRecord.rootHash !== organization.registryHash) { + logger.warn( + `latest root hash in org table for organization ${organization.name} (orgUid ${organization.orgUid}) does not match the audit records. attempting to correct`, + ); + try { + const result = await Organization.update( + { registryHash: mostRecentOrgAuditRecord.rootHash }, + { + where: { orgUid: organization.orgUid }, + }, + ); + + if (result?.length) { + logger.info( + `registry hash record corrected for ${organization.name} (orgUid ${organization.orgUid}). proceeding with audit sync`, + ); + const correctedOrganizationRecord = await Organization.findOne({ + where: { orgUid: organization.orgUid }, + }); + + await syncOrganizationAudit(correctedOrganizationRecord); + } else { + throw new Error('organizations update query affected 0 records'); + } + } catch (error) { + logger.error( + `failed to update organization table record for ${organization.name} (orgUid ${organization.orgUid}) with correct root hash. Something is wrong. Skipping audit sync and trying again shortly. Error: ${error}`, + ); + } + } else { + // normal state, proceed with audit sync + await syncOrganizationAudit(organization); + } + } } }; -async function createTransaction(callback, afterCommitCallbacks) { - let result = null; - +async function createAndProcessTransaction(callback, afterCommitCallbacks) { let transaction; let mirrorTransaction; + logger.info('Starting sequelize transaction and acquiring transaction mutex'); + const releaseTransactionMutex = + await processingSyncRegistriesTransactionMutex.acquire(); + try { - logger.info('Starting sequelize transaction'); // Start a transaction transaction = await sequelize.transaction(); @@ -107,7 +156,7 @@ async function createTransaction(callback, afterCommitCallbacks) { } // Execute the provided callback with the transaction - result = await callback(transaction, mirrorTransaction); + await callback(transaction, mirrorTransaction); // Commit the transaction if the callback completes without errors await transaction.commit(); @@ -122,14 +171,18 @@ async function createTransaction(callback, afterCommitCallbacks) { logger.info('Commited sequelize transaction'); - return result; + return true; } catch (error) { // Roll back the transaction if an error occurs if (transaction) { - logger.error('Rolling back transaction'); - console.error(error); + logger.error( + `encountered error syncing organization audit. Rolling back transaction. Error: ${error}`, + ); await transaction.rollback(); + return false; } + } finally { + releaseTransactionMutex(); } } @@ -432,7 +485,7 @@ const syncOrganizationAudit = async (organization) => { // by not processing the DELETE for that record. const optimizedKvDiff = optimizeAndSortKvDiff(kvDiff); - const updateTransaction = async (transaction, mirrorTransaction) => { + const updateAuditTransaction = async (transaction, mirrorTransaction) => { logger.info( `Syncing ${organization.name} generation ${toBeProcessedDatalayerGenerationIndex} (orgUid ${organization.orgUid}, registryId ${organization.registryId})`, ); @@ -529,18 +582,8 @@ const syncOrganizationAudit = async (organization) => { } // Create the Audit record - logger.debug( - `creating audit model entry for ${organization.name} transacton`, - ); + logger.debug(`creating audit model transaction entry`); await Audit.create(auditData, { transaction, mirrorTransaction }); - await Organization.update( - { registryHash: rootToBeProcessed.root_hash }, - { - where: { orgUid: organization.orgUid }, - transaction, - mirrorTransaction, - }, - ); } } }; @@ -549,7 +592,27 @@ const syncOrganizationAudit = async (organization) => { afterCommitCallbacks.push(truncateStaging); } - await createTransaction(updateTransaction, afterCommitCallbacks); + const transactionSucceeded = await createAndProcessTransaction( + updateAuditTransaction, + afterCommitCallbacks, + ); + + if (transactionSucceeded) { + logger.debug( + `updateAuditTransaction successfully completed and committed audit updates for ${organization.name} (orgUid: ${organization.orgUid}, registryId: ${organization.registryId}) generation index ${toBeProcessedDatalayerGenerationIndex}. updating registry hash to ${rootToBeProcessed.root_hash}`, + ); + + await Organization.update( + { registryHash: rootToBeProcessed.root_hash }, + { + where: { orgUid: organization.orgUid }, + }, + ); + } else { + logger.debug( + `updateAuditTransaction failed to complete and commit audit updates for ${organization.name} (orgUid: ${organization.orgUid}, registryId: ${organization.registryId}) generation index ${toBeProcessedDatalayerGenerationIndex}`, + ); + } } catch (error) { logger.error('Error syncing org audit', error); } diff --git a/src/utils/model-utils.js b/src/utils/model-utils.js index f0fbca82..0d77887a 100644 --- a/src/utils/model-utils.js +++ b/src/utils/model-utils.js @@ -1,6 +1,32 @@ import { columnsToInclude } from './helpers.js'; import Sequelize from 'sequelize'; +import { Mutex } from 'async-mutex'; + +export async function waitForSyncRegistriesTransaction() { + if (processingSyncRegistriesTransactionMutex.isLocked()) { + // when the mutex is acquired, the current sync transaction has completed + const releaseMutex = + await processingSyncRegistriesTransactionMutex.acquire(); + await releaseMutex(); + } +} + +/** + * mutex which must be acquired to run the sync-registries task job. + * this mutex exists to prevent multiple registry sync tasks from running at the same time and overloading the chia + * RPC's or causing a SQLite locking error due to multiple task instances trying to commit large update transactions + * @type {Mutex} + */ +export const syncRegistriesTaskMutex = new Mutex(); + +/** + * mutex which must be acquired when writing registry update information until the transaction has been committed + * audit model update transactions are large and lock the DB for long periods. + * @type {Mutex} + */ +export const processingSyncRegistriesTransactionMutex = new Mutex(); + export function formatModelAssociationName(model) { if (model == null || model.model == null) return '';