diff --git a/example/call_benchmark.lua b/example/call_benchmark.lua index b1a187698..11b0f9af4 100644 --- a/example/call_benchmark.lua +++ b/example/call_benchmark.lua @@ -7,28 +7,24 @@ local onecount = 1000 if arg and arg.runner then if arg.type == "receiver" then - moon.dispatch("lua", function(msg, unpack) - local sender, session, sz, len = moon.decode(msg, "SEC") - moon.response("lua", sender, session, unpack(sz, len)) + moon.dispatch("lua", function(sender, session, ...) + moon.response("lua", sender, session, ...) end) return else - moon.dispatch("lua", function(msg, unpack) - moon.async(function() - for i=1,onecount do - local ok,err= moon.co_call("lua", arg.target, "hello") - assert(ok, err) - end - moon.send("lua", arg.main, moon.clock()) - end) + moon.dispatch("lua", function(sender, session, ...) + for i=1,onecount do + local ok,err= moon.co_call("lua", arg.target, "hello") + assert(ok, err) + end + moon.send("lua", arg.main, moon.clock()) end) end else local counter = 0 local stime = 0 local endtime = 0 - moon.dispatch("lua", function(msg, unpack) - local etime = unpack(moon.decode(msg, "C")) + moon.dispatch("lua", function(sender, session, etime) if etime > endtime then endtime = etime end diff --git a/example/example_callback.lua b/example/example_callback.lua index 8a063ade0..8278eed3a 100644 --- a/example/example_callback.lua +++ b/example/example_callback.lua @@ -22,9 +22,13 @@ if conf and conf.receiver then end end - moon.dispatch('lua',function(msg,unpack) - local sender, p, n = moon.decode(msg,"SC") - docmd(sender, unpack(p, n)) + moon.dispatch('lua',function(sender, session, cmd, ...) + local f = command[cmd] + if f then + f(sender,...) + else + error(string.format("Unknown command %s", tostring(cmd))) + end end) print("callback example: service receiver start") @@ -38,18 +42,13 @@ else moon.exit(-1) end - local function docmd(header,...) - local f = command[header] - if f then - f(...) - else - error(string.format("Unknown command %s", tostring(header))) - end - end - - moon.dispatch('lua',function(msg,unpack) - local sz, len = moon.decode(msg,"C") - docmd(unpack(sz, len)) + moon.dispatch('lua',function(sender, session, cmd, ...) + local f = command[cmd] + if f then + f(...) + else + error(string.format("Unknown command %s", tostring(cmd))) + end end) print("callback example: service sender start") diff --git a/example/example_coroutine.lua b/example/example_coroutine.lua index ee7b21fa0..b33313d3c 100644 --- a/example/example_coroutine.lua +++ b/example/example_coroutine.lua @@ -15,20 +15,14 @@ if conf and conf.receiver then moon.response("lua", sender, sessionid) end - local function docmd(sender,sessionid,cmd,...) - -- body + moon.dispatch('lua',function(sender, session, cmd, ...) + -- sessionid 对应表示发送方 挂起的协程 local f = command[cmd] if f then - f(sender,sessionid,...) + f(sender,session,...) else error(string.format("Unknown command %s", tostring(cmd))) end - end - - moon.dispatch('lua',function(msg,unpack) - -- sessionid 对应表示发送方 挂起的协程 - local sender, sessionid, sz, len = moon.decode(msg,"SEC") - docmd(sender,sessionid, unpack(sz, len)) end) else diff --git a/example/example_create_service.lua b/example/example_create_service.lua index 36f9fa622..a18f5e524 100644 --- a/example/example_create_service.lua +++ b/example/example_create_service.lua @@ -10,21 +10,15 @@ if conf and conf.slave then moon.quit() end - local function docmd(sender,header,...) - -- body - local f = command[header] + print("conf:", conf.message) + + moon.dispatch('lua',function(sender, session, cmd, ...) + local f = command[cmd] if f then f(sender,...) else - error(string.format("Unknown command %s", tostring(header))) + error(string.format("Unknown command %s", tostring(cmd))) end - end - - print("conf:", conf.message) - - moon.dispatch('lua',function(msg,unpack) - local sender, sz, len = moon.decode(msg,"SC") - docmd(sender, unpack(sz, len)) end) if conf and conf.auto_quit then diff --git a/example/example_tobeclosed.lua b/example/example_tobeclosed.lua index 36e7703b0..d9294cd6e 100644 --- a/example/example_tobeclosed.lua +++ b/example/example_tobeclosed.lua @@ -2,23 +2,20 @@ local moon = require("moon") local function new_test(name) return setmetatable({}, { __close = function(...) - moon.error(...) + moon.warn(...) end, __name = "closemeta:" .. name}) end local i = 0 moon.dispatch("lua", function() - moon.async(function() - i = i + 1 - if i==2 then - local c = new_test("dispatch_error") - error("dispatch_error") - else - local c = new_test("dispatch_wait") - moon.sleep(1000000) - end - end) - + i = i + 1 + if i==2 then + local c = new_test("dispatch_error") + error("dispatch_error") + else + local c = new_test("dispatch_wait") + moon.sleep(1000000) + end end) moon.async(function() diff --git a/example/main.lua b/example/main.lua index d599c2dd6..fe288ee05 100644 --- a/example/main.lua +++ b/example/main.lua @@ -248,9 +248,6 @@ moon.shutdown(function() for _, addr in ipairs(addrs) do moon.remove_service(addr) end - - moon.remove_service(moon.queryservice("sharetable")) - moon.quit() end) end) diff --git a/example/start_by_config/cluster_receiver.lua b/example/start_by_config/cluster_receiver.lua index c3ecf7a02..04f37e4e0 100644 --- a/example/start_by_config/cluster_receiver.lua +++ b/example/start_by_config/cluster_receiver.lua @@ -49,24 +49,16 @@ moon.async(function() end) -local function docmd(sender,sessionid, CMD,...) +moon.dispatch('lua',function(sender, session, CMD, ...) local f = command[CMD] if f then if CMD ~= 'ADD' then - local args = {...} - moon.async(function () - --moon.sleep(20000) - moon.response('lua',sender,sessionid,f(table.unpack(args))) - end) + --moon.sleep(20000) + moon.response('lua',sender, session,f(...)) end else error(string.format("Unknown command %s", tostring(CMD))) end -end - -moon.dispatch('lua',function(msg,unpack) - local sender, sessionid, sz, len = moon.decode(msg, "SEC") - docmd(sender, sessionid, unpack(sz, len)) end) moon.async(function() diff --git a/example/start_by_config/cluster_sender.lua b/example/start_by_config/cluster_sender.lua index fc6b88579..8d804d0a5 100644 --- a/example/start_by_config/cluster_sender.lua +++ b/example/start_by_config/cluster_sender.lua @@ -14,7 +14,7 @@ end) local args = {1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16} moon.async(function() print(moon.co_call("lua", moon.queryservice("cluster"), "Start")) - cluster.call(9, 'cluster_receiver', "ACCUM", table.unpack(args)) + print(cluster.call(9, 'cluster_receiver', "ACCUM", table.unpack(args))) for i=1,100000 do cluster.send(9, 'cluster_receiver',"COUNTER", moon.now()) end diff --git a/example/start_by_config/network_text_benchmark.lua b/example/start_by_config/network_text_benchmark.lua index 4893b17f7..7ea1ed869 100644 --- a/example/start_by_config/network_text_benchmark.lua +++ b/example/start_by_config/network_text_benchmark.lua @@ -24,18 +24,13 @@ local function run_slave() end) end - local function docmd(sender,sessionid, CMD,...) - local f = command[CMD] + moon.dispatch('lua',function(sender, session, cmd, ...) + local f = command[cmd] if f then - moon.response('lua',sender,sessionid,f(...)) + moon.response('lua',sender,session,f(...)) else - error(string.format("Unknown command %s", tostring(CMD))) + error(string.format("Unknown command %s", tostring(cmd))) end - end - - moon.dispatch('lua',function(msg,unpack) - local sender, sessionid, sz, len = moon.decode(msg, "SEC") - docmd(sender,sessionid, unpack(sz, len)) end) end diff --git a/example/start_by_config/send_benchmark_receiver.lua b/example/start_by_config/send_benchmark_receiver.lua index 8ac53033a..4e59393c2 100644 --- a/example/start_by_config/send_benchmark_receiver.lua +++ b/example/start_by_config/send_benchmark_receiver.lua @@ -6,7 +6,7 @@ command.TEST = function(sender, ...) moon.send('lua', sender, 'TEST', ...) end -local function docmd(sender, cmd, ...) +moon.dispatch('lua',function(sender, session, cmd, ...) -- body local f = command[cmd] if f then @@ -14,10 +14,5 @@ local function docmd(sender, cmd, ...) else error(string.format("Unknown command %s", tostring(cmd))) end -end - -moon.dispatch('lua',function(msg,unpack) - local sender, p, n = moon.decode(msg, "SC") - docmd(sender, unpack(p, n)) end) diff --git a/example/start_by_config/send_benchmark_sender.lua b/example/start_by_config/send_benchmark_sender.lua index 449a5b981..1cde3f368 100644 --- a/example/start_by_config/send_benchmark_sender.lua +++ b/example/start_by_config/send_benchmark_sender.lua @@ -21,18 +21,13 @@ command.TEST = function() end end -local function docmd(header,...) - local f = command[header] - if f then - f(...) - else - error(string.format("Unknown command %s", tostring(header))) - end -end - -moon.dispatch('lua',function(msg,unpack) - local p, n = moon.decode(msg, "C") - docmd(unpack(p, n)) +moon.dispatch('lua',function(sender, session, cmd, ...) + local f = command[cmd] + if f then + f(...) + else + error(string.format("Unknown command %s", tostring(cmd))) + end end) receiver1 = moon.queryservice("send_benchmark_receiver1") @@ -45,10 +40,10 @@ moon.async(function() moon.sleep(1000) sttime = moon.now() for _=1,ncount do - moon.send('lua', receiver1,"TEST","123456789") - moon.send('lua', receiver2,"TEST","123456789") - moon.send('lua', receiver3,"TEST","123456789") - moon.send('lua', receiver4,"TEST","123456789") + moon.send('lua', receiver1, "TEST", "123456789") + moon.send('lua', receiver2, "TEST", "123456789") + moon.send('lua', receiver3, "TEST", "123456789") + moon.send('lua', receiver4, "TEST", "123456789") end end end) diff --git a/example/start_by_config/service_mysql.lua b/example/start_by_config/service_mysql.lua deleted file mode 100644 index 4890be773..000000000 --- a/example/start_by_config/service_mysql.lua +++ /dev/null @@ -1,129 +0,0 @@ -local moon = require("moon") -local socket = require("moon.socket") -local mysql = require("moon.db.mysql") -local queue = require("moon.queue") - -local tbinsert = table.insert -local tbremove = table.remove -local tbunpack = table.unpack - - -local conf = ... - -local command = {} - -local function connect(conf_, auto) - local db, err - repeat - db, err = mysql.connect(conf_) - if not db then - print("mysql connect failed ", err) - end - - if not db and auto then - moon.sleep(100) - end - until(not auto or db) - return db, err -end - -local db_num = conf.connection_num -local balance = 1 -local db_call = {} - -for _=1,db_num do - tbinsert(db_call,{q = queue.new()}) -end - -function command.query(sender, sessionid, ...) - local args = {...} - - local db = db_call[balance] - - balance = balance + 1 - if balance > db_num then - balance = 1 - end - - db.q:run(function() - if not db.db then - local db_,err = connect(conf) - if not db_ then - --print(err) - moon.response("lua", sender, sessionid, {err = err}) - return - end - db.db = db_ - end - - local res = db.db:query(tbunpack(args)) - if res.err then - --print("query error", tbunpack(args)) - --print_r(res) - if not res.errno or res.errno == 1053 then - db.db, res.err = connect(conf) - end - if db.db then - res = db.db:query(tbunpack(args)) - end - end - moon.response("lua", sender, sessionid, res) - end) -end - -local send_queue = {} -local sending = false - -local db_send - -function command.execute(_,_, ...) - tbinsert(send_queue, {...}) - if not sending then - sending = true - moon.async(function() - if not db_send then - local db,err = connect(conf) - if not db then - print(err) - return - end - db_send = db - end - - while #send_queue >0 do - local req = send_queue[1] - local res = db_send:query(tbunpack(req)) - if res.err then - print_r(res) - if not res.errno or res.errno == 1053 then - print("mysql network error, reconnecting...") - db_send, res.err = connect(conf, true) - else - print(tbunpack(req)) - break - end - else - tbremove(send_queue,1) - end - end - sending = false - end) - end -end - -local fd = socket.sync_connect(conf.host,conf.port,moon.PTYPE_TEXT) -assert(fd, "connect db mysql failed") - -socket.close(fd) - -local function docmd(sender, sessionid, cmd, ...) - local f = command[cmd] - if f then - f(sender, sessionid, ...) - end -end - -moon.dispatch('lua',function(msg,unpack) - local sender, sessionid, sz, len = moon.decode(msg, "SEC") - docmd(sender, sessionid, unpack(sz, len)) -end) diff --git a/example/start_by_config/sharetable_example.lua b/example/start_by_config/sharetable_example.lua index 36c8dfd96..3c029be1d 100644 --- a/example/start_by_config/sharetable_example.lua +++ b/example/start_by_config/sharetable_example.lua @@ -1,7 +1,7 @@ local moon = require("moon") local seri = require("seri") local sharetable = require("sharetable") - +local test_assert = require("test_assert") local conf = ... or {} local file = "sharetable_data.lua" @@ -20,14 +20,10 @@ if conf.agent then print_r(data) end - moon.dispatch('lua',function(msg) - local sender, sessionid, buf = moon.decode(msg, "SEB") - local cmd, sz, len = seri.unpack_one(buf) + moon.dispatch('lua',function(sender, session, cmd, ...) local f = command[cmd] if f then - moon.async(function() - moon.response("lua", sender, sessionid, f(seri.unpack(sz, len))) - end) + moon.response("lua", sender, session, f(...)) else moon.error(moon.name, "recv unknown cmd "..cmd) end @@ -81,7 +77,9 @@ else print(moon.co_call("lua", agent, "UPDATE")) - moon.remove_service(agent) + moon.remove_service(agent, true) + moon.exit(-1) + -- moon.remove_service(moon.queryservice("sharetable")) end) end diff --git a/example/start_by_config/sharetable_example_dir.lua b/example/start_by_config/sharetable_example_dir.lua index 4f3715b38..897c9788b 100644 --- a/example/start_by_config/sharetable_example_dir.lua +++ b/example/start_by_config/sharetable_example_dir.lua @@ -1,7 +1,7 @@ local moon = require("moon") local seri = require("seri") local sharetable = require("sharetable") - +local test_assert = require("test_assert") local conf = ... or {} local name = "sharedata" @@ -25,14 +25,10 @@ if conf.agent then print_r(data) end - moon.dispatch('lua',function(msg) - local sender, sessionid, buf = moon.decode(msg, "SEB") - local cmd, sz, len = seri.unpack_one(buf) + moon.dispatch('lua',function(sender, session, cmd, ...) local f = command[cmd] if f then - moon.async(function() - moon.response("lua", sender, sessionid, f(seri.unpack(sz, len))) - end) + moon.response("lua", sender, session, f(...)) else moon.error(moon.name, "recv unknown cmd "..cmd) end @@ -88,6 +84,8 @@ else print(moon.co_call("lua", agent, "UPDATE")) moon.remove_service(agent) + moon.exit(-1) + -- moon.remove_service(moon.queryservice("sharetable")) end) end diff --git a/example/start_by_config/test_call.lua b/example/start_by_config/test_call.lua index 691788b91..178f187ea 100644 --- a/example/start_by_config/test_call.lua +++ b/example/start_by_config/test_call.lua @@ -31,18 +31,13 @@ if conf and conf.receiver then moon.quit() end - local function docmd(sender,sessionid, CMD,...) - local f = command[CMD] + moon.dispatch('lua',function(sender, session, cmd, ...) + local f = command[cmd] if f then - moon.response('lua',sender,sessionid,f(...)) + moon.response('lua',sender,session,f(...)) else - error(string.format("Unknown command %s", tostring(CMD))) + error(string.format("Unknown command %s", tostring(cmd))) end - end - - moon.dispatch('lua',function(msg,unpack) - local sender, sessionid, sz, len = moon.decode(msg, "SEC") - docmd(sender,sessionid, unpack(sz, len)) end) else diff --git a/example/start_by_config/test_moon.lua b/example/start_by_config/test_moon.lua index ec889d851..9b867ad38 100644 --- a/example/start_by_config/test_moon.lua +++ b/example/start_by_config/test_moon.lua @@ -74,18 +74,13 @@ command.FAILED = function(name, sid, dsp) end) end -local function docmd(cmd,...) +moon.dispatch('lua',function(sender, session, cmd, ...) local f = command[cmd] if f then f(...) else error(string.format("Unknown command %s", tostring(cmd))) end -end - -moon.dispatch('lua',function(msg,unpack) - local sz, len = moon.decode(msg, "C") - docmd(unpack(sz, len)) end) next_case() diff --git a/example/start_by_config/test_send.lua b/example/start_by_config/test_send.lua index cc18107a9..6ef5bfbd9 100644 --- a/example/start_by_config/test_send.lua +++ b/example/start_by_config/test_send.lua @@ -10,7 +10,7 @@ if conf and conf.receiver then moon.send('lua', sender,'WORLD', ...) end - local function docmd(sender,cmd,...) + moon.dispatch('lua',function(sender,session, cmd, ...) -- body local f = command[cmd] if f then @@ -18,11 +18,6 @@ if conf and conf.receiver then else error(string.format("Unknown command %s", tostring(cmd))) end - end - - moon.dispatch('lua',function(msg,unpack) - local sender, sz, len = moon.decode(msg, "SC") - docmd(sender, unpack(sz, len)) end) else @@ -35,22 +30,14 @@ else test_assert.success() end - local function docmd(cmd, ...) + moon.dispatch("lua",function(sender, session, cmd, ...) local f = command[cmd] if f then f(...) else error(string.format("Unknown command %s", tostring(cmd))) end - end - - moon.dispatch( - "lua", - function(msg, unpack) - local sz, len = moon.decode(msg, "C") - docmd(unpack(sz, len)) - end - ) + end) local receiver diff --git a/lualib/moon.lua b/lualib/moon.lua index fb2422d50..03fdc4e05 100644 --- a/lualib/moon.lua +++ b/lualib/moon.lua @@ -102,7 +102,6 @@ local uuid = 0 local session_id_coroutine = {} local protocol = {} local session_watcher = {} - local timer_routine = {} local function coresume(co, ...) @@ -141,44 +140,6 @@ end moon.make_response = make_response ----@param msg userdata @message* ----@param PTYPE string -local function _default_dispatch(msg, PTYPE) - local p = protocol[PTYPE] - if not p then - error(string.format( "handle unknown PTYPE: %s. sender %u",PTYPE, _decode(msg, "S"))) - end - - local sessionid = _decode(msg, "E") - if sessionid > 0 and PTYPE ~= moon.PTYPE_ERROR then - session_watcher[sessionid] = nil - local co = session_id_coroutine[sessionid] - if co then - session_id_coroutine[sessionid] = nil - --print(coroutine.status(co)) - if p.unpack then - coresume(co, p.unpack(_decode(msg,"C"))) - else - coresume(co, msg) - end - --print(coroutine.status(co)) - return - end - - if co ~= false then - error(string.format( "%s: response [%u] can not find co.",moon.name, sessionid)) - end - else - if not p.dispatch then - error(string.format( "[%s] dispatch PTYPE [%u] is nil",moon.name, p.PTYPE)) - return - end - p.dispatch(msg, p.unpack) - end -end - -core.callback(_default_dispatch) - --- ---向指定服务发送消息,消息内容会根据协议类型进行打包 ---@param PTYPE string @协议类型 @@ -294,14 +255,18 @@ local co_num = 0 local co_pool = setmetatable({}, {__mode = "kv"}) -local function routine(fn) +local function invoke(co, fn, ...) + co_num = co_num + 1 + fn(...) + co_num = co_num - 1 + co_pool[#co_pool + 1] = co +end + +local function routine(fn, ...) local co = co_running() + invoke(co, fn, ...) while true do - co_num = co_num + 1 - fn() - co_num = co_num - 1 - co_pool[#co_pool + 1] = co - fn = co_yield() + invoke(co, co_yield()) end end @@ -309,12 +274,12 @@ end ---If `func` lacks call `coroutine.yield`, will run syncronously. ---@param func fun() ---@return thread -function moon.async(func) +function moon.async(func, ...) local co = tremove(co_pool) if not co then co = co_create(routine) end - coresume(co, func) + coresume(co, func, ...) return co end @@ -330,7 +295,7 @@ function moon.wakeup(co, ...) end) end ----返回运行中的协程个数,和协程池空闲的协程个数 +---return count of running coroutine and total coroutine in coroutine pool function moon.coroutine_num() return co_num, #co_pool end @@ -389,6 +354,54 @@ function moon.response(PTYPE, receiver, sessionid, ...) end ------------------------------------ +---@param msg userdata @message* +---@param PTYPE string +local function _default_dispatch(msg, PTYPE) + local p = protocol[PTYPE] + if not p then + error(string.format( "handle unknown PTYPE: %s. sender %u",PTYPE, _decode(msg, "S"))) + end + + local sender, session, sz, len = _decode(msg, "SEC") + if session > 0 and PTYPE ~= moon.PTYPE_ERROR then + session_watcher[session] = nil + local co = session_id_coroutine[session] + if co then + session_id_coroutine[session] = nil + --print(coroutine.status(co)) + if p.unpack then + coresume(co, p.unpack(sz, len)) + else + coresume(co, msg) + end + --print(coroutine.status(co)) + return + end + + if co ~= false then + error(string.format( "%s: response [%u] can not find co.",moon.name, session)) + end + else + local dispatch = p.dispatch + if not dispatch then + error(string.format( "[%s] dispatch PTYPE [%u] is nil",moon.name, p.PTYPE)) + return + end + + if not p.israw and p.unpack then + local co = tremove(co_pool) + if not co then + co = co_create(routine) + end + coresume(co, dispatch, sender, session, p.unpack(sz, len)) + else + dispatch(msg) + end + end +end + +core.callback(_default_dispatch) + function moon.register_protocol(t) local PTYPE = t.PTYPE if protocol[PTYPE] then @@ -400,16 +413,15 @@ end local reg_protocol = moon.register_protocol - ----设置指定协议消息的消息处理函数 ---@param PTYPE string ----@param cb fun(msg:userdata,ptype:table) +---@param fn fun(msg:userdata,ptype:table) ---@return boolean -function moon.dispatch(PTYPE, cb) +function moon.dispatch(PTYPE, fn, israw) local p = protocol[PTYPE] - if cb then + if fn then local ret = p.dispatch - p.dispatch = cb + p.dispatch = fn + p.israw = israw return ret else return p and p.dispatch @@ -444,7 +456,6 @@ reg_protocol { pack = function(...) return ... end, - unpack = moon.tostring, dispatch = function(msg) local sessionid, content, data = _decode(msg,"EHZ") if data and #data >0 then @@ -485,7 +496,6 @@ reg_protocol { pack = function(...) return ... end, - unpack = moon.tostring, dispatch = function(msg) local sender, header = _decode(msg,"SH") local func = system_command[header] @@ -617,14 +627,12 @@ reg_protocol { PTYPE = moon.PTYPE_DEBUG, pack = moon.pack, unpack = moon.unpack, - dispatch = function(msg, unpack_fn) - local sender, sessionid, sz, len = _decode(msg,"SEC") - local params = {unpack_fn(sz, len)} - local func = debug_command[params[1]] + dispatch = function(sender, session, cmd, ...) + local func = debug_command[cmd] if func then - func(sender, sessionid, table.unpack(params,2)) + func(sender, session, ...) else - moon.response("debug",sender,sessionid, "unknow debug cmd "..params[1]) + moon.response("debug",sender, session, "unknow debug cmd "..cmd) end end } diff --git a/service/cluster.lua b/service/cluster.lua index 44304da99..be673c013 100644 --- a/service/cluster.lua +++ b/service/cluster.lua @@ -95,9 +95,8 @@ local function cluster_service() assert(address>0, tostring(header.to_sname)) local session = moon.make_response(address) redirect(msg, "", address, moon.PTYPE_LUA, moon.addr(), -session) - local res = {co_yield()} header.session = -header.session - socket.write(fd, pack(header, table.unpack(res))) + socket.write(fd, pack(header, co_yield())) end) elseif header.session > 0 then --receive response message if remove_send_watch(fd, header.from_addr, header.session) then @@ -250,27 +249,29 @@ local function cluster_service() end end) - moon.dispatch("lua",function(msg) - local sender, sessionid, buf = moon.decode(msg, "SEB") - local cmd = unpack_one(buf, true) - local fn = command[cmd] - if fn then - moon.async(function() - if sessionid ~= 0 then - local unsafe_buf = pack(xpcall(fn, debug.traceback, msg)) - local ok = unpack_one(unsafe_buf, true) - if not ok then - wfront(unsafe_buf, packs(false)) - end - moon.raw_send("lua", sender, "", unsafe_buf, sessionid) + local function xpcall_ret(ok, ...) + if ok then + return moon.pack(...) + end + return moon.pack(false, ...) + end + + moon.dispatch("lua", function(m) + moon.async(function(msg) + local sender, session, buf = moon.decode(msg, "SEB") + local cmd = unpack_one(buf, true) + local fn = command[cmd] + if fn then + if session ~= 0 then + moon.raw_send("lua", sender, "", xpcall_ret(xpcall(fn, debug.traceback, msg)), session) else fn(msg) end - end) - else - moon.error(moon.name, "recv unknown cmd "..tostring(cmd)) - end - end) + else + moon.error(moon.name, "recv unknown cmd "..tostring(cmd)) + end + end, m) + end, true) moon.shutdown(function() moon.quit() diff --git a/service/redisd.lua b/service/redisd.lua index 8c82e7f63..9fc4418df 100644 --- a/service/redisd.lua +++ b/service/redisd.lua @@ -138,10 +138,6 @@ if conf.name then assert(fd, "connect db redis failed") socket.close(fd) - local buffer = require("buffer") - local unpack_one = seri.unpack_one - local buf_write_front = buffer.write_front - local command = {} function command.len() @@ -152,7 +148,14 @@ if conf.name then return res end - moon.dispatch('lua',function(msg,unpack) + local function xpcall_ret(ok, ...) + if ok then + return moon.pack(...) + end + return moon.pack(false, ...) + end + + moon.dispatch('lua',function(msg) local sender, sessionid, buf = moon.decode(msg, "SEB") local cmd, sz, len = seri.unpack_one(buf, true) if cmd == "Q" then @@ -163,24 +166,20 @@ if conf.name then local fn = command[cmd] if fn then - if sessionid == 0 then - fn(sender, sessionid, buf, msg) - else - moon.async(function() + moon.async(function() + if sessionid == 0 then + fn(sender, sessionid, buf, msg) + else if sessionid ~= 0 then - local unsafe_buf = seri.pack(xpcall(fn, debug.traceback, unpack(sz, len))) - local ok = unpack_one(unsafe_buf, true) - if not ok then - buf_write_front(unsafe_buf, seri.packs(false)) - end + local unsafe_buf = xpcall_ret(xpcall(fn, debug.traceback, moon.unpack(sz, len))) moon.raw_send("lua", sender, "", unsafe_buf, sessionid) end - end) - end + end + end) else moon.error(moon.name, "recv unknown cmd "..tostring(cmd)) end - end) + end, true) local function wait_all_send() while true do diff --git a/service/sharetable.lua b/service/sharetable.lua index 6c020ee36..bd0d3d38f 100644 --- a/service/sharetable.lua +++ b/service/sharetable.lua @@ -1,5 +1,4 @@ local moon = require "moon" -local seri = require "seri" local core = require "sharetable.core" local fs = require("fs") @@ -115,13 +114,8 @@ local function sharetable_service() -- no return end - local unpack = seri.unpack - local unpack_one = seri.unpack_one - - moon.dispatch("lua",function(msg) - local sender, sessionid, buf = moon.decode(msg, "SEB") - local cmd, sz, len = unpack_one(buf) - sharetable[cmd](sender, sessionid, unpack(sz, len)) + moon.dispatch("lua",function(sender, session, cmd, ...) + sharetable[cmd](sender, session, ...) end) ---sharetable service's 'files' use fs.join(conf.dir, filename) diff --git a/service/sqldriver.lua b/service/sqldriver.lua index 896831fca..35c2c3fcf 100644 --- a/service/sqldriver.lua +++ b/service/sqldriver.lua @@ -1,9 +1,7 @@ local moon = require("moon") local seri = require("seri") -local buffer = require("buffer") local unpack_one = seri.unpack_one -local wfront = buffer.write_front local tbinsert = table.insert @@ -123,7 +121,14 @@ if conf.name then return res end - moon.dispatch('lua',function(msg,unpack) + local function xpcall_ret(ok, ...) + if ok then + return moon.pack(...) + end + return moon.pack(false, ...) + end + + moon.dispatch('lua', function(msg) local sender, sessionid, buf = moon.decode(msg, "SEB") local cmd, sz, len = unpack_one(buf, true) if cmd == "Q" then @@ -136,21 +141,17 @@ if conf.name then local fn = command[cmd] if fn then if sessionid == 0 then - fn(sender, sessionid, buf, msg) + fn(sender, sessionid, moon.unpack(sz, len)) else moon.async(function() - local unsafe_buf = seri.pack(xpcall(fn, debug.traceback, unpack(sz, len))) - local ok = unpack_one(unsafe_buf, true) - if not ok then - wfront(unsafe_buf, seri.packs(false)) - end + local unsafe_buf = xpcall_ret(xpcall(fn, debug.traceback, moon.unpack(sz, len))) moon.raw_send("lua", sender, "", unsafe_buf, sessionid) end) end else moon.error(moon.name, "recv unknown cmd "..tostring(cmd)) end - end) + end, true) local function wait_all_send() while true do @@ -182,7 +183,8 @@ else local json = require("json") local yield = coroutine.yield - + local buffer = require("buffer") + local wfront = buffer.write_front local raw_send = moon.raw_send local packstr = seri.packs local concat = json.concat