Skip to content

Commit

Permalink
refactor(discover_slots): drop par day discovery
Browse files Browse the repository at this point in the history
We had a complex parallel day-discovery scheme in an attempt to improve the
concurrency of our program. It turns out that fetching adjacent slots was not
the bottleneck, so the parallel discovery is dropped.
  • Loading branch information
alextes committed Nov 7, 2023
1 parent 4037243 commit 267c275
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 97 deletions.
104 changes: 10 additions & 94 deletions src/bin/bundle-submissions/discover_slots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@ use std::{
sync::Arc,
};

use futures::{
channel::mpsc::{channel, Receiver, Sender},
future::try_join_all,
SinkExt, StreamExt, TryStreamExt,
};
use futures::{channel::mpsc::Sender, SinkExt, TryStreamExt};
use mouseion::units::Slot;
use object_store::{aws::AmazonS3, path::Path, ObjectStore};
use tokio::{spawn, task::JoinHandle};
Expand All @@ -20,11 +16,11 @@ const SLOT_LIMIT: i32 = 7696479;
const SLOT_MEMORY: usize = 32;

async fn discover_slots_for_path(
path: &Path,
path_prefix: Option<&Path>,
object_store: &AmazonS3,
mut slots_tx: Sender<Slot>,
) -> anyhow::Result<()> {
let mut block_submission_meta_stream = object_store.list(Some(path)).await.unwrap();
let mut block_submission_meta_stream = object_store.list(path_prefix).await.unwrap();

let mut last_sent_slots: VecDeque<Slot> = VecDeque::with_capacity(SLOT_MEMORY);
let mut last_sent_slots_set: HashSet<Slot> = HashSet::with_capacity(SLOT_MEMORY);
Expand Down Expand Up @@ -63,99 +59,19 @@ async fn discover_slots_for_path(
Ok(())
}

/// Lists the common prefixes found directly beneath a starting path.
///
/// `from` is the prefix to list under; when it is `None` the listing starts at
/// `"/2023"`. The caller treats the returned prefixes as month paths.
///
/// # Errors
///
/// Returns an error when the delimited listing against the object store
/// fails. Previously such a failure was silently turned into an empty list.
async fn discover_month_paths(
    from: Option<String>,
    object_store: &AmazonS3,
) -> anyhow::Result<Vec<Path>> {
    let path = Path::from(from.as_deref().unwrap_or("/2023"));
    // Propagate listing failures with `?`. Iterating over the `Result` and
    // flattening it would discard the error and yield an empty `Vec`.
    let list_result = object_store.list_with_delimiter(Some(&path)).await?;
    Ok(list_result.common_prefixes)
}

/// Lists the common prefixes beneath every given month path and returns them
/// flattened into a single list.
///
/// One delimited listing is issued per entry in `months`; the listings are
/// awaited together via `try_join_all`.
///
/// # Errors
///
/// Returns an error if any of the listings fails.
async fn discover_day_paths(months: &[Path], object_store: &AmazonS3) -> anyhow::Result<Vec<Path>> {
    let listings = try_join_all(
        months
            .iter()
            .map(|month| object_store.list_with_delimiter(Some(month))),
    )
    .await?;

    // Gather the prefixes of every listing, keeping the order of `months`.
    let mut day_paths = Vec::new();
    for listing in listings {
        day_paths.extend(listing.common_prefixes);
    }
    Ok(day_paths)
}

const DAY_CONCURRENCY: usize = 8;
const DAY_SLOT_BUFFER: usize = 8;

pub fn run_discover_slots_thread(
from: Option<String>,
path_prefix: Option<Path>,
object_store: Arc<AmazonS3>,
mut slots_tx: Sender<Slot>,
slots_tx: Sender<Slot>,
) -> JoinHandle<()> {
let object_store = object_store.clone();
spawn(async move {
let month_paths = discover_month_paths(from.clone(), &object_store)
.await
.unwrap();
// These are the days we'd like to concurrently discover slots for.
let day_paths = discover_day_paths(&month_paths, &object_store)
.await
.unwrap();

// Vec of sender, receiver pair for each task.
let mut individual_channels: Vec<(Sender<Slot>, Receiver<Slot>)> = Vec::new();

// Create a channel for each day slot discovery task.
for _ in 0..DAY_CONCURRENCY {
let (send, recv) = channel(DAY_SLOT_BUFFER);
individual_channels.push((send, recv));
}

// Start discovering days in threads. For simplicity's sake, we create a task for each day,
// even if we only take from the first DAY_CONCURRENCY.
for (path, (task_tx, _)) in day_paths.into_iter().zip(individual_channels.iter()) {
let object_store = object_store.clone();
let task_tx = task_tx.clone();

spawn(async move {
match discover_slots_for_path(&path, &object_store, task_tx).await {
Ok(_) => {
info!(%path, "finished discovering slots for day");
}
Err(e) => {
panic!("failed to discover slots for day, {}", e);
}
}
});
}

// Take discovered slots round-robin.
let mut curr_ind = 0;

loop {
let rx = &mut individual_channels[curr_ind].1;
if let Some(slot) = rx.next().await {
slots_tx.send(slot).await.unwrap();
} else {
// The channel is closed, means the task is done
individual_channels.remove(curr_ind);
match discover_slots_for_path(path_prefix.as_ref(), object_store.as_ref(), slots_tx).await {
Ok(_) => {
info!(?path_prefix, "finished discovering slots for day");
}

if individual_channels.is_empty() {
// all tasks are done, break.
info!("all day slot discovery tasks are done");
break;
Err(e) => {
panic!("failed to discover slots for day, {}", e);
}

curr_ind = (curr_ind + 1) % DAY_CONCURRENCY % individual_channels.len();
}
})
}
5 changes: 2 additions & 3 deletions src/bin/bundle-submissions/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ mod store_bundles;

use std::sync::Arc;

use ::object_store as object_store_lib;
use futures::{channel::mpsc::channel, try_join};
use lazy_static::lazy_static;
use mouseion::{env::ENV_CONFIG, log, object_store};
Expand All @@ -27,7 +26,7 @@ use crate::{
discover_slots::run_discover_slots_thread, store_bundles::run_store_bundles_thread,
};

type ObjectPath = object_store_lib::path::Path;
type ObjectPath = ::object_store::path::Path;

lazy_static! {
static ref SOURCE_BUCKET: String = (*ENV_CONFIG.submissions_bucket).to_string();
Expand All @@ -51,7 +50,7 @@ pub async fn main() -> anyhow::Result<()> {

let (slots_to_delete_tx, slots_to_delete_rx) = channel(16);

let from = std::env::args().nth(1);
let from: Option<ObjectPath> = std::env::args().nth(1).map(|str| str.into());

try_join!(
run_discover_slots_thread(from, submissions_store.clone(), slots_tx),
Expand Down

0 comments on commit 267c275

Please sign in to comment.