Skip to content

Commit 66b19c7

Browse files
author
wuxianrong
committed
Optimize log submission by using the protobuf-based binlog method
1 parent d93216f commit 66b19c7

15 files changed

Lines changed: 362 additions & 821 deletions

File tree

include/pika_raft.h

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,5 @@ class RaftConfigCmd : public Cmd {
104104
std::string db_name_;
105105
};
106106

107-
// Thread-local flag to detect if we're in on_apply context
108-
namespace pika_raft {
109-
extern thread_local bool g_in_raft_apply;
110-
}
111-
112107
#endif // PIKA_RAFT_H_
113108

src/pika_command.cc

Lines changed: 5 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@
66
#include <memory>
77
#include <sstream>
88
#include <utility>
9-
#include <future>
10-
#include <chrono>
119

1210
#include <glog/logging.h>
1311
#include "include/pika_acl.h"
@@ -897,17 +895,16 @@ void Cmd::InternalProcessCommand(const HintKeys& hint_keys) {
897895

898896
uint64_t before_do_binlog_us = pstd::NowMicros();
899897
this->command_duration_ms = (before_do_binlog_us - before_do_command_us) / 1000;
900-
901-
// Release locks BEFORE calling DoBinlog() to avoid deadlock in Raft mode
902-
// In Raft mode, DoBinlog() will wait for on_apply which needs the same lock
898+
DoBinlog();
899+
900+
903901
if (!IsSuspend()) {
904902
db_->DBUnlockShared();
905903
}
906904
if (is_write()) {
907905
record_lock.Unlock(current_key());
908906
}
909907

910-
DoBinlog();
911908

912909
uint64_t end_us = pstd::NowMicros();
913910
this->binlog_duration_ms = (end_us - before_do_binlog_us) / 1000;
@@ -963,52 +960,8 @@ bool Cmd::DoReadCommandInCache() {
963960

964961

965962
void Cmd::DoBinlog() {
966-
// Skip binlog entirely if we're in on_apply context
967-
if (pika_raft::g_in_raft_apply) {
968-
return;
969-
}
970-
971-
// Check if Raft is enabled
972-
auto raft_manager = g_pika_server->GetRaftManager();
973-
if (raft_manager && is_write() && res().ok()) {
974-
// Plan A: Submit command to Raft using Redis protocol
975-
// Skip if we're already in on_apply context (avoid recursion)
976-
std::string redis_proto = ToRedisProtocol();
977-
978-
// Prepend db_name to Redis protocol for extraction in on_apply
979-
std::string log_data = db_name_ + "|" + redis_proto;
980-
981-
982-
// Create promise/future for synchronous waiting
983-
std::promise<rocksdb::Status> promise;
984-
auto future = promise.get_future();
985-
986-
// Submit to Raft
987-
auto status = raft_manager->SubmitCommandWithPromise(
988-
db_name_, log_data, std::move(promise));
989-
990-
if (!status.ok()) {
991-
LOG(ERROR) << "Failed to submit command to Raft: " << status.ToString();
992-
res_.SetRes(CmdRes::kErrOther, "Raft submit failed: " + status.ToString());
993-
return;
994-
}
995-
996-
// Wait for Raft to apply (with 10 second timeout)
997-
auto wait_status = future.wait_for(std::chrono::seconds(10));
998-
if (wait_status == std::future_status::timeout) {
999-
LOG(ERROR) << "Raft apply timeout for command: " << name_;
1000-
res_.SetRes(CmdRes::kErrOther, "Raft apply timeout");
1001-
return;
1002-
}
1003-
1004-
// Get the result
1005-
rocksdb::Status raft_result = future.get();
1006-
if (!raft_result.ok()) {
1007-
LOG(ERROR) << "Raft apply failed: " << raft_result.ToString();
1008-
res_.SetRes(CmdRes::kErrOther, "Raft apply failed: " + raft_result.ToString());
1009-
return;
1010-
}
1011-
963+
// 如果是 Raft 模式,跳过写 binlog(改用 Protobuf binlog)
964+
if (g_pika_server->GetRaftManager()) {
1012965
return;
1013966
}
1014967

src/pika_kv.cc

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,9 @@
1111

1212
#include "include/pika_cache.h"
1313
#include "include/pika_conf.h"
14-
#include "include/pika_raft.h"
15-
#include "include/pika_server.h"
1614
#include "include/pika_slot_command.h"
1715

1816
extern std::unique_ptr<PikaConf> g_pika_conf;
19-
extern PikaServer* g_pika_server;
2017
/* SET key value [NX] [XX] [EX <seconds>] [PX <milliseconds>] */
2118
void SetCmd::DoInitial() {
2219
if (!CheckArg(argv_.size())) {
@@ -68,17 +65,6 @@ void SetCmd::DoInitial() {
6865
}
6966

7067
void SetCmd::Do() {
71-
// Plan A: If Raft is enabled, skip actual write on first call
72-
// The write will happen when on_apply executes this command
73-
// Use thread-local flag to detect if we're in on_apply context
74-
if (g_pika_server->GetRaftManager() && !pika_raft::g_in_raft_apply) {
75-
// Raft mode: First call from client, skip write
76-
// Set OK response, actual write will happen in on_apply
77-
res_.SetRes(CmdRes::kOk);
78-
return;
79-
}
80-
81-
// Normal path: Either non-Raft mode or called from on_apply
8268
int32_t res = 1;
8369
STAGE_TIMER_GUARD(storage_duration_ms, true);
8470
switch (condition_) {

src/pika_raft.cc

Lines changed: 60 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,17 @@ void RaftClusterCmd::Do() {
9494
<< " with peers: " << argv_[2];
9595
status = raft_mgr->InitCluster(db_name_, args_);
9696
if (status.ok()) {
97-
// Raft 模式下关闭 RocksDB WAL(Raft Log 已提供持久化)
98-
db_->storage()->DisableWal(true);
99-
LOG(INFO) << "Disabled RocksDB WAL for Raft mode (DB: " << db_name_ << ")";
97+
// Update raft-peers in config file and persist
98+
std::string peers_str = argv_[2];
99+
if (g_pika_conf->SetConfStr("raft-peers", peers_str)) {
100+
if (g_pika_conf->WriteBack()) {
101+
LOG(INFO) << "Updated raft-peers in config file: " << peers_str;
102+
} else {
103+
LOG(WARNING) << "Failed to write raft-peers to config file";
104+
}
105+
} else {
106+
LOG(WARNING) << "Failed to update raft-peers in config";
107+
}
100108
res_.AppendStringRaw("+OK\r\n");
101109
} else {
102110
res_.SetRes(CmdRes::kErrOther, "Failed to initialize cluster: " + status.ToString());
@@ -109,9 +117,6 @@ void RaftClusterCmd::Do() {
109117
<< " to leader: " << args_[0];
110118
status = raft_mgr->JoinCluster(db_name_, args_[0]);
111119
if (status.ok()) {
112-
// Raft 模式下关闭 RocksDB WAL(Raft Log 已提供持久化)
113-
db_->storage()->DisableWal(true);
114-
LOG(INFO) << "Disabled RocksDB WAL for Raft mode (DB: " << db_name_ << ")";
115120
res_.AppendStringRaw("+OK\r\n");
116121
} else {
117122
res_.SetRes(CmdRes::kErrOther, "Failed to join cluster: " + status.ToString());
@@ -123,7 +128,6 @@ void RaftClusterCmd::Do() {
123128
std::string info;
124129
status = raft_mgr->GetClusterInfo(db_name_, &info);
125130
if (status.ok()) {
126-
// 拆分成多行,以数组格式返回
127131
std::vector<std::string> lines;
128132
std::stringstream ss(info);
129133
std::string line;
@@ -200,6 +204,22 @@ void RaftNodeCmd::Do() {
200204
<< ", peer: " << peer_addr_;
201205
status = raft_mgr->AddNode(db_name_, peer_addr_);
202206
if (status.ok()) {
207+
// Update raft-peers in config file
208+
std::string current_peers = g_pika_conf->raft_peers();
209+
if (current_peers.empty()) {
210+
current_peers = peer_addr_;
211+
} else if (current_peers.find(peer_addr_) == std::string::npos) {
212+
current_peers += "," + peer_addr_;
213+
}
214+
215+
if (g_pika_conf->SetConfStr("raft-peers", current_peers)) {
216+
if (g_pika_conf->WriteBack()) {
217+
LOG(INFO) << "Updated raft-peers in config file: " << current_peers;
218+
} else {
219+
LOG(WARNING) << "Failed to write raft-peers to config file";
220+
}
221+
}
222+
203223
res_.AppendStringRaw("+OK\r\n");
204224
} else {
205225
res_.SetRes(CmdRes::kErrOther, "Failed to add node: " + status.ToString());
@@ -212,6 +232,39 @@ void RaftNodeCmd::Do() {
212232
<< ", peer: " << peer_addr_;
213233
status = raft_mgr->RemoveNode(db_name_, peer_addr_);
214234
if (status.ok()) {
235+
// Update raft-peers in config file
236+
std::string current_peers = g_pika_conf->raft_peers();
237+
if (!current_peers.empty()) {
238+
// Parse and rebuild peer list without the removed peer
239+
std::vector<std::string> peer_list;
240+
std::stringstream ss(current_peers);
241+
std::string peer;
242+
while (std::getline(ss, peer, ',')) {
243+
peer.erase(0, peer.find_first_not_of(" \t"));
244+
peer.erase(peer.find_last_not_of(" \t") + 1);
245+
if (!peer.empty() && peer != peer_addr_) {
246+
peer_list.push_back(peer);
247+
}
248+
}
249+
250+
// Rebuild peers string
251+
std::string new_peers;
252+
for (size_t i = 0; i < peer_list.size(); i++) {
253+
new_peers += peer_list[i];
254+
if (i < peer_list.size() - 1) {
255+
new_peers += ",";
256+
}
257+
}
258+
259+
if (g_pika_conf->SetConfStr("raft-peers", new_peers)) {
260+
if (g_pika_conf->WriteBack()) {
261+
LOG(INFO) << "Updated raft-peers in config file: " << new_peers;
262+
} else {
263+
LOG(WARNING) << "Failed to write raft-peers to config file";
264+
}
265+
}
266+
}
267+
215268
res_.AppendStringRaw("+OK\r\n");
216269
} else {
217270
res_.SetRes(CmdRes::kErrOther, "Failed to remove node: " + status.ToString());

src/pika_server.cc

Lines changed: 14 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,15 @@ PikaServer::PikaServer()
114114
LOG(FATAL) << "Failed to initialize Raft manager: " << status.ToString();
115115
}
116116
LOG(INFO) << "Raft manager initialized successfully";
117+
118+
std::lock_guard rwl(storage_options_rw_);
119+
storage_options_.append_log_function =
120+
[this](const ::pikiwidb::Binlog& binlog, std::promise<rocksdb::Status>&& promise) {
121+
std::string db_name = "db0";
122+
123+
raft_manager_->AppendLog(db_name, binlog, std::move(promise));
124+
};
125+
LOG(INFO) << "Raft append_log_function registered in storage_options";
117126
}
118127

119128
bgsave_thread_.set_thread_name("PikaServer::bgsave_thread_");
@@ -237,58 +246,13 @@ void PikaServer::Start() {
237246
} else {
238247
LOG(INFO) << "Raft manager started successfully";
239248

240-
// 为每个数据库注册 binlog 回调
249+
// Disable WAL for all databases when Raft is enabled
241250
for (const auto& db_item : dbs_) {
242-
std::string db_name = db_item.first;
243-
auto storage = db_item.second->storage(); // Returns std::shared_ptr<storage::Storage>
244-
245-
if (!storage) {
246-
LOG(WARNING) << "数据库 " << db_name << " 的 storage 为空,跳过回调注册";
247-
continue;
251+
auto storage = db_item.second->storage();
252+
if (storage) {
253+
storage->DisableWal(true);
254+
LOG(INFO) << "Disabled WAL for DB: " << db_item.first;
248255
}
249-
250-
// 设置存储引擎引用 (使用原始指针)
251-
raft_manager_->SetStorage(storage.get());
252-
253-
// 注册 binlog 回调(使用 promise/future 同步)
254-
storage->SetBinlogWriteCallback(
255-
[this, db_name](const pikiwidb::Binlog& binlog, std::promise<rocksdb::Status>&& promise) {
256-
// 序列化 binlog
257-
std::string binlog_data;
258-
if (!binlog.SerializeToString(&binlog_data)) {
259-
LOG(ERROR) << "Failed to serialize binlog";
260-
promise.set_value(rocksdb::Status::Corruption("Binlog serialization failed"));
261-
return;
262-
}
263-
264-
LOG(INFO) << "收到 binlog 回调,数据库: " << db_name
265-
<< ", binlog 大小: " << binlog_data.size() << " 字节"
266-
<< ", entries: " << binlog.entries_size();
267-
268-
// 创建异步回调闭包
269-
auto* closure = new pika_raft::WriteDoneClosure(nullptr, nullptr);
270-
closure->SetBinlogData(binlog_data);
271-
272-
// TODO: 这里需要将 promise 传递给 closure,让 Raft apply 完成后设置结果
273-
// 目前先立即返回 OK,实际应该等待 Raft 应用完成
274-
275-
// 提交到 Raft
276-
auto apply_status = raft_manager_->ApplyBinlog(db_name, binlog_data, closure);
277-
if (!apply_status.ok()) {
278-
LOG(ERROR) << "提交 binlog 到 Raft 失败: " << apply_status.ToString();
279-
// 设置错误状态并调用 Run,让 closure 自己清理
280-
closure->status().set_error(-1, "%s", apply_status.ToString().c_str());
281-
closure->Run();
282-
promise.set_value(rocksdb::Status::IOError(apply_status.ToString()));
283-
} else {
284-
// TODO: 应该在 Raft apply 完成后才 set_value
285-
// 目前先简单地立即返回 OK
286-
promise.set_value(rocksdb::Status::OK());
287-
}
288-
}
289-
);
290-
291-
LOG(INFO) << "已为数据库 " << db_name << " 注册 binlog 回调";
292256
}
293257
}
294258
}

0 commit comments

Comments
 (0)