diff --git a/etc/conf/kiwi.conf b/etc/conf/kiwi.conf index a793e53..801e678 100644 --- a/etc/conf/kiwi.conf +++ b/etc/conf/kiwi.conf @@ -9,9 +9,7 @@ port 9221 # If you want you can bind a single interface, if the bind option is not # specified all the interfaces will listen for incoming connections. -# -ip 127.0.0.1 - +ips 127.0.0.1 ::1 # Close the connection after a client is idle for N seconds (0 to disable) timeout 0 @@ -348,9 +346,12 @@ rocksdb-level0-stop-writes-trigger 36 # default 86400 * 7 rocksdb-ttl-second 604800 # default 86400 * 3 -rocksdb-periodic-second 259200; - +rocksdb-periodic-second 259200 ############################### RAFT ############################### use-raft no + +# The IP address that the Raft server will bind to +raft-ip 127.0.0.1 + # Braft relies on brpc to communicate via the default port number plus the port offset raft-port-offset 10 diff --git a/src/cmd_admin.cc b/src/cmd_admin.cc index 5ded6de..c8680c9 100644 --- a/src/cmd_admin.cc +++ b/src/cmd_admin.cc @@ -167,8 +167,18 @@ ShutdownCmd::ShutdownCmd(const std::string& name, int16_t arity) bool ShutdownCmd::DoInitial(PClient* client) { // For now, only shutdown need check local - if (client->PeerIP().find("127.0.0.1") == std::string::npos && - client->PeerIP().find(g_config.ip) == std::string::npos) { + bool finded = false; + if (client->PeerIP().find("127.0.0.1") != std::string::npos) { + finded = true; + } else { + for (auto& ip : g_config.ips) { + if (client->PeerIP().find(ip) != std::string::npos) { + finded = true; + break; + } + } + } + if (!finded) { client->SetRes(CmdRes::kErrOther, kCmdNameShutdown + " should be localhost"); return false; } diff --git a/src/config.cc b/src/config.cc index f8a5722..ced58f9 100644 --- a/src/config.cc +++ b/src/config.cc @@ -7,6 +7,7 @@ Responsible for managing the runtime configuration information of kiwi. */ +#include //delete #include #include #include @@ -133,7 +134,8 @@ Status MemorySize::SetValue(const std::string& value) { Config::Config() { AddBool("redis-compatible-mode", &CheckYesNo, true, &redis_compatible_mode); AddBool("daemonize", &CheckYesNo, false, &daemonize); - AddString("ip", false, &ip); + AddStringArray("ips", false, ips); + AddString("raft-ip", false, &raft_ip); AddNumberWithLimit("port", false, &port, PORT_LIMIT_MIN, PORT_LIMIT_MAX); AddNumber("raft-port-offset", true, &raft_port_offset); AddNumber("timeout", true, &timeout); diff --git a/src/config.h b/src/config.h index 17cc938..c5d95f4 100644 --- a/src/config.h +++ b/src/config.h @@ -44,7 +44,7 @@ class BaseValue { virtual std::string Value() const = 0; - Status Set(const std::string& value, bool force); + Status Set(const std::string& value, bool init_stage); protected: virtual Status SetValue(const std::string&) = 0; @@ -96,7 +96,7 @@ class NumberValue : public BaseValue { public: NumberValue(const std::string& key, CheckFunc check_func_ptr, bool rewritable, T* value_ptr, T min = std::numeric_limits::min(), T max = std::numeric_limits::max()) - : BaseValue(key, check_func_ptr, rewritable), value_(value_ptr), value_min_(min), value_max_(max) { + : BaseValue(key, std::move(check_func_ptr), rewritable), value_(value_ptr), value_min_(min), value_max_(max) { assert(value_ != nullptr); assert(value_min_ <= value_max_); }; @@ -290,8 +290,9 @@ class Config { * For kiwi, ip is the address and the port that * the server will listen on. * In default, the full address will be "127.0.0.1:9221" + * and "::1:9221" */ - std::string ip = "127.0.0.1"; + std::vector ips = {"127.0.0.1", "::1"}; uint16_t port = 9221; /* @@ -348,6 +349,9 @@ class Config { */ bool use_raft = false; + // raft ip + std::string raft_ip = "127.0.0.1"; + /* * kiwi use the RocksDB to store the data, * and these options below will set to rocksdb::Options, diff --git a/src/kiwi.cc b/src/kiwi.cc index 1c2523b..4a01fa4 100644 --- a/src/kiwi.cc +++ b/src/kiwi.cc @@ -212,9 +212,11 @@ bool KiwiDB::Init() { event_server_ = std::make_unique>>(options_); - net::SocketAddr addr(g_config.ip, g_config.port); - INFO("Add listen addr:{}, port:{}", g_config.ip, g_config.port); - event_server_->AddListenAddr(addr); + for (const auto& ip : g_config.ips) { + net::SocketAddr addr(ip, g_config.port); + INFO("Add listen addr: {}, port: {}", ip, g_config.port); + event_server_->AddListenAddr(addr); + } event_server_->SetOnInit([](std::shared_ptr* client) { *client = std::make_shared(); }); diff --git a/src/kiwi_logo.h b/src/kiwi_logo.h index e51c810..8be8af3 100644 --- a/src/kiwi_logo.h +++ b/src/kiwi_logo.h @@ -12,7 +12,7 @@ #pragma once const char* kiwiLogo = - "\n _ _ __ _ \n" + "\n _ _ __ _ \n" " ( )( ) / )( ) _ _ \n" " _ _ _ __ _ _ ___ _ _ ______ _| || |_ /' /' | |/') (_) _ _ _ (_) \n" " /'_` )( '__)/'_` )/' _ `\\ /'_` )(______)/'_` || '_`\\ /' /' | , < | |( ) ( ) ( )| | \n" diff --git a/src/net/base_event.h b/src/net/base_event.h index 22c097c..28a10ad 100644 --- a/src/net/base_event.h +++ b/src/net/base_event.h @@ -17,6 +17,7 @@ #include #include "callback_function.h" +#include "listen_socket.h" #include "net_event.h" #include "timer.h" @@ -45,8 +46,8 @@ class BaseEvent : public std::enable_shared_from_this { const static int EVENT_HUB; const static int EVENT_NULL; - BaseEvent(const std::shared_ptr &listen, int8_t mode, int8_t type) - : listen_(listen), mode_(mode), type_(type) {}; + BaseEvent(const std::vector> &listenSockets, int8_t mode, int8_t type) + : listenSockets_(listenSockets), mode_(mode), type_(type) {}; virtual ~BaseEvent() = default; @@ -93,6 +94,15 @@ class BaseEvent : public std::enable_shared_from_this { int8_t Type() const { return type_; } + std::shared_ptr getListenSocket(int fd) { + for (const auto &listen : listenSockets_) { + if (fd == listen->Fd()) { + return listen; + } + } + return nullptr; + } + protected: int evFd_ = 0; // event fd std::atomic running_ = true; @@ -109,8 +119,8 @@ class BaseEvent : public std::enable_shared_from_this { std::shared_ptr timer_; - // listening socket - std::shared_ptr listen_; + // listening sockets + std::vector> listenSockets_; // callback function when a new connection is created std::function)> onCreate_; diff --git a/src/net/base_socket.cc b/src/net/base_socket.cc index 16bab6d..faa9e78 100644 --- a/src/net/base_socket.cc +++ b/src/net/base_socket.cc @@ -14,7 +14,13 @@ namespace net { -int BaseSocket::CreateTCPSocket() { return ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); } +int BaseSocket::CreateTCPSocket(const SocketAddr &addr) { + if (addr.IsIPV6()) { + return ::socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP); + } else { + return ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + } +} int BaseSocket::CreateUDPSocket() { return ::socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); } @@ -131,6 +137,16 @@ bool BaseSocket::SetReusePort() { return false; } +bool BaseSocket::SetDisableIpv6Only() { + int ipv6only = 0; + if (::setsockopt(Fd(), IPPROTO_IPV6, IPV6_V6ONLY, reinterpret_cast(&ipv6only), sizeof(ipv6only)) == + -1) { + WARN("SetIpv6Only fd:{} error:{}", Fd(), errno); + return false; + } + return true; +} + bool BaseSocket::GetLocalAddr(SocketAddr &addr) { sockaddr_in localAddr{}; socklen_t len = sizeof(localAddr); diff --git a/src/net/base_socket.h b/src/net/base_socket.h index c5dc305..a75f547 100644 --- a/src/net/base_socket.h +++ b/src/net/base_socket.h @@ -9,10 +9,6 @@ #include -#include -#include - -#include "base_event.h" #include "net_event.h" #include "socket_addr.h" @@ -38,7 +34,7 @@ class BaseSocket : public NetEvent { void Close() override; - static int CreateTCPSocket(); + static int CreateTCPSocket(const SocketAddr &addr); static int CreateUDPSocket(); @@ -59,6 +55,8 @@ class BaseSocket : public NetEvent { bool SetReusePort(); + bool SetDisableIpv6Only(); + bool GetLocalAddr(SocketAddr &); bool GetPeerAddr(SocketAddr &); diff --git a/src/net/callback_function.h b/src/net/callback_function.h index 1ceb1c5..aa07df3 100644 --- a/src/net/callback_function.h +++ b/src/net/callback_function.h @@ -61,7 +61,7 @@ class NetEvent; // Auxiliary structure struct Connection { - explicit Connection(std::unique_ptr netEvent) : netEvent_(std::move(netEvent)), addr_(0, 0) {} + explicit Connection(std::unique_ptr netEvent) : netEvent_(std::move(netEvent)) {} ~Connection() = default; diff --git a/src/net/client_socket.cc b/src/net/client_socket.cc index f3d318c..754ecd3 100644 --- a/src/net/client_socket.cc +++ b/src/net/client_socket.cc @@ -7,12 +7,13 @@ #include +#include #include "client_socket.h" namespace net { bool ClientSocket::Connect() { - fd_ = CreateTCPSocket(); + fd_ = CreateTCPSocket(addr_); if (fd_ == -1) { onConnectFail_("CreateTCPSocket open socket failed"); return false; @@ -22,7 +23,7 @@ bool ClientSocket::Connect() { SetRcvBuf(); SetSndBuf(); - auto ret = connect(Fd(), (sockaddr*)&addr_.GetAddr(), sizeof(sockaddr_in)); + auto ret = connect(Fd(), addr_.GetAddr(), addr_.GetAddrLen()); if (0 != ret) { if (EINPROGRESS == errno) { return true; diff --git a/src/net/epoll_event.cc b/src/net/epoll_event.cc index 0281291..c83413c 100644 --- a/src/net/epoll_event.cc +++ b/src/net/epoll_event.cc @@ -27,7 +27,9 @@ bool EpollEvent::Init() { return false; } if (mode_ & EVENT_MODE_READ) { // Add the listen socket to epoll for read - AddEvent(listen_->Fd(), listen_->Fd(), EVENT_READ); + for (auto &listenSocket : listenSockets_) { + AddEvent(listenSocket->Fd(), listenSocket->Fd(), EVENT_READ); + } } if (pipe(pipeFd_) == -1) { ERROR("pipe error errno:{}", errno); @@ -107,7 +109,8 @@ void EpollEvent::EventRead() { std::shared_ptr conn; if (events[i].events & EVENT_READ) { // If the event is less than the listen socket, it is a new connection - if (events[i].data.u64 != listen_->Fd()) { + // If getListenSocket is nullptr, it means the event is not a listen socket + if (!getListenSocket(events[i].data.u64)) { conn = getConn_(events[i].data.u64); } DoRead(events[i], conn); @@ -150,9 +153,9 @@ void EpollEvent::EventWrite() { } void EpollEvent::DoRead(const epoll_event &event, const std::shared_ptr &conn) { - if (event.data.u64 == listen_->Fd()) { + if (auto listenSocket = getListenSocket(event.data.u64); listenSocket) { auto newConn = std::make_shared(nullptr); - auto connFd = listen_->OnReadable(newConn, nullptr); + auto connFd = listenSocket->OnReadable(newConn, nullptr); if (connFd < 0) { DoError(event, "accept error"); return; diff --git a/src/net/epoll_event.h b/src/net/epoll_event.h index 5f2d3ff..5d14c39 100644 --- a/src/net/epoll_event.h +++ b/src/net/epoll_event.h @@ -8,6 +8,7 @@ #pragma once #include "config.h" +#include "listen_socket.h" #ifdef HAVE_EPOLL @@ -22,8 +23,8 @@ namespace net { class EpollEvent : public BaseEvent { public: - explicit EpollEvent(const std::shared_ptr &listen, int8_t mode) - : BaseEvent(listen, mode, BaseEvent::EVENT_TYPE_EPOLL) {}; + explicit EpollEvent(const std::vector> &listenSockets, int8_t mode) + : BaseEvent(listenSockets, mode, BaseEvent::EVENT_TYPE_EPOLL) {}; ~EpollEvent() override { Close(); } diff --git a/src/net/event_server.h b/src/net/event_server.h index bacd740..4c138c7 100644 --- a/src/net/event_server.h +++ b/src/net/event_server.h @@ -7,9 +7,12 @@ #pragma once +#include +#include #include #include #include +#include #include #include "base_socket.h" @@ -40,7 +43,7 @@ class EventServer final { void SetOnClose(OnClose &&func) { onClose_ = std::move(func); } - void AddListenAddr(const SocketAddr &addr) { listenAddrs_ = addr; } + inline void AddListenAddr(const SocketAddr &addr) { listenAddrs_.emplace_back(addr); } void InitTimer(int64_t interval) { timer_ = std::make_shared(interval); } @@ -86,7 +89,7 @@ class EventServer final { OnClose onClose_; // The callback function when the connection is closed - SocketAddr listenAddrs_; // The address to listen on + std::vector listenAddrs_; // The address to listen on std::atomic running_ = true; // Whether the server is running @@ -243,29 +246,39 @@ void EventServer::TCPConnect(const SocketAddr &addr, const std::function requires HasSetFdFunction int EventServer::StartThreadManager(bool serverMode) { - std::shared_ptr listen(ListenSocket::CreateTCPListen()); + std::vector> listenSockets; auto tcpKeepAlive = opt_.GetOpTcpKeepAlive(); + if (serverMode) { - listen->SetListenAddr(listenAddrs_); - listen->SetBSTcpKeepAlive(tcpKeepAlive); - if (auto ret = listen->Init() != static_cast(NetListen::OK)) { - return ret; + for (auto &listenAddr : listenAddrs_) { + std::shared_ptr listen(ListenSocket::CreateTCPListen()); + listen->SetListenAddr(listenAddr); + listen->SetBSTcpKeepAlive(tcpKeepAlive); + listenSockets.push_back(listen); + if (auto ret = (listen->Init() != static_cast(NetListen::OK))) { + listenSockets.clear(); // Clean up all sockets + return ret; + } } } int i = 0; for (const auto &thread : threadsManager_) { if (i > 0 && ListenSocket::REUSE_PORT && serverMode) { - listen.reset(ListenSocket::CreateTCPListen()); - listen->SetListenAddr(listenAddrs_); - listen->SetBSTcpKeepAlive(tcpKeepAlive); - if (auto ret = listen->Init() != static_cast(NetListen::OK)) { - return ret; + for (auto &listen : listenSockets) { + auto listenAddr = listen->GetListenAddr(); + listen.reset(ListenSocket::CreateTCPListen()); + listen->SetListenAddr(listenAddr); + listen->SetBSTcpKeepAlive(tcpKeepAlive); + if (auto ret = (listen->Init() != static_cast(NetListen::OK))) { + listenSockets.clear(); // Clean up all sockets + return ret; + } } } // timer only works in the first thread - bool ret = i == 0 ? thread->Start(listen, timer_) : thread->Start(listen, nullptr); + bool ret = i == 0 ? thread->Start(listenSockets, timer_) : thread->Start(listenSockets, nullptr); if (!ret) { return -1; } diff --git a/src/net/kqueue_event.cc b/src/net/kqueue_event.cc index 2d44ff5..072548e 100644 --- a/src/net/kqueue_event.cc +++ b/src/net/kqueue_event.cc @@ -25,7 +25,9 @@ bool KqueueEvent::Init() { return false; } if (mode_ & EVENT_MODE_READ) { - AddEvent(0, listen_->Fd(), EVENT_READ); + for (auto &listenSocket : listenSockets_) { + AddEvent(listenSocket->Fd(), listenSocket->Fd(), EVENT_READ); + } } if (pipe(pipeFd_) == -1) { ERROR("pipe error:{}", errno); @@ -107,7 +109,7 @@ void KqueueEvent::EventRead() { } std::shared_ptr conn; if (events[i].filter == EVENT_READ) { - if (events[i].ident != listen_->Fd()) { + if (!getListenSocket(events[i].ident)) { # ifdef HAVE_64BIT auto connId = reinterpret_cast(events[i].udata); # else @@ -166,9 +168,9 @@ void KqueueEvent::EventWrite() { } void KqueueEvent::DoRead(const struct kevent &event, const std::shared_ptr &conn) { - if (event.ident == listen_->Fd()) { + if (auto listenSocket = getListenSocket(event.ident); listenSocket) { auto newConn = std::make_shared(nullptr); - auto connFd = listen_->OnReadable(newConn, nullptr); + auto connFd = listenSocket->OnReadable(newConn, nullptr); onCreate_(connFd, newConn); } else if (conn) { std::string readBuff; diff --git a/src/net/kqueue_event.h b/src/net/kqueue_event.h index d21b402..2946429 100644 --- a/src/net/kqueue_event.h +++ b/src/net/kqueue_event.h @@ -23,8 +23,8 @@ namespace net { class KqueueEvent : public BaseEvent { public: - explicit KqueueEvent(std::shared_ptr listen, int8_t mode) - : BaseEvent(std::move(listen), mode, BaseEvent::EVENT_TYPE_KQUEUE) {}; + explicit KqueueEvent(const std::vector> &listenSockets, int8_t mode) + : BaseEvent(std::move(listenSockets), mode, BaseEvent::EVENT_TYPE_KQUEUE) {}; ~KqueueEvent() override { Close(); } diff --git a/src/net/listen_socket.cc b/src/net/listen_socket.cc index e79d837..093ef8e 100644 --- a/src/net/listen_socket.cc +++ b/src/net/listen_socket.cc @@ -6,6 +6,7 @@ */ #include +#include #include "config.h" #include "listen_socket.h" @@ -68,7 +69,7 @@ bool ListenSocket::Open() { } if (SocketType() == SOCKET_LISTEN_TCP) { - fd_ = CreateTCPSocket(); + fd_ = CreateTCPSocket(addr_); } else if (SocketType() == SOCKET_LISTEN_UDP) { fd_ = CreateUDPSocket(); } else { @@ -88,15 +89,17 @@ bool ListenSocket::Bind() { if (!SetReusePort()) { REUSE_PORT = false; } + if (addr_.IsIPV6()) { + SetDisableIpv6Only(); + } - struct sockaddr_in serv = addr_.GetAddr(); - - int ret = ::bind(Fd(), reinterpret_cast(&serv), sizeof serv); + int ret = ::bind(Fd(), addr_.GetAddr(), addr_.GetAddrLen()); if (0 != ret) { ERROR("ListenSocket fd:{},Bind error:{}", Fd(), errno); Close(); return false; } + return true; } diff --git a/src/net/listen_socket.h b/src/net/listen_socket.h index c7cbebb..b024dcf 100644 --- a/src/net/listen_socket.h +++ b/src/net/listen_socket.h @@ -27,6 +27,8 @@ class ListenSocket : public BaseSocket { void SetListenAddr(const SocketAddr &addr) { addr_ = addr; } + inline SocketAddr GetListenAddr() const { return addr_; } + // Accept new connection and create new connection object // when the connection is established, the OnCreate function is called int OnReadable(const std::shared_ptr &conn, std::string *readBuff) override; diff --git a/src/net/socket_addr.h b/src/net/socket_addr.h index ad1731f..8d44977 100644 --- a/src/net/socket_addr.h +++ b/src/net/socket_addr.h @@ -18,57 +18,110 @@ namespace net { struct SocketAddr { SocketAddr() { Clear(); } - SocketAddr(const SocketAddr &other) { memcpy(&addr_, &other.addr_, sizeof addr_); } + SocketAddr(const SocketAddr &other) { memcpy(&addr_, &other.addr_, sizeof(addr_)); } SocketAddr &operator=(const SocketAddr &other) { if (this != &other) { - memcpy(&addr_, &other.addr_, sizeof addr_); + memcpy(&addr_, &other.addr_, sizeof(addr_)); } return *this; } explicit SocketAddr(const sockaddr_in &addr) { Init(addr); } - SocketAddr(uint32_t netip, uint16_t netport) { Init(netip, netport); } + explicit SocketAddr(const sockaddr_in6 &addr) { Init(addr); } SocketAddr(const std::string &ip, uint16_t hostport) { Init(ip, hostport); } void Init(const sockaddr_in &addr) { memcpy(&addr_, &addr, sizeof(addr)); } - void Init(uint32_t netIp, uint16_t netPort) { - addr_.sin_family = AF_INET; - addr_.sin_addr.s_addr = netIp; - addr_.sin_port = netPort; - } + void Init(const sockaddr_in6 &addr) { memcpy(&addr_, &addr, sizeof(addr)); } void Init(const std::string &ip, uint16_t hostPort) { - addr_.sin_family = AF_INET; - addr_.sin_addr.s_addr = ::inet_addr(ip.data()); - addr_.sin_port = htons(hostPort); + if (::inet_pton(AF_INET, ip.c_str(), &addr_.addr4_.sin_addr) == 1) { + addr_.addr4_.sin_family = AF_INET; + addr_.addr4_.sin_port = htons(hostPort); + return; + } + if (::inet_pton(AF_INET6, ip.c_str(), &addr_.addr6_.sin6_addr) != 1) { + throw std::invalid_argument("Invalid IP address format"); + } + addr_.addr6_.sin6_family = AF_INET6; + addr_.addr6_.sin6_port = htons(hostPort); + } + + const sockaddr *GetAddr() const { + if (IsIPV4()) { + return reinterpret_cast(&addr_.addr4_); + } + return reinterpret_cast(&addr_.addr6_); } - const sockaddr_in &GetAddr() const { return addr_; } + socklen_t GetAddrLen() const { + if (IsIPV4()) { + return sizeof(addr_.addr4_); + } + return sizeof(addr_.addr6_); + } - std::string GetIP() const { return ::inet_ntoa(addr_.sin_addr); } + std::string GetIP() const { + if (IsIPV4()) { + char ipv4_buf[INET_ADDRSTRLEN] = {0}; + if (::inet_ntop(AF_INET, &addr_.addr4_.sin_addr, ipv4_buf, sizeof(ipv4_buf))) { + return ipv4_buf; + } + } + char ipv6_buf[INET6_ADDRSTRLEN] = {0}; + if (!::inet_ntop(AF_INET6, &addr_.addr6_.sin6_addr, ipv6_buf, sizeof(ipv6_buf))) { + throw std::runtime_error("Failed to convert IPv6 address to string"); + } + return ipv6_buf; + } std::string GetIP(char *buf, socklen_t size) const { - return ::inet_ntop(AF_INET, reinterpret_cast(&addr_.sin_addr), buf, size); + if (IsIPV4()) { + ::inet_ntop(AF_INET, &addr_.addr4_.sin_addr, buf, size); + return buf; + } + ::inet_ntop(AF_INET6, &addr_.addr6_.sin6_addr, buf, size); + return buf; + } + + uint16_t GetPort() const { + if (IsIPV4()) { + return ntohs(addr_.addr4_.sin_port); + } + return ntohs(addr_.addr6_.sin6_port); } - uint16_t GetPort() const { return ntohs(addr_.sin_port); } + bool IsValid() const { return IsIPV4() || IsIPV6(); } - bool IsValid() const { return 0 != addr_.sin_family; } + bool IsIPV6() const { return addr_.addr6_.sin6_family == AF_INET6; } - void Clear() { memset(&addr_, 0, sizeof addr_); } + bool IsIPV4() const { return addr_.addr4_.sin_family == AF_INET; } + + void Clear() { memset(&addr_, 0, sizeof(addr_)); } friend bool operator==(const SocketAddr &a, const SocketAddr &b) { - return a.addr_.sin_family == b.addr_.sin_family && a.addr_.sin_addr.s_addr == b.addr_.sin_addr.s_addr && - a.addr_.sin_port == b.addr_.sin_port; + if (a.IsIPV4() != b.IsIPV4()) { + return false; + } + + if (a.IsIPV4()) { + return a.addr_.addr4_.sin_addr.s_addr == b.addr_.addr4_.sin_addr.s_addr && + a.addr_.addr4_.sin_port == b.addr_.addr4_.sin_port; + } + + return memcmp(&a.addr_.addr6_.sin6_addr, &b.addr_.addr6_.sin6_addr, sizeof(in6_addr)) == 0 && + a.addr_.addr6_.sin6_port == b.addr_.addr6_.sin6_port; } friend bool operator!=(const SocketAddr &a, const SocketAddr &b) { return !(a == b); } - sockaddr_in addr_{}; + union { + sockaddr_in addr4_; + sockaddr_in6 addr6_; + } addr_; }; } // namespace net diff --git a/src/net/stream_socket.cc b/src/net/stream_socket.cc index 22ae328..efa2fd2 100644 --- a/src/net/stream_socket.cc +++ b/src/net/stream_socket.cc @@ -64,9 +64,7 @@ int StreamSocket::Read(std::string *readBuff) { } } else if (ret == 0) { return NE_CLOSE; - } - - if (ret > 0) { + } else if (ret > 0) { readBuff->append(readBuffer, ret); } if (!NoBlock()) { diff --git a/src/net/thread_manager.h b/src/net/thread_manager.h index cb5bf59..e251e3c 100644 --- a/src/net/thread_manager.h +++ b/src/net/thread_manager.h @@ -55,7 +55,7 @@ class ThreadManager { void SetOnClose(const OnClose &func) { onClose_ = func; } // Start the thread and initialize the event - bool Start(const std::shared_ptr &listen, const std::shared_ptr &timer); + bool Start(const std::vector> &listenSockets, const std::shared_ptr &timer); // Stop the thread void Stop(); @@ -83,7 +83,8 @@ class ThreadManager { private: // Create read thread - bool CreateReadThread(const std::shared_ptr &listen, const std::shared_ptr &timer); + bool CreateReadThread(const std::vector> &listenSockets, + const std::shared_ptr &timer); // Create write thread if rwSeparation_ is true bool CreateWriteThread(); @@ -124,8 +125,9 @@ ThreadManager::~ThreadManager() { template requires HasSetFdFunction -bool ThreadManager::Start(const std::shared_ptr &listen, const std::shared_ptr &timer) { - if (!CreateReadThread(listen, timer)) { +bool ThreadManager::Start(const std::vector> &listenSockets, + const std::shared_ptr &timer) { + if (!CreateReadThread(listenSockets, timer)) { return false; } if (netOptions_.GetRwSeparation()) { @@ -274,7 +276,8 @@ void ThreadManager::SendPacket(const T &conn, std::string &&msg) { template requires HasSetFdFunction -bool ThreadManager::CreateReadThread(const std::shared_ptr &listen, const std::shared_ptr &timer) { +bool ThreadManager::CreateReadThread(const std::vector> &listenSockets, + const std::shared_ptr &timer) { std::shared_ptr event; int8_t eventMode = BaseEvent::EVENT_MODE_READ; if (!netOptions_.GetRwSeparation()) { @@ -282,9 +285,9 @@ bool ThreadManager::CreateReadThread(const std::shared_ptr &listen, } #if defined(HAVE_EPOLL) - event = std::make_shared(listen, eventMode); + event = std::make_shared(listenSockets, eventMode); #elif defined(HAVE_KQUEUE) - event = std::make_shared(listen, eventMode); + event = std::make_shared(listenSockets, eventMode); #endif event->AddTimer(timer); @@ -316,9 +319,9 @@ bool ThreadManager::CreateWriteThread() { std::shared_ptr event; #if defined(HAVE_EPOLL) - event = std::make_shared(nullptr, BaseEvent::EVENT_MODE_WRITE); + event = std::make_shared(std::vector>(), BaseEvent::EVENT_MODE_WRITE); #elif defined(HAVE_KQUEUE) - event = std::make_shared(nullptr, BaseEvent::EVENT_MODE_WRITE); + event = std::make_shared(std::vector>(), BaseEvent::EVENT_MODE_WRITE); #endif event->SetOnClose([this](uint64_t connId, std::string &&msg) { OnNetEventClose(connId, std::move(msg)); }); diff --git a/src/raft/raft.cc b/src/raft/raft.cc index 7006f22..e0edb12 100644 --- a/src/raft/raft.cc +++ b/src/raft/raft.cc @@ -126,10 +126,11 @@ butil::Status Raft::Init(std::string& group_id, bool initial_conf_is_null) { assert(group_id.size() == RAFT_GROUPID_LEN); this->group_id_ = group_id; - // FIXME: g_config.ip is default to 127.0.0.0, which may not work in cluster. - raw_addr_ = g_config.ip + ":" + std::to_string(port); + // NOTE: Default raft_ip is 127.0.0.1. For cluster setup, configure the appropriate + // IP address in kiwi.conf using the 'raft-ip' directive. + raw_addr_ = g_config.raft_ip + ":" + std::to_string(port); butil::ip_t ip; - auto ret = butil::str2ip(g_config.ip.c_str(), &ip); + auto ret = butil::str2ip(g_config.raft_ip.c_str(), &ip); if (ret != 0) { server_.reset(); return ERROR_LOG_AND_STATUS("Failed to convert str_ip to butil::ip_t"); @@ -324,7 +325,7 @@ void Raft::SendNodeAddRequest(PClient* client) { // Node id in braft are ip:port, the node id param in RAFT.NODE ADD cmd will be ignored. int unused_node_id = 0; auto port = g_config.port + kiwi::g_config.raft_port_offset; - auto raw_addr = g_config.ip + ":" + std::to_string(port); + auto raw_addr = g_config.raft_ip + ":" + std::to_string(port); client->AppendArrayLen(int64_t(4)); client->AppendString("RAFT.NODE"); diff --git a/src/replication.cc b/src/replication.cc index a4f729b..df43024 100644 --- a/src/replication.cc +++ b/src/replication.cc @@ -177,7 +177,7 @@ void PReplication::Cron() { if (masterInfo_.addr.IsValid()) { switch (masterInfo_.state) { case kPReplStateNone: { - if (masterInfo_.addr.GetIP() == g_config.ip && masterInfo_.addr.GetPort() == g_config.port) { + if (masterInfo_.addr.GetIP() == g_config.raft_ip && masterInfo_.addr.GetPort() == g_config.port) { ERROR("Fix config, master addr is self addr!"); assert(!!!"wrong config for master addr"); }