
Add async observability to QuartzJob #461

Merged · 17 commits · Feb 1, 2024
1 change: 1 addition & 0 deletions service/build.gradle
@@ -113,6 +113,7 @@ dependencies {

annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
runtimeOnly 'org.webjars.npm:swagger-ui-dist:5.9.0'
testImplementation 'io.micrometer:micrometer-observation-test'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.junit.jupiter:junit-jupiter'
testImplementation project(':client')
PfbQuartzJob.java:

@@ -7,6 +7,7 @@
import static org.databiosphere.workspacedataservice.shared.model.Schedulable.ARG_URL;

import bio.terra.pfb.PfbReader;
import io.micrometer.observation.ObservationRegistry;
import java.net.URL;
import java.util.Objects;
import java.util.Optional;
@@ -58,7 +59,9 @@ public PfbQuartzJob(
RestClientRetry restClientRetry,
BatchWriteService batchWriteService,
ActivityLogger activityLogger,
ObservationRegistry observationRegistry,
@Value("${twds.instance.workspace-id}") UUID workspaceId) {
super(observationRegistry);
this.jobDao = jobDao;
this.wsmDao = wsmDao;
this.restClientRetry = restClientRetry;
TdrManifestQuartzJob.java:

@@ -10,6 +10,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Multimap;
import io.micrometer.observation.ObservationRegistry;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
@@ -69,7 +70,9 @@ public TdrManifestQuartzJob(
BatchWriteService batchWriteService,
ActivityLogger activityLogger,
@Value("${twds.instance.workspace-id}") UUID workspaceId,
ObjectMapper mapper) {
ObjectMapper mapper,
ObservationRegistry observationRegistry) {
super(observationRegistry);
this.jobDao = jobDao;
this.wsmDao = wsmDao;
this.restClientRetry = restClientRetry;
QuartzJob.java:

@@ -3,6 +3,8 @@
import static org.databiosphere.workspacedataservice.sam.BearerTokenFilter.ATTRIBUTE_NAME_TOKEN;
import static org.databiosphere.workspacedataservice.shared.model.Schedulable.ARG_TOKEN;

import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import java.net.URL;
import java.util.UUID;
import org.databiosphere.workspacedataservice.dao.JobDao;
@@ -29,16 +31,29 @@
// note this implements Quartz's `Job`, not WDS's own `Job`
public abstract class QuartzJob implements Job {

private final ObservationRegistry observationRegistry;

protected QuartzJob(ObservationRegistry observationRegistry) {
this.observationRegistry = observationRegistry;
}

/** implementing classes are expected to be beans that inject a JobDao */
protected abstract JobDao getJobDao();

@Override
public void execute(JobExecutionContext context) throws org.quartz.JobExecutionException {
// retrieve jobId
UUID jobId = UUID.fromString(context.getJobDetail().getKey().getName());

Observation observation =
Observation.start("wds.job.execute", observationRegistry)
.contextualName("job-execution")
.lowCardinalityKeyValue("jobType", getClass().getSimpleName())
.highCardinalityKeyValue("jobId", jobId.toString());
davidangb (Collaborator):
How feasible is it to start an Observation in a separate class - and a separate Java thread - and update the Observation here?

We queue the job over in ImportService.java:

// schedule the job. after successfully scheduling, mark the job as queued

and if we wanted to track timing between QUEUED and RUNNING status, we'd need to start the Observation over there.

I am totally fine answering this question sometime after this PR merges. What's in this PR as-is is useful and adds value, so it's worth it on its own.

Contributor:

Good call @davidangb, this is actually the spirit behind the measurement, which is to know how long the customer has to wait, so I'm in favor of this. OK to defer to another PR, though, if you just want to land this one @ashanhol

ashanhol (Contributor, author):
I actually had this exact thought when doing this, but couldn't quite figure out how to "pass" an observation from one class to another. I'm happy to make a ticket to think about it further.
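One stdlib-only way to approach the QUEUED-to-RUNNING timing question without handing an Observation object between classes: the scheduling side records the enqueue instant, and the executing job looks it up when it starts. This is a rough sketch, not WDS or Micrometer code; the names `QueueLatencyTracker`, `markQueued`, and `markRunning` are hypothetical. In practice the resulting `Duration` could be recorded as an `Observation` event or a `Timer` sample.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a shared bean that lets the scheduler and the job
// cooperate on queue-latency measurement without sharing an Observation.
public class QueueLatencyTracker {
    private final Map<UUID, Instant> queuedAt = new ConcurrentHashMap<>();

    // Called by the scheduling side (e.g. ImportService) after the job is queued.
    public void markQueued(UUID jobId, Instant now) {
        queuedAt.put(jobId, now);
    }

    // Called by the executing side when the job transitions to RUNNING;
    // returns the time spent queued, or ZERO if the enqueue was never recorded.
    public Duration markRunning(UUID jobId, Instant now) {
        Instant start = queuedAt.remove(jobId);
        return start == null ? Duration.ZERO : Duration.between(start, now);
    }
}
```

The `ConcurrentHashMap` makes this safe when Quartz runs the job on a different thread than the one that queued it; `remove` also keeps the map from growing without bound.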


try {
// mark this job as running
getJobDao().running(jobId);
observation.event(Observation.Event.of("job.running"));
// look for an auth token in the Quartz JobDataMap
String authToken = getJobDataString(context.getMergedJobDataMap(), ARG_TOKEN);
// and stash the auth token into job context
@@ -50,11 +65,15 @@ public void execute(JobExecutionContext context) throws org.quartz.JobExecutionException
executeInternal(jobId, context);
// if we reached here, mark this job as successful
getJobDao().succeeded(jobId);
observation.lowCardinalityKeyValue("outcome", "success");
} catch (Exception e) {
// on any otherwise-unhandled exception, mark the job as failed
getJobDao().fail(jobId, e);
observation.error(e);
observation.lowCardinalityKeyValue("outcome", "failure");
} finally {
JobContextHolder.destroy();
observation.stop();
}
}


This file was deleted.

ObservationConfig.java (new file):

@@ -0,0 +1,25 @@
package org.databiosphere.workspacedataservice.metrics;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler;
import io.micrometer.observation.ObservationRegistry;
import org.springframework.boot.actuate.autoconfigure.observation.ObservationRegistryCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ObservationConfig {
private final MeterRegistry meterRegistry;

public ObservationConfig(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}

@Bean
ObservationRegistryCustomizer<ObservationRegistry> useAutowiredMeterRegistry() {
return observationRegistry ->
observationRegistry
.observationConfig()
.observationHandler(new DefaultMeterObservationHandler(meterRegistry));
}
}
@@ -1,9 +1,5 @@
package org.databiosphere.workspacedataservice.service;

import static org.databiosphere.workspacedataservice.metrics.MetricsDefinitions.COUNTER_COL_CHANGE;
import static org.databiosphere.workspacedataservice.metrics.MetricsDefinitions.TAG_ATTRIBUTE_NAME;
import static org.databiosphere.workspacedataservice.metrics.MetricsDefinitions.TAG_INSTANCE;
import static org.databiosphere.workspacedataservice.metrics.MetricsDefinitions.TAG_RECORD_TYPE;
import static org.databiosphere.workspacedataservice.service.model.ReservedNames.RESERVED_NAME_PREFIX;

import bio.terra.common.db.WriteTransaction;
@@ -220,10 +216,10 @@ public Map<String, DataTypeMapping> addOrUpdateColumnIfNeeded(

// update a metrics counter with this schema change
Counter counter =
Counter.builder(COUNTER_COL_CHANGE)
.tag(TAG_RECORD_TYPE, recordType.getName())
.tag(TAG_ATTRIBUTE_NAME, column)
.tag(TAG_INSTANCE, instanceId.toString())
Counter.builder("column.change.datatype")
.tag("RecordType", recordType.getName())
.tag("AttributeName", column)
.tag("Instance", instanceId.toString())
.tag("OldDataType", valueDifference.leftValue().toString())
.tag("NewDataType", updatedColType.toString())
.description("Column schema changes")
PfbTestSupport.java:
@@ -1,6 +1,7 @@
package org.databiosphere.workspacedataservice.dataimport.pfb;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.observation.ObservationRegistry;
import java.util.UUID;
import org.databiosphere.workspacedataservice.activitylog.ActivityLogger;
import org.databiosphere.workspacedataservice.dao.JobDao;
@@ -18,10 +19,17 @@ class PfbTestSupport {
@Autowired private BatchWriteService batchWriteService;
@Autowired private ActivityLogger activityLogger;
@Autowired private ObjectMapper objectMapper;
@Autowired private ObservationRegistry observationRegistry;

PfbQuartzJob buildPfbQuartzJob(UUID workspaceId) {
return new PfbQuartzJob(
jobDao, wsmDao, restClientRetry, batchWriteService, activityLogger, workspaceId);
jobDao,
wsmDao,
restClientRetry,
batchWriteService,
activityLogger,
observationRegistry,
workspaceId);
}

PfbQuartzJob buildPfbQuartzJob() {
TdrTestSupport.java:
@@ -1,6 +1,7 @@
package org.databiosphere.workspacedataservice.dataimport.tdr;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.observation.ObservationRegistry;
import java.net.URL;
import java.util.UUID;
import org.databiosphere.workspacedataservice.activitylog.ActivityLogger;
@@ -19,6 +20,7 @@ class TdrTestSupport {
@Autowired private BatchWriteService batchWriteService;
@Autowired private ActivityLogger activityLogger;
@Autowired private ObjectMapper objectMapper;
@Autowired private ObservationRegistry observationRegistry;

/** Returns a TdrManifestQuartzJob that is capable of pulling parquet files from the classpath. */
TdrManifestQuartzJob buildTdrManifestQuartzJob(UUID workspaceId) {
@@ -29,7 +31,8 @@ TdrManifestQuartzJob buildTdrManifestQuartzJob(UUID workspaceId) {
batchWriteService,
activityLogger,
workspaceId,
objectMapper) {
objectMapper,
observationRegistry) {
@Override
protected URL parseUrl(String path) {
if (path.startsWith("classpath:")) {
QuartzJobTest.java:

@@ -2,20 +2,30 @@

import static org.databiosphere.workspacedataservice.shared.model.Schedulable.ARG_TOKEN;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.micrometer.core.instrument.LongTaskTimer;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import io.micrometer.observation.ObservationRegistry;
import io.micrometer.observation.tck.TestObservationRegistry;
import io.micrometer.observation.tck.TestObservationRegistryAssert;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomStringUtils;
import org.databiosphere.workspacedataservice.dao.JobDao;
import org.databiosphere.workspacedataservice.generated.GenericJobServerModel;
import org.databiosphere.workspacedataservice.sam.TokenContextUtil;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
@@ -24,6 +34,7 @@
import org.quartz.JobExecutionException;
import org.quartz.JobKey;
import org.quartz.impl.JobDetailImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;

@@ -32,6 +43,9 @@
class QuartzJobTest {

@MockBean JobDao jobDao;
@Autowired ObservationRegistry observationRegistry;
@Autowired MeterRegistry meterRegistry;
private TestObservationRegistry testObservationRegistry;

@BeforeAll
void beforeAll() {
@@ -50,6 +64,16 @@ void beforeAll() {
.thenThrow(new RuntimeException("test failed via jobDao.fail()"));
when(jobDao.fail(any(), anyString()))
.thenThrow(new RuntimeException("test failed via jobDao.fail()"));
// set up metrics registries
testObservationRegistry = TestObservationRegistry.create();
Metrics.globalRegistry.add(meterRegistry);
}

@AfterAll
void afterAll() {
testObservationRegistry.clear();
meterRegistry.clear();
Metrics.globalRegistry.clear();
}

/**
@@ -61,7 +85,8 @@ class TestableQuartzJob extends QuartzJob {

private final String expectedToken;

public TestableQuartzJob(String expectedToken) {
public TestableQuartzJob(String expectedToken, ObservationRegistry registry) {
super(registry);
this.expectedToken = expectedToken;
}

@@ -80,21 +105,62 @@ protected void executeInternal(UUID jobId, JobExecutionContext context) {
void tokenIsStashedAndCleaned() throws JobExecutionException {
// set an example token, via mock, into the Quartz JobDataMap
String expectedToken = RandomStringUtils.randomAlphanumeric(10);
JobExecutionContext mockContext = mock(JobExecutionContext.class);
when(mockContext.getMergedJobDataMap())
.thenReturn(new JobDataMap(Map.of(ARG_TOKEN, expectedToken)));
JobDetailImpl jobDetail = new JobDetailImpl();
jobDetail.setKey(new JobKey(UUID.randomUUID().toString(), "bar"));
when(mockContext.getJobDetail()).thenReturn(jobDetail);

JobExecutionContext mockContext = setUpTestJob(expectedToken, UUID.randomUUID().toString());
// assert that no token exists via TokenContextUtil
assertTrue(
TokenContextUtil.getToken().isEmpty(), "no token should exist before running the job");
// execute the TestableQuartzJob, which asserts that the token passed properly from the
// JobDataMap into job context and is therefore retrievable via TokenContextUtil
new TestableQuartzJob(expectedToken).execute(mockContext);
new TestableQuartzJob(expectedToken, observationRegistry).execute(mockContext);
// assert that the token is cleaned up and no longer reachable via TokenContextUtil
assertTrue(
TokenContextUtil.getToken().isEmpty(), "no token should exist after running the job");
}

@Test
void correctObservation() throws JobExecutionException {
String randomToken = RandomStringUtils.randomAlphanumeric(10);
String jobUuid = UUID.randomUUID().toString();
JobExecutionContext mockContext = setUpTestJob(randomToken, jobUuid);

// execute the TestableQuartzJob, then use testObservationRegistry to confirm observation
new TestableQuartzJob(randomToken, testObservationRegistry).execute(mockContext);

TestObservationRegistryAssert.assertThat(testObservationRegistry)
.doesNotHaveAnyRemainingCurrentObservation()
.hasObservationWithNameEqualTo("wds.job.execute")
.that()
.hasHighCardinalityKeyValue("jobId", jobUuid)
.hasLowCardinalityKeyValue("jobType", "TestableQuartzJob")
.hasLowCardinalityKeyValue("outcome", "success")
.hasBeenStarted()
.hasBeenStopped();
}

Collaborator:
could you add a test for a failing job that looks for .hasLowCardinalityKeyValue("outcome", "failure")? You probably need to modify TestableQuartzJob or create something new that extends QuartzJob but throws an error in executeInternal.

ashanhol (Contributor, author):
OK, I'm losing my mind here. In the current test implementation I have the execute function wrapped in an assertThrows, and it fails because it says nothing was thrown. But if I get rid of the assertThrows, it totally does throw. I feel like I'm missing something obvious.

ashanhol (Contributor, author):
OK, @jladieu and I figured this out, and it's confusing. We have two JobExecutionExceptions: one from Quartz and one we define. The one we define extends RuntimeException and therefore doesn't need to be caught/handled, but the Quartz one does. QuartzJob.execute throws an org.quartz.JobExecutionException (and therefore needs the throws org.quartz.JobExecutionException in the method signature), but all our custom jobs throw various RuntimeException ones. So I had the right idea having the TestableQuartzJob throw our custom one, but it made writing this test VERY confusing.

Collaborator:
Maybe this helps to explain it.

The typical data flow is:

  • Quartz looks for an execute() method and runs that method when it kicks off a job
  • in the WDS codebase, we define the execute() method in QuartzJob
  • QuartzJob.execute() wraps and calls the executeInternal() method defined in subclasses such as TdrManifestQuartzJob or PfbQuartzJob
  • as part of its wrapping, QuartzJob.execute() *catches any exceptions thrown by executeInternal*; on catch, it marks the job as failed and swallows the exception so Quartz can mark its job as complete

In this test:

  • the test defines a TestableQuartzJob subclass of QuartzJob
  • the test throws an exception *from executeInternal()*
  • the QuartzJob.execute() wrapper properly catches the error thrown from executeInternal(), marks the job as failed, registers metrics, and swallows the error

So, it's expected that when this test calls TestableQuartzJob(...).execute, it will not throw an error.

ashanhol (Contributor, author):
Yeah, I figured it out eventually. I personally didn't know that RuntimeException behaves differently from a checked Exception, and that required some fighting with the compiler and extra confusion over the namespaces.
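The swallowing behavior described above can be reproduced with a minimal stdlib-only sketch (class names here are hypothetical, not WDS code), which shows why wrapping execute() in assertThrows reports that nothing was thrown:

```java
// Hypothetical stand-in for QuartzJob: execute() catches whatever
// executeInternal() throws, records the failure, and does NOT rethrow.
abstract class SwallowingJob {
    String outcome = "pending";

    final void execute() {
        try {
            executeInternal();
            outcome = "success";
        } catch (Exception e) {
            // mirror QuartzJob.execute: mark the job failed, swallow the exception
            outcome = "failure: " + e.getMessage();
        }
    }

    abstract void executeInternal();
}

public class SwallowDemo {
    public static void main(String[] args) {
        SwallowingJob failing = new SwallowingJob() {
            @Override
            void executeInternal() {
                // an unchecked exception, like WDS's RuntimeException-based
                // JobExecutionException
                throw new RuntimeException("boom");
            }
        };
        failing.execute(); // completes normally; no exception escapes
        System.out.println(failing.outcome);
    }
}
```

Because the exception never escapes execute(), the right assertion for the failure path is on the recorded outcome (here the `outcome` field; in the real test, the Observation's "outcome" key value), not on a thrown exception.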

@Test
void correctMetrics() throws JobExecutionException {
String randomToken = RandomStringUtils.randomAlphanumeric(10);
JobExecutionContext mockContext = setUpTestJob(randomToken, UUID.randomUUID().toString());

// execute the TestableQuartzJob, then confirm metrics provisioned correctly
new TestableQuartzJob(randomToken, observationRegistry).execute(mockContext);

// metrics provisioned should be longTaskTimer and a Timer, confirm both ran
LongTaskTimer longTaskTimer = meterRegistry.find("wds.job.execute.active").longTaskTimer();
assertNotEquals(0, longTaskTimer.duration(TimeUnit.SECONDS));
Timer timer = meterRegistry.find("wds.job.execute").timer();
assertNotEquals(0, timer.count());
}

// sets up a job and returns the job context
private JobExecutionContext setUpTestJob(String randomToken, String jobUuid) {
JobExecutionContext mockContext = mock(JobExecutionContext.class);
when(mockContext.getMergedJobDataMap())
.thenReturn(new JobDataMap(Map.of(ARG_TOKEN, randomToken)));
JobDetailImpl jobDetail = new JobDetailImpl();
jobDetail.setKey(new JobKey(jobUuid, "bar"));
when(mockContext.getJobDetail()).thenReturn(jobDetail);
return mockContext;
}
}
@@ -1,8 +1,5 @@
package org.databiosphere.workspacedataservice.service;

import static org.databiosphere.workspacedataservice.metrics.MetricsDefinitions.TAG_ATTRIBUTE_NAME;
import static org.databiosphere.workspacedataservice.metrics.MetricsDefinitions.TAG_INSTANCE;
import static org.databiosphere.workspacedataservice.metrics.MetricsDefinitions.TAG_RECORD_TYPE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;

@@ -93,9 +90,9 @@ void schemaChangesIncrementMetricsCounter() {
// assert the counter has been incremented once
assertEquals(1, counter.count());
// validate the tags for that counter
assertEquals(recordType.getName(), counter.getId().getTag(TAG_RECORD_TYPE));
assertEquals("myAttr", counter.getId().getTag(TAG_ATTRIBUTE_NAME));
assertEquals(instanceId.toString(), counter.getId().getTag(TAG_INSTANCE));
assertEquals(recordType.getName(), counter.getId().getTag("RecordType"));
assertEquals("myAttr", counter.getId().getTag("AttributeName"));
assertEquals(instanceId.toString(), counter.getId().getTag("Instance"));
assertEquals(DataTypeMapping.NUMBER.toString(), counter.getId().getTag("OldDataType"));
assertEquals(DataTypeMapping.STRING.toString(), counter.getId().getTag("NewDataType"));
}