Skip to content

Commit

Permalink
Merge pull request #97 from bulwark-security/fix-stdio-capture-sequen…
Browse files Browse the repository at this point in the history
…cing

Fix feedback handler's stdio not being captured
  • Loading branch information
sporkmonger authored Aug 1, 2023
2 parents 6c40184 + 4105362 commit d78bbbc
Showing 1 changed file with 49 additions and 137 deletions.
186 changes: 49 additions & 137 deletions crates/ext-processor/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,30 @@ struct RouteTarget {
timeout: Option<u64>,
}

/// Helper function that joins everything in a joinset, ignoring success and raising warnings as needed
async fn join_all(mut join_set: JoinSet<Result<(), tokio::time::error::Elapsed>>) {
// efficiently hand execution off to the the tasks we're joining
tokio::task::yield_now().await;

while let Some(r) = join_set.join_next().await {
match r {
Ok(Ok(_)) => {}
Ok(Err(e)) => {
warn!(
message = "timeout on plugin execution",
elapsed = ?e,
);
}
Err(e) => {
warn!(
message = "join error on plugin execution",
error_message = ?e,
);
}
}
}
}

/// The `BulwarkProcessor` implements the primary envoy processing service logic via the [`ExternalProcessor`] trait.
///
/// The [`process`](BulwarkProcessor::process) function is the main request handler.
Expand Down Expand Up @@ -419,38 +443,17 @@ impl BulwarkProcessor {
timeout_duration: std::time::Duration,
) {
let mut init_phase_tasks = JoinSet::new();
for plugin_instance in plugin_instances {
for plugin_instance in plugin_instances.iter().cloned() {
let init_phase_child_span = tracing::info_span!("execute on_init",);
init_phase_tasks.spawn(
timeout(timeout_duration, async move {
// TODO: avoid unwraps
Self::execute_on_init(plugin_instance.clone())
.await
.unwrap();
Self::execute_on_init(plugin_instance).await.unwrap();
})
.instrument(init_phase_child_span.or_current()),
);
}
// efficiently hand execution off to the plugins
tokio::task::yield_now().await;

while let Some(r) = init_phase_tasks.join_next().await {
match r {
Ok(Ok(_)) => {}
Ok(Err(e)) => {
warn!(
message = "timeout on plugin execution",
elapsed = ?e,
);
}
Err(e) => {
warn!(
message = "join error on plugin execution",
error_message = ?e,
);
}
}
}
join_all(init_phase_tasks).await;
}

async fn execute_request_phase(
Expand All @@ -466,38 +469,17 @@ impl BulwarkProcessor {
timeout_duration: std::time::Duration,
) {
let mut enrichment_phase_tasks = JoinSet::new();
for plugin_instance in plugin_instances {
for plugin_instance in plugin_instances.iter().cloned() {
let enrichment_phase_child_span = tracing::info_span!("execute on_request",);
enrichment_phase_tasks.spawn(
timeout(timeout_duration, async move {
// TODO: avoid unwraps
Self::execute_on_request(plugin_instance.clone())
.await
.unwrap();
Self::execute_on_request(plugin_instance).await.unwrap();
})
.instrument(enrichment_phase_child_span.or_current()),
);
}
// efficiently hand execution off to the plugins
tokio::task::yield_now().await;

while let Some(r) = enrichment_phase_tasks.join_next().await {
match r {
Ok(Ok(_)) => {}
Ok(Err(e)) => {
warn!(
message = "timeout on plugin execution",
elapsed = ?e,
);
}
Err(e) => {
warn!(
message = "join error on plugin execution",
error_message = ?e,
);
}
}
}
join_all(enrichment_phase_tasks).await;
}

async fn execute_request_decision_phase(
Expand Down Expand Up @@ -544,26 +526,8 @@ impl BulwarkProcessor {
.instrument(decision_phase_child_span.or_current()),
);
}
// efficiently hand execution off to the plugins
tokio::task::yield_now().await;

while let Some(r) = decision_phase_tasks.join_next().await {
match r {
Ok(Ok(_)) => {}
Ok(Err(e)) => {
warn!(
message = "timeout on plugin execution",
elapsed = ?e,
);
}
Err(e) => {
warn!(
message = "join error on plugin execution",
error_message = ?e,
);
}
}
}
join_all(decision_phase_tasks).await;

let decision_vec: Vec<Decision>;
let tags: HashSet<String>;
{
Expand Down Expand Up @@ -634,26 +598,8 @@ impl BulwarkProcessor {
.instrument(decision_phase_child_span.or_current()),
);
}
// efficiently hand execution off to the plugins
tokio::task::yield_now().await;

while let Some(r) = decision_phase_tasks.join_next().await {
match r {
Ok(Ok(_)) => {}
Ok(Err(e)) => {
warn!(
message = "timeout on plugin execution",
elapsed = ?e,
);
}
Err(e) => {
warn!(
message = "join error on plugin execution",
error_message = ?e,
);
}
}
}
join_all(decision_phase_tasks).await;

let decision_vec: Vec<Decision>;
let tags: HashSet<String>;
{
Expand Down Expand Up @@ -723,26 +669,8 @@ impl BulwarkProcessor {
.instrument(response_phase_child_span.or_current()),
);
}
// efficiently hand execution off to the plugins
tokio::task::yield_now().await;

while let Some(r) = response_phase_tasks.join_next().await {
match r {
Ok(Ok(_)) => {}
Ok(Err(e)) => {
warn!(
message = "timeout on plugin execution",
elapsed = ?e,
);
}
Err(e) => {
warn!(
message = "join error on plugin execution",
error_message = ?e,
);
}
}
}
join_all(response_phase_tasks).await;

let decision_vec: Vec<Decision>;
let tags: HashSet<String>;
{
Expand Down Expand Up @@ -813,26 +741,8 @@ impl BulwarkProcessor {
.instrument(response_phase_child_span.or_current()),
);
}
// efficiently hand execution off to the plugins
tokio::task::yield_now().await;

while let Some(r) = response_phase_tasks.join_next().await {
match r {
Ok(Ok(_)) => {}
Ok(Err(e)) => {
warn!(
message = "timeout on plugin execution",
elapsed = ?e,
);
}
Err(e) => {
warn!(
message = "join error on plugin execution",
error_message = ?e,
);
}
}
}
join_all(response_phase_tasks).await;

let decision_vec: Vec<Decision>;
let tags: HashSet<String>;
{
Expand Down Expand Up @@ -1063,7 +973,7 @@ impl BulwarkProcessor {
Self::handle_decision_feedback(
decision_components,
outcome,
plugin_instances,
plugin_instances.clone(),
timeout_duration,
).await;
// Short-circuit if restricted, we can skip the response phase
Expand Down Expand Up @@ -1178,7 +1088,7 @@ impl BulwarkProcessor {
Self::handle_decision_feedback(
decision_components,
outcome,
plugin_instances,
plugin_instances.clone(),
timeout_duration,
).await;
// Short-circuit if restricted, we can skip the response phase
Expand Down Expand Up @@ -1324,8 +1234,6 @@ impl BulwarkProcessor {
timeout_duration,
)
.await;

Self::handle_stdio(plugin_instances).await;
}
}

Expand Down Expand Up @@ -1404,8 +1312,6 @@ impl BulwarkProcessor {
timeout_duration,
)
.await;

Self::handle_stdio(plugin_instances).await;
}

async fn handle_decision_feedback(
Expand All @@ -1423,7 +1329,8 @@ impl BulwarkProcessor {
decision_components.decision.pignistic().restrict
);

for plugin_instance in plugin_instances {
let mut feedback_phase_tasks = JoinSet::new();
for plugin_instance in plugin_instances.iter().cloned() {
let response_phase_child_span = tracing::info_span!("execute on_decision_feedback",);
{
// Make sure the plugin instance knows about the final combined decision
Expand All @@ -1435,19 +1342,24 @@ impl BulwarkProcessor {
"ref" => plugin_instance.plugin_reference(),
);
}
tokio::spawn(
feedback_phase_tasks.spawn(
timeout(timeout_duration, async move {
Self::execute_on_decision_feedback(plugin_instance.clone())
Self::execute_on_decision_feedback(plugin_instance)
.await
.ok();
})
.instrument(response_phase_child_span.or_current()),
);
}
join_all(feedback_phase_tasks).await;

// Capturing stdio is always the last thing that happens and feedback should always be the second-to-last.
Self::handle_stdio(plugin_instances).await;
}

#[instrument(name = "plugin output", skip(plugin_instances))]
async fn handle_stdio(plugin_instances: Vec<Arc<Mutex<PluginInstance>>>) {
// TODO: refactor to process one plugin at a time and try to avoid having handle_decision_feedback join_all
for plugin_instance in plugin_instances {
let plugin_instance = plugin_instance.lock().await;
let (_, stdout, stderr) = plugin_instance.stdio().into_inner();
Expand Down

0 comments on commit d78bbbc

Please sign in to comment.