Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add client max connection #136

Open
wants to merge 18 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/kiwi.cc
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ bool KiwiDB::Init() {
auto num = g_config.worker_threads_num + g_config.slave_threads_num;
options_.SetThreadNum(num);

options_.SetMaxClients(g_config.max_clients);

// now we only use fast cmd thread pool
auto status = cmd_threads_.Init(g_config.fast_cmd_threads_num, 1, "kiwi-cmd");
if (!status.ok()) {
Expand Down
1 change: 1 addition & 0 deletions src/net/event_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class EventServer final {
std::vector<std::unique_ptr<ThreadManager<T>>> threadsManager_;

std::mutex mtx_;

std::condition_variable cv_;

std::shared_ptr<Timer> timer_;
Expand Down
4 changes: 4 additions & 0 deletions src/net/net_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ class NetOptions {

bool GetRwSeparation() const { return rwSeparation_; }

void SetMaxClients(uint32_t maxClients) { maxClients_ = maxClients; }

uint32_t GetMaxClients() const { return maxClients_; }
void SetOpTcpKeepAlive(uint32_t tcpKeepAlive) { tcpKeepAlive_ = tcpKeepAlive; }

uint32_t GetOpTcpKeepAlive() const { return tcpKeepAlive_; }
Expand All @@ -31,6 +34,7 @@ class NetOptions {

int8_t threadNum_ = 1; // The number of threads

uint32_t maxClients_ = 1; // The maximum number of connections(default 40000)
uint32_t tcpKeepAlive_ = 300; // The timeout of the keepalive connection in seconds
};

Expand Down
28 changes: 26 additions & 2 deletions src/net/thread_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,18 +90,26 @@ class ThreadManager {

uint64_t DoTCPConnect(T &t, int fd, const std::shared_ptr<Connection> &conn);

uint32_t get_client_count() const { return clientCount_.load(); }

void client_count_increment() { clientCount_.fetch_add(1, std::memory_order_relaxed); }

void client_count_decrement() { clientCount_.fetch_sub(1, std::memory_order_relaxed); }

private:
const int8_t index_ = 0; // The index of the thread
uint32_t tcpKeepAlive_ = 300; // The timeout of the keepalive connection in seconds
std::atomic<bool> running_ = true; // Whether the thread is running

NetOptions netOptions_;

inline static std::atomic<uint32_t> clientCount_{0};
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Fix inconsistent memory ordering in client count operations

The client count operations use inconsistent memory ordering:

  • compare_exchange_strong uses memory_order_seq_cst
  • fetch_sub uses memory_order_relaxed

This asymmetry could lead to temporary inconsistencies in client counting. Consider using consistent ordering:

void client_count_decrement() { 
    clientCount_.fetch_sub(1, std::memory_order_seq_cst);
}
🔗 Analysis chain

Consider memory ordering implications of static initialization.

The static atomic counter is correctly used for thread-safe counting, but consider potential initialization order issues across translation units. Also, the relaxed memory ordering in client_count_decrement might be too weak for accurate client counting.

Run this script to check for other static atomics that might interact with this counter:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for other static atomic variables that might have initialization order dependencies
rg -l 'static\s+std::atomic'

# Search for other client count related variables
ast-grep --pattern 'std::atomic<$_> $_ = $_'

Length of output: 130


Script:

#!/bin/bash
# Check content of log_index.h for static atomics
echo "=== log_index.h static atomics ==="
rg -A 2 -B 2 'static\s+std::atomic' src/storage/src/log_index.h

# Check memory ordering usage
echo -e "\n=== Memory ordering usage ==="
rg 'memory_order|\.load\(|\.store\(|\.fetch_' src/net/thread_manager.h src/storage/src/log_index.h

# Check for any direct interactions between these files
echo -e "\n=== File interactions ==="
rg -l "thread_manager.*log_index|log_index.*thread_manager"

Length of output: 2006


std::unique_ptr<IOThread> readThread_; // Read thread
std::unique_ptr<IOThread> writeThread_; // Write thread

// All connections for the current thread
std::unordered_map<uint64_t, std::pair<T, std::shared_ptr<Connection>>> connections_;
std::unordered_map<uint64_t, std::pair<T, std::shared_ptr<Connection>>>
connections_; // All connections for the current thread

std::shared_mutex mutex_;

Expand Down Expand Up @@ -149,6 +157,20 @@ void ThreadManager<T>::Stop() {
template <typename T>
requires HasSetFdFunction<T>
void ThreadManager<T>::OnNetEventCreate(int fd, const std::shared_ptr<Connection> &conn) {
if (!clientCount_.compare_exchange_strong(expected, expected + 1, std::memory_order_seq_cst,
std::memory_order_seq_cst) || expected >= netOptions_.GetMaxClients()) {
INFO("Max client connetions, refuse new connection fd: %d", fd);
std::string response = "-ERR max clients reached\r\n";
ssize_t sent = ::send(fd, response.c_str(), response.size(), 0);
if (sent < 0) {
ERROR("Failed to send error response to fd: %d, errno: %d", fd, errno);
}
if (::close(fd) < 0) {
ERROR("Failed to close fd: %d, errno: %d", fd, errno);
}
return;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

返回 redis 格式的一个 response

}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Fix code formatting issues.

The condition statement needs proper formatting according to clang-format style. Also, there's a typo in "connetions".

-  if (!clientCount_.compare_exchange_strong(expected, expected + 1, std::memory_order_seq_cst,
-                                            std::memory_order_seq_cst) || expected >= netOptions_.GetMaxClients()) {
-    INFO("Max client connetions, refuse new connection fd: %d", fd);
+  if (!clientCount_.compare_exchange_strong(expected, expected + 1,
+                                          std::memory_order_seq_cst,
+                                          std::memory_order_seq_cst) ||
+      expected >= netOptions_.GetMaxClients()) {
+    INFO("Max client connections, refuse new connection fd: %d", fd);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (!clientCount_.compare_exchange_strong(expected, expected + 1, std::memory_order_seq_cst,
std::memory_order_seq_cst) || expected >= netOptions_.GetMaxClients()) {
INFO("Max client connetions, refuse new connection fd: %d", fd);
std::string response = "-ERR max clients reached\r\n";
ssize_t sent = ::send(fd, response.c_str(), response.size(), 0);
if (sent < 0) {
ERROR("Failed to send error response to fd: %d, errno: %d", fd, errno);
}
if (::close(fd) < 0) {
ERROR("Failed to close fd: %d, errno: %d", fd, errno);
}
return;
}
if (!clientCount_.compare_exchange_strong(expected, expected + 1,
std::memory_order_seq_cst,
std::memory_order_seq_cst) ||
expected >= netOptions_.GetMaxClients()) {
INFO("Max client connections, refuse new connection fd: %d", fd);
std::string response = "-ERR max clients reached\r\n";
ssize_t sent = ::send(fd, response.c_str(), response.size(), 0);
if (sent < 0) {
ERROR("Failed to send error response to fd: %d, errno: %d", fd, errno);
}
if (::close(fd) < 0) {
ERROR("Failed to close fd: %d, errno: %d", fd, errno);
}
return;
}


T t;
onInit_(&t);
auto connId = getConnId();
Expand All @@ -170,6 +192,7 @@ void ThreadManager<T>::OnNetEventCreate(int fd, const std::shared_ptr<Connection
}

onCreate_(connId, t, conn->addr_);
client_count_increment();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Move client count increment before onCreate_.

If onCreate_ throws an exception, the client count won't reflect the actual connection state. Move the increment before the callback to ensure accurate counting.

   readThread_->AddNewEvent(connId, fd, BaseEvent::EVENT_READ);
   if (writeThread_) {
     writeThread_->AddNewEvent(connId, fd, BaseEvent::EVENT_NULL);
   }
+  client_count_increment();
   onCreate_(connId, t, conn->addr_);
-  client_count_increment();
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
client_count_increment();
readThread_->AddNewEvent(connId, fd, BaseEvent::EVENT_READ);
if (writeThread_) {
writeThread_->AddNewEvent(connId, fd, BaseEvent::EVENT_NULL);
}
client_count_increment();
onCreate_(connId, t, conn->addr_);
🧰 Tools
🪛 GitHub Actions: kiwi

[error] Code formatting issues detected by clang-format. Issues include indentation and spacing in the OnNetEventCreate function.

}

template <typename T>
Expand Down Expand Up @@ -206,6 +229,7 @@ void ThreadManager<T>::OnNetEventClose(uint64_t connId, std::string &&err) {
iter->second.second->netEvent_->Close(); // close socket
onClose_(iter->second.first, std::move(err));
connections_.erase(iter);
client_count_decrement();
}

template <typename T>
Expand Down
Loading