Merge pull request #531 from nasa/harmony-1622

Harmony 1622 - Better job progress calculation

indiejames authored Feb 7, 2024
2 parents da3128b + 025968f commit efcc1b8
Showing 16 changed files with 425 additions and 124 deletions.
2 changes: 2 additions & 0 deletions db/db.sql
@@ -78,6 +78,8 @@ CREATE TABLE `workflow_steps` (
`maxBatchInputs` integer,
`maxBatchSizeInBytes` integer,
`operation` text not null,
`completed_work_item_count` integer not null default 0,
`progress_weight` float not null default 1.0,
`createdAt` datetime not null,
`updatedAt` datetime not null,
FOREIGN KEY(jobID) REFERENCES jobs(jobID),
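The two new columns drive the reworked progress model: `completed_work_item_count` records how many work items have finished for a step, and `progress_weight` (presumably tunable per service) controls how much each step contributes to overall job progress in the new `Job.updateProgress` below. The migration that follows adds the same columns via knex.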
@@ -0,0 +1,22 @@
exports.up = function up(knex) {
const result = knex.schema
.alterTable('workflow_steps', (t) => {
t.integer('completed_work_item_count').defaultTo(0).notNullable();
t.float('progress_weight').defaultTo(1).notNullable();
})
.then(() => {
return knex.schema.raw(`
UPDATE "workflow_steps" SET completed_work_item_count="workItemCount" WHERE is_complete = true
`
)});

return result;
};

exports.down = function down(knex) {
return knex.schema
.alterTable('workflow_steps', (t) => {
t.dropColumn('progress_weight');
t.dropColumn('completed_work_item_count');
});
};
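The raw UPDATE backfills `completed_work_item_count` from `workItemCount` for steps that are already complete, so jobs in flight at upgrade time report sensible progress immediately. A minimal sketch of applying the migration programmatically, assuming a standard knexfile (the file names here are illustrative, not part of this change):

import knex from 'knex';
import config from './knexfile'; // assumed knex configuration

async function migrate(): Promise<void> {
  const db = knex(config);
  // runs all pending migrations, including the column additions and backfill above
  await db.migrate.latest();
  await db.destroy();
}

Equivalently, the knex CLI (`knex migrate:latest`) applies any pending migrations.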
@@ -2,7 +2,7 @@ import env from '../../util/env';
import { logAsyncExecutionTime } from '../../util/log-execution';
import { v4 as uuid } from 'uuid';
import WorkItemUpdate from '../../models/work-item-update';
import WorkflowStep, { decrementFutureWorkItemCount, getWorkflowStepByJobIdStepIndex, getWorkflowStepsByJobId, updateIsComplete } from '../../models/workflow-steps';
import WorkflowStep, { getWorkflowStepByJobIdStepIndex, updateIsComplete } from '../../models/workflow-steps';
import { Logger } from 'winston';
import _, { ceil, range, sum } from 'lodash';
import { JobStatus, Job } from '../../models/job';
@@ -83,7 +83,7 @@ async function addJobLinksForFinishedWorkItem(
* @returns the final job status for the request
*/
async function getFinalStatusAndMessageForJob(tx: Transaction, job: Job):
Promise<{ finalStatus: JobStatus, finalMessage: string }> {
Promise<{ finalStatus: JobStatus, finalMessage: string; }> {
let finalStatus = JobStatus.SUCCESSFUL;
const errorCount = await getErrorCountForJob(tx, job.jobID);
const dataLinkCount = await getJobDataLinkCount(tx, job.jobID);
@@ -96,7 +96,7 @@ Promise<{ finalStatus: JobStatus, finalMessage: string }> {
}
let finalMessage = '';
if ((errorCount > 1) && (finalStatus == JobStatus.FAILED)) {
finalMessage = `The job failed with ${errorCount} errors. See the errors field for more details`;
finalMessage = `The job failed with ${errorCount} errors. See the errors field for more details`;
} else if ((errorCount == 1) && (finalStatus == JobStatus.FAILED)) {
const jobError = (await getErrorsForJob(tx, job.jobID, 1))[0];
finalMessage = jobError.message;
@@ -168,7 +168,7 @@ async function handleFailedWorkItems(
logger: Logger, errorMessage: string,
): Promise<boolean> {
let continueProcessing = true;
// If the response is an error then set the job status to 'failed'
// If the response is an error then maybe set the job status to 'failed'
if (status === WorkItemStatus.FAILED) {
continueProcessing = job.ignoreErrors;
if (!job.hasTerminalStatus()) {
@@ -204,8 +204,6 @@
if (!continueProcessing) {
await completeJob(tx, job, JobStatus.FAILED, logger, jobMessage);
} else {
// Need to make sure we expect one fewer granule to complete
await decrementFutureWorkItemCount(tx, job.jobID, workflowStep.stepIndex);
if (job.status == JobStatus.RUNNING) {
job.status = JobStatus.RUNNING_WITH_ERRORS;
await job.save(tx);
@@ -217,26 +215,20 @@ }
}
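With `completed_work_item_count` tracked per step, a failed work item no longer requires decrementing the expected counts of future steps (hence the removal of the `decrementFutureWorkItemCount` call above); progress is now recomputed from what each step has actually completed.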

/**
* Updated the workflow steps `workItemCount` field for the given job to match the new
* Updated the query-cmr workflow step's `workItemCount` field for the given job to match the new
* granule count
*
* @param transaction - the transaction to use for the update
* @param job - A Job that has a new input granule count
*/
async function updateWorkItemCounts(
async function updateCmrWorkItemCount(
transaction: Transaction,
job: Job):
Promise<void> {
const workflowSteps = await getWorkflowStepsByJobId(transaction, job.jobID);
for (const step of workflowSteps) {
if (QUERY_CMR_SERVICE_REGEX.test(step.serviceID)) {
step.workItemCount = Math.ceil(job.numInputGranules / env.cmrMaxPageSize);
} else if (!step.hasAggregatedOutput) {
step.workItemCount = job.numInputGranules;
} else {
step.workItemCount = 1;
}
await step.save(transaction);
}
// NOTE We assume here that any chain using query-cmr will have it as the first step
const step = await getWorkflowStepByJobIdStepIndex(transaction, job.jobID, 1);
step.workItemCount = Math.ceil(job.numInputGranules / env.cmrMaxPageSize);
await step.save(transaction);
}
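A worked example with assumed numbers: with `cmrMaxPageSize` set to 2000, a job with 4100 input granules yields Math.ceil(4100 / 2000) = 3 query-cmr work items.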

/**
@@ -447,7 +439,7 @@
// When a work-item update comes in we have three possible cases to handle:
// 1. next step aggregates AND uses batching - in this case we process each result and put it
// into a batch. As batches fill up we generate a new work-item for the next step and create
// a new batch if neeeded. We have a check to see if we have completed all the work-items for
// a new batch if needed. We have a check to see if we have completed all the work-items for
// the current step, in which case we close the last batch even if it is not full and
// generate a final work-item for the next step
// 2. next step aggregates, but does not use batching, so we only create a new work-item
@@ -477,7 +469,9 @@
} else if (allWorkItemsForStepComplete) {
await createAggregatingWorkItem(tx, workItem, nextWorkflowStep, logger);
didCreateWorkItem = true;
nextWorkflowStep.workItemCount += 1;
}

} else {
// Create a new work item for each result using the next step
didCreateWorkItem = true;
@@ -513,12 +507,16 @@
return newItem;
});

nextWorkflowStep.workItemCount += newItems.length;
await incrementReadyCount(tx, workItem.jobID, nextWorkflowStep.serviceID, newItems.length);
for (const batch of _.chunk(newItems, batchSize)) {
await WorkItem.insertBatch(tx, batch);
logger.info('Queued new batch of work items.');
logger.info(`Created new set of ${newItems.length} work items.`);
}
}
if (didCreateWorkItem) {
await nextWorkflowStep.save(tx);
}
return didCreateWorkItem;
}
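Note the bookkeeping pattern: `nextWorkflowStep.workItemCount` grows as items are created (by 1 for an aggregating work item, by `newItems.length` for fan-out), and the step is saved once at the end, guarded by `didCreateWorkItem`, rather than persisted per item.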

@@ -550,7 +548,7 @@ export async function preprocessWorkItem(
let catalogItems;
try {
if (status === WorkItemStatus.SUCCESSFUL && !nextWorkflowStep) {
// if we are the last step in the chain we should read the catalog items since they are
// if we are CREATING STAC CATALOGS the last step in the chain we should read the catalog items since they are
// needed for generating the output links we will save
catalogItems = await readCatalogsItems(results);
durationMs = new Date().getTime() - startTime;
@@ -680,6 +678,11 @@ export async function processWorkItem(
totalItemsSize = sum(outputItemSizes) / 1024 / 1024;
}

if (COMPLETED_WORK_ITEM_STATUSES.includes(status)) {
thisStep.completed_work_item_count += 1;
await thisStep.save(tx);
}

await (await logAsyncExecutionTime(
updateWorkItemStatus,
'HWIUWJI.updateWorkItemStatus',
@@ -713,11 +716,14 @@ export async function processWorkItem(
logger.debug('timing.HWIUWJI.job.save.end', { durationMs });

await (await logAsyncExecutionTime(
updateWorkItemCounts,
'HWIUWJI.updateWorkItemCounts',
updateCmrWorkItemCount,
'HWIUWJI.updateCmrWorkItemCount',
logger))(tx, job);
}

await job.updateProgress(tx);
await job.save(tx);

if (checkCompletion) {
allWorkItemsForStepComplete = await updateIsComplete(tx, jobID, job.numInputGranules, thisStep);
}
@@ -773,43 +779,48 @@ export async function processWorkItem(
logger))(tx, job, JobStatus.FAILED, logger, message);
}
}
if (!nextWorkflowStep || allWorkItemsForStepComplete) {

if (!nextWorkflowStep) {
// Finished with the chain for this granule
if (status != WorkItemStatus.FAILED) {
await (await logAsyncExecutionTime(
addJobLinksForFinishedWorkItem,
'HWIUWJI.addJobLinksForFinishedWorkItem',
logger))(tx, job.jobID, catalogItems);
}
job.completeBatch(thisStep.workItemCount);
if (allWorkItemsForStepComplete && !didCreateWorkItem && (!nextWorkflowStep || nextWorkflowStep.workItemCount === 0)) {
}

if (allWorkItemsForStepComplete) {
if (!didCreateWorkItem && (!nextWorkflowStep || nextWorkflowStep.workItemCount < 1)) {
// If all granules are finished mark the job as finished
const { finalStatus, finalMessage } = await getFinalStatusAndMessageForJob(tx, job);
await (await logAsyncExecutionTime(
completeJob,
'HWIUWJI.completeJob',
logger))(tx, job, finalStatus, logger, finalMessage);
} else {
// Either previewing or next step is a batched step and this item failed
if (job.status === JobStatus.PREVIEWING) {
// Special case to pause the job as soon as any single granule completes when in the previewing state
jobSaveStartTime = new Date().getTime();
await job.pauseAndSave(tx);
durationMs = new Date().getTime() - jobSaveStartTime;
logger.debug('timing.HWIUWJI.job.pauseAndSave.end', { durationMs });
} else {
jobSaveStartTime = new Date().getTime();
await job.save(tx);
durationMs = new Date().getTime() - jobSaveStartTime;
logger.debug('timing.HWIUWJI.job.save.end', { durationMs });
}
}
} else { // Currently only reach this condition for batched aggregation requests
jobSaveStartTime = new Date().getTime();
await job.save(tx);
durationMs = new Date().getTime() - jobSaveStartTime;
logger.debug('timing.HWIUWJI.job.save.end', { durationMs });

if (!nextWorkflowStep && job.status === JobStatus.PREVIEWING) {
// Special case to pause the job as soon as any single granule completes when in the previewing state
jobSaveStartTime = new Date().getTime();
await job.pauseAndSave(tx);
durationMs = new Date().getTime() - jobSaveStartTime;
logger.debug('timing.HWIUWJI.job.pauseAndSave.end', { durationMs });
} else {
jobSaveStartTime = new Date().getTime();
await job.save(tx);
durationMs = new Date().getTime() - jobSaveStartTime;
logger.debug('timing.HWIUWJI.job.save.end', { durationMs });
}

}

jobSaveStartTime = new Date().getTime();
await job.save(tx);
durationMs = new Date().getTime() - jobSaveStartTime;
logger.debug('timing.HWIUWJI.job.save.end', { durationMs });

}
} catch (e) {
logger.error(`Work item update failed for work item ${workItemID} and status ${status}`);
@@ -851,7 +862,7 @@ export async function processWorkItems(

const lastIndex = items.length - 1;
for (let index = 0; index < items.length; index++) {
const { preprocessResult, update } = items[index];
const { preprocessResult, update } = items[index];
if (index < lastIndex) {
await processWorkItem(tx, preprocessResult, job, update, logger, false, thisStep);
} else {
@@ -898,7 +909,7 @@ export async function handleWorkItemUpdateWithJobId(
logger))(tx, jobID, false, true);

await processWorkItem(tx, preprocessResult, job, update, logger, true, undefined);

await job.save(tx);
});
const durationMs = new Date().getTime() - transactionStart;
logger.debug('timing.HWIUWJI.transaction.end', { durationMs });
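Two smaller threads run through `processWorkItem`: every work item reaching a terminal status (the `COMPLETED_WORK_ITEM_STATUSES` list presumably covers successful, failed, and canceled) increments its step's `completed_work_item_count`, and `job.updateProgress` plus `job.save` now run on every update rather than only when a batch completes, so the weighted progress stays fresh.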
42 changes: 30 additions & 12 deletions services/harmony/app/models/job.ts
@@ -9,6 +9,7 @@ import { truncateString } from '@harmony/util/string';
import DBRecord from './record';
import { Transaction } from '../util/db';
import JobLink, { getLinksForJob, JobLinkOrRecord } from './job-link';
import WorkflowStep, { getWorkflowStepsByJobId } from './workflow-steps';

// how long data generated by this job will be available
export const EXPIRATION_DAYS = 30;
@@ -801,21 +802,38 @@ export class Job extends DBRecord implements JobRecord {
}

/**
* Updates the job progress based on a single batch completing
* You must call `#save` to persist the change
* Update the progress of a job using the progress of the WorkflowSteps for the job.
* You must call `#save` to persist the change.
*
* @param totalItemCount - the number of items in total that need to be processed for the job
* to complete.
* @param tx - a transaction to use when querying the database
* @returns An empty Promise
*/
async updateProgress(tx: Transaction): Promise<void> {
const steps = await getWorkflowStepsByJobId(tx, this.jobID, ['workItemCount', 'completed_work_item_count', 'progress_weight']);
let prevStep: WorkflowStep = null;
for (const step of steps) {
step.updateProgress(prevStep);
prevStep = step;
}
let sumOfWeights = steps.reduce((sum: number, step: WorkflowStep) => sum + step.progress_weight, 0);
sumOfWeights = sumOfWeights > 0 ? sumOfWeights : 1;
let progSum = steps.reduce((sum: number, step: WorkflowStep) => sum + step.progress_weight * step.progress, 0);
progSum = Math.max(0, progSum);
// Only allow progress to be set to 100 when the job completes and don't let progress go
// backwards
const progress = Math.min(Math.floor(progSum / sumOfWeights), 99);
if (this.progress < progress) {
this.progress = progress;
}
}
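A worked example of the weighted average (a standalone sketch with made-up numbers; `WorkflowStep.updateProgress` lives in workflow-steps.ts, which is not shown in this excerpt, and is assumed to leave each step's `progress` as a 0-100 percentage):

// hypothetical two-step chain: query-cmr finished, service step half done,
// both with the default weight of 1.0
const steps = [
  { progress_weight: 1.0, progress: 100 },
  { progress_weight: 1.0, progress: 50 },
];
let sumOfWeights = steps.reduce((sum, step) => sum + step.progress_weight, 0);
sumOfWeights = sumOfWeights > 0 ? sumOfWeights : 1; // guard against dividing by zero
const progSum = Math.max(0, steps.reduce((sum, step) => sum + step.progress_weight * step.progress, 0));
const progress = Math.min(Math.floor(progSum / sumOfWeights), 99); // => 75; never reports 100 until the job completes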

/**
* Updates the number of completed batches. This is no longer used to compute job progress,
* but it is left in place in the event we want to track batches later.
* You must call `#save` to persist the change.
*/
completeBatch(totalItemCount: number = this.numInputGranules): void {
completeBatch(): void {
this.batchesCompleted += 1;
// Only allow progress to be set to 100 when the job completes
let progress = Math.min(100 * (this.batchesCompleted / totalItemCount), 99);
// don't allow negative progress
progress = Math.max(0, progress);
// progress must be an integer
progress = Math.floor(progress);
this.progress = progress;
}

(Diffs for the remaining changed files are not shown.)