Skip to content

Commit

Permalink
feat: add client command (#3)
Browse files Browse the repository at this point in the history
add client command
  • Loading branch information
gukj-spel authored Nov 16, 2024
1 parent 1a22e96 commit f3acf2d
Show file tree
Hide file tree
Showing 11 changed files with 421 additions and 8 deletions.
7 changes: 7 additions & 0 deletions src/base_cmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ const std::string kCmdNameUnwatch = "unwatch";
const std::string kCmdNameDiscard = "discard";

// admin
const std::string kCmdNameClient = "client";
const std::string kSubCmdNameClientGetname = "getname";
const std::string kSubCmdNameClientSetname = "setname";
const std::string kSubCmdNameClientId = "id";
const std::string kSubCmdNameClientList = "list";
const std::string kSubCmdNameClientKill = "kill";

const std::string kCmdNameConfig = "config";
const std::string kSubCmdNameConfigGet = "get";
const std::string kSubCmdNameConfigSet = "set";
Expand Down
14 changes: 9 additions & 5 deletions src/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

namespace kiwi {

const ClientInfo ClientInfo::invalidClientInfo = {0, "", -1};

void CmdRes::RedisAppendLen(std::string& str, int64_t ori, const std::string& prefix) {
str.append(prefix);
str.append(pstd::Int2string(ori));
Expand Down Expand Up @@ -459,15 +461,15 @@ void PClient::OnConnect() {

std::string PClient::PeerIP() const {
if (!addr_.IsValid()) {
ERROR("Invalid address detected for client {}", uniqueID());
ERROR("Invalid address detected for client {}", GetUniqueID());
return "";
}
return addr_.GetIP();
}

int PClient::PeerPort() const {
if (!addr_.IsValid()) {
ERROR("Invalid address detected for client {}", uniqueID());
ERROR("Invalid address detected for client {}", GetUniqueID());
return 0;
}
return addr_.GetPort();
Expand Down Expand Up @@ -514,7 +516,9 @@ bool PClient::isClusterCmdTarget() const {
return PRAFT.GetClusterCmdCtx().GetPeerIp() == PeerIP() && PRAFT.GetClusterCmdCtx().GetPort() == PeerPort();
}

uint64_t PClient::uniqueID() const { return GetConnId(); }
uint64_t PClient::GetUniqueID() const { return GetConnId(); }

ClientInfo PClient::GetClientInfo() const { return {GetUniqueID(), PeerIP().c_str(), PeerPort()}; }

bool PClient::Watch(int dbno, const std::string& key) {
DEBUG("Client {} watch {}, db {}", name_, key, dbno);
Expand All @@ -523,12 +527,12 @@ bool PClient::Watch(int dbno, const std::string& key) {

bool PClient::NotifyDirty(int dbno, const std::string& key) {
if (IsFlagOn(kClientFlagDirty)) {
INFO("client is already dirty {}", uniqueID());
INFO("client is already dirty {}", GetUniqueID());
return true;
}

if (watch_keys_[dbno].contains(key)) {
INFO("{} client become dirty because key {} in db {}", uniqueID(), key, dbno);
INFO("{} client become dirty because key {} in db {}", GetUniqueID(), key, dbno);
SetFlag(kClientFlagDirty);
return true;
} else {
Expand Down
12 changes: 11 additions & 1 deletion src/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,14 @@ enum class ClientState {
class DB;
struct PSlaveInfo;

struct ClientInfo {
uint64_t client_id;
std::string ip;
int port;
static const ClientInfo invalidClientInfo;
bool operator==(const ClientInfo& ci) const { return client_id == ci.client_id; }
};

class PClient : public std::enable_shared_from_this<PClient>, public CmdRes {
public:
// PClient() = delete;
Expand All @@ -168,6 +176,8 @@ class PClient : public std::enable_shared_from_this<PClient>, public CmdRes {

std::string PeerIP() const;
int PeerPort() const;
const int GetFd() const;
ClientInfo GetClientInfo() const;

// bool SendPacket(const std::string& buf);
// bool SendPacket(const void* data, size_t size);
Expand Down Expand Up @@ -256,6 +266,7 @@ class PClient : public std::enable_shared_from_this<PClient>, public CmdRes {

void SetAuth() { auth_ = true; }
bool GetAuth() const { return auth_; }
uint64_t GetUniqueID() const;
void RewriteCmd(std::vector<std::string>& params) { parser_.SetParams(params); }
void Reexecutecommand() { this->executeCommand(); }

Expand Down Expand Up @@ -287,7 +298,6 @@ class PClient : public std::enable_shared_from_this<PClient>, public CmdRes {
int processInlineCmd(const char*, size_t, std::vector<std::string>&);
void reset();
bool isPeerMaster() const;
uint64_t uniqueID() const;

bool isClusterCmdTarget() const;

Expand Down
103 changes: 103 additions & 0 deletions src/client_map.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
#include "client_map.h"
#include "log.h"

namespace kiwi {

uint32_t ClientMap::GetAllClientInfos(std::vector<ClientInfo>& results) {
// client info string type: ip, port, fd.
std::shared_lock<std::shared_mutex> client_map_lock(client_map_mutex_);
for (auto& [id, client_weak] : clients_) {
if (auto client = client_weak.lock()) {
results.emplace_back(client->GetClientInfo());
}
}
return results.size();
}

bool ClientMap::AddClient(int id, std::weak_ptr<PClient> client) {
std::unique_lock client_map_lock(client_map_mutex_);
if (clients_.find(id) == clients_.end()) {
clients_.insert({id, client});
return true;
}
return false;
}

ClientInfo ClientMap::GetClientsInfoById(int id) {
std::shared_lock client_map_lock(client_map_mutex_);
if (auto it = clients_.find(id); it != clients_.end()) {
if (auto client = it->second.lock(); client) {
return client->GetClientInfo();
}
}
ERROR("Client with ID {} not found in GetClientsInfoById", id);
return ClientInfo::invalidClientInfo;
}

bool ClientMap::RemoveClientById(int id) {
std::unique_lock client_map_lock(client_map_mutex_);
if (auto it = clients_.find(id); it != clients_.end()) {
clients_.erase(it);
INFO("Removed client with ID {}", id);
return true;
}
return false;
}

bool ClientMap::KillAllClients() {
std::vector<std::shared_ptr<PClient>> clients_to_close;
{
std::shared_lock<std::shared_mutex> client_map_lock(client_map_mutex_);
for (auto& [id, client_weak] : clients_) {
if (auto client = client_weak.lock()) {
clients_to_close.push_back(client);
}
}
}
for (auto& client : clients_to_close) {
client->Close();
}
return true;
}

bool ClientMap::KillClientByAddrPort(const std::string& addr_port) {
std::shared_ptr<PClient> client_to_close;
{
std::shared_lock<std::shared_mutex> client_map_lock(client_map_mutex_);
for (auto& [id, client_weak] : clients_) {
if (auto client = client_weak.lock()) {
std::string client_ip_port = client->PeerIP() + ":" + std::to_string(client->PeerPort());
if (client_ip_port == addr_port) {
client_to_close = client;
break;
}
}
}
}
if (client_to_close) {
client_to_close->Close();
return true;
}
return false;
}

bool ClientMap::KillClientById(int client_id) {
std::shared_ptr<PClient> client_to_close;
{
std::shared_lock<std::shared_mutex> client_map_lock(client_map_mutex_);
if (auto it = clients_.find(client_id); it != clients_.end()) {
if (auto client = it->second.lock()) {
client_to_close = client;
}
}
}
if (client_to_close) {
INFO("Closing client with ID {}", client_id);
client_to_close->Close();
INFO("Client with ID {} closed", client_id);
return true;
}
return false;
}

} // namespace kiwi
41 changes: 41 additions & 0 deletions src/client_map.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#pragma once

#include <map>
#include <memory>
#include <shared_mutex>
#include <string>
#include "client.h"

namespace kiwi {
class ClientMap {
private:
ClientMap() = default;
// 禁用复制构造函数和赋值运算符

private:
std::map<int, std::weak_ptr<PClient>> clients_;
std::shared_mutex client_map_mutex_;

public:
static ClientMap& getInstance() {
static ClientMap instance;
return instance;
}

ClientMap(const ClientMap&) = delete;
ClientMap& operator=(const ClientMap&) = delete;

// client info function
kiwi::ClientInfo GetClientsInfoById(int id);
uint32_t GetAllClientInfos(std::vector<ClientInfo>& results);

bool AddClient(int id, std::weak_ptr<PClient>);

bool RemoveClientById(int id);

bool KillAllClients();
bool KillClientById(int client_id);
bool KillClientByAddrPort(const std::string& addr_port);
};

} // namespace kiwi
Loading

0 comments on commit f3acf2d

Please sign in to comment.