Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Resolve the inconsistency between member configuration and log index in snapshot meta #8

Open
wants to merge 14 commits into
base: unstable
Choose a base branch
from
13 changes: 12 additions & 1 deletion src/praft/praft.cc
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,16 @@ storage::LogIndex PRaft::GetLastLogIndex(bool is_flush) {
return node_->get_last_log_index(is_flush);
}

void PRaft::GetConfigurationByIndex(const int64_t index, braft::ConfigurationEntry* conf,
braft::ConfigurationEntry* learner_conf) {
if (!node_) {
ERROR("Node is not initialized");
return;
}

node_->get_configuration(index, conf, learner_conf);
}

void PRaft::SendNodeRequest(PClient* client) {
assert(client);

Expand Down Expand Up @@ -697,8 +707,9 @@ int PRaft::on_snapshot_load(braft::SnapshotReader* reader) {
2. When a node is improperly shut down and restarted, the minimum flush-index should
be obtained as the starting point for fault recovery.
*/
// replay from <replay_point + 1>
uint64_t replay_point = PSTORE.GetBackend(db_id_)->GetStorage()->GetSmallestFlushedLogIndex();
node_->set_self_playback_point(replay_point);
node_->set_last_applied_index_and_term(replay_point);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

看了下 BRAFT 中没有这个方法

is_node_first_start_up_ = false;
INFO("set replay_point: {}", replay_point);

Expand Down
3 changes: 3 additions & 0 deletions src/praft/praft.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <tuple>
#include <vector>

#include "braft/configuration_manager.h"
#include "braft/file_system_adaptor.h"
#include "braft/raft.h"
#include "brpc/server.h"
Expand Down Expand Up @@ -141,6 +142,8 @@ class PRaft : public braft::StateMachine {
butil::Status GetListPeers(std::vector<braft::PeerId>* peers);
storage::LogIndex GetTerm(uint64_t log_index);
storage::LogIndex GetLastLogIndex(bool is_flush = false);
void GetConfigurationByIndex(const int64_t index, braft::ConfigurationEntry* conf,
braft::ConfigurationEntry* learner_conf);

bool IsInitialized() const { return node_ != nullptr && server_ != nullptr; }

Expand Down
19 changes: 18 additions & 1 deletion src/praft/psnapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

#include "psnapshot.h"

#include "braft/configuration.h"
#include "braft/configuration_manager.h"
#include "braft/local_file_meta.pb.h"
#include "braft/snapshot.h"
#include "butil/files/file_path.h"
Expand Down Expand Up @@ -79,12 +81,27 @@ braft::FileAdaptor* PPosixFileSystemAdaptor::open(const std::string& path, int o
PSTORE.HandleTaskSpecificDB(tasks);
AddAllFiles(snapshot_path, &snapshot_meta_memtable, snapshot_path);

// update snapshot last log index and last_log_term
// update snapshot last log index, last_log_term, conf of last log index and learners
auto& new_meta = const_cast<braft::SnapshotMeta&>(snapshot_meta_memtable.meta());
auto last_log_index = PSTORE.GetBackend(db_id)->GetStorage()->GetSmallestFlushedLogIndex();
new_meta.set_last_included_index(last_log_index);
auto last_log_term = PRAFT.GetTerm(last_log_index);
new_meta.set_last_included_term(last_log_term);
braft::ConfigurationEntry conf_entry;
braft::ConfigurationEntry learner_conf_entry;
PRAFT.GetConfigurationByIndex(last_log_index, &conf_entry, &learner_conf_entry);
new_meta.clear_peers();
for (auto iter = conf_entry.conf.begin(); iter != conf_entry.conf.end(); ++iter) {
*new_meta.add_peers() = iter->to_string();
}
new_meta.clear_old_peers();
for (auto iter = conf_entry.old_conf.begin(); iter != conf_entry.old_conf.end(); ++iter) {
*new_meta.add_old_peers() = iter->to_string();
}
new_meta.clear_learners();
for (auto iter = learner_conf_entry.conf.begin(); iter != learner_conf_entry.conf.end(); ++iter) {
*new_meta.add_learners() = iter->to_string();
}
INFO("Succeed to fix db_{} snapshot meta: {}, {}", db_id, last_log_index, last_log_term);

auto rc = snapshot_meta_memtable.save_to_file(fs, meta_path);
Expand Down
3 changes: 1 addition & 2 deletions src/proto_parser.cc
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ void PProtoParser::Reset() {
numOfParam_ = 0;

params_.clear();

}

PParseResult PProtoParser::ParseRequest(const char*& ptr, const char* end) {
Expand Down Expand Up @@ -96,7 +95,7 @@ PParseResult PProtoParser::parseStrval(const char*& ptr, const char* end, PStrin
assert(paramLen_ >= 0);

if (static_cast<int>(end - ptr) < paramLen_ + 2) {
paramLen_-=(end-ptr);
paramLen_ -= (end - ptr);
result.append(ptr, end - ptr);
return PParseResult::kWait;
}
Expand Down
Loading