[WOR-1544] #major Populate MDC for flight and its steps #125

Merged
merged 10 commits into from
Mar 4, 2024
2 changes: 1 addition & 1 deletion .github/workflows/build-and-test.yaml
@@ -47,7 +47,7 @@ jobs:
distribution: 'temurin'
cache: 'gradle'
- name: Run tests
-   run: ./gradlew test
+   run: ./gradlew test --scan
Member

What does this do?

Contributor Author

This generates a Gradle build scan as part of test output, which makes debugging tests a lot easier. Example from the most recent unit test run:

Publishing build scan...
https://gradle.com/s/gu4jckjm4mqzq


# TODO: Work with AppSec to get Sonar scans setup for this repo
# Run the Sonar scan after `gradle test` to include code coverage data in its report.
10 changes: 10 additions & 0 deletions README.md
@@ -93,6 +93,16 @@ Stairway is designed to provide atomic operations for one instance of one servic
global or cross-service state. Therefore, it is up to the application or service to implement concurrency control on
its objects.

### Context Awareness and Logs
Stairway leverages the underlying logging system's mapped diagnostic context (MDC) if available.

MDC manages contextual information on a per-thread basis: as Stairway submits a Flight for processing,
it **passes along the MDC of the calling thread** so that context isn't lost. It also further populates
Contributor Author

Propagating the MDC from the calling thread makes me want to designate this as a minor version bump at minimum. Default behavior in this repo is patch bumps.

Reviewers, WDYT?

Member

I agree with a more significant version bump. Though with all the dependency upgrades underway, I was already thinking we would do a major version bump very soon. Perhaps just do a major bump with this change?

Contributor Author

Good idea. I'll make it a major one.

Contributor Author

Added #major to the PR title, which will get pulled into the commit message.

the MDC with **flight-specific context** for the duration of the flight's execution on the thread,
and **step-specific context** for the duration of each step's execution.

For more information on MDC, please see [Logback's MDC manual](https://logback.qos.ch/manual/mdc.html).
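
As a rough illustration of the layering described in this section, the sketch below uses plain maps in place of the real MDC. The key names are the ones defined in `MdcUtils` in this PR; the class name, method names, and the `requestId` caller key are hypothetical and exist only for the example.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch only: plain maps stand in for the MDC to show how the layers combine. */
class MdcLayeringSketch {

  static final List<String> STEP_KEYS =
      List.of("flightStepClass", "flightStepDirection", "flightStepNumber");

  /** Context visible to log statements while a flight runs but no step is executing. */
  static Map<String, String> duringFlight(
      Map<String, String> callerContext, String flightId, String flightClass) {
    Map<String, String> mdc = new LinkedHashMap<>();
    if (callerContext != null) {
      mdc.putAll(callerContext); // inherited from the thread that submitted the flight
    }
    mdc.put("flightId", flightId); // flight keys replace any inherited from a parent flight
    mdc.put("flightClass", flightClass);
    mdc.keySet().removeAll(STEP_KEYS); // step keys from a parent flight do not carry over
    return mdc;
  }

  /** Context visible while one attempt of a step is executing. */
  static Map<String, String> duringStep(
      Map<String, String> flightMdc, String stepClass, String direction, int stepIndex) {
    Map<String, String> mdc = new LinkedHashMap<>(flightMdc);
    mdc.put("flightStepClass", stepClass);
    mdc.put("flightStepDirection", direction);
    mdc.put("flightStepNumber", Integer.toString(stepIndex));
    return mdc;
  }

  public static void main(String[] args) {
    Map<String, String> caller = Map.of("requestId", "req-123"); // hypothetical caller key
    Map<String, String> flight = duringFlight(caller, "flight-1", "com.example.MyFlight");
    Map<String, String> step = duringStep(flight, "com.example.MyStep", "DO", 0);
    System.out.println("during flight: " + flight);
    System.out.println("during step:   " + step);
  }
}
```

With Logback, entries like these can be included in log output through the `%X{key}` conversion word described in the MDC manual linked above.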

# TODOs
* Add a section on clusters, queuing, failure, and recovery
* Add a section - perhaps in DEVELOPMENT.md describing the schema
11 changes: 9 additions & 2 deletions buildSrc/src/main/groovy/stairway.java-conventions.gradle
@@ -16,8 +16,7 @@ dependencies {
implementation group: 'org.slf4j', name: 'slf4j-api', version: '1.7.36'

// For testing
- testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-api', version: '5.9.0'
- testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-engine', version: '5.9.0'
+ testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter', version: '5.10.2'
Contributor Author

Needed this expanded dependency for parameterized tests, and upgraded while in the neighborhood.

testImplementation group: 'org.hamcrest', name: 'hamcrest', version: '2.2'
testImplementation group: 'ch.qos.logback', name: 'logback-classic', version: '1.4.1'
testRuntimeOnly group: 'org.codehaus.groovy', name: 'groovy', version: '3.0.13'
@@ -44,3 +43,11 @@ test {
includeTags 'unit'
}
}

// for scans
if (hasProperty("buildScan")) {
buildScan {
termsOfServiceUrl = "https://gradle.com/terms-of-service"
termsOfServiceAgree = "yes"
}
}
10 changes: 5 additions & 5 deletions stairway/src/main/java/bio/terra/stairway/impl/FlightRunner.java
@@ -144,11 +144,9 @@ private FlightStatus fly() throws InterruptedException {
// the UNDO in the first place.
flightDao.step(flightContext);
logger.error(
- "DISMAL FAILURE: non-retry-able error during undo. Flight: {}({}) Step: {}({})",
- flightContext.getFlightId(),
- flightContext.getFlightClassName(),
- flightContext.getStepIndex(),
- flightContext.getStepClassName());
+ "{} (index {}) experienced DISMAL FAILURE: non-retryable error on undo",
Contributor Author

Drive-by: now that we include flight ID and class on all flight logs in the MDC, we can trim down the dismal failure log line.

I also reformatted to lead with the failing step class based off of Stairway retro feedback that all of the useful information on a log line is at the end. Do we like this better? Unfortunately we don't get step context in MDC for free at this point -- we're outside of step execution.

+ flightContext.getStepClassName(),
+ flightContext.getStepIndex());

} catch (InterruptedException ex) {
// Interrupted exception - we assume this means that the thread pool is shutting down and
@@ -242,6 +240,7 @@ private StepResult stepWithRetry() throws InterruptedException, StairwayExecutio
// Retry loop
do {
try {
MdcUtils.addStepContextToMdc(flightContext);
// Do or undo based on direction we are headed
hookWrapper.startStep(flightContext);

@@ -271,6 +270,7 @@ private StepResult stepWithRetry() throws InterruptedException, StairwayExecutio
result = new StepResult(stepStatus, ex);
} finally {
hookWrapper.endStep(flightContext);
MdcUtils.removeStepContextFromMdc(flightContext);
}

switch (result.getStepStatus()) {
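
The add-in-`try`, remove-in-`finally` pairing in `stepWithRetry` scopes the step keys to a single attempt. The sketch below shows that pattern in isolation. It is not the Stairway implementation: a `ThreadLocal` map stands in for `org.slf4j.MDC` so the example runs without dependencies, and `runWithRetry` and its parameters are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.IntPredicate;

class StepScopedContextSketch {
  // Dependency-free stand-in for org.slf4j.MDC: one mutable map per thread.
  static final ThreadLocal<Map<String, String>> CONTEXT = ThreadLocal.withInitial(HashMap::new);

  static final Set<String> STEP_KEYS =
      Set.of("flightStepClass", "flightStepDirection", "flightStepNumber");

  /**
   * Runs a step until an attempt succeeds or attempts run out.
   *
   * @param attemptSucceeds hypothetical step body: takes the 1-based attempt number and
   *     returns true on success
   * @return a snapshot of the context each attempt observed
   */
  static List<Map<String, String>> runWithRetry(
      String stepClass, int stepIndex, int maxAttempts, IntPredicate attemptSucceeds) {
    List<Map<String, String>> observed = new ArrayList<>();
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        // Step keys are added at the start of every attempt...
        Map<String, String> context = CONTEXT.get();
        context.put("flightStepClass", stepClass);
        context.put("flightStepDirection", "DO");
        context.put("flightStepNumber", Integer.toString(stepIndex));
        observed.add(new HashMap<>(context));
        if (attemptSucceeds.test(attempt)) {
          break;
        }
      } finally {
        // ...and removed when the attempt ends, whether it succeeded, failed, or threw.
        CONTEXT.get().keySet().removeAll(STEP_KEYS);
      }
    }
    return observed;
  }

  public static void main(String[] args) {
    CONTEXT.get().put("flightId", "flight-1"); // flight-level key set before any step runs
    List<Map<String, String>> observed =
        runWithRetry("com.example.MyStep", 0, 3, attempt -> attempt == 2);
    System.out.println("attempts made: " + observed.size());
    System.out.println("context after the step: " + CONTEXT.get());
  }
}
```

Flight-level keys survive across attempts because only the step keys are removed, which is also why code that runs between steps sees flight context but no step context.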
74 changes: 74 additions & 0 deletions stairway/src/main/java/bio/terra/stairway/impl/MdcUtils.java
@@ -0,0 +1,74 @@
package bio.terra.stairway.impl;

import bio.terra.stairway.FlightContext;
import java.util.Map;
import org.slf4j.MDC;

/**
* Utility methods to make Stairway flight runnables context-aware, using mapped diagnostic context
* (MDC).
*/
class MdcUtils {

/** ID of the flight */
static final String FLIGHT_ID_KEY = "flightId";

/** Class of the flight */
static final String FLIGHT_CLASS_KEY = "flightClass";

/** Class of the flight step */
static final String FLIGHT_STEP_CLASS_KEY = "flightStepClass";

/** Direction of the step (START, DO, SWITCH, or UNDO) */
static final String FLIGHT_STEP_DIRECTION_KEY = "flightStepDirection";

/** The step's execution order */
static final String FLIGHT_STEP_NUMBER_KEY = "flightStepNumber";

/**
* Null-safe utility method for overwriting the current thread's MDC.
*
* @param context to set as MDC, if null then MDC will be cleared.
*/
static void overwriteContext(Map<String, String> context) {
if (context == null) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
}

private static Map<String, String> flightContextForMdc(FlightContext context) {
return Map.of(
FLIGHT_ID_KEY, context.getFlightId(),
FLIGHT_CLASS_KEY, context.getFlightClassName());
}

/**
* Supplement the current thread's MDC with flight-specific context, meant to persist for the
* duration of this flight's execution on the thread.
*/
static void addFlightContextToMdc(FlightContext flightContext) {
flightContextForMdc(flightContext).forEach(MDC::put);
}

private static Map<String, String> stepContextForMdc(FlightContext flightContext) {
return Map.of(
FLIGHT_STEP_CLASS_KEY, flightContext.getStepClassName(),
FLIGHT_STEP_DIRECTION_KEY, flightContext.getDirection().toString(),
FLIGHT_STEP_NUMBER_KEY, Integer.toString(flightContext.getStepIndex()));
}

/**
* Supplement the current thread's MDC with step-specific context, meant to persist for the
* duration of this step's current attempt on the thread.
*/
static void addStepContextToMdc(FlightContext flightContext) {
stepContextForMdc(flightContext).forEach(MDC::put);
}

/** Remove any step-specific context from the current thread's MDC. */
static void removeStepContextFromMdc(FlightContext flightContext) {
stepContextForMdc(flightContext).keySet().forEach(MDC::remove);
}
}
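
The three kinds of operation in `MdcUtils` (overwrite everything, supplement with extra keys, remove specific keys) can be sketched as follows. This is an illustration under assumptions rather than the class above: a `ThreadLocal` stands in for `org.slf4j.MDC`, with `null` meaning "no context on this thread" (SLF4J documents that `MDC.getCopyOfContextMap()` may return null), and all names in the sketch are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MdcOperationsSketch {
  // Stand-in for org.slf4j.MDC; a null value means the thread has no context at all.
  private static final ThreadLocal<Map<String, String>> CONTEXT = new ThreadLocal<>();

  /** Returns a copy of the current thread's context, or null if none is set. */
  static Map<String, String> copyOfContext() {
    Map<String, String> current = CONTEXT.get();
    return current == null ? null : new HashMap<>(current);
  }

  /** Replaces the whole context; passing null clears it (compare overwriteContext). */
  static void overwrite(Map<String, String> context) {
    if (context == null) {
      CONTEXT.remove();
    } else {
      CONTEXT.set(new HashMap<>(context));
    }
  }

  /** Adds or replaces individual entries and leaves the rest untouched. */
  static void supplement(Map<String, String> extra) {
    Map<String, String> current = CONTEXT.get();
    if (current == null) {
      current = new HashMap<>();
      CONTEXT.set(current);
    }
    current.putAll(extra);
  }

  /** Removes only the named keys. */
  static void remove(Iterable<String> keys) {
    Map<String, String> current = CONTEXT.get();
    if (current != null) {
      for (String key : keys) {
        current.remove(key);
      }
    }
  }

  public static void main(String[] args) {
    overwrite(Map.of("requestId", "req-123")); // hypothetical caller key
    supplement(Map.of("flightId", "flight-1", "flightStepNumber", "0"));
    remove(List.of("flightStepNumber"));
    System.out.println("after supplement and remove: " + copyOfContext());
    overwrite(null);
    System.out.println("after overwrite(null): " + copyOfContext());
  }
}
```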
13 changes: 3 additions & 10 deletions stairway/src/main/java/bio/terra/stairway/impl/StairwayImpl.java
@@ -25,7 +25,6 @@
import java.time.Duration;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@@ -211,13 +210,7 @@ public void recoverStairway(String stairwayName) throws InterruptedException {
}

private void configureThreadPools() {
- threadPool =
- new StairwayThreadPool(
- maxParallelFlights,
- maxParallelFlights,
- 0L,
- TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>());
+ threadPool = new StairwayThreadPool(maxParallelFlights);

scheduledPool = new ScheduledThreadPoolExecutor(SCHEDULED_POOL_CORE_THREADS);
// If we have retention settings then set up the regular flight cleaner
@@ -636,7 +629,7 @@ void recoverReady() throws StairwayException, InterruptedException {

/*
* Build the FlightRunner object that will execute the flight
- * and hand it to the threadPool to run.
+ * and hand it to the threadPool to run, with MDC populated from the calling thread and flight context.
*/
private void launchFlight(FlightContextImpl flightContext) {
FlightRunner runner = new FlightRunner(flightContext);
@@ -649,7 +642,7 @@ private void launchFlight(FlightContextImpl flightContext) {
+ threadPool.getPoolSize());
}
logger.info("Launching flight " + flightContext.flightDesc());
- threadPool.submit(runner);
+ threadPool.submitWithMdcAndFlightContext(runner, flightContext);
}

HookWrapper getHookWrapper() {
@@ -1,23 +1,27 @@
package bio.terra.stairway.impl;

import java.util.concurrent.BlockingQueue;
import bio.terra.stairway.FlightContext;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

class StairwayThreadPool extends ThreadPoolExecutor {
private static final Logger logger = LoggerFactory.getLogger(StairwayThreadPool.class);
AtomicInteger activeTasks;

- StairwayThreadPool(
- int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
+ StairwayThreadPool(int maxParallelFlights) {
+ super(
+ maxParallelFlights,
+ maxParallelFlights,
+ 0L,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>());
activeTasks = new AtomicInteger();
}

@@ -29,10 +33,45 @@ int getQueuedFlights() {
return getQueue().size();
}

/**
* Submits a Runnable task for execution and returns a Future representing that task. The Future's
* get method will return null upon successful completion.
*
* @param flightRunner the flight to submit
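* @param flightContext context of the flight, used to populate flight-specific MDC entries
* @return a Future representing pending completion of the flight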
*/
protected Future<?> submitWithMdcAndFlightContext(
Runnable flightRunner, FlightContext flightContext) {
// Save the calling thread's context before potentially submitting to a child thread
Map<String, String> callingThreadContext = MDC.getCopyOfContextMap();
Runnable flightRunnerWithMdc =
() -> {
Map<String, String> initialContext = MDC.getCopyOfContextMap();
initializeFlightMdc(callingThreadContext, flightContext);
try {
flightRunner.run();
} finally {
MdcUtils.overwriteContext(initialContext);
}
};
return super.submit(flightRunnerWithMdc);
}

private void initializeFlightMdc(
Map<String, String> callingThreadContext, FlightContext flightContext) {
// Any leftover context on the thread will be fully overwritten:
MdcUtils.overwriteContext(callingThreadContext);
// If the calling thread's context contains flight and step context from a parent flight, this
// will be overwritten below:
MdcUtils.addFlightContextToMdc(flightContext);
MdcUtils.removeStepContextFromMdc(flightContext);
}
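
The capture, overwrite, and restore sequence in `submitWithMdcAndFlightContext` can be tried out in isolation with the sketch below. It is a simplified illustration, not this class: a `ThreadLocal` map stands in for `org.slf4j.MDC`, the removal of step keys is omitted, and `withCallerContext`, `demo`, and the `requestId` and `stale` keys are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ContextPropagationSketch {
  // Dependency-free stand-in for org.slf4j.MDC: one map per thread.
  static final ThreadLocal<Map<String, String>> CONTEXT = ThreadLocal.withInitial(HashMap::new);

  /** Wraps a task so it runs with the submitting thread's context plus flight keys. */
  static Runnable withCallerContext(Runnable task, Map<String, String> flightKeys) {
    // Captured now, on the submitting thread, before the task moves to a pool thread.
    Map<String, String> callerContext = new HashMap<>(CONTEXT.get());
    return () -> {
      Map<String, String> initial = new HashMap<>(CONTEXT.get()); // what the worker held before
      Map<String, String> forFlight = new HashMap<>(callerContext);
      forFlight.putAll(flightKeys);
      CONTEXT.set(forFlight); // leftover worker context is fully overwritten
      try {
        task.run();
      } finally {
        CONTEXT.set(initial); // the worker gets its previous context back
      }
    };
  }

  /** Returns [context seen by the flight, context left on the worker afterwards]. */
  static List<Map<String, String>> demo() {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      // Simulate leftover context on the single worker thread.
      pool.submit(() -> { CONTEXT.get().put("stale", "leftover"); }).get();

      CONTEXT.get().put("requestId", "req-123"); // set on the submitting thread
      Map<String, String> seenByFlight = new HashMap<>();
      Runnable flight = () -> seenByFlight.putAll(CONTEXT.get());
      pool.submit(withCallerContext(flight, Map.of("flightId", "flight-1"))).get();

      Callable<Map<String, String>> snapshot = () -> new HashMap<>(CONTEXT.get());
      Map<String, String> leftOnWorker = pool.submit(snapshot).get();
      return List.of(seenByFlight, leftOnWorker);
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
      CONTEXT.remove();
    }
  }

  public static void main(String[] args) {
    List<Map<String, String>> result = demo();
    System.out.println("seen by flight: " + result.get(0));
    System.out.println("left on worker: " + result.get(1));
  }
}
```

Capturing the caller's context outside the lambda matters: inside the lambda, the code already runs on the pool thread, where the caller's thread-local state is not visible.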

protected void beforeExecute(Thread t, Runnable r) {
- super.beforeExecute(t, r);
int active = activeTasks.incrementAndGet();
logger.debug("before: " + active);
+ super.beforeExecute(t, r);
Contributor Author

Reading the docstring on this method, it's recommended to call the super at the end to allow for potential nesting.

(I also read the docstring on afterExecute, and the super call at the beginning matches the recommendation.)
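
For reference, a minimal sketch of the ordering the `ThreadPoolExecutor` Javadoc recommends: call `super.beforeExecute` at the end of the override and `super.afterExecute` at the beginning. The class below is hypothetical and is not `StairwayThreadPool`. In particular, the body of `afterExecute` is an assumption, since that method is not shown in this diff.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CountingPoolSketch extends ThreadPoolExecutor {
  final AtomicInteger activeTasks = new AtomicInteger();
  final AtomicInteger completedTasks = new AtomicInteger();

  CountingPoolSketch(int threads) {
    // Fixed-size pool with an unbounded queue: extra tasks wait rather than add threads.
    super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
  }

  @Override
  protected void beforeExecute(Thread t, Runnable r) {
    activeTasks.incrementAndGet();
    super.beforeExecute(t, r); // super call last, so nested overrides compose
  }

  @Override
  protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t); // super call first, mirroring beforeExecute
    activeTasks.decrementAndGet();
    completedTasks.incrementAndGet();
  }

  /** Runs the given number of no-op tasks and returns {active, completed} after shutdown. */
  static int[] runTasks(int taskCount) {
    CountingPoolSketch pool = new CountingPoolSketch(2);
    for (int i = 0; i < taskCount; i++) {
      pool.execute(() -> {});
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("pool did not terminate in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return new int[] {pool.activeTasks.get(), pool.completedTasks.get()};
  }

  public static void main(String[] args) {
    int[] counts = runTasks(5);
    System.out.println("active=" + counts[0] + " completed=" + counts[1]);
  }
}
```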

}

protected void afterExecute(Runnable r, Throwable t) {