From 4319bdf65171bf84221e9b30864d4577995a8293 Mon Sep 17 00:00:00 2001 From: Bruce Date: Thu, 11 Apr 2024 23:20:56 +0800 Subject: [PATCH] Refactor buffer --- build.sh | 0 common/buffer.hpp | 88 ++++-------- example/example_pgsql.lua | 94 +++++++++++++ example/example_tobeclosed.lua | 4 +- lualib-src/lua_buffer.cpp | 26 +--- lualib-src/lua_json.cpp | 14 +- lualib-src/lua_kcp.cpp | 2 +- lualib-src/lua_moon.cpp | 22 +-- lualib-src/lua_serialize.cpp | 3 +- lualib/moon/api/buffer.lua | 64 ++++----- lualib/moon/api/core.lua | 11 +- lualib/moon/db/pg.lua | 8 +- lualib/moon/db/redis.lua | 2 +- lualib/moon/http/websocket.lua | 7 +- moon-src/core/config.hpp | 19 +-- moon-src/core/log.hpp | 2 +- moon-src/core/message.hpp | 23 +--- moon-src/core/network/base_connection.hpp | 69 ++++------ .../core/network/const_buffers_holder.hpp | 10 +- moon-src/core/network/moon_connection.hpp | 98 ++++++-------- moon-src/core/network/socket_server.cpp | 11 +- moon-src/core/network/socket_server.h | 8 +- moon-src/core/network/stream_connection.hpp | 2 +- moon-src/core/network/streambuf.hpp | 2 +- moon-src/core/network/ws_connection.hpp | 52 +++++--- moon-src/core/server.cpp | 1 - moon-src/core/service.hpp | 2 - moon-src/core/worker.cpp | 2 +- moon-src/services/lua_service.cpp | 2 +- service/redisd.lua | 126 ++++++++---------- service/sqldriver.lua | 90 ++++++------- test/core.lua | 29 +--- 32 files changed, 430 insertions(+), 463 deletions(-) mode change 100644 => 100755 build.sh diff --git a/build.sh b/build.sh old mode 100644 new mode 100755 diff --git a/common/buffer.hpp b/common/buffer.hpp index a88269f95..fff0c2019 100644 --- a/common/buffer.hpp +++ b/common/buffer.hpp @@ -144,17 +144,14 @@ namespace moon return *this; } - compressed_pair(size_t cap, uint16_t head) - :headreserved(head) + compressed_pair(size_t cap) { - prepare(cap + head); - readpos = writepos = headreserved; + prepare(cap); + readpos = writepos = 0; } compressed_pair(compressed_pair&& other) noexcept - : flag(std::exchange(other.flag, 0)) - , headreserved(std::exchange(other.headreserved, 0)) - , capacity(std::exchange(other.capacity, 0)) + : capacity(std::exchange(other.capacity, 0)) , readpos(std::exchange(other.readpos, 0)) , writepos(std::exchange(other.writepos, 0)) , data(std::exchange(other.data, nullptr)) @@ -169,8 +166,6 @@ namespace moon { if(nullptr != data) first().deallocate(data, capacity); - flag = std::exchange(other.flag, 0); - headreserved = std::exchange(other.headreserved, 0); capacity = std::exchange(other.capacity, 0); readpos = std::exchange(other.readpos, 0); writepos = std::exchange(other.writepos, 0); @@ -210,7 +205,7 @@ namespace moon return std::pair{ data + writepos, need }; } - if (writeable + readpos < need + headreserved) + if (writeable + readpos < need) { auto required_size = writepos + need; required_size = next_pow2(required_size); @@ -228,21 +223,16 @@ namespace moon size_t readable = writepos - readpos; if (readable != 0) { - assert(readpos >= headreserved); - std::memmove(data + headreserved, data + readpos, readable); + std::memmove(data , data + readpos, readable); } - readpos = headreserved; + readpos = 0; writepos = readpos + readable; } return std::pair{data + writepos, need }; } - uint16_t flag = 0; - uint16_t headreserved = 0; size_t capacity = 0; - //read position size_t readpos = 0; - //write position size_t writepos = 0; pointer data = nullptr; }; @@ -254,9 +244,7 @@ namespace moon using pointer = typename iterator::pointer; using const_pointer = typename const_iterator::pointer; - //websocket header max len 14 bytes. - constexpr static uint16_t DEFAULT_HEAD_RESERVE = 16; - constexpr static size_t DEFAULT_RESERVE = 128 - DEFAULT_HEAD_RESERVE; + static constexpr size_t DEFAULT_CAPACITY = 128; enum class seek_origin { @@ -265,17 +253,12 @@ namespace moon }; base_buffer() - :pair_(DEFAULT_RESERVE, DEFAULT_HEAD_RESERVE) + :pair_(DEFAULT_CAPACITY) { } - base_buffer(size_t reserve) - :pair_(reserve, DEFAULT_HEAD_RESERVE) - { - } - - base_buffer(size_t reserve, uint16_t head_reserve) - :pair_(reserve, head_reserve) + base_buffer(size_t capacity) + :pair_(capacity) { } @@ -298,8 +281,7 @@ namespace moon base_buffer& operator=(base_buffer&& other) = default; base_buffer clone() { - base_buffer b{ pair_.capacity , pair_.headreserved }; - b.set_flag(pair_.flag); + base_buffer b{ pair_.capacity }; b.write_back(data(), size()); return b; } @@ -338,6 +320,7 @@ namespace moon if (n > pair_.readpos) { + assert(false); return false; } @@ -423,26 +406,7 @@ namespace moon void clear() noexcept { - pair_.flag = 0; - pair_.writepos = pair_.readpos = pair_.headreserved; - } - - template - void set_flag(ValueType v) noexcept - { - pair_.flag |= static_cast(v); - } - - template - bool has_flag(ValueType v) const noexcept - { - return ((pair_.flag & static_cast(v)) != 0); - } - - template - void clear_flag(ValueType v) noexcept - { - pair_.flag &= ~static_cast(v); + pair_.writepos = pair_.readpos = 0; } void commit(std::size_t n) noexcept @@ -455,10 +419,17 @@ namespace moon } } - std::pair prepare(size_t need) { + std::pair prepare(size_t need) + { return pair_.prepare(need); } + std::pair writeable() const noexcept + { + size_t writeable = pair_.capacity - pair_.writepos; + return std::pair{ pair_.data + pair_.writepos, writeable }; + } + pointer revert(size_t n) noexcept { assert(pair_.writepos >= (pair_.readpos+n)); @@ -509,11 +480,6 @@ namespace moon { return pair_.capacity; } - - size_t reserved() const noexcept - { - return pair_.headreserved; - } private: compressed_pair pair_; }; @@ -521,13 +487,15 @@ namespace moon #ifdef MOON_ENABLE_MIMALLOC #include "mimalloc.h" +#endif + namespace moon { +#ifdef MOON_ENABLE_MIMALLOC using buffer = base_buffer>; -} #else -namespace moon -{ using buffer = base_buffer>; -} #endif + constexpr size_t BUFFER_OPTION_CHEAP_PREPEND = 16; +} + diff --git a/example/example_pgsql.lua b/example/example_pgsql.lua index 61d9773bf..7c536c73d 100644 --- a/example/example_pgsql.lua +++ b/example/example_pgsql.lua @@ -343,6 +343,98 @@ $$ LANGUAGE plpgsql; -- select * from public.userdata where key='level' AND (value)::int = 100; end +local function test_sql_driver() + local sqldriver = require("service.sqldriver") + local db = moon.new_service { + unique = true, + name = "db_game", + file = "../service/sqldriver.lua", + provider = "moon.db.pg", + threadid = 2, + poolsize = 5, + opts = db_config + } + + assert(db>0) + + local fn_sql = [[ + CREATE OR REPLACE FUNCTION update_userdata(pk integer, VARIADIC key_values text[]) RETURNS void AS $$ +DECLARE + i integer; +BEGIN + FOR i IN 1..array_length(key_values, 1) BY 2 LOOP + INSERT INTO userdata(uid, key, value) VALUES (pk, key_values[i], key_values[i+1]::json) ON CONFLICT (uid, key) DO UPDATE SET value = excluded.value::json; + END LOOP; +END; +$$ LANGUAGE plpgsql; + ]] + + local sql = string.format([[ + --create userdata table + drop table if exists userdata; + create table userdata ( + uid bigint, + key text, + value jsonb, + CONSTRAINT pk_userdata PRIMARY KEY (uid, key) + ); + ]]) + + print(1) + sqldriver.query(db, sql) + print(2) + + local res = sqldriver.query(db, fn_sql) + assert(not res.code, res.message) + + ---@class User + ---@field name string + ---@field age number + ---@field level number + ---@field exp number + local user = { + name = "hello", + age = 10, + level = 99, + exp = 11, + info1 = simple_json_field, + info2 = simple_json_field, + } + + local key_value_model = require "key_value_model" + + ---@type User Description + local user = key_value_model.new("userdata", 233, user) + + user.age = 101 + user.level = 100 + user.info1.cc = 200 + user.info2.cc = 300 + user.name = "hello world" + + local sql = key_value_model.modifyed(user) + + -- print(sql) + local bt = moon.clock() + + local res + for m = 1, 100 do + res = sqldriver.query(db, sql) + assert(not res.code, res.message) + end + + print("test_key_value_table_function insert cost", (moon.clock() - bt)/100) + + sql = string.format([[ + --select userdata + select key, value from userdata where uid = %d; + ]], 233) + local res = sqldriver.query(db, sql) + local data = res.data + + moon.send("lua", db, "save_then_quit") +end + moon.async(function() test_json_query() test_big_json_value() @@ -352,6 +444,8 @@ moon.async(function() test_key_value_table_function() + test_sql_driver() + moon.exit(-1) end) diff --git a/example/example_tobeclosed.lua b/example/example_tobeclosed.lua index f61347d4c..de2c04dee 100644 --- a/example/example_tobeclosed.lua +++ b/example/example_tobeclosed.lua @@ -32,10 +32,10 @@ moon.async(function() end) moon.async(function() local a = new_test("session_id_coroutine_call") - moon.call("lua", moon.id) + local _ = moon.call("lua", moon.id) end) moon.async(function() - moon.call("lua", moon.id) + local _ = moon.call("lua", moon.id) end) moon.sleep(100) moon.async(function() diff --git a/lualib-src/lua_buffer.cpp b/lualib-src/lua_buffer.cpp index a4cf932be..be74ba55a 100644 --- a/lualib-src/lua_buffer.cpp +++ b/lualib-src/lua_buffer.cpp @@ -241,23 +241,6 @@ static int prepare(lua_State* L) return 0; } -static int has_flag(lua_State* L) -{ - auto buf = get_pointer(L, 1); - auto flag = static_cast(luaL_checkinteger(L, 2)); - bool res = buf->has_flag(flag); - lua_pushboolean(L, res ? 1 : 0); - return 1; -} - -static int set_flag(lua_State* L) -{ - auto buf = get_pointer(L, 1); - auto flag = static_cast(luaL_checkinteger(L, 2)); - buf->set_flag(flag); - return 0; -} - static int unsafe_delete(lua_State* L) { auto buf = get_pointer(L, 1); @@ -267,9 +250,8 @@ static int unsafe_delete(lua_State* L) static int unsafe_new(lua_State* L) { - size_t capacity = static_cast(luaL_optinteger(L, 1, buffer::DEFAULT_RESERVE)); - uint16_t headreserved = static_cast(luaL_optinteger(L, 2, buffer::DEFAULT_HEAD_RESERVE)); - buffer* buf = new buffer(capacity, headreserved); + size_t capacity = static_cast(luaL_optinteger(L, 1, buffer::DEFAULT_CAPACITY)); + buffer* buf = new buffer{capacity}; lua_pushlightuserdata(L, buf); return 1; } @@ -282,11 +264,13 @@ static int concat(lua_State* L) return 0; } auto buf = new buffer{}; + buf->commit(BUFFER_OPTION_CHEAP_PREPEND); try { for (int i = 1; i <= n; i++) { concat_one(L, buf, i, 0); } + buf->seek(BUFFER_OPTION_CHEAP_PREPEND); lua_pushlightuserdata(L, buf); return 1; } @@ -335,8 +319,6 @@ extern "C" { , {"seek", seek} , {"commit", commit} , {"prepare", prepare} - , {"has_flag", has_flag} - , {"set_flag", set_flag} , {"concat",concat } , {"concat_string",concat_string } , {NULL, NULL} diff --git a/lualib-src/lua_json.cpp b/lualib-src/lua_json.cpp index d17640432..32093c597 100644 --- a/lualib-src/lua_json.cpp +++ b/lualib-src/lua_json.cpp @@ -76,7 +76,6 @@ struct json_config { bool enable_number_key = true; bool enable_sparse_array = false; size_t concat_buffer_size = DEFAULT_CONCAT_BUFFER_SIZE; - uint16_t concat_buffer_head_size = buffer::DEFAULT_HEAD_RESERVE; }; static int json_destroy_config(lua_State *L) @@ -136,11 +135,8 @@ static int json_options(lua_State* L) case "concat_buffer_size"_csh: { auto concat_buffer_size = cfg->concat_buffer_size; - auto concat_buffer_head_size = cfg->concat_buffer_head_size; cfg->concat_buffer_size = static_cast(luaL_checkinteger(L, 2)); - cfg->concat_buffer_head_size = static_cast(luaL_checkinteger(L, 3)); lua_pushinteger(L, static_cast(concat_buffer_size)); - lua_pushinteger(L, static_cast(concat_buffer_head_size)); break; } default: @@ -597,8 +593,10 @@ static int concat(lua_State* L) { size_t size; const char* sz = lua_tolstring(L, -1, &size); - auto buf = new moon::buffer{size}; + auto buf = new moon::buffer{ BUFFER_OPTION_CHEAP_PREPEND + size }; + buf->commit(BUFFER_OPTION_CHEAP_PREPEND); buf->write_back(sz, size); + buf->seek(BUFFER_OPTION_CHEAP_PREPEND); lua_pushlightuserdata(L, buf); return 1; } @@ -609,7 +607,8 @@ static int concat(lua_State* L) json_config* cfg = json_fetch_config(L); - auto buf = new moon::buffer(cfg->concat_buffer_size, cfg->concat_buffer_head_size); + auto buf = new moon::buffer(cfg->concat_buffer_size); + buf->commit(BUFFER_OPTION_CHEAP_PREPEND); try { int array_size = (int)lua_rawlen(L, 1); @@ -653,6 +652,7 @@ static int concat(lua_State* L) } lua_pop(L, 1); } + buf->seek(BUFFER_OPTION_CHEAP_PREPEND); lua_pushlightuserdata(L, buf); return 1; } @@ -746,7 +746,7 @@ static int concat_resp(lua_State* L) json_config* cfg = json_fetch_config(L); - auto buf = new moon::buffer(cfg->concat_buffer_size, cfg->concat_buffer_head_size); + auto buf = new moon::buffer(cfg->concat_buffer_size); try { int64_t hash = 1; diff --git a/lualib-src/lua_kcp.cpp b/lualib-src/lua_kcp.cpp index ad7413d3f..b2d7ddaeb 100644 --- a/lualib-src/lua_kcp.cpp +++ b/lualib-src/lua_kcp.cpp @@ -10,7 +10,7 @@ struct box int64_t session = 0; size_t readn = 0; std::queue wqueue; - moon::buffer rbuf = moon::buffer{8192,0}; + moon::buffer rbuf = moon::buffer{8192}; }; static int udp_output(const char* buf, int len, ikcpcb*, void* user) { diff --git a/lualib-src/lua_moon.cpp b/lualib-src/lua_moon.cpp index 8ccf6b014..63dd5d9e8 100644 --- a/lualib-src/lua_moon.cpp +++ b/lualib-src/lua_moon.cpp @@ -395,6 +395,12 @@ static int message_decode(lua_State* L) lua_pushlightuserdata(L, m->as_buffer()); break; } + case 'L': + { + auto buf = m->into_buffer(); + lua_pushlightuserdata(L, buf.release()); + break; + } case 'C': { buffer* buf = m->as_buffer(); @@ -435,15 +441,15 @@ static int message_redirect(lua_State* L) static int ref_buffer(lua_State* L) { - message* m = (message*)lua_touserdata(L, 1); - if (nullptr == m) - return luaL_argerror(L, 1, "lightuserdata(message*) expected"); + buffer* b = (buffer*)lua_touserdata(L, 1); + if (nullptr == b) + return luaL_argerror(L, 1, "lightuserdata(buffer*) expected"); - if (m->data() == nullptr) { + if (b->size() == 0) { return 0; } - auto *p = new moon::buffer_shr_ptr_t{ m->into_buffer() }; + auto *p = new moon::buffer_shr_ptr_t{ b }; lua_pushlightuserdata(L, p); return 1; } @@ -645,12 +651,12 @@ static int lasio_write(lua_State* L) lua_service* S = lua_service::get(L); auto& sock = S->get_worker()->socket_server(); uint32_t fd = (uint32_t)luaL_checkinteger(L, 1); - int flag = (int)luaL_optinteger(L, 3, 0); - if (flag!=0 && (flag<=0 || flag >=(int)moon::buffer_flag::buffer_flag_max)) + int mask = (int)luaL_optinteger(L, 3, 0); + if (mask !=0 && (mask <0 || mask >=(int)moon::socket_send_mask::max_mask)) { return luaL_error(L, "asio.write param 'flag' invalid"); } - bool ok = sock.write(fd, moon_to_shr_buffer(L, 2), (moon::buffer_flag)flag); + bool ok = sock.write(fd, moon_to_shr_buffer(L, 2), (moon::socket_send_mask)mask); lua_pushboolean(L, ok ? 1 : 0); return 1; } diff --git a/lualib-src/lua_serialize.cpp b/lualib-src/lua_serialize.cpp index 8e139dab1..0fa9f2695 100644 --- a/lualib-src/lua_serialize.cpp +++ b/lualib-src/lua_serialize.cpp @@ -528,8 +528,7 @@ static int peek_one(lua_State* L) if (seek) { - assert(!buf->has_flag(buffer_flag::broadcast)); - buf->seek(static_cast(buf->size() - br.size())); + buf->seek(buf->size() - br.size()); } lua_pushlightuserdata(L, (void*)br.data()); lua_pushinteger(L, static_cast(br.size())); diff --git a/lualib/moon/api/buffer.lua b/lualib/moon/api/buffer.lua index 4fefc1289..396560dbb 100644 --- a/lualib/moon/api/buffer.lua +++ b/lualib/moon/api/buffer.lua @@ -1,21 +1,24 @@ ---@meta error("DO NOT REQUIRE THIS FILE") - ---- Buffer's memory layout : head part + data part. Avoid memory copy when we write ---- data first and need write data's length at head part. - +--- Represents a `Cpp Buffer` object, which is not managed by `Lua GC`. +--- This is often used for data transmission between Lua and Cpp layers. +--- This object can be used as an argument for `moon.raw_send` or `socket.write`, +--- and it will be automatically released. Otherwise, `buffer.drop` should be used to release it. ---@class buffer local buffer = {} ----创建一个不受`Lua GC`管理的`C++ Buffer`对象, 可以作为`moon.raw_send`或者`socket.write`的参数, 会自动管理该对象的生命周期。否则应该使用`buffer.delete`释放它。 ----@param capacity? integer @ Buffer的初始容量, 默认值 `240`。 ----@param headreserved? integer @ Buffer头部的预留空间, 默认值 `16`。 +--- Creates a `Cpp Buffer` object that is not managed by `Lua GC`. +--- This object can be used as an argument for `moon.raw_send` or `socket.write`, +--- and it will be automatically released. Otherwise, `buffer.drop` should be used to release it. +---@param capacity? integer @ The initial capacity of the Buffer, default value is `128`. ---@return buffer_ptr -function buffer.unsafe_new(capacity, headreserved) end +function buffer.unsafe_new(capacity) end +--- Releases a `Cpp Buffer` object. ---@param buf buffer_ptr function buffer.delete(buf) end +--- Clears the data in the buffer. ---@param buf buffer_ptr function buffer.clear(buf) end @@ -24,16 +27,20 @@ function buffer.clear(buf) end ---@return integer function buffer.size(buf) end ---- Get buffer's subytes or unpack subytes to integer ---- - buffer.unpack(buf, i, count) ---- - buffer.unpack(buf, fmt, i) ----@param buf buffer_ptr ----@param fmt? string @ like string.unpack but only support '>','<','h','H','i','I' ----@param i? integer @ start pos ----@param count? integer @ ----@return string | any ----@overload fun(buf:buffer_ptr, i:integer, count?:integer) -function buffer.unpack(buf, fmt, i, count) end +--- buffer.unpack(buf, pos, count) returns a portion of the buffer data. +--- The optional parameter `pos` (default is 0) marks where to start reading from the buffer, +--- and `count` indicates how much data to read. +--- +--- buffer.unpack(buf, fmt, pos) unpacks the buffer data according to the `fmt` format. +--- The optional parameter `pos` (default is 0) marks where to start reading from the buffer. +--- +--- @param buf buffer_ptr +--- @param fmt? string @ like string.unpack but only supports '>', '<', 'h', 'H', 'i', 'I' +--- @param pos? integer @ start position +--- @param count? integer @ number of elements to read +--- @return string | any +--- @overload fun(buf:buffer_ptr, pos:integer, count?:integer) +function buffer.unpack(buf, fmt, pos, count) end --- Read n bytes from buffer ---@param buf buffer_ptr @@ -51,36 +58,29 @@ function buffer.write_front(buf, ...) end ---@param str string function buffer.write_back(buf, str) end ---- Seek buffer's read pos +--- Moves the read position of the buffer ---@param buf buffer_ptr ---@param pos integer ---@param origin? integer @ Seek's origin, Current:1, Begin:0, default 1 function buffer.seek(buf, pos, origin) end ---- Offset buffer's write pos +--- Moves the write position of the buffer forward. ---@param buf buffer_ptr ---@param n integer function buffer.commit(buf, n) end ----Ensures that the buffer can accommodate n characters,reallocating character array objects as necessary. +--- Ensures that the buffer can accommodate n characters,reallocating character array objects as necessary. ---@param buf buffer_ptr ---@param n integer function buffer.prepare(buf, n) end ----@param buf buffer_ptr ----@param k integer ----@return boolean -function buffer.has_flag(buf, k) end - ----@param buf buffer_ptr ----@param k integer -function buffer.set_flag(buf, k) end - ---- All params are strings or numbers and return buffer_ptr(lightuserdata type, avoid creating Lua GC objects): `param1..param2 ··· ..paramN`. +--- Converts the parameters to a string and saves it in the buffer, +--- then returns a lightuserdata. This is often used for data transmission between Lua and Cpp layers, +--- to avoid creating Lua GC objects. ---@return buffer_ptr function buffer.concat(...) end ---- All params are strings or numbers and return string: `param1..param2 ··· ..paramN`. +--- Converts the parameters to a string. ---@return string function buffer.concat_string(...) end diff --git a/lualib/moon/api/core.lua b/lualib/moon/api/core.lua index 256e48ece..d9eb867e8 100644 --- a/lualib/moon/api/core.lua +++ b/lualib/moon/api/core.lua @@ -106,7 +106,8 @@ function core.now() end --- - 'E' message:sessionid() --- - 'Z' message:bytes() --- - 'N' message:size() ---- - 'B' message:buffer() +--- - 'B' message:get_buffer() +--- - 'L' return buffer_ptr,leak buffer ownership from message --- - 'C' message:buffer():data() and message:buffer():size() ---@param msg message_ptr ---@param pattern string @@ -114,7 +115,7 @@ function core.now() end ---@nodiscard function core.decode(msg, pattern) end ---- buffer_shr_ptr 's field +--- Get buffer_shr_ptr 's field --- --- - 'Z' to lua string --- - 'N' buffer size @@ -126,10 +127,10 @@ function core.decode(msg, pattern) end ---@nodiscard function core.decode_ref_buffer(msg, pattern) end ---- convert message's buffer to buffer_shr_ptr, and alloc lightuserdata hold it ----@param msg message_ptr +--- Converts a buffer_ptr into a buffer_shr_ptr. The buffer_shr_ptr is responsible for managing the lifecycle of the buffer_ptr, often used for reusing the same buffer_ptr. +---@param buf buffer_ptr ---@return buffer_shr_ptr -function core.ref_buffer(msg) end +function core.ref_buffer(buf) end --- release buffer_shr_ptr ---@param p buffer_shr_ptr diff --git a/lualib/moon/db/pg.lua b/lualib/moon/db/pg.lua index 530183a9a..457c3b87f 100644 --- a/lualib/moon/db/pg.lua +++ b/lualib/moon/db/pg.lua @@ -150,8 +150,7 @@ end local function send_message(self, t, data) local buf = concat(data) - local len = bsize(buf) - wfront(buf, t, strpack(">I", len+4)) + wfront(buf, t, strpack(">I", bsize(buf)+4)) socket.write(self.sock, buf) end @@ -484,9 +483,8 @@ local function format_query_result(row_desc, data_rows, command_complete) end function pg.pack_query_buffer(buf) - wback(buf, "\0") - local len = bsize(buf) - wfront(buf, MSG_TYPE.query, strpack(">I", len+4)) + wback(buf, NULL) + wfront(buf, MSG_TYPE.query, strpack(">I", bsize(buf)+4)) end ---@param sql buffer_shr_ptr|string diff --git a/lualib/moon/db/redis.lua b/lualib/moon/db/redis.lua index a1584c7b5..f43c0dcaf 100644 --- a/lualib/moon/db/redis.lua +++ b/lualib/moon/db/redis.lua @@ -167,7 +167,7 @@ local function request(fd, req, res, israw) if israw then socket.write_ref_buffer(fd, req) else - socket.write(fd, buffer.concat(req)) + socket.write(fd, buffer.concat(req)) end if not res then return true diff --git a/lualib/moon/http/websocket.lua b/lualib/moon/http/websocket.lua index 410c4a568..135702699 100644 --- a/lualib/moon/http/websocket.lua +++ b/lualib/moon/http/websocket.lua @@ -5,9 +5,10 @@ local internal = require("moon.http.internal") local WS_MAGICKEY = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" -local flag_ws_text = 16 -local flag_ws_ping = 32 -local flag_ws_pong = 64 +--- config.hpp: socket_send_mask +local flag_ws_text = 4 +local flag_ws_ping = 8 +local flag_ws_pong = 16 --- PTYPE_SOCKET_WS wscallbacks local wscallbacks = {} diff --git a/moon-src/core/config.hpp b/moon-src/core/config.hpp index 1e5b67541..7f44f5653 100644 --- a/moon-src/core/config.hpp +++ b/moon-src/core/config.hpp @@ -34,17 +34,20 @@ namespace moon stopped }; - enum class buffer_flag :uint8_t + enum class socket_send_mask :uint8_t { none = 0, - pack_size = 1 << 0, close = 1 << 1, - chunked = 1 << 2, - broadcast = 1 << 3, - ws_text = 1 << 4, - ws_ping = 1 << 5, - ws_pong = 1 << 6, - buffer_flag_max, + ws_text = 1 << 2, + ws_ping = 1 << 3, + ws_pong = 1 << 4, + max_mask + }; + + template<> + struct enum_enable_bitmask_operators + { + static constexpr bool enable = true; }; enum class socket_data_type :std::uint8_t diff --git a/moon-src/core/log.hpp b/moon-src/core/log.hpp index f30fee3f4..9e881bdce 100644 --- a/moon-src/core/log.hpp +++ b/moon-src/core/log.hpp @@ -94,7 +94,7 @@ namespace moon enable_stdout = enable_stdout_ ? enable_stdout : enable_stdout_; - auto line = buffer{ (datasize>0)?(64 + datasize):256, 0}; + auto line = buffer{ (datasize>0)?(64 + datasize):128 }; auto it = line.begin(); *(it++) = static_cast(enable_stdout); *(it++) = static_cast(level); diff --git a/moon-src/core/message.hpp b/moon-src/core/message.hpp index a1bca7987..1fcaf7701 100644 --- a/moon-src/core/message.hpp +++ b/moon-src/core/message.hpp @@ -16,13 +16,8 @@ namespace moon { } - explicit message(size_t reserve) - :data_(buffer::make_unique(reserve)) - { - } - - message(size_t reserve, uint16_t head_reserve) - :data_(buffer::make_unique(reserve, head_reserve)) + explicit message(size_t capacity) + :data_(buffer::make_unique(capacity)) { } @@ -100,20 +95,6 @@ namespace moon return type_; } - bool broadcast() const - { - return (data_ && data_->has_flag(buffer_flag::broadcast)); - } - - void set_broadcast(bool v) - { - if (!data_) - { - return; - } - v ? data_->set_flag(buffer_flag::broadcast) : data_->clear_flag(buffer_flag::broadcast); - } - void write_data(std::string_view s) { assert(data_); diff --git a/moon-src/core/network/base_connection.hpp b/moon-src/core/network/base_connection.hpp index 33549bfb9..13afb7cd2 100644 --- a/moon-src/core/network/base_connection.hpp +++ b/moon-src/core/network/base_connection.hpp @@ -48,10 +48,10 @@ namespace moon CONSOLE_ERROR("Unsupported read operation for PTYPE %d", (int)type_); asio::post(socket_.get_executor(), [this, self = shared_from_this()] { error(make_error_code(error::invalid_read_operation)); - }); + }); }; - virtual bool send(buffer_shr_ptr_t&& data) + virtual bool send(buffer_shr_ptr_t&& data, socket_send_mask mask) { if (data == nullptr || data->size() == 0) { @@ -70,15 +70,17 @@ namespace moon { asio::post(socket_.get_executor(), [this, self = shared_from_this()]() { error(make_error_code(moon::error::send_queue_too_big)); - }); + }); return false; } } + will_close_ = enum_has_any_bitmask(mask, socket_send_mask::close) ? true : will_close_; + bool write_in_progress = !queue_.empty(); queue_.emplace_back(std::move(data)); - if(!write_in_progress) + if (!write_in_progress) { post_send(); } @@ -137,7 +139,7 @@ namespace moon { asio::post(socket_.get_executor(), [this, self = shared_from_this()]() { error(make_error_code(moon::error::read_timeout)); - }); + }); return; } return; @@ -182,25 +184,16 @@ namespace moon return address; } protected: - virtual void message_slice(const_buffers_holder& holder, const buffer_shr_ptr_t& buf) + virtual void prepare_send(const_buffers_holder& holder, const buffer_shr_ptr_t& buf) { - (void)holder; - (void)buf; + holder.push_back(buf->data(), buf->size()); } void post_send() { for (const auto& buf : queue_) { - if (buf->has_flag(buffer_flag::chunked)) - { - message_slice(holder_, buf); - } - else - { - holder_.push_back(buf->data(), buf->size(), buf->has_flag(buffer_flag::close)); - } - + prepare_send(holder_, buf); if (holder_.size() >= const_buffers_holder::max_count) { break; @@ -211,37 +204,30 @@ namespace moon socket_, make_buffers_ref(holder_.buffers()), [this, self = shared_from_this()](const asio::error_code& e, std::size_t) - { - if (!e) { - if (holder_.close()) + if (e) { - if (parent_ != nullptr) - { - parent_->close(fd_); - parent_ = nullptr; - } + error(e); + return; } - else + + for (size_t i = 0; i < holder_.count(); ++i) { - for (size_t i = 0; i < holder_.count(); ++i) - { - queue_.pop_front(); - } + queue_.pop_front(); + } - holder_.clear(); + holder_.clear(); - if(!queue_.empty()) - { - post_send(); - } + if (!queue_.empty()) + { + post_send(); } - } - else - { - error(e); - } - }); + else if (will_close_ && parent_ != nullptr) + { + parent_->close(fd_); + parent_ = nullptr; + } + }); } virtual void error(const asio::error_code& e, const std::string& additional = "") @@ -285,6 +271,7 @@ namespace moon } } protected: + bool will_close_ = false; role role_ = role::none; uint32_t fd_ = 0; time_t recvtime_ = 0; diff --git a/moon-src/core/network/const_buffers_holder.hpp b/moon-src/core/network/const_buffers_holder.hpp index 7275b9d8b..a3be23f3e 100644 --- a/moon-src/core/network/const_buffers_holder.hpp +++ b/moon-src/core/network/const_buffers_holder.hpp @@ -12,9 +12,8 @@ namespace moon const_buffers_holder() = default; - void push_back(const char* data, size_t len, bool close) + void push_back(const char* data, size_t len) { - close_ = close ? true : close_; buffers_.emplace_back(data, len); ++count_; } @@ -51,18 +50,11 @@ namespace moon void clear() { - close_ = false; count_ = 0; buffers_.clear(); headers_.clear(); } - - bool close() const - { - return close_; - } private: - bool close_ = false; size_t count_ = 0; std::vector buffers_; std::forward_list headers_; diff --git a/moon-src/core/network/moon_connection.hpp b/moon-src/core/network/moon_connection.hpp index 66fe17d10..f123c7f63 100644 --- a/moon-src/core/network/moon_connection.hpp +++ b/moon-src/core/network/moon_connection.hpp @@ -31,31 +31,16 @@ namespace moon read_header(); } - bool send(buffer_shr_ptr_t&& data) override + bool send(buffer_shr_ptr_t&& data, socket_send_mask mask) override { - if (!data->has_flag(buffer_flag::pack_size)) + if (data->size() >= MESSAGE_CONTINUED_FLAG && !enum_has_any_bitmask(flag_, enable_chunked::send)) { - if (data->size() >= MESSAGE_CONTINUED_FLAG) - { - if (!enum_has_any_bitmask(flag_, enable_chunked::send)) - { - asio::post(socket_.get_executor() , [this, self= shared_from_this()]() { - error(make_error_code(moon::error::write_message_too_big)); - }); - return false; - } - data->set_flag(buffer_flag::chunked); - } - else - { - message_size_t size = static_cast(data->size()); - host2net(size); - [[maybe_unused]] bool res = data->write_front(&size, 1); - MOON_ASSERT(res, "tcp::send write front failed"); - data->set_flag(buffer_flag::pack_size); - } + asio::post(socket_.get_executor(), [this, self = shared_from_this()]() { + error(make_error_code(moon::error::write_message_too_big)); + }); + return false; } - return base_connection_t::send(std::move(data)); + return base_connection_t::send(std::move(data), mask); } void set_enable_chunked(enable_chunked v) @@ -63,7 +48,7 @@ namespace moon flag_ = v; } protected: - void message_slice(const_buffers_holder& holder, const buffer_shr_ptr_t& buf) override + void prepare_send(const_buffers_holder& holder, const buffer_shr_ptr_t& buf) override { size_t total = buf->size(); const char* p = buf->data(); @@ -74,10 +59,11 @@ namespace moon header = size = (total >= MESSAGE_CONTINUED_FLAG) ? MESSAGE_CONTINUED_FLAG : static_cast(total); host2net(header); holder.push_slice(header, p, size); + assert(holder.size() < 100); total -= size; p += size; } while (total != 0); - if(size == MESSAGE_CONTINUED_FLAG) + if (size == MESSAGE_CONTINUED_FLAG) holder.push_slice(0, nullptr, 0);//end flag } @@ -85,52 +71,56 @@ namespace moon { header_ = 0; asio::async_read(socket_, asio::buffer(&header_, sizeof(header_)), - [this, self = shared_from_this()](const asio::error_code& e, std::size_t) - { - if (e) + [this, self = shared_from_this()](const asio::error_code& e, std::size_t) { - error(e); - return; - } + if (e) + { + error(e); + return; + } - net2host(header_); + net2host(header_); - bool fin = (header_ != MESSAGE_CONTINUED_FLAG); - if (!fin && !enum_has_any_bitmask(flag_, enable_chunked::receive)) { - error(make_error_code(moon::error::read_message_too_big)); - return; - } + bool fin = (header_ != MESSAGE_CONTINUED_FLAG); + if (!fin && !enum_has_any_bitmask(flag_, enable_chunked::receive)) { + error(make_error_code(moon::error::read_message_too_big)); + return; + } - read_body(header_, fin); - }); + read_body(header_, fin); + }); } void read_body(message_size_t size, bool fin) { if (nullptr == buf_) - buf_ = buffer::make_unique(fin ? size : static_cast(5) * size); + { + buf_ = buffer::make_unique((fin ? size : static_cast(5) * size) + BUFFER_OPTION_CHEAP_PREPEND); + buf_->commit(BUFFER_OPTION_CHEAP_PREPEND); + } auto space = buf_->prepare(size); asio::async_read(socket_, asio::buffer(space.first, space.second), - [this, self = shared_from_this(), fin](const asio::error_code& e, std::size_t bytes_transferred) - { - if (e) + [this, self = shared_from_this(), fin](const asio::error_code& e, std::size_t bytes_transferred) { - error(e); - return; - } + if (e) + { + error(e); + return; + } - buf_->commit(static_cast(bytes_transferred)); - if (fin) - { - auto m = message{std::move(buf_)}; - m.set_receiver(static_cast(socket_data_type::socket_recv)); - handle_message(std::move(m)); - } + buf_->commit(static_cast(bytes_transferred)); + if (fin) + { + buf_->seek(BUFFER_OPTION_CHEAP_PREPEND); + auto m = message{ std::move(buf_) }; + m.set_receiver(static_cast(socket_data_type::socket_recv)); + handle_message(std::move(m)); + } - read_header(); - }); + read_header(); + }); } protected: diff --git a/moon-src/core/network/socket_server.cpp b/moon-src/core/network/socket_server.cpp index 941d6ef6e..943069268 100644 --- a/moon-src/core/network/socket_server.cpp +++ b/moon-src/core/network/socket_server.cpp @@ -261,15 +261,14 @@ void socket_server::read(uint32_t fd, uint32_t owner, size_t n, std::string_view }); } -bool socket_server::write(uint32_t fd, buffer_shr_ptr_t&& data, buffer_flag flag) +bool socket_server::write(uint32_t fd, buffer_shr_ptr_t&& data, socket_send_mask mask) { if (nullptr == data || 0 == data->size()) return false; if (auto iter = connections_.find(fd); iter != connections_.end()) { - data->set_flag(flag); - return iter->second->send(std::move(data)); + return iter->second->send(std::move(data), mask); } if (auto iter = udp_.find(fd); iter != udp_.end()) @@ -578,7 +577,11 @@ void socket_server::do_receive(const udp_context_ptr_t& ctx) auto buf = ctx->msg.as_buffer(); buf->clear(); - auto space = buf->prepare(udp_context::READ_BUFFER_SIZE); + //reserve addr_v6_size bytes for address + buf->commit(addr_v6_size); + buf->seek(addr_v6_size, buffer::seek_origin::Begin); + + auto space = buf->writeable(); ctx->sock.async_receive_from( asio::buffer(space.first, space.second), ctx->from_ep, [this, ctx](std::error_code ec, std::size_t bytes_recvd) diff --git a/moon-src/core/network/socket_server.h b/moon-src/core/network/socket_server.h index fc1ba971d..6ab2020eb 100644 --- a/moon-src/core/network/socket_server.h +++ b/moon-src/core/network/socket_server.h @@ -54,17 +54,17 @@ namespace moon struct udp_context { - static constexpr size_t READ_BUFFER_SIZE = size_t{ 2048 } - addr_v6_size; + static constexpr size_t READ_BUFFER_SIZE = 2048; udp_context(uint32_t o, asio::io_context& ioc, udp::endpoint ep) :owner(o) - , msg(READ_BUFFER_SIZE, static_cast(addr_v6_size)) + , msg(READ_BUFFER_SIZE) , sock(ioc, ep) { } udp_context(uint32_t o, asio::io_context& ioc) :owner(o) - , msg(READ_BUFFER_SIZE, static_cast(addr_v6_size)) + , msg(READ_BUFFER_SIZE) , sock(ioc, udp::endpoint(udp::v4(), 0)) { } @@ -102,7 +102,7 @@ namespace moon void read(uint32_t fd, uint32_t owner, size_t n, std::string_view delim, int64_t sessionid); - bool write(uint32_t fd, buffer_shr_ptr_t&& data, buffer_flag flag = buffer_flag::none); + bool write(uint32_t fd, buffer_shr_ptr_t&& data, socket_send_mask mask = socket_send_mask::none); bool close(uint32_t fd); diff --git a/moon-src/core/network/stream_connection.hpp b/moon-src/core/network/stream_connection.hpp index 25e7109a9..fddf2e8a9 100644 --- a/moon-src/core/network/stream_connection.hpp +++ b/moon-src/core/network/stream_connection.hpp @@ -12,7 +12,7 @@ namespace moon template , stream_connection>...>, int> = 0> explicit stream_connection(Args&&... args) :base_connection_t(std::forward(args)...) - ,response_(8192,0) + ,response_(8192) { } diff --git a/moon-src/core/network/streambuf.hpp b/moon-src/core/network/streambuf.hpp index 82d5595c5..fd21630fd 100644 --- a/moon-src/core/network/streambuf.hpp +++ b/moon-src/core/network/streambuf.hpp @@ -52,7 +52,7 @@ namespace moon std::size_t capacity() const noexcept { if (nullptr == buffer_) return 0; - return buffer_->capacity() - buffer_->reserved(); + return buffer_->capacity(); } const_buffers_type data() const noexcept diff --git a/moon-src/core/network/ws_connection.hpp b/moon-src/core/network/ws_connection.hpp index 8c4f5c0f8..8e053f8c8 100644 --- a/moon-src/core/network/ws_connection.hpp +++ b/moon-src/core/network/ws_connection.hpp @@ -141,17 +141,20 @@ namespace moon } } - bool send(buffer_shr_ptr_t&& data) override + bool send(buffer_shr_ptr_t&& data, socket_send_mask mask) override { - encode_frame(data); - return base_connection_t::send(std::move(data)); + auto payload = encode_frame(data, mask); + base_connection_t::send(std::move(payload), mask); + return base_connection_t::send(std::move(data), mask); } protected: asio::mutable_buffer prepare_buffer(size_t size) { - if (nullptr == recv_buf_) - recv_buf_ = buffer::make_unique(size); + if (nullptr == recv_buf_){ + recv_buf_ = buffer::make_unique(size + BUFFER_OPTION_CHEAP_PREPEND); + recv_buf_->commit(BUFFER_OPTION_CHEAP_PREPEND); + } auto space = recv_buf_->prepare(size); return asio::buffer(space.first, space.second); } @@ -288,15 +291,11 @@ namespace moon return std::error_code(); } - void send_response(const std::string& s, bool bclose = false) + void send_response(const std::string& s, bool will_close = false) { auto buf = std::make_shared(s.size()); buf->write_back(s.data(), s.size()); - if (bclose) - { - buf->set_flag(buffer_flag::close); - } - base_connection::send(std::move(buf)); + base_connection::send(std::move(buf), will_close?socket_send_mask::close:socket_send_mask::none); } bool handle_frame() @@ -472,12 +471,16 @@ namespace moon message msg = message::with_empty(); if (recv_buf_->size()==reallen) { + recv_buf_->seek(BUFFER_OPTION_CHEAP_PREPEND); msg = message{ std::move(recv_buf_) }; } else { - msg = message{ reallen }; - msg.as_buffer()->write_back(recv_buf_->data(), reallen); + msg = message{ reallen + BUFFER_OPTION_CHEAP_PREPEND }; + auto b = msg.as_buffer(); + b->commit(BUFFER_OPTION_CHEAP_PREPEND); + b->write_back(recv_buf_->data(), reallen); + b->seek(BUFFER_OPTION_CHEAP_PREPEND); recv_buf_->consume(reallen); } @@ -524,8 +527,12 @@ namespace moon return tmp; } - void encode_frame(const buffer_shr_ptr_t& data) const + buffer_shr_ptr_t encode_frame(const buffer_shr_ptr_t& data, socket_send_mask send_mask) const { + buffer_shr_ptr_t payload = buffer::make_shared(16); + payload->commit(16); + payload->seek(16); + uint64_t size = data->size(); if (role_ == role::client) @@ -536,7 +543,7 @@ namespace moon { d[i] = d[i] ^ mask[i % mask.size()]; } - data->write_front(mask.data(), mask.size()); + payload->write_front(mask.data(), mask.size()); } uint8_t payload_len = 0; @@ -549,13 +556,13 @@ namespace moon payload_len = static_cast(PAYLOAD_MID_LEN); uint16_t n = (uint16_t)size; moon::host2net(n); - data->write_front(&n, 1); + payload->write_front(&n, 1); } else { payload_len = static_cast(PAYLOAD_MAX_LEN); moon::host2net(size); - data->write_front(&size, 1); + payload->write_front(&size, 1); } //messages from the client must be masked @@ -564,24 +571,25 @@ namespace moon payload_len |= 0x80; } - data->write_front(&payload_len, 1); + payload->write_front(&payload_len, 1); uint8_t opcode = FIN_FRAME_FLAG | static_cast(ws::opcode::binary); - if (data->has_flag(buffer_flag::ws_text)) + if (enum_has_any_bitmask(send_mask, socket_send_mask::ws_text)) { opcode = FIN_FRAME_FLAG | static_cast(ws::opcode::text); } - else if (data->has_flag(buffer_flag::ws_ping)) + else if (enum_has_any_bitmask(send_mask, socket_send_mask::ws_ping)) { opcode = FIN_FRAME_FLAG | static_cast(ws::opcode::ping); } - else if (data->has_flag(buffer_flag::ws_pong)) + else if (enum_has_any_bitmask(send_mask, socket_send_mask::ws_pong)) { opcode = FIN_FRAME_FLAG | static_cast(ws::opcode::pong); } - data->write_front(&opcode, 1); + payload->write_front(&opcode, 1); + return payload; } static std::string hash_key(std::string_view seckey) diff --git a/moon-src/core/server.cpp b/moon-src/core/server.cpp index 3a8fb39ac..c949a40d8 100644 --- a/moon-src/core/server.cpp +++ b/moon-src/core/server.cpp @@ -267,7 +267,6 @@ namespace moon for (auto& w : workers_) { auto m = message{ std::make_unique(buf->clone()) }; - m.set_broadcast(true); m.set_sender(sender); m.set_type(type); w->send(std::move(m)); diff --git a/moon-src/core/service.hpp b/moon-src/core/service.hpp index 5867da101..82663caa5 100644 --- a/moon-src/core/service.hpp +++ b/moon-src/core/service.hpp @@ -104,13 +104,11 @@ namespace moon template inline void handle_message(Service&& s, Message&& m) { - uint32_t receiver = m.receiver(); s->dispatch(&m); //redirect message if (m.receiver() != receiver) { - MOON_ASSERT(!m.broadcast(), "can not redirect broadcast message"); if constexpr (std::is_rvalue_reference_v) { s->get_server()->send_message(std::forward(m)); diff --git a/moon-src/core/worker.cpp b/moon-src/core/worker.cpp index 87b42ec59..446dfe107 100644 --- a/moon-src/core/worker.cpp +++ b/moon-src/core/worker.cpp @@ -242,7 +242,7 @@ namespace moon uint32_t receiver = msg.receiver(); uint8_t type = msg.type(); - if (msg.broadcast()) + if (receiver == 0) { for (auto& it : services_) { diff --git a/moon-src/services/lua_service.cpp b/moon-src/services/lua_service.cpp index 71593cfc0..d54c20e1a 100644 --- a/moon-src/services/lua_service.cpp +++ b/moon-src/services/lua_service.cpp @@ -170,7 +170,7 @@ void lua_service::dispatch(message* m) if (!ok()) return; - //require ‘moon’ first + //require 'moon' first assert(cb_ctx != nullptr); lua_State* L = cb_ctx->L; diff --git a/service/redisd.lua b/service/redisd.lua index 902228fad..22c0b9510 100644 --- a/service/redisd.lua +++ b/service/redisd.lua @@ -25,22 +25,22 @@ if conf.name then if not db and auto_reconnect then moon.sleep(1000) end - until(not auto_reconnect or db) + until (not auto_reconnect or db) return db, err end ---@param msg buffer_shr_ptr - local function exec_one(db, msg , sender, sessionid, opt) + local function exec_one(db, msg, sender, sessionid, opt) local reconnect_times = 1 - local auto_reconnect = sessionid==0 + local auto_reconnect = sessionid == 0 if auto_reconnect then reconnect_times = -1 end repeat - local err,res + local err, res if not db then db, err = connect(conf.opts, auto_reconnect) if not db then @@ -56,8 +56,8 @@ if conf.name then if opt == "Q" then res, err = redis.raw_send(db, msg) else - local t = {moon.unpack(moon.decode_ref_buffer(msg,"C"))} - res, err =db[t[1]](db, table.unpack(t, 2)) + local t = { moon.unpack(moon.decode_ref_buffer(msg, "C")) } + res, err = db[t[1]](db, table.unpack(t, 2)) end if redis.socket_error == res then @@ -96,7 +96,7 @@ if conf.name then local free_all = function(one) for k, req in pairs(one.queue) do - if type(req[1])=="userdata" then + if type(req[1]) == "userdata" then release(req[1]) end end @@ -105,18 +105,20 @@ if conf.name then local pool = {} - for _=1,db_pool_size do - local one = setmetatable({queue = list.new(), running = false, db = false},{ + for _ = 1, db_pool_size do + local one = setmetatable({ queue = list.new(), running = false, db = false }, { __gc = free_all - }) - tbinsert(pool,one) + }) + tbinsert(pool, one) end - local function docmd(hash, sender, sessionid, msg, opt) - hash = hash%db_pool_size + 1 + local function docmd(sender, sessionid, args) + local opt = args[1] + local hash = args[2] + hash = hash % db_pool_size + 1 --print(moon.name, "db hash", hash) local ctx = pool[hash] - list.push(ctx.queue, {clone(msg), sender, sessionid, opt}) + list.push(ctx.queue, { clone(args[3]), sender, sessionid, opt }) if ctx.running then return end @@ -128,8 +130,8 @@ if conf.name then if not req then break end - local iscall = req[3]~=0 - local ok,db = xpcall(exec_one, traceback, ctx.db, req[1], req[2], req[3], req[4]) + local iscall = req[3] ~= 0 + local ok, db = xpcall(exec_one, traceback, ctx.db, req[1], req[2], req[3], req[4]) if not ok then if ctx.db then ctx.db:disconnect() @@ -156,12 +158,34 @@ if conf.name then function command.len() local res = {} - for _,v in ipairs(pool) do - res[#res+1] = list.size(v.queue) + for _, v in ipairs(pool) do + res[#res + 1] = list.size(v.queue) end return res end + function command.save_then_quit() + moon.async(function() + while true do + local all = true + for _, v in ipairs(pool) do + if list.front(v.queue) then + all = false + print("wait_all_send", _, list.size(v.queue)) + break + end + end + if not all then + moon.sleep(1000) + else + break + end + end + + moon.quit() + end) + end + local function xpcall_ret(ok, ...) if ok then return moon.pack(...) @@ -169,12 +193,14 @@ if conf.name then return moon.pack(false, ...) end - moon.raw_dispatch('lua',function(msg) - local sender, sessionid, buf = moon.decode(msg, "SEB") - local cmd, sz, len = seri.unpack_one(buf, true) + moon.raw_dispatch('lua', function(msg) + local sender, sessionid, sz, len = moon.decode(msg, "SEC") + + local args = { moon.unpack(sz, len) } + + local cmd = args[1] if cmd == "Q" or cmd == "D" then - local hash = seri.unpack_one(buf, true) - docmd(hash, sender, sessionid, msg, cmd) + docmd(sender, sessionid, args) return end @@ -182,81 +208,43 @@ if conf.name then if fn then moon.async(function() if sessionid == 0 then - fn(sender, sessionid, buf, msg) + fn(table.unpack(args, 2)) else if sessionid ~= 0 then - local unsafe_buf = xpcall_ret(xpcall(fn, debug.traceback, moon.unpack(sz, len))) + local unsafe_buf = xpcall_ret(xpcall(fn, debug.traceback, table.unpack(args, 2))) moon.raw_send("lua", sender, unsafe_buf, sessionid) end end end) else - moon.error(moon.name, "recv unknown cmd "..tostring(cmd)) + moon.error(moon.name, "recv unknown cmd " .. tostring(cmd)) end end) - - local function wait_all_send() - while true do - local all = true - for _,v in ipairs(pool) do - if list.front(v.queue) then - all = false - print("wait_all_send", _, list.size(v.queue)) - break - end - end - - if not all then - moon.sleep(1000) - else - return - end - end - end - - moon.system("wait_save", function() - moon.async(function() - wait_all_send() - moon.quit() - end) - end) else local json = require("json") - local buffer = require("buffer") + local concat_resp = json.concat_resp + ---@class redis_client local client = {} - local raw_send = moon.raw_send - local packstr = seri.packs - - local wfront = buffer.write_front - --- - if success return value same as redis commands.see http://www.redis.cn/commands/hgetall.html --- - if failed return false and error message. --- redisd.call(redis_db, "GET", "HELLO") function client.call(db, ...) local buf, hash = concat_resp(...) - if not wfront(buf, packstr("Q", hash)) then - error("buffer has no front space") - end - return moon.wait(raw_send("lua", db, buf, moon.next_sequence())) + return moon.call("lua", db, "Q", hash, buf) end function client.direct(db, cmd, ...) - local buf = seri.pack("D", 1, cmd, ...) - return moon.wait(raw_send("lua", db, buf, moon.next_sequence())) + return moon.call("lua", db, "D", 1, seri.pack(cmd, ...)) end --- redisd.send(redis_db, "SET", "HELLO", "WORLD") function client.send(db, ...) local buf, hash = concat_resp(...) - if not wfront(buf, packstr("Q", hash)) then - error("buffer has no front space") - end - raw_send("lua", db, buf) + moon.send("lua", db, "Q", hash, buf) end return client end - diff --git a/service/sqldriver.lua b/service/sqldriver.lua index 3be69ebbe..c646f8acc 100644 --- a/service/sqldriver.lua +++ b/service/sqldriver.lua @@ -77,11 +77,12 @@ if conf.name then tbinsert(pool,one) end - local function execute(sql, hash, sender, sessionid) + local function execute(sender, sessionid, args) + local hash = args[2] hash = hash%db_pool_size + 1 --print(moon.name, "db hash", hash, db_pool_size) local ctx = pool[hash] - list.push(ctx.queue, {clone(sql), sender, sessionid}) + list.push(ctx.queue, {clone(args[3]), sender, sessionid}) if ctx.running then return end @@ -121,6 +122,30 @@ if conf.name then return res end + function command.save_then_quit() + moon.async(function() + + while true do + local all = true + for _,v in ipairs(pool) do + if list.front(v.queue) then + all = false + print("wait_all_send", _, list.size(v.queue)) + break + end + end + + if not all then + moon.sleep(1000) + else + break + end + end + + moon.quit() + end) + end + local function xpcall_ret(ok, ...) if ok then return moon.pack(...) @@ -129,22 +154,24 @@ if conf.name then end moon.raw_dispatch('lua', function(msg) - local sender, sessionid, buf = moon.decode(msg, "SEB") - local cmd, sz, len = unpack_one(buf, true) + local sender, sessionid, sz, len = moon.decode(msg, "SEC") + + local args = { moon.unpack(sz, len) } + + local cmd = args[1] if cmd == "Q" then - local hash = unpack_one(buf, true) - provider.pack_query_buffer(buf) - execute(msg, hash, sender, sessionid) + provider.pack_query_buffer(args[3]) + execute(sender, sessionid, args) return end local fn = command[cmd] if fn then if sessionid == 0 then - fn(sender, sessionid, moon.unpack(sz, len)) + fn(sender, sessionid, table.unpack(args, 2)) else moon.async(function() - local unsafe_buf = xpcall_ret(xpcall(fn, debug.traceback, moon.unpack(sz, len))) + local unsafe_buf = xpcall_ret(xpcall(fn, debug.traceback, table.unpack(args, 2))) moon.raw_send("lua", sender, unsafe_buf, sessionid) end) end @@ -152,58 +179,19 @@ if conf.name then moon.error(moon.name, "recv unknown cmd "..tostring(cmd)) end end) - - local function wait_all_send() - while true do - local all = true - for _,v in ipairs(pool) do - if list.front(v.queue) then - all = false - print("wait_all_send", _, list.size(v.queue)) - break - end - end - - if not all then - moon.sleep(1000) - else - return - end - end - end - - moon.system("wait_save", function() - moon.async(function() - wait_all_send() - moon.quit() - end) - end) else local client = {} local json = require("json") - local buffer = require("buffer") - local wfront = buffer.write_front - local raw_send = moon.raw_send - local packstr = seri.packs + local concat = json.concat function client.execute(db, sql, hash) - hash = hash or 1 - local buf = concat(sql) - if not wfront(buf, packstr("Q", hash)) then - error("buffer has no front space") - end - raw_send("lua", db, buf) + moon.send("lua", db, "Q", hash or 1, concat(sql)) end function client.query(db, sql, hash) - hash = hash or 1 - local buf = concat(sql) - if not wfront(buf, packstr("Q", hash)) then - error("buffer has no front space") - end - return moon.wait(raw_send("lua", db, buf, moon.next_sequence())) + return moon.call("lua", db, "Q", hash or 1, concat(sql)) end return client diff --git a/test/core.lua b/test/core.lua index 7674c9ac1..d491e24dd 100644 --- a/test/core.lua +++ b/test/core.lua @@ -110,45 +110,26 @@ do do - local buf = buffer.unsafe_new(256, 8) - buffer.write_back(buf, "12345") - assert(buffer.read(buf, 5) == "12345") - buffer.write_back(buf, "abcde") - assert(buffer.read(buf, 5) == "abcde") - assert(buffer.size(buf) == 0) - - for i = 1, 1000 do - buffer.write_back(buf, "abcde") - end + local buf = buffer.unsafe_new(256) + buffer.commit(buf, 8) --reserve head space + buffer.seek(buf, 8) assert(not buffer.write_front(buf, "123456789")) buffer.write_front(buf, "12345678") assert(buffer.read(buf, 8) == "12345678") - buffer.seek(buf, 1) - assert(buffer.read(buf, 1) == "b") - - buffer.delete(buf) - end - - do - local buf = buffer.unsafe_new(256) - buffer.write_back(buf, "12345") + buffer.write_back(buf, "12345") assert(buffer.read(buf, 5) == "12345") buffer.write_back(buf, "abcde") assert(buffer.read(buf, 5) == "abcde") assert(buffer.size(buf) == 0) for i = 1, 1000 do - buffer.write_back(buf, "abcde") + buffer.write_back(buf, "abcde") -- realloc end - buffer.write_front(buf, "1000") - - assert(buffer.read(buf, 4) == "1000") - buffer.seek(buf, 1) assert(buffer.read(buf, 1) == "b")