Skip to content

Latest commit

 

History

History
31 lines (25 loc) · 1.02 KB

README.md

File metadata and controls

31 lines (25 loc) · 1.02 KB

sandflow

An attempt to extend the async Stream to do more things, e.g. map-reduce, iteration, etc.

Beyond that, the async stream can be executed with massive parallelism in a distributed cluster.

Example

/// Example entry point: builds a small stream of integers, runs it through a
/// `sandflow` pipeline, and prints the collected output.
fn main() {
    // Wrap every number in `Ok` so the source stream yields `Result` items.
    let numbers = vec![1, 2, 3, 4, 5, 6];
    let source = futures::stream::iter(numbers.into_iter()).map(|n| Ok(n));

    // Parallel tasks consume the items of the source stream in a round-robin manner.
    // NOTE(review): the leading `2` is presumably the number of parallel tasks — confirm.
    let pending = sandflow::spawn(2, source, || {
        move |input| {
            // Two increments: one synchronous, one through an async step.
            let incremented = input
                .map(|value| value + 1)
                .then(|value| async move { Ok(value + 1) });

            // Exchange data between tasks (e.g. for load balancing).
            let exchanged = incremented.exchange(|value| *value);

            // Log each item together with the worker that sees it, then double it.
            exchanged
                .inspect(|value| println!("worker[{}]: {};", worker_id(), value))
                .then(|value| async move { Ok(value * 2) })
        }
    })
    .collect::<Vec<_>>();

    // Drive the pipeline to completion on the current thread and print the result.
    let collected = futures::executor::block_on(pending);
    println!("{:?}", collected);
}