-
Notifications
You must be signed in to change notification settings - Fork 1
/
13_3_fake_google_search_timeout.rs
92 lines (77 loc) · 2.53 KB
/
13_3_fake_google_search_timeout.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
//! Based on Go example
//! [slide 47: "Google Search 2.1"](https://talks.golang.org/2012/concurrency.slide#47)
//!
//! Don't wait for slow servers. No locks. No condition variables. No callbacks.
//!
//! In this version we hard-limit the search to 80ms and only return results
//! successfully collected in that time.
//!
use async_std::future;
use async_std::task;
use futures::channel::mpsc::channel;
use futures::future::{FusedFuture, FutureExt};
use futures::select;
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use std::time;
mod helpers;
/// A fake search backend, identified only by the kind of results it produces.
struct FakeSearch<'a> {
/// Label for this backend; it is embedded in every result string.
kind: &'a str,
}
impl<'a> FakeSearch<'a> {
const fn new(kind: &'a str) -> Self {
Self { kind }
}
async fn call(&self, query: &str) -> String {
task::sleep(helpers::rand_duration(0, 100)).await;
format!("{} result for {}", self.kind, query)
}
}
// The three fake search backends, one per result kind.
/// Fake backend producing "web" results.
static WEB: FakeSearch = FakeSearch::new("web");
/// Fake backend producing "image" results.
static IMAGE: FakeSearch = FakeSearch::new("image");
/// Fake backend producing "video" results.
static VIDEO: FakeSearch = FakeSearch::new("video");
/// Runs the web, image and video searches concurrently and returns whatever
/// results arrived within 80ms, in the order they arrived.
///
/// Each search runs in its own spawned task and reports back over a channel.
/// If the deadline passes before every search has answered, `timed out` is
/// printed and the results collected so far (possibly none) are returned.
async fn google(query: &str) -> Vec<String> {
    let searches: [&FakeSearch; 3] = [&WEB, &IMAGE, &VIDEO];
    let mut results = Vec::with_capacity(searches.len());
    let (sender, mut receiver) = channel(0);

    for &search in &searches {
        // Give the task its own copies so they can be moved into it.
        let query = query.to_owned();
        let mut sender = sender.clone();
        task::spawn(async move {
            // Perform the search
            let result = search.call(&query).await;
            // Send the result back over the channel. If the search finished
            // after the deadline, `google` has already returned and dropped
            // the receiver, so the send fails. That is the expected outcome
            // of a timeout, not a bug, so the error is deliberately ignored
            // instead of panicking inside the task.
            let _ = sender.send(result).await;
        });
    }

    // The searches have 80ms to report back or we just return whatever results
    // we have (which could be none).
    let mut timeout = timeout_after(80);
    for _ in 0..searches.len() {
        select! {
            received = receiver.next() => match received {
                Some(result) => results.push(result),
                // The channel only closes once every sender is gone; the
                // original `sender` is still alive here, so this is purely
                // defensive.
                None => break,
            },
            _ = timeout => {
                println!("timed out");
                break;
            },
        }
    }
    results
}
/// Builds a fused future that completes once `ms` milliseconds have elapsed.
///
/// It wraps a future that never resolves in a timeout, so the only way it can
/// finish is by the timeout expiring. The result is boxed and fused so that it
/// can be polled repeatedly from `select!`.
fn timeout_after(ms: u64) -> impl FusedFuture {
    future::timeout(time::Duration::from_millis(ms), future::pending::<()>())
        .boxed()
        .fuse()
}
/// Program entry point: drives `async_main` to completion on the current thread.
fn main() {
    let program = async_main();
    task::block_on(program);
}
/// Runs one search for "rust lang", then prints the collected results and how
/// long the search took.
async fn async_main() {
    let started_at = time::Instant::now();
    let hits = google("rust lang").await;
    // Measure before printing so output time is not counted.
    let took = started_at.elapsed();

    println!("Result: {:#?}", hits);
    println!("Elapsed: {}ms", helpers::to_millis(took));
}