Skip to content

Commit

Permalink
Merge pull request #524 from nasa/harmony-1652-2
Browse files Browse the repository at this point in the history
Fixes for Batchee and Stitchee
  • Loading branch information
indiejames authored Jan 19, 2024
2 parents b51aab3 + 4784422 commit 8bfe738
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ async function maybeQueueQueryCmrWorkItem(
*/
async function createNextWorkItems(
tx: Transaction,
currentWorkflowStep: WorkflowStep,
nextWorkflowStep: WorkflowStep,
logger: Logger,
workItem: WorkItem,
Expand All @@ -445,68 +446,79 @@ async function createNextWorkItems(
outputItemSizes: number[],
): Promise<boolean> {
let didCreateWorkItem = false;
if (results && results.length > 0 || nextWorkflowStep.isBatched) {
didCreateWorkItem = true;
// if we have completed all the work items for this step or if the next step does not
// aggregate then create a work item for the next step
if (nextWorkflowStep.hasAggregatedOutput) {
if (nextWorkflowStep.isBatched) {
let sortIndex;
if (!QUERY_CMR_SERVICE_REGEX.test(workItem.serviceID)) {
// eslint-disable-next-line prefer-destructuring
sortIndex = workItem.sortIndex;
}
let outputItemUrls = [];
if (workItem.status !== WorkItemStatus.FAILED) {
outputItemUrls = await outputStacItemUrls(results);
}
// TODO add other services that can produce more than one output and so should have their
// batching sortIndex propagated to child work items to provide consistent batching
didCreateWorkItem = await handleBatching(
tx,
logger,
nextWorkflowStep,
outputItemUrls,
outputItemSizes,
sortIndex,
workItem.status,
allWorkItemsForStepComplete);
} else if (allWorkItemsForStepComplete) {
await createAggregatingWorkItem(tx, workItem, nextWorkflowStep, logger);
// When a work-item update comes in we have three possible cases to handle:
// 1. next step aggregates AND uses batching - in this case we process each result and put it
// into a batch. As batches fill up we generate a new work-item for the next step and create
// a new batch if neeeded. We have a check to see if we have completed all the work-items for
// the current step, in which case we close the last batch even if it is not full and
// generate a final work-item for the next step
// 2. next step aggregates, but does not use batching, so we only create a new work-item
// for the next step if we have completed all the work-items for the current step
// 3. next step does not aggregate, so we create a new work-item for the next step for each
// result from this work-item update
if (nextWorkflowStep.hasAggregatedOutput) {
if (nextWorkflowStep.isBatched) {
let sortIndex;
if (!QUERY_CMR_SERVICE_REGEX.test(workItem.serviceID)) {
// eslint-disable-next-line prefer-destructuring
sortIndex = workItem.sortIndex;
}
} else {
// Create a new work item for each result using the next step

// use the sort index from the previous step's work item unless the service was
// query-cmr, in which case we start from the previous highest sort index for this step
// NOTE: This is only valid if the work-items for this multi-output step are worked
// sequentially and have consistently ordered outputs, as with query-cmr.
// If they are worked in parallel then we need a different approach.
let { sortIndex } = workItem;
let shouldIncrementSortIndex = false;
if (QUERY_CMR_SERVICE_REGEX.test(workItem.serviceID)) {
shouldIncrementSortIndex = true;
sortIndex = await maxSortIndexForJobService(tx, nextWorkflowStep.jobID, nextWorkflowStep.serviceID);
let outputItemUrls = [];
if (workItem.status !== WorkItemStatus.FAILED) {
outputItemUrls = await outputStacItemUrls(results);
}
const newItems = results.map(result => {
if (shouldIncrementSortIndex) {
sortIndex += 1;
}
return new WorkItem({
jobID: workItem.jobID,
serviceID: nextWorkflowStep.serviceID,
status: WorkItemStatus.READY,
stacCatalogLocation: result,
workflowStepIndex: nextWorkflowStep.stepIndex,
sortIndex,
});
didCreateWorkItem = await handleBatching(
tx,
logger,
nextWorkflowStep,
outputItemUrls,
outputItemSizes,
sortIndex,
workItem.status,
allWorkItemsForStepComplete);
} else if (allWorkItemsForStepComplete) {
await createAggregatingWorkItem(tx, workItem, nextWorkflowStep, logger);
didCreateWorkItem = true;
}
} else {
// Create a new work item for each result using the next step
didCreateWorkItem = true;

// use the sort index from the previous step's work item unless the service was
// query-cmr, in which case we start from the previous highest sort index for this step
// NOTE: This is only valid if the work-items for this multi-output step are worked
// sequentially and have consistently ordered outputs, as with query-cmr.
// If they are worked in parallel then we need a different approach.
let { sortIndex } = workItem;
let shouldIncrementSortIndex = false;
if (currentWorkflowStep.hasAggregatedOutput || currentWorkflowStep.is_sequential) {
shouldIncrementSortIndex = true;

if (currentWorkflowStep.is_sequential) {
sortIndex = await maxSortIndexForJobService(tx, nextWorkflowStep.jobID, nextWorkflowStep.serviceID) + 1;
}
}
const newItems = results.map(result => {
const newItem = new WorkItem({
jobID: workItem.jobID,
serviceID: nextWorkflowStep.serviceID,
status: WorkItemStatus.READY,
stacCatalogLocation: result,
workflowStepIndex: nextWorkflowStep.stepIndex,
sortIndex,
});

await incrementReadyCount(tx, workItem.jobID, nextWorkflowStep.serviceID, newItems.length);
for (const batch of _.chunk(newItems, batchSize)) {
await WorkItem.insertBatch(tx, batch);
logger.info('Queued new batch of work items.');
if (shouldIncrementSortIndex) {
sortIndex += 1;
}

return newItem;
});

await incrementReadyCount(tx, workItem.jobID, nextWorkflowStep.serviceID, newItems.length);
for (const batch of _.chunk(newItems, batchSize)) {
await WorkItem.insertBatch(tx, batch);
logger.info('Queued new batch of work items.');
}
}
return didCreateWorkItem;
Expand Down Expand Up @@ -690,7 +702,7 @@ export async function processWorkItem(

let allWorkItemsForStepComplete = false;


// The number of 'hits' returned by a query-cmr could be less than when CMR was first
// queried by harmony due to metadata deletions from CMR, so we update the job to reflect
// that there are fewer items and to know when no more query-cmr jobs should be created.
Expand Down Expand Up @@ -735,6 +747,7 @@ export async function processWorkItem(
'HWIUWJI.createNextWorkItems',
logger))(
tx,
thisStep,
nextWorkflowStep,
logger,
workItem,
Expand Down
8 changes: 6 additions & 2 deletions services/harmony/app/util/aggregation-batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -297,15 +297,19 @@ export async function handleBatching(
const batchItems = await getByJobServiceBatch(tx, jobID, serviceID, null, true);
let index = 0;
let nextSortIndex: number;
// get the most recent batch so we can try to assign new items to it
let currentBatch = await withHighestBatchIDForJobService(tx, jobID, serviceID);
// keep track of how big the batch is in terms of number of items and total size of the
// items in bytes
let currentBatchSize = 0;
let currentBatchCount = 0;
if (currentBatch) {
const { sum, count } = await getCurrentBatchSizeAndCount(tx, jobID, serviceID, currentBatch.batchID);
currentBatchSize = sum;
currentBatchCount = count;
}

// assign the new batch items to the most recent batch until it gets full. create a new
// batch if necessary - this becomes the most recent batch
while (index < batchItems.length) {
if (currentBatch) {
// figure out what the next sort index in the batch should be
Expand Down Expand Up @@ -425,4 +429,4 @@ export async function handleBatching(
didCreateWorkItem = await createCatalogAndWorkItemForBatch(tx, workflowStep, currentBatch, logger) || didCreateWorkItem;
}
return didCreateWorkItem;
}
}

0 comments on commit 8bfe738

Please sign in to comment.