Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev #43

Merged
merged 5 commits into from
Sep 20, 2024
Merged

Dev #43

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
{
"name": "@oada/jobs",
"version": "4.5.2",
"version": "4.6.0",
"description": "A library for oada job based microservices",
"source": "src/index.ts",
"main": "dist/index.js",
"type": "module",
"publishConfig": {
"access": "public"
},
"bin": {
"oada-jobs": "./dist/cli.mjs"
},
Expand Down Expand Up @@ -56,6 +59,7 @@
"packageManager": "yarn@4.1.0",
"dependencies": {
"@ava/typescript": "^4.1.0",
"@oada/lib-prom": "^3.8.0",
"@oada/list-lib": "^4.3.0",
"@oada/oadaify": "^2.1.0",
"@oada/types": "^3.5.3",
Expand Down
10 changes: 7 additions & 3 deletions src/Queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,13 @@ export class Queue {

info(`[QueueId ${this._id}] Started WATCH.`);

// Clean up the resource and grab all existing jobs to run them before starting watch
trace(`[QueueId ${this._id}] Adding existing jobs`);
const jobs = stripResource(r.data as Record<string, unknown>);

if (skipQueue) {
info('Skipping existing jobs in the queue prior to startup.');
} else {
// Clean up the resource and grab all existing jobs to run them before starting watch
trace(`[QueueId ${this._id}] Adding existing jobs`);
const jobs = stripResource(r.data as Record<string, unknown>);
// AssertJobs(jobs);
await this.#doJobs(jobs as OADAJobs);
trace(
Expand Down Expand Up @@ -198,11 +199,14 @@ export class Queue {
async #doJobs(jobs: OADAJobs | OADAJobsChange): Promise<void> {
// Queue up the Runners in parallel
for (const [jobKey, value] of Object.entries(jobs)) {
this._service.metrics[`${this._service.name}_total_queued`].inc();
void this.#queue.add(async () => {
const { _id } = value as Link;
if (!_id) return;
// Fetch the job
const { job, isJob } = await Job.fromOada(this.#oada, _id);
const mtype = job.type.replaceAll('-', '_').replaceAll(' ', '_');
this._service.metrics[`${this._service.name}_queued_${mtype}`].inc();

// Instantiate a runner to manage the job
const runner = new Runner(this._service, jobKey, job, this.#oada);
Expand Down
49 changes: 38 additions & 11 deletions src/Runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ export class Runner {
* appropriate.
*/
public async run(): Promise<void> {
const mtype = this.#job.type.replaceAll('-', '_').replaceAll(' ', '_');
this.#service.metrics[`${this.#service.name}_running_${mtype}`].inc();
this.#service.metrics[`${this.#service.name}_total_running`].inc();

// A quick check to ensure job isn't already completed
if (this.#job.status === 'success' || this.#job.status === 'failure') {
debug(`[Runner ${this.#job.oadaId}] Job already complete.`);
Expand Down Expand Up @@ -139,7 +143,7 @@ export class Runner {
contentType: tree.bookmarks.services['*'].jobs.pending['*']._type,
data: {
status,
time: moment().toISOString(),
time: new Date().toISOString(),
meta,
},
});
Expand Down Expand Up @@ -191,16 +195,7 @@ export class Runner {

// Link into success/failure event log
const date = moment(time).format('YYYY-MM-DD');
// Const finalpath = `/bookmarks/services/${this.service.name}/jobs/${status}/day-index/${date}`;
let finalpath: string | undefined;
if (status === 'failure') {
finalpath = failType
? `/bookmarks/services/${this.#service.name}/jobs/${status}/${failType}/day-index/${date}`
: `/bookmarks/services/${this.#service.name}/jobs/${status}/unknown/day-index/${date}`;
} else if (status === 'success') {
finalpath = `/bookmarks/services/${this.#service.name}/jobs/${status}/day-index/${date}`;
}

let finalpath = `/bookmarks/services/${this.#service.name}/jobs/${status}/day-index/${date}`;
info(
`[job ${this.#job.oadaId} ]: linking job to final resting place at ${finalpath}`,
);
Expand All @@ -215,12 +210,44 @@ export class Runner {
tree,
});

// If there is a failType, also link to the typed failure log
if (status === 'failure' && failType) {
let typedFailPath = `/bookmarks/services/${this.#service.name}/jobs/typed-${status}/${failType}/day-index/${date}`;
info(
`[job ${this.#job.oadaId} ]: linking job to final resting place at ${typedFailPath}`,
);
await this.#oada.put({
path: typedFailPath!,
data: {
[this.#jobKey]: {
_id: this.#job.oadaId,
_rev: 0,
},
},
tree,
});
}

// Remove from job queue
trace(`[job ${this.#job.oadaId} ]: removing from jobs queue`);
await this.#oada.delete({
path: `/bookmarks/services/${this.#service.name}/jobs/pending/${this.#jobKey}`,
});

trace(
`[job ${this.#job.oadaId} ]: marking job as ${status}`,
failType ?? 'unknown',
);
const mtype = this.#job.type.replaceAll('-', '_').replaceAll(' ', '_');
this.#service.metrics[`${this.#service.name}_total_queued`].dec();
this.#service.metrics[`${this.#service.name}_queued_${mtype}`].dec();

this.#service.metrics[`${this.#service.name}_total_running`].dec();
this.#service.metrics[`${this.#service.name}_running_${mtype}`].dec();

this.#service.metrics[`${this.#service.name}_total_${status}`].inc();
this.#service.metrics[`${this.#service.name}_${status}_${mtype}`].inc();

// Notify the status reporter if there is one
try {
const frs = this.#service.opts?.finishReporters;
Expand Down
92 changes: 92 additions & 0 deletions src/Service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import type { Config } from '@oada/client';
import { OADAClient } from '@oada/client';
import { assert as assertQueue } from '@oada/types/oada/service/queue.js';
import { Gauge } from '@oada/lib-prom';

import { Report, type ReportConstructor } from './Report.js';
import { debug, error, warn } from './utils.js';
Expand Down Expand Up @@ -63,6 +64,10 @@ export interface ConstructorArguments {
opts?: ServiceOptions;
}

export interface ServiceMetrics {
[key: string]: any;
}

export const defaultServiceQueueName = 'default-service-queue';

/**
Expand All @@ -81,6 +86,7 @@ export class Service {
public domain: string;
public token: string;
public opts: ServiceOptions | undefined;
public metrics: ServiceMetrics;

readonly #oada: OADAClient;
// Readonly #clients = new Map<Domain, OADAClient>();
Expand Down Expand Up @@ -122,6 +128,9 @@ export class Service {
this.domain = this.#oada.getDomain();
this.token = this.#oada.getToken()[0]!;
this.concurrency = object.concurrency ?? this.#oada.getConcurrency();
this.metrics = {};
this.#ensureMetrics();

if (object.opts) {
this.opts = object.opts;
}
Expand Down Expand Up @@ -184,6 +193,7 @@ export class Service {
* @param worker Worker function
*/
public on(type: string, timeout: number, work: WorkerFunction): void {
this.#ensureTypedMetrics(type);
this.#workers.set(type, { work, timeout });
}

Expand Down Expand Up @@ -248,6 +258,7 @@ export class Service {
} catch (error_) {
warn('Invalid queue');
debug('Invalid queue: %O', error_);
error(error_);
}
}

Expand All @@ -259,4 +270,85 @@ export class Service {
debug('Unable to stop queue %0', error_);
}
}

/**
* Create the metrics
*/
async #ensureTypedMetrics(type: string): Promise<void> {
const statuses = ['queued', 'running', 'success', 'failure'];
for (const status of statuses) {
let mtype = type.replaceAll('-', '_').replaceAll(' ', '_');
const name = `${this.name}_${status}_${mtype}`;
if (!this.metrics[name]) {
this.metrics[name] = new Gauge({
name: name,
help: `Number of ${this.name} jobs of type "${type}" that are of status "${status}"`,
labelNames: ['service', mtype, status],
});
this.metrics[name].set(0);
}
}
}

/*
async #initTotalMetrics(): Promise<void> {
const date = new Date().toISOString().split('T')[0];
for await (const status of ['success', 'failure']) {
try {
let { data } = await this.#oada.get({
path: `/bookmarks/services/${this.name}/jobs/${status}/day-index/${date}`
})
let keys = Object.keys(data as Record<string, any>).filter(key => !key.startsWith('_'));
this.metrics[`${this.name}_total_${status}`].set(keys.length);
} catch(err) {
this.metrics[`${this.name}_total_${status}`].set(0);
}
}
}

async #initTypedMetrics(type: string): Promise<void> {
let mtype = type.replaceAll('-', '_').replaceAll(' ', '_');
const date = new Date().toISOString().split('T')[0];
for await (const status of ['success', 'failure']) {
try {
this.metrics[`${this.name}_${status}_${mtype}`].set(0);
let { data } = await this.#oada.get({
path: `/bookmarks/services/${this.name}/jobs/${status}/day-index/${date}`
})
for await (const job of Object.keys(data as Record<string, any>)) {
let { data: j } = await this.#oada.get({
path: `/bookmarks/services/${this.name}/jobs/${status}/day-index/${date}/${job}`
}) as unknown as { data: { j: string, [k: string]: any } };
if (j.type === type) this.metrics[`${this.name}_${status}_${mtype}`].inc();
}
} catch(err) {
}
}
}
*/

async #ensureMetrics(): Promise<void> {
/*
this.#oada.ensure({
path: `/bookmarks/services/${this.name}/metrics`,
data: {},
});
const { data: val } = this.#oada.get({
path: `/bookmarks/services/${this.name}/metrics`,
})
*/

const statuses = ['queued', 'running', 'success', 'failure'];
for (const status of statuses) {
const name = `${this.name}_total_${status}`;
if (!this.metrics[name]) {
this.metrics[name] = new Gauge({
name: name,
help: `Total number of ${this.name} jobs that are of status "${status}"`,
labelNames: ['service', 'total', status],
});
this.metrics[name].set(0);
}
}
}
}
3 changes: 1 addition & 2 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/

import _debug from 'debug';
import moment from 'moment';

import type { ConnectionResponse, Json, OADAClient } from '@oada/client';

Expand Down Expand Up @@ -101,7 +100,7 @@ export async function postUpdate(
contentType: tree.bookmarks.services['*'].jobs.pending['*']._type,
data: {
status,
time: moment().toISOString(),
time: new Date().toISOString(),
meta,
},
});
Expand Down
2 changes: 1 addition & 1 deletion tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"forceConsistentCasingInFileNames": true,
"rootDir": "src",
"outDir": "dist",
"lib": ["ES2015"]
"lib": ["ES2021"]
},
"include": ["src"]
}
Loading