Skip to content

Commit

Permalink
feat: support redis-compatible-mode, add comment, update logo (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
Iam-WenYi authored Aug 31, 2024
2 parents 19b4c96 + 7968e0c commit f1fd41e
Show file tree
Hide file tree
Showing 10 changed files with 195 additions and 95 deletions.
10 changes: 7 additions & 3 deletions etc/conf/pikiwidb.conf → etc/conf/kiwi.conf
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ logfile stdout
# dbid is a number between 0 and 'databases'-1
databases 16

# Enable redis compatible mode or not

redis_compatible_mode true

################################ SNAPSHOTTING #################################
#
# Save the DB on disk:
Expand Down Expand Up @@ -341,8 +345,8 @@ rocksdb-level0-stop-writes-trigger 36
rocksdb-ttl-second 604800
# default 86400 * 3
rocksdb-periodic-second 259200;

0
############################### RAFT ###############################
use-raft no
use-raft false
# Braft relies on brpc to communicate via the default port number plus the port offset
raft-port-offset 10
raft-port-offset 10
53 changes: 0 additions & 53 deletions src/base_cmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,31 +229,6 @@ enum AclCategory {
*/
class BaseCmd : public std::enable_shared_from_this<BaseCmd> {
public:
// 这些感觉不需要了

// enum CmdStage { kNone, kBinlogStage, kExecuteStage };
// struct HintKeys {
// HintKeys() = default;
// void Push(const std::string& key, int hint) {
// keys.push_back(key);
// hints.push_back(hint);
// }
// bool empty() const { return keys.empty() && hints.empty(); }
// std::vector<std::string> keys;
// std::vector<int> hints;
// };

// struct CommandStatistics {
// CommandStatistics() = default;
// CommandStatistics(const CommandStatistics& other) {
// cmd_time_consuming.store(other.cmd_time_consuming.load());
// cmd_count.store(other.cmd_count.load());
// }
// std::atomic<int32_t> cmd_count = {0};
// std::atomic<int32_t> cmd_time_consuming = {0};
// };
// CommandStatistics state;

/**
* @brief Construct a new Base Cmd object
* @param name command name
Expand Down Expand Up @@ -281,17 +256,6 @@ class BaseCmd : public std::enable_shared_from_this<BaseCmd> {
uint64_t offset);
virtual void DoBinlog();

// virtual void ProcessFlushDBCmd();
// virtual void ProcessFlushAllCmd();
// virtual void ProcessSingleSlotCmd();
// virtual void ProcessMultiSlotCmd();
// virtual void ProcessDoNotSpecifySlotCmd();
// virtual void Do(std::shared_ptr<Slot> slot = nullptr) = 0;
// virtual Cmd* Clone() = 0;
// used for execute multikey command into different slots
// virtual void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) = 0;
// virtual void Merge() = 0;

bool HasFlag(uint32_t flag) const;
void SetFlag(uint32_t flag);
void ResetFlag(uint32_t flag);
Expand All @@ -306,16 +270,6 @@ class BaseCmd : public std::enable_shared_from_this<BaseCmd> {
uint32_t AclCategory() const;
void AddAclCategory(uint32_t aclCategory);
std::string Name() const;
// CmdRes& Res();
// std::string db_name() const;
// BinlogOffset binlog_offset() const;
// PikaCmdArgsType& argv();

// void SetConn(const std::shared_ptr<net::NetConn>& conn);
// std::shared_ptr<net::NetConn> GetConn();

// void SetResp(const std::shared_ptr<std::string>& resp);
// std::shared_ptr<std::string> GetResp();

uint32_t GetCmdID() const;

Expand All @@ -326,13 +280,6 @@ class BaseCmd : public std::enable_shared_from_this<BaseCmd> {
std::string name_;
int16_t arity_ = 0;
uint32_t flag_ = 0;

// CmdRes res_;
// std::string dbName_;
// std::weak_ptr<net::NetConn> conn_;
// std::weak_ptr<std::string> resp_;
// uint64_t doDuration_ = 0;

uint32_t cmd_id_ = 0;
uint32_t acl_category_ = 0;

Expand Down
11 changes: 10 additions & 1 deletion src/cmd_kv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
*/

#include "cmd_kv.h"
#include <cstddef>
#include <cstdint>
#include "common.h"
#include "config.h"
#include "pstd_string.h"
#include "pstd_util.h"
#include "store.h"
Expand Down Expand Up @@ -674,11 +677,17 @@ bool SetRangeCmd::DoInitial(PClient* client) {

void SetRangeCmd::DoCmd(PClient* client) {
int64_t offset = 0;

if (!(pstd::String2int(client->argv_[2].data(), client->argv_[2].size(), &offset))) {
client->SetRes(CmdRes::kInvalidInt);
return;
}

// Ref: https://redis.io/docs/latest/commands/setrange/
if (g_config.redis_compatible_mode && std::atoi(client->argv_[2].c_str()) > 536870911) {
client->SetRes(CmdRes::kErrOther,
"When Redis compatibility mode is enabled, the offset parameter must not exceed 536870911");
return;
}
int32_t ret = 0;
storage::Status s =
PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->Setrange(client->Key(), offset, client->argv_[3], &ret);
Expand Down
1 change: 1 addition & 0 deletions src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ Status NumberValue<T>::SetValue(const std::string& value) {
}

PConfig::PConfig() {
AddBool("redis-compatible-mode", &CheckYesNo, true, {&redis_compatible_mode});
AddBool("daemonize", &CheckYesNo, false, &daemonize);
AddString("ip", false, {&ip});
AddNumberWithLimit<uint16_t>("port", false, &port, PORT_LIMIT_MIN, PORT_LIMIT_MAX);
Expand Down
11 changes: 8 additions & 3 deletions src/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
#include <map>
#include <memory>
#include <shared_mutex>
#include <string>
#include <cstring>
#include <cstdint>
#include <cstddef>
#include <unordered_map>
#include <utility>
#include <vector>
Expand Down Expand Up @@ -133,7 +135,7 @@ class PConfig {

/*------------------------
* ~PConfig()
* Destroy a kiwi instance.
* Destroy a kiwi's config instance.
*/
~PConfig() = default;

Expand Down Expand Up @@ -283,6 +285,9 @@ class PConfig {
// The number of databases.
std::atomic<size_t> databases = 16;

// Enable redis_compatioble_mode?
std::atomic_bool redis_compatible_mode = true;

/*
* For Network I/O threads, in future version, we may delete
* slave_threads_num.
Expand All @@ -294,7 +299,7 @@ class PConfig {
std::atomic<size_t> db_instance_num = 3;

// Use raft protocol?
std::atomic_bool use_raft = true;
std::atomic_bool use_raft = false;

/*
* kiwi use the RocksDB to store the data,
Expand Down
38 changes: 24 additions & 14 deletions src/kiwi.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <time.h>
#include <unistd.h>
#include <cstdio>
#include <cstdlib>
#include <iostream>
#include <thread>

Expand All @@ -29,6 +30,7 @@
#include "client.h"
#include "config.h"
#include "helper.h"
#include "kiwi.h"
#include "kiwi_logo.h"
#include "slow_log.h"
#include "store.h"
Expand Down Expand Up @@ -64,11 +66,12 @@ static void Usage() {
std::cerr << " kiwi [/path/to/kiwi.conf] [options]\n";
std::cerr << "\n";
std::cerr << "Options:\n";
std::cerr << " -v, --version output version information, then exit\n";
std::cerr << " -h, --help output help message\n";
std::cerr << " -p PORT, --port PORT Set the port listen on\n";
std::cerr << " -l LEVEL, --loglevel LEVEL Set the log level\n";
std::cerr << " -s ADDRESS, --slaveof ADDRESS Set the slave address\n";
std::cerr << " -v, --version output version information, then exit\n";
std::cerr << " -h, --help output help message\n";
std::cerr << " -p PORT, --port PORT Set the port listen on\n";
std::cerr << " -l LEVEL, --loglevel LEVEL Set the log level\n";
std::cerr << " -s ADDRESS, --slaveof ADDRESS Set the slave address\n";
std::cerr << " -c, --redis-compatible-mode Enable Redis compatibility mode\n";
std::cerr << "Examples:\n";
std::cerr << " kiwi /path/kiwi.conf\n";
std::cerr << " kiwi /path/kiwi.conf --loglevel verbose\n";
Expand All @@ -81,7 +84,7 @@ bool KiwiDB::ParseArgs(int argc, char* argv[]) {
static struct option long_options[] = {
{"version", no_argument, 0, 'v'}, {"help", no_argument, 0, 'h'},
{"port", required_argument, 0, 'p'}, {"loglevel", required_argument, 0, 'l'},
{"slaveof", required_argument, 0, 's'},
{"slaveof", required_argument, 0, 's'}, {"redis-compatible-mode", no_argument, 0, 'c'},
};
// kiwi [/path/to/kiwi.conf] [options]
if (cfg_file_.empty() && argc > 1 && ::access(argv[1], R_OK) == 0) {
Expand All @@ -93,7 +96,7 @@ bool KiwiDB::ParseArgs(int argc, char* argv[]) {
int this_option_optind = optind ? optind : 1;
int option_index = 0;
int c;
c = getopt_long(argc, argv, "vhp:l:s:", long_options, &option_index);
c = getopt_long(argc, argv, "vhp:l:s:c", long_options, &option_index);
if (c == -1) {
break;
}
Expand All @@ -110,7 +113,7 @@ bool KiwiDB::ParseArgs(int argc, char* argv[]) {
std::cerr << "kiwi Server Build GIT SHA: " << Kkiwi_GIT_COMMIT_ID << std::endl;
#endif

exit(0);
std::exit(0);
break;
}
case 'h': {
Expand Down Expand Up @@ -142,6 +145,10 @@ bool KiwiDB::ParseArgs(int argc, char* argv[]) {
}
break;
}
case 'c': {
redis_compatible_mode = true;
break;
}
case '?': {
std::cerr << "Unknow option " << std::endl;
return false;
Expand All @@ -152,8 +159,7 @@ bool KiwiDB::ParseArgs(int argc, char* argv[]) {
return true;
}

void KiwiDB::OnNewConnection(uint64_t connId, std::shared_ptr<kiwi::PClient>& client,
const net::SocketAddr& addr) {
void KiwiDB::OnNewConnection(uint64_t connId, std::shared_ptr<kiwi::PClient>& client, const net::SocketAddr& addr) {
INFO("New connection from {}:{}", addr.GetIP(), addr.GetPort());
client->SetSocketAddr(addr);
client->OnConnect();
Expand All @@ -172,6 +178,10 @@ bool KiwiDB::Init() {
g_config.Set("log-level", log_level_, true);
}

if (redis_compatible_mode) {
g_config.Set("redis_compatible_mode", std::to_string(redis_compatible_mode), true);
}

auto num = g_config.worker_threads_num.load() + g_config.slave_threads_num.load();

// now we only use fast cmd thread pool
Expand All @@ -191,7 +201,7 @@ bool KiwiDB::Init() {
PREPL.SetMasterAddr(g_config.master_ip.ToString().c_str(), g_config.master_port.load());
}

event_server_ = std::make_unique<net::EventServer<std::shared_ptr<PClient>>>(num);
event_server_ =std::make_unique<net::EventServer<std::shared_ptr<PClient>>>(num);

event_server_->SetRwSeparation(true);

Expand Down Expand Up @@ -275,8 +285,8 @@ static int InitLimit() {
limit.rlim_cur = maxfiles;
limit.rlim_max = maxfiles;
if (setrlimit(RLIMIT_NOFILE, &limit) != -1) {
WARN("your 'limit -n' of {} is not enough for kiwi to start. kiwi has successfully reconfig it to {}",
old_limit, limit.rlim_cur);
WARN("your 'limit -n' of {} is not enough for kiwi to start. kiwi has successfully reconfig it to {}", old_limit,
limit.rlim_cur);
} else {
ERROR(
"your 'limit -n ' of {} is not enough for kiwi to start."
Expand Down Expand Up @@ -339,7 +349,7 @@ int main(int argc, char* argv[]) {

if (g_kiwi->Init()) {
// output logo to console
char logo[512] = "";
char logo[1024] = "";
snprintf(logo, sizeof logo - 1, kiwiLogo, Kkiwi_VERSION, static_cast<int>(sizeof(void*)) * 8,
static_cast<int>(g_config.port));
std::cout << logo;
Expand Down
3 changes: 3 additions & 0 deletions src/kiwi.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
Designed a set of functions and variables associated with
the kiwi server.
*/
#pragma once

#include "cmd_table_manager.h"
#include "cmd_thread_pool.h"
Expand Down Expand Up @@ -76,6 +77,8 @@ class KiwiDB final {
PString master_;
uint16_t master_port_{0};

std::atomic<bool> redis_compatible_mode = false;

static const uint32_t kRunidSize;

private:
Expand Down
6 changes: 3 additions & 3 deletions src/kiwi_logo.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ const char* kiwiLogo =
"\n _ _ __ _ \n"
" ( )( ) / )( ) _ _ \n"
" _ _ _ __ _ _ ___ _ _ ______ _| || |_ /' /' | |/') (_) _ _ _ (_) \n"
" /'_` )( '__)/'_` )/' _ `\ /'_` )(______)/'_` || '_`\ /' /' | , < | |( ) ( ) ( )| | \n"
" ( (_| || | ( (_| || ( ) |( (_| | ( (_| || |_) ) /' /' | |\`\ | || \_/ \_/ || | \n"
" `\__,_)(_) `\__,_)(_) (_)`\__,_) `\__,_)(_,__/'(_/' (_) (_)(_)`\___x___/'(_) \n"
" /'_` )( '__)/'_` )/' _ `\\ /'_` )(______)/'_` || '_`\\ /' /' | , < | |( ) ( ) ( )| | \n"
" ( (_| || | ( (_| || ( ) |( (_| | ( (_| || |_) ) /' /' | |\\`\\ | || \\_/ \\_/ || | \n"
" `\\__,_)(_) `\\__,_)(_) (_)`\\__,_) `\\__,_)(_,__/'(_/' (_) (_)(_)`\\___x___/'(_) \n"
"Kiwi(%s) %d bits \n"
"Port: %d\n"
"Github: https://github.com/arana-db/kiwi\n\n\n";
20 changes: 3 additions & 17 deletions src/pstd/memory_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ bool InputMemoryFile::Open(const char* file) {

if (file_ == kInvalidFile) {
char err[128];
snprintf(err, sizeof err - 1, "OpenForRead %s failed\n", file);
snprintf(err, sizeof(err - 1), "OpenForRead %s failed\n", file);
return false;
}

Expand Down Expand Up @@ -160,12 +160,6 @@ bool OutputMemoryFile::MapWriteOnly() {
return false;
}

#if 0
// codes below cause coredump when file size > 4MB
if (m_pMemory != kInvalidAddr) {
::munmap(m_pMemory, m_size);
}
#endif
pMemory_ = static_cast<char*>(::mmap(nullptr, size_, PROT_WRITE, MAP_SHARED, file_, 0));
return (pMemory_ != kInvalidAddr);
}
Expand All @@ -185,19 +179,11 @@ void OutputMemoryFile::Truncate(std::size_t size) {
MapWriteOnly();
}

void OutputMemoryFile::TruncateTailZero() {
void inline OutputMemoryFile::TruncateTailZero() {
if (file_ == kInvalidFile) {
return;
}

size_t tail = size_;
while (tail > 0 && pMemory_[--tail] == '\0') {
;
}

++tail;

Truncate(tail);
Truncate(0);
}

bool OutputMemoryFile::IsOpen() const { return file_ != kInvalidFile; }
Expand Down
Loading

0 comments on commit f1fd41e

Please sign in to comment.