Skip to content

Commit

Permalink
fix: organization meta sync task hanging
Browse files Browse the repository at this point in the history
  • Loading branch information
wwills2 committed Dec 3, 2024
1 parent 1d16f7f commit b735808
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 63 deletions.
4 changes: 2 additions & 2 deletions src/datalayer/syncService.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ const getRootDiff = (storeId, root1, root2) => {
* @param {number} retry - Number of retry attempts.
*/
const getStoreData = async (storeId, callback, onFail, rootHash, retry = 0) => {
const MAX_RETRIES = 50;
const RETRY_DELAY = 120000;
const MAX_RETRIES = 6;
const RETRY_DELAY = 10000;

try {
logger.info(`Getting store data, retry: ${retry}`);
Expand Down
113 changes: 55 additions & 58 deletions src/models/organizations/organizations.model.js
Original file line number Diff line number Diff line change
Expand Up @@ -343,69 +343,66 @@ class Organization extends Model {
try {
const allSubscribedOrganizations = await Organization.findAll({
where: { subscribed: true },
raw: true,
});

await Promise.all(
allSubscribedOrganizations.map(async (organization) => {
const processData = (data, keyFilter) =>
data
.filter(({ key }) => keyFilter(key))
.reduce(
(update, { key, value }) => ({ ...update, [key]: value }),
{},
);

const onFail = (message) => {
logger.info(`Unable to sync metadata from ${organization.orgUid}`);
logger.error(`ORGANIZATION DATA SYNC ERROR: ${message}`);
Organization.update(
{ orgHash: '0' },
{ where: { orgUid: organization.orgUid } },
for (const organization of allSubscribedOrganizations) {
const processData = (data, keyFilter) =>
data
.filter(({ key }) => keyFilter(key))
.reduce(
(update, { key, value }) => ({ ...update, [key]: value }),
{},
);
};

const onResult = async (updateHash, data) => {
try {
const updateData = processData(
data,
(key) => !key.includes('meta_'),
);
const metadata = processData(data, (key) =>
key.includes('meta_'),
);

await Organization.update(
{
..._.omit(updateData, ['registryId']),
prefix: updateData.prefix || '0',
metadata: JSON.stringify(metadata),
},
{ where: { orgUid: organization.orgUid } },
);

logger.debug(
`Updating orgUid ${organization.orgUid} with hash ${updateHash}`,
);
await Organization.update(
{ orgHash: updateHash },
{ where: { orgUid: organization.orgUid } },
);
} catch (error) {
logger.info(error.message);
onFail(error.message);
}
};

datalayer.getStoreIfUpdated(
organization.orgUid,
organization.orgHash,
onResult,
onFail,

const onFail = async (message) => {
logger.info(`Unable to sync metadata from ${organization.orgUid}`);
logger.error(`ORGANIZATION DATA SYNC ERROR: ${message}`);
await Organization.update(
{ orgHash: '0' },
{ where: { orgUid: organization.orgUid } },
);
}),
);
};

const onResult = async (updateHash, data) => {
try {
const updateData = processData(
data,
(key) => !key.includes('meta_'),
);
const metadata = processData(data, (key) => key.includes('meta_'));

await Organization.update(
{
..._.omit(updateData, ['registryId']),
prefix: updateData.prefix || '0',
metadata: JSON.stringify(metadata),
},
{ where: { orgUid: organization.orgUid } },
);

logger.debug(
`Updating orgUid ${organization.orgUid} with hash ${updateHash}`,
);
await Organization.update(
{ orgHash: updateHash },
{ where: { orgUid: organization.orgUid } },
);
} catch (error) {
logger.info(error.message);
onFail(error.message);
}
};

await datalayer.getStoreIfUpdated(
organization.orgUid,
organization.orgHash,
onResult,
onFail,
);
}
} catch (error) {
logger.info(error.message);
logger.error(error.message);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/tasks/sync-default-organizations.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const task = new Task('sync-default-organizations', async () => {
await assertDataLayerAvailable();
await assertWalletIsSynced();
if (!CONFIG.USE_SIMULATOR) {
Organization.subscribeToDefaultOrganizations();
await Organization.subscribeToDefaultOrganizations();
}
} catch (error) {
logger.error(
Expand Down
2 changes: 1 addition & 1 deletion src/tasks/sync-organization-meta.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const task = new Task('sync-organization-meta', async () => {
await assertDataLayerAvailable();
await assertWalletIsSynced();
if (!CONFIG.USE_SIMULATOR) {
Organization.syncOrganizationMeta();
await Organization.syncOrganizationMeta();
}
} catch (error) {
logger.error(
Expand Down
2 changes: 1 addition & 1 deletion src/tasks/sync-registries.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ const processJob = async () => {
});

// verify that the latest organization root hash is up to date with the audit records. attempt correction.
if (mostRecentOrgAuditRecord.rootHash !== organization.registryHash) {
if (mostRecentOrgAuditRecord?.rootHash !== organization?.registryHash) {
logger.warn(
`latest root hash in org table for organization ${organization.name} (orgUid ${organization.orgUid}) does not match the audit records. attempting to correct`,
);
Expand Down

0 comments on commit b735808

Please sign in to comment.