Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Uses asio::deferred_t as default completion type. #228

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions example/cpp20_chat_room.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/

#include <boost/redis/connection.hpp>
#include <boost/asio/deferred.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
Expand All @@ -18,13 +17,12 @@
#if defined(BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR)

namespace asio = boost::asio;
using stream_descriptor = asio::deferred_t::as_default_on_t<asio::posix::stream_descriptor>;
using signal_set = asio::deferred_t::as_default_on_t<asio::signal_set>;
using asio::posix::stream_descriptor;
using asio::signal_set;
using boost::asio::async_read_until;
using boost::asio::awaitable;
using boost::asio::co_spawn;
using boost::asio::consign;
using boost::asio::deferred;
using boost::asio::detached;
using boost::asio::dynamic_buffer;
using boost::asio::redirect_error;
Expand Down Expand Up @@ -52,7 +50,7 @@ receiver(std::shared_ptr<connection> conn) -> awaitable<void>
while (conn->will_reconnect()) {

// Subscribe to channels.
co_await conn->async_exec(req, ignore, deferred);
co_await conn->async_exec(req, ignore);

// Loop reading Redis push messages.
for (error_code ec;;) {
Expand All @@ -76,7 +74,7 @@ auto publisher(std::shared_ptr<stream_descriptor> in, std::shared_ptr<connection
auto n = co_await async_read_until(*in, dynamic_buffer(msg, 1024), "\n");
request req;
req.push("PUBLISH", "channel", msg);
co_await conn->async_exec(req, ignore, deferred);
co_await conn->async_exec(req, ignore);
msg.erase(0, n);
}
}
Expand Down
8 changes: 3 additions & 5 deletions example/cpp20_containers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/

#include <boost/redis/connection.hpp>
#include <boost/asio/deferred.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/co_spawn.hpp>
#include <map>
Expand All @@ -22,7 +21,6 @@ using boost::redis::ignore;
using boost::redis::config;
using boost::redis::connection;
using boost::asio::awaitable;
using boost::asio::deferred;
using boost::asio::detached;
using boost::asio::consign;

Expand Down Expand Up @@ -51,7 +49,7 @@ auto store(std::shared_ptr<connection> conn) -> awaitable<void>
req.push_range("RPUSH", "rpush-key", vec);
req.push_range("HSET", "hset-key", map);

co_await conn->async_exec(req, ignore, deferred);
co_await conn->async_exec(req, ignore);
}

auto hgetall(std::shared_ptr<connection> conn) -> awaitable<void>
Expand All @@ -64,7 +62,7 @@ auto hgetall(std::shared_ptr<connection> conn) -> awaitable<void>
response<std::map<std::string, std::string>> resp;

// Executes the request and reads the response.
co_await conn->async_exec(req, resp, deferred);
co_await conn->async_exec(req, resp);

print(std::get<0>(resp).value());
}
Expand All @@ -85,7 +83,7 @@ auto transaction(std::shared_ptr<connection> conn) -> awaitable<void>
response<std::optional<std::vector<int>>, std::optional<std::map<std::string, std::string>>> // exec
> resp;

co_await conn->async_exec(req, resp, deferred);
co_await conn->async_exec(req, resp);

print(std::get<0>(std::get<3>(resp).value()).value().value());
print(std::get<1>(std::get<3>(resp).value()).value().value());
Expand Down
14 changes: 7 additions & 7 deletions example/cpp20_echo_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/

#include <boost/redis/connection.hpp>
#include <boost/asio/deferred.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/redirect_error.hpp>
Expand All @@ -15,25 +14,26 @@
#if defined(BOOST_ASIO_HAS_CO_AWAIT)

namespace asio = boost::asio;
using tcp_socket = asio::deferred_t::as_default_on_t<asio::ip::tcp::socket>;
using tcp_acceptor = asio::deferred_t::as_default_on_t<asio::ip::tcp::acceptor>;
using signal_set = asio::deferred_t::as_default_on_t<asio::signal_set>;
using boost::asio::signal_set;
using boost::redis::request;
using boost::redis::response;
using boost::redis::config;
using boost::system::error_code;
using boost::redis::connection;
using namespace std::chrono_literals;

auto echo_server_session(tcp_socket socket, std::shared_ptr<connection> conn) -> asio::awaitable<void>
auto
echo_server_session(
asio::ip::tcp::socket socket,
std::shared_ptr<connection> conn) -> asio::awaitable<void>
{
request req;
response<std::string> resp;

for (std::string buffer;;) {
auto n = co_await asio::async_read_until(socket, asio::dynamic_buffer(buffer, 1024), "\n");
req.push("PING", buffer);
co_await conn->async_exec(req, resp, asio::deferred);
co_await conn->async_exec(req, resp);
co_await asio::async_write(socket, asio::buffer(std::get<0>(resp).value()));
std::get<0>(resp).value().clear();
req.clear();
Expand All @@ -46,7 +46,7 @@ auto listener(std::shared_ptr<connection> conn) -> asio::awaitable<void>
{
try {
auto ex = co_await asio::this_coro::executor;
tcp_acceptor acc(ex, {asio::ip::tcp::v4(), 55555});
asio::ip::tcp::acceptor acc(ex, {asio::ip::tcp::v4(), 55555});
for (;;)
asio::co_spawn(ex, echo_server_session(co_await acc.async_accept(), conn), asio::detached);
} catch (std::exception const& e) {
Expand Down
3 changes: 1 addition & 2 deletions example/cpp20_intro.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/

#include <boost/redis/connection.hpp>
#include <boost/asio/deferred.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/consign.hpp>
Expand Down Expand Up @@ -33,7 +32,7 @@ auto co_main(config cfg) -> asio::awaitable<void>
response<std::string> resp;

// Executes the request.
co_await conn->async_exec(req, resp, asio::deferred);
co_await conn->async_exec(req, resp);
conn->cancel();

std::cout << "PING: " << std::get<0>(resp).value() << std::endl;
Expand Down
3 changes: 1 addition & 2 deletions example/cpp20_intro_tls.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/

#include <boost/redis/connection.hpp>
#include <boost/asio/deferred.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/consign.hpp>
Expand Down Expand Up @@ -45,7 +44,7 @@ auto co_main(config cfg) -> asio::awaitable<void>
conn->next_layer().set_verify_mode(asio::ssl::verify_peer);
conn->next_layer().set_verify_callback(verify_certificate);

co_await conn->async_exec(req, resp, asio::deferred);
co_await conn->async_exec(req, resp);
conn->cancel();

std::cout << "Response: " << std::get<0>(resp).value() << std::endl;
Expand Down
3 changes: 1 addition & 2 deletions example/cpp20_json.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/

#include <boost/redis/connection.hpp>
#include <boost/asio/deferred.hpp>
#include <boost/asio/detached.hpp>
#include <boost/describe.hpp>
#include <boost/asio/consign.hpp>
Expand Down Expand Up @@ -62,7 +61,7 @@ auto co_main(config cfg) -> asio::awaitable<void>

response<ignore_t, user> resp;

co_await conn->async_exec(req, resp, asio::deferred);
co_await conn->async_exec(req, resp);
conn->cancel();

// Prints the first ping
Expand Down
3 changes: 1 addition & 2 deletions example/cpp20_protobuf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

#include <boost/redis/connection.hpp>
#include <boost/redis/resp3/serialization.hpp>
#include <boost/asio/deferred.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/consign.hpp>
Expand Down Expand Up @@ -76,7 +75,7 @@ asio::awaitable<void> co_main(config cfg)
response<ignore_t, person> resp;

// Sends the request and receives the response.
co_await conn->async_exec(req, resp, asio::deferred);
co_await conn->async_exec(req, resp);
conn->cancel();

std::cout
Expand Down
5 changes: 2 additions & 3 deletions example/cpp20_streams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/

#include <boost/redis/connection.hpp>
#include <boost/asio/deferred.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/consign.hpp>
Expand All @@ -26,7 +25,7 @@ using boost::redis::generic_response;
using boost::redis::operation;
using boost::redis::request;
using boost::redis::connection;
using signal_set = net::deferred_t::as_default_on_t<net::signal_set>;
using net::signal_set;

auto stream_reader(std::shared_ptr<connection> conn) -> net::awaitable<void>
{
Expand All @@ -39,7 +38,7 @@ auto stream_reader(std::shared_ptr<connection> conn) -> net::awaitable<void>

for (;;) {
req.push("XREAD", "BLOCK", "0", "STREAMS", "test-topic", stream_id);
co_await conn->async_exec(req, resp, net::deferred);
co_await conn->async_exec(req, resp);

//std::cout << "Response: ";
//for (auto i = 0UL; i < resp->size(); ++i) {
Expand Down
5 changes: 2 additions & 3 deletions example/cpp20_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#include <boost/redis/logger.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/deferred.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/consign.hpp>
Expand All @@ -29,7 +28,7 @@ using boost::redis::ignore;
using boost::redis::error;
using boost::system::error_code;
using boost::redis::connection;
using signal_set = asio::deferred_t::as_default_on_t<asio::signal_set>;
using asio::signal_set;

/* This example will subscribe and read pushes indefinitely.
*
Expand Down Expand Up @@ -61,7 +60,7 @@ receiver(std::shared_ptr<connection> conn) -> asio::awaitable<void>
while (conn->will_reconnect()) {

// Reconnect to the channels.
co_await conn->async_exec(req, ignore, asio::deferred);
co_await conn->async_exec(req, ignore);

// Loop reading Redis pushs messages.
for (error_code ec;;) {
Expand Down
29 changes: 13 additions & 16 deletions include/boost/redis/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <boost/asio/buffer.hpp>
#include <boost/asio/cancel_after.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/deferred.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <boost/asio/experimental/parallel_group.hpp>
#include <boost/asio/io_context.hpp>
Expand Down Expand Up @@ -1250,8 +1251,8 @@ class connection {
{ return impl_.get_executor(); }

/// Calls `boost::redis::basic_connection::async_run`.
template <class CompletionToken>
auto async_run(config const& cfg, logger l, CompletionToken token)
template <class CompletionToken = asio::deferred_t>
auto async_run(config const& cfg, logger l, CompletionToken token = {})
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As minor comments, consider unifying all signatures using completion tokens to CompletionToken&&

{
return asio::async_initiate<
CompletionToken, void(boost::system::error_code)>(
Expand All @@ -1262,16 +1263,8 @@ class connection {
}

/// Calls `boost::redis::basic_connection::async_receive`.
template <class Response, class CompletionToken>
[[deprecated("Set the response with set_receive_response and use the other overload.")]]
auto async_receive(Response& response, CompletionToken token)
{
return impl_.async_receive(response, std::move(token));
}

/// Calls `boost::redis::basic_connection::async_receive`.
template <class CompletionToken>
auto async_receive(CompletionToken token)
template <class CompletionToken = asio::deferred_t>
auto async_receive(CompletionToken token = {})
{ return impl_.async_receive(std::move(token)); }

/// Calls `boost::redis::basic_connection::receive`.
Expand All @@ -1281,15 +1274,19 @@ class connection {
}

/// Calls `boost::redis::basic_connection::async_exec`.
template <class Response, class CompletionToken>
auto async_exec(request const& req, Response& resp, CompletionToken&& token)
template <class Response, class CompletionToken = asio::deferred_t>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can also consider using BOOST_ASIO_COMPLETION_TOKEN_FOR to implement concept checks.

auto async_exec(request const& req, Response& resp, CompletionToken&& token = {})
{
return async_exec(req, any_adapter(resp), std::forward<CompletionToken>(token));
}

/// Calls `boost::redis::basic_connection::async_exec`.
template <class CompletionToken>
auto async_exec(request const& req, any_adapter adapter, CompletionToken&& token)
template <class CompletionToken = asio::deferred_t>
auto
async_exec(
request const& req,
any_adapter adapter,
CompletionToken&& token = {})
{
return asio::async_initiate<
CompletionToken, void(boost::system::error_code, std::size_t)>(
Expand Down
Loading