Skip to content

Commit

Permalink
Enable grouping by workflow step
Browse files Browse the repository at this point in the history
  • Loading branch information
mvdbeek committed Nov 4, 2024
1 parent bcf212d commit 6b94267
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 16 deletions.
4 changes: 4 additions & 0 deletions client/src/api/schema/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18360,6 +18360,10 @@ export interface components {
* @description The raw value of the metric as a string.
*/
raw_value: string;
/** Step Index */
step_index: number;
/** Step Label */
step_label: string | null;
/**
* Title
* @description A descriptive title for this metric.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
<script setup lang="ts">
import { type VisualizationSpec } from "vega-embed";
import { computed, ref, watch } from "vue";
import { type ComputedRef } from "vue";
import { type components, GalaxyApi } from "@/api";
import { errorMessageAsString } from "@/utils/simple-error";
import { type ComputedRef } from "vue";
import { type VisualizationSpec } from "vega-embed";
import VegaWrapper from "./VegaWrapper.vue";
Expand All @@ -15,9 +15,15 @@ const props = defineProps({
},
});
const groupBy = ref<"tool_id" | "step_id">("tool_id");
const jobMetrics = ref<components["schemas"]["WorkflowJobMetric"][]>();
const fetchError = ref<string>();
const attributeToLabel = {
tool_id: "Tool ID",
step_id: "Step",
};
async function fetchMetrics() {
const { data, error } = await GalaxyApi().GET("/api/invocations/{invocation_id}/metrics", {
params: {
Expand All @@ -35,6 +41,16 @@ async function fetchMetrics() {
watch(props, () => fetchMetrics(), { immediate: true });
function itemToX(item: components["schemas"]["WorkflowJobMetric"]) {
if (groupBy.value === "tool_id") {
return item.tool_id;
} else if (groupBy.value === "step_id") {
return `${item.step_index + 1}: ${item.step_label || item.tool_id}`;
} else {
throw Error("Cannot happen");
}
}
interface boxplotData {
x_title: string;
y_title: string;
Expand All @@ -46,11 +62,11 @@ const wallclock: ComputedRef<boxplotData> = computed(() => {
const values = wallclock?.map((item) => {
return {
y: parseFloat(item.raw_value),
x: item.tool_id,
x: itemToX(item),
};
});
return {
x_title: "Tool ID",
x_title: attributeToLabel[groupBy.value],
y_title: "Runtime (in Seconds)",
values,
};
Expand All @@ -61,11 +77,11 @@ const coresAllocated: ComputedRef<boxplotData> = computed(() => {
const values = coresAllocated?.map((item) => {
return {
y: parseFloat(item.raw_value),
x: item.tool_id,
x: itemToX(item),
};
});
return {
x_title: "Tool ID",
x_title: attributeToLabel[groupBy.value],
y_title: "Cores Allocated",
values,
};
Expand All @@ -76,7 +92,7 @@ const memoryAllocated: ComputedRef<boxplotData> = computed(() => {
const values = memoryAllocated?.map((item) => {
return {
y: parseFloat(item.raw_value),
x: item.tool_id,
x: itemToX(item),
};
});
return {
Expand Down Expand Up @@ -147,9 +163,19 @@ const specs = computed(() => {

<template>
<div>
<div v-for="(spec, key) in specs" :key="key">
<h2 class="h-l truncate text-center">{{ key }}</h2>
<VegaWrapper :spec="spec" />
</div>
<b-tabs lazy>
<b-tab title="Summary by Tool" @click="groupBy = 'tool_id'">
<div v-for="(spec, key) in specs" :key="key">
<h2 class="h-l truncate text-center">{{ key }}</h2>
<VegaWrapper :spec="spec" />
</div>
</b-tab>
<b-tab title="Summary by Workflow Step" @click="groupBy = 'step_id'">
<div v-for="(spec, key) in specs" :key="key">
<h2 class="h-l truncate text-center">{{ key }}</h2>
<VegaWrapper :spec="spec" />
</div>
</b-tab>
</b-tabs>
</div>
</template>
12 changes: 9 additions & 3 deletions lib/galaxy/managers/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
Workflow,
WorkflowInvocation,
WorkflowInvocationStep,
WorkflowStep,
YIELD_PER_ROWS,
)
from galaxy.model.base import transaction
Expand Down Expand Up @@ -732,17 +733,18 @@ def invocation_job_source_iter(sa_session, invocation_id):

def get_job_metrics_for_invocation(sa_session: galaxy_scoped_session, invocation_id: int):
single_job_stmnt = (
select(JobMetricNumeric, Job.tool_id)
select(WorkflowStep.order_index, Job.tool_id, WorkflowStep.label, JobMetricNumeric)
.join(Job, JobMetricNumeric.job_id == Job.id)
.join(
WorkflowInvocationStep,
and_(
WorkflowInvocationStep.workflow_invocation_id == invocation_id, WorkflowInvocationStep.job_id == Job.id
),
)
.join(WorkflowStep, WorkflowStep.id == WorkflowInvocationStep.workflow_step_id)
)
collection_job_stmnt = (
select(JobMetricNumeric, Job.tool_id)
select(WorkflowStep.order_index, Job.tool_id, WorkflowStep.label, JobMetricNumeric)
.join(Job, JobMetricNumeric.job_id == Job.id)
.join(ImplicitCollectionJobsJobAssociation, Job.id == ImplicitCollectionJobsJobAssociation.job_id)
.join(
Expand All @@ -756,10 +758,14 @@ def get_job_metrics_for_invocation(sa_session: galaxy_scoped_session, invocation
WorkflowInvocationStep.implicit_collection_jobs_id == ImplicitCollectionJobs.id,
),
)
.join(WorkflowStep, WorkflowStep.id == WorkflowInvocationStep.workflow_step_id)
)
# should be sa_session.execute(single_job_stmnt.union(collection_job_stmnt)).all() but that returns
# columns instead of the job metrics ORM instance.
return list(sa_session.execute(single_job_stmnt).all()) + list(sa_session.execute(collection_job_stmnt).all())
return sorted(
(*sa_session.execute(single_job_stmnt).all(), *sa_session.execute(collection_job_stmnt).all()),
key=lambda row: row[0],
)


def fetch_job_states(sa_session, job_source_ids, job_source_types):
Expand Down
2 changes: 2 additions & 0 deletions lib/galaxy/schema/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -2173,6 +2173,8 @@ class JobMetric(Model):

class WorkflowJobMetric(JobMetric):
tool_id: str
step_index: int
step_label: Optional[str]


class JobMetricCollection(RootModel):
Expand Down
12 changes: 10 additions & 2 deletions lib/galaxy/webapps/galaxy/services/invocations.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,20 @@ def show_invocation_metrics(self, trans: ProvidesHistoryContext, invocation_id:
extended_job_metrics = get_job_metrics_for_invocation(trans.sa_session, invocation_id)
job_metrics = []
tool_ids = []
step_indexes = []
step_labels = []
for row in extended_job_metrics:
job_metrics.append(row[0])
step_indexes.append(row[0])
tool_ids.append(row[1])
step_labels.append(row[2])
job_metrics.append(row[3])
metrics_dict_list = summarize_metrics(trans, job_metrics)
for tool_id, metrics_dict in zip(tool_ids, metrics_dict_list):
for tool_id, step_index, step_label, metrics_dict in zip(
tool_ids, step_indexes, step_labels, metrics_dict_list
):
metrics_dict["tool_id"] = tool_id
metrics_dict["step_index"] = step_index
metrics_dict["step_label"] = step_label
return metrics_dict_list

def update_invocation_step(self, trans, step_id, action):
Expand Down

0 comments on commit 6b94267

Please sign in to comment.