Eliminate span.enter and refactor outbox concurrency (#837)
* Eliminate span.enter and refactor outbox concurrency
* changelog + minor deps
sergiimk authored Sep 20, 2024
1 parent ddc4128 commit 3d023af
Showing 11 changed files with 237 additions and 162 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -17,8 +17,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html)
 - Axum 0.7
 - latest AWS SDK
 - latest versions of all remaining libs we depend on
+- Outbox refactoring towards true parallelism via Tokio spawned tasks instead of joined futures
 ### Fixed
 - Failed flows should still propagate `finishedAt` time
+- Eliminate `span.enter()`, replace with `.instrument()` everywhere
 
 ## [0.201.0] - 2024-09-18
 ### Added
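Aside on the parallelism claim in that entry: `futures::future::join_all` polls its futures concurrently, but all on the single task that awaits it, so work never runs in parallel across threads; handing each unit of work to the Tokio scheduler does. A minimal sketch of the distinction, with a hypothetical `consume` function standing in for outbox message dispatch (illustrative only, not code from this repository):

    use tokio::task::JoinSet;

    // Hypothetical per-producer work standing in for outbox dispatch.
    async fn consume(producer: String) {
        println!("consuming messages of {producer}");
    }

    #[tokio::main]
    async fn main() {
        let producers = vec!["flow-executor".to_string(), "task-executor".to_string()];

        // Before: futures::future::join_all polls all futures concurrently,
        // but on the single task that awaits it. After: every spawned task
        // can run in parallel on the runtime's worker threads.
        let mut join_set = JoinSet::new();
        for producer in producers {
            join_set.spawn(consume(producer));
        }

        // Drain the set, surfacing panics from individual tasks.
        while let Some(result) = join_set.join_next().await {
            if let Err(e) = result {
                eprintln!("consumption task failed: {e}");
            }
        }
    }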
29 changes: 15 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions src/domain/flow-system/services/Cargo.toml
@@ -30,6 +30,7 @@ kamu-accounts = { workspace = true }
 kamu-core = { workspace = true }
 kamu-flow-system = { workspace = true }
 kamu-task-system = { workspace = true }
+observability = { workspace = true, default-features = false }
 opendatafabric = { workspace = true }
 time-source = { workspace = true }

144 changes: 79 additions & 65 deletions src/domain/flow-system/services/src/flow/flow_executor_impl.rs
@@ -29,6 +29,7 @@ use messaging_outbox::{
     OutboxExt,
 };
 use time_source::SystemTimeSource;
+use tracing::Instrument as _;
 
 use crate::{
     FlowAbortHelper,
@@ -224,79 +225,93 @@ impl FlowExecutorImpl {
     }
 
     #[transactional_method]
-    async fn run_flows_current_timeslot(&self) -> Result<(), InternalError> {
-        // Do we have a timeslot scheduled?
+    async fn tick_current_timeslot(&self) -> Result<(), InternalError> {
         let flow_event_store = transaction_catalog.get_one::<dyn FlowEventStore>().unwrap();
-        let maybe_nearest_flow_activation_moment =
-            flow_event_store.nearest_flow_activation_moment().await?;
+
+        // Do we have a timeslot scheduled?
+        let Some(nearest_flow_activation_moment) =
+            flow_event_store.nearest_flow_activation_moment().await?
+        else {
+            return Ok(());
+        };
 
         // Is it time to execute it yet?
         let current_time = self.time_source.now();
-        if let Some(nearest_flow_activation_moment) = maybe_nearest_flow_activation_moment
-            && nearest_flow_activation_moment <= current_time
-        {
-            let activation_span = tracing::info_span!("FlowExecutor::activation");
-            let _ = activation_span.enter();
-
-            let planned_flow_ids: Vec<_> = flow_event_store
-                .get_flows_scheduled_for_activation_at(nearest_flow_activation_moment)
-                .await?;
+        if nearest_flow_activation_moment > current_time {
+            return Ok(());
+        }
 
-            let mut planned_task_futures = Vec::new();
-            for planned_flow_id in planned_flow_ids {
-                let transaction_catalog = transaction_catalog.clone();
-                let flow_event_store = flow_event_store.clone();
+        self.run_flows_for_timeslot(
+            nearest_flow_activation_moment,
+            flow_event_store,
+            transaction_catalog,
+        )
+        .instrument(observability::tracing::root_span!(
+            "FlowExecutor::activation"
+        ))
+        .await
+    }
 
-                planned_task_futures.push(async move {
-                    let mut flow = Flow::load(planned_flow_id, flow_event_store.as_ref())
-                        .await
-                        .int_err()?;
+    async fn run_flows_for_timeslot(
+        &self,
+        activation_moment: DateTime<Utc>,
+        flow_event_store: Arc<dyn FlowEventStore>,
+        transaction_catalog: dill::Catalog,
+    ) -> Result<(), InternalError> {
+        let planned_flow_ids: Vec<_> = flow_event_store
+            .get_flows_scheduled_for_activation_at(activation_moment)
+            .await?;
 
-                    if flow.can_schedule() {
-                        self.schedule_flow_task(
-                            transaction_catalog,
-                            &mut flow,
-                            nearest_flow_activation_moment,
-                        )
-                        .await?;
-                    } else {
-                        tracing::warn!(
-                            flow_id = %planned_flow_id,
-                            flow_status = %flow.status(),
-                            "Skipped flow scheduling as no longer relevant"
-                        );
-                    }
+        let mut planned_task_futures = Vec::new();
+        for planned_flow_id in planned_flow_ids {
+            let transaction_catalog = transaction_catalog.clone();
+            let flow_event_store = flow_event_store.clone();
 
-                    Ok(())
-                });
-            }
+            planned_task_futures.push(async move {
+                let mut flow = Flow::load(planned_flow_id, flow_event_store.as_ref())
+                    .await
+                    .int_err()?;
 
-            let results = futures::future::join_all(planned_task_futures).await;
-            results
-                .into_iter()
-                .filter(Result::is_err)
-                .map(|e| e.err().unwrap())
-                .for_each(|e: InternalError| {
-                    tracing::error!(
-                        error = ?e,
-                        error_msg = %e,
-                        "Scheduling flow failed"
-                    );
-                });
+                if flow.can_schedule() {
+                    self.schedule_flow_task(transaction_catalog, &mut flow, activation_moment)
+                        .await?;
+                } else {
+                    tracing::warn!(
+                        flow_id = %planned_flow_id,
+                        flow_status = %flow.status(),
+                        "Skipped flow scheduling as no longer relevant"
+                    );
+                }
 
-            // Publish progress event
-            let outbox = transaction_catalog.get_one::<dyn Outbox>().unwrap();
-            outbox
-                .post_message(
-                    MESSAGE_PRODUCER_KAMU_FLOW_EXECUTOR,
-                    FlowExecutorUpdatedMessage {
-                        update_time: nearest_flow_activation_moment,
-                        update_details: FlowExecutorUpdateDetails::ExecutedTimeslot,
-                    },
-                )
-                .await?;
-        }
+                Ok(())
+            });
+        }
+
+        let results = futures::future::join_all(planned_task_futures).await;
+        results
+            .into_iter()
+            .filter(Result::is_err)
+            .map(|e| e.err().unwrap())
+            .for_each(|e: InternalError| {
+                tracing::error!(
+                    error = ?e,
+                    error_msg = %e,
+                    "Scheduling flow failed"
+                );
+            });
+
+        // Publish progress event
+        let outbox = transaction_catalog.get_one::<dyn Outbox>().unwrap();
+        outbox
+            .post_message(
+                MESSAGE_PRODUCER_KAMU_FLOW_EXECUTOR,
+                FlowExecutorUpdatedMessage {
+                    update_time: activation_moment,
+                    update_details: FlowExecutorUpdateDetails::ExecutedTimeslot,
+                },
+            )
+            .await?;
 
         Ok(())
     }

@@ -433,11 +448,10 @@ impl FlowExecutor for FlowExecutorImpl {
     async fn run(&self) -> Result<(), InternalError> {
        // Main scanning loop
         loop {
-            let tick_span = tracing::trace_span!("FlowExecutor::tick");
-            let _ = tick_span.enter();
-
             // Run scheduling for current time slot
-            self.run_flows_current_timeslot().await?;
+            self.tick_current_timeslot()
+                .instrument(tracing::debug_span!("FlowExecutor::tick"))
+                .await?;
 
             self.time_source
                 .sleep(self.executor_config.awaiting_step)
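For readers unfamiliar with the pitfall this commit removes: `let _ = span.enter()` binds the entry guard to `_`, which drops it immediately, so the span is exited before any of the following code runs; and even a properly held guard is thread-local and does not follow an async task across `.await` points, where the task may migrate between executor threads. The `tracing::Instrument` extension attaches the span to the future itself. A minimal illustrative sketch (not code from this repository):

    use tracing::Instrument as _;

    async fn do_work() {
        tracing::info!("is this event inside the span?");
    }

    async fn broken() {
        let span = tracing::info_span!("FlowExecutor::tick");
        // Bug 1: the guard returned by enter() is bound to `_`, which drops
        // it immediately, so the span is exited before anything happens.
        let _ = span.enter();
        // Bug 2: even a held guard is thread-local and would not follow this
        // future across the await point below.
        do_work().await;
    }

    async fn fixed() {
        // instrument() ties the span to the future, entering it on every
        // poll and exiting it at every suspension point.
        do_work()
            .instrument(tracing::info_span!("FlowExecutor::tick"))
            .await;
    }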
24 changes: 15 additions & 9 deletions src/domain/task-system/services/src/task_executor_impl.rs
@@ -15,6 +15,7 @@ use dill::*;
 use kamu_task_system::*;
 use messaging_outbox::{Outbox, OutboxExt};
 use time_source::SystemTimeSource;
+use tracing::Instrument as _;
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
@@ -44,8 +45,17 @@ impl TaskExecutorImpl {
 
     async fn run_task_iteration(&self) -> Result<(), InternalError> {
         let task = self.take_task().await?;
-        let task_outcome = self.run_task(&task).await?;
+
+        let task_outcome = self
+            .run_task(&task)
+            .instrument(observability::tracing::root_span!(
+                "TaskExecutor::run_task",
+                task_id = %task.task_id,
+            ))
+            .await?;
+
         self.process_task_outcome(task, task_outcome).await?;
+
         Ok(())
     }

@@ -106,11 +116,7 @@ impl TaskExecutorImpl {
             return Ok(None);
         };
 
-        tracing::info!(
-            task_id = %task.task_id,
-            logical_plan = ?task.logical_plan,
-            "Executing task",
-        );
+        tracing::debug!(task_id = %task.task_id, "Received next task from scheduler");
 
         outbox
             .post_message(
@@ -127,11 +133,11 @@
     }
 
     async fn run_task(&self, task: &Task) -> Result<TaskOutcome, InternalError> {
-        let span = observability::tracing::root_span!(
-            "run_task",
+        tracing::debug!(
             task_id = %task.task_id,
+            logical_plan = ?task.logical_plan,
+            "Running task",
         );
-        let _ = span.enter();
 
         // Run task via logical plan
         let task_run_result = self
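`observability::tracing::root_span!` is kamu's own helper macro and its expansion is not shown in this diff; the sketch below approximates the same effect with plain `tracing`, and illustrates the field-capture sigils used throughout these hunks (`%` records a field via Display, `?` via Debug). `TaskId` is a hypothetical stand-in type:

    use tracing::Instrument as _;

    // Hypothetical stand-in for the real task identifier type.
    #[derive(Debug)]
    struct TaskId(u64);

    impl std::fmt::Display for TaskId {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "{}", self.0)
        }
    }

    async fn run_task(task_id: &TaskId) {
        // `%` captures the field with Display, `?` would capture with Debug.
        tracing::debug!(task_id = %task_id, "Running task");
    }

    async fn run_iteration() {
        let task_id = TaskId(42);
        // The span carries task_id on every event emitted inside run_task.
        run_task(&task_id)
            .instrument(tracing::info_span!("TaskExecutor::run_task", task_id = %task_id))
            .await;
    }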
5 changes: 3 additions & 2 deletions src/infra/core/src/dependency_graph_repository_inmem.rs
@@ -11,6 +11,7 @@ use std::sync::Arc;
 
 use internal_error::ResultIntoInternal;
 use kamu_core::*;
+use tracing::Instrument as _;
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
@@ -35,13 +36,13 @@ impl DependencyGraphRepository for DependencyGraphRepositoryInMemory {
         let mut datasets_stream = self.dataset_repo.get_all_datasets();
 
         while let Some(Ok(dataset_handle)) = datasets_stream.next().await {
-            let dataset_span = tracing::debug_span!("Scanning dataset dependencies", dataset = %dataset_handle);
-            let _ = dataset_span.enter();
+            let span = tracing::debug_span!("Scanning dataset dependencies", dataset = %dataset_handle);
 
             let summary = self
                 .dataset_repo
                 .get_dataset_by_handle(&dataset_handle)
                 .get_summary(GetSummaryOpts::default())
+                .instrument(span)
                 .await
                 .int_err()?;
 
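The same fix applied per loop iteration: each dataset gets a fresh span that is attached to the future chain with `.instrument(span)` rather than entered on the current thread. A simplified sketch with hypothetical types:

    use tracing::Instrument as _;

    async fn fetch_summary(handle: &str) {
        tracing::debug!(%handle, "fetching summary");
    }

    async fn scan(handles: Vec<String>) {
        for handle in handles {
            // One fresh span per dataset, wrapping every poll of the
            // instrumented future instead of a thread-local enter().
            let span = tracing::debug_span!("Scanning dataset dependencies", dataset = %handle);
            fetch_summary(&handle).instrument(span).await;
        }
    }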
@@ -229,7 +229,7 @@ impl OutboxConsumptionIterationPlanner {
     async fn load_unprocessed_messages_by_producer(
         &self,
         unconsumed_state_by_producer: &HashMap<String, UnconsumedProducerState>,
-    ) -> Result<HashMap<String, Vec<OutboxMessage>>, InternalError> {
+    ) -> Result<HashMap<String, Vec<Arc<OutboxMessage>>>, InternalError> {
         // Prepare filter to load messages with boundary by each producer
         let boundaries_by_producer: Vec<_> = unconsumed_state_by_producer
             .iter()
@@ -246,6 +246,7 @@
         let mut unprocessed_messages = self
             .outbox_message_repository
             .get_messages(boundaries_by_producer, self.messages_batch_size)
+            .map_ok(Arc::new)
             .try_collect::<Vec<_>>()
             .await?;
 
@@ -266,7 +267,7 @@
 
     fn compose_producer_consumption_tasks(
         &self,
-        unprocessed_messages_by_producer: HashMap<String, Vec<OutboxMessage>>,
+        unprocessed_messages_by_producer: HashMap<String, Vec<Arc<OutboxMessage>>>,
         mut unconsumed_state_by_producers: HashMap<String, UnconsumedProducerState>,
     ) -> HashMap<String, ProducerConsumptionTask> {
         let mut consumption_tasks_by_producers = HashMap::new();
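Why these signatures now carry `Arc<OutboxMessage>`: once consumption work runs as spawned tasks, each task needs `'static` ownership of its batch, and several consumers of one producer may process the same messages. Wrapping each message in `Arc` makes that sharing a pointer clone instead of a deep copy of every message body. A self-contained sketch with simplified types (not the crate's real ones):

    use std::sync::Arc;

    // Simplified stand-in for the real OutboxMessage.
    #[derive(Debug)]
    struct OutboxMessage {
        id: u64,
        payload: String,
    }

    #[tokio::main]
    async fn main() {
        let messages: Vec<Arc<OutboxMessage>> = (0..3)
            .map(|id| Arc::new(OutboxMessage { id, payload: format!("msg-{id}") }))
            .collect();

        let mut handles = Vec::new();
        for consumer in ["consumer-a", "consumer-b"] {
            // Cloning the Vec bumps Arc refcounts; message bodies are shared.
            let batch = messages.clone();
            handles.push(tokio::spawn(async move {
                for msg in &batch {
                    println!("{consumer} handling message {}: {}", msg.id, msg.payload);
                }
            }));
        }
        for handle in handles {
            handle.await.unwrap();
        }
    }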