Skip to content

Commit

Permalink
feat: check to reset org generations if cadt is ahead of DL
Browse files Browse the repository at this point in the history
  • Loading branch information
wwills2 committed Sep 20, 2024
1 parent c965a53 commit b8e2529
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 9 deletions.
4 changes: 2 additions & 2 deletions src/config/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ const persistanceFolder = `${chiaRoot}/cadt/${getDataModelVersion()}`;
const localQueryLogger = (query) => {
const queryString = query.split(/:\s(.+)/)[1];
const queryHash = createHash('md5').update(queryString).digest('hex');
logger.debug(`SQLite Sequelize [query hash: ${queryHash}]\n\t${query}`);
logger.silly(`SQLite Sequelize [query hash: ${queryHash}]\n\t${query}`);
};

const mirrorQueryLogger = (query) => {
const queryString = query.split(/:\s(.+)/)[1];
const queryHash = createHash('md5').update(queryString).digest('hex');
logger.debug(`Mirror DB Sequelize [query hash: ${queryHash}]\n\t${query}`);
logger.silly(`Mirror DB Sequelize [query hash: ${queryHash}]\n\t${query}`);
};

const appLogLevel = getConfig().APP.LOG_LEVEL;
Expand Down
4 changes: 4 additions & 0 deletions src/datalayer/persistance.js
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,10 @@ const getSyncStatus = async (storeId) => {
// We just care that we got some response, not what the response is
if (Object.keys(data).includes('success')) {
return data;
} else {
logger.warn(
`datalayer '/get_sync_status' RPC failed to get sync status for ${storeId}`,
);
}

return false;
Expand Down
84 changes: 77 additions & 7 deletions src/tasks/sync-registries.js
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ const syncOrganizationAudit = async (organization) => {
const rootHistory = await datalayer.getRootHistory(organization.registryId);

if (!rootHistory.length) {
logger.info(
logger.warn(
`No root history found for ${organization.name} (store ${organization.orgUid})`,
);
return;
Expand Down Expand Up @@ -258,20 +258,22 @@ const syncOrganizationAudit = async (organization) => {
`1 Last processed index of ${organization.name}: ${lastProcessedIndex}`,
);

if (lastProcessedIndex > rootHistory.length) {
if (!rootHistory?.length) {
logger.error(
`Could not find root history for ${organization.name} (store ${organization.orgUid}) with timestamp ${currentGeneration.timestamp}, something is wrong and the sync for this organization will be paused until this is resolved.`,
);
return;
}

const rootHistoryZeroBasedCount = rootHistory.length - 1;
const syncRemaining = rootHistoryZeroBasedCount - lastProcessedIndex;
const isSynced = syncRemaining === 0;
logger.debug(`2 the root history length for ${organization.name} is ${rootHistory.length}
and the last processed generation is ${lastProcessedIndex}`);
logger.debug(`2 the highest root history index is ${rootHistoryZeroBasedCount},
given this and the last processed index, the number of generations left to sync is ${syncRemaining}`);

logger.debug(
`2 the root history length for ${organization.name} is ${rootHistory.length} and the last processed generation is ${lastProcessedIndex}`,
);
logger.debug(
`2 the highest root history index is ${rootHistoryZeroBasedCount}, given this and the last processed index, the number of generations left to sync is ${syncRemaining}`,
);
logger.debug(
`updating organization model with new sync status for ${organization.name}`,
);
Expand Down Expand Up @@ -315,6 +317,36 @@ const syncOrganizationAudit = async (organization) => {
organization.registryId,
);

if (
sync_status &&
sync_status?.generation &&
sync_status?.target_generation
) {
logger.debug(
`store ${organization.registryId} (${organization.name}) is currently at generation ${sync_status.generation} with a target generation of ${sync_status.target_generation}`,
);
} else {
logger.error(
`could not get datalayer sync status for store ${organization.registryId} (${organization.name}). pausing sync until sync status can be retrieved`,
);
return;
}

const orgRequiredResetDueToInvalidGenerationIndex =
await orgGenerationMismatchCheck(
organization.orgUid,
lastProcessedIndex,
sync_status.generation,
sync_status.target_generation,
);

if (orgRequiredResetDueToInvalidGenerationIndex) {
logger.info(
`${organization.name} was ahead of datalayer and needed to resync a few generations. trying again shortly...`,
);
return;
}

if (toBeProcessedIndex > sync_status.generation) {
const warningMsg = [
`No data found for ${organization.name} (store ${organization.orgUid}) in the current datalayer generation.`,
Expand Down Expand Up @@ -509,4 +541,42 @@ const syncOrganizationAudit = async (organization) => {
}
};

/**
* checks if an organization needs to be reset to a generation, and performs the reset. notifies the caller that the
* org was reset.
*
* datalayer store singletons can lose generation indexes due to blockchain reorgs. while the data is intact, in datalayer
* and cadt, this effectively makes the last synced cadt generation a 'future' generation, which causes problems.
*
* if the DL store is synced, and the cadt generation is higher than the DL generation, the org is resynced to 2 generations
* back from the highest DL generation
* @param orgUid
* @param cadtLastProcessedGeneration
* @param registryStoreGeneration
* @param registryStoreTargetGeneration
* @return {Promise<boolean>}
*/
const orgGenerationMismatchCheck = async (
orgUid,
cadtLastProcessedGeneration,
registryStoreGeneration,
registryStoreTargetGeneration,
) => {
const storeSynced = registryStoreGeneration === registryStoreTargetGeneration;
const lastProcessedGenerationIndexDoesNotExistInDatalayer =
cadtLastProcessedGeneration > registryStoreGeneration;

if (storeSynced && lastProcessedGenerationIndexDoesNotExistInDatalayer) {
const resetToGeneration = registryStoreGeneration - 2; // -2 to have a margin
logger.info(
`resetting org with orgUid ${orgUid} to generation ${resetToGeneration}`,
);

await Audit.resetToGeneration(resetToGeneration, orgUid);
return true;
} else {
return false;
}
};

export default job;

0 comments on commit b8e2529

Please sign in to comment.