Skip to content

Commit 8b935be

Browse files
author
wuxianrong
committed
Optimize the log submission using the proto method
1 parent d93216f commit 8b935be

14 files changed

Lines changed: 271 additions & 675 deletions

File tree

include/pika_raft.h

Lines changed: 0 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -104,10 +104,5 @@ class RaftConfigCmd : public Cmd {
104104
std::string db_name_;
105105
};
106106

107-
// Thread-local flag to detect if we're in on_apply context
108-
namespace pika_raft {
109-
extern thread_local bool g_in_raft_apply;
110-
}
111-
112107
#endif // PIKA_RAFT_H_
113108

src/pika_command.cc

Lines changed: 5 additions & 50 deletions
Original file line number | Diff line number | Diff line change
@@ -897,17 +897,16 @@ void Cmd::InternalProcessCommand(const HintKeys& hint_keys) {
897897

898898
uint64_t before_do_binlog_us = pstd::NowMicros();
899899
this->command_duration_ms = (before_do_binlog_us - before_do_command_us) / 1000;
900-
901-
// Release locks BEFORE calling DoBinlog() to avoid deadlock in Raft mode
902-
// In Raft mode, DoBinlog() will wait for on_apply which needs the same lock
900+
DoBinlog();
901+
902+
903903
if (!IsSuspend()) {
904904
db_->DBUnlockShared();
905905
}
906906
if (is_write()) {
907907
record_lock.Unlock(current_key());
908908
}
909909

910-
DoBinlog();
911910

912911
uint64_t end_us = pstd::NowMicros();
913912
this->binlog_duration_ms = (end_us - before_do_binlog_us) / 1000;
@@ -963,52 +962,8 @@ bool Cmd::DoReadCommandInCache() {
963962

964963

965964
void Cmd::DoBinlog() {
966-
// Skip binlog entirely if we're in on_apply context
967-
if (pika_raft::g_in_raft_apply) {
968-
return;
969-
}
970-
971-
// Check if Raft is enabled
972-
auto raft_manager = g_pika_server->GetRaftManager();
973-
if (raft_manager && is_write() && res().ok()) {
974-
// Plan A: Submit command to Raft using Redis protocol
975-
// Skip if we're already in on_apply context (avoid recursion)
976-
std::string redis_proto = ToRedisProtocol();
977-
978-
// Prepend db_name to Redis protocol for extraction in on_apply
979-
std::string log_data = db_name_ + "|" + redis_proto;
980-
981-
982-
// Create promise/future for synchronous waiting
983-
std::promise<rocksdb::Status> promise;
984-
auto future = promise.get_future();
985-
986-
// Submit to Raft
987-
auto status = raft_manager->SubmitCommandWithPromise(
988-
db_name_, log_data, std::move(promise));
989-
990-
if (!status.ok()) {
991-
LOG(ERROR) << "Failed to submit command to Raft: " << status.ToString();
992-
res_.SetRes(CmdRes::kErrOther, "Raft submit failed: " + status.ToString());
993-
return;
994-
}
995-
996-
// Wait for Raft to apply (with 10 second timeout)
997-
auto wait_status = future.wait_for(std::chrono::seconds(10));
998-
if (wait_status == std::future_status::timeout) {
999-
LOG(ERROR) << "Raft apply timeout for command: " << name_;
1000-
res_.SetRes(CmdRes::kErrOther, "Raft apply timeout");
1001-
return;
1002-
}
1003-
1004-
// Get the result
1005-
rocksdb::Status raft_result = future.get();
1006-
if (!raft_result.ok()) {
1007-
LOG(ERROR) << "Raft apply failed: " << raft_result.ToString();
1008-
res_.SetRes(CmdRes::kErrOther, "Raft apply failed: " + raft_result.ToString());
1009-
return;
1010-
}
1011-
965+
// 如果是 Raft 模式,跳过写 binlog(改用 Protobuf binlog)
966+
if (g_pika_server->GetRaftManager()) {
1012967
return;
1013968
}
1014969

src/pika_kv.cc

Lines changed: 0 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -68,17 +68,6 @@ void SetCmd::DoInitial() {
6868
}
6969

7070
void SetCmd::Do() {
71-
// Plan A: If Raft is enabled, skip actual write on first call
72-
// The write will happen when on_apply executes this command
73-
// Use thread-local flag to detect if we're in on_apply context
74-
if (g_pika_server->GetRaftManager() && !pika_raft::g_in_raft_apply) {
75-
// Raft mode: First call from client, skip write
76-
// Set OK response, actual write will happen in on_apply
77-
res_.SetRes(CmdRes::kOk);
78-
return;
79-
}
80-
81-
// Normal path: Either non-Raft mode or called from on_apply
8271
int32_t res = 1;
8372
STAGE_TIMER_GUARD(storage_duration_ms, true);
8473
switch (condition_) {

src/pika_server.cc

Lines changed: 22 additions & 42 deletions
Original file line number | Diff line number | Diff line change
@@ -114,6 +114,21 @@ PikaServer::PikaServer()
114114
LOG(FATAL) << "Failed to initialize Raft manager: " << status.ToString();
115115
}
116116
LOG(INFO) << "Raft manager initialized successfully";
117+
118+
// 设置 Raft binlog 回调(pikiwidb_raft 风格)
119+
// 这样后续创建的所有 DB 都会使用这个回调
120+
std::lock_guard rwl(storage_options_rw_);
121+
storage_options_.append_log_function =
122+
[this](const ::pikiwidb::Binlog& binlog, std::promise<rocksdb::Status>&& promise) {
123+
// 使用 db_id 确定是哪个数据库(pikiwidb_raft 的实现方式)
124+
// 这里简化处理:使用第一个 DB("db0")
125+
// 更完整的实现需要根据 binlog.db_id() 查找对应的 DB
126+
std::string db_name = "db0"; // TODO: 支持多 DB
127+
128+
// 调用 RaftManager::AppendLog()
129+
raft_manager_->AppendLog(db_name, binlog, std::move(promise));
130+
};
131+
LOG(INFO) << "Raft append_log_function registered in storage_options";
117132
}
118133

119134
bgsave_thread_.set_thread_name("PikaServer::bgsave_thread_");
@@ -237,58 +252,23 @@ void PikaServer::Start() {
237252
} else {
238253
LOG(INFO) << "Raft manager started successfully";
239254

240-
// 为每个数据库注册 binlog 回调
255+
// 为每个数据库设置 Storage 引用并禁用 RocksDB WAL(供 on_apply 使用)
241256
for (const auto& db_item : dbs_) {
242257
std::string db_name = db_item.first;
243-
auto storage = db_item.second->storage(); // Returns std::shared_ptr<storage::Storage>
258+
auto storage = db_item.second->storage();
244259

245260
if (!storage) {
246-
LOG(WARNING) << "数据库 " << db_name << " 的 storage 为空,跳过回调注册";
261+
LOG(WARNING) << "数据库 " << db_name << " 的 storage 为空,跳过";
247262
continue;
248263
}
249264

265+
storage->DisableWal(true);
266+
LOG(INFO) << "已为数据库 " << db_name << " 禁用 RocksDB WAL(Raft 模式)";
267+
250268
// 设置存储引擎引用 (使用原始指针)
251269
raft_manager_->SetStorage(storage.get());
252270

253-
// 注册 binlog 回调(使用 promise/future 同步)
254-
storage->SetBinlogWriteCallback(
255-
[this, db_name](const pikiwidb::Binlog& binlog, std::promise<rocksdb::Status>&& promise) {
256-
// 序列化 binlog
257-
std::string binlog_data;
258-
if (!binlog.SerializeToString(&binlog_data)) {
259-
LOG(ERROR) << "Failed to serialize binlog";
260-
promise.set_value(rocksdb::Status::Corruption("Binlog serialization failed"));
261-
return;
262-
}
263-
264-
LOG(INFO) << "收到 binlog 回调,数据库: " << db_name
265-
<< ", binlog 大小: " << binlog_data.size() << " 字节"
266-
<< ", entries: " << binlog.entries_size();
267-
268-
// 创建异步回调闭包
269-
auto* closure = new pika_raft::WriteDoneClosure(nullptr, nullptr);
270-
closure->SetBinlogData(binlog_data);
271-
272-
// TODO: 这里需要将 promise 传递给 closure,让 Raft apply 完成后设置结果
273-
// 目前先立即返回 OK,实际应该等待 Raft 应用完成
274-
275-
// 提交到 Raft
276-
auto apply_status = raft_manager_->ApplyBinlog(db_name, binlog_data, closure);
277-
if (!apply_status.ok()) {
278-
LOG(ERROR) << "提交 binlog 到 Raft 失败: " << apply_status.ToString();
279-
// 设置错误状态并调用 Run,让 closure 自己清理
280-
closure->status().set_error(-1, "%s", apply_status.ToString().c_str());
281-
closure->Run();
282-
promise.set_value(rocksdb::Status::IOError(apply_status.ToString()));
283-
} else {
284-
// TODO: 应该在 Raft apply 完成后才 set_value
285-
// 目前先简单地立即返回 OK
286-
promise.set_value(rocksdb::Status::OK());
287-
}
288-
}
289-
);
290-
291-
LOG(INFO) << "已为数据库 " << db_name << " 注册 binlog 回调";
271+
LOG(INFO) << "已为数据库 " << db_name << " 设置 Storage 引用";
292272
}
293273
}
294274
}

src/praft/include/praft/praft.h

Lines changed: 11 additions & 31 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,10 @@ namespace storage {
2929
class Storage;
3030
}
3131

32+
namespace pikiwidb {
33+
class Binlog;
34+
}
35+
3236
namespace net {
3337
class RedisConn;
3438
}
@@ -38,21 +42,6 @@ class Cmd;
3842
namespace pika_raft {
3943

4044
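// Raft log entry data structure  -- NOTE(review): orphaned comment; the RaftLogEntry struct it described is deleted by this commit, so this line should be dropped too.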
41-
struct RaftLogEntry {
42-
std::string cmd_name;
43-
std::vector<std::string> args;
44-
std::string db_name;
45-
int64_t timestamp;
46-
47-
RaftLogEntry() : timestamp(0) {}
48-
49-
// Serialize to string
50-
std::string SerializeAsString() const;
51-
52-
// Deserialize from string
53-
bool ParseFromString(const std::string& data);
54-
};
55-
5645
// Write done closure for asynchronous Raft callback
5746
class WriteDoneClosure : public braft::Closure {
5847
public:
@@ -137,9 +126,6 @@ class PikaRaftNode {
137126
// Remove peer from the cluster
138127
pstd::Status RemovePeer(const braft::PeerId& peer);
139128

140-
// Apply a command to Raft
141-
pstd::Status Apply(const RaftLogEntry& entry);
142-
143129
// Get cluster status information
144130
void GetStatus(std::string* status_str);
145131

@@ -192,23 +178,17 @@ class RaftManager {
192178
// Check if Raft is enabled for a specific DB
193179
bool IsRaftEnabled(const std::string& db_name) const;
194180

195-
// Apply a command through Raft
196-
pstd::Status ApplyCommand(const std::string& db_name, const RaftLogEntry& entry);
197-
198181
// Apply binlog to Raft (called by storage callback)
199182
pstd::Status ApplyBinlog(const std::string& db_name,
200-
const std::string& binlog_data,
201-
WriteDoneClosure* done);
183+
const std::string& binlog_data,
184+
WriteDoneClosure* done);
202185

203-
// Submit command to Raft with promise (for synchronous waiting)
204-
pstd::Status SubmitCommandWithPromise(const std::string& db_name,
205-
const std::string& log_data,
206-
std::promise<rocksdb::Status>&& promise);
186+
// Append binlog (pikiwidb_raft 风格,直接接收 Binlog + promise)
187+
void AppendLog(const std::string& db_name,
188+
const ::pikiwidb::Binlog& log,
189+
std::promise<rocksdb::Status>&& promise);
207190

208-
// Apply command from Redis protocol (called in on_apply)
209-
rocksdb::Status ApplyCommandFromRedisProtocol(const std::string& redis_proto_data,
210-
const std::string& db_name);
211-
191+
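// Submit command to Raft with promise (for synchronous waiting)  -- NOTE(review): orphaned comment; SubmitCommandWithPromise is removed by this commit and the next declaration is GetRaftNode, so this line should be dropped.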
212192
// Get Raft node for a specific DB
213193
std::shared_ptr<PikaRaftNode> GetRaftNode(const std::string& db_name);
214194

src/praft/src/binlog.proto

Lines changed: 4 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,6 @@ package pikiwidb;
77

88
option optimize_for = LITE_RUNTIME;
99

10-
// 数据类型枚举
11-
enum DataType {
12-
kStrings = 0;
13-
kHashes = 1;
14-
kLists = 2;
15-
kSets = 3;
16-
kZSets = 4;
17-
kStreams = 5;
18-
}
19-
2010
// 操作类型
2111
enum OperateType {
2212
kNoOperate = 0;
@@ -26,28 +16,15 @@ enum OperateType {
2616

2717
// Binlog 条目(对应单个 RocksDB 操作)
2818
message BinlogEntry {
29-
DataType data_type = 1; // 数据类型(对应哪个 RocksDB)
19+
uint32 cf_idx = 1; // 列族索引 (column family index)
3020
OperateType op_type = 2; // 操作类型
3121
bytes key = 3; // 已编码的 key
3222
optional bytes value = 4; // 已编码的 value(包含 TTL、version 等)
33-
34-
// 用于日志恢复的元信息
35-
uint64 timestamp = 5; // 操作时间戳
36-
optional uint64 ttl = 6; // TTL(秒),0 表示永久
3723
}
3824

3925
// Binlog(对应一次 Raft 日志提交)
4026
message Binlog {
41-
uint32 db_id = 1; // 数据库 ID
42-
uint32 slot_idx = 2; // 槽位索引(预留)
43-
44-
// Raft 日志索引(对应 LogicOffset)
45-
uint32 term = 3; // Raft term
46-
uint64 log_index = 4; // Raft log index
47-
48-
// Binlog 文件位置(对应 BinlogOffset)
49-
uint32 filenum = 5; // Binlog 文件编号
50-
uint64 offset = 6; // Binlog 文件内偏移量
51-
52-
repeated BinlogEntry entries = 7; // 批量操作条目
27+
uint32 db_id = 1; // 数据库 ID
28+
uint32 slot_idx = 2; // 槽位索引(预留)
29+
repeated BinlogEntry entries = 3; // 批量操作条目
5330
}

src/praft/src/pika_raft.proto

Lines changed: 0 additions & 64 deletions
This file was deleted.

0 commit comments

Comments
 (0)