Skip to content

Commit 893b4f0

Browse files
author
wuxianrong
committed
Add the mapping relationship between LogIndex and RocksDB SequenceNumber
1 parent e75170f commit 893b4f0

15 files changed

Lines changed: 815 additions & 82 deletions

File tree

CMakeLists.txt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -673,6 +673,10 @@ ExternalProject_Add(brpc
673673
-DCMAKE_INSTALL_PREFIX=${STAGED_INSTALL_PREFIX}
674674
-DCMAKE_BUILD_TYPE=${LIB_BUILD_TYPE}
675675
-DCMAKE_PREFIX_PATH=${CMAKE_PREFIX_PATH}
676+
-DCMAKE_CXX_FLAGS=-I${INSTALL_INCLUDEDIR}
677+
-DCMAKE_EXE_LINKER_FLAGS=-L${INSTALL_LIBDIR}\ -L${INSTALL_LIBDIR_64}
678+
-DCMAKE_SHARED_LINKER_FLAGS=-L${INSTALL_LIBDIR}\ -L${INSTALL_LIBDIR_64}
679+
-DCMAKE_MODULE_LINKER_FLAGS=-L${INSTALL_LIBDIR}\ -L${INSTALL_LIBDIR_64}
676680
-DWITH_GLOG=ON
677681
-DWITH_SNAPPY=ON
678682
-DBUILD_SHARED_LIBS=OFF
@@ -718,6 +722,10 @@ ExternalProject_Add(braft
718722
-DCMAKE_INSTALL_PREFIX=${STAGED_INSTALL_PREFIX}
719723
-DCMAKE_BUILD_TYPE=${LIB_BUILD_TYPE}
720724
-DCMAKE_PREFIX_PATH=${CMAKE_PREFIX_PATH}
725+
-DCMAKE_CXX_FLAGS=-I${INSTALL_INCLUDEDIR}
726+
-DCMAKE_EXE_LINKER_FLAGS=-L${INSTALL_LIBDIR}\ -L${INSTALL_LIBDIR_64}
727+
-DCMAKE_SHARED_LINKER_FLAGS=-L${INSTALL_LIBDIR}\ -L${INSTALL_LIBDIR_64}
728+
-DCMAKE_MODULE_LINKER_FLAGS=-L${INSTALL_LIBDIR}\ -L${INSTALL_LIBDIR_64}
721729
-DWITH_GLOG=ON
722730
-DBUILD_SHARED_LIBS=OFF
723731
-DBUILD_UNIT_TESTS=OFF

src/praft/include/praft/praft.h

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,7 @@ class PikaStateMachine : public braft::StateMachine {
9797
void on_stop_following(const ::braft::LeaderChangeContext& ctx) override;
9898

9999
private:
100-
std::atomic<int64_t> applied_index_;
101-
std::atomic<int64_t> leader_term_;
100+
102101
};
103102

104103
// Raft node wrapper
@@ -185,8 +184,8 @@ class RaftManager {
185184
std::shared_ptr<PikaRaftNode> GetRaftNode(const std::string& db_name);
186185

187186
// Apply binlog entry to storage (public for PikaStateMachine to call)
188-
rocksdb::Status ApplyBinlogEntry(const ::pikiwidb::Binlog& binlog);
189-
187+
rocksdb::Status ApplyBinlogEntry(const ::pikiwidb::Binlog& binlog, uint64_t log_index = 0);
188+
190189
private:
191190
std::atomic<bool> initialized_;
192191
std::atomic<bool> running_;
@@ -208,4 +207,3 @@ class RaftManager {
208207
} // namespace pika_raft
209208

210209
#endif // PRAFT_PRAFT_H_
211-

src/praft/src/praft.cc

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,7 @@ extern std::unique_ptr<PikaServer> g_pika_server;
2929
namespace pika_raft {
3030

3131
// PikaStateMachine implementation
32-
PikaStateMachine::PikaStateMachine()
33-
: applied_index_(0), leader_term_(-1) {
34-
}
32+
PikaStateMachine::PikaStateMachine() {}
3533

3634
void PikaStateMachine::on_apply(braft::Iterator& iter) {
3735
for (; iter.valid(); iter.next()) {
@@ -41,7 +39,6 @@ void PikaStateMachine::on_apply(braft::Iterator& iter) {
4139
int64_t index = iter.index();
4240

4341
if (!g_pika_server || !g_pika_server->GetRaftManager()) {
44-
applied_index_.store(index, std::memory_order_relaxed);
4542
// Run closure asynchronously in bthread to avoid blocking on_apply
4643
if (done) {
4744
braft::run_closure_in_bthread(done_guard.release());
@@ -55,14 +52,14 @@ void PikaStateMachine::on_apply(braft::Iterator& iter) {
5552
if (done) {
5653
done->status().set_error(EINVAL, "Failed to parse binlog");
5754
}
58-
applied_index_.store(index, std::memory_order_relaxed);
5955
if (done) {
6056
braft::run_closure_in_bthread(done_guard.release());
6157
}
6258
continue;
6359
}
6460

65-
rocksdb::Status apply_status = g_pika_server->GetRaftManager()->ApplyBinlogEntry(binlog);
61+
// Apply binlog with log index for tracking
62+
rocksdb::Status apply_status = g_pika_server->GetRaftManager()->ApplyBinlogEntry(binlog, index);
6663

6764
if (done) {
6865
if (apply_status.ok()) {
@@ -73,7 +70,6 @@ void PikaStateMachine::on_apply(braft::Iterator& iter) {
7370
}
7471
}
7572

76-
applied_index_.store(index, std::memory_order_relaxed);
7773

7874
// Run closure asynchronously in bthread to avoid blocking on_apply
7975
if (done) {
@@ -94,19 +90,16 @@ int PikaStateMachine::on_snapshot_load(braft::SnapshotReader* reader) {
9490
in >> index;
9591
in.close();
9692

97-
applied_index_.store(index, std::memory_order_relaxed);
9893
return 0;
9994
}
10095

10196
return -1;
10297
}
10398

10499
void PikaStateMachine::on_leader_start(int64_t term) {
105-
leader_term_.store(term, std::memory_order_relaxed);
106100
}
107101

108102
void PikaStateMachine::on_leader_stop(const butil::Status& status) {
109-
leader_term_.store(-1, std::memory_order_relaxed);
110103
}
111104

112105
void PikaStateMachine::on_error(const ::braft::Error& e) {
@@ -625,7 +618,7 @@ void RaftManager::AppendLog(const std::string& db_name,
625618
node->GetRaftNode()->apply(task);
626619
}
627620

628-
rocksdb::Status RaftManager::ApplyBinlogEntry(const ::pikiwidb::Binlog& binlog) {
621+
rocksdb::Status RaftManager::ApplyBinlogEntry(const ::pikiwidb::Binlog& binlog, uint64_t log_index) {
629622
std::string db_name = "db0";
630623

631624
auto db = g_pika_server->GetDB(db_name);
@@ -640,14 +633,15 @@ rocksdb::Status RaftManager::ApplyBinlogEntry(const ::pikiwidb::Binlog& binlog)
640633
return rocksdb::Status::InvalidArgument("Storage is null");
641634
}
642635

643-
auto status = storage->OnBinlogWrite(binlog, 0);
636+
// Pass log_index to storage layer for tracking
637+
auto status = storage->OnBinlogWrite(binlog, log_index);
644638

645639
if (!status.ok()) {
646-
LOG(ERROR) << "Failed to apply binlog to " << db_name << ": " << status.ToString();
640+
LOG(ERROR) << "Failed to apply binlog to " << db_name << " at log_index " << log_index
641+
<< ": " << status.ToString();
647642
}
648643

649644
return status;
650645
}
651646

652647
} // namespace pika_raft
653-

src/storage/include/storage/storage.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@ struct StorageOptions {
9999
std::function<void(const ::pikiwidb::Binlog&, std::promise<rocksdb::Status>&&,
100100
CommitCallback)> append_log_function;
101101

102+
std::function<void(int64_t, bool)> do_snapshot_function;
103+
102104
Status ResetOptions(const OptionType& option_type, const std::unordered_map<std::string, std::string>& options_map);
103105
};
104106

@@ -1159,6 +1161,8 @@ class Storage {
11591161

11601162
rocksdb::Status OnBinlogWrite(const ::pikiwidb::Binlog& binlog, uint64_t log_index);
11611163

1164+
uint64_t GetSmallestFlushedLogIndex();
1165+
11621166
private:
11631167
std::unique_ptr<RedisStrings> strings_db_;
11641168
std::unique_ptr<RedisHashes> hashes_db_;
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
// Copyright (c) 2024-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef STORAGE_STORAGE_DEFINE_H_
#define STORAGE_STORAGE_DEFINE_H_

#include <cstdint>

namespace storage {

// Raft/binlog log index paired with a RocksDB sequence number by the
// log-index tracking code. Signed so that -1 can serve as a "no index found"
// sentinel (used when scanning table properties for the largest flushed
// log index).
using LogIndex = int64_t;

} // namespace storage

#endif // STORAGE_STORAGE_DEFINE_H_

src/storage/src/log_index.cc

Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
/*
2+
* Copyright (c) 2024-present, Qihoo, Inc. All rights reserved.
3+
* This source code is licensed under the BSD-style license found in the
4+
* LICENSE file in the root directory of this source tree. An additional grant
5+
* of patent rights can be found in the PATENTS file in the same directory.
6+
*/
7+
8+
#include "log_index.h"
9+
10+
#include <glog/logging.h>
11+
#include <algorithm>
12+
#include <cinttypes>
13+
#include <set>
14+
15+
#include "redis.h"
16+
17+
namespace storage {
18+
19+
// Rebuild the per-column-family applied/flushed log-index state from the
// persisted SST table properties of an opened Redis instance.
// Returns the first error encountered while reading table properties;
// column families after a failing one are left uninitialized.
rocksdb::Status LogIndexOfColumnFamilies::Init(Redis *db) {
  // One tracking slot per column family actually opened by this instance.
  const auto &handles = db->GetHandles();
  cf_.resize(handles.size());

  for (size_t idx = 0; idx < cf_.size(); ++idx) {
    rocksdb::TablePropertiesCollection props;
    auto status = db->GetDB()->GetPropertiesOfAllTables(handles[idx], &props);
    if (!status.ok()) {
      return status;
    }
    auto largest = LogIndexTablePropertiesCollector::GetLargestLogIndexFromTableCollection(props);
    if (!largest.has_value()) {
      // No persisted log-index property for this CF yet; leave defaults.
      continue;
    }
    const auto log_index = largest->GetAppliedLogIndex();
    const auto seqno = largest->GetSequenceNumber();
    cf_[idx].applied_index.SetLogIndexSeqnoPair(log_index, seqno);
    cf_[idx].flushed_index.SetLogIndexSeqnoPair(log_index, seqno);
  }
  return rocksdb::Status::OK();
}
40+
41+
// Scan all column families and report the smallest applied and smallest
// flushed log index (with its seqno), together with the owning CF ids.
// CFs that are fully flushed (flushed >= applied) are skipped, except the CF
// currently being flushed (flush_cf), which is always considered.
LogIndexOfColumnFamilies::SmallestIndexRes LogIndexOfColumnFamilies::GetSmallestLogIndex(int flush_cf) const {
  SmallestIndexRes res;
  const int cf_num = static_cast<int>(cf_.size());
  for (int cf_id = 0; cf_id < cf_num; ++cf_id) {
    const auto &entry = cf_[cf_id];
    if (cf_id != flush_cf && entry.flushed_index >= entry.applied_index) {
      continue;
    }
    const auto applied = entry.applied_index.GetLogIndex();
    const auto flushed = entry.flushed_index.GetLogIndex();
    const auto flushed_seqno = entry.flushed_index.GetSequenceNumber();
    if (applied < res.smallest_applied_log_index) {
      res.smallest_applied_log_index = applied;
      res.smallest_applied_log_index_cf = cf_id;
    }
    if (flushed < res.smallest_flushed_log_index) {
      res.smallest_flushed_log_index = flushed;
      res.smallest_flushed_seqno = flushed_seqno;
      res.smallest_flushed_log_index_cf = cf_id;
    }
  }
  return res;
}
62+
63+
size_t LogIndexOfColumnFamilies::GetPendingFlushGap() const {
64+
std::set<LogIndex> s;
65+
for (size_t i = 0; i < cf_.size(); i++) {
66+
s.insert(cf_[i].applied_index.GetLogIndex());
67+
s.insert(cf_[i].flushed_index.GetLogIndex());
68+
}
69+
assert(!s.empty());
70+
if (s.size() == 1) {
71+
return 0;
72+
}
73+
auto iter_first = s.begin();
74+
auto iter_last = s.end();
75+
return *std::prev(iter_last) - *iter_first;
76+
}
77+
78+
// Out-of-line definition of the collector's static gap threshold (declared in
// log_index.h). Presumably the maximum tolerated applied-vs-flushed log-index
// gap before a flush is forced (see IsFlushPending usage) — confirm in header.
std::atomic_int64_t LogIndexAndSequenceCollector::max_gap_ = 1000;
79+
80+
std::optional<LogIndexAndSequencePair> LogIndexTablePropertiesCollector::ReadStatsFromTableProps(
81+
const std::shared_ptr<const rocksdb::TableProperties> &table_props) {
82+
const auto &user_properties = table_props->user_collected_properties;
83+
const auto it = user_properties.find(kPropertyName.data());
84+
if (it == user_properties.end()) {
85+
return std::nullopt;
86+
}
87+
std::string s = it->second;
88+
LogIndex applied_log_index;
89+
SequenceNumber largest_seqno;
90+
auto res = sscanf(s.c_str(), "%" PRIi64 "/%" PRIu64 "", &applied_log_index, &largest_seqno);
91+
assert(res == 2);
92+
93+
return LogIndexAndSequencePair(applied_log_index, largest_seqno);
94+
}
95+
96+
// Map a RocksDB sequence number back to the latest applied raft log index
// whose recorded seqno is <= the given one. Returns 0 when no mapping is
// known (empty list, seqno older than anything recorded, or seqno == 0).
// Assumes list_ is kept sorted by ascending seqno (Update only appends —
// confirm callers pass monotonically increasing pairs).
LogIndex LogIndexAndSequenceCollector::FindAppliedLogIndex(SequenceNumber seqno) const {
  if (seqno == 0) {  // the seqno will be 0 when executing compaction
    return 0;
  }
  std::shared_lock gd(mutex_);
  if (list_.empty() || seqno < list_.front().GetSequenceNumber()) {
    return 0;
  }
  // Fast path: target is at or past the newest recorded pair.
  if (seqno >= list_.back().GetSequenceNumber()) {
    return list_.back().GetAppliedLogIndex();
  }

  // With a `<=` predicate, lower_bound returns the FIRST pair whose seqno is
  // strictly greater than `seqno`; the pair we want is the one before it.
  auto it = std::lower_bound(
      list_.begin(), list_.end(), seqno,
      [](const LogIndexAndSequencePair &p, SequenceNumber tar) { return p.GetSequenceNumber() <= tar; });
  if (it->GetSequenceNumber() > seqno) {
    --it;  // step back to the last pair with seqno <= target
  }
  assert(it->GetSequenceNumber() <= seqno);
  return it->GetAppliedLogIndex();
}
117+
118+
// Record a (log index, seqno) pair for later seqno -> log-index lookups.
// When the step length is > 1 the stream is sampled (only indexes that are
// multiples of the step are kept): this trades precision for memory, and may
// cause some already-applied log entries to be re-applied on restart.
void LogIndexAndSequenceCollector::Update(LogIndex smallest_applied_log_index, SequenceNumber smallest_flush_seqno) {
  if ((smallest_applied_log_index & step_length_mask_) != 0) {
    return;  // not a sampling point; skip
  }
  std::lock_guard guard(mutex_);
  list_.emplace_back(smallest_applied_log_index, smallest_flush_seqno);
}
126+
127+
// Drop pairs that can no longer be needed by FindAppliedLogIndex, keeping the
// invariant that at least one element <= smallest_applied_log_index remains.
void LogIndexAndSequenceCollector::Purge(LogIndex smallest_applied_log_index) {
  // The reason that we use smallest applied log index of all column families instead of smallest flushed log index is
  // that the log index corresponding to the largest sequence number in the next flush must be greater than or equal to
  // the smallest applied log index at this moment.
  // So we just need to make sure that there is an element in the queue which is less than or equal to the smallest
  // applied log index to ensure that we can find a correct log index while doing next flush.
  std::lock_guard gd(mutex_);
  if (list_.size() < 2) {
    return;
  }
  // Pop the front while the SECOND element still satisfies the bound — the
  // front is the one element allowed to sit at or below the bound.
  auto second = std::next(list_.begin());
  while (list_.size() >= 2 && second->GetAppliedLogIndex() <= smallest_applied_log_index) {
    list_.pop_front();
    second = std::next(list_.begin());
  }
}
143+
144+
auto LogIndexTablePropertiesCollector::GetLargestLogIndexFromTableCollection(
145+
const rocksdb::TablePropertiesCollection &collection) -> std::optional<LogIndexAndSequencePair> {
146+
LogIndex max_flushed_log_index{-1};
147+
rocksdb::SequenceNumber seqno{};
148+
for (const auto &[_, props] : collection) {
149+
auto res = LogIndexTablePropertiesCollector::ReadStatsFromTableProps(props);
150+
if (res.has_value() && res->GetAppliedLogIndex() > max_flushed_log_index) {
151+
max_flushed_log_index = res->GetAppliedLogIndex();
152+
seqno = res->GetSequenceNumber();
153+
}
154+
}
155+
return max_flushed_log_index == -1 ? std::nullopt
156+
: std::make_optional<LogIndexAndSequencePair>(max_flushed_log_index, seqno);
157+
}
158+
159+
// Flush listener hook: records which log index the flushed SST covers, prunes
// the collector, publishes the global flushed position, and — when the
// applied/flushed gap is too large — kicks off a non-blocking flush of the
// most-lagging column family.
void LogIndexAndSequenceCollectorPurger::OnFlushCompleted(rocksdb::DB *db,
                                                          const rocksdb::FlushJobInfo &flush_job_info) {
  // Translate the flush's largest seqno back to a raft log index and record
  // it as this CF's flushed position.
  cf_->SetFlushedLogIndex(flush_job_info.cf_id, collector_->FindAppliedLogIndex(flush_job_info.largest_seqno),
                          flush_job_info.largest_seqno);

  auto [smallest_applied_log_index_cf, smallest_applied_log_index, smallest_flushed_log_index_cf,
        smallest_flushed_log_index, smallest_flushed_seqno] = cf_->GetSmallestLogIndex(flush_job_info.cf_id);
  // Entries older than the smallest applied index can never be needed again.
  collector_->Purge(smallest_applied_log_index);

  if (smallest_flushed_log_index_cf != -1) {
    cf_->SetFlushedLogIndexGlobal(smallest_flushed_log_index, smallest_flushed_seqno);
  }
  auto count = count_.fetch_add(1);

  // Throttle callback_ to every 10th completed flush. Signature matches the
  // do_snapshot_function hook in StorageOptions — confirm wiring at caller.
  if (count % 10 == 0 && callback_) {
    callback_(smallest_flushed_log_index, false);
  }

  // If the CF we flushed manually just finished, clear the in-flight marker
  // (-1 means "no manual flush in progress").
  // NOTE(review): "manul" is a typo for "manual" in the member name declared
  // in the header; kept as-is here to match the declaration.
  if (flush_job_info.cf_id == manul_flushing_cf_.load()) {
    manul_flushing_cf_.store(-1);
  }

  auto flushing_cf = manul_flushing_cf_.load();
  // Bail out if a manual flush is already in flight, or no flush is pending.
  if (flushing_cf != static_cast<size_t>(-1) || !collector_->IsFlushPending()) {
    return;
  }

  assert(flushing_cf == static_cast<size_t>(-1));

  // Claim the manual-flush slot with a CAS; if another thread won the race,
  // let it issue the flush instead.
  if (!manul_flushing_cf_.compare_exchange_strong(flushing_cf, smallest_flushed_log_index_cf)) {
    return;
  }

  assert(manul_flushing_cf_.load() == static_cast<size_t>(smallest_flushed_log_index_cf));
  // Fire-and-forget flush of the CF whose flushed index lags the most.
  rocksdb::FlushOptions flush_option;
  flush_option.wait = false;
  db->Flush(flush_option, column_families_->at(smallest_flushed_log_index_cf));
}
197+
198+
} // namespace storage

0 commit comments

Comments
 (0)