Skip to content

Commit

Permalink
add drop helper for scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
jackzhhuang committed Nov 8, 2024
1 parent ba14089 commit 1d9bd4c
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 42 deletions.
17 changes: 15 additions & 2 deletions sync/src/parallel/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,18 @@ pub struct DagBlockExecutor {
worker_scheduler: Arc<WorkerScheduler>,
}

impl Drop for DagBlockExecutor {
struct ExecutorDeconstructor {
worker_scheduler: Arc<WorkerScheduler>,
}

impl ExecutorDeconstructor {
pub fn new(worker_scheduler: Arc<WorkerScheduler>) -> Self {
worker_scheduler.worker_start();
Self { worker_scheduler }
}
}

impl Drop for ExecutorDeconstructor {
fn drop(&mut self) {
self.worker_scheduler.worker_exits();
}
Expand All @@ -51,7 +62,6 @@ impl DagBlockExecutor {
worker_scheduler: Arc<WorkerScheduler>,
) -> anyhow::Result<(Sender<Option<Block>>, Self)> {
let (sender_for_main, receiver) = mpsc::channel::<Option<Block>>(buffer_size);
worker_scheduler.worker_start();
let executor = Self {
sender: sender_to_main,
receiver,
Expand Down Expand Up @@ -88,6 +98,7 @@ impl DagBlockExecutor {

pub fn start_to_execute(mut self) -> anyhow::Result<JoinHandle<()>> {
let handle = tokio::spawn(async move {
let _ = ExecutorDeconstructor::new(self.worker_scheduler.clone());
let mut chain = None;
loop {
if self.worker_scheduler.check_if_stop().await {
Expand All @@ -100,6 +111,7 @@ impl DagBlockExecutor {
Some(block) => block,
None => {
info!("sync worker channel closed");
drop(self.sender);
return;
}
};
Expand Down Expand Up @@ -239,6 +251,7 @@ impl DagBlockExecutor {
}
None => {
info!("sync worker channel closed");
drop(self.sender);
return;
}
}
Expand Down
80 changes: 40 additions & 40 deletions sync/tests/test_parallel_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,56 +4,56 @@ use starcoin_sync::parallel::worker_scheduler::WorkerScheduler;
use tokio::{task, time};

struct Worker {
worker_scheduler: Arc<WorkerScheduler>,
worker_scheduler: Arc<WorkerScheduler>,
}

impl Worker {
pub fn new(worker_scheduler: Arc<WorkerScheduler>) -> Self {
Worker {
worker_scheduler,
}
}

pub async fn start(&self) {
self.worker_scheduler.worker_start();
loop {
if self.worker_scheduler.check_if_stop().await {
break;
}
println!("Worker is working, worker_count = {:?}", self.worker_scheduler.check_worker_count().await);
time::sleep(Duration::from_secs(1)).await;
}
self.worker_scheduler.worker_exits();
}

pub fn new(worker_scheduler: Arc<WorkerScheduler>) -> Self {
Self { worker_scheduler }
}

pub async fn start(&self) {
self.worker_scheduler.worker_start();
loop {
if self.worker_scheduler.check_if_stop().await {
break;
}
println!(
"Worker is working, worker_count = {:?}",
self.worker_scheduler.check_worker_count().await
);
time::sleep(Duration::from_secs(1)).await;
}
self.worker_scheduler.worker_exits();
}
}

async fn work_cycle(worker_scheduler: Arc<WorkerScheduler>) {
worker_scheduler.tell_worker_to_start().await;
for _i in 0..10 {
let worker = Worker::new(worker_scheduler.clone());
task::spawn(async move {
worker.start().await;
});
}
worker_scheduler.tell_worker_to_start().await;
for _i in 0..10 {
let worker = Worker::new(worker_scheduler.clone());
task::spawn(async move {
worker.start().await;
});
}

println!("Start worker, now sleep for 5 seconds");
time::sleep(Duration::from_secs(5)).await;
println!("Start worker, now sleep for 5 seconds");
time::sleep(Duration::from_secs(5)).await;

println!("now stop worker");
worker_scheduler.tell_worker_to_stop().await;
println!("now stop worker");
worker_scheduler.tell_worker_to_stop().await;

worker_scheduler.wait_for_worker().await;
worker_scheduler.wait_for_worker().await;
}

#[stest::test(timeout = 120)]
async fn test_sync_parallel_scheduler() {
let worker_scheduler = Arc::new(WorkerScheduler::new());
for i in 0..10 {
println!("********************* work_cycle {} start", i);
worker_scheduler.tell_worker_to_stop().await;
worker_scheduler.wait_for_worker().await;
work_cycle(worker_scheduler.clone()).await;
println!("********************* work_cycle {} end", i);
}
}
let worker_scheduler = Arc::new(WorkerScheduler::new());
for i in 0..10 {
println!("********************* work_cycle {} start", i);
worker_scheduler.tell_worker_to_stop().await;
worker_scheduler.wait_for_worker().await;
work_cycle(worker_scheduler.clone()).await;
println!("********************* work_cycle {} end", i);
}
}

0 comments on commit 1d9bd4c

Please sign in to comment.