Skip to content

Commit

Permalink
Optimizing asynchronous event registration
Browse files Browse the repository at this point in the history
  • Loading branch information
sniper00 committed Apr 1, 2024
1 parent f32bad5 commit ed25237
Show file tree
Hide file tree
Showing 25 changed files with 308 additions and 375 deletions.
Empty file modified build.sh
100755 → 100644
Empty file.
2 changes: 1 addition & 1 deletion example/example_redis.lua
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ local function run_example()

local redisd = require("redisd")
--初始化服务配置
local db_conf= {host = "127.0.0.1", port = 6379, timeout = 1000}
local db_conf= {host = HOST, port = PORT, timeout = 1000}

local redis_db

Expand Down
4 changes: 1 addition & 3 deletions example/node2.lua
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ local function run_recv_message()
local tcount = 0
local bt = 0
command.COUNTER = function(t)
-- print(...)
count = count + 1
if bt == 0 then
bt = t
Expand Down Expand Up @@ -113,8 +112,7 @@ local function init(node_conf)
server_ok = true

-- cluster服务开启监听端口
print(moon.call("lua", moon.queryservice("cluster"), "Listen"))

print("start cluster Listen", moon.call("lua", moon.queryservice("cluster"), "Listen"))
run_recv_message()
end)

Expand Down
4 changes: 2 additions & 2 deletions lualib-src/lua_kcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

struct box
{
int32_t session = 0;
int64_t session = 0;
size_t readn = 0;
std::queue<std::string> wqueue;
moon::buffer rbuf = moon::buffer{8192,0};
Expand Down Expand Up @@ -132,7 +132,7 @@ static int lua_ikcp_read(lua_State* L) {
box* ud = (box*)kcp->user;
if (ud->session != 0)
return luaL_error(L, "already has a read request!");
ud->session = (int32_t)luaL_checkinteger(L, 2);
ud->session = (int64_t)luaL_checkinteger(L, 2);
auto n = luaL_checkinteger(L, 3);
if(n<=0)
return luaL_error(L, "invalid read size!");
Expand Down
99 changes: 60 additions & 39 deletions lualib-src/lua_moon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,10 @@ static int lmoon_timeout(lua_State* L)
{
lua_service* S = lua_service::get(L);
int64_t interval = luaL_checkinteger(L, 1);
uint32_t timerid = static_cast<uint32_t>(luaL_checkinteger(L, 2));
S->get_server()->timeout(interval, S->id(), timerid);
return 0;
int64_t timer_id = S->next_sequence();
S->get_server()->timeout(interval, S->id(), -timer_id);
lua_pushinteger(L, timer_id);
return 1;
}

static int lmoon_log(lua_State* L)
Expand Down Expand Up @@ -167,17 +168,21 @@ static int lmoon_cpu(lua_State* L)

static int lmoon_send(lua_State* L)
{
lua_service* S = lua_service::get(L);

uint8_t type = (uint8_t)luaL_checkinteger(L, 1);
luaL_argcheck(L, type > 0, 1, "PTYPE must > 0");

uint32_t receiver = (uint32_t)luaL_checkinteger(L, 2);
luaL_argcheck(L, receiver > 0, 2, "receiver must > 0");

int32_t sessionid = (int32_t)luaL_checkinteger(L, 3);
int64_t session = luaL_opt(L, luaL_checkinteger, 4, S->next_sequence());

lua_service* S = lua_service::get(L);
S->get_server()->send(S->id(), receiver, moon_to_buffer(L, 4), sessionid, type);
return 0;
S->get_server()->send(S->id(), receiver, moon_to_buffer(L, 3), session, type);

lua_pushinteger(L, session);
lua_pushinteger(L, receiver);
return 2;
}

static void table_tostring(std::string& res, lua_State* L, int index)
Expand Down Expand Up @@ -232,48 +237,50 @@ static void table_tostring(std::string& res, lua_State* L, int index)

static int lmoon_new_service(lua_State* L)
{
luaL_checktype(L, 2, LUA_TTABLE);
luaL_checktype(L, 1, LUA_TTABLE);

lua_service* S = lua_service::get(L);
std::unique_ptr<moon::service_conf> conf = std::make_unique<moon::service_conf>();

conf->creator = S->id();
conf->session = (int32_t)luaL_checkinteger(L, 1);
conf->name = lua_opt_field<std::string>(L, 2, "name");
conf->type = lua_opt_field<std::string>(L, 2, "stype", "lua");
conf->source = lua_opt_field<std::string>(L, 2, "file");
conf->memlimit = lua_opt_field<size_t>(L, 2, "memlimit", std::numeric_limits<size_t>::max());
conf->unique = lua_opt_field<bool>(L, 2, "unique", false);
conf->threadid = lua_opt_field<uint32_t>(L, 2, "threadid", 0);
int64_t session = S->next_sequence();
conf->session = session;
conf->name = lua_opt_field<std::string>(L, 1, "name");
conf->type = lua_opt_field<std::string>(L, 1, "stype", "lua");
conf->source = lua_opt_field<std::string>(L, 1, "file");
conf->memlimit = lua_opt_field<size_t>(L, 1, "memlimit", std::numeric_limits<size_t>::max());
conf->unique = lua_opt_field<bool>(L, 1, "unique", false);
conf->threadid = lua_opt_field<uint32_t>(L, 1, "threadid", 0);

auto path = S->get_server()->get_env("PATH");
if(path)
conf->params.append(*path);
conf->params.append("return ");
table_tostring(conf->params, L, 2);
table_tostring(conf->params, L, 1);

S->get_server()->new_service(std::move(conf));
return 0;
lua_pushinteger(L, session);
return 1;
}

static int lmoon_kill(lua_State* L)
{
lua_service* S = lua_service::get(L);
uint32_t serviceid = (uint32_t)luaL_checkinteger(L, 1);
int32_t sessionid = (int32_t)luaL_optinteger(L, 2, 0);
if (S->id() == serviceid)
S->ok(false);
S->get_server()->remove_service(serviceid, S->id(), sessionid);
S->get_server()->remove_service(serviceid, S->id(), 0);
return 0;
}

static int lmoon_scan_services(lua_State* L)
{
lua_service* S = lua_service::get(L);
uint32_t workerid = (uint32_t)luaL_checkinteger(L, 1);
int32_t sessionid = (int32_t)luaL_checkinteger(L, 2);
int64_t sessionid = S->next_sequence();
S->get_server()->scan_services(S->id(), workerid, sessionid);
return 0;
lua_pushinteger(L, sessionid);
return 1;
}

static int lmoon_queryservice(lua_State* L)
Expand All @@ -285,6 +292,13 @@ static int lmoon_queryservice(lua_State* L)
return 1;
}

static int lmoon_next_sequence(lua_State* L)
{
lua_service* S = lua_service::get(L);
lua_pushinteger(L, S->next_sequence());
return 1;
}

static int lmoon_env(lua_State* L)
{
lua_service* S = lua_service::get(L);
Expand Down Expand Up @@ -380,12 +394,12 @@ static int message_decode(lua_State* L)
}
case 'B':
{
lua_pushlightuserdata(L, m->get_buffer());
lua_pushlightuserdata(L, m->as_buffer());
break;
}
case 'C':
{
buffer* buf = m->get_buffer();
buffer* buf = m->as_buffer();
if (nullptr == buf)
{
lua_pushlightuserdata(L, nullptr);
Expand Down Expand Up @@ -416,7 +430,7 @@ static int message_redirect(lua_State* L)
if (top > 3)
{
m->set_sender((uint32_t)luaL_checkinteger(L, 4));
m->set_sessionid((int32_t)luaL_checkinteger(L, 5));
m->set_sessionid(luaL_checkinteger(L, 5));
}
return 0;
}
Expand Down Expand Up @@ -513,6 +527,7 @@ extern "C" {
{ "kill", lmoon_kill},
{ "scan_services", lmoon_scan_services},
{ "queryservice", lmoon_queryservice},
{ "next_sequence", lmoon_next_sequence},
{ "env", lmoon_env},
{ "server_stats", lmoon_server_stats},
{ "exit", lmoon_exit},
Expand Down Expand Up @@ -579,46 +594,52 @@ static int lasio_accept(lua_State* L)
lua_service* S = lua_service::get(L);
auto& sock = S->get_worker()->socket_server();
uint32_t fd = (uint32_t)luaL_checkinteger(L, 1);
int32_t sessionid = (int32_t)luaL_checkinteger(L, 2);
uint32_t owner = (uint32_t)luaL_checkinteger(L, 3);
bool ok = sock.accept(fd, sessionid, owner);
lua_pushboolean(L, ok ? 1 : 0);
uint32_t owner = (uint32_t)luaL_checkinteger(L, 2);
int64_t session = luaL_opt(L, luaL_checkinteger, 3, S->next_sequence());
if(!sock.accept(fd, session, owner)){
lua_pushboolean(L, 0);
lua_pushfstring(L, "socket.accept error: fd(%I) not open or not found.", fd);
return 2;
}
lua_pushinteger(L, session);
return 1;
}

static int lasio_connect(lua_State* L)
{
lua_service* S = lua_service::get(L);

auto& sock = S->get_worker()->socket_server();
std::string host = lua_check<std::string>(L, 1);
uint16_t port = (uint16_t)luaL_checkinteger(L, 2);
uint8_t type = (uint8_t)luaL_checkinteger(L, 3);
int32_t sessionid = (int32_t)luaL_checkinteger(L, 4);
uint32_t timeout = (uint32_t)luaL_checkinteger(L, 5);
uint32_t fd = sock.connect(host, port, S->id(), type, sessionid, timeout);
lua_pushinteger(L, fd);
uint32_t timeout = (uint32_t)luaL_checkinteger(L, 4);
int64_t session = S->next_sequence();
sock.connect(host, port, S->id(), type, session, timeout);
lua_pushinteger(L, session);
return 1;
}

static int lasio_read(lua_State* L)
{
lua_service* S = lua_service::get(L);
int64_t session = S->next_sequence();
auto& sock = S->get_worker()->socket_server();
uint32_t fd = (uint32_t)luaL_checkinteger(L, 1);
int32_t sessionid = (int32_t)luaL_checkinteger(L, 2);
int64_t size = 0;
std::string_view delim;
if (lua_type(L, 3) == LUA_TNUMBER)
if (lua_type(L, 2) == LUA_TNUMBER)
{
size = (int64_t)luaL_checkinteger(L, 3);
size = (int64_t)luaL_checkinteger(L, 2);
}
else
{
delim = lua_check<std::string_view>(L, 3);
size = (int64_t)luaL_optinteger(L, 4, 0);
delim = lua_check<std::string_view>(L, 2);
size = (int64_t)luaL_optinteger(L, 3, 0);
}
sock.read(fd, S->id(), size, delim, sessionid);
return 0;
sock.read(fd, S->id(), size, delim, session);
lua_pushinteger(L, session);
return 1;
}

static int lasio_write(lua_State* L)
Expand Down
Loading

0 comments on commit ed25237

Please sign in to comment.