-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WOR-1544] #major Populate MDC for flight and its steps #125
Changes from 9 commits
45ba3bf
15f1718
0967119
6668262
4ac56fa
37533fd
b47d695
7d4c792
981703e
b669bb8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -93,6 +93,16 @@ Stairway is designed to provide atomic operations for one instance of one servic | |
global or cross-service state. Therefore, it is up to the application or service to implement concurrency control on | ||
its objects. | ||
|
||
### Context Awareness and Logs | ||
Stairway leverages the underlying logging system's mapped diagnostic context (MDC) if available. | ||
|
||
MDC manages contextual information on a per-thread basis: as Stairway submits a Flight for processing, | ||
it **passes along the MDC of the calling thread** so that context isn't lost. It also further populates | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Propagating the MDC from the calling thread makes me want to designate this as a minor version bump at minimum. Default behavior in this repo is patch bumps. Reviewers, WDYT? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree with a more significant version bump. Though with all the dependency upgrades underway, I was already thinking we would do a major version bump very soon. Perhaps just do a major bump with this change? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good idea. I'll make it a major one. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added #major to the PR title, which will get pulled into the commit message. |
||
the MDC with **flight-specific context** for the duration of the flight's execution on the thread, | ||
and **step-specific context** for the duration of each step's execution. | ||
|
||
For more information on MDC, please see [Logback's MDC manual](https://logback.qos.ch/manual/mdc.html). | ||
|
||
# TODOs | ||
* Add a section on clusters, queuing, failure, and recovery | ||
* Add a section - perhaps in DEVELOPMENT.md describing the schema | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,8 +16,7 @@ dependencies { | |
implementation group: 'org.slf4j', name: 'slf4j-api', version: '1.7.36' | ||
|
||
// For testing | ||
testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-api', version: '5.9.0' | ||
testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-engine', version: '5.9.0' | ||
testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter', version: '5.10.2' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Needed this expanded dependency for parameterized tests, and upgraded while in the neighborhood. |
||
testImplementation group: 'org.hamcrest', name: 'hamcrest', version: '2.2' | ||
testImplementation group: 'ch.qos.logback', name: 'logback-classic', version: '1.4.1' | ||
testRuntimeOnly group: 'org.codehaus.groovy', name: 'groovy', version: '3.0.13' | ||
|
@@ -44,3 +43,11 @@ test { | |
includeTags 'unit' | ||
} | ||
} | ||
|
||
// for scans | ||
if (hasProperty("buildScan")) { | ||
buildScan { | ||
termsOfServiceUrl = "https://gradle.com/terms-of-service" | ||
termsOfServiceAgree = "yes" | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -144,11 +144,9 @@ private FlightStatus fly() throws InterruptedException { | |
// the UNDO in the first place. | ||
flightDao.step(flightContext); | ||
logger.error( | ||
"DISMAL FAILURE: non-retry-able error during undo. Flight: {}({}) Step: {}({})", | ||
flightContext.getFlightId(), | ||
flightContext.getFlightClassName(), | ||
flightContext.getStepIndex(), | ||
flightContext.getStepClassName()); | ||
"{} (index {}) experienced DISMAL FAILURE: non-retryable error on undo", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Drive-by: now that we include flight ID and class on all flight logs in the MDC, we can trim down the dismal failure log line. I also reformatted to lead with the failing step class based off of Stairway retro feedback that all of the useful information on a log line is at the end. Do we like this better? Unfortunately we don't get step context in MDC for free at this point -- we're outside of step execution. |
||
flightContext.getStepClassName(), | ||
flightContext.getStepIndex()); | ||
|
||
} catch (InterruptedException ex) { | ||
// Interrupted exception - we assume this means that the thread pool is shutting down and | ||
|
@@ -242,6 +240,7 @@ private StepResult stepWithRetry() throws InterruptedException, StairwayExecutio | |
// Retry loop | ||
do { | ||
try { | ||
MdcUtils.addStepContextToMdc(flightContext); | ||
// Do or undo based on direction we are headed | ||
hookWrapper.startStep(flightContext); | ||
|
||
|
@@ -271,6 +270,7 @@ private StepResult stepWithRetry() throws InterruptedException, StairwayExecutio | |
result = new StepResult(stepStatus, ex); | ||
} finally { | ||
hookWrapper.endStep(flightContext); | ||
MdcUtils.removeStepContextFromMdc(flightContext); | ||
} | ||
|
||
switch (result.getStepStatus()) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
package bio.terra.stairway.impl; | ||
|
||
import bio.terra.stairway.FlightContext; | ||
import java.util.Map; | ||
import org.slf4j.MDC; | ||
|
||
/** | ||
* Utility methods to make Stairway flight runnables context-aware, using mapped diagnostic context | ||
* (MDC). | ||
*/ | ||
class MdcUtils { | ||
|
||
/** ID of the flight */ | ||
static final String FLIGHT_ID_KEY = "flightId"; | ||
|
||
/** Class of the flight */ | ||
static final String FLIGHT_CLASS_KEY = "flightClass"; | ||
|
||
/** Class of the flight step */ | ||
static final String FLIGHT_STEP_CLASS_KEY = "flightStepClass"; | ||
|
||
/** Direction of the step (START, DO, SWITCH, or UNDO) */ | ||
static final String FLIGHT_STEP_DIRECTION_KEY = "flightStepDirection"; | ||
|
||
/** The step's execution order */ | ||
static final String FLIGHT_STEP_NUMBER_KEY = "flightStepNumber"; | ||
|
||
/** | ||
* Null-safe utility method for overwriting the current thread's MDC. | ||
* | ||
* @param context to set as MDC, if null then MDC will be cleared. | ||
*/ | ||
static void overwriteContext(Map<String, String> context) { | ||
if (context == null) { | ||
MDC.clear(); | ||
} else { | ||
MDC.setContextMap(context); | ||
okotsopoulos marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
private static Map<String, String> flightContextForMdc(FlightContext context) { | ||
return Map.of( | ||
FLIGHT_ID_KEY, context.getFlightId(), | ||
FLIGHT_CLASS_KEY, context.getFlightClassName()); | ||
} | ||
|
||
/** | ||
* Supplement the current thread's MDC with flight-specific context, meant to persist for the | ||
* duration of this flight's execution on the thread. | ||
*/ | ||
static void addFlightContextToMdc(FlightContext flightContext) { | ||
flightContextForMdc(flightContext).forEach(MDC::put); | ||
} | ||
|
||
private static Map<String, String> stepContextForMdc(FlightContext flightContext) { | ||
return Map.of( | ||
FLIGHT_STEP_CLASS_KEY, flightContext.getStepClassName(), | ||
FLIGHT_STEP_DIRECTION_KEY, flightContext.getDirection().toString(), | ||
FLIGHT_STEP_NUMBER_KEY, Integer.toString(flightContext.getStepIndex())); | ||
} | ||
|
||
/** | ||
* Supplement the current thread's MDC with step-specific context, meant to persist for the | ||
* duration of this step's current attempt on the thread. | ||
*/ | ||
static void addStepContextToMdc(FlightContext flightContext) { | ||
stepContextForMdc(flightContext).forEach(MDC::put); | ||
} | ||
|
||
/** Remove any step-specific context from the current thread's MDC. */ | ||
static void removeStepContextFromMdc(FlightContext flightContext) { | ||
stepContextForMdc(flightContext).keySet().forEach(MDC::remove); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,23 +1,27 @@ | ||
package bio.terra.stairway.impl; | ||
|
||
import java.util.concurrent.BlockingQueue; | ||
import bio.terra.stairway.FlightContext; | ||
import java.util.Map; | ||
import java.util.concurrent.Future; | ||
import java.util.concurrent.LinkedBlockingQueue; | ||
import java.util.concurrent.ThreadPoolExecutor; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import org.slf4j.MDC; | ||
|
||
class StairwayThreadPool extends ThreadPoolExecutor { | ||
private static final Logger logger = LoggerFactory.getLogger(StairwayThreadPool.class); | ||
AtomicInteger activeTasks; | ||
|
||
StairwayThreadPool( | ||
int corePoolSize, | ||
int maximumPoolSize, | ||
long keepAliveTime, | ||
TimeUnit unit, | ||
BlockingQueue<Runnable> workQueue) { | ||
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); | ||
StairwayThreadPool(int maxParallelFlights) { | ||
super( | ||
maxParallelFlights, | ||
maxParallelFlights, | ||
0L, | ||
TimeUnit.MILLISECONDS, | ||
new LinkedBlockingQueue<>()); | ||
activeTasks = new AtomicInteger(); | ||
} | ||
|
||
|
@@ -29,10 +33,45 @@ int getQueuedFlights() { | |
return getQueue().size(); | ||
} | ||
|
||
/** | ||
* Submits a Runnable task for execution and returns a Future representing that task. The Future's | ||
* get method will return null upon successful completion. | ||
* | ||
* @param flightRunner the flight to submit | ||
* @param flightContext | ||
* @return | ||
*/ | ||
protected Future<?> submitWithMdcAndFlightContext( | ||
Runnable flightRunner, FlightContext flightContext) { | ||
// Save the calling thread's context before potentially submitting to a child thread | ||
Map<String, String> callingThreadContext = MDC.getCopyOfContextMap(); | ||
Runnable flightRunnerWithMdc = | ||
() -> { | ||
Map<String, String> initialContext = MDC.getCopyOfContextMap(); | ||
initializeFlightMdc(callingThreadContext, flightContext); | ||
try { | ||
flightRunner.run(); | ||
} finally { | ||
MdcUtils.overwriteContext(initialContext); | ||
} | ||
}; | ||
return super.submit(flightRunnerWithMdc); | ||
} | ||
|
||
private void initializeFlightMdc( | ||
Map<String, String> callingThreadContext, FlightContext flightContext) { | ||
// Any leftover context on the thread will be fully overwritten: | ||
MdcUtils.overwriteContext(callingThreadContext); | ||
// If the calling thread's context contains flight and step context from a parent flight, this | ||
// will be overwritten below: | ||
MdcUtils.addFlightContextToMdc(flightContext); | ||
MdcUtils.removeStepContextFromMdc(flightContext); | ||
} | ||
|
||
protected void beforeExecute(Thread t, Runnable r) { | ||
super.beforeExecute(t, r); | ||
int active = activeTasks.incrementAndGet(); | ||
logger.debug("before: " + active); | ||
super.beforeExecute(t, r); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reading the docstring on this method, it's recommended to call the super at the end to allow for potential nesting. (I also read the docstring on |
||
} | ||
|
||
protected void afterExecute(Runnable r, Throwable t) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does this do?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This generates a Gradle build scan as part of test output, which makes debugging tests a lot easier. Example from the most recent unit test run: