Skip to content

Commit

Permalink
Add Support for Socket Send Without Encoding Message
Browse files Browse the repository at this point in the history
  • Loading branch information
sniper00 committed May 22, 2024
1 parent af981f2 commit e341b23
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 47 deletions.
5 changes: 3 additions & 2 deletions lualib/moon/db/mysql.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
-- protocol detail: https://mariadb.com/kb/en/clientserver-protocol/


local moon = require "moon"
local buffer = require "buffer"
local crypt = require("crypt")
local socketchannel = require("moon.db.socketchannel")

Expand All @@ -27,6 +27,7 @@ local tointeger = math.tointeger

---@class mysql
---@field _server_ver string
---@field sockchannel socketchannel
local _M = {_VERSION = "0.14"}

-- the following charset map is generated from the following mysql query:
Expand Down Expand Up @@ -783,7 +784,7 @@ end

function _M.query(self, query)
if type(query) == "userdata" then
query = moon.decode_ref_buffer(query, "Z")
query = buffer.unpack(query)
end
local querypacket = _compose_query(self, query)
local sockchannel = self.sockchannel
Expand Down
5 changes: 2 additions & 3 deletions lualib/moon/db/pg.lua
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,8 @@ local function send_startup_message(self)
NULL
}

local str = buffer.concat_string(data)

socket.write(self.sock, buffer.concat_string(encode_int(#str + 4), data))
local startup_message = buffer.concat_string(data)
socket.write(self.sock, buffer.concat_string(encode_int(#startup_message + 4), startup_message))
end

local function parse_error(err_msg)
Expand Down
12 changes: 7 additions & 5 deletions lualib/moon/socket.lua
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ local udp = core.udp
local unpack_udp = core.unpack_udp

local mask_close<const> = 2
local mask_moon_packed<const> = 32
local mask_raw<const> = 32

local supported_tcp_protocol = {
[moon.PTYPE_SOCKET_TCP] = "tcp",
Expand All @@ -26,7 +26,7 @@ local supported_tcp_protocol = {
---@class socket : asio
local socket = core

socket.mask_moon_packed = mask_moon_packed
socket.mask_raw = mask_raw

---@async
---@param listenfd integer
Expand Down Expand Up @@ -83,10 +83,12 @@ function socket.write_then_close(fd, data)
write(fd, data, mask_close)
end

--- This function sends raw network data, bypassing any message encoding.
--- If you need to send data that must be encoded in a specific way, you should encode the data before calling this function.
---@param fd integer
---@param data string|buffer_ptr|buffer_shr_ptr
function socket.write_moon_packed(fd, data)
write(fd, data, mask_moon_packed)
---@param data string|buffer_ptr|buffer_shr_ptr The data to be written. This can be a string, a buffer pointer, or a shared buffer pointer.
function socket.write_raw(fd, data)
write(fd, data, mask_raw)
end

local socket_data_type = {
Expand Down
2 changes: 1 addition & 1 deletion moon-src/core/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ namespace moon
ws_text = 1 << 2,
ws_ping = 1 << 3,
ws_pong = 1 << 4,
moon_packed = 1 << 5,
raw = 1 << 5,
max_mask
};

Expand Down
13 changes: 3 additions & 10 deletions moon-src/core/network/base_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,8 @@ namespace moon
return direct_read_result{false, { "Unsupported read operation" } };
};

virtual bool send(buffer_shr_ptr_t&& data, socket_send_mask mask)
virtual bool send(buffer_shr_ptr_t&& data)
{
if (data == nullptr || data->size() == 0)
{
return false;
}

if (!socket_.is_open())
{
return false;
Expand All @@ -76,11 +71,9 @@ namespace moon
}
}

will_close_ = enum_has_any_bitmask(mask, socket_send_mask::close) ? true : will_close_;

queue_.push_back(std::move(data));
will_close_ = data->has_bitmask(socket_send_mask::close) ? true : will_close_;

if (queue_.size() == 1)
if (queue_.push_back(std::move(data)); queue_.size() == 1)
{
post_send();
}
Expand Down
6 changes: 3 additions & 3 deletions moon-src/core/network/moon_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace moon
read_header();
}

bool send(buffer_shr_ptr_t&& data, socket_send_mask mask) override
bool send(buffer_shr_ptr_t&& data) override
{
if (data->size() >= MESSAGE_CONTINUED_FLAG && !enum_has_any_bitmask(flag_, enable_chunked::send))
{
Expand All @@ -41,7 +41,7 @@ namespace moon
});
return false;
}
return base_connection_t::send(std::move(data), mask);
return base_connection_t::send(std::move(data));
}

void set_enable_chunked(enable_chunked v)
Expand All @@ -58,7 +58,7 @@ namespace moon
size_t size = elm->size();
const char* data = elm->data();
bytes += size;
if (!elm->has_bitmask(socket_send_mask::moon_packed)) {
if (!elm->has_bitmask(socket_send_mask::raw)) {
wbuffers_.begin_write_slice();
message_size_t slice_size = 0, header = 0;
do
Expand Down
2 changes: 1 addition & 1 deletion moon-src/core/network/socket_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ bool socket_server::write(uint32_t fd, buffer_shr_ptr_t&& data, socket_send_mask
if (auto iter = connections_.find(fd); iter != connections_.end())
{
data->add_bitmask(mask);
return iter->second->send(std::move(data), mask);
return iter->second->send(std::move(data));
}

if (auto iter = udp_.find(fd); iter != udp_.end())
Expand Down
5 changes: 3 additions & 2 deletions moon-src/core/network/write_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ namespace moon
void write_slice(void* padding_data, size_t padding_size, const char* data, size_t len)
{
auto& space = padding_.emplace_front();
memcpy(space.data(), padding_data, std::min(space.size(), padding_size));
buffers_.emplace_back(space.data(), padding_size);
size_t n = std::min(space.size(), padding_size);
memcpy(space.data(), padding_data, n);
buffers_.emplace_back(space.data(), n);
if(len>0)
buffers_.emplace_back(data, len);
}
Expand Down
56 changes: 36 additions & 20 deletions moon-src/core/network/ws_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,6 @@ namespace moon
}
}

bool send(buffer_shr_ptr_t&& data, socket_send_mask mask) override
{
auto payload = encode_frame(data, mask);
base_connection_t::send(std::move(payload), mask);
return base_connection_t::send(std::move(data), mask);
}

private:
void read_handshake()
{
Expand Down Expand Up @@ -261,7 +254,8 @@ namespace moon
{
auto buf = std::make_shared<buffer>(s.size());
buf->write_back(s.data(), s.size());
base_connection::send(std::move(buf), will_close?socket_send_mask::close:socket_send_mask::none);
buf->add_bitmask(socket_send_mask::raw|(will_close?socket_send_mask::close:socket_send_mask::none));
base_connection::send(std::move(buf));
}

template <typename T>
Expand Down Expand Up @@ -499,11 +493,33 @@ namespace moon
return tmp;
}

buffer_shr_ptr_t encode_frame(const buffer_shr_ptr_t& data, socket_send_mask send_mask) const
virtual void prepare_send(size_t default_once_send_bytes) override
{
size_t bytes = 0;
size_t queue_size = queue_.size();
for (size_t i = 0; i < queue_size; ++i) {
const auto& elm = queue_[i];
if (!elm->has_bitmask(socket_send_mask::raw)){
buffer payload = encode_frame(elm);
wbuffers_.begin_write_slice();
wbuffers_.write_slice(payload.data(), payload.size(), elm->data(), elm->size());
}else{
wbuffers_.write(elm->data(), elm->size());
}

bytes+= elm->size();
if (bytes >= default_once_send_bytes)
{
break;
}
}
}

buffer encode_frame(const buffer_shr_ptr_t& data) const
{
buffer_shr_ptr_t payload = buffer::make_shared(16);
payload->commit(16);
payload->seek(16);
buffer payload{16};
payload.commit(16);
payload.seek(16);

uint64_t size = data->size();

Expand All @@ -515,7 +531,7 @@ namespace moon
{
d[i] = d[i] ^ mask[i % mask.size()];
}
payload->write_front(mask.data(), mask.size());
payload.write_front(mask.data(), mask.size());
}

uint8_t payload_len = 0;
Expand All @@ -528,13 +544,13 @@ namespace moon
payload_len = static_cast<uint8_t>(PAYLOAD_MID_LEN);
uint16_t n = (uint16_t)size;
moon::host2net(n);
payload->write_front(&n, 1);
payload.write_front(&n, 1);
}
else
{
payload_len = static_cast<uint8_t>(PAYLOAD_MAX_LEN);
moon::host2net(size);
payload->write_front(&size, 1);
payload.write_front(&size, 1);
}

//messages from the client must be masked
Expand All @@ -543,24 +559,24 @@ namespace moon
payload_len |= 0x80;
}

payload->write_front(&payload_len, 1);
payload.write_front(&payload_len, 1);

uint8_t opcode = FIN_FRAME_FLAG | static_cast<uint8_t>(ws::opcode::binary);

if (enum_has_any_bitmask(send_mask, socket_send_mask::ws_text))
if (data->has_bitmask(socket_send_mask::ws_text))
{
opcode = FIN_FRAME_FLAG | static_cast<uint8_t>(ws::opcode::text);
}
else if (enum_has_any_bitmask(send_mask, socket_send_mask::ws_ping))
else if (data->has_bitmask(socket_send_mask::ws_ping))
{
opcode = FIN_FRAME_FLAG | static_cast<uint8_t>(ws::opcode::ping);
}
else if (enum_has_any_bitmask(send_mask, socket_send_mask::ws_pong))
else if (data->has_bitmask(socket_send_mask::ws_pong))
{
opcode = FIN_FRAME_FLAG | static_cast<uint8_t>(ws::opcode::pong);
}

payload->write_front(&opcode, 1);
payload.write_front(&opcode, 1);
return payload;
}

Expand Down

0 comments on commit e341b23

Please sign in to comment.