
Add async observability to QuartzJob #461

Merged: 17 commits, Feb 1, 2024
@@ -7,6 +7,7 @@
 import static org.databiosphere.workspacedataservice.shared.model.Schedulable.ARG_URL;
 
 import bio.terra.pfb.PfbReader;
+import io.micrometer.observation.ObservationRegistry;
 import java.net.URL;
 import java.util.Objects;
 import java.util.Optional;
@@ -58,7 +59,9 @@ public PfbQuartzJob(
       RestClientRetry restClientRetry,
       BatchWriteService batchWriteService,
       ActivityLogger activityLogger,
+      ObservationRegistry observationRegistry,
       @Value("${twds.instance.workspace-id}") UUID workspaceId) {
+    super(observationRegistry);
     this.jobDao = jobDao;
     this.wsmDao = wsmDao;
     this.restClientRetry = restClientRetry;
@@ -10,6 +10,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Multimap;
+import io.micrometer.observation.ObservationRegistry;
 import java.io.File;
 import java.io.IOException;
 import java.net.MalformedURLException;
@@ -69,7 +70,9 @@ public TdrManifestQuartzJob(
       BatchWriteService batchWriteService,
       ActivityLogger activityLogger,
       @Value("${twds.instance.workspace-id}") UUID workspaceId,
-      ObjectMapper mapper) {
+      ObjectMapper mapper,
+      ObservationRegistry observationRegistry) {
+    super(observationRegistry);
     this.jobDao = jobDao;
     this.wsmDao = wsmDao;
     this.restClientRetry = restClientRetry;
@@ -3,6 +3,10 @@
 import static org.databiosphere.workspacedataservice.sam.BearerTokenFilter.ATTRIBUTE_NAME_TOKEN;
 import static org.databiosphere.workspacedataservice.shared.model.Schedulable.ARG_TOKEN;
 
+import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
+import io.micrometer.observation.Observation;
+import io.micrometer.observation.ObservationRegistry;
 import java.net.URL;
 import java.util.UUID;
 import org.databiosphere.workspacedataservice.dao.JobDao;
@@ -29,16 +33,32 @@
 // note this implements Quartz's `Job`, not WDS's own `Job`
 public abstract class QuartzJob implements Job {
 
+  private final ObservationRegistry observationRegistry;
+
+  protected QuartzJob(ObservationRegistry observationRegistry) {
+    this.observationRegistry = observationRegistry;
+  }
+
   /** implementing classes are expected to be beans that inject a JobDao */
   protected abstract JobDao getJobDao();
 
   @Override
   public void execute(JobExecutionContext context) throws org.quartz.JobExecutionException {
     // retrieve jobId
     UUID jobId = UUID.fromString(context.getJobDetail().getKey().getName());
     try {
+      // configure observation to collect counter, timer, and longtasktimer metrics
+      observationRegistry
+          .observationConfig()
+          .observationHandler(new DefaultMeterObservationHandler(new SimpleMeterRegistry()));
+      Observation observation =
+          Observation.start("wds.job.execute", observationRegistry)
+              .contextualName("job-execution")
+              .lowCardinalityKeyValue("jobType", getClass().getSimpleName())
+              .highCardinalityKeyValue("jobId", jobId.toString());
Collaborator (review comment):

How feasible is it to start an Observation in a separate class - and a separate Java thread - and update the Observation here?

We queue the job over in ImportService.java:

    // schedule the job. after successfully scheduling, mark the job as queued

and if we wanted to track timing between QUEUED and RUNNING statuses, we'd need to start the Observation over there.

I am totally fine answering this question sometime after this PR merges. What's in this PR as-is is useful and adds value, so it's worth it on its own.

Contributor (review comment):

Good call @davidangb, this is actually the spirit behind the measurement, which is to know how long the customer has to wait, so I'm in favor of this. OK to defer to another PR, though, if you just want to land this one, @ashanhol.

Contributor, PR author (review comment):

I actually had this exact thought when doing this, but couldn't quite figure out how to "pass" an observation from one class to another. I'm happy to make a ticket to think about it further.
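One possible shape for the hand-off discussed above: the scheduling side starts the Observation when the job is marked QUEUED and parks it in a holder keyed by jobId; QuartzJob.execute later takes it over, even on a different Quartz worker thread. This is a hypothetical sketch, not part of this PR: `JobObservationHolder` and its method names are invented for illustration; only the Micrometer `Observation` API calls are real.

```java
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical holder handing an in-flight Observation from the scheduler to the job. */
public class JobObservationHolder {
  private static final ConcurrentMap<UUID, Observation> IN_FLIGHT = new ConcurrentHashMap<>();

  /** Would be called from ImportService right after the job is marked QUEUED. */
  public static Observation started(UUID jobId, ObservationRegistry registry) {
    Observation observation =
        Observation.start("wds.job.execute", registry)
            .highCardinalityKeyValue("jobId", jobId.toString());
    observation.event(Observation.Event.of("job.queued"));
    IN_FLIGHT.put(jobId, observation);
    return observation;
  }

  /** Would be called from QuartzJob.execute; returns null if nothing was queued. */
  public static Observation takeOver(UUID jobId) {
    return IN_FLIGHT.remove(jobId);
  }
}
```

With this shape, QuartzJob would record its `job.running` / `job.succeeded` events and `stop()` on the taken-over Observation, so the timer would span QUEUED through completion rather than starting at RUNNING.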

+      try (Observation.Scope scope = observation.openScope()) {
         // mark this job as running
         getJobDao().running(jobId);
+        observation.event(Observation.Event.of("job.running"));
         // look for an auth token in the Quartz JobDataMap
         String authToken = getJobDataString(context.getMergedJobDataMap(), ARG_TOKEN);
         // and stash the auth token into job context
@@ -50,11 +70,15 @@ public void execute(JobExecutionContext context) throws org.quartz.JobExecutionException {
         executeInternal(jobId, context);
         // if we reached here, mark this job as successful
         getJobDao().succeeded(jobId);
+        observation.event(Observation.Event.of("job.succeeded"));
       } catch (Exception e) {
         // on any otherwise-unhandled exception, mark the job as failed
         getJobDao().fail(jobId, e);
+        observation.error(e);
+        observation.event(Observation.Event.of("job.failed"));
       } finally {
         JobContextHolder.destroy();
+        observation.stop();
       }
     }

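For context on what the handler wired into execute() above produces: `DefaultMeterObservationHandler` translates an Observation's lifecycle into meters on the backing `MeterRegistry` — a `Timer` recorded on stop (and a `LongTaskTimer` while running), named after the observation and tagged with its low-cardinality key values. A minimal standalone sketch, assuming Micrometer on the classpath (names here are demo values, not the PR's wiring):

```java
import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;

public class ObservationMetricsDemo {
  /** Runs one observed "job" and returns how many timed executions the registry recorded. */
  public static long timedExecutions() {
    SimpleMeterRegistry meters = new SimpleMeterRegistry();
    ObservationRegistry observations = ObservationRegistry.create();
    observations
        .observationConfig()
        .observationHandler(new DefaultMeterObservationHandler(meters));

    Observation observation =
        Observation.start("wds.job.execute", observations)
            .lowCardinalityKeyValue("jobType", "DemoJob");
    observation.stop();

    // on stop, the handler records a Timer named after the observation,
    // tagged with the low-cardinality key values (jobType=DemoJob here)
    return meters.get("wds.job.execute").timer().count();
  }
}
```

Note that the PR constructs a fresh `SimpleMeterRegistry` inside execute(), so the derived meters live in that registry instance; the Observation events and timings themselves flow through the injected `ObservationRegistry`.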

This file was deleted.

@@ -1,9 +1,5 @@
 package org.databiosphere.workspacedataservice.service;
 
-import static org.databiosphere.workspacedataservice.metrics.MetricsDefinitions.COUNTER_COL_CHANGE;
-import static org.databiosphere.workspacedataservice.metrics.MetricsDefinitions.TAG_ATTRIBUTE_NAME;
-import static org.databiosphere.workspacedataservice.metrics.MetricsDefinitions.TAG_INSTANCE;
-import static org.databiosphere.workspacedataservice.metrics.MetricsDefinitions.TAG_RECORD_TYPE;
 import static org.databiosphere.workspacedataservice.service.model.ReservedNames.RESERVED_NAME_PREFIX;
 
 import bio.terra.common.db.WriteTransaction;
@@ -220,10 +216,10 @@ public Map<String, DataTypeMapping> addOrUpdateColumnIfNeeded(
 
     // update a metrics counter with this schema change
     Counter counter =
-        Counter.builder(COUNTER_COL_CHANGE)
-            .tag(TAG_RECORD_TYPE, recordType.getName())
-            .tag(TAG_ATTRIBUTE_NAME, column)
-            .tag(TAG_INSTANCE, instanceId.toString())
+        Counter.builder("column.change.datatype")
+            .tag("RecordType", recordType.getName())
+            .tag("AttributeName", column)
+            .tag("Instance", instanceId.toString())
             .tag("OldDataType", valueDifference.leftValue().toString())
             .tag("NewDataType", updatedColType.toString())
             .description("Column schema changes")
@@ -1,6 +1,7 @@
 package org.databiosphere.workspacedataservice.dataimport.pfb;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import io.micrometer.observation.ObservationRegistry;
 import java.util.UUID;
 import org.databiosphere.workspacedataservice.activitylog.ActivityLogger;
 import org.databiosphere.workspacedataservice.dao.JobDao;
@@ -18,10 +19,17 @@ class PfbTestSupport {
   @Autowired private BatchWriteService batchWriteService;
   @Autowired private ActivityLogger activityLogger;
   @Autowired private ObjectMapper objectMapper;
+  @Autowired private ObservationRegistry observationRegistry;
 
   PfbQuartzJob buildPfbQuartzJob(UUID workspaceId) {
     return new PfbQuartzJob(
-        jobDao, wsmDao, restClientRetry, batchWriteService, activityLogger, workspaceId);
+        jobDao,
+        wsmDao,
+        restClientRetry,
+        batchWriteService,
+        activityLogger,
+        observationRegistry,
+        workspaceId);
   }
 
   PfbQuartzJob buildPfbQuartzJob() {
@@ -1,6 +1,7 @@
 package org.databiosphere.workspacedataservice.dataimport.tdr;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import io.micrometer.observation.ObservationRegistry;
 import java.net.URL;
 import java.util.UUID;
 import org.databiosphere.workspacedataservice.activitylog.ActivityLogger;
@@ -19,6 +20,7 @@ class TdrTestSupport {
   @Autowired private BatchWriteService batchWriteService;
   @Autowired private ActivityLogger activityLogger;
   @Autowired private ObjectMapper objectMapper;
+  @Autowired private ObservationRegistry observationRegistry;
 
   /** Returns a TdrManifestQuartzJob that is capable of pulling parquet files from the classpath. */
   TdrManifestQuartzJob buildTdrManifestQuartzJob(UUID workspaceId) {
@@ -29,7 +31,8 @@ TdrManifestQuartzJob buildTdrManifestQuartzJob(UUID workspaceId) {
         batchWriteService,
         activityLogger,
         workspaceId,
-        objectMapper) {
+        objectMapper,
+        observationRegistry) {
       @Override
       protected URL parseUrl(String path) {
         if (path.startsWith("classpath:")) {
@@ -8,6 +8,7 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import io.micrometer.observation.ObservationRegistry;
 import java.time.OffsetDateTime;
 import java.time.ZoneId;
 import java.util.Map;
@@ -24,6 +25,7 @@
 import org.quartz.JobExecutionException;
 import org.quartz.JobKey;
 import org.quartz.impl.JobDetailImpl;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.mock.mockito.MockBean;
 
@@ -32,6 +34,7 @@
 class QuartzJobTest {
 
   @MockBean JobDao jobDao;
+  @Autowired ObservationRegistry observationRegistry;
 
   @BeforeAll
   void beforeAll() {
@@ -62,6 +65,7 @@ class TestableQuartzJob extends QuartzJob {
     private final String expectedToken;
 
     public TestableQuartzJob(String expectedToken) {
+      super(observationRegistry);
      this.expectedToken = expectedToken;
     }