From c57f97b8c1d97ee5d23a6cac4c84fe0af1a19b36 Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Sat, 6 Aug 2022 23:12:32 +0200 Subject: [PATCH] Improvements in the examples. --- examples/chat_room.cpp | 27 ++++++++--------- examples/subscriber.cpp | 4 +-- examples/subscriber_sync.cpp | 47 +++++++++++++++-------------- include/aedis/experimental/sync.hpp | 15 +++++++++ 4 files changed, 54 insertions(+), 39 deletions(-) diff --git a/examples/chat_room.cpp b/examples/chat_room.cpp index 867f6f1a..789f7171 100644 --- a/examples/chat_room.cpp +++ b/examples/chat_room.cpp @@ -31,7 +31,7 @@ using response_type = std::vector>; // // To see the message traffic. -net::awaitable reader(std::shared_ptr db) +net::awaitable subscriber(std::shared_ptr db) { request req; req.push("SUBSCRIBE", "chat-channel"); @@ -54,19 +54,14 @@ net::awaitable reader(std::shared_ptr db) } net::awaitable -run(net::posix::stream_descriptor& in, std::shared_ptr db) +publisher(net::posix::stream_descriptor& in, std::shared_ptr db) { - try { - for (std::string msg;;) { - std::size_t n = co_await net::async_read_until(in, net::dynamic_buffer(msg, 1024), "\n", net::use_awaitable); - request req; - req.push("PUBLISH", "chat-channel", msg); - co_await db->async_exec(req); - msg.erase(0, n); - } - - } catch (std::exception const& e) { - std::cerr << "Error: " << e.what() << std::endl; + for (std::string msg;;) { + std::size_t n = co_await net::async_read_until(in, net::dynamic_buffer(msg, 1024), "\n", net::use_awaitable); + request req; + req.push("PUBLISH", "chat-channel", msg); + co_await db->async_exec(req); + msg.erase(0, n); } } @@ -75,17 +70,19 @@ int main() try { net::io_context ioc{1}; net::posix::stream_descriptor in{ioc, ::dup(STDIN_FILENO)}; + auto db = std::make_shared(ioc); db->get_config().enable_events = true; - co_spawn(ioc, run(in, db), net::detached); - co_spawn(ioc, reader(db), net::detached); + co_spawn(ioc, publisher(in, db), net::detached); + co_spawn(ioc, subscriber(db), net::detached); db->async_run([](auto ec) { std::cout << ec.message() << std::endl; }); net::signal_set signals(ioc, SIGINT, SIGTERM); signals.async_wait([&](auto, auto){ ioc.stop(); }); + ioc.run(); } catch (std::exception const& e) { std::cerr << e.what() << std::endl; diff --git a/examples/subscriber.cpp b/examples/subscriber.cpp index 4a33dfb7..062069f9 100644 --- a/examples/subscriber.cpp +++ b/examples/subscriber.cpp @@ -58,8 +58,8 @@ net::awaitable receiver(std::shared_ptr db) break; case connection::event::hello: - // Subscribes to the channels again after stablishing a new - // connection. + // Subscribes to the channels when a new connection is + // stablished. co_await db->async_exec(req); break; diff --git a/examples/subscriber_sync.cpp b/examples/subscriber_sync.cpp index 450be56f..9dc23da8 100644 --- a/examples/subscriber_sync.cpp +++ b/examples/subscriber_sync.cpp @@ -24,6 +24,30 @@ using event = connection::event; // See subscriber.cpp for more info about how to run this example. +void subscriber(connection& conn) +{ + request req; + req.push("SUBSCRIBE", "channel"); + + for (std::vector> resp;;) { + auto const ev = receive_event(conn, aedis::adapt(resp)); + switch (ev) { + case connection::event::push: + print_push(resp); + resp.clear(); + break; + + case connection::event::hello: + // Subscribes to the channels when a new connection is + // stablished. + exec(conn, req); + break; + + default:; + } + } +} + int main() { try { @@ -38,28 +62,7 @@ int main() ioc.run(); }}; - request req; - req.push("SUBSCRIBE", "channel"); - - for (std::vector> resp;;) { - boost::system::error_code ec; - auto const ev = receive_event(conn, aedis::adapt(resp), ec); - switch (ev) { - case connection::event::push: - print_push(resp); - resp.clear(); - break; - - case connection::event::hello: - // Subscribes to the channels again after stablishing a - // new connection. - exec(conn, req); - break; - - default:; - } - } - + subscriber(conn); thread.join(); } catch (std::exception const& e) { diff --git a/include/aedis/experimental/sync.hpp b/include/aedis/experimental/sync.hpp index debfd277..d24d906b 100644 --- a/include/aedis/experimental/sync.hpp +++ b/include/aedis/experimental/sync.hpp @@ -130,6 +130,21 @@ auto receive_event( return ev; } +/// TODO +template < + class Connection, + class ResponseAdapter = aedis::detail::response_traits::adapter_type> +auto receive_event( + Connection& conn, + ResponseAdapter adapter = aedis::adapt()) +{ + boost::system::error_code ec; + auto const res = receive_event(conn, adapter, ec); + if (ec) + throw std::system_error(ec); + return res; +} + } // experimental } // aedis