-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathexampleProducerConsumer3.cpp
59 lines (51 loc) · 1.7 KB
/
exampleProducerConsumer3.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
#include <thread>
#include <chrono>
#include <iostream>
using std::endl;
using std::cout;
#include "rtb/concurrency/Concurrency.h"
using namespace rtb::Concurrency;
// This example uses structs, rather than free functions, to implement producers and consumers
/// Callable object that emits a fixed count of integers onto a queue.
///
/// Only a reference to the queue is stored, so the queue has to stay alive for
/// as long as this object is in use.
struct Producer {
    /// @param n            number of messages `operator()` will push
    /// @param outputQueue  destination queue, held by reference
    Producer(int n, Queue<int> &outputQueue)
        : numberOfMessages_(n)
        , outputQueue_(outputQueue) {}

    /// Call operator, which is what `std::thread` invokes when it is handed this object.
    ///
    /// For every value in [0, numberOfMessages_) it waits one second, logs the
    /// value together with the current thread id, and pushes it to the queue.
    /// After the last push the queue is closed (presumably this is what lets
    /// readers detect the end of the stream; confirm against `Queue::close`).
    void operator()() {
        const std::chrono::milliseconds pause(1000);
        int next = 0;
        while (next < numberOfMessages_) {
            std::this_thread::sleep_for(pause);
            std::cout << "Producer (id#" << std::this_thread::get_id() << "): " << next << std::endl;
            outputQueue_.push(next);
            ++next;
        }
        outputQueue_.close();
    }

    int numberOfMessages_;     ///< how many values to emit
    Queue<int> &outputQueue_;  ///< queue written to (not owned)
};
/// Callable object that drains integers from a queue and prints them.
///
/// Only a reference to the queue is stored, so the queue has to stay alive for
/// as long as this object is in use.
struct Consumer {
    /// @param inputQueue  queue to read from, held by reference
    ///
    /// Marked `explicit` so that a `Queue<int>` is never silently converted
    /// into a `Consumer`.
    explicit Consumer(Queue<int> &inputQueue)
        : inputQueue_(inputQueue) {}

    /// Call operator, which is what `std::thread` invokes when it is handed this object.
    ///
    /// Subscribes to the queue, then keeps popping: each popped result that
    /// tests true is printed with the current thread id. The loop stops at the
    /// first result that tests false (presumably once the queue has been
    /// closed and emptied; confirm against `Queue::pop`). Finally the
    /// subscription is released.
    void operator()() {
        inputQueue_.subscribe();
        while (auto value{ inputQueue_.pop() }) {
            cout << "Consumer (id#" << std::this_thread::get_id() << "): " << value.value() << endl;
        }
        inputQueue_.unsubscribe();
    }

    Queue<int> &inputQueue_;  ///< queue read from (not owned)
};
int main() {
// Define the queue as a variable in the main scope
Queue<int> q;
Producer prod(10, q);
Consumer cons(q);
// Run `produce` function on a new thread providing a function argument
std::thread prodThr(std::ref(prod));
// Run `consume` function on a new thread
std::thread consThr(std::ref(cons));
// Wait for the threads to terminate before exiting the main function
prodThr.join();
consThr.join();
return 0;
}