Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix feedback handler's stdio not being captured #97

Merged
merged 1 commit into from
Aug 1, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 49 additions & 137 deletions crates/ext-processor/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,30 @@ struct RouteTarget {
timeout: Option<u64>,
}

/// Helper function that joins everything in a joinset, ignoring success and raising warnings as needed
async fn join_all(mut join_set: JoinSet<Result<(), tokio::time::error::Elapsed>>) {
// efficiently hand execution off to the the tasks we're joining
tokio::task::yield_now().await;

while let Some(r) = join_set.join_next().await {
match r {
Ok(Ok(_)) => {}
Ok(Err(e)) => {
warn!(
message = "timeout on plugin execution",
elapsed = ?e,
);
}
Err(e) => {
warn!(
message = "join error on plugin execution",
error_message = ?e,
);
}
}
}
}

/// The `BulwarkProcessor` implements the primary envoy processing service logic via the [`ExternalProcessor`] trait.
///
/// The [`process`](BulwarkProcessor::process) function is the main request handler.
Expand Down Expand Up @@ -419,38 +443,17 @@ impl BulwarkProcessor {
timeout_duration: std::time::Duration,
) {
let mut init_phase_tasks = JoinSet::new();
for plugin_instance in plugin_instances {
for plugin_instance in plugin_instances.iter().cloned() {
let init_phase_child_span = tracing::info_span!("execute on_init",);
init_phase_tasks.spawn(
timeout(timeout_duration, async move {
// TODO: avoid unwraps
Self::execute_on_init(plugin_instance.clone())
.await
.unwrap();
Self::execute_on_init(plugin_instance).await.unwrap();
})
.instrument(init_phase_child_span.or_current()),
);
}
// efficiently hand execution off to the plugins
tokio::task::yield_now().await;

while let Some(r) = init_phase_tasks.join_next().await {
match r {
Ok(Ok(_)) => {}
Ok(Err(e)) => {
warn!(
message = "timeout on plugin execution",
elapsed = ?e,
);
}
Err(e) => {
warn!(
message = "join error on plugin execution",
error_message = ?e,
);
}
}
}
join_all(init_phase_tasks).await;
}

async fn execute_request_phase(
Expand All @@ -466,38 +469,17 @@ impl BulwarkProcessor {
timeout_duration: std::time::Duration,
) {
let mut enrichment_phase_tasks = JoinSet::new();
for plugin_instance in plugin_instances {
for plugin_instance in plugin_instances.iter().cloned() {
let enrichment_phase_child_span = tracing::info_span!("execute on_request",);
enrichment_phase_tasks.spawn(
timeout(timeout_duration, async move {
// TODO: avoid unwraps
Self::execute_on_request(plugin_instance.clone())
.await
.unwrap();
Self::execute_on_request(plugin_instance).await.unwrap();
})
.instrument(enrichment_phase_child_span.or_current()),
);
}
// efficiently hand execution off to the plugins
tokio::task::yield_now().await;

while let Some(r) = enrichment_phase_tasks.join_next().await {
match r {
Ok(Ok(_)) => {}
Ok(Err(e)) => {
warn!(
message = "timeout on plugin execution",
elapsed = ?e,
);
}
Err(e) => {
warn!(
message = "join error on plugin execution",
error_message = ?e,
);
}
}
}
join_all(enrichment_phase_tasks).await;
}

async fn execute_request_decision_phase(
Expand Down Expand Up @@ -544,26 +526,8 @@ impl BulwarkProcessor {
.instrument(decision_phase_child_span.or_current()),
);
}
// efficiently hand execution off to the plugins
tokio::task::yield_now().await;

while let Some(r) = decision_phase_tasks.join_next().await {
match r {
Ok(Ok(_)) => {}
Ok(Err(e)) => {
warn!(
message = "timeout on plugin execution",
elapsed = ?e,
);
}
Err(e) => {
warn!(
message = "join error on plugin execution",
error_message = ?e,
);
}
}
}
join_all(decision_phase_tasks).await;

let decision_vec: Vec<Decision>;
let tags: HashSet<String>;
{
Expand Down Expand Up @@ -634,26 +598,8 @@ impl BulwarkProcessor {
.instrument(decision_phase_child_span.or_current()),
);
}
// efficiently hand execution off to the plugins
tokio::task::yield_now().await;

while let Some(r) = decision_phase_tasks.join_next().await {
match r {
Ok(Ok(_)) => {}
Ok(Err(e)) => {
warn!(
message = "timeout on plugin execution",
elapsed = ?e,
);
}
Err(e) => {
warn!(
message = "join error on plugin execution",
error_message = ?e,
);
}
}
}
join_all(decision_phase_tasks).await;

let decision_vec: Vec<Decision>;
let tags: HashSet<String>;
{
Expand Down Expand Up @@ -723,26 +669,8 @@ impl BulwarkProcessor {
.instrument(response_phase_child_span.or_current()),
);
}
// efficiently hand execution off to the plugins
tokio::task::yield_now().await;

while let Some(r) = response_phase_tasks.join_next().await {
match r {
Ok(Ok(_)) => {}
Ok(Err(e)) => {
warn!(
message = "timeout on plugin execution",
elapsed = ?e,
);
}
Err(e) => {
warn!(
message = "join error on plugin execution",
error_message = ?e,
);
}
}
}
join_all(response_phase_tasks).await;

let decision_vec: Vec<Decision>;
let tags: HashSet<String>;
{
Expand Down Expand Up @@ -813,26 +741,8 @@ impl BulwarkProcessor {
.instrument(response_phase_child_span.or_current()),
);
}
// efficiently hand execution off to the plugins
tokio::task::yield_now().await;

while let Some(r) = response_phase_tasks.join_next().await {
match r {
Ok(Ok(_)) => {}
Ok(Err(e)) => {
warn!(
message = "timeout on plugin execution",
elapsed = ?e,
);
}
Err(e) => {
warn!(
message = "join error on plugin execution",
error_message = ?e,
);
}
}
}
join_all(response_phase_tasks).await;

let decision_vec: Vec<Decision>;
let tags: HashSet<String>;
{
Expand Down Expand Up @@ -1063,7 +973,7 @@ impl BulwarkProcessor {
Self::handle_decision_feedback(
decision_components,
outcome,
plugin_instances,
plugin_instances.clone(),
timeout_duration,
).await;
// Short-circuit if restricted, we can skip the response phase
Expand Down Expand Up @@ -1178,7 +1088,7 @@ impl BulwarkProcessor {
Self::handle_decision_feedback(
decision_components,
outcome,
plugin_instances,
plugin_instances.clone(),
timeout_duration,
).await;
// Short-circuit if restricted, we can skip the response phase
Expand Down Expand Up @@ -1324,8 +1234,6 @@ impl BulwarkProcessor {
timeout_duration,
)
.await;

Self::handle_stdio(plugin_instances).await;
}
}

Expand Down Expand Up @@ -1404,8 +1312,6 @@ impl BulwarkProcessor {
timeout_duration,
)
.await;

Self::handle_stdio(plugin_instances).await;
}

async fn handle_decision_feedback(
Expand All @@ -1423,7 +1329,8 @@ impl BulwarkProcessor {
decision_components.decision.pignistic().restrict
);

for plugin_instance in plugin_instances {
let mut feedback_phase_tasks = JoinSet::new();
for plugin_instance in plugin_instances.iter().cloned() {
let response_phase_child_span = tracing::info_span!("execute on_decision_feedback",);
{
// Make sure the plugin instance knows about the final combined decision
Expand All @@ -1435,19 +1342,24 @@ impl BulwarkProcessor {
"ref" => plugin_instance.plugin_reference(),
);
}
tokio::spawn(
feedback_phase_tasks.spawn(
timeout(timeout_duration, async move {
Self::execute_on_decision_feedback(plugin_instance.clone())
Self::execute_on_decision_feedback(plugin_instance)
.await
.ok();
})
.instrument(response_phase_child_span.or_current()),
);
}
join_all(feedback_phase_tasks).await;

// Capturing stdio is always the last thing that happens and feedback should always be the second-to-last.
Self::handle_stdio(plugin_instances).await;
}

#[instrument(name = "plugin output", skip(plugin_instances))]
async fn handle_stdio(plugin_instances: Vec<Arc<Mutex<PluginInstance>>>) {
// TODO: refactor to process one plugin at a time and try to avoid having handle_decision_feedback join_all
for plugin_instance in plugin_instances {
let plugin_instance = plugin_instance.lock().await;
let (_, stdout, stderr) = plugin_instance.stdio().into_inner();
Expand Down
Loading