Skip to content

Commit

Permalink
Optimize socket write queue
Browse files Browse the repository at this point in the history
  • Loading branch information
sniper00 committed Oct 20, 2024
1 parent 41efe3f commit 45dfd7d
Show file tree
Hide file tree
Showing 8 changed files with 251 additions and 114 deletions.
8 changes: 8 additions & 0 deletions common/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,14 @@ operator&(Enum a, Enum b) {
);
}

template<typename Enum>
inline typename std::enable_if_t<enum_enable_bitmask_operators<Enum>::enable, Enum>
enum_unset_bitmask(Enum mask, Enum value) {
return static_cast<Enum>(
static_cast<std::underlying_type_t<Enum>>(mask) & ~static_cast<std::underlying_type_t<Enum>>(value)
);
}

template<
typename Enum,
typename std::enable_if_t<enum_enable_bitmask_operators<Enum>::enable, int> = 0>
Expand Down
3 changes: 1 addition & 2 deletions lualib-src/lua_moon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,7 @@ static int lmoon_log(lua_State* L) {
}

if (lua_Debug ar; lua_getstack(L, 2, &ar) && lua_getinfo(L, "Sl", &ar)) {
line.write_back(" ", 4);
line.write_back('(');
line.write_back(" (", 5);
if (ar.srclen > 1)
line.write_back(ar.source + 1, ar.srclen - 1);
line.write_back(':');
Expand Down
76 changes: 41 additions & 35 deletions moon-src/core/network/base_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,26 @@
#include "config.hpp"
#include "error.hpp"
#include "message.hpp"
#include "write_buffer.hpp"
#include "write_queue.hpp"

namespace moon {
enum class connection_mask : uint8_t {
none = 0,
server = 1 << 0,
would_close = 1 << 1,
reading = 1 << 2,
chunked_recv = 1 << 3,
chunked_send = 1 << 4,
chunked_both = 1 << 5,
};

template<>
struct enum_enable_bitmask_operators<connection_mask> {
static constexpr bool enable = true;
};

class base_connection: public std::enable_shared_from_this<base_connection> {
public:
enum class role : uint8_t { none, client, server };

using socket_t = asio::ip::tcp::socket;

template<typename... Args>
Expand All @@ -32,8 +45,10 @@ class base_connection: public std::enable_shared_from_this<base_connection> {

virtual ~base_connection() = default;

virtual void start(role r) {
role_ = r;
virtual void start(bool server) {
if (!server) {
mask_ = enum_unset_bitmask(mask_, connection_mask::server);
}
recvtime_ = now();
}

Expand All @@ -49,19 +64,21 @@ class base_connection: public std::enable_shared_from_this<base_connection> {
return false;
}

if (wq_warn_size_ != 0 && queue_.size() >= wq_warn_size_) {
CONSOLE_WARN("network send queue too long. size:%zu", queue_.size());
if (wq_error_size_ != 0 && queue_.size() >= wq_error_size_) {
if (wq_warn_size_ != 0 && wqueue_.writeable() >= wq_warn_size_) {
CONSOLE_WARN("network send queue too long. size:%zu", wqueue_.writeable());
if (wq_error_size_ != 0 && wqueue_.writeable() >= wq_error_size_) {
asio::post(socket_.get_executor(), [this, self = shared_from_this()]() {
error(make_error_code(moon::error::send_queue_too_big));
});
return false;
}
}

will_close_ = data->has_bitmask(socket_send_mask::close) ? true : will_close_;
if (data->has_bitmask(socket_send_mask::close)) {
mask_ = mask_ | connection_mask::would_close;
}

if (queue_.emplace_back(std::move(data)); queue_.size() == 1) {
if (wqueue_.enqueue(std::move(data)) == 1) {
post_send();
}

Expand Down Expand Up @@ -100,8 +117,8 @@ class base_connection: public std::enable_shared_from_this<base_connection> {
return serviceid_;
}

role get_role() const {
return role_;
bool is_server() const {
return enum_has_any_bitmask(mask_, connection_mask::server);
}

void timeout(time_t now) {
Expand Down Expand Up @@ -149,39 +166,31 @@ class base_connection: public std::enable_shared_from_this<base_connection> {

protected:
virtual void prepare_send(size_t default_once_send_bytes) {
size_t bytes = 0;
size_t queue_size = queue_.size();
for (size_t i = 0; i < queue_size; ++i) {
const auto& elm = queue_[i];
wbuffers_.write(elm->data(), elm->size());
bytes += elm->size();
if (bytes >= default_once_send_bytes) {
break;
}
}
wqueue_.prepare_buffers(
[this](const buffer_shr_ptr_t& elm) { wqueue_.consume(elm->data(), elm->size()); },
default_once_send_bytes
);
}

void post_send() {
prepare_send(262144);

asio::async_write(
socket_,
make_buffers_ref(wbuffers_.buffers()),
make_buffers_ref(wqueue_.buffer_sequence()),
[this, self = shared_from_this()](const asio::error_code& e, std::size_t) {
if (e) {
error(e);
return;
}

for (size_t i = 0; i < wbuffers_.size(); ++i) {
queue_.pop_front();
}

wbuffers_.clear();
wqueue_.commit_written();

if (!queue_.empty()) {
if (wqueue_.writeable() > 0) {
post_send();
} else if (will_close_ && parent_ != nullptr) {
} else if (enum_has_any_bitmask(mask_, connection_mask::would_close)
&& parent_ != nullptr)
{
parent_->close(fd_);
parent_ = nullptr;
}
Expand Down Expand Up @@ -226,9 +235,7 @@ class base_connection: public std::enable_shared_from_this<base_connection> {
}

protected:
bool will_close_ = false;
bool read_in_progress_ = false;
role role_ = role::none;
connection_mask mask_ = connection_mask::server;
uint8_t type_ = 0;
uint16_t wq_warn_size_ = 0;
uint16_t wq_error_size_ = 0;
Expand All @@ -237,8 +244,7 @@ class base_connection: public std::enable_shared_from_this<base_connection> {
uint32_t serviceid_ = 0;
time_t recvtime_ = 0;
moon::socket_server* parent_;
write_buffer wbuffers_;
VecDeque<buffer_shr_ptr_t> queue_;
write_queue wqueue_;
socket_t socket_;
};
} // namespace moon
85 changes: 45 additions & 40 deletions moon-src/core/network/moon_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,23 @@ class moon_connection: public base_connection {
int> = 0>
explicit moon_connection(Args&&... args):
base_connection(std::forward<Args>(args)...),
flag_(enable_chunked::none),
cache_(512) {}

void start(role r) override {
base_connection_t::start(r);
void start(bool server) override {
base_connection_t::start(server);
message m {};
m.set_type(type_);
m.write_data(address());
m.set_receiver(static_cast<uint8_t>(
r == role::server ? socket_data_type::socket_accept : socket_data_type::socket_connect
server ? socket_data_type::socket_accept : socket_data_type::socket_connect
));
handle_message(std::move(m));
read_header();
}

bool send(buffer_shr_ptr_t data) override {
if (data->size() >= MESSAGE_CONTINUED_FLAG
&& !enum_has_any_bitmask(flag_, enable_chunked::send))
&& !enum_has_any_bitmask(mask_, connection_mask::chunked_send))
{
asio::post(socket_.get_executor(), [this, self = shared_from_this()]() {
error(make_error_code(moon::error::write_message_too_big));
Expand All @@ -45,44 +44,51 @@ class moon_connection: public base_connection {
return base_connection_t::send(std::move(data));
}

void set_enable_chunked(enable_chunked v) {
flag_ = v;
void set_enable_chunked(connection_mask v) {
if(enum_has_any_bitmask(v, connection_mask::chunked_send)) {
mask_ = mask_ | connection_mask::chunked_send;
} else {
mask_ = enum_unset_bitmask(mask_, connection_mask::chunked_send);
}

if(enum_has_any_bitmask(v, connection_mask::chunked_recv)) {
mask_ = mask_ | connection_mask::chunked_recv;
} else {
mask_ = enum_unset_bitmask(mask_, connection_mask::chunked_recv);
}
}

private:
void prepare_send(size_t default_once_send_bytes) override {
size_t bytes = 0;
size_t queue_size = queue_.size();
for (size_t i = 0; i < queue_size; ++i) {
const auto& elm = queue_[i];
size_t size = elm->size();
const char* data = elm->data();
bytes += size;
if (!elm->has_bitmask(socket_send_mask::raw)) {
wbuffers_.begin_write_slice();
message_size_t slice_size = 0;
message_size_t header = 0;
do {
header = slice_size = (size >= MESSAGE_CONTINUED_FLAG)
? MESSAGE_CONTINUED_FLAG
: static_cast<message_size_t>(size);
host2net(header);
wbuffers_.write_slice(&header, sizeof(header), data, slice_size);
size -= slice_size;
data += slice_size;
} while (size != 0);

if (slice_size == MESSAGE_CONTINUED_FLAG) {
header = 0;
wbuffers_.write_slice(&header, sizeof(header), nullptr, 0); //end flag
wqueue_.prepare_buffers(
[this](const buffer_shr_ptr_t& elm) {
size_t size = elm->size();
const char* data = elm->data();
if (!elm->has_bitmask(socket_send_mask::raw)) {
wqueue_.consume();
message_size_t slice_size = 0;
message_size_t header = 0;
do {
header = slice_size = (size >= MESSAGE_CONTINUED_FLAG)
? MESSAGE_CONTINUED_FLAG
: static_cast<message_size_t>(size);
host2net(header);
wqueue_.prepare_with_padding(&header, sizeof(header), data, slice_size);
size -= slice_size;
data += slice_size;
} while (size != 0);

if (slice_size == MESSAGE_CONTINUED_FLAG) {
header = 0;
wqueue_
.prepare_with_padding(&header, sizeof(header), nullptr, 0); //end flag
}
} else {
wqueue_.consume(data, size);
}
} else {
wbuffers_.write(data, size);
}
if (bytes >= default_once_send_bytes) {
break;
}
}
},
default_once_send_bytes
);
}

void read_header() {
Expand Down Expand Up @@ -111,7 +117,7 @@ class moon_connection: public base_connection {
net2host(header);

bool fin = (header != MESSAGE_CONTINUED_FLAG);
if (!fin && !enum_has_any_bitmask(flag_, enable_chunked::receive)) {
if (!fin && !enum_has_any_bitmask(mask_, connection_mask::chunked_recv)) {
error(make_error_code(moon::error::read_message_too_big));
return;
}
Expand Down Expand Up @@ -169,7 +175,6 @@ class moon_connection: public base_connection {
}

private:
enable_chunked flag_;
buffer cache_;
buffer_ptr_t data_;
};
Expand Down
14 changes: 7 additions & 7 deletions moon-src/core/network/socket_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ void socket_server::connect(
if (!ec) {
conn->fd(server_->nextfd());
connections_.try_emplace(conn->fd(), conn);
conn->start(base_connection::role::client);
conn->start(false);
response(
0,
params->owner,
Expand All @@ -243,7 +243,7 @@ void socket_server::connect(
);
} else {
//Set the fd flag to prevent timeout handling
conn->fd(std::numeric_limits<uint32_t>::max());
conn->fd(std::numeric_limits<uint32_t>::max());
response(
0,
params->owner,
Expand Down Expand Up @@ -367,16 +367,16 @@ bool socket_server::setnodelay(uint32_t fd) {
}

bool socket_server::set_enable_chunked(uint32_t fd, std::string_view flag) {
auto v = enable_chunked::none;
auto v = connection_mask::none;
for (const auto& c: flag) {
switch (c) {
case 'r':
case 'R':
v = v | moon::enable_chunked::receive;
v = v | moon::connection_mask::chunked_recv;
break;
case 'w':
case 'W':
v = v | moon::enable_chunked::send;
v = v | moon::connection_mask::chunked_send;
break;
default:
CONSOLE_WARN(
Expand Down Expand Up @@ -466,7 +466,7 @@ bool moon::socket_server::switch_type(uint32_t fd, uint8_t new_type) {
auto newc = make_connection(c->owner(), new_type, std::move(c->socket()));
newc->fd(fd);
iter->second = newc;
newc->start(c->get_role());
newc->start(c->is_server());
return true;
}
return false;
Expand Down Expand Up @@ -546,7 +546,7 @@ void socket_server::add_connection(
) {
asio::dispatch(context_, [this, from, ctx, c, sessionid] {
connections_.try_emplace(c->fd(), c);
c->start(base_connection::role::server);
c->start(true);

if (sessionid != 0) {
asio::dispatch(from->context_, [from, ctx, sessionid, fd = c->fd()] {
Expand Down
Loading

0 comments on commit 45dfd7d

Please sign in to comment.