From 3c917341d2a47960f4c185dd6a0de1bc02d659fb Mon Sep 17 00:00:00 2001 From: yuanx Date: Mon, 16 Dec 2019 14:38:52 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E4=BF=AE=E6=94=B9del=E5=91=BD=E4=BB=A4?= =?UTF-8?q?=E4=B8=BAkdel,=E6=B7=BB=E5=8A=A0=E6=9B=B4=E9=80=82=E9=85=8Dtwem?= =?UTF-8?q?proxy=E7=9A=84del=E5=91=BD=E4=BB=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Makefile | 4 +- src/proc_kv.cpp | 799 ++++++++++++++++++---------------- src/proc_kv.h | 4 +- src/proc_redis_compatible.cpp | 64 +++ src/proc_redis_compatible.h | 5 + src/serv.cpp | 108 +++-- 6 files changed, 553 insertions(+), 431 deletions(-) create mode 100644 src/proc_redis_compatible.cpp create mode 100644 src/proc_redis_compatible.h diff --git a/src/Makefile b/src/Makefile index 9c0847ac2..ec9268760 100644 --- a/src/Makefile +++ b/src/Makefile @@ -1,6 +1,6 @@ include ../build_config.mk -OBJS = proc_sys.o proc_kv.o proc_hash.o proc_zset.o proc_queue.o \ +OBJS = proc_sys.o proc_kv.o proc_hash.o proc_zset.o proc_queue.o proc_redis_compatible.o \ backend_dump.o backend_sync.o slave.o \ serv.o proc_cluster.o cluster.o cluster_store.o cluster_migrate.o LIBS = ./ssdb/libssdb.a ./util/libutil.a ./net/libnet.a @@ -30,6 +30,8 @@ backend_sync.o: backend_sync.h backend_sync.cpp proc.o: serv.h proc.cpp ${CXX} ${CFLAGS} -c proc.cpp +proc_redis_compatible.o: proc_redis_compatible.cpp + ${CXX} ${CFLAGS} -c proc_redis_compatible.cpp proc_sys.o: proc_sys.cpp ${CXX} ${CFLAGS} -c proc_sys.cpp proc_kv.o: proc_kv.cpp diff --git a/src/proc_kv.cpp b/src/proc_kv.cpp index 7323015be..3b35d17df 100644 --- a/src/proc_kv.cpp +++ b/src/proc_kv.cpp @@ -5,447 +5,474 @@ found in the LICENSE file. */ #include "proc_kv.h" -int proc_get(NetworkServer *net, Link *link, const Request &req, Response *resp){ - SSDBServer *serv = (SSDBServer *)net->data; - CHECK_NUM_PARAMS(2); - CHECK_KV_KEY_RANGE(1); - - std::string val; - int ret = serv->ssdb->get(req[1], &val); - resp->reply_get(ret, &val); - return 0; +int proc_get(NetworkServer *net, Link *link, const Request &req, + Response *resp) { + SSDBServer *serv = (SSDBServer *)net->data; + CHECK_NUM_PARAMS(2); + CHECK_KV_KEY_RANGE(1); + + std::string val; + int ret = serv->ssdb->get(req[1], &val); + resp->reply_get(ret, &val); + return 0; } -int proc_getset(NetworkServer *net, Link *link, const Request &req, Response *resp){ - SSDBServer *serv = (SSDBServer *)net->data; - CHECK_NUM_PARAMS(3); - CHECK_KV_KEY_RANGE(1); +int proc_getset(NetworkServer *net, Link *link, const Request &req, + Response *resp) { + SSDBServer *serv = (SSDBServer *)net->data; + CHECK_NUM_PARAMS(3); + CHECK_KV_KEY_RANGE(1); - std::string val; - int ret = serv->ssdb->getset(req[1], &val, req[2]); - resp->reply_get(ret, &val); - return 0; + std::string val; + int ret = serv->ssdb->getset(req[1], &val, req[2]); + resp->reply_get(ret, &val); + return 0; } -int proc_set(NetworkServer *net, Link *link, const Request &req, Response *resp){ - SSDBServer *serv = (SSDBServer *)net->data; - CHECK_NUM_PARAMS(3); - CHECK_KV_KEY_RANGE(1); - - int ret = serv->ssdb->set(req[1], req[2]); - if(ret == -1){ - resp->push_back("error"); - }else{ - resp->push_back("ok"); - resp->push_back("1"); - } - return 0; +int proc_set(NetworkServer *net, Link *link, const Request &req, + Response *resp) { + SSDBServer *serv = (SSDBServer *)net->data; + CHECK_NUM_PARAMS(3); + CHECK_KV_KEY_RANGE(1); + + int ret = serv->ssdb->set(req[1], req[2]); + if (ret == -1) { + resp->push_back("error"); + } else { + resp->push_back("ok"); + resp->push_back("1"); + } + return 0; } -int proc_setnx(NetworkServer *net, Link *link, const Request &req, Response *resp){ - SSDBServer *serv = (SSDBServer *)net->data; - CHECK_NUM_PARAMS(3); - CHECK_KV_KEY_RANGE(1); +int proc_setnx(NetworkServer *net, Link *link, const Request &req, + Response *resp) { + SSDBServer *serv = (SSDBServer *)net->data; + CHECK_NUM_PARAMS(3); + CHECK_KV_KEY_RANGE(1); - int ret = serv->ssdb->setnx(req[1], req[2]); - resp->reply_bool(ret); - return 0; + int ret = serv->ssdb->setnx(req[1], req[2]); + resp->reply_bool(ret); + return 0; } -int proc_setx(NetworkServer *net, Link *link, const Request &req, Response *resp){ - SSDBServer *serv = (SSDBServer *)net->data; - CHECK_NUM_PARAMS(4); - CHECK_KV_KEY_RANGE(1); - - Locking l(&serv->expiration->mutex); - int ret; - ret = serv->ssdb->set(req[1], req[2]); - if(ret == -1){ - resp->push_back("error"); - return 0; - } - ret = serv->expiration->set_ttl(req[1], req[3].Int()); - if(ret == -1){ - resp->push_back("error"); - }else{ - resp->push_back("ok"); - resp->push_back("1"); - } - return 0; +int proc_setx(NetworkServer *net, Link *link, const Request &req, + Response *resp) { + SSDBServer *serv = (SSDBServer *)net->data; + CHECK_NUM_PARAMS(4); + CHECK_KV_KEY_RANGE(1); + + Locking l(&serv->expiration->mutex); + int ret; + ret = serv->ssdb->set(req[1], req[2]); + if (ret == -1) { + resp->push_back("error"); + return 0; + } + ret = serv->expiration->set_ttl(req[1], req[3].Int()); + if (ret == -1) { + resp->push_back("error"); + } else { + resp->push_back("ok"); + resp->push_back("1"); + } + return 0; } -int proc_ttl(NetworkServer *net, Link *link, const Request &req, Response *resp){ - SSDBServer *serv = (SSDBServer *)net->data; - CHECK_NUM_PARAMS(2); - CHECK_KV_KEY_RANGE(1); +int proc_ttl(NetworkServer *net, Link *link, const Request &req, + Response *resp) { + SSDBServer *serv = (SSDBServer *)net->data; + CHECK_NUM_PARAMS(2); + CHECK_KV_KEY_RANGE(1); - int64_t ttl = serv->expiration->get_ttl(req[1]); - resp->push_back("ok"); - resp->push_back(str(ttl)); - return 0; + int64_t ttl = serv->expiration->get_ttl(req[1]); + resp->push_back("ok"); + resp->push_back(str(ttl)); + return 0; } -int proc_expire(NetworkServer *net, Link *link, const Request &req, Response *resp){ - SSDBServer *serv = (SSDBServer *)net->data; - CHECK_NUM_PARAMS(3); - CHECK_KV_KEY_RANGE(1); - - Locking l(&serv->expiration->mutex); - std::string val; - int ret = serv->ssdb->get(req[1], &val); - if(ret == 1){ - ret = serv->expiration->set_ttl(req[1], req[2].Int()); - if(ret != -1){ - resp->push_back("ok"); - resp->push_back("1"); - }else{ - resp->push_back("error"); - } - return 0; - } - resp->push_back("ok"); - resp->push_back("0"); - return 0; +int proc_expire(NetworkServer *net, Link *link, const Request &req, + Response *resp) { + SSDBServer *serv = (SSDBServer *)net->data; + CHECK_NUM_PARAMS(3); + CHECK_KV_KEY_RANGE(1); + + Locking l(&serv->expiration->mutex); + std::string val; + int ret = serv->ssdb->get(req[1], &val); + if (ret == 1) { + ret = serv->expiration->set_ttl(req[1], req[2].Int()); + if (ret != -1) { + resp->push_back("ok"); + resp->push_back("1"); + } else { + resp->push_back("error"); + } + return 0; + } + resp->push_back("ok"); + resp->push_back("0"); + return 0; } -int proc_exists(NetworkServer *net, Link *link, const Request &req, Response *resp){ - SSDBServer *serv = (SSDBServer *)net->data; - CHECK_NUM_PARAMS(2); - CHECK_KV_KEY_RANGE(1); - - const Bytes key = req[1]; - std::string val; - int ret = serv->ssdb->get(key, &val); - resp->reply_bool(ret); - return 0; +int proc_exists(NetworkServer *net, Link *link, const Request &req, + Response *resp) { + SSDBServer *serv = (SSDBServer *)net->data; + CHECK_NUM_PARAMS(2); + CHECK_KV_KEY_RANGE(1); + + const Bytes key = req[1]; + std::string val; + int ret = serv->ssdb->get(key, &val); + resp->reply_bool(ret); + return 0; } -int proc_multi_exists(NetworkServer *net, Link *link, const Request &req, Response *resp){ - SSDBServer *serv = (SSDBServer *)net->data; - CHECK_NUM_PARAMS(2); - - resp->push_back("ok"); - for(Request::const_iterator it=req.begin()+1; it!=req.end(); it++){ - const Bytes key = *it; - std::string val; - int ret = serv->ssdb->get(key, &val); - resp->push_back(key.String()); - if(ret == 1){ - resp->push_back("1"); - }else if(ret == 0){ - resp->push_back("0"); - }else{ - resp->push_back("0"); - } - } - return 0; +int proc_multi_exists(NetworkServer *net, Link *link, const Request &req, + Response *resp) { + SSDBServer *serv = (SSDBServer *)net->data; + CHECK_NUM_PARAMS(2); + + resp->push_back("ok"); + for (Request::const_iterator it = req.begin() + 1; it != req.end(); it++) { + const Bytes key = *it; + std::string val; + int ret = serv->ssdb->get(key, &val); + resp->push_back(key.String()); + if (ret == 1) { + resp->push_back("1"); + } else if (ret == 0) { + resp->push_back("0"); + } else { + resp->push_back("0"); + } + } + return 0; } -int proc_multi_set(NetworkServer *net, Link *link, const Request &req, Response *resp){ - SSDBServer *serv = (SSDBServer *)net->data; - if(req.size() < 3 || req.size() % 2 != 1){ - resp->push_back("client_error"); - }else{ - int ret = serv->ssdb->multi_set(req, 1); - resp->reply_int(0, ret); - } - return 0; +int proc_multi_set(NetworkServer *net, Link *link, const Request &req, + Response *resp) { + SSDBServer *serv = (SSDBServer *)net->data; + if (req.size() < 3 || req.size() % 2 != 1) { + resp->push_back("client_error"); + } else { + int ret = serv->ssdb->multi_set(req, 1); + resp->reply_int(0, ret); + } + return 0; } -int proc_multi_del(NetworkServer *net, Link *link, const Request &req, Response *resp){ - SSDBServer *serv = (SSDBServer *)net->data; - CHECK_NUM_PARAMS(2); - - Locking l(&serv->expiration->mutex); - int ret = serv->ssdb->multi_del(req, 1); - if(ret == -1){ - resp->push_back("error"); - }else{ - for(Request::const_iterator it=req.begin()+1; it!=req.end(); it++){ - const Bytes key = *it; - serv->expiration->del_ttl(key); - } - resp->reply_int(0, ret); - } - return 0; +int proc_multi_del(NetworkServer *net, Link *link, const Request &req, + Response *resp) { + SSDBServer *serv = (SSDBServer *)net->data; + CHECK_NUM_PARAMS(2); + + Locking l(&serv->expiration->mutex); + int ret = serv->ssdb->multi_del(req, 1); + if (ret == -1) { + resp->push_back("error"); + } else { + for (Request::const_iterator it = req.begin() + 1; it != req.end(); it++) { + const Bytes key = *it; + serv->expiration->del_ttl(key); + } + resp->reply_int(0, ret); + } + return 0; } -int proc_multi_get(NetworkServer *net, Link *link, const Request &req, Response *resp){ - SSDBServer *serv = (SSDBServer *)net->data; - CHECK_NUM_PARAMS(2); - - resp->push_back("ok"); - for(int i=1; issdb->get(req[i], &val); - if(ret == 1){ - resp->push_back(req[i].String()); - resp->push_back(val); - } - } - return 0; +int proc_multi_get(NetworkServer *net, Link *link, const Request &req, + Response *resp) { + SSDBServer *serv = (SSDBServer *)net->data; + CHECK_NUM_PARAMS(2); + + resp->push_back("ok"); + for (int i = 1; i < req.size(); i++) { + std::string val; + int ret = serv->ssdb->get(req[i], &val); + if (ret == 1) { + resp->push_back(req[i].String()); + resp->push_back(val); + } + } + return 0; } -int proc_del(NetworkServer *net, Link *link, const Request &req, Response *resp){ - SSDBServer *serv = (SSDBServer *)net->data; - CHECK_NUM_PARAMS(2); - CHECK_KV_KEY_RANGE(1); - - Locking l(&serv->expiration->mutex); - int ret = serv->ssdb->del(req[1]); - if(ret == -1){ - resp->push_back("error"); - }else{ - serv->expiration->del_ttl(req[1]); - - resp->push_back("ok"); - resp->push_back("1"); - } - return 0; +int proc_kdel(NetworkServer *net, Link *link, const Request &req, + Response *resp) { + SSDBServer *serv = (SSDBServer *)net->data; + CHECK_NUM_PARAMS(2); + CHECK_KV_KEY_RANGE(1); + + Locking l(&serv->expiration->mutex); + int ret = serv->ssdb->del(req[1]); + if (ret == -1) { + resp->push_back("error"); + } else { + serv->expiration->del_ttl(req[1]); + + resp->push_back("ok"); + resp->push_back("1"); + } + return 0; } -int proc_scan(NetworkServer *net, Link *link, const Request &req, Response *resp){ - SSDBServer *serv = (SSDBServer *)net->data; - CHECK_NUM_PARAMS(4); - - uint64_t limit = req[3].Uint64(); - KIterator *it = serv->ssdb->scan(req[1], req[2], limit); - resp->push_back("ok"); - while(it->next()){ - resp->push_back(it->key); - resp->push_back(it->val); - } - delete it; - return 0; +int proc_scan(NetworkServer *net, Link *link, const Request &req, + Response *resp) { + SSDBServer *serv = (SSDBServer *)net->data; + CHECK_NUM_PARAMS(4); + + uint64_t limit = req[3].Uint64(); + KIterator *it = serv->ssdb->scan(req[1], req[2], limit); + resp->push_back("ok"); + while (it->next()) { + resp->push_back(it->key); + resp->push_back(it->val); + } + delete it; + return 0; } -int proc_rscan(NetworkServer *net, Link *link, const Request &req, Response *resp){ - SSDBServer *serv = (SSDBServer *)net->data; - CHECK_NUM_PARAMS(4); - - uint64_t limit = req[3].Uint64(); - KIterator *it = serv->ssdb->rscan(req[1], req[2], limit); - resp->push_back("ok"); - while(it->next()){ - resp->push_back(it->key); - resp->push_back(it->val); - } - delete it; - return 0; +int proc_rscan(NetworkServer *net, Link *link, const Request &req, + Response *resp) { + SSDBServer *serv = (SSDBServer *)net->data; + CHECK_NUM_PARAMS(4); + + uint64_t limit = req[3].Uint64(); + KIterator *it = serv->ssdb->rscan(req[1], req[2], limit); + resp->push_back("ok"); + while (it->next()) { + resp->push_back(it->key); + resp->push_back(it->val); + } + delete it; + return 0; } -int proc_keys(NetworkServer *net, Link *link, const Request &req, Response *resp){ - SSDBServer *serv = (SSDBServer *)net->data; - CHECK_NUM_PARAMS(4); - - uint64_t limit = req[3].Uint64(); - KIterator *it = serv->ssdb->scan(req[1], req[2], limit); - it->return_val(false); - - resp->push_back("ok"); - while(it->next()){ - resp->push_back(it->key); - } - delete it; - return 0; +int proc_keys(NetworkServer *net, Link *link, const Request &req, + Response *resp) { + SSDBServer *serv = (SSDBServer *)net->data; + CHECK_NUM_PARAMS(4); + + uint64_t limit = req[3].Uint64(); + KIterator *it = serv->ssdb->scan(req[1], req[2], limit); + it->return_val(false); + + resp->push_back("ok"); + while (it->next()) { + resp->push_back(it->key); + } + delete it; + return 0; } -int proc_rkeys(NetworkServer *net, Link *link, const Request &req, Response *resp){ - SSDBServer *serv = (SSDBServer *)net->data; - CHECK_NUM_PARAMS(4); - - uint64_t limit = req[3].Uint64(); - KIterator *it = serv->ssdb->rscan(req[1], req[2], limit); - it->return_val(false); - - resp->push_back("ok"); - while(it->next()){ - resp->push_back(it->key); - } - delete it; - return 0; +int proc_rkeys(NetworkServer *net, Link *link, const Request &req, + Response *resp) { + SSDBServer *serv = (SSDBServer *)net->data; + CHECK_NUM_PARAMS(4); + + uint64_t limit = req[3].Uint64(); + KIterator *it = serv->ssdb->rscan(req[1], req[2], limit); + it->return_val(false); + + resp->push_back("ok"); + while (it->next()) { + resp->push_back(it->key); + } + delete it; + return 0; } // dir := +1|-1 -static int _incr(SSDB *ssdb, const Request &req, Response *resp, int dir){ - CHECK_NUM_PARAMS(2); - int64_t by = 1; - if(req.size() > 2){ - by = req[2].Int64(); - } - int64_t new_val; - int ret = ssdb->incr(req[1], dir * by, &new_val); - if(ret == 0){ - resp->reply_status(-1, "value is not an integer or out of range"); - }else{ - resp->reply_int(ret, new_val); - } - return 0; +static int _incr(SSDB *ssdb, const Request &req, Response *resp, int dir) { + CHECK_NUM_PARAMS(2); + int64_t by = 1; + if (req.size() > 2) { + by = req[2].Int64(); + } + int64_t new_val; + int ret = ssdb->incr(req[1], dir * by, &new_val); + if (ret == 0) { + resp->reply_status(-1, "value is not an integer or out of range"); + } else { + resp->reply_int(ret, new_val); + } + return 0; } -int proc_incr(NetworkServer *net, Link *link, const Request &req, Response *resp){ - SSDBServer *serv = (SSDBServer *)net->data; - CHECK_KV_KEY_RANGE(1); - return _incr(serv->ssdb, req, resp, 1); +int proc_incr(NetworkServer *net, Link *link, const Request &req, + Response *resp) { + SSDBServer *serv = (SSDBServer *)net->data; + CHECK_KV_KEY_RANGE(1); + return _incr(serv->ssdb, req, resp, 1); } -int proc_decr(NetworkServer *net, Link *link, const Request &req, Response *resp){ - SSDBServer *serv = (SSDBServer *)net->data; - CHECK_KV_KEY_RANGE(1); - return _incr(serv->ssdb, req, resp, -1); +int proc_decr(NetworkServer *net, Link *link, const Request &req, + Response *resp) { + SSDBServer *serv = (SSDBServer *)net->data; + CHECK_KV_KEY_RANGE(1); + return _incr(serv->ssdb, req, resp, -1); } -int proc_getbit(NetworkServer *net, Link *link, const Request &req, Response *resp){ - SSDBServer *serv = (SSDBServer *)net->data; - CHECK_NUM_PARAMS(3); - CHECK_KV_KEY_RANGE(1); +int proc_getbit(NetworkServer *net, Link *link, const Request &req, + Response *resp) { + SSDBServer *serv = (SSDBServer *)net->data; + CHECK_NUM_PARAMS(3); + CHECK_KV_KEY_RANGE(1); - int ret = serv->ssdb->getbit(req[1], req[2].Int()); - resp->reply_bool(ret); - return 0; + int ret = serv->ssdb->getbit(req[1], req[2].Int()); + resp->reply_bool(ret); + return 0; } -int proc_setbit(NetworkServer *net, Link *link, const Request &req, Response *resp){ - SSDBServer *serv = (SSDBServer *)net->data; - CHECK_NUM_PARAMS(4); - CHECK_KV_KEY_RANGE(1); - - const Bytes &key = req[1]; - int offset = req[2].Int(); - if(req[3].size() == 0 || (req[3].data()[0] != '0' && req[3].data()[0] != '1')){ - resp->push_back("client_error"); - resp->push_back("bit is not an integer or out of range"); - return 0; - } - if(offset < 0 || offset > Link::MAX_PACKET_SIZE * 8){ - std::string msg = "offset is out of range [0, "; - msg += str(Link::MAX_PACKET_SIZE * 8); - msg += "]"; - resp->push_back("client_error"); - resp->push_back(msg); - return 0; - } - int on = req[3].Int(); - int ret = serv->ssdb->setbit(key, offset, on); - resp->reply_bool(ret); - return 0; +int proc_setbit(NetworkServer *net, Link *link, const Request &req, + Response *resp) { + SSDBServer *serv = (SSDBServer *)net->data; + CHECK_NUM_PARAMS(4); + CHECK_KV_KEY_RANGE(1); + + const Bytes &key = req[1]; + int offset = req[2].Int(); + if (req[3].size() == 0 || + (req[3].data()[0] != '0' && req[3].data()[0] != '1')) { + resp->push_back("client_error"); + resp->push_back("bit is not an integer or out of range"); + return 0; + } + if (offset < 0 || offset > Link::MAX_PACKET_SIZE * 8) { + std::string msg = "offset is out of range [0, "; + msg += str(Link::MAX_PACKET_SIZE * 8); + msg += "]"; + resp->push_back("client_error"); + resp->push_back(msg); + return 0; + } + int on = req[3].Int(); + int ret = serv->ssdb->setbit(key, offset, on); + resp->reply_bool(ret); + return 0; } -int proc_countbit(NetworkServer *net, Link *link, const Request &req, Response *resp){ - SSDBServer *serv = (SSDBServer *)net->data; - CHECK_NUM_PARAMS(2); - CHECK_KV_KEY_RANGE(1); - - const Bytes &key = req[1]; - int start = 0; - if(req.size() > 2){ - start = req[2].Int(); - } - std::string val; - int ret = serv->ssdb->get(key, &val); - if(ret == -1){ - resp->push_back("error"); - }else{ - std::string str; - int size = -1; - if(req.size() > 3){ - size = req[3].Int(); - str = substr(val, start, size); - }else{ - str = substr(val, start, val.size()); - } - int count = bitcount(str.data(), str.size()); - resp->reply_int(0, count); - } - return 0; +int proc_countbit(NetworkServer *net, Link *link, const Request &req, + Response *resp) { + SSDBServer *serv = (SSDBServer *)net->data; + CHECK_NUM_PARAMS(2); + CHECK_KV_KEY_RANGE(1); + + const Bytes &key = req[1]; + int start = 0; + if (req.size() > 2) { + start = req[2].Int(); + } + std::string val; + int ret = serv->ssdb->get(key, &val); + if (ret == -1) { + resp->push_back("error"); + } else { + std::string str; + int size = -1; + if (req.size() > 3) { + size = req[3].Int(); + str = substr(val, start, size); + } else { + str = substr(val, start, val.size()); + } + int count = bitcount(str.data(), str.size()); + resp->reply_int(0, count); + } + return 0; } -int proc_bitcount(NetworkServer *net, Link *link, const Request &req, Response *resp){ - SSDBServer *serv = (SSDBServer *)net->data; - CHECK_NUM_PARAMS(2); - CHECK_KV_KEY_RANGE(1); - - const Bytes &key = req[1]; - int start = 0; - if(req.size() > 2){ - start = req[2].Int(); - } - int end = -1; - if(req.size() > 3){ - end = req[3].Int(); - } - std::string val; - int ret = serv->ssdb->get(key, &val); - if(ret == -1){ - resp->push_back("error"); - }else{ - std::string str = str_slice(val, start, end); - int count = bitcount(str.data(), str.size()); - resp->reply_int(0, count); - } - return 0; +int proc_bitcount(NetworkServer *net, Link *link, const Request &req, + Response *resp) { + SSDBServer *serv = (SSDBServer *)net->data; + CHECK_NUM_PARAMS(2); + CHECK_KV_KEY_RANGE(1); + + const Bytes &key = req[1]; + int start = 0; + if (req.size() > 2) { + start = req[2].Int(); + } + int end = -1; + if (req.size() > 3) { + end = req[3].Int(); + } + std::string val; + int ret = serv->ssdb->get(key, &val); + if (ret == -1) { + resp->push_back("error"); + } else { + std::string str = str_slice(val, start, end); + int count = bitcount(str.data(), str.size()); + resp->reply_int(0, count); + } + return 0; } -int proc_substr(NetworkServer *net, Link *link, const Request &req, Response *resp){ - SSDBServer *serv = (SSDBServer *)net->data; - CHECK_NUM_PARAMS(2); - CHECK_KV_KEY_RANGE(1); - - const Bytes &key = req[1]; - int start = 0; - if(req.size() > 2){ - start = req[2].Int(); - } - int size = 2000000000; - if(req.size() > 3){ - size = req[3].Int(); - } - std::string val; - int ret = serv->ssdb->get(key, &val); - if(ret == -1){ - resp->push_back("error"); - }else{ - std::string str = substr(val, start, size); - resp->push_back("ok"); - resp->push_back(str); - } - return 0; +int proc_substr(NetworkServer *net, Link *link, const Request &req, + Response *resp) { + SSDBServer *serv = (SSDBServer *)net->data; + CHECK_NUM_PARAMS(2); + CHECK_KV_KEY_RANGE(1); + + const Bytes &key = req[1]; + int start = 0; + if (req.size() > 2) { + start = req[2].Int(); + } + int size = 2000000000; + if (req.size() > 3) { + size = req[3].Int(); + } + std::string val; + int ret = serv->ssdb->get(key, &val); + if (ret == -1) { + resp->push_back("error"); + } else { + std::string str = substr(val, start, size); + resp->push_back("ok"); + resp->push_back(str); + } + return 0; } -int proc_getrange(NetworkServer *net, Link *link, const Request &req, Response *resp){ - SSDBServer *serv = (SSDBServer *)net->data; - CHECK_NUM_PARAMS(2); - CHECK_KV_KEY_RANGE(1); - - const Bytes &key = req[1]; - int start = 0; - if(req.size() > 2){ - start = req[2].Int(); - } - int size = -1; - if(req.size() > 3){ - size = req[3].Int(); - } - std::string val; - int ret = serv->ssdb->get(key, &val); - if(ret == -1){ - resp->push_back("error"); - }else{ - std::string str = str_slice(val, start, size); - resp->push_back("ok"); - resp->push_back(str); - } - return 0; +int proc_getrange(NetworkServer *net, Link *link, const Request &req, + Response *resp) { + SSDBServer *serv = (SSDBServer *)net->data; + CHECK_NUM_PARAMS(2); + CHECK_KV_KEY_RANGE(1); + + const Bytes &key = req[1]; + int start = 0; + if (req.size() > 2) { + start = req[2].Int(); + } + int size = -1; + if (req.size() > 3) { + size = req[3].Int(); + } + std::string val; + int ret = serv->ssdb->get(key, &val); + if (ret == -1) { + resp->push_back("error"); + } else { + std::string str = str_slice(val, start, size); + resp->push_back("ok"); + resp->push_back(str); + } + return 0; } -int proc_strlen(NetworkServer *net, Link *link, const Request &req, Response *resp){ - SSDBServer *serv = (SSDBServer *)net->data; - CHECK_NUM_PARAMS(2); - CHECK_KV_KEY_RANGE(1); - - const Bytes &key = req[1]; - std::string val; - int ret = serv->ssdb->get(key, &val); - resp->reply_int(ret, val.size()); - return 0; +int proc_strlen(NetworkServer *net, Link *link, const Request &req, + Response *resp) { + SSDBServer *serv = (SSDBServer *)net->data; + CHECK_NUM_PARAMS(2); + CHECK_KV_KEY_RANGE(1); + + const Bytes &key = req[1]; + std::string val; + int ret = serv->ssdb->get(key, &val); + resp->reply_int(ret, val.size()); + return 0; } diff --git a/src/proc_kv.h b/src/proc_kv.h index 1f9dbe44f..d72b74cbe 100644 --- a/src/proc_kv.h +++ b/src/proc_kv.h @@ -4,9 +4,9 @@ Use of this source code is governed by a BSD-style license that can be found in the LICENSE file. */ /* kv commands */ -#include "serv.h" #include "net/proc.h" #include "net/server.h" +#include "serv.h" DEF_PROC(get); DEF_PROC(set); @@ -20,7 +20,7 @@ DEF_PROC(substr); DEF_PROC(getrange); DEF_PROC(strlen); DEF_PROC(bitcount); -DEF_PROC(del); +DEF_PROC(kdel); DEF_PROC(incr); DEF_PROC(decr); DEF_PROC(scan); diff --git a/src/proc_redis_compatible.cpp b/src/proc_redis_compatible.cpp new file mode 100644 index 000000000..133c205db --- /dev/null +++ b/src/proc_redis_compatible.cpp @@ -0,0 +1,64 @@ +#include "proc_redis_compatible.h" +int proc_del(NetworkServer *net, Link *link, const Request &req, + Response *resp) { + SSDBServer *serv = (SSDBServer *)net->data; + // 上锁 + // Locking *locker = new Locking(&serv->expiration->mutex); + // 检查参数数量 + CHECK_NUM_PARAMS(2); + const Bytes &name = req[1]; + int64_t count = 0; + // 删掉key-value + /* + CHECK_KV_KEY_RANGE(1); + //这一段没太看懂在做什么 + if (!link->ignore_key_range && req.size() > 1) { + if (!serv->in_kv_range(req[1])) { + resp->push_back("out_of_range"); + return 0; + } + } + */ + if (serv->ssdb->del(req[1]) == -1) { + resp->push_back("error"); + resp->push_back("key"); + return 0; + } else { + serv->expiration->del_ttl(req[1]); + } + // 删掉hash(hclear) + count += serv->ssdb->hclear(name); + // 删掉sorted set(zclear) + std::string key_start, score_start; + int num = 1000; + while (num == 1000) { + num = 0; + ZIterator *it = serv->ssdb->zscan(name, key_start, score_start, "", 1000); + while (it->next()) { + key_start = it->key; + score_start = it->score; + if (serv->ssdb->zdel(name, key_start)) { + resp->push_back("error"); + resp->push_back("zset"); + delete it; + return 0; + } + num++; + } + count += num; + } + // 删掉 queue(qclear) + while (1) { + std::string item; + int ret = serv->ssdb->qpop_front(req[1], &item); + if (ret == 0) { + break; + } + if (ret == -1) { + return -1; + } + count += 1; + } + resp->reply_int(0, count); + return 0; +} \ No newline at end of file diff --git a/src/proc_redis_compatible.h b/src/proc_redis_compatible.h new file mode 100644 index 000000000..c80897887 --- /dev/null +++ b/src/proc_redis_compatible.h @@ -0,0 +1,5 @@ +#include "net/proc.h" +#include "net/server.h" +#include "serv.h" + +DEF_PROC(del); \ No newline at end of file diff --git a/src/serv.cpp b/src/serv.cpp index 8744b08ca..b68a6342c 100644 --- a/src/serv.cpp +++ b/src/serv.cpp @@ -9,6 +9,7 @@ found in the LICENSE file. #include "serv.h" #include "net/proc.h" #include "net/server.h" +#include "proc_redis_compatible.h" #include "proc_sys.h" #include "proc_kv.h" #include "proc_hash.h" @@ -22,12 +23,16 @@ DEF_PROC(cluster_set_kv_range); DEF_PROC(cluster_set_kv_status); DEF_PROC(cluster_migrate_kv_data); -#define REG_PROC(c, f) net->proc_map.set_proc(#c, f, proc_##c) +#define REG_PROC(c, f) net->proc_map.set_proc(#c, f, proc_##c) -void SSDBServer::reg_procs(NetworkServer *net){ +void SSDBServer::reg_procs(NetworkServer *net) +{ + // redis compatiable operations + REG_PROC(del, "wt"); + // key-value operations REG_PROC(get, "rt"); REG_PROC(set, "wt"); - REG_PROC(del, "wt"); + REG_PROC(kdel, "wt"); REG_PROC(setx, "wt"); REG_PROC(setnx, "wt"); REG_PROC(getset, "wt"); @@ -51,7 +56,7 @@ void SSDBServer::reg_procs(NetworkServer *net){ REG_PROC(multi_del, "wt"); REG_PROC(ttl, "rt"); REG_PROC(expire, "wt"); - + // hash operations REG_PROC(hsize, "rt"); REG_PROC(hget, "rt"); REG_PROC(hset, "wt"); @@ -107,7 +112,7 @@ void SSDBServer::reg_procs(NetworkServer *net){ REG_PROC(multi_zdel, "wt"); REG_PROC(zpop_front, "wt"); REG_PROC(zpop_back, "wt"); - + // queue operations REG_PROC(qsize, "rt"); REG_PROC(qfront, "rt"); REG_PROC(qback, "rt"); @@ -127,7 +132,7 @@ void SSDBServer::reg_procs(NetworkServer *net){ REG_PROC(qrange, "rt"); REG_PROC(qget, "rt"); REG_PROC(qset, "wt"); - + // sys operations REG_PROC(clear_binlog, "wt"); REG_PROC(flushdb, "wt"); @@ -145,7 +150,7 @@ void SSDBServer::reg_procs(NetworkServer *net){ REG_PROC(get_key_range, "r"); REG_PROC(get_kv_range, "r"); REG_PROC(set_kv_range, "r"); - + // cluster operations REG_PROC(cluster_add_kv_node, "r"); REG_PROC(cluster_del_kv_node, "r"); REG_PROC(cluster_kv_node_list, "r"); @@ -154,8 +159,8 @@ void SSDBServer::reg_procs(NetworkServer *net){ REG_PROC(cluster_migrate_kv_data, "r"); } - -SSDBServer::SSDBServer(SSDB *ssdb, SSDB *meta, const Config &conf, NetworkServer *net){ +SSDBServer::SSDBServer(SSDB *ssdb, SSDB *meta, const Config &conf, NetworkServer *net) +{ this->ssdb = (SSDBImpl *)ssdb; this->meta = meta; @@ -167,43 +172,52 @@ SSDBServer::SSDBServer(SSDB *ssdb, SSDB *meta, const Config &conf, NetworkServer backend_dump = new BackendDump(this->ssdb); backend_sync = new BackendSync(this->ssdb, sync_speed); expiration = new ExpirationHandler(this->ssdb); - + cluster = new Cluster(this->ssdb); - if(cluster->init() == -1){ + if (cluster->init() == -1) + { log_fatal("cluster init failed!"); exit(1); } { // slaves const Config *repl_conf = conf.get("replication"); - if(repl_conf != NULL){ + if (repl_conf != NULL) + { std::vector children = repl_conf->children; - for(std::vector::iterator it = children.begin(); it != children.end(); it++){ + for (std::vector::iterator it = children.begin(); it != children.end(); it++) + { Config *c = *it; - if(c->key != "slaveof"){ + if (c->key != "slaveof") + { continue; } std::string ip = c->get_str("ip"); int port = c->get_num("port"); - if(ip == ""){ + if (ip == "") + { ip = c->get_str("host"); } - if(ip == "" || port <= 0 || port > 65535){ + if (ip == "" || port <= 0 || port > 65535) + { continue; } bool is_mirror = false; std::string type = c->get_str("type"); - if(type == "mirror"){ + if (type == "mirror") + { is_mirror = true; - }else{ + } + else + { type = "sync"; is_mirror = false; } - + std::string id = c->get_str("id"); std::string auth = c->get_str("auth"); int recv_timeout = c->get_num("recv_timeout"); - + log_info("slaveof: %s:%d, type: %s", ip.c_str(), port, type.c_str()); this->slaveof(id, ip, port, auth, 0, "", is_mirror, recv_timeout); } @@ -212,19 +226,21 @@ SSDBServer::SSDBServer(SSDB *ssdb, SSDB *meta, const Config &conf, NetworkServer // load kv_range int ret = this->get_kv_range(&this->kv_range_s, &this->kv_range_e); - if(ret == -1){ + if (ret == -1) + { log_fatal("load key_range failed!"); exit(1); } log_info("key_range.kv: \"%s\", \"%s\"", - str_escape(this->kv_range_s).c_str(), - str_escape(this->kv_range_e).c_str() - ); + str_escape(this->kv_range_s).c_str(), + str_escape(this->kv_range_e).c_str()); } -SSDBServer::~SSDBServer(){ +SSDBServer::~SSDBServer() +{ std::vector::iterator it; - for(it = slaves.begin(); it != slaves.end(); it++){ + for (it = slaves.begin(); it != slaves.end(); it++) + { Slave *slave = *it; slave->stop(); delete slave; @@ -238,12 +254,15 @@ SSDBServer::~SSDBServer(){ log_debug("SSDBServer finalized"); } -int SSDBServer::slaveof(const std::string &id, const std::string &host, int port, const std::string &auth, uint64_t last_seq, const std::string &last_key, bool is_mirror, int recv_timeout){ +int SSDBServer::slaveof(const std::string &id, const std::string &host, int port, const std::string &auth, uint64_t last_seq, const std::string &last_key, bool is_mirror, int recv_timeout) +{ Slave *slave = new Slave(ssdb, meta, host.c_str(), port, is_mirror); - if(!id.empty()){ + if (!id.empty()) + { slave->set_id(id); } - if(recv_timeout > 0){ + if (recv_timeout > 0) + { slave->recv_timeout = recv_timeout; } slave->last_seq = last_seq; @@ -254,11 +273,14 @@ int SSDBServer::slaveof(const std::string &id, const std::string &host, int port return 0; } -int SSDBServer::set_kv_range(const std::string &start, const std::string &end){ - if(meta->hset("key_range", "kv_s", start) == -1){ +int SSDBServer::set_kv_range(const std::string &start, const std::string &end) +{ + if (meta->hset("key_range", "kv_s", start) == -1) + { return -1; } - if(meta->hset("key_range", "kv_e", end) == -1){ + if (meta->hset("key_range", "kv_e", end) == -1) + { return -1; } @@ -267,31 +289,33 @@ int SSDBServer::set_kv_range(const std::string &start, const std::string &end){ return 0; } -int SSDBServer::get_kv_range(std::string *start, std::string *end){ - if(meta->hget("key_range", "kv_s", start) == -1){ +int SSDBServer::get_kv_range(std::string *start, std::string *end) +{ + if (meta->hget("key_range", "kv_s", start) == -1) + { return -1; } - if(meta->hget("key_range", "kv_e", end) == -1){ + if (meta->hget("key_range", "kv_e", end) == -1) + { return -1; } return 0; } -bool SSDBServer::in_kv_range(const Bytes &key){ - if((this->kv_range_s.size() && this->kv_range_s >= key) - || (this->kv_range_e.size() && this->kv_range_e < key)) +bool SSDBServer::in_kv_range(const Bytes &key) +{ + if ((this->kv_range_s.size() && this->kv_range_s >= key) || (this->kv_range_e.size() && this->kv_range_e < key)) { return false; } return true; } -bool SSDBServer::in_kv_range(const std::string &key){ - if((this->kv_range_s.size() && this->kv_range_s >= key) - || (this->kv_range_e.size() && this->kv_range_e < key)) +bool SSDBServer::in_kv_range(const std::string &key) +{ + if ((this->kv_range_s.size() && this->kv_range_s >= key) || (this->kv_range_e.size() && this->kv_range_e < key)) { return false; } return true; } - From dd27455d77fd280a81ae01c3648b159f7c9ff45f Mon Sep 17 00:00:00 2001 From: yuanx Date: Mon, 16 Dec 2019 16:33:34 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E6=9B=B4=E6=94=B9multi=5Fdel=E4=B8=BAmulti?= =?UTF-8?q?=5Fkdel,=E5=88=9B=E5=BB=BAmulti=5Fdel=E8=83=BD=E5=A4=9F?= =?UTF-8?q?=E5=88=A0=E9=99=A4=E6=89=80=E6=9C=89=E7=B1=BB=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/proc_kv.cpp | 4 +- src/proc_kv.h | 2 +- src/proc_redis_compatible.cpp | 49 ++++ src/proc_redis_compatible.h | 3 +- src/serv.cpp | 522 ++++++++++++++++------------------ 5 files changed, 305 insertions(+), 275 deletions(-) diff --git a/src/proc_kv.cpp b/src/proc_kv.cpp index 3b35d17df..e5872848b 100644 --- a/src/proc_kv.cpp +++ b/src/proc_kv.cpp @@ -162,8 +162,8 @@ int proc_multi_set(NetworkServer *net, Link *link, const Request &req, return 0; } -int proc_multi_del(NetworkServer *net, Link *link, const Request &req, - Response *resp) { +int proc_multi_kdel(NetworkServer *net, Link *link, const Request &req, + Response *resp) { SSDBServer *serv = (SSDBServer *)net->data; CHECK_NUM_PARAMS(2); diff --git a/src/proc_kv.h b/src/proc_kv.h index d72b74cbe..5328b17e0 100644 --- a/src/proc_kv.h +++ b/src/proc_kv.h @@ -31,6 +31,6 @@ DEF_PROC(exists); DEF_PROC(multi_exists); DEF_PROC(multi_get); DEF_PROC(multi_set); -DEF_PROC(multi_del); +DEF_PROC(multi_kdel); DEF_PROC(ttl); DEF_PROC(expire); diff --git a/src/proc_redis_compatible.cpp b/src/proc_redis_compatible.cpp index 133c205db..3c212bcec 100644 --- a/src/proc_redis_compatible.cpp +++ b/src/proc_redis_compatible.cpp @@ -61,4 +61,53 @@ int proc_del(NetworkServer *net, Link *link, const Request &req, } resp->reply_int(0, count); return 0; +} +int proc_multi_del(NetworkServer *net, Link *link, const Request &req, + Response *resp) { + SSDBServer *serv = (SSDBServer *)net->data; + CHECK_NUM_PARAMS(2); + // 删除Key-value + Locking *locker = new Locking(&serv->expiration->mutex); + if (serv->ssdb->multi_del(req, 1) != -1) { + for (Request::const_iterator it = req.begin() + 1; it != req.end(); it++) { + serv->expiration->del_ttl(*it); + } + } else { + resp->push_back("error"); + return 0; + } + delete locker; + // 删除hash(hclear) + for (Request::const_iterator it = req.begin() + 1; it != req.end(); it++) { + serv->ssdb->hclear(*it); + } + // 删除zset + for (Request::const_iterator it = req.begin() + 1; it != req.end();) { + ZIterator *zit = serv->ssdb->zscan(*it, "", "", "", 1000); + int num = 0; + while (zit->next()) { + if (serv->ssdb->zdel(*it, zit->key) == -1) { + resp->push_back("error"); + delete zit; + return 0; + } + num++; + } + delete zit; + if (num == 0) { + it++; + } + } + // 删除queue + for (Request::const_iterator it = req.begin() + 1; it != req.end(); it++) { + int ret; + while ((ret = serv->ssdb->qpop_front(*it, NULL))) { + if (ret == -1) { + resp->push_back("error"); + return 0; + } + }; + } + resp->reply_int(0, req.size() - 1); + return 0; } \ No newline at end of file diff --git a/src/proc_redis_compatible.h b/src/proc_redis_compatible.h index c80897887..8875f8b6d 100644 --- a/src/proc_redis_compatible.h +++ b/src/proc_redis_compatible.h @@ -2,4 +2,5 @@ #include "net/server.h" #include "serv.h" -DEF_PROC(del); \ No newline at end of file +DEF_PROC(del); +DEF_PROC(multi_del); \ No newline at end of file diff --git a/src/serv.cpp b/src/serv.cpp index b68a6342c..29d57e17c 100644 --- a/src/serv.cpp +++ b/src/serv.cpp @@ -3,18 +3,18 @@ Copyright (c) 2012-2014 The SSDB Authors. All rights reserved. Use of this source code is governed by a BSD-style license that can be found in the LICENSE file. */ -#include "version.h" -#include "util/log.h" -#include "util/strings.h" #include "serv.h" #include "net/proc.h" #include "net/server.h" +#include "proc_hash.h" +#include "proc_kv.h" +#include "proc_queue.h" #include "proc_redis_compatible.h" #include "proc_sys.h" -#include "proc_kv.h" -#include "proc_hash.h" #include "proc_zset.h" -#include "proc_queue.h" +#include "util/log.h" +#include "util/strings.h" +#include "version.h" DEF_PROC(cluster_add_kv_node); DEF_PROC(cluster_del_kv_node); @@ -25,297 +25,277 @@ DEF_PROC(cluster_migrate_kv_data); #define REG_PROC(c, f) net->proc_map.set_proc(#c, f, proc_##c) -void SSDBServer::reg_procs(NetworkServer *net) -{ - // redis compatiable operations - REG_PROC(del, "wt"); - // key-value operations - REG_PROC(get, "rt"); - REG_PROC(set, "wt"); - REG_PROC(kdel, "wt"); - REG_PROC(setx, "wt"); - REG_PROC(setnx, "wt"); - REG_PROC(getset, "wt"); - REG_PROC(getbit, "rt"); - REG_PROC(setbit, "wt"); - REG_PROC(countbit, "rt"); - REG_PROC(substr, "rt"); - REG_PROC(getrange, "rt"); - REG_PROC(strlen, "rt"); - REG_PROC(bitcount, "rt"); - REG_PROC(incr, "wt"); - REG_PROC(decr, "wt"); - REG_PROC(scan, "rt"); - REG_PROC(rscan, "rt"); - REG_PROC(keys, "rt"); - REG_PROC(rkeys, "rt"); - REG_PROC(exists, "rt"); - REG_PROC(multi_exists, "rt"); - REG_PROC(multi_get, "rt"); - REG_PROC(multi_set, "wt"); - REG_PROC(multi_del, "wt"); - REG_PROC(ttl, "rt"); - REG_PROC(expire, "wt"); - // hash operations - REG_PROC(hsize, "rt"); - REG_PROC(hget, "rt"); - REG_PROC(hset, "wt"); - REG_PROC(hdel, "wt"); - REG_PROC(hincr, "wt"); - REG_PROC(hdecr, "wt"); - REG_PROC(hclear, "wt"); - REG_PROC(hgetall, "rt"); - REG_PROC(hscan, "rt"); - REG_PROC(hrscan, "rt"); - REG_PROC(hkeys, "rt"); - REG_PROC(hvals, "rt"); - REG_PROC(hlist, "rt"); - REG_PROC(hrlist, "rt"); - REG_PROC(hexists, "rt"); - REG_PROC(multi_hexists, "rt"); - REG_PROC(multi_hsize, "rt"); - REG_PROC(multi_hget, "rt"); - REG_PROC(multi_hset, "wt"); - REG_PROC(multi_hdel, "wt"); - REG_PROC(hfix, "wt"); +void SSDBServer::reg_procs(NetworkServer *net) { + // redis compatiable operations + REG_PROC(del, "wt"); + REG_PROC(multi_del, "wt"); + // key-value operations + REG_PROC(get, "rt"); + REG_PROC(set, "wt"); + REG_PROC(kdel, "wt"); + REG_PROC(setx, "wt"); + REG_PROC(setnx, "wt"); + REG_PROC(getset, "wt"); + REG_PROC(getbit, "rt"); + REG_PROC(setbit, "wt"); + REG_PROC(countbit, "rt"); + REG_PROC(substr, "rt"); + REG_PROC(getrange, "rt"); + REG_PROC(strlen, "rt"); + REG_PROC(bitcount, "rt"); + REG_PROC(incr, "wt"); + REG_PROC(decr, "wt"); + REG_PROC(scan, "rt"); + REG_PROC(rscan, "rt"); + REG_PROC(keys, "rt"); + REG_PROC(rkeys, "rt"); + REG_PROC(exists, "rt"); + REG_PROC(multi_exists, "rt"); + REG_PROC(multi_get, "rt"); + REG_PROC(multi_set, "wt"); + REG_PROC(multi_kdel, "wt"); + REG_PROC(ttl, "rt"); + REG_PROC(expire, "wt"); + // hash operations + REG_PROC(hsize, "rt"); + REG_PROC(hget, "rt"); + REG_PROC(hset, "wt"); + REG_PROC(hdel, "wt"); + REG_PROC(hincr, "wt"); + REG_PROC(hdecr, "wt"); + REG_PROC(hclear, "wt"); + REG_PROC(hgetall, "rt"); + REG_PROC(hscan, "rt"); + REG_PROC(hrscan, "rt"); + REG_PROC(hkeys, "rt"); + REG_PROC(hvals, "rt"); + REG_PROC(hlist, "rt"); + REG_PROC(hrlist, "rt"); + REG_PROC(hexists, "rt"); + REG_PROC(multi_hexists, "rt"); + REG_PROC(multi_hsize, "rt"); + REG_PROC(multi_hget, "rt"); + REG_PROC(multi_hset, "wt"); + REG_PROC(multi_hdel, "wt"); + REG_PROC(hfix, "wt"); - // because zrank may be extremly slow, execute in a seperate thread - REG_PROC(zrank, "rt"); - REG_PROC(zrrank, "rt"); - REG_PROC(zrange, "rt"); - REG_PROC(zrrange, "rt"); - REG_PROC(redis_zrange, "rt"); - REG_PROC(redis_zrrange, "rt"); - REG_PROC(zsize, "rt"); - REG_PROC(zget, "rt"); - REG_PROC(zset, "wt"); - REG_PROC(zdel, "wt"); - REG_PROC(zincr, "wt"); - REG_PROC(zdecr, "wt"); - REG_PROC(zclear, "wt"); - REG_PROC(zfix, "wt"); - REG_PROC(zscan, "rt"); - REG_PROC(zrscan, "rt"); - REG_PROC(zkeys, "rt"); - REG_PROC(zlist, "rt"); - REG_PROC(zrlist, "rt"); - REG_PROC(zcount, "rt"); - REG_PROC(zsum, "rt"); - REG_PROC(zavg, "rt"); - REG_PROC(zremrangebyrank, "wt"); - REG_PROC(zremrangebyscore, "wt"); - REG_PROC(zexists, "rt"); - REG_PROC(multi_zexists, "rt"); - REG_PROC(multi_zsize, "rt"); - REG_PROC(multi_zget, "rt"); - REG_PROC(multi_zset, "wt"); - REG_PROC(multi_zdel, "wt"); - REG_PROC(zpop_front, "wt"); - REG_PROC(zpop_back, "wt"); - // queue operations - REG_PROC(qsize, "rt"); - REG_PROC(qfront, "rt"); - REG_PROC(qback, "rt"); - REG_PROC(qpush, "wt"); - REG_PROC(qpush_front, "wt"); - REG_PROC(qpush_back, "wt"); - REG_PROC(qpop, "wt"); - REG_PROC(qpop_front, "wt"); - REG_PROC(qpop_back, "wt"); - REG_PROC(qtrim_front, "wt"); - REG_PROC(qtrim_back, "wt"); - REG_PROC(qfix, "wt"); - REG_PROC(qclear, "wt"); - REG_PROC(qlist, "rt"); - REG_PROC(qrlist, "rt"); - REG_PROC(qslice, "rt"); - REG_PROC(qrange, "rt"); - REG_PROC(qget, "rt"); - REG_PROC(qset, "wt"); - // sys operations - REG_PROC(clear_binlog, "wt"); - REG_PROC(flushdb, "wt"); + // because zrank may be extremly slow, execute in a seperate thread + REG_PROC(zrank, "rt"); + REG_PROC(zrrank, "rt"); + REG_PROC(zrange, "rt"); + REG_PROC(zrrange, "rt"); + REG_PROC(redis_zrange, "rt"); + REG_PROC(redis_zrrange, "rt"); + REG_PROC(zsize, "rt"); + REG_PROC(zget, "rt"); + REG_PROC(zset, "wt"); + REG_PROC(zdel, "wt"); + REG_PROC(zincr, "wt"); + REG_PROC(zdecr, "wt"); + REG_PROC(zclear, "wt"); + REG_PROC(zfix, "wt"); + REG_PROC(zscan, "rt"); + REG_PROC(zrscan, "rt"); + REG_PROC(zkeys, "rt"); + REG_PROC(zlist, "rt"); + REG_PROC(zrlist, "rt"); + REG_PROC(zcount, "rt"); + REG_PROC(zsum, "rt"); + REG_PROC(zavg, "rt"); + REG_PROC(zremrangebyrank, "wt"); + REG_PROC(zremrangebyscore, "wt"); + REG_PROC(zexists, "rt"); + REG_PROC(multi_zexists, "rt"); + REG_PROC(multi_zsize, "rt"); + REG_PROC(multi_zget, "rt"); + REG_PROC(multi_zset, "wt"); + REG_PROC(multi_zdel, "wt"); + REG_PROC(zpop_front, "wt"); + REG_PROC(zpop_back, "wt"); + // queue operations + REG_PROC(qsize, "rt"); + REG_PROC(qfront, "rt"); + REG_PROC(qback, "rt"); + REG_PROC(qpush, "wt"); + REG_PROC(qpush_front, "wt"); + REG_PROC(qpush_back, "wt"); + REG_PROC(qpop, "wt"); + REG_PROC(qpop_front, "wt"); + REG_PROC(qpop_back, "wt"); + REG_PROC(qtrim_front, "wt"); + REG_PROC(qtrim_back, "wt"); + REG_PROC(qfix, "wt"); + REG_PROC(qclear, "wt"); + REG_PROC(qlist, "rt"); + REG_PROC(qrlist, "rt"); + REG_PROC(qslice, "rt"); + REG_PROC(qrange, "rt"); + REG_PROC(qget, "rt"); + REG_PROC(qset, "wt"); + // sys operations + REG_PROC(clear_binlog, "wt"); + REG_PROC(flushdb, "wt"); - REG_PROC(dump, "b"); - REG_PROC(sync140, "b"); - REG_PROC(slaveof, "w"); - REG_PROC(info, "rt"); - REG_PROC(version, "r"); - REG_PROC(dbsize, "rt"); - // doing compaction in a reader thread, because we have only one - // writer thread(for performance reason); we don't want to block writes - REG_PROC(compact, "rt"); + REG_PROC(dump, "b"); + REG_PROC(sync140, "b"); + REG_PROC(slaveof, "w"); + REG_PROC(info, "rt"); + REG_PROC(version, "r"); + REG_PROC(dbsize, "rt"); + // doing compaction in a reader thread, because we have only one + // writer thread(for performance reason); we don't want to block writes + REG_PROC(compact, "rt"); - REG_PROC(ignore_key_range, "r"); - REG_PROC(get_key_range, "r"); - REG_PROC(get_kv_range, "r"); - REG_PROC(set_kv_range, "r"); - // cluster operations - REG_PROC(cluster_add_kv_node, "r"); - REG_PROC(cluster_del_kv_node, "r"); - REG_PROC(cluster_kv_node_list, "r"); - REG_PROC(cluster_set_kv_range, "r"); - REG_PROC(cluster_set_kv_status, "r"); - REG_PROC(cluster_migrate_kv_data, "r"); + REG_PROC(ignore_key_range, "r"); + REG_PROC(get_key_range, "r"); + REG_PROC(get_kv_range, "r"); + REG_PROC(set_kv_range, "r"); + // cluster operations + REG_PROC(cluster_add_kv_node, "r"); + REG_PROC(cluster_del_kv_node, "r"); + REG_PROC(cluster_kv_node_list, "r"); + REG_PROC(cluster_set_kv_range, "r"); + REG_PROC(cluster_set_kv_status, "r"); + REG_PROC(cluster_migrate_kv_data, "r"); } -SSDBServer::SSDBServer(SSDB *ssdb, SSDB *meta, const Config &conf, NetworkServer *net) -{ - this->ssdb = (SSDBImpl *)ssdb; - this->meta = meta; +SSDBServer::SSDBServer(SSDB *ssdb, SSDB *meta, const Config &conf, + NetworkServer *net) { + this->ssdb = (SSDBImpl *)ssdb; + this->meta = meta; - net->data = this; - this->reg_procs(net); + net->data = this; + this->reg_procs(net); - int sync_speed = conf.get_num("replication.sync_speed"); + int sync_speed = conf.get_num("replication.sync_speed"); - backend_dump = new BackendDump(this->ssdb); - backend_sync = new BackendSync(this->ssdb, sync_speed); - expiration = new ExpirationHandler(this->ssdb); + backend_dump = new BackendDump(this->ssdb); + backend_sync = new BackendSync(this->ssdb, sync_speed); + expiration = new ExpirationHandler(this->ssdb); - cluster = new Cluster(this->ssdb); - if (cluster->init() == -1) - { - log_fatal("cluster init failed!"); - exit(1); - } + cluster = new Cluster(this->ssdb); + if (cluster->init() == -1) { + log_fatal("cluster init failed!"); + exit(1); + } - { // slaves - const Config *repl_conf = conf.get("replication"); - if (repl_conf != NULL) - { - std::vector children = repl_conf->children; - for (std::vector::iterator it = children.begin(); it != children.end(); it++) - { - Config *c = *it; - if (c->key != "slaveof") - { - continue; - } - std::string ip = c->get_str("ip"); - int port = c->get_num("port"); - if (ip == "") - { - ip = c->get_str("host"); - } - if (ip == "" || port <= 0 || port > 65535) - { - continue; - } - bool is_mirror = false; - std::string type = c->get_str("type"); - if (type == "mirror") - { - is_mirror = true; - } - else - { - type = "sync"; - is_mirror = false; - } + { // slaves + const Config *repl_conf = conf.get("replication"); + if (repl_conf != NULL) { + std::vector children = repl_conf->children; + for (std::vector::iterator it = children.begin(); + it != children.end(); it++) { + Config *c = *it; + if (c->key != "slaveof") { + continue; + } + std::string ip = c->get_str("ip"); + int port = c->get_num("port"); + if (ip == "") { + ip = c->get_str("host"); + } + if (ip == "" || port <= 0 || port > 65535) { + continue; + } + bool is_mirror = false; + std::string type = c->get_str("type"); + if (type == "mirror") { + is_mirror = true; + } else { + type = "sync"; + is_mirror = false; + } - std::string id = c->get_str("id"); - std::string auth = c->get_str("auth"); - int recv_timeout = c->get_num("recv_timeout"); + std::string id = c->get_str("id"); + std::string auth = c->get_str("auth"); + int recv_timeout = c->get_num("recv_timeout"); - log_info("slaveof: %s:%d, type: %s", ip.c_str(), port, type.c_str()); - this->slaveof(id, ip, port, auth, 0, "", is_mirror, recv_timeout); - } - } - } + log_info("slaveof: %s:%d, type: %s", ip.c_str(), port, type.c_str()); + this->slaveof(id, ip, port, auth, 0, "", is_mirror, recv_timeout); + } + } + } - // load kv_range - int ret = this->get_kv_range(&this->kv_range_s, &this->kv_range_e); - if (ret == -1) - { - log_fatal("load key_range failed!"); - exit(1); - } - log_info("key_range.kv: \"%s\", \"%s\"", - str_escape(this->kv_range_s).c_str(), - str_escape(this->kv_range_e).c_str()); + // load kv_range + int ret = this->get_kv_range(&this->kv_range_s, &this->kv_range_e); + if (ret == -1) { + log_fatal("load key_range failed!"); + exit(1); + } + log_info("key_range.kv: \"%s\", \"%s\"", str_escape(this->kv_range_s).c_str(), + str_escape(this->kv_range_e).c_str()); } -SSDBServer::~SSDBServer() -{ - std::vector::iterator it; - for (it = slaves.begin(); it != slaves.end(); it++) - { - Slave *slave = *it; - slave->stop(); - delete slave; - } +SSDBServer::~SSDBServer() { + std::vector::iterator it; + for (it = slaves.begin(); it != slaves.end(); it++) { + Slave *slave = *it; + slave->stop(); + delete slave; + } - delete backend_dump; - delete backend_sync; - delete expiration; - delete cluster; + delete backend_dump; + delete backend_sync; + delete expiration; + delete cluster; - log_debug("SSDBServer finalized"); + log_debug("SSDBServer finalized"); } -int SSDBServer::slaveof(const std::string &id, const std::string &host, int port, const std::string &auth, uint64_t last_seq, const std::string &last_key, bool is_mirror, int recv_timeout) -{ - Slave *slave = new Slave(ssdb, meta, host.c_str(), port, is_mirror); - if (!id.empty()) - { - slave->set_id(id); - } - if (recv_timeout > 0) - { - slave->recv_timeout = recv_timeout; - } - slave->last_seq = last_seq; - slave->last_key = last_key; - slave->auth = auth; - slave->start(); - slaves.push_back(slave); - return 0; +int SSDBServer::slaveof(const std::string &id, const std::string &host, + int port, const std::string &auth, uint64_t last_seq, + const std::string &last_key, bool is_mirror, + int recv_timeout) { + Slave *slave = new Slave(ssdb, meta, host.c_str(), port, is_mirror); + if (!id.empty()) { + slave->set_id(id); + } + if (recv_timeout > 0) { + slave->recv_timeout = recv_timeout; + } + slave->last_seq = last_seq; + slave->last_key = last_key; + slave->auth = auth; + slave->start(); + slaves.push_back(slave); + return 0; } -int SSDBServer::set_kv_range(const std::string &start, const std::string &end) -{ - if (meta->hset("key_range", "kv_s", start) == -1) - { - return -1; - } - if (meta->hset("key_range", "kv_e", end) == -1) - { - return -1; - } +int SSDBServer::set_kv_range(const std::string &start, const std::string &end) { + if (meta->hset("key_range", "kv_s", start) == -1) { + return -1; + } + if (meta->hset("key_range", "kv_e", end) == -1) { + return -1; + } - kv_range_s = start; - kv_range_e = end; - return 0; + kv_range_s = start; + kv_range_e = end; + return 0; } -int SSDBServer::get_kv_range(std::string *start, std::string *end) -{ - if (meta->hget("key_range", "kv_s", start) == -1) - { - return -1; - } - if (meta->hget("key_range", "kv_e", end) == -1) - { - return -1; - } - return 0; +int SSDBServer::get_kv_range(std::string *start, std::string *end) { + if (meta->hget("key_range", "kv_s", start) == -1) { + return -1; + } + if (meta->hget("key_range", "kv_e", end) == -1) { + return -1; + } + return 0; } -bool SSDBServer::in_kv_range(const Bytes &key) -{ - if ((this->kv_range_s.size() && this->kv_range_s >= key) || (this->kv_range_e.size() && this->kv_range_e < key)) - { - return false; - } - return true; +bool SSDBServer::in_kv_range(const Bytes &key) { + if ((this->kv_range_s.size() && this->kv_range_s >= key) || + (this->kv_range_e.size() && this->kv_range_e < key)) { + return false; + } + return true; } -bool SSDBServer::in_kv_range(const std::string &key) -{ - if ((this->kv_range_s.size() && this->kv_range_s >= key) || (this->kv_range_e.size() && this->kv_range_e < key)) - { - return false; - } - return true; +bool SSDBServer::in_kv_range(const std::string &key) { + if ((this->kv_range_s.size() && this->kv_range_s >= key) || + (this->kv_range_e.size() && this->kv_range_e < key)) { + return false; + } + return true; }