Skip to content

Commit

Permalink
add observability to quartzjob
Browse files Browse the repository at this point in the history
  • Loading branch information
ashanhol committed Jan 18, 2024
1 parent fdbf58e commit efbd2ee
Show file tree
Hide file tree
Showing 10 changed files with 122 additions and 17 deletions.
1 change: 1 addition & 0 deletions service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ dependencies {
implementation 'org.springframework.boot:spring-boot-starter-batch'
implementation 'org.springframework.boot:spring-boot-starter-quartz'
implementation 'org.springframework.boot:spring-boot-starter-actuator'
implementation 'org.springframework.boot:spring-boot-starter-aop'
implementation 'io.micrometer:micrometer-registry-prometheus'
implementation 'org.springframework.integration:spring-integration-jdbc'
implementation 'org.aspectj:aspectjweaver' // required by spring-retry, not used directly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static org.databiosphere.workspacedataservice.shared.model.Schedulable.ARG_URL;

import bio.terra.pfb.PfbReader;
import io.micrometer.observation.ObservationRegistry;
import java.net.URL;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -58,7 +59,9 @@ public PfbQuartzJob(
RestClientRetry restClientRetry,
BatchWriteService batchWriteService,
ActivityLogger activityLogger,
@Value("${twds.instance.workspace-id}") UUID workspaceId) {
@Value("${twds.instance.workspace-id}") UUID workspaceId,
ObservationRegistry observationRegistry) {
super(observationRegistry);
this.jobDao = jobDao;
this.wsmDao = wsmDao;
this.restClientRetry = restClientRetry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Multimap;
import io.micrometer.observation.ObservationRegistry;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
Expand Down Expand Up @@ -69,7 +70,9 @@ public TdrManifestQuartzJob(
BatchWriteService batchWriteService,
ActivityLogger activityLogger,
@Value("${twds.instance.workspace-id}") UUID workspaceId,
ObjectMapper mapper) {
ObjectMapper mapper,
ObservationRegistry observationRegistry) {
super(observationRegistry);
this.jobDao = jobDao;
this.wsmDao = wsmDao;
this.restClientRetry = restClientRetry;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
package org.databiosphere.workspacedataservice.jobexec;

import static org.databiosphere.workspacedataservice.metrics.MetricsDefinitions.EVENT_JOB_FAILED;
import static org.databiosphere.workspacedataservice.metrics.MetricsDefinitions.EVENT_JOB_RUNNING;
import static org.databiosphere.workspacedataservice.metrics.MetricsDefinitions.EVENT_JOB_SUCCEEDED;
import static org.databiosphere.workspacedataservice.metrics.MetricsDefinitions.NAME_JOB_EXECUTION;
import static org.databiosphere.workspacedataservice.metrics.MetricsDefinitions.OBSERVE_JOB_EXECUTE;
import static org.databiosphere.workspacedataservice.metrics.MetricsDefinitions.TAG_JOB_ID;
import static org.databiosphere.workspacedataservice.metrics.MetricsDefinitions.TAG_JOB_TYPE;
import static org.databiosphere.workspacedataservice.sam.BearerTokenFilter.ATTRIBUTE_NAME_TOKEN;
import static org.databiosphere.workspacedataservice.shared.model.Schedulable.ARG_TOKEN;

import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import java.net.URL;
import java.util.UUID;
import org.databiosphere.workspacedataservice.dao.JobDao;
Expand All @@ -29,16 +38,29 @@
// note this implements Quartz's `Job`, not WDS's own `Job`
public abstract class QuartzJob implements Job {

private final ObservationRegistry observationRegistry;

/**
 * Creates a QuartzJob whose execution will be traced through Micrometer.
 *
 * @param registry the {@link ObservationRegistry} used to record job-execution observations
 */
public QuartzJob(ObservationRegistry registry) {
  this.observationRegistry = registry;
}

/** implementing classes are expected to be beans that inject a JobDao */
protected abstract JobDao getJobDao();

@Override
public void execute(JobExecutionContext context) throws org.quartz.JobExecutionException {
// retrieve jobId
UUID jobId = UUID.fromString(context.getJobDetail().getKey().getName());
try {
Observation observation =
Observation.start(OBSERVE_JOB_EXECUTE, observationRegistry)
.contextualName(NAME_JOB_EXECUTION)
.lowCardinalityKeyValue(TAG_JOB_TYPE, getClass().getName())
.highCardinalityKeyValue(TAG_JOB_ID, jobId.toString());

try (Observation.Scope scope = observation.openScope()) {
// mark this job as running
getJobDao().running(jobId);
observation.event(Observation.Event.of(EVENT_JOB_RUNNING));
// look for an auth token in the Quartz JobDataMap
String authToken = getJobDataString(context.getMergedJobDataMap(), ARG_TOKEN);
// and stash the auth token into job context
Expand All @@ -50,11 +72,15 @@ public void execute(JobExecutionContext context) throws org.quartz.JobExecutionE
executeInternal(jobId, context);
// if we reached here, mark this job as successful
getJobDao().succeeded(jobId);
observation.event(Observation.Event.of(EVENT_JOB_SUCCEEDED));
} catch (Exception e) {
// on any otherwise-unhandled exception, mark the job as failed
getJobDao().fail(jobId, e);
observation.error(e);
observation.event(Observation.Event.of(EVENT_JOB_FAILED));
} finally {
JobContextHolder.destroy();
observation.stop();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import com.google.common.collect.ImmutableList;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.observation.ObservationRegistry;
import io.micrometer.observation.aop.ObservedAspect;
import java.util.Set;
import org.springframework.boot.actuate.autoconfigure.metrics.MeterRegistryCustomizer;
import org.springframework.boot.info.BuildProperties;
Expand Down Expand Up @@ -35,6 +37,7 @@ public MetricsConfig(BuildProperties buildProperties) {
"jvm",
"logback",
"process",
"quartz",
"spring",
"system",
"tomcat",
Expand All @@ -59,4 +62,10 @@ MeterRegistryCustomizer<MeterRegistry> addWdsVersionTag() {
.add(Tag.of("wds.version", buildProperties.getVersion()))
.build());
}

/**
 * Observation support for metrics: registers the Micrometer AOP aspect so {@code @Observed}
 * annotations are honored and reported through the given registry.
 */
@Bean
ObservedAspect observedAspect(ObservationRegistry registry) {
  return new ObservedAspect(registry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ public final class MetricsDefinitions {

private MetricsDefinitions() {}

// Record Metrics
/** counter for column schema changes, i.e. "alter column" sql statements */
public static final String COUNTER_COL_CHANGE = "column_change_datatype";
public static final String COUNTER_COL_CHANGE = "column.change.datatype";

/** tag for a {@link org.databiosphere.workspacedataservice.shared.model.RecordType} */
public static final String TAG_RECORD_TYPE = "RecordType";
Expand All @@ -15,4 +16,26 @@ private MetricsDefinitions() {}

/** tag for a record attribute name */
public static final String TAG_ATTRIBUTE_NAME = "AttributeName";

// Quartz Job Metrics
/** observable name for running job */
public static final String OBSERVE_JOB_EXECUTE = "wds.job.execute";

/** event name for job succeeded */
public static final String EVENT_JOB_SUCCEEDED = "job.succeeded";

/** event name for job running */
public static final String EVENT_JOB_RUNNING = "job.running";

/** event name for job failed */
public static final String EVENT_JOB_FAILED = "job.failed";

/** name for job execution (used in span) */
public static final String NAME_JOB_EXECUTION = "job-execution";

/** tag for the type of job */
public static final String TAG_JOB_TYPE = "jobType";

/** tag for the unique ID of the job */
public static final String TAG_JOB_ID = "jobId";
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import bio.terra.workspace.model.ResourceList;
import com.google.common.collect.ImmutableMap;
import io.micrometer.observation.ObservationRegistry;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.List;
Expand Down Expand Up @@ -70,6 +71,7 @@ class PfbQuartzJobE2ETest {
@Autowired RecordOrchestratorService recordOrchestratorService;
@Autowired ImportService importService;
@Autowired InstanceService instanceService;
@Autowired ObservationRegistry observationRegistry;

@MockBean SchedulerDao schedulerDao;
@MockBean WorkspaceManagerDao wsmDao;
Expand Down Expand Up @@ -127,7 +129,8 @@ void importTestResource() throws IOException, JobExecutionException {
when(wsmDao.enumerateDataRepoSnapshotReferences(any(), anyInt(), anyInt()))
.thenReturn(new ResourceList());

buildPfbQuartzJob(jobDao, wsmDao, restClientRetry, batchWriteService, activityLogger)
buildPfbQuartzJob(
jobDao, wsmDao, restClientRetry, batchWriteService, activityLogger, observationRegistry)
.execute(mockContext);

/* the testAvroResource should insert:
Expand Down Expand Up @@ -184,7 +187,8 @@ void importFourRowsResource() throws IOException, JobExecutionException {
when(wsmDao.enumerateDataRepoSnapshotReferences(any(), anyInt(), anyInt()))
.thenReturn(new ResourceList());

buildPfbQuartzJob(jobDao, wsmDao, restClientRetry, batchWriteService, activityLogger)
buildPfbQuartzJob(
jobDao, wsmDao, restClientRetry, batchWriteService, activityLogger, observationRegistry)
.execute(mockContext);

/* the fourRowsAvroResource should insert:
Expand Down Expand Up @@ -231,7 +235,8 @@ void numberPrecision() throws IOException, JobExecutionException {
when(wsmDao.enumerateDataRepoSnapshotReferences(any(), anyInt(), anyInt()))
.thenReturn(new ResourceList());

buildPfbQuartzJob(jobDao, wsmDao, restClientRetry, batchWriteService, activityLogger)
buildPfbQuartzJob(
jobDao, wsmDao, restClientRetry, batchWriteService, activityLogger, observationRegistry)
.execute(mockContext);

// this record, within the precision.avro file, is known to have numbers with high decimal
Expand Down Expand Up @@ -271,7 +276,8 @@ void importWithForwardRelations() throws IOException, JobExecutionException {
when(wsmDao.enumerateDataRepoSnapshotReferences(any(), anyInt(), anyInt()))
.thenReturn(new ResourceList());

buildPfbQuartzJob(jobDao, wsmDao, restClientRetry, batchWriteService, activityLogger)
buildPfbQuartzJob(
jobDao, wsmDao, restClientRetry, batchWriteService, activityLogger, observationRegistry)
.execute(mockContext);

/* the forwardRelationsAvroResource should insert:
Expand Down Expand Up @@ -321,7 +327,8 @@ void importCyclicalRelations() throws IOException, JobExecutionException {
when(wsmDao.enumerateDataRepoSnapshotReferences(any(), anyInt(), anyInt()))
.thenReturn(new ResourceList());

buildPfbQuartzJob(jobDao, wsmDao, restClientRetry, batchWriteService, activityLogger)
buildPfbQuartzJob(
jobDao, wsmDao, restClientRetry, batchWriteService, activityLogger, observationRegistry)
.execute(mockContext);

RecordTypeSchema dataReleaseSchema =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import bio.terra.workspace.model.ResourceAttributesUnion;
import bio.terra.workspace.model.ResourceDescription;
import bio.terra.workspace.model.ResourceList;
import io.micrometer.observation.ObservationRegistry;
import java.io.IOException;
import java.util.List;
import java.util.Random;
Expand Down Expand Up @@ -51,6 +52,7 @@ class PfbQuartzJobTest {
@MockBean BatchWriteService batchWriteService;
@MockBean ActivityLogger activityLogger;
@Autowired RestClientRetry restClientRetry;
@Autowired ObservationRegistry observationRegistry;

// test resources used below
@Value("classpath:avro/minimal_data.avro")
Expand All @@ -72,7 +74,13 @@ void linkAllNewSnapshots() {

// call linkSnapshots
PfbQuartzJob pfbQuartzJob =
buildPfbQuartzJob(jobDao, wsmDao, restClientRetry, batchWriteService, activityLogger);
buildPfbQuartzJob(
jobDao,
wsmDao,
restClientRetry,
batchWriteService,
activityLogger,
observationRegistry);
pfbQuartzJob.linkSnapshots(input);
// capture calls
ArgumentCaptor<SnapshotModel> argumentCaptor = ArgumentCaptor.forClass(SnapshotModel.class);
Expand Down Expand Up @@ -101,7 +109,13 @@ void linkNothingWhenAllExist() {

// call linkSnapshots
PfbQuartzJob pfbQuartzJob =
buildPfbQuartzJob(jobDao, wsmDao, restClientRetry, batchWriteService, activityLogger);
buildPfbQuartzJob(
jobDao,
wsmDao,
restClientRetry,
batchWriteService,
activityLogger,
observationRegistry);
pfbQuartzJob.linkSnapshots(input);
// should not call WSM's create-snapshot-reference at all
verify(wsmDao, times(0)).linkSnapshotForPolicy(any());
Expand Down Expand Up @@ -130,7 +144,13 @@ void linkSomeWhenSomeExist() {

// call linkSnapshots
PfbQuartzJob pfbQuartzJob =
buildPfbQuartzJob(jobDao, wsmDao, restClientRetry, batchWriteService, activityLogger);
buildPfbQuartzJob(
jobDao,
wsmDao,
restClientRetry,
batchWriteService,
activityLogger,
observationRegistry);
pfbQuartzJob.linkSnapshots(input);

// should call WSM's create-snapshot-reference only for the references that didn't already exist
Expand Down Expand Up @@ -158,7 +178,8 @@ void doNotFailOnMissingSnapshotId() throws JobExecutionException, IOException {
when(batchWriteService.batchWritePfbStream(any(), any(), any(), eq(BASE_ATTRIBUTES)))
.thenReturn(BatchWriteResult.empty());

buildPfbQuartzJob(jobDao, wsmDao, restClientRetry, batchWriteService, activityLogger)
buildPfbQuartzJob(
jobDao, wsmDao, restClientRetry, batchWriteService, activityLogger, observationRegistry)
.execute(mockContext);

// Should not call wsm dao
Expand All @@ -179,7 +200,8 @@ void snapshotIdsAreParsed() throws JobExecutionException, IOException {
when(batchWriteService.batchWritePfbStream(any(), any(), any(), eq(BASE_ATTRIBUTES)))
.thenReturn(BatchWriteResult.empty());

buildPfbQuartzJob(jobDao, wsmDao, restClientRetry, batchWriteService, activityLogger)
buildPfbQuartzJob(
jobDao, wsmDao, restClientRetry, batchWriteService, activityLogger, observationRegistry)
.execute(mockContext);

// The "790795c4..." UUID below is the snapshotId found in the "test.avro" resource used
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.micrometer.observation.ObservationRegistry;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -179,8 +180,15 @@ public static PfbQuartzJob buildPfbQuartzJob(
WorkspaceManagerDao wsmDao,
RestClientRetry restClientRetry,
BatchWriteService batchWriteService,
ActivityLogger activityLogger) {
ActivityLogger activityLogger,
ObservationRegistry observationRegistry) {
return new PfbQuartzJob(
jobDao, wsmDao, restClientRetry, batchWriteService, activityLogger, UUID.randomUUID());
jobDao,
wsmDao,
restClientRetry,
batchWriteService,
activityLogger,
UUID.randomUUID(),
observationRegistry);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.databiosphere.workspacedataservice.dataimport.tdr;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.observation.ObservationRegistry;
import java.net.URL;
import java.util.UUID;
import org.databiosphere.workspacedataservice.activitylog.ActivityLogger;
Expand All @@ -19,6 +20,7 @@ class TdrTestSupport {
@Autowired private BatchWriteService batchWriteService;
@Autowired private ActivityLogger activityLogger;
@Autowired private ObjectMapper objectMapper;
@Autowired private ObservationRegistry observationRegistry;

/** Returns a TdrManifestQuartzJob that is capable of pulling parquet files from the classpath. */
TdrManifestQuartzJob buildTdrManifestQuartzJob(UUID workspaceId) {
Expand All @@ -29,7 +31,8 @@ TdrManifestQuartzJob buildTdrManifestQuartzJob(UUID workspaceId) {
batchWriteService,
activityLogger,
workspaceId,
objectMapper) {
objectMapper,
observationRegistry) {
@Override
protected URL parseUrl(String path) {
if (path.startsWith("classpath:")) {
Expand Down

0 comments on commit efbd2ee

Please sign in to comment.