-
Notifications
You must be signed in to change notification settings - Fork 0
/
multithread.rs
85 lines (73 loc) · 2.83 KB
/
multithread.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use crossbeam_channel::unbounded;
use dom_finder::{Config, Finder};
/// YAML description of the extraction rules handed to `Config::from_yaml`.
///
/// Layout: a `root` node matched by `html` with a single child, `results`,
/// which matches many `div.serp__results div.result` elements. Each result
/// yields three fields: `url` (the `href` attribute), `title` (text) and
/// `snippet` (inner html, post-processed by a pipeline).
///
/// YAML is indentation-sensitive: every child list must be indented under its
/// parent's `children:` key, otherwise the keys collapse into the root mapping
/// as duplicates.
///
/// NOTE(review): `pipeline` is attached to `snippet` because it directly
/// follows that node's `extract: html` entry — confirm against the upstream
/// example.
const CFG_YAML: &str = r"
name: root
base_path: html
children:
  - name: results
    base_path: div.serp__results div.result
    many: true
    children:
      - name: url
        base_path: h2.result__title > a[href]
        extract: href
      - name: title
        base_path: h2.result__title
        extract: text
      - name: snippet
        base_path: a.result__snippet
        extract: html
        pipeline: [ [ policy_highlight ], [ trim_space ] ]
";
fn main() -> Result<(), Box<dyn std::error::Error>> {
// There is a benefit to reuse the same `Finder` instance, because it keeps all matchers compiled and ready to use.
// Also pipeline functions ready to use.
// And of course the benefit is even bigger, when we use `Finder` in multithreaded environment.
// Setting favorite concurrency number;
let concurrency: usize = 2;
// Creating a `Config` instance from yaml string.
let cfg: Config = Config::from_yaml(CFG_YAML)?;
// Setting up the finder inside `Arc` to be able to clone it later.
let finder = Arc::new(Finder::new(&cfg)?);
// Unnecessary: At this point we do not need the config anymore, so we can safely drop it.
drop(cfg);
// or just in one line:
// let finder: Arc<Finder> = Arc::new(Config::from_yaml(CFG_YAML)?.try_into()?);
// Creating a channel to send html pages -- just for testing purposes
let (tx, rx) = unbounded::<&str>();
// Sending dummy pages to the channel. it can be any amount of pages, but they must be the same type of markup.
// For instance, presented config can handle only duckduckgo search results pages and nothing more.
for _ in 0..1000 {
let html_page = include_str!("../test_data/page_0.html");
tx.send(html_page)?;
}
// dropping sender -- we don't need it anymore
drop(tx);
let workers: Vec<usize> = (1..concurrency + 1).collect();
let mut handles: Vec<JoinHandle<()>> = vec![];
for i in workers {
let rx = rx.clone();
let finder = finder.clone();
let handle = thread::spawn(move || {
let worker_id = i;
let mut total = 0;
while let Ok(html_page) = rx.recv() {
// Using `Finder` instance to parse, without cloning it
let _ = finder.parse(html_page);
// result is omitted here, but in the normal case it may be passed to another channel,
// or it may be collected in some storage (database, etc.).
total += 1;
}
println!("worker: {worker_id} processed {total} pages");
drop(rx)
});
handles.push(handle)
}
for handle in handles {
handle.join().unwrap();
}
drop(finder);
Ok(())
}