Skip to content

Commit fe97dda

Browse files
author
wuxianrong
committed
Add the mapping relationship between LogIndex and RocksDB SequnceNumber
1 parent e75170f commit fe97dda

11 files changed

Lines changed: 321 additions & 82 deletions

File tree

src/praft/include/praft/praft.h

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,7 @@ class PikaStateMachine : public braft::StateMachine {
9797
void on_stop_following(const ::braft::LeaderChangeContext& ctx) override;
9898

9999
private:
100-
std::atomic<int64_t> applied_index_;
101-
std::atomic<int64_t> leader_term_;
100+
102101
};
103102

104103
// Raft node wrapper
@@ -185,8 +184,8 @@ class RaftManager {
185184
std::shared_ptr<PikaRaftNode> GetRaftNode(const std::string& db_name);
186185

187186
// Apply binlog entry to storage (public for PikaStateMachine to call)
188-
rocksdb::Status ApplyBinlogEntry(const ::pikiwidb::Binlog& binlog);
189-
187+
rocksdb::Status ApplyBinlogEntry(const ::pikiwidb::Binlog& binlog, uint64_t log_index = 0);
188+
190189
private:
191190
std::atomic<bool> initialized_;
192191
std::atomic<bool> running_;
@@ -208,4 +207,3 @@ class RaftManager {
208207
} // namespace pika_raft
209208

210209
#endif // PRAFT_PRAFT_H_
211-

src/praft/src/praft.cc

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,7 @@ extern std::unique_ptr<PikaServer> g_pika_server;
2929
namespace pika_raft {
3030

3131
// PikaStateMachine implementation
32-
PikaStateMachine::PikaStateMachine()
33-
: applied_index_(0), leader_term_(-1) {
34-
}
32+
PikaStateMachine::PikaStateMachine() {}
3533

3634
void PikaStateMachine::on_apply(braft::Iterator& iter) {
3735
for (; iter.valid(); iter.next()) {
@@ -41,7 +39,6 @@ void PikaStateMachine::on_apply(braft::Iterator& iter) {
4139
int64_t index = iter.index();
4240

4341
if (!g_pika_server || !g_pika_server->GetRaftManager()) {
44-
applied_index_.store(index, std::memory_order_relaxed);
4542
// Run closure asynchronously in bthread to avoid blocking on_apply
4643
if (done) {
4744
braft::run_closure_in_bthread(done_guard.release());
@@ -55,14 +52,14 @@ void PikaStateMachine::on_apply(braft::Iterator& iter) {
5552
if (done) {
5653
done->status().set_error(EINVAL, "Failed to parse binlog");
5754
}
58-
applied_index_.store(index, std::memory_order_relaxed);
5955
if (done) {
6056
braft::run_closure_in_bthread(done_guard.release());
6157
}
6258
continue;
6359
}
6460

65-
rocksdb::Status apply_status = g_pika_server->GetRaftManager()->ApplyBinlogEntry(binlog);
61+
// Apply binlog with log index for tracking
62+
rocksdb::Status apply_status = g_pika_server->GetRaftManager()->ApplyBinlogEntry(binlog, index);
6663

6764
if (done) {
6865
if (apply_status.ok()) {
@@ -73,7 +70,6 @@ void PikaStateMachine::on_apply(braft::Iterator& iter) {
7370
}
7471
}
7572

76-
applied_index_.store(index, std::memory_order_relaxed);
7773

7874
// Run closure asynchronously in bthread to avoid blocking on_apply
7975
if (done) {
@@ -94,19 +90,16 @@ int PikaStateMachine::on_snapshot_load(braft::SnapshotReader* reader) {
9490
in >> index;
9591
in.close();
9692

97-
applied_index_.store(index, std::memory_order_relaxed);
9893
return 0;
9994
}
10095

10196
return -1;
10297
}
10398

10499
void PikaStateMachine::on_leader_start(int64_t term) {
105-
leader_term_.store(term, std::memory_order_relaxed);
106100
}
107101

108102
void PikaStateMachine::on_leader_stop(const butil::Status& status) {
109-
leader_term_.store(-1, std::memory_order_relaxed);
110103
}
111104

112105
void PikaStateMachine::on_error(const ::braft::Error& e) {
@@ -625,7 +618,7 @@ void RaftManager::AppendLog(const std::string& db_name,
625618
node->GetRaftNode()->apply(task);
626619
}
627620

628-
rocksdb::Status RaftManager::ApplyBinlogEntry(const ::pikiwidb::Binlog& binlog) {
621+
rocksdb::Status RaftManager::ApplyBinlogEntry(const ::pikiwidb::Binlog& binlog, uint64_t log_index) {
629622
std::string db_name = "db0";
630623

631624
auto db = g_pika_server->GetDB(db_name);
@@ -640,14 +633,15 @@ rocksdb::Status RaftManager::ApplyBinlogEntry(const ::pikiwidb::Binlog& binlog)
640633
return rocksdb::Status::InvalidArgument("Storage is null");
641634
}
642635

643-
auto status = storage->OnBinlogWrite(binlog, 0);
636+
// Pass log_index to storage layer for tracking
637+
auto status = storage->OnBinlogWrite(binlog, log_index);
644638

645639
if (!status.ok()) {
646-
LOG(ERROR) << "Failed to apply binlog to " << db_name << ": " << status.ToString();
640+
LOG(ERROR) << "Failed to apply binlog to " << db_name << " at log_index " << log_index
641+
<< ": " << status.ToString();
647642
}
648643

649644
return status;
650645
}
651646

652647
} // namespace pika_raft
653-

src/storage/include/storage/storage.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@ struct StorageOptions {
9999
std::function<void(const ::pikiwidb::Binlog&, std::promise<rocksdb::Status>&&,
100100
CommitCallback)> append_log_function;
101101

102+
std::function<void(int64_t, bool)> do_snapshot_function;
103+
102104
Status ResetOptions(const OptionType& option_type, const std::unordered_map<std::string, std::string>& options_map);
103105
};
104106

@@ -1159,6 +1161,8 @@ class Storage {
11591161

11601162
rocksdb::Status OnBinlogWrite(const ::pikiwidb::Binlog& binlog, uint64_t log_index);
11611163

1164+
uint64_t GetSmallestFlushedLogIndex();
1165+
11621166
private:
11631167
std::unique_ptr<RedisStrings> strings_db_;
11641168
std::unique_ptr<RedisHashes> hashes_db_;

src/storage/src/redis.h

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "pstd/include/env.h"
1818

1919
#include "src/lock_mgr.h"
20+
#include "src/log_index.h"
2021
#include "src/lru_cache.h"
2122
#include "src/mutex_impl.h"
2223
#include "storage/storage.h"
@@ -144,6 +145,27 @@ class Redis {
144145
Status UpdateSpecificKeyStatistics(const std::string& key, uint64_t count);
145146
Status UpdateSpecificKeyDuration(const std::string& key, uint64_t duration);
146147
Status AddCompactKeyTaskIfNeeded(const std::string& key, uint64_t count, uint64_t duration);
148+
149+
// Log index management for Raft
150+
LogIndexAndSequenceCollector log_index_collector_;
151+
LogIndexOfColumnFamilies log_index_of_all_cfs_;
152+
153+
public:
154+
void UpdateLogIndex(LogIndex applied_log_index, SequenceNumber seqno) {
155+
log_index_collector_.Update(applied_log_index, seqno);
156+
}
157+
158+
void UpdateAppliedLogIndexOfColumnFamily(size_t cf_idx, LogIndex logidx, SequenceNumber seqno) {
159+
log_index_of_all_cfs_.Update(cf_idx, logidx, seqno);
160+
}
161+
162+
bool IsApplied(size_t cf_idx, LogIndex logidx) const {
163+
return log_index_of_all_cfs_.IsApplied(cf_idx, logidx);
164+
}
165+
166+
LogIndexOfColumnFamilies& GetLogIndexOfColumnFamilies() { return log_index_of_all_cfs_; }
167+
168+
LogIndexAndSequenceCollector& GetCollector() { return log_index_collector_; }
147169
};
148170

149171
} // namespace storage

src/storage/src/redis_hashes.cc

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,15 +56,42 @@ Status RedisHashes::Open(const StorageOptions& storage_options, const std::strin
5656
meta_cf_table_ops.block_cache = rocksdb::NewLRUCache(storage_options.block_cache_size);
5757
data_cf_table_ops.block_cache = rocksdb::NewLRUCache(storage_options.block_cache_size);
5858
}
59+
5960
meta_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(meta_cf_table_ops));
6061
data_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(data_cf_table_ops));
62+
63+
// Add LogIndex table properties collector for Raft
64+
meta_cf_ops.table_properties_collector_factories.push_back(
65+
std::make_shared<LogIndexTablePropertiesCollectorFactory>(log_index_collector_));
66+
data_cf_ops.table_properties_collector_factories.push_back(
67+
std::make_shared<LogIndexTablePropertiesCollectorFactory>(log_index_collector_));
68+
69+
// Add LogIndex event listener for Raft
70+
if (storage_options.do_snapshot_function) {
71+
auto purger = std::make_shared<LogIndexAndSequenceCollectorPurger>(
72+
&handles_, &log_index_collector_, &log_index_of_all_cfs_,
73+
storage_options.do_snapshot_function);
74+
db_ops.listeners.push_back(purger);
75+
}
6176

6277
std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
6378
// Meta CF
6479
column_families.emplace_back(rocksdb::kDefaultColumnFamilyName, meta_cf_ops);
6580
// Data CF
6681
column_families.emplace_back("data_cf", data_cf_ops);
67-
return rocksdb::DB::Open(db_ops, db_path, column_families, &handles_, &db_);
82+
83+
s = rocksdb::DB::Open(db_ops, db_path, column_families, &handles_, &db_);
84+
if (!s.ok()) {
85+
return s;
86+
}
87+
88+
// Initialize log index of column families
89+
s = log_index_of_all_cfs_.Init(this);
90+
if (!s.ok()) {
91+
LOG(ERROR) << "Failed to init log index of column families for hashes: " << s.ToString();
92+
}
93+
94+
return s;
6895
}
6996

7097
Status RedisHashes::CompactRange(const rocksdb::Slice* begin, const rocksdb::Slice* end, const ColumnFamilyType& type) {

src/storage/src/redis_lists.cc

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,15 +63,42 @@ Status RedisLists::Open(const StorageOptions& storage_options, const std::string
6363
meta_cf_table_ops.block_cache = rocksdb::NewLRUCache(storage_options.block_cache_size);
6464
data_cf_table_ops.block_cache = rocksdb::NewLRUCache(storage_options.block_cache_size);
6565
}
66+
6667
meta_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(meta_cf_table_ops));
6768
data_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(data_cf_table_ops));
69+
70+
// Add LogIndex table properties collector for Raft
71+
meta_cf_ops.table_properties_collector_factories.push_back(
72+
std::make_shared<LogIndexTablePropertiesCollectorFactory>(log_index_collector_));
73+
data_cf_ops.table_properties_collector_factories.push_back(
74+
std::make_shared<LogIndexTablePropertiesCollectorFactory>(log_index_collector_));
75+
76+
// Add LogIndex event listener for Raft
77+
if (storage_options.do_snapshot_function) {
78+
auto purger = std::make_shared<LogIndexAndSequenceCollectorPurger>(
79+
&handles_, &log_index_collector_, &log_index_of_all_cfs_,
80+
storage_options.do_snapshot_function);
81+
db_ops.listeners.push_back(purger);
82+
}
6883

6984
std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
7085
// Meta CF
7186
column_families.emplace_back(rocksdb::kDefaultColumnFamilyName, meta_cf_ops);
7287
// Data CF
7388
column_families.emplace_back("data_cf", data_cf_ops);
74-
return rocksdb::DB::Open(db_ops, db_path, column_families, &handles_, &db_);
89+
90+
s = rocksdb::DB::Open(db_ops, db_path, column_families, &handles_, &db_);
91+
if (!s.ok()) {
92+
return s;
93+
}
94+
95+
// Initialize log index of column families
96+
s = log_index_of_all_cfs_.Init(this);
97+
if (!s.ok()) {
98+
LOG(ERROR) << "Failed to init log index of column families for lists: " << s.ToString();
99+
}
100+
101+
return s;
75102
}
76103

77104
Status RedisLists::CompactRange(const rocksdb::Slice* begin, const rocksdb::Slice* end, const ColumnFamilyType& type) {

src/storage/src/redis_sets.cc

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,15 +63,42 @@ rocksdb::Status RedisSets::Open(const StorageOptions& storage_options, const std
6363
meta_cf_table_ops.block_cache = rocksdb::NewLRUCache(storage_options.block_cache_size);
6464
member_cf_table_ops.block_cache = rocksdb::NewLRUCache(storage_options.block_cache_size);
6565
}
66+
6667
meta_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(meta_cf_table_ops));
6768
member_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(member_cf_table_ops));
69+
70+
// Add LogIndex table properties collector for Raft
71+
meta_cf_ops.table_properties_collector_factories.push_back(
72+
std::make_shared<LogIndexTablePropertiesCollectorFactory>(log_index_collector_));
73+
member_cf_ops.table_properties_collector_factories.push_back(
74+
std::make_shared<LogIndexTablePropertiesCollectorFactory>(log_index_collector_));
75+
76+
// Add LogIndex event listener for Raft
77+
if (storage_options.do_snapshot_function) {
78+
auto purger = std::make_shared<LogIndexAndSequenceCollectorPurger>(
79+
&handles_, &log_index_collector_, &log_index_of_all_cfs_,
80+
storage_options.do_snapshot_function);
81+
db_ops.listeners.push_back(purger);
82+
}
6883

6984
std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
7085
// Meta CF
7186
column_families.emplace_back(rocksdb::kDefaultColumnFamilyName, meta_cf_ops);
7287
// Member CF
7388
column_families.emplace_back("member_cf", member_cf_ops);
74-
return rocksdb::DB::Open(db_ops, db_path, column_families, &handles_, &db_);
89+
90+
s = rocksdb::DB::Open(db_ops, db_path, column_families, &handles_, &db_);
91+
if (!s.ok()) {
92+
return s;
93+
}
94+
95+
// Initialize log index of column families
96+
s = log_index_of_all_cfs_.Init(this);
97+
if (!s.ok()) {
98+
LOG(ERROR) << "Failed to init log index of column families for sets: " << s.ToString();
99+
}
100+
101+
return s;
75102
}
76103

77104
rocksdb::Status RedisSets::CompactRange(const rocksdb::Slice* begin, const rocksdb::Slice* end, const ColumnFamilyType& type) {

src/storage/src/redis_streams.cc

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,15 +358,42 @@ Status RedisStreams::Open(const StorageOptions& storage_options, const std::stri
358358
meta_cf_table_ops.block_cache = rocksdb::NewLRUCache(storage_options.block_cache_size);
359359
data_cf_table_ops.block_cache = rocksdb::NewLRUCache(storage_options.block_cache_size);
360360
}
361+
361362
meta_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(meta_cf_table_ops));
362363
data_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(data_cf_table_ops));
364+
365+
// Add LogIndex table properties collector for Raft
366+
meta_cf_ops.table_properties_collector_factories.push_back(
367+
std::make_shared<LogIndexTablePropertiesCollectorFactory>(log_index_collector_));
368+
data_cf_ops.table_properties_collector_factories.push_back(
369+
std::make_shared<LogIndexTablePropertiesCollectorFactory>(log_index_collector_));
370+
371+
// Add LogIndex event listener for Raft
372+
if (storage_options.do_snapshot_function) {
373+
auto purger = std::make_shared<LogIndexAndSequenceCollectorPurger>(
374+
&handles_, &log_index_collector_, &log_index_of_all_cfs_,
375+
storage_options.do_snapshot_function);
376+
db_ops.listeners.push_back(purger);
377+
}
363378

364379
std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
365380
// Meta CF
366381
column_families.emplace_back(rocksdb::kDefaultColumnFamilyName, meta_cf_ops);
367382
// Data CF
368383
column_families.emplace_back("data_cf", data_cf_ops);
369-
return rocksdb::DB::Open(db_ops, db_path, column_families, &handles_, &db_);
384+
385+
s = rocksdb::DB::Open(db_ops, db_path, column_families, &handles_, &db_);
386+
if (!s.ok()) {
387+
return s;
388+
}
389+
390+
// Initialize log index of column families
391+
s = log_index_of_all_cfs_.Init(this);
392+
if (!s.ok()) {
393+
LOG(ERROR) << "Failed to init log index of column families for streams: " << s.ToString();
394+
}
395+
396+
return s;
370397
}
371398

372399
Status RedisStreams::CompactRange(const rocksdb::Slice* begin, const rocksdb::Slice* end,

src/storage/src/redis_strings.cc

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,36 @@ Status RedisStrings::Open(const StorageOptions& storage_options, const std::stri
3737
table_ops.block_cache = rocksdb::NewLRUCache(storage_options.block_cache_size);
3838
}
3939
table_ops.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10, true));
40+
4041
ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_ops));
42+
43+
// Add LogIndex table properties collector for Raft
44+
ops.table_properties_collector_factories.push_back(
45+
std::make_shared<LogIndexTablePropertiesCollectorFactory>(log_index_collector_));
46+
47+
// Add LogIndex event listener for Raft
48+
if (storage_options.do_snapshot_function) {
49+
auto purger = std::make_shared<LogIndexAndSequenceCollectorPurger>(
50+
&handles_, &log_index_collector_, &log_index_of_all_cfs_,
51+
storage_options.do_snapshot_function);
52+
ops.listeners.push_back(purger);
53+
}
4154

42-
return rocksdb::DB::Open(ops, db_path, &db_);
55+
Status s = rocksdb::DB::Open(ops, db_path, &db_);
56+
if (!s.ok()) {
57+
return s;
58+
}
59+
60+
// Initialize handles for strings (default column family only)
61+
handles_.push_back(db_->DefaultColumnFamily());
62+
63+
// Initialize log index of column families
64+
s = log_index_of_all_cfs_.Init(this);
65+
if (!s.ok()) {
66+
LOG(ERROR) << "Failed to init log index of column families for strings: " << s.ToString();
67+
}
68+
69+
return s;
4370
}
4471

4572
Status RedisStrings::CompactRange(const rocksdb::Slice* begin, const rocksdb::Slice* end,

0 commit comments

Comments
 (0)