From e341b232e916549bd155416023c21c1cc3331415 Mon Sep 17 00:00:00 2001 From: Bruce Date: Wed, 22 May 2024 21:51:17 +0800 Subject: [PATCH] Add Support for Socket Send Without Encoding Message --- lualib/moon/db/mysql.lua | 5 +- lualib/moon/db/pg.lua | 5 +- lualib/moon/socket.lua | 12 +++-- moon-src/core/config.hpp | 2 +- moon-src/core/network/base_connection.hpp | 13 ++---- moon-src/core/network/moon_connection.hpp | 6 +-- moon-src/core/network/socket_server.cpp | 2 +- moon-src/core/network/write_buffer.hpp | 5 +- moon-src/core/network/ws_connection.hpp | 56 +++++++++++++++-------- 9 files changed, 59 insertions(+), 47 deletions(-) diff --git a/lualib/moon/db/mysql.lua b/lualib/moon/db/mysql.lua index 1a619b29b..b552cc1ef 100644 --- a/lualib/moon/db/mysql.lua +++ b/lualib/moon/db/mysql.lua @@ -7,7 +7,7 @@ -- protocol detail: https://mariadb.com/kb/en/clientserver-protocol/ -local moon = require "moon" +local buffer = require "buffer" local crypt = require("crypt") local socketchannel = require("moon.db.socketchannel") @@ -27,6 +27,7 @@ local tointeger = math.tointeger ---@class mysql ---@field _server_ver string +---@field sockchannel socketchannel local _M = {_VERSION = "0.14"} -- the following charset map is generated from the following mysql query: @@ -783,7 +784,7 @@ end function _M.query(self, query) if type(query) == "userdata" then - query = moon.decode_ref_buffer(query, "Z") + query = buffer.unpack(query) end local querypacket = _compose_query(self, query) local sockchannel = self.sockchannel diff --git a/lualib/moon/db/pg.lua b/lualib/moon/db/pg.lua index 282297498..7b6198bd8 100644 --- a/lualib/moon/db/pg.lua +++ b/lualib/moon/db/pg.lua @@ -174,9 +174,8 @@ local function send_startup_message(self) NULL } - local str = buffer.concat_string(data) - - socket.write(self.sock, buffer.concat_string(encode_int(#str + 4), data)) + local startup_message = buffer.concat_string(data) + socket.write(self.sock, buffer.concat_string(encode_int(#startup_message + 4), startup_message)) end local function parse_error(err_msg) diff --git a/lualib/moon/socket.lua b/lualib/moon/socket.lua index 38628a98a..2f47a8ea6 100644 --- a/lualib/moon/socket.lua +++ b/lualib/moon/socket.lua @@ -12,7 +12,7 @@ local udp = core.udp local unpack_udp = core.unpack_udp local mask_close = 2 -local mask_moon_packed = 32 +local mask_raw = 32 local supported_tcp_protocol = { [moon.PTYPE_SOCKET_TCP] = "tcp", @@ -26,7 +26,7 @@ local supported_tcp_protocol = { ---@class socket : asio local socket = core -socket.mask_moon_packed = mask_moon_packed +socket.mask_raw = mask_raw ---@async ---@param listenfd integer @@ -83,10 +83,12 @@ function socket.write_then_close(fd, data) write(fd, data, mask_close) end +--- This function sends raw network data, bypassing any message encoding. +--- If you need to send data that must be encoded in a specific way, you should encode the data before calling this function. ---@param fd integer ----@param data string|buffer_ptr|buffer_shr_ptr -function socket.write_moon_packed(fd, data) - write(fd, data, mask_moon_packed) +---@param data string|buffer_ptr|buffer_shr_ptr The data to be written. This can be a string, a buffer pointer, or a shared buffer pointer. +function socket.write_raw(fd, data) + write(fd, data, mask_raw) end local socket_data_type = { diff --git a/moon-src/core/config.hpp b/moon-src/core/config.hpp index bd79c552b..e89a46fbf 100644 --- a/moon-src/core/config.hpp +++ b/moon-src/core/config.hpp @@ -41,7 +41,7 @@ namespace moon ws_text = 1 << 2, ws_ping = 1 << 3, ws_pong = 1 << 4, - moon_packed = 1 << 5, + raw = 1 << 5, max_mask }; diff --git a/moon-src/core/network/base_connection.hpp b/moon-src/core/network/base_connection.hpp index 8487c7c5e..3db1bd80a 100644 --- a/moon-src/core/network/base_connection.hpp +++ b/moon-src/core/network/base_connection.hpp @@ -52,13 +52,8 @@ namespace moon return direct_read_result{false, { "Unsupported read operation" } }; }; - virtual bool send(buffer_shr_ptr_t&& data, socket_send_mask mask) + virtual bool send(buffer_shr_ptr_t&& data) { - if (data == nullptr || data->size() == 0) - { - return false; - } - if (!socket_.is_open()) { return false; @@ -76,11 +71,9 @@ namespace moon } } - will_close_ = enum_has_any_bitmask(mask, socket_send_mask::close) ? true : will_close_; - - queue_.push_back(std::move(data)); + will_close_ = data->has_bitmask(socket_send_mask::close) ? true : will_close_; - if (queue_.size() == 1) + if (queue_.push_back(std::move(data)); queue_.size() == 1) { post_send(); } diff --git a/moon-src/core/network/moon_connection.hpp b/moon-src/core/network/moon_connection.hpp index 11c6de63d..ec59063c0 100644 --- a/moon-src/core/network/moon_connection.hpp +++ b/moon-src/core/network/moon_connection.hpp @@ -32,7 +32,7 @@ namespace moon read_header(); } - bool send(buffer_shr_ptr_t&& data, socket_send_mask mask) override + bool send(buffer_shr_ptr_t&& data) override { if (data->size() >= MESSAGE_CONTINUED_FLAG && !enum_has_any_bitmask(flag_, enable_chunked::send)) { @@ -41,7 +41,7 @@ namespace moon }); return false; } - return base_connection_t::send(std::move(data), mask); + return base_connection_t::send(std::move(data)); } void set_enable_chunked(enable_chunked v) @@ -58,7 +58,7 @@ namespace moon size_t size = elm->size(); const char* data = elm->data(); bytes += size; - if (!elm->has_bitmask(socket_send_mask::moon_packed)) { + if (!elm->has_bitmask(socket_send_mask::raw)) { wbuffers_.begin_write_slice(); message_size_t slice_size = 0, header = 0; do diff --git a/moon-src/core/network/socket_server.cpp b/moon-src/core/network/socket_server.cpp index 54e1af8d5..733b42357 100644 --- a/moon-src/core/network/socket_server.cpp +++ b/moon-src/core/network/socket_server.cpp @@ -265,7 +265,7 @@ bool socket_server::write(uint32_t fd, buffer_shr_ptr_t&& data, socket_send_mask if (auto iter = connections_.find(fd); iter != connections_.end()) { data->add_bitmask(mask); - return iter->second->send(std::move(data), mask); + return iter->second->send(std::move(data)); } if (auto iter = udp_.find(fd); iter != udp_.end()) diff --git a/moon-src/core/network/write_buffer.hpp b/moon-src/core/network/write_buffer.hpp index 6c04fc4a0..fd0178f0d 100644 --- a/moon-src/core/network/write_buffer.hpp +++ b/moon-src/core/network/write_buffer.hpp @@ -25,8 +25,9 @@ namespace moon void write_slice(void* padding_data, size_t padding_size, const char* data, size_t len) { auto& space = padding_.emplace_front(); - memcpy(space.data(), padding_data, std::min(space.size(), padding_size)); - buffers_.emplace_back(space.data(), padding_size); + size_t n = std::min(space.size(), padding_size); + memcpy(space.data(), padding_data, n); + buffers_.emplace_back(space.data(), n); if(len>0) buffers_.emplace_back(data, len); } diff --git a/moon-src/core/network/ws_connection.hpp b/moon-src/core/network/ws_connection.hpp index 1ae934a5e..3a78cbe20 100644 --- a/moon-src/core/network/ws_connection.hpp +++ b/moon-src/core/network/ws_connection.hpp @@ -143,13 +143,6 @@ namespace moon } } - bool send(buffer_shr_ptr_t&& data, socket_send_mask mask) override - { - auto payload = encode_frame(data, mask); - base_connection_t::send(std::move(payload), mask); - return base_connection_t::send(std::move(data), mask); - } - private: void read_handshake() { @@ -261,7 +254,8 @@ namespace moon { auto buf = std::make_shared(s.size()); buf->write_back(s.data(), s.size()); - base_connection::send(std::move(buf), will_close?socket_send_mask::close:socket_send_mask::none); + buf->add_bitmask(socket_send_mask::raw|(will_close?socket_send_mask::close:socket_send_mask::none)); + base_connection::send(std::move(buf)); } template @@ -499,11 +493,33 @@ namespace moon return tmp; } - buffer_shr_ptr_t encode_frame(const buffer_shr_ptr_t& data, socket_send_mask send_mask) const + virtual void prepare_send(size_t default_once_send_bytes) override + { + size_t bytes = 0; + size_t queue_size = queue_.size(); + for (size_t i = 0; i < queue_size; ++i) { + const auto& elm = queue_[i]; + if (!elm->has_bitmask(socket_send_mask::raw)){ + buffer payload = encode_frame(elm); + wbuffers_.begin_write_slice(); + wbuffers_.write_slice(payload.data(), payload.size(), elm->data(), elm->size()); + }else{ + wbuffers_.write(elm->data(), elm->size()); + } + + bytes+= elm->size(); + if (bytes >= default_once_send_bytes) + { + break; + } + } + } + + buffer encode_frame(const buffer_shr_ptr_t& data) const { - buffer_shr_ptr_t payload = buffer::make_shared(16); - payload->commit(16); - payload->seek(16); + buffer payload{16}; + payload.commit(16); + payload.seek(16); uint64_t size = data->size(); @@ -515,7 +531,7 @@ namespace moon { d[i] = d[i] ^ mask[i % mask.size()]; } - payload->write_front(mask.data(), mask.size()); + payload.write_front(mask.data(), mask.size()); } uint8_t payload_len = 0; @@ -528,13 +544,13 @@ namespace moon payload_len = static_cast(PAYLOAD_MID_LEN); uint16_t n = (uint16_t)size; moon::host2net(n); - payload->write_front(&n, 1); + payload.write_front(&n, 1); } else { payload_len = static_cast(PAYLOAD_MAX_LEN); moon::host2net(size); - payload->write_front(&size, 1); + payload.write_front(&size, 1); } //messages from the client must be masked @@ -543,24 +559,24 @@ namespace moon payload_len |= 0x80; } - payload->write_front(&payload_len, 1); + payload.write_front(&payload_len, 1); uint8_t opcode = FIN_FRAME_FLAG | static_cast(ws::opcode::binary); - if (enum_has_any_bitmask(send_mask, socket_send_mask::ws_text)) + if (data->has_bitmask(socket_send_mask::ws_text)) { opcode = FIN_FRAME_FLAG | static_cast(ws::opcode::text); } - else if (enum_has_any_bitmask(send_mask, socket_send_mask::ws_ping)) + else if (data->has_bitmask(socket_send_mask::ws_ping)) { opcode = FIN_FRAME_FLAG | static_cast(ws::opcode::ping); } - else if (enum_has_any_bitmask(send_mask, socket_send_mask::ws_pong)) + else if (data->has_bitmask(socket_send_mask::ws_pong)) { opcode = FIN_FRAME_FLAG | static_cast(ws::opcode::pong); } - payload->write_front(&opcode, 1); + payload.write_front(&opcode, 1); return payload; }