-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support redis transaction #10
base: unstable
Are you sure you want to change the base?
Changes from 19 commits
f17a14f
74f05e0
c13b629
10243a4
540e641
f0264f5
f8ca9fe
34f681f
6f9c8b3
7224f20
3d33ded
ffa2d3a
b5f59bd
0d39d95
a67a2c2
358baf6
2e3b3d4
020c099
92df802
2ecb6be
5187536
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -13,6 +13,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
#include "base_cmd.h" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
#include "client.h" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
#include "cmd_thread_pool_worker.h" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
#include "config.h" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
#include "env.h" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
#include "kiwi.h" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -23,6 +24,8 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
namespace kiwi { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
CmdTableManager cmd_table_manager; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
const ClientInfo ClientInfo::invalidClientInfo = {0, "", -1}; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
thread_local PClient* PClient::s_current = nullptr; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -167,10 +170,6 @@ int PClient::HandlePacket(std::string&& data) { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for (const auto& item : params) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
FeedMonitors(item); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
auto now = std::chrono::steady_clock::now(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
time_stat_->SetEnqueueTs(now); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -180,26 +179,9 @@ int PClient::HandlePacket(std::string&& data) { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
g_kiwi->SubmitFast(std::make_shared<CmdThreadPoolTask>(shared_from_this(), std::move(params))); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// check transaction | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// if (IsFlagOn(ClientFlag_multi)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// if (cmdName_ != kCmdNameMulti && cmdName_ != kCmdNameExec && cmdName_ != kCmdNameWatch && | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// cmdName_ != kCmdNameUnwatch && cmdName_ != kCmdNameDiscard) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// if (!info->CheckParamsCount(static_cast<int>(params.size()))) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// ERROR("queue failed: cmd {} has params {}", cmdName_, params.size()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// ReplyError(info ? PError_param : PError_unknowCmd, &reply_); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// FlagExecWrong(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// } else { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// if (!IsFlagOn(ClientFlag_wrongExec)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// queue_cmds_.push_back(params); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// reply_.PushData("+QUEUED\r\n", 9); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// INFO("queue cmd {}", cmdName_); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// return static_cast<int>(ptr - start); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Propagate(params, GetCurrentDB()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// g_kiwi->SubmitFast(std::make_shared<CmdThreadPoolTask>(shared_from_this())); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+182
to
+184
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Remove commented-out code. If the propagation functionality is no longer needed, remove the commented code. If it's still required, implement it properly. - // Propagate(params, GetCurrentDB());
-
- // g_kiwi->SubmitFast(std::make_shared<CmdThreadPoolTask>(shared_from_this())); |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// check readonly slave and execute command | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// PError err = PError_ok; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -311,6 +293,22 @@ uint64_t PClient::GetUniqueID() const { return GetConnId(); } | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ClientInfo PClient::GetClientInfo() const { return {GetUniqueID(), PeerIP().c_str(), PeerPort()}; } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
bool PClient::CheckTransation(std::vector<std::string>& param) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (IsFlagOn(kClientFlagMulti)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (cmdName_ != kCmdNameMulti && cmdName_ != kCmdNameExec && cmdName_ != kCmdNameWatch && | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cmdName_ != kCmdNameUnWatch && cmdName_ != kCmdNameDiscard) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (!IsFlagOn(kClientFlagWrongExec)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
queue_cmds_.push_back(param); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
INFO("queue cmd {}", cmdName_); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
this->SetRes(CmdRes::kQueued); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
g_kiwi->PushWriteTask(shared_from_this()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return true; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return false; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+296
to
+310
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Fix typo in method name and add validation. Issues found:
Apply this diff to fix the issues: -bool PClient::CheckTransation(std::vector<std::string>& param) {
+bool PClient::CheckTransaction(std::vector<std::string>& param) {
+ if (param.empty()) {
+ return false;
+ }
if (IsFlagOn(kClientFlagMulti)) {
if (cmdName_ != kCmdNameMulti && cmdName_ != kCmdNameExec && cmdName_ != kCmdNameWatch &&
cmdName_ != kCmdNameUnWatch && cmdName_ != kCmdNameDiscard) {
if (!IsFlagOn(kClientFlagWrongExec)) {
queue_cmds_.push_back(param);
}
INFO("queue cmd {}", cmdName_);
this->SetRes(CmdRes::kQueued);
g_kiwi->PushWriteTask(shared_from_this());
return true;
}
}
return false;
} 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
bool PClient::Watch(int dbno, const std::string& key) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
DEBUG("Client {} watch {}, db {}", name_, key, dbno); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return watch_keys_[dbno].insert(key).second; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -321,7 +319,6 @@ bool PClient::NotifyDirty(int dbno, const std::string& key) { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
INFO("client is already dirty {}", GetUniqueID()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return true; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (watch_keys_[dbno].contains(key)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
INFO("{} client become dirty because key {} in db {}", GetUniqueID(), key, dbno); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
SetFlag(kClientFlagDirty); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -338,29 +335,48 @@ bool PClient::Exec() { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
this->ClearMulti(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
this->ClearWatch(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
DEBUG("Exec"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (IsFlagOn(kClientFlagWrongExec)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return false; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (IsFlagOn(kClientFlagDirty)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// FormatNullArray(&reply_); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
AppendString(""); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
std::string message_ = "$-1\r\n"; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
resp_encode_->Reply(message_); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return true; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
resp_encode_->ClearReply(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
DEBUG("size : {}", queue_cmds_.size()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
AppendArrayLen(queue_cmds_.size()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
DEBUG("judge"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
auto client = shared_from_this(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cmd_table_manager.InitCmdTable(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for (auto& cmd : queue_cmds_) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
SetCmdName(kstd::StringToLower(cmd[0])); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
SetArgv(cmd); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
kstd::StringToLower(client->cmdName_); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
auto [cmdPtr, ret] = cmd_table_manager.GetCommand(client->CmdName(), client.get()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
auto cmdstat_map = GetCommandStatMap(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
CommandStatistics statistics; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (cmdstat_map->find(cmd[0]) == cmdstat_map->end()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cmdstat_map->emplace(cmd[0], statistics); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
auto now = std::chrono::steady_clock::now(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
GetTimeStat()->SetDequeueTs(now); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cmdPtr->Execute(client.get()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// PreFormatMultiBulk(queue_cmds_.size(), &reply_); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// for (const auto& cmd : queue_cmds_) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// DEBUG("EXEC {}, for client {}", cmd[0], UniqueId()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// const PCommandInfo* info = PCommandTable::GetCommandInfo(cmd[0]); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// PError err = PCommandTable::ExecuteCmd(cmd, info, &reply_); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// may dirty clients; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// if (err == PError_ok && (info->attr & PAttr_write)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Propagate(cmd); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Info Commandstats used | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
now = std::chrono::steady_clock::now(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
GetTimeStat()->SetProcessDoneTs(now); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(*cmdstat_map)[cmd[0]].cmd_count_.fetch_add(1); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(*cmdstat_map)[cmd[0]].cmd_time_consuming_.fetch_add(GetTimeStat()->GetTotalTime()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
FeedMonitors(cmd); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
DEBUG("over"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
g_kiwi->PushWriteTask(client); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Propagate(client->params_, GetCurrentDB()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return true; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,14 +35,17 @@ void CmdWorkThreadPoolWorker::Work() { | |
if (!cmdPtr) { | ||
if (ret == CmdRes::kUnknownCmd) { | ||
task->Client()->SetRes(CmdRes::kUnknownCmd, fmt::format("unknown command '{}'", param[0])); | ||
task->Client()->FlagExecWrong(); | ||
WARN("client IP:{},port:{} unknown command '{}'", task->Client()->PeerIP(), task->Client()->PeerPort(), | ||
param[0]); | ||
} else if (ret == CmdRes::kUnknownSubCmd) { | ||
task->Client()->SetRes(CmdRes::kUnknownSubCmd, task->Client()->argv_[1]); | ||
task->Client()->FlagExecWrong(); | ||
WARN("client IP:{},port:{} unknown sub command '{}'", task->Client()->PeerIP(), task->Client()->PeerPort(), | ||
task->Client()->argv_[1]); | ||
} else { | ||
task->Client()->SetRes(CmdRes::kWrongNum, param[0]); | ||
task->Client()->FlagExecWrong(); | ||
WARN("client IP:{},port:{} unknown command '{}'", task->Client()->PeerIP(), task->Client()->PeerPort(), | ||
param[0]); | ||
} | ||
|
@@ -52,10 +55,16 @@ void CmdWorkThreadPoolWorker::Work() { | |
|
||
if (!cmdPtr->CheckArg(task->Client()->ParamsSize())) { | ||
task->Client()->SetRes(CmdRes::kWrongNum, param[0]); | ||
task->Client()->FlagExecWrong(); | ||
g_kiwi->PushWriteTask(task->Client()); | ||
continue; | ||
} | ||
|
||
// check transaction | ||
if (task->Client()->CheckTransation(param)) { | ||
continue; | ||
} | ||
|
||
Comment on lines
+63
to
+67
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💡 Codebase verification Transaction check implementation is correct but contains a typo The transaction check implementation is properly integrated with appropriate error handling and state management. However, there is a consistent typo in the method name:
🔗 Analysis chainVerify transaction check implementation. The new transaction check is added but its behavior needs verification, especially around error handling and state management. Run this script to verify the transaction check implementation: 🏁 Scripts executedThe following scripts were executed for the analysis: Script: #!/bin/bash
# Description: Verify transaction check implementation and error handling
# Test: Search for transaction check usage and related error handling
ast-grep --pattern 'CheckTransation($$$)'
Length of output: 43 Script: #!/bin/bash
# Search for both spellings and Client class implementation
echo "=== Searching for Transaction methods ==="
rg -p "CheckTransaction|CheckTransation" -t cpp -t hpp
echo -e "\n=== Looking for Client class definition ==="
ast-grep --pattern 'class Client {
$$$
}'
echo -e "\n=== Searching for transaction-related code ==="
rg -p "transaction|Transaction" -t cpp -t hpp -C 2
Length of output: 516 Script: #!/bin/bash
# Search with corrected file types and broader patterns
echo "=== Searching for Client class and methods ==="
rg -p "class.*Client" -g "*.{h,cc}"
echo -e "\n=== Searching for transaction check methods ==="
rg -p "Check.*Trans.*" -g "*.{h,cc}" -C 2
echo -e "\n=== Searching for transaction-related code ==="
rg -p "transaction|Transaction" -g "*.{h,cc}" -C 2
echo -e "\n=== Looking for the specific file content ==="
cat src/cmd_thread_pool_worker.cc
Length of output: 12283 |
||
auto cmdstat_map = task->Client()->GetCommandStatMap(); | ||
CommandStatistics statistics; | ||
if (cmdstat_map->find(param[0]) == cmdstat_map->end()) { | ||
|
@@ -71,6 +80,8 @@ void CmdWorkThreadPoolWorker::Work() { | |
(*cmdstat_map)[param[0]].cmd_count_.fetch_add(1); | ||
(*cmdstat_map)[param[0]].cmd_time_consuming_.fetch_add(task->Client()->GetTimeStat()->GetTotalTime()); | ||
|
||
task->Client()->FeedMonitors(param); | ||
|
||
g_kiwi->PushWriteTask(task->Client()); | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ | |
#include "db.h" | ||
#include "std/log.h" | ||
#include "std/std_string.h" | ||
#include "transaction.h" | ||
|
||
namespace kiwi { | ||
|
||
|
@@ -76,4 +77,34 @@ void Store::HandleTaskSpecificDB(const TasksVector& tasks) { | |
} | ||
}); | ||
} | ||
|
||
void Propagate(const std::vector<PString>& params, int dbno) { | ||
assert(!params.empty()); | ||
// | ||
// if (!g_dirtyKeys.empty()) { | ||
// for (const auto& k : g_dirtyKeys) { | ||
// PTransaction::Instance().NotifyDirty(PSTORE.GetDBNumber(), k); | ||
// | ||
// } | ||
// g_dirtyKeys.clear(); | ||
// } else if (params.size() > 1) { | ||
// PTransaction::Instance().NotifyDirty(PSTORE.GetDBNumber(), params[1]); | ||
// } | ||
if (params.size() > 1) { | ||
PTransaction::Instance().NotifyDirty(dbno, params[1]); | ||
} | ||
} | ||
Comment on lines
+81
to
+96
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Validate In Modify the code to include a size check: void Propagate(const std::vector<PString>& params, int dbno) {
if (params.size() > 1) {
PTransaction::Instance().NotifyDirty(dbno, params[1]);
} else {
// Handle insufficient parameters
}
} |
||
|
||
void Propagate(int dbno, const std::vector<PString>& params) { | ||
PTransaction::Instance().NotifyDirtyAll(dbno); | ||
Propagate(params, dbno); | ||
} | ||
|
||
void signalModifiedKey(const std::vector<PString>& keys, int dbno) { | ||
if (keys.size() > 1) { | ||
for (const auto& key : keys) { | ||
PTransaction::Instance().NotifyDirty(dbno, key); | ||
} | ||
} | ||
Comment on lines
+103
to
+108
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ensure all modified keys are signaled regardless of count In Update the condition to include all non-empty key vectors: void signalModifiedKey(const std::vector<PString>& keys, int dbno) {
if (!keys.empty()) {
for (const auto& key : keys) {
PTransaction::Instance().NotifyDirty(dbno, key);
}
}
} |
||
} | ||
} // namespace kiwi |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ensure thread-safe access to the global command table manager.
The global
cmd_table_manager
variable could lead to race conditions in a multi-threaded environment. Consider: