Add async observability to QuartzJob (#461)
Added observability to QuartzJob.execute, recording the running, failed, and succeeded states. The DefaultMeterObservationHandler gives us a built-in Timer, Counter, and LongTaskTimer.
ashanhol authored Feb 1, 2024
1 parent dc85057 commit a2554ee
Showing 11 changed files with 232 additions and 45 deletions.
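
The commit message references DefaultMeterObservationHandler, but the handler wiring itself is not part of this diff (Spring Boot's actuator auto-configuration normally registers it). A minimal sketch of an equivalent manual registration, with all class and method names outside the Micrometer API being illustrative:

```java
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler;
import io.micrometer.observation.ObservationRegistry;

class ObservabilityConfigSketch {
  // Sketch only: bridges observations to Micrometer meters. The handler backs each
  // observation with a Timer, a LongTaskTimer while it is in flight, and a Counter
  // per observation event.
  ObservationRegistry observationRegistry(MeterRegistry meterRegistry) {
    ObservationRegistry registry = ObservationRegistry.create();
    registry.observationConfig()
        .observationHandler(new DefaultMeterObservationHandler(meterRegistry));
    return registry;
  }
}
```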
service/build.gradle: 1 change, 1 addition & 0 deletions
@@ -115,6 +115,7 @@ dependencies {

annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
runtimeOnly 'org.webjars.npm:swagger-ui-dist:5.9.0'
testImplementation 'io.micrometer:micrometer-observation-test'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.junit.jupiter:junit-jupiter'
testImplementation project(':client')
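The new micrometer-observation-test dependency provides TestObservationRegistry for asserting on observations directly. A hypothetical test sketch, assuming the job under test was built with the test registry and that StatusEnum.SUCCEEDED.getValue() is "SUCCEEDED":

```java
import static io.micrometer.observation.tck.TestObservationRegistryAssert.assertThat;

import io.micrometer.observation.tck.TestObservationRegistry;

class QuartzJobObservationTestSketch {
  void jobSuccessIsObserved() {
    TestObservationRegistry observationRegistry = TestObservationRegistry.create();

    // ... construct and execute a QuartzJob subclass wired with observationRegistry ...

    // Assumed outcome value; matches StatusEnum.SUCCEEDED.getValue() if that is "SUCCEEDED".
    assertThat(observationRegistry)
        .hasObservationWithNameEqualTo("wds.job.execute")
        .that()
        .hasLowCardinalityKeyValue("outcome", "SUCCEEDED")
        .hasBeenStarted()
        .hasBeenStopped();
  }
}
```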
@@ -7,6 +7,7 @@
import static org.databiosphere.workspacedataservice.shared.model.Schedulable.ARG_URL;

import bio.terra.pfb.PfbReader;
import io.micrometer.observation.ObservationRegistry;
import java.net.URL;
import java.util.Objects;
import java.util.Optional;
@@ -58,7 +59,9 @@ public PfbQuartzJob(
RestClientRetry restClientRetry,
BatchWriteService batchWriteService,
ActivityLogger activityLogger,
ObservationRegistry observationRegistry,
@Value("${twds.instance.workspace-id}") UUID workspaceId) {
super(observationRegistry);
this.jobDao = jobDao;
this.wsmDao = wsmDao;
this.restClientRetry = restClientRetry;
@@ -10,6 +10,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Multimap;
import io.micrometer.observation.ObservationRegistry;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
@@ -66,7 +67,9 @@ public TdrManifestQuartzJob(
BatchWriteService batchWriteService,
ActivityLogger activityLogger,
@Value("${twds.instance.workspace-id}") UUID workspaceId,
ObjectMapper mapper) {
ObjectMapper mapper,
ObservationRegistry observationRegistry) {
super(observationRegistry);
this.jobDao = jobDao;
this.wsmDao = wsmDao;
this.restClientRetry = restClientRetry;
@@ -1,8 +1,11 @@
package org.databiosphere.workspacedataservice.jobexec;

import static org.databiosphere.workspacedataservice.generated.GenericJobServerModel.StatusEnum;
import static org.databiosphere.workspacedataservice.sam.BearerTokenFilter.ATTRIBUTE_NAME_TOKEN;
import static org.databiosphere.workspacedataservice.shared.model.Schedulable.ARG_TOKEN;

import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import java.net.URL;
import java.util.UUID;
import org.databiosphere.workspacedataservice.dao.JobDao;
@@ -29,16 +32,29 @@
// note this implements Quartz's `Job`, not WDS's own `Job`
public abstract class QuartzJob implements Job {

private final ObservationRegistry observationRegistry;

protected QuartzJob(ObservationRegistry observationRegistry) {
this.observationRegistry = observationRegistry;
}

/** implementing classes are expected to be beans that inject a JobDao */
protected abstract JobDao getJobDao();

@Override
public void execute(JobExecutionContext context) throws org.quartz.JobExecutionException {
// retrieve jobId
UUID jobId = UUID.fromString(context.getJobDetail().getKey().getName());

Observation observation =
Observation.start("wds.job.execute", observationRegistry)
.contextualName("job-execution")
.lowCardinalityKeyValue("jobType", getClass().getSimpleName())
.highCardinalityKeyValue("jobId", jobId.toString());
try {
// mark this job as running
getJobDao().running(jobId);
observation.event(Observation.Event.of("job.running"));
// look for an auth token in the Quartz JobDataMap
String authToken = getJobDataString(context.getMergedJobDataMap(), ARG_TOKEN);
// and stash the auth token into job context
@@ -50,11 +66,15 @@ public void execute(JobExecutionContext context) throws org.quartz.JobExecutionException {
executeInternal(jobId, context);
// if we reached here, mark this job as successful
getJobDao().succeeded(jobId);
observation.lowCardinalityKeyValue("outcome", StatusEnum.SUCCEEDED.getValue());
} catch (Exception e) {
// on any otherwise-unhandled exception, mark the job as failed
getJobDao().fail(jobId, e);
observation.error(e);
observation.lowCardinalityKeyValue("outcome", StatusEnum.ERROR.getValue());
} finally {
JobContextHolder.destroy();
observation.stop();
}
}

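With a DefaultMeterObservationHandler registered, this single observation is typically surfaced as a Timer named after the observation ("wds.job.execute"), a LongTaskTimer ("wds.job.execute.active") while a job is running, and a Counter per event (e.g. "wds.job.execute.job.running"). Those names follow Micrometer's default conventions and are assumptions, not values verified against this commit. A rough sketch of reading the completion count back:

```java
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;

class JobMetricsLookupSketch {
  // Sketch only: counts completed executions for one job type. The low-cardinality
  // "jobType" key value from the observation becomes a tag on the derived Timer.
  long completedJobs(MeterRegistry meterRegistry, String jobType) {
    Timer timer = meterRegistry.find("wds.job.execute").tag("jobType", jobType).timer();
    return timer == null ? 0L : timer.count();
  }
}
```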

This file was deleted.

@@ -1,9 +1,5 @@
package org.databiosphere.workspacedataservice.service;

import static org.databiosphere.workspacedataservice.metrics.MetricsDefinitions.COUNTER_COL_CHANGE;
import static org.databiosphere.workspacedataservice.metrics.MetricsDefinitions.TAG_ATTRIBUTE_NAME;
import static org.databiosphere.workspacedataservice.metrics.MetricsDefinitions.TAG_INSTANCE;
import static org.databiosphere.workspacedataservice.metrics.MetricsDefinitions.TAG_RECORD_TYPE;
import static org.databiosphere.workspacedataservice.service.model.ReservedNames.RESERVED_NAME_PREFIX;

import bio.terra.common.db.WriteTransaction;
@@ -46,6 +42,13 @@

@Service
public class RecordService {
// strings used for metrics
public static final String COUNTER_COL_CHANGE = "column.change.datatype";
public static final String TAG_RECORD_TYPE = "RecordType";
public static final String TAG_INSTANCE = "Instance";
public static final String TAG_ATTRIBUTE_NAME = "AttributeName";
public static final String TAG_OLD_DATATYPE = "OldDataType";
public static final String TAG_NEW_DATATYPE = "NewDataType";

private final RecordDao recordDao;

@@ -224,8 +227,8 @@ public Map<String, DataTypeMapping> addOrUpdateColumnIfNeeded(
.tag(TAG_RECORD_TYPE, recordType.getName())
.tag(TAG_ATTRIBUTE_NAME, column)
.tag(TAG_INSTANCE, instanceId.toString())
.tag("OldDataType", valueDifference.leftValue().toString())
.tag("NewDataType", updatedColType.toString())
.tag(TAG_OLD_DATATYPE, valueDifference.leftValue().toString())
.tag(TAG_NEW_DATATYPE, updatedColType.toString())
.description("Column schema changes")
.register(meterRegistry);
counter.increment();
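Since the metric name and tag constants now live on RecordService as public fields, other code (tests in particular) can reference them when reading the column-change counter back. An illustrative sketch; the lookup class and method are hypothetical:

```java
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import org.databiosphere.workspacedataservice.service.RecordService;

class ColumnChangeMetricSketch {
  // Sketch only: total datatype changes recorded for a given record type.
  double columnChanges(MeterRegistry meterRegistry, String recordType) {
    Counter counter =
        meterRegistry
            .find(RecordService.COUNTER_COL_CHANGE)
            .tag(RecordService.TAG_RECORD_TYPE, recordType)
            .counter();
    return counter == null ? 0.0 : counter.count();
  }
}
```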
@@ -1,6 +1,7 @@
package org.databiosphere.workspacedataservice.dataimport.pfb;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.observation.ObservationRegistry;
import java.util.UUID;
import org.databiosphere.workspacedataservice.activitylog.ActivityLogger;
import org.databiosphere.workspacedataservice.dao.JobDao;
@@ -18,10 +19,17 @@ class PfbTestSupport {
@Autowired private BatchWriteService batchWriteService;
@Autowired private ActivityLogger activityLogger;
@Autowired private ObjectMapper objectMapper;
@Autowired private ObservationRegistry observationRegistry;

PfbQuartzJob buildPfbQuartzJob(UUID workspaceId) {
return new PfbQuartzJob(
jobDao, wsmDao, restClientRetry, batchWriteService, activityLogger, workspaceId);
jobDao,
wsmDao,
restClientRetry,
batchWriteService,
activityLogger,
observationRegistry,
workspaceId);
}

PfbQuartzJob buildPfbQuartzJob() {
@@ -1,6 +1,7 @@
package org.databiosphere.workspacedataservice.dataimport.tdr;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.observation.ObservationRegistry;
import java.net.URL;
import java.util.UUID;
import org.databiosphere.workspacedataservice.activitylog.ActivityLogger;
@@ -19,6 +20,7 @@ class TdrTestSupport {
@Autowired private BatchWriteService batchWriteService;
@Autowired private ActivityLogger activityLogger;
@Autowired private ObjectMapper objectMapper;
@Autowired private ObservationRegistry observationRegistry;

/** Returns a TdrManifestQuartzJob that is capable of pulling parquet files from the classpath. */
TdrManifestQuartzJob buildTdrManifestQuartzJob(UUID workspaceId) {
@@ -29,7 +31,8 @@ TdrManifestQuartzJob buildTdrManifestQuartzJob(UUID workspaceId) {
batchWriteService,
activityLogger,
workspaceId,
objectMapper) {
objectMapper,
observationRegistry) {
@Override
protected URL parseUrl(String path) {
if (path.startsWith("classpath:")) {