Skip to content

Commit

Permalink
Improvements in the examples.
Browse files Browse the repository at this point in the history
  • Loading branch information
mzimbres committed Aug 6, 2022
1 parent 37ab1e7 commit c57f97b
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 39 deletions.
27 changes: 12 additions & 15 deletions examples/chat_room.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ using response_type = std::vector<aedis::resp3::node<std::string>>;
//
// To see the message traffic.

net::awaitable<void> reader(std::shared_ptr<connection> db)
net::awaitable<void> subscriber(std::shared_ptr<connection> db)
{
request req;
req.push("SUBSCRIBE", "chat-channel");
Expand All @@ -54,19 +54,14 @@ net::awaitable<void> reader(std::shared_ptr<connection> db)
}

net::awaitable<void>
run(net::posix::stream_descriptor& in, std::shared_ptr<connection> db)
publisher(net::posix::stream_descriptor& in, std::shared_ptr<connection> db)
{
try {
for (std::string msg;;) {
std::size_t n = co_await net::async_read_until(in, net::dynamic_buffer(msg, 1024), "\n", net::use_awaitable);
request req;
req.push("PUBLISH", "chat-channel", msg);
co_await db->async_exec(req);
msg.erase(0, n);
}

} catch (std::exception const& e) {
std::cerr << "Error: " << e.what() << std::endl;
for (std::string msg;;) {
std::size_t n = co_await net::async_read_until(in, net::dynamic_buffer(msg, 1024), "\n", net::use_awaitable);
request req;
req.push("PUBLISH", "chat-channel", msg);
co_await db->async_exec(req);
msg.erase(0, n);
}
}

Expand All @@ -75,17 +70,19 @@ int main()
try {
net::io_context ioc{1};
net::posix::stream_descriptor in{ioc, ::dup(STDIN_FILENO)};

auto db = std::make_shared<connection>(ioc);
db->get_config().enable_events = true;
co_spawn(ioc, run(in, db), net::detached);
co_spawn(ioc, reader(db), net::detached);

co_spawn(ioc, publisher(in, db), net::detached);
co_spawn(ioc, subscriber(db), net::detached);
db->async_run([](auto ec) {
std::cout << ec.message() << std::endl;
});

net::signal_set signals(ioc, SIGINT, SIGTERM);
signals.async_wait([&](auto, auto){ ioc.stop(); });

ioc.run();
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;
Expand Down
4 changes: 2 additions & 2 deletions examples/subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ net::awaitable<void> receiver(std::shared_ptr<connection> db)
break;

case connection::event::hello:
// Subscribes to the channels again after stablishing a new
// connection.
// Subscribes to the channels when a new connection is
// stablished.
co_await db->async_exec(req);
break;

Expand Down
47 changes: 25 additions & 22 deletions examples/subscriber_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,30 @@ using event = connection::event;

// See subscriber.cpp for more info about how to run this example.

void subscriber(connection& conn)
{
request req;
req.push("SUBSCRIBE", "channel");

for (std::vector<node<std::string>> resp;;) {
auto const ev = receive_event(conn, aedis::adapt(resp));
switch (ev) {
case connection::event::push:
print_push(resp);
resp.clear();
break;

case connection::event::hello:
// Subscribes to the channels when a new connection is
// stablished.
exec(conn, req);
break;

default:;
}
}
}

int main()
{
try {
Expand All @@ -38,28 +62,7 @@ int main()
ioc.run();
}};

request req;
req.push("SUBSCRIBE", "channel");

for (std::vector<node<std::string>> resp;;) {
boost::system::error_code ec;
auto const ev = receive_event(conn, aedis::adapt(resp), ec);
switch (ev) {
case connection::event::push:
print_push(resp);
resp.clear();
break;

case connection::event::hello:
// Subscribes to the channels again after stablishing a
// new connection.
exec(conn, req);
break;

default:;
}
}

subscriber(conn);
thread.join();

} catch (std::exception const& e) {
Expand Down
15 changes: 15 additions & 0 deletions include/aedis/experimental/sync.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,21 @@ auto receive_event(
return ev;
}

/// TODO
template <
class Connection,
class ResponseAdapter = aedis::detail::response_traits<void>::adapter_type>
auto receive_event(
Connection& conn,
ResponseAdapter adapter = aedis::adapt())
{
boost::system::error_code ec;
auto const res = receive_event(conn, adapter, ec);
if (ec)
throw std::system_error(ec);
return res;
}

} // experimental
} // aedis

Expand Down

0 comments on commit c57f97b

Please sign in to comment.