Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support redis transaction #10

Open
wants to merge 21 commits into
base: unstable
Choose a base branch
from
Open

feat: support redis transaction #10

wants to merge 21 commits into from

Conversation

panlei-coder
Copy link
Collaborator

@panlei-coder panlei-coder commented Aug 31, 2024

#11

Summary by CodeRabbit

Release Notes

  • New Features

    • Added transaction management support with commands: MULTI, EXEC, WATCH, UNWATCH, and DISCARD.
    • Introduced enhanced error handling for command execution.
    • Implemented key watching and transaction validation mechanisms.
    • Added new command response types for improved error handling.
  • Improvements

    • Refined command queuing and execution process.
    • Added more precise error reporting for various command scenarios.
    • Enhanced client state management during transactions.
    • Improved key modification signaling during write operations.
    • Enhanced caching mechanism for build artifacts on macOS.
  • Configuration

    • Reduced default number of databases from 16 to 6.
  • Bug Fixes

    • Improved handling of invalid command arguments.
    • Added checks to prevent reinitialization of command tables.
    • Corrected test case names for accuracy in testing suite.
    • Updated tests for compatibility with the Kiwi environment.

Copy link

coderabbitai bot commented Aug 31, 2024

Warning

Rate limit exceeded

@lankunGitHub has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 0 minutes and 37 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

📥 Commits

Reviewing files that changed from the base of the PR and between 2ecb6be and 5187536.

📒 Files selected for processing (1)
  • src/transaction.cc (1 hunks)

Walkthrough

The pull request introduces a comprehensive implementation of transaction management in the Kiwi database system. The changes span multiple files and add support for Redis-like transaction commands such as MULTI, WATCH, UNWATCH, EXEC, and DISCARD. The implementation includes enhanced error handling, key watching mechanisms, and modifications to command execution flow. The changes focus on providing atomic transaction capabilities, allowing clients to queue and conditionally execute commands based on key modifications.

Changes

File Change Summary
src/cmd_thread_pool_worker.cc Enhanced error handling for command execution with new flagging mechanism.
src/base_cmd.h Renamed constant from kCmdNameUnwatch to kCmdNameUnWatch.
src/client.cc, src/client.h Added transaction-related methods, modified command handling logic.
src/cmd_table_manager.cc Added new transaction-related commands to command table.
src/resp/resp2_encode.cc, src/resp/resp_encode.h Added new response types for transaction and error scenarios.
src/transaction.cc, src/transaction.h Implemented full transaction management system with command classes.
src/base_cmd.cc Added key modification signaling during write operations.
src/store.cc, src/store.h Added methods for propagating key modifications.
tests/unit/multi.tcl Updated transaction-related test cases.
tests/assets/default.conf Reduced number of databases from 16 to 6.
.github/workflows/kiwidb.yml Enhanced caching mechanism for build artifacts on macOS.
tests/test_helper.tcl Activated unit/multi test in the test suite.

Sequence Diagram

sequenceDiagram
    participant Client
    participant Transaction
    participant Store
    
    Client->>Transaction: MULTI
    Transaction-->>Client: Enter multi-mode
    
    Client->>Transaction: Queue commands
    Transaction-->>Client: Commands queued
    
    Client->>Transaction: EXEC
    Transaction->>Store: Check key modifications
    Store-->>Transaction: Validate keys
    
    alt Transaction valid
        Transaction->>Client: Execute queued commands
    else Transaction invalid
        Transaction-->>Client: Abort transaction
    end
Loading

Possibly related PRs

  • fix: ci run on macos-12 #62: The changes in src/cmd_thread_pool_worker.cc involve modifications to the CmdWorkThreadPoolWorker::Work method, which is relevant to the command execution and error handling, similar to the focus on command handling in the CI workflow changes in this PR.
  • style: some keywords begins with 'k' #155: The changes in src/cmd_thread_pool_worker.cc include updates to string handling, which may relate to the style changes in keyword usage in this PR, indicating a broader refactoring effort that could impact command processing.

Poem

🐰 In the realm of Kiwi's code so bright,
Transactions dance with atomic might!
MULTI, WATCH, and EXEC in line,
Commands queue up, looking fine!
A rabbit's transaction magic takes flight! 🚀


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Beta)
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@AlexStocks AlexStocks requested review from lqxhub and dingxiaoshuai123 and removed request for lqxhub and dingxiaoshuai123 August 31, 2024 13:13
@longfar-ncy
Copy link
Collaborator

@Iam-WenYi 麻煩review一下

Copy link
Collaborator

@Iam-WenYi Iam-WenYi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

需要做更多了解才行,现在还不可以下定论。
首先,项目目前是 Kiwi,已经不再是 PikiwiDB,因此注释方面需要调整。
指令方面,建议把 Multi 指令的注释迁移到 Multi 处,不要放在 Watch 处。
其它的话,可以看出这种事务的思路是参考了 Redis 的,即把要处理的任务放到一个队列之中,同时结合了 Kiwi 项目自身任务池的特点来的,但是我个人认为还需要进一步优化,因此还不能够予以通过。

@@ -0,0 +1,194 @@
/*
* Copyright (c) 2023-present, Qihoo, Inc. All rights reserved.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Qihoo 、pikiwidb 相关都删了

@luky116 luky116 mentioned this pull request Dec 21, 2024
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 9

🧹 Nitpick comments (7)
src/transaction.cc (1)

161-161: Correct the typo in the error message

In MultiCmd::DoCmd, there's a typo in the error message. "can not" should be "cannot".

Apply this diff to fix the typo:

-    client->SetRes(CmdRes::kErrOther, "MULTI calls can not be nested");
+    client->SetRes(CmdRes::kErrOther, "MULTI calls cannot be nested");
src/store.h (2)

70-71: Avoid global variables and refine code comments

Declaring global variables like g_dirtyKeys can make the codebase harder to maintain and test. Additionally, the comment "ugly, but I don't want to write signalModifiedKey() every where" is informal and may not be appropriate for the codebase.

Consider encapsulating g_dirtyKeys within a class or namespace to limit its scope. Also, update the comment to be more professional, or remove it if it's unnecessary.


72-73: Standardize parameter order in overloaded Propagate functions

The two overloaded Propagate functions have parameters in different orders, which can lead to confusion and errors. Standardizing the parameter order improves readability and reduces the risk of mistakes.

Ensure both functions have parameters in the same order. For example:

extern void Propagate(int dbno, const std::vector<PString>& params);
extern void Propagate(int dbno);
src/store.cc (1)

84-90: Remove commented-out code

The code between lines 84 and 90 is commented out. Keeping commented-out code can clutter the codebase and make maintenance more difficult. If this code is no longer needed, it should be removed.

src/client.cc (1)

27-27: Consider using forward declaration.

Instead of using an external variable, consider using a forward declaration or dependency injection.

-extern kiwi::CmdTableManager cmd_table_manager_;
+namespace kiwi {
+class CmdTableManager;
+}
tests/unit/multi.tcl (2)

59-68: Consider implementing alternative test strategy.

The commented-out test for transaction failures is important for verifying transaction behavior.

Would you like me to help create alternative test cases that work with the current feature set?


Line range hint 70-937: Document unsupported features.

Many tests are commented out due to unsupported features. Consider adding documentation about which Redis features are not supported.

Add a section in the README or documentation listing unsupported Redis features and any planned support.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c9ceb4a and a67a2c2.

📒 Files selected for processing (17)
  • .github/workflows/kiwidb.yml (1 hunks)
  • src/base_cmd.cc (1 hunks)
  • src/base_cmd.h (1 hunks)
  • src/client.cc (5 hunks)
  • src/client.h (2 hunks)
  • src/cmd_table_manager.cc (2 hunks)
  • src/cmd_thread_pool_worker.cc (3 hunks)
  • src/proto_parser.cc (1 hunks)
  • src/resp/resp2_encode.cc (2 hunks)
  • src/resp/resp_encode.h (2 hunks)
  • src/store.cc (2 hunks)
  • src/store.h (1 hunks)
  • src/transaction.cc (1 hunks)
  • src/transaction.h (1 hunks)
  • tests/assets/default.conf (1 hunks)
  • tests/test_helper.tcl (1 hunks)
  • tests/unit/multi.tcl (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
  • src/base_cmd.h
  • src/resp/resp2_encode.cc
  • src/resp/resp_encode.h
  • src/client.h
  • src/transaction.h
🧰 Additional context used
🪛 actionlint (1.7.4)
.github/workflows/kiwidb.yml

56-56: shellcheck reported issue in this script: SC2086:info:1:31: Double quote to prevent globbing and word splitting

(shellcheck)


61-61: the runner of "actions/cache@v3" action is too old to run on GitHub Actions. update the action's version to fix this issue

(action)

🪛 cppcheck (2.10-2)
src/transaction.cc

[performance] 101-101: Variable 'downSince' is assigned in constructor body. Consider performing initialization in initialization list.

(useInitializationList)

src/cmd_table_manager.cc

[performance] 203-203: Variable 'func_' is assigned in constructor body. Consider performing initialization in initialization list.

(useInitializationList)

src/cmd_thread_pool_worker.cc

[error] 38-38: Syntax Error

(internalAstError)

🔇 Additional comments (16)
src/transaction.cc (1)

1-6: ⚠️ Potential issue

Update the copyright notice to reflect the current project

The file's header mentions "Qihoo, Inc." and references to LICENSE and PATENTS files that may not exist or be incorrect for this project. Please update the copyright notice to
accurately reflect the current project and its contributors.

Apply this diff to update the header:

-/*
- * Copyright (c) 2023-present, Qihoo, Inc.  All rights reserved.
- * This source code is licensed under the BSD-style license found in the
- * LICENSE file in the root directory of this source tree. An additional grant
- * of patent rights can be found in the PATENTS file in the same directory.
- */
+/*
+ * Copyright (c) 2023-present, Arana/Kiwi Community.  All rights reserved.
+ * This source code is licensed under the BSD-style license found in the
+ * LICENSE file in the root directory of this source tree.
+ */

Likely invalid or redundant comment.

src/base_cmd.cc (1)

73-77: LGTM! Key modification tracking for transactions.

The implementation correctly tracks key modifications for write commands before executing them, which is essential for transaction management (e.g., WATCH/UNWATCH commands).

tests/test_helper.tcl (1)

18-18: Verify that the unit/multi test file exists and is ready.

The change enables the unit/multi test, which is essential for testing the new transaction management feature.

Run the following script to verify the test file:

✅ Verification successful

Test file is present and ready for execution

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check if the unit/multi test file exists and is ready.

# Test: Check if the test file exists
echo "Checking if unit/multi test file exists..."
fd -t f "multi.tcl" tests/unit/

# Test: Check if the test file contains transaction-related test cases
echo "Checking for transaction-related test cases..."
rg -A 5 'MULTI|EXEC|WATCH|UNWATCH|DISCARD' tests/unit/multi.tcl

Length of output: 13283

src/proto_parser.cc (7)

23-29: LGTM! Clean initialization of parser state.

The function correctly resets all member variables to their initial states, which is essential for reusing the parser instance.


31-44: LGTM! Well-structured request parsing with proper error handling.

The function follows a clear flow:

  1. Parse the multi part if not already parsed
  2. Handle parsing errors appropriately
  3. Proceed to parse the string list

46-58: LGTM! Robust multi-bulk request parsing.

The function includes all necessary validations:

  • Buffer size check
  • Protocol format validation ('*' character)
  • Proper integer parsing until CRLF

60-77: LGTM! Efficient string list parsing with dynamic resizing.

The function handles the parsing efficiently:

  • Dynamic resizing of results vector
  • Proper parameter count tracking
  • Comprehensive error handling

79-92: LGTM! Two-phase string parsing with proper state management.

The function correctly implements the two-phase parsing:

  1. Parse string length if not already parsed
  2. Parse string value

94-113: LGTM! Robust string value parsing with partial read support.

The function includes all necessary validations and features:

  • Parameter length validation
  • Partial read handling
  • CRLF termination check

115-132: LGTM! Comprehensive string length parsing with proper error recovery.

The function includes all necessary validations and features:

  • Buffer size check
  • Protocol format validation ('$' character)
  • Proper integer parsing until CRLF
  • Pointer adjustment on error
.github/workflows/kiwidb.yml (1)

51-68: 🛠️ Refactor suggestion

Remove duplicate ccache configuration.

The ccache configuration is duplicated in the workflow. The first configuration block at lines 51-54 is redundant as the same steps are repeated later.

Remove the redundant configuration:

-        run: |
-          brew update
-          brew install ccache
-

Likely invalid or redundant comment.

🧰 Tools
🪛 actionlint (1.7.4)

56-56: shellcheck reported issue in this script: SC2086:info:1:31: Double quote to prevent globbing and word splitting

(shellcheck)


61-61: the runner of "actions/cache@v3" action is too old to run on GitHub Actions. update the action's version to fix this issue

(action)

src/cmd_thread_pool_worker.cc (2)

38-38: LGTM! Enhanced error handling with execution flags.

The addition of FlagExecWrong() calls improves error handling by properly marking failed command executions.

Also applies to: 43-43, 48-48, 58-58

🧰 Tools
🪛 cppcheck (2.10-2)

[error] 38-38: Syntax Error

(internalAstError)


83-83: LGTM! Added monitor feed.

The addition of monitor feed ensures proper command monitoring.

src/cmd_table_manager.cc (1)

203-208: LGTM! Added Redis transaction commands.

The implementation adds essential Redis transaction commands with appropriate argument counts:

  • Multi: 1 argument
  • Watch: -2 arguments (variable, at least 2)
  • UnWatch: 1 argument
  • Exec: 1 argument
  • Discard: 1 argument
🧰 Tools
🪛 cppcheck (2.10-2)

[performance] 203-203: Variable 'func_' is assigned in constructor body. Consider performing initialization in initialization list.

(useInitializationList)

src/client.cc (1)

296-310: Verify transaction check logic.

The transaction check implementation needs verification for edge cases and error handling.

Run this script to verify the transaction check implementation:

✅ Verification successful

Transaction check implementation verified successfully

The implementation correctly handles all transaction states, command queueing, and error conditions. Edge cases are properly managed with appropriate flags and validations.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify transaction check implementation
# Test: Search for transaction check usage and related error handling
rg -A 5 "CheckTransation|kClientFlagMulti|kClientFlagWrongExec"

Length of output: 3643

tests/unit/multi.tcl (1)

1-9: LGTM! Added helper function for database size verification.

The wait_for_dbsize helper function provides a reliable way to verify database state changes.

Comment on lines +119 to +122
bool WatchCmd::DoInitial(PClient* client) {
client->SetKey(client->argv_[1]);
return true;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Check argument count before accessing client->argv_

In the DoInitial methods of command classes, you're accessing client->argv_[1] without verifying that the argument count is sufficient. This could lead to out-of-bounds access if the commands are called with insufficient arguments.

Apply this diff to validate the argument count:

 bool WatchCmd::DoInitial(PClient* client) {
+  if (client->argv_.size() < 2) {
+    client->SetRes(CmdRes::kWrongNum);
+    return false;
+  }
   client->SetKey(client->argv_[1]);
   return true;
 }

 bool UnWatchCmd::DoInitial(PClient* client) {
+  if (client->argv_.size() < 2) {
+    client->SetRes(CmdRes::kWrongNum);
+    return false;
+  }
   client->SetKey(client->argv_[1]);
   return true;
 }

 bool MultiCmd::DoInitial(PClient* client) {
+  if (client->argv_.size() < 2) {
+    client->SetRes(CmdRes::kWrongNum);
+    return false;
+  }
   client->SetKey(client->argv_[1]);
   return true;
 }

 bool ExecCmd::DoInitial(PClient* client) {
+  if (client->argv_.size() < 2) {
+    client->SetRes(CmdRes::kWrongNum);
+    return false;
+  }
   client->SetKey(client->argv_[1]);
   return true;
 }

 bool DiscardCmd::DoInitial(PClient* client) {
+  if (client->argv_.size() < 2) {
+    client->SetRes(CmdRes::kWrongNum);
+    return false;
+  }
   client->SetKey(client->argv_[1]);
   return true;
 }

Also applies to: 139-142, 152-155, 168-171, 187-190

Comment on lines +46 to +47
std::shared_lock<std::shared_mutex> r_lock(watched_clients_mutex_[dbno]);

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Validate dbno before accessing mutexes

Before accessing watched_clients_mutex_[dbno], ensure that dbno is within the valid range to prevent out-of-bounds access, which could result in undefined behavior.

Consider adding a check like:

if (dbno < 0 || dbno >= watched_clients_mutex_.size()) {
  // Handle error
  return;
}

Also applies to: 66-66

Comment on lines 63 to 66
Clients cls = std::move(it->second);
r_lock.unlock();

std::unique_lock<std::shared_mutex> w_lock(watched_clients_mutex_[dbno]);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Review lock management to prevent race conditions

In NotifyDirty, after moving it->second to cls and before acquiring the unique lock, there's a gap where the shared lock is released. This could lead to race conditions if another thread modifies clients_. It's safer to hold the lock throughout the operation or restructure the code to avoid such gaps.

Comment on lines 82 to 85
if (cls.empty()) {
it = dbWatchedKeys.find(key);
dbWatchedKeys.erase(it);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure iterator validity before erasing from dbWatchedKeys

When cls is empty, you attempt to erase the key from dbWatchedKeys by finding it again. However, there's a possibility that the key no longer exists if modified by another thread. Ensure that the iterator is valid before erasing.

Modify the code as follows:

it = dbWatchedKeys.find(key);
if (it != dbWatchedKeys.end()) {
  dbWatchedKeys.erase(it);
}

Comment on lines +81 to +96
void Propagate(const std::vector<PString>& params, int dbno) {
assert(!params.empty());
//
// if (!g_dirtyKeys.empty()) {
// for (const auto& k : g_dirtyKeys) {
// PTransaction::Instance().NotifyDirty(PSTORE.GetDBNumber(), k);
//
// }
// g_dirtyKeys.clear();
// } else if (params.size() > 1) {
// PTransaction::Instance().NotifyDirty(PSTORE.GetDBNumber(), params[1]);
// }
if (params.size() > 1) {
PTransaction::Instance().NotifyDirty(dbno, params[1]);
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Validate params size before accessing elements

In Propagate, you assume params.size() > 1 to access params[1]. While there's an assert, it's better to handle this explicitly to prevent potential out-of-bounds access in release builds where assertions may be disabled.

Modify the code to include a size check:

void Propagate(const std::vector<PString>& params, int dbno) {
  if (params.size() > 1) {
    PTransaction::Instance().NotifyDirty(dbno, params[1]);
  } else {
    // Handle insufficient parameters
  }
}

Comment on lines +103 to +108
void signalModifiedKey(const std::vector<PString>& keys, int dbno) {
if (keys.size() > 1) {
for (const auto& key : keys) {
PTransaction::Instance().NotifyDirty(dbno, key);
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure all modified keys are signaled regardless of count

In signalModifiedKey, you're only notifying dirty keys when keys.size() > 1. This means that if there's only one key, it won't be notified, which may not be the intended behavior.

Update the condition to include all non-empty key vectors:

void signalModifiedKey(const std::vector<PString>& keys, int dbno) {
  if (!keys.empty()) {
    for (const auto& key : keys) {
      PTransaction::Instance().NotifyDirty(dbno, key);
    }
  }
}

@@ -35,7 +35,7 @@ logfile stdout
# Set the number of databases. The default database is DB 0, you can select
# a different one on a per-connection basis using SELECT <dbid> where
# dbid is a number between 0 and 'databases'-1
databases 16
databases 6
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Reducing databases to 6 will break existing tests

The change from 16 to 6 databases will break at least one test case that explicitly uses database 9:

  • tests/unit/other.tcl: Uses SELECT 9 command which would fail with only 6 databases available
🔗 Analysis chain

Verify the impact of reducing the number of databases.

The change reduces the maximum number of databases from 16 to 6, which could affect applications that rely on databases with indices > 5.

Run the following script to verify if any tests or code rely on databases with indices > 5:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for usage of database indices > 5 in tests and code.

# Test: Search for SELECT commands with indices > 5 in test files
echo "Checking test files for SELECT commands with indices > 5..."
rg -A 5 'SELECT\s+[6-9]|SELECT\s+1[0-6]' tests/

# Test: Search for database index references in source files
echo "Checking source files for database index references > 5..."
rg -A 5 'GetCurrentDB|SetCurrentDB|GetBackend.*\b([6-9]|1[0-6])\b' src/

Length of output: 45082

Comment on lines +63 to +67
// check transaction
if (task->Client()->CheckTransation(param)) {
continue;
}

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Transaction check implementation is correct but contains a typo

The transaction check implementation is properly integrated with appropriate error handling and state management. However, there is a consistent typo in the method name:

  • CheckTransation should be CheckTransaction (missing 'c')
🔗 Analysis chain

Verify transaction check implementation.

The new transaction check is added but its behavior needs verification, especially around error handling and state management.

Run this script to verify the transaction check implementation:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify transaction check implementation and error handling
# Test: Search for transaction check usage and related error handling
ast-grep --pattern 'CheckTransation($$$)'

Length of output: 43


Script:

#!/bin/bash
# Search for both spellings and Client class implementation
echo "=== Searching for Transaction methods ==="
rg -p "CheckTransaction|CheckTransation" -t cpp -t hpp

echo -e "\n=== Looking for Client class definition ==="
ast-grep --pattern 'class Client {
  $$$
}'

echo -e "\n=== Searching for transaction-related code ==="
rg -p "transaction|Transaction" -t cpp -t hpp -C 2

Length of output: 516


Script:

#!/bin/bash
# Search with corrected file types and broader patterns
echo "=== Searching for Client class and methods ==="
rg -p "class.*Client" -g "*.{h,cc}"

echo -e "\n=== Searching for transaction check methods ==="
rg -p "Check.*Trans.*" -g "*.{h,cc}" -C 2

echo -e "\n=== Searching for transaction-related code ==="
rg -p "transaction|Transaction" -g "*.{h,cc}" -C 2

echo -e "\n=== Looking for the specific file content ==="
cat src/cmd_thread_pool_worker.cc

Length of output: 12283

src/client.cc Outdated
Comment on lines 344 to 377
std::string message_ = "$-1\r\n";
resp_encode_->Reply(message_);
return true;
}
resp_encode_->ClearReply();
AppendArrayLen(queue_cmds_.size());

auto client = shared_from_this();
for (auto& cmd : queue_cmds_) {
SetCmdName(kstd::StringToLower(cmd[0]));
SetArgv(cmd);
kstd::StringToLower(client->cmdName_);
auto [cmdPtr, ret] = cmd_table_manager_.GetCommand(client->CmdName(), client.get());

auto cmdstat_map = GetCommandStatMap();
CommandStatistics statistics;
if (cmdstat_map->find(cmd[0]) == cmdstat_map->end()) {
cmdstat_map->emplace(cmd[0], statistics);
}
auto now = std::chrono::steady_clock::now();
GetTimeStat()->SetDequeueTs(now);
cmdPtr->Execute(client.get());

// PreFormatMultiBulk(queue_cmds_.size(), &reply_);
// for (const auto& cmd : queue_cmds_) {
// DEBUG("EXEC {}, for client {}", cmd[0], UniqueId());
// const PCommandInfo* info = PCommandTable::GetCommandInfo(cmd[0]);
// PError err = PCommandTable::ExecuteCmd(cmd, info, &reply_);
// Info Commandstats used
now = std::chrono::steady_clock::now();
GetTimeStat()->SetProcessDoneTs(now);
(*cmdstat_map)[cmd[0]].cmd_count_.fetch_add(1);
(*cmdstat_map)[cmd[0]].cmd_time_consuming_.fetch_add(GetTimeStat()->GetTotalTime());

// may dirty clients;
// if (err == PError_ok && (info->attr & PAttr_write)) {
// Propagate(cmd);
// }
// }
FeedMonitors(cmd);
}

g_kiwi->PushWriteTask(client);
// Propagate(client->params_, GetCurrentDB());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Review transaction execution implementation.

The transaction execution implementation has several potential issues:

  1. No error handling for command execution failures
  2. No rollback mechanism if a command fails
  3. Potential memory leak if command execution throws an exception

Consider adding:

  1. Error handling for each command execution
  2. Rollback mechanism for failed transactions
  3. RAII pattern for resource management

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🔭 Outside diff range comments (1)
src/transaction.cc (1)

Line range hint 338-379: Add transaction atomicity guarantees.

The Exec implementation lacks proper atomicity guarantees:

  1. No rollback mechanism if a command fails
  2. No error handling for individual command execution
  3. No transaction state validation

Consider implementing a transaction wrapper:

bool PClient::Exec() {
  DEFER {
    this->ClearMulti();
    this->ClearWatch();
  };

  if (IsFlagOn(kClientFlagWrongExec) || IsFlagOn(kClientFlagDirty)) {
    resp_encode_->Reply("$-1\r\n");
    return true;
  }

  resp_encode_->ClearReply();
  AppendArrayLen(queue_cmds_.size());

  auto client = shared_from_this();
  cmd_table_manager.InitCmdTable();
  
  // Store original state for rollback
  auto originalState = SaveState();
  bool success = true;

  try {
    for (auto& cmd : queue_cmds_) {
      if (!ExecuteCommand(cmd)) {
        success = false;
        break;
      }
    }

    if (!success) {
      RestoreState(originalState);
      resp_encode_->Reply("$-1\r\n");
    }

    g_kiwi->PushWriteTask(client);
    return true;
  } catch (const std::exception& e) {
    RestoreState(originalState);
    resp_encode_->Reply("$-1\r\n");
    return false;
  }
}
🧰 Tools
🪛 cppcheck (2.10-2)

[performance] 101-101: Variable 'downSince' is assigned in constructor body. Consider performing initialization in initialization list.

(useInitializationList)

♻️ Duplicate comments (1)
src/store.cc (1)

103-109: ⚠️ Potential issue

Modify key size check to include single key modifications.

The current implementation only processes keys when there are multiple keys (size > 1), which could miss single key modifications.

Apply this diff to fix the issue:

void signalModifiedKey(const std::vector<PString>& keys, int dbno) {
-  if (keys.size() > 1) {
+  if (!keys.empty()) {
    for (const auto& key : keys) {
      PTransaction::Instance().NotifyDirty(dbno, key);
    }
  }
}
🧹 Nitpick comments (1)
src/client.cc (1)

182-184: Remove commented out code.

The commented out code should be removed as it's no longer needed.

Apply this diff:

-  // Propagate(params, GetCurrentDB());
-
-  // g_kiwi->SubmitFast(std::make_shared<CmdThreadPoolTask>(shared_from_this()));
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a67a2c2 and 020c099.

📒 Files selected for processing (4)
  • src/client.cc (5 hunks)
  • src/cmd_table_manager.cc (3 hunks)
  • src/store.cc (2 hunks)
  • src/transaction.cc (1 hunks)
🧰 Additional context used
🪛 cppcheck (2.10-2)
src/store.cc

[performance] 101-101: Variable 'downSince' is assigned in constructor body. Consider performing initialization in initialization list.

(useInitializationList)

src/transaction.cc

[performance] 101-101: Variable 'downSince' is assigned in constructor body. Consider performing initialization in initialization list.

(useInitializationList)

🔇 Additional comments (3)
src/store.cc (2)

98-101: LGTM: Function correctly propagates changes.

The function properly notifies all clients before propagating individual changes.

🧰 Tools
🪛 cppcheck (2.10-2)

[performance] 101-101: Variable 'downSince' is assigned in constructor body. Consider performing initialization in initialization list.

(useInitializationList)


81-96: 🛠️ Refactor suggestion

Remove commented out code and add parameter validation.

The function has two issues:

  1. Contains commented out code that should be removed
  2. Lacks proper parameter validation before accessing params[1]

Apply this diff to fix the issues:

void Propagate(const std::vector<PString>& params, int dbno) {
  assert(!params.empty());
-  //
-  //  if (!g_dirtyKeys.empty()) {
-  //    for (const auto& k : g_dirtyKeys) {
-  //      PTransaction::Instance().NotifyDirty(PSTORE.GetDBNumber(), k);
-  //
-  //    }
-  //    g_dirtyKeys.clear();
-  //  } else if (params.size() > 1) {
-  //    PTransaction::Instance().NotifyDirty(PSTORE.GetDBNumber(), params[1]);
-  //  }
+  if (params.size() <= 1) {
+    return;
+  }
  if (params.size() > 1) {
    PTransaction::Instance().NotifyDirty(dbno, params[1]);
  }
}

Likely invalid or redundant comment.

src/cmd_table_manager.cc (1)

55-57: LGTM: Proper initialization and command registration.

The changes correctly:

  1. Add initialization check to prevent multiple initializations
  2. Register all required transaction commands with appropriate arities

Also applies to: 207-212

Comment on lines 19 to 26
void PTransaction::Watch(PClient* client, int dbno, const std::vector<PString>& keys) {
for (const auto& key : keys) {
if (client->Watch(dbno, key)) {
Clients& cls = clients_[dbno][key];
cls.push_back(std::static_pointer_cast<PClient>(client->shared_from_this()));
}
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling for Watch operation.

The Watch implementation should handle potential errors:

  1. Null client pointer
  2. Invalid database number
  3. Empty keys vector

Apply this diff to add error handling:

void PTransaction::Watch(PClient* client, int dbno, const std::vector<PString>& keys) {
+  if (!client || dbno < 0 || keys.empty()) {
+    return;
+  }
  for (const auto& key : keys) {
    if (client->Watch(dbno, key)) {
      Clients& cls = clients_[dbno][key];
      cls.push_back(std::static_pointer_cast<PClient>(client->shared_from_this()));
    }
  }
}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
void PTransaction::Watch(PClient* client, int dbno, const std::vector<PString>& keys) {
for (const auto& key : keys) {
if (client->Watch(dbno, key)) {
Clients& cls = clients_[dbno][key];
cls.push_back(std::static_pointer_cast<PClient>(client->shared_from_this()));
}
}
}
void PTransaction::Watch(PClient* client, int dbno, const std::vector<PString>& keys) {
if (!client || dbno < 0 || keys.empty()) {
return;
}
for (const auto& key : keys) {
if (client->Watch(dbno, key)) {
Clients& cls = clients_[dbno][key];
cls.push_back(std::static_pointer_cast<PClient>(client->shared_from_this()));
}
}
}

Comment on lines +296 to +310
bool PClient::CheckTransation(std::vector<std::string>& param) {
if (IsFlagOn(kClientFlagMulti)) {
if (cmdName_ != kCmdNameMulti && cmdName_ != kCmdNameExec && cmdName_ != kCmdNameWatch &&
cmdName_ != kCmdNameUnWatch && cmdName_ != kCmdNameDiscard) {
if (!IsFlagOn(kClientFlagWrongExec)) {
queue_cmds_.push_back(param);
}
INFO("queue cmd {}", cmdName_);
this->SetRes(CmdRes::kQueued);
g_kiwi->PushWriteTask(shared_from_this());
return true;
}
}
return false;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Fix typo in method name and add validation.

Issues found:

  1. Method name has a typo: "Transation" should be "Transaction"
  2. Missing validation for empty parameter vector

Apply this diff to fix the issues:

-bool PClient::CheckTransation(std::vector<std::string>& param) {
+bool PClient::CheckTransaction(std::vector<std::string>& param) {
+  if (param.empty()) {
+    return false;
+  }
  if (IsFlagOn(kClientFlagMulti)) {
    if (cmdName_ != kCmdNameMulti && cmdName_ != kCmdNameExec && cmdName_ != kCmdNameWatch &&
        cmdName_ != kCmdNameUnWatch && cmdName_ != kCmdNameDiscard) {
      if (!IsFlagOn(kClientFlagWrongExec)) {
        queue_cmds_.push_back(param);
      }
      INFO("queue cmd {}", cmdName_);
      this->SetRes(CmdRes::kQueued);
      g_kiwi->PushWriteTask(shared_from_this());
      return true;
    }
  }
  return false;
}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
bool PClient::CheckTransation(std::vector<std::string>& param) {
if (IsFlagOn(kClientFlagMulti)) {
if (cmdName_ != kCmdNameMulti && cmdName_ != kCmdNameExec && cmdName_ != kCmdNameWatch &&
cmdName_ != kCmdNameUnWatch && cmdName_ != kCmdNameDiscard) {
if (!IsFlagOn(kClientFlagWrongExec)) {
queue_cmds_.push_back(param);
}
INFO("queue cmd {}", cmdName_);
this->SetRes(CmdRes::kQueued);
g_kiwi->PushWriteTask(shared_from_this());
return true;
}
}
return false;
}
bool PClient::CheckTransaction(std::vector<std::string>& param) {
if (param.empty()) {
return false;
}
if (IsFlagOn(kClientFlagMulti)) {
if (cmdName_ != kCmdNameMulti && cmdName_ != kCmdNameExec && cmdName_ != kCmdNameWatch &&
cmdName_ != kCmdNameUnWatch && cmdName_ != kCmdNameDiscard) {
if (!IsFlagOn(kClientFlagWrongExec)) {
queue_cmds_.push_back(param);
}
INFO("queue cmd {}", cmdName_);
this->SetRes(CmdRes::kQueued);
g_kiwi->PushWriteTask(shared_from_this());
return true;
}
}
return false;
}

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
src/transaction.cc (1)

19-26: ⚠️ Potential issue

Add input validation in Watch method

The Watch method lacks validation for input parameters which could lead to undefined behavior.

Add parameter validation:

 void PTransaction::Watch(PClient* client, int dbno, const std::vector<PString>& keys) {
+  if (!client || dbno < 0 || keys.empty()) {
+    return;
+  }
   for (const auto& key : keys) {
     if (client->Watch(dbno, key)) {
🧹 Nitpick comments (3)
src/transaction.cc (3)

122-125: Remove unnecessary key setting in DoInitial methods

The SetKey calls in WatchCmd, UnWatchCmd, and DiscardCmd are unnecessary as these commands don't operate on a single key.

Apply this diff to remove unnecessary key setting:

 bool WatchCmd::DoInitial(PClient* client) {
-  client->SetKey(client->argv_[1]);
   return true;
 }

 bool UnWatchCmd::DoInitial(PClient* client) {
-  client->SetKey(client->argv_[1]);
   return true;
 }

 bool DiscardCmd::DoInitial(PClient* client) {
-  client->SetKey(client->argv_[1]);
   return true;
 }

Also applies to: 142-145, 184-187


91-116: Optimize NotifyDirtyAll implementation

The current implementation has duplicate code for handling single and all databases. Consider refactoring to reduce code duplication.

 void PTransaction::NotifyDirtyAll(int dbno) {
+  auto notify_db = [](const auto& key_clients) {
+    std::for_each(key_clients.second.begin(), key_clients.second.end(),
+      [](const std::weak_ptr<PClient>& wcli) {
+        if (auto scli = wcli.lock()) {
+          scli->SetFlag(kClientFlagDirty);
+        }
+      });
+  };
+
   if (dbno == -1) {
     for (auto& db_set : clients_) {
       for (auto& key_clients : db_set.second) {
-        std::for_each(key_clients.second.begin(), key_clients.second.end(),
-          [&](const std::weak_ptr<PClient>& wcli) {
-            auto scli = wcli.lock();
-            if (scli) {
-              scli->SetFlag(kClientFlagDirty);
-            }
-          });
+        notify_db(key_clients);
       }
     }
   } else {
     auto it = clients_.find(dbno);
     if (it != clients_.end()) {
       for (auto& key_clients : it->second) {
-        std::for_each(key_clients.second.begin(), key_clients.second.end(),
-          [&](const std::weak_ptr<PClient>& wcli) {
-            auto scli = wcli.lock();
-            if (scli) {
-              scli->SetFlag(kClientFlagDirty);
-            }
-          });
+        notify_db(key_clients);
       }
     }
   }
 }
🧰 Tools
🪛 cppcheck (2.10-2)

[performance] 101-101: Variable 'downSince' is assigned in constructor body. Consider performing initialization in initialization list.

(useInitializationList)


19-196: Consider adding transaction isolation level documentation

The current implementation follows Redis's transaction model, but it would be beneficial to explicitly document:

  1. The transaction isolation level provided
  2. Behavior during concurrent modifications
  3. Limitations compared to traditional ACID transactions

This documentation would help users understand the guarantees and limitations of the transaction implementation, especially regarding:

  • Watch/Unwatch behavior during concurrent modifications
  • Atomicity guarantees during EXEC
  • Handling of errors during transaction execution
🧰 Tools
🪛 cppcheck (2.10-2)

[performance] 101-101: Variable 'downSince' is assigned in constructor body. Consider performing initialization in initialization list.

(useInitializationList)

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 020c099 and 92df802.

📒 Files selected for processing (1)
  • src/transaction.cc (1 hunks)
🧰 Additional context used
🪛 cppcheck (2.10-2)
src/transaction.cc

[performance] 101-101: Variable 'downSince' is assigned in constructor body. Consider performing initialization in initialization list.

(useInitializationList)

🔇 Additional comments (1)
src/transaction.cc (1)

127-137: ⚠️ Potential issue

Add argument validation in WatchCmd::DoCmd

The method accesses argv_ without validating the argument count.

Add argument validation:

 void WatchCmd::DoCmd(PClient* client) {
+  if (client->argv_.size() < 2) {
+    client->SetRes(CmdRes::kWrongNum, "wrong number of arguments for 'watch' command");
+    return;
+  }
+
   if (client->IsFlagOn(kClientFlagMulti)) {
     client->SetRes(CmdRes::kPErrorWatch);
     return;
   }

Likely invalid or redundant comment.

Comment on lines +45 to +89
void PTransaction::NotifyDirty(int dbno, const PString& key) {
std::shared_lock<std::shared_mutex> r_lock(watched_clients_mutex_[dbno]);

auto tmpDBIter = clients_.find(dbno);
// 判断 db-id 对应的 clients 是否存在
if (tmpDBIter == clients_.end()) {
return;
}

// 取出 std::map<PString, Clients> 这个容器
auto& dbWatchedKeys = tmpDBIter->second;
auto it = dbWatchedKeys.find(key);
// 如果没有找到这个 key 对应的 client, 说明这个 key 没有被 watch
if (it == dbWatchedKeys.end()) {
return;
}

r_lock.unlock();
std::unique_lock<std::shared_mutex> w_lock(watched_clients_mutex_[dbno]);

// 取出这个被 watch 的一批连接池
it = dbWatchedKeys.find(key);
if (it == dbWatchedKeys.end()) {
return;
}
Clients& cls = it->second;
for (auto itCli(cls.begin()); itCli != cls.end();) {
auto client(itCli->lock());
if (!client) {
WARN("Erase not exist client when notify dirty key[{}]", key);
itCli = cls.erase(itCli);
} else {
if (client->NotifyDirty(dbno, key)) {
WARN("Erase dirty client {} when notify dirty key[{}]", client->GetName(), key);
itCli = cls.erase(itCli);
} else {
++itCli;
}
}
}

if (cls.empty()) {
dbWatchedKeys.erase(it);
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix potential race condition in NotifyDirty

The method has a lock gap between releasing the shared lock and acquiring the unique lock, which could lead to race conditions.

Consider these approaches:

  1. Use a single unique lock throughout the operation
  2. Use lock upgrading if available in your implementation
  3. Restructure the code to minimize the critical section

Here's a safer implementation using approach #1:

 void PTransaction::NotifyDirty(int dbno, const PString& key) {
-  std::shared_lock<std::shared_mutex> r_lock(watched_clients_mutex_[dbno]);
+  std::unique_lock<std::shared_mutex> lock(watched_clients_mutex_[dbno]);

   auto tmpDBIter = clients_.find(dbno);
   if (tmpDBIter == clients_.end()) {
     return;
   }

   auto& dbWatchedKeys = tmpDBIter->second;
   auto it = dbWatchedKeys.find(key);
   if (it == dbWatchedKeys.end()) {
     return;
   }

-  r_lock.unlock();
-  std::unique_lock<std::shared_mutex> w_lock(watched_clients_mutex_[dbno]);
-
-  it = dbWatchedKeys.find(key);
-  if (it == dbWatchedKeys.end()) {
-    return;
-  }
   Clients& cls = it->second;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
void PTransaction::NotifyDirty(int dbno, const PString& key) {
std::shared_lock<std::shared_mutex> r_lock(watched_clients_mutex_[dbno]);
auto tmpDBIter = clients_.find(dbno);
// 判断 db-id 对应的 clients 是否存在
if (tmpDBIter == clients_.end()) {
return;
}
// 取出 std::map<PString, Clients> 这个容器
auto& dbWatchedKeys = tmpDBIter->second;
auto it = dbWatchedKeys.find(key);
// 如果没有找到这个 key 对应的 client, 说明这个 key 没有被 watch
if (it == dbWatchedKeys.end()) {
return;
}
r_lock.unlock();
std::unique_lock<std::shared_mutex> w_lock(watched_clients_mutex_[dbno]);
// 取出这个被 watch 的一批连接池
it = dbWatchedKeys.find(key);
if (it == dbWatchedKeys.end()) {
return;
}
Clients& cls = it->second;
for (auto itCli(cls.begin()); itCli != cls.end();) {
auto client(itCli->lock());
if (!client) {
WARN("Erase not exist client when notify dirty key[{}]", key);
itCli = cls.erase(itCli);
} else {
if (client->NotifyDirty(dbno, key)) {
WARN("Erase dirty client {} when notify dirty key[{}]", client->GetName(), key);
itCli = cls.erase(itCli);
} else {
++itCli;
}
}
}
if (cls.empty()) {
dbWatchedKeys.erase(it);
}
}
void PTransaction::NotifyDirty(int dbno, const PString& key) {
std::unique_lock<std::shared_mutex> lock(watched_clients_mutex_[dbno]);
auto tmpDBIter = clients_.find(dbno);
// 判断 db-id 对应的 clients 是否存在
if (tmpDBIter == clients_.end()) {
return;
}
// 取出 std::map<PString, Clients> 这个容器
auto& dbWatchedKeys = tmpDBIter->second;
auto it = dbWatchedKeys.find(key);
// 如果没有找到这个 key 对应的 client, 说明这个 key 没有被 watch
if (it == dbWatchedKeys.end()) {
return;
}
Clients& cls = it->second;
for (auto itCli(cls.begin()); itCli != cls.end();) {
auto client(itCli->lock());
if (!client) {
WARN("Erase not exist client when notify dirty key[{}]", key);
itCli = cls.erase(itCli);
} else {
if (client->NotifyDirty(dbno, key)) {
WARN("Erase dirty client {} when notify dirty key[{}]", client->GetName(), key);
itCli = cls.erase(itCli);
} else {
++itCli;
}
}
}
if (cls.empty()) {
dbWatchedKeys.erase(it);
}
}

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (2)
src/resp/resp2_encode.h (1)

20-20: Optimize string operations to avoid temporary allocation.

The current implementation creates a temporary string during concatenation before appending. Consider using two separate append calls to avoid this overhead.

-  void SetLineString(const std::string& value) override { reply_.append(value + CRLF); }
+  void SetLineString(const std::string& value) override {
+    reply_.append(value);
+    reply_.append(CRLF);
+  }
src/client.cc (1)

Line range hint 296-377: Add documentation and use RAII patterns for transaction methods.

The transaction-related methods lack proper documentation explaining their purpose, parameters, and return values. Additionally, consider using RAII patterns for managing transaction state.

Example documentation and RAII implementation:

/**
 * @brief Checks if the command should be queued as part of a transaction
 * @param param Command parameters
 * @return true if command was queued, false otherwise
 */
bool PClient::CheckTransaction(std::vector<std::string>& param) {
    // ... implementation ...
}

/**
 * @brief Executes all queued commands in the transaction
 * @return true if all commands executed successfully, false otherwise
 */
bool PClient::Exec() {
    // Use RAII for transaction state
    class TransactionGuard {
        PClient* client_;
    public:
        TransactionGuard(PClient* client) : client_(client) {}
        ~TransactionGuard() {
            client_->ClearMulti();
            client_->ClearWatch();
        }
    };
    TransactionGuard guard(this);
    
    // ... implementation ...
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 92df802 and 2ecb6be.

📒 Files selected for processing (2)
  • src/client.cc (5 hunks)
  • src/resp/resp2_encode.h (1 hunks)
🔇 Additional comments (3)
src/resp/resp2_encode.h (1)

20-20: Verify protocol encoding correctness.

Since this method is part of the RESP2 protocol encoding implementation, please ensure:

  1. The change maintains protocol compatibility
  2. Existing tests cover this modification

Let's verify the usage and test coverage:

✅ Verification successful

Protocol encoding implementation verified as correct

The implementation correctly appends CRLF line endings as required by RESP2 protocol, and this is properly validated by the test suite.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for SetLineString usage
echo "Searching for SetLineString usage..."
rg "SetLineString" -A 2

# Search for related tests
echo "Searching for related tests..."
fd -e cc -e cpp -e h "test.*resp.*encode" --exec rg -l "SetLineString" {}

Length of output: 4491


Script:

#!/bin/bash
# Check the test implementation
echo "Checking test implementation..."
rg -A 10 "TEST_F.*SetLineString" src/resp/tests/resp2_encode_test.cc

Length of output: 412

src/client.cc (2)

296-310: Fix typo in method name and add validation.

The same issues identified in the previous review still exist:

  1. Method name has a typo: "Transation" should be "Transaction"
  2. Missing validation for empty parameter vector

349-376: ⚠️ Potential issue

Improve transaction safety and error handling.

The current implementation has several critical issues:

  1. No error handling for command execution failures
  2. No rollback mechanism if a command fails
  3. Command statistics are updated regardless of command success
  4. No transaction atomicity guarantee

Consider implementing the following:

 resp_encode_->ClearReply();
 AppendArrayLen(queue_cmds_.size());
 auto client = shared_from_this();
 cmd_table_manager.InitCmdTable();
 
+std::vector<std::function<void()>> rollback_actions;
 for (auto& cmd : queue_cmds_) {
   SetCmdName(kstd::StringToLower(cmd[0]));
   SetArgv(cmd);
   kstd::StringToLower(client->cmdName_);
   auto [cmdPtr, ret] = cmd_table_manager.GetCommand(client->CmdName(), client.get());
 
   auto cmdstat_map = GetCommandStatMap();
   CommandStatistics statistics;
   if (cmdstat_map->find(cmd[0]) == cmdstat_map->end()) {
     cmdstat_map->emplace(cmd[0], statistics);
   }
   auto now = std::chrono::steady_clock::now();
   GetTimeStat()->SetDequeueTs(now);
-  cmdPtr->Execute(client.get());
+  try {
+    // Store the current state for rollback
+    auto snapshot = client->CreateStateSnapshot();
+    rollback_actions.push_back([client, snapshot]() {
+      client->RestoreStateSnapshot(snapshot);
+    });
+    
+    // Execute the command
+    if (!cmdPtr->Execute(client.get())) {
+      // Rollback all previous commands
+      for (auto it = rollback_actions.rbegin(); it != rollback_actions.rend(); ++it) {
+        (*it)();
+      }
+      return false;
+    }
+    
+    // Update statistics only on success
+    now = std::chrono::steady_clock::now();
+    GetTimeStat()->SetProcessDoneTs(now);
+    (*cmdstat_map)[cmd[0]].cmd_count_.fetch_add(1);
+    (*cmdstat_map)[cmd[0]].cmd_time_consuming_.fetch_add(GetTimeStat()->GetTotalTime());
+  } catch (const std::exception& e) {
+    // Log the error
+    ERROR("Command execution failed: {}", e.what());
+    
+    // Rollback all previous commands
+    for (auto it = rollback_actions.rbegin(); it != rollback_actions.rend(); ++it) {
+      (*it)();
+    }
+    return false;
+  }
 
-  // Info Commandstats used
-  now = std::chrono::steady_clock::now();
-  GetTimeStat()->SetProcessDoneTs(now);
-  (*cmdstat_map)[cmd[0]].cmd_count_.fetch_add(1);
-  (*cmdstat_map)[cmd[0]].cmd_time_consuming_.fetch_add(GetTimeStat()->GetTotalTime());
   FeedMonitors(cmd);
 }

Likely invalid or redundant comment.

Comment on lines +182 to +184
// Propagate(params, GetCurrentDB());

// g_kiwi->SubmitFast(std::make_shared<CmdThreadPoolTask>(shared_from_this()));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Remove commented-out code.

If the propagation functionality is no longer needed, remove the commented code. If it's still required, implement it properly.

-  // Propagate(params, GetCurrentDB());
-
-  // g_kiwi->SubmitFast(std::make_shared<CmdThreadPoolTask>(shared_from_this()));

Comment on lines +27 to +28
CmdTableManager cmd_table_manager;

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure thread-safe access to the global command table manager.

The global cmd_table_manager variable could lead to race conditions in a multi-threaded environment. Consider:

  1. Making it thread-safe using a singleton pattern with proper synchronization
  2. Or moving it to a context object that manages its lifecycle and access
-CmdTableManager cmd_table_manager;
+class SafeCmdTableManager {
+private:
+    static std::mutex mutex_;
+    static std::unique_ptr<CmdTableManager> instance_;
+
+public:
+    static CmdTableManager& Instance() {
+        std::lock_guard<std::mutex> lock(mutex_);
+        if (!instance_) {
+            instance_ = std::make_unique<CmdTableManager>();
+        }
+        return *instance_;
+    }
+};
+
+std::mutex SafeCmdTableManager::mutex_;
+std::unique_ptr<CmdTableManager> SafeCmdTableManager::instance_;

Committable suggestion skipped: line range outside the PR's diff.

@marsevilspirit
Copy link
Collaborator

tcl测试失败了,看一下怎么回事

@Issues-translate-bot
Copy link

Bot detected the issue body's language is not English, translate it automatically.


The tcl test failed, let’s see what happened

@luky116
Copy link
Collaborator

luky116 commented Jan 18, 2025

参照文档:#148 处理下 TCL 测试问题

@Issues-translate-bot
Copy link

Bot detected the issue body's language is not English, translate it automatically.


Refer to the document: #148 Handle TCL test issues

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

9 participants