Skip to content

Commit

Permalink
Merge branch 'develop' into documentation-updates
Browse files Browse the repository at this point in the history
  • Loading branch information
wwills2 authored Nov 22, 2024
2 parents 5179845 + db5cc79 commit 541f862
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 39 deletions.
22 changes: 11 additions & 11 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,13 @@ jobs:
- name: Upload Mac Installer
if: matrix.runs-on == 'macos-latest'
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: cadt-mac-installer
path: ${{ github.workspace }}/build-scripts/macos/target/ready-to-upload

- name: Upload artifacts
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: ${{ matrix.artifact-name }}
path: ${{ github.workspace }}/dist
Expand Down Expand Up @@ -213,7 +213,7 @@ jobs:
sudo cp ./node_modules/sqlite3/build/Release/node_sqlite3.node ./dist/
- name: Upload artifacts
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: cadt-linux-arm64
path: ${{ github.workspace }}/dist
Expand All @@ -236,7 +236,7 @@ jobs:
uses: actions/checkout@v4

- name: Download Linux artifacts
uses: actions/download-artifact@v3
uses: actions/download-artifact@v4
with:
name: ${{ matrix.name }}
path: ${{ matrix.name }}
Expand Down Expand Up @@ -265,7 +265,7 @@ jobs:
dpkg-deb --build --root-owner-group "deb/$CLI_DEB_BASE"
- name: Upload deb
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: ${{ matrix.name }}-deb
path: ${{ github.workspace }}/deb/*.deb
Expand All @@ -277,31 +277,31 @@ jobs:
- debs
steps:
- name: Download Windows artifacts
uses: actions/download-artifact@v3
uses: actions/download-artifact@v4
with:
name: cadt-windows-x64
path: cadt-windows-x64

- name: Download MacOS artifacts
uses: actions/download-artifact@v3
uses: actions/download-artifact@v4
with:
name: cadt-mac-installer
path: cadt-mac-installer

- name: Download Linux artifacts
uses: actions/download-artifact@v3
uses: actions/download-artifact@v4
with:
name: cadt-linux-x64
path: cadt-linux-x64

- name: Download Linux x64 deb
uses: actions/download-artifact@v3
uses: actions/download-artifact@v4
with:
name: cadt-linux-x64-deb
path: cadt-linux-x64-deb

- name: Download Linux arm64 deb
uses: actions/download-artifact@v3
uses: actions/download-artifact@v4
with:
name: cadt-linux-arm64-deb
path: cadt-linux-arm64-deb
Expand All @@ -318,7 +318,7 @@ jobs:
zip -r cadt-linux-x64-${{ steps.tag-name.outputs.TAGNAME }}.zip cadt-linux-x64
- name: Release
uses: softprops/action-gh-release@v2.0.9
uses: softprops/action-gh-release@v2.1.0
with:
files: |
cadt-windows-x64-${{ steps.tag-name.outputs.TAGNAME }}.zip
Expand Down
5 changes: 5 additions & 0 deletions src/models/audit/audit.model.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { AuditMirror } from './audit.model.mirror';
import ModelTypes from './audit.modeltypes.cjs';
import findDuplicateIssuancesSql from './sql/find-duplicate-issuances.sql.js';
import { Organization } from '../organizations/index.js';
import { waitForSyncRegistriesTransaction } from '../../utils/model-utils.js';

class Audit extends Model {
static async create(values, options) {
Expand Down Expand Up @@ -107,4 +108,8 @@ Audit.init(ModelTypes, {
updatedAt: true,
});

Audit.addHook('beforeFind', async () => {
await waitForSyncRegistriesTransaction();
});

export { Audit };
119 changes: 91 additions & 28 deletions src/tasks/sync-registries.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import _ from 'lodash';

import { SimpleIntervalJob, Task } from 'toad-scheduler';
import { Mutex } from 'async-mutex';
import { Organization, Audit, ModelKeys, Staging, Meta } from '../models';
import datalayer from '../datalayer';
import {
Expand All @@ -22,15 +21,18 @@ import {
migrateToNewSync,
generateGenerationIndex,
} from '../utils/sync-migration-utils';
import {
processingSyncRegistriesTransactionMutex,
syncRegistriesTaskMutex,
} from '../utils/model-utils.js';

dotenv.config();
const mutex = new Mutex();
const CONFIG = getConfig().APP;

const task = new Task('sync-registries', async () => {
if (!mutex.isLocked()) {
logger.debug('running sync registries task');
const releaseMutex = await mutex.acquire();
logger.debug('sync registries task invoked');
if (!syncRegistriesTaskMutex.isLocked()) {
const releaseSyncTaskMutex = await syncRegistriesTaskMutex.acquire();
try {
const hasMigratedToNewSyncMethod = await Meta.findOne({
where: { metaKey: 'migratedToNewSync' },
Expand Down Expand Up @@ -61,7 +63,7 @@ const task = new Task('sync-registries', async () => {
);
}
} finally {
releaseMutex();
releaseSyncTaskMutex();
}
}
});
Expand All @@ -87,18 +89,65 @@ const processJob = async () => {
});

for (const organization of organizations) {
await syncOrganizationAudit(organization);
if (CONFIG.USE_SIMULATOR || process.env.NODE_ENV === 'test') {
await syncOrganizationAudit(organization);
} else {
const mostRecentOrgAuditRecord = await Audit.findOne({
where: {
orgUid: organization.orgUid,
},
order: [['createdAt', 'DESC']],
limit: 1,
raw: true,
});

// verify that the latest organization root hash is up to date with the audit records. attempt correction.
if (mostRecentOrgAuditRecord.rootHash !== organization.registryHash) {
logger.warn(
`latest root hash in org table for organization ${organization.name} (orgUid ${organization.orgUid}) does not match the audit records. attempting to correct`,
);
try {
const result = await Organization.update(
{ registryHash: mostRecentOrgAuditRecord.rootHash },
{
where: { orgUid: organization.orgUid },
},
);

if (result?.length) {
logger.info(
`registry hash record corrected for ${organization.name} (orgUid ${organization.orgUid}). proceeding with audit sync`,
);
const correctedOrganizationRecord = await Organization.findOne({
where: { orgUid: organization.orgUid },
});

await syncOrganizationAudit(correctedOrganizationRecord);
} else {
throw new Error('organizations update query affected 0 records');
}
} catch (error) {
logger.error(
`failed to update organization table record for ${organization.name} (orgUid ${organization.orgUid}) with correct root hash. Something is wrong. Skipping audit sync and trying again shortly. Error: ${error}`,
);
}
} else {
// normal state, proceed with audit sync
await syncOrganizationAudit(organization);
}
}
}
};

async function createTransaction(callback, afterCommitCallbacks) {
let result = null;

async function createAndProcessTransaction(callback, afterCommitCallbacks) {
let transaction;
let mirrorTransaction;

logger.info('Starting sequelize transaction and acquiring transaction mutex');
const releaseTransactionMutex =
await processingSyncRegistriesTransactionMutex.acquire();

try {
logger.info('Starting sequelize transaction');
// Start a transaction
transaction = await sequelize.transaction();

Expand All @@ -107,7 +156,7 @@ async function createTransaction(callback, afterCommitCallbacks) {
}

// Execute the provided callback with the transaction
result = await callback(transaction, mirrorTransaction);
await callback(transaction, mirrorTransaction);

// Commit the transaction if the callback completes without errors
await transaction.commit();
Expand All @@ -122,14 +171,18 @@ async function createTransaction(callback, afterCommitCallbacks) {

logger.info('Commited sequelize transaction');

return result;
return true;
} catch (error) {
// Roll back the transaction if an error occurs
if (transaction) {
logger.error('Rolling back transaction');
console.error(error);
logger.error(
`encountered error syncing organization audit. Rolling back transaction. Error: ${error}`,
);
await transaction.rollback();
return false;
}
} finally {
releaseTransactionMutex();
}
}

Expand Down Expand Up @@ -432,7 +485,7 @@ const syncOrganizationAudit = async (organization) => {
// by not processing the DELETE for that record.
const optimizedKvDiff = optimizeAndSortKvDiff(kvDiff);

const updateTransaction = async (transaction, mirrorTransaction) => {
const updateAuditTransaction = async (transaction, mirrorTransaction) => {
logger.info(
`Syncing ${organization.name} generation ${toBeProcessedDatalayerGenerationIndex} (orgUid ${organization.orgUid}, registryId ${organization.registryId})`,
);
Expand Down Expand Up @@ -529,18 +582,8 @@ const syncOrganizationAudit = async (organization) => {
}

// Create the Audit record
logger.debug(
`creating audit model entry for ${organization.name} transacton`,
);
logger.debug(`creating audit model transaction entry`);
await Audit.create(auditData, { transaction, mirrorTransaction });
await Organization.update(
{ registryHash: rootToBeProcessed.root_hash },
{
where: { orgUid: organization.orgUid },
transaction,
mirrorTransaction,
},
);
}
}
};
Expand All @@ -549,7 +592,27 @@ const syncOrganizationAudit = async (organization) => {
afterCommitCallbacks.push(truncateStaging);
}

await createTransaction(updateTransaction, afterCommitCallbacks);
const transactionSucceeded = await createAndProcessTransaction(
updateAuditTransaction,
afterCommitCallbacks,
);

if (transactionSucceeded) {
logger.debug(
`updateAuditTransaction successfully completed and committed audit updates for ${organization.name} (orgUid: ${organization.orgUid}, registryId: ${organization.registryId}) generation index ${toBeProcessedDatalayerGenerationIndex}. updating registry hash to ${rootToBeProcessed.root_hash}`,
);

await Organization.update(
{ registryHash: rootToBeProcessed.root_hash },
{
where: { orgUid: organization.orgUid },
},
);
} else {
logger.debug(
`updateAuditTransaction failed to complete and commit audit updates for ${organization.name} (orgUid: ${organization.orgUid}, registryId: ${organization.registryId}) generation index ${toBeProcessedDatalayerGenerationIndex}`,
);
}
} catch (error) {
logger.error('Error syncing org audit', error);
}
Expand Down
26 changes: 26 additions & 0 deletions src/utils/model-utils.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,32 @@
import { columnsToInclude } from './helpers.js';
import Sequelize from 'sequelize';

import { Mutex } from 'async-mutex';

export async function waitForSyncRegistriesTransaction() {
if (processingSyncRegistriesTransactionMutex.isLocked()) {
// when the mutex is acquired, the current sync transaction has completed
const releaseMutex =
await processingSyncRegistriesTransactionMutex.acquire();
await releaseMutex();
}
}

/**
* mutex which must be acquired to run the sync-registries task job.
* this mutex exists to prevent multiple registry sync tasks from running at the same time and overloading the chia
* RPC's or causing a SQLite locking error due to multiple task instances trying to commit large update transactions
* @type {Mutex}
*/
export const syncRegistriesTaskMutex = new Mutex();

/**
* mutex which must be acquired when writing registry update information until the transaction has been committed
* audit model update transactions are large and lock the DB for long periods.
* @type {Mutex}
*/
export const processingSyncRegistriesTransactionMutex = new Mutex();

export function formatModelAssociationName(model) {
if (model == null || model.model == null) return '';

Expand Down

0 comments on commit 541f862

Please sign in to comment.