Skip to content

Commit c471a98

Browse files
committed
feat:progressive compact
1 parent 4b65ebb commit c471a98

6 files changed

Lines changed: 147 additions & 4 deletions

File tree

include/pika_server.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -522,6 +522,7 @@ class PikaServer : public pstd::noncopyable {
522522
*/
523523
void DoTimingTask();
524524
void AutoCompactRange();
525+
pstd::Status AutoCompactOldSST();
525526
void AutoBinlogPurge();
526527
void AutoServerlogPurge();
527528
void AutoDeleteExpiredDump();

src/pika_server.cc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1126,6 +1126,7 @@ int PikaServer::ClientPubSubChannelPatternSize(const std::shared_ptr<NetConn>& c
11261126
void PikaServer::DoTimingTask() {
11271127
// Maybe schedule compactrange
11281128
AutoCompactRange();
1129+
AutoCompactOldSST();
11291130
// Purge serverlog
11301131
AutoServerlogPurge();
11311132
// Purge binlog
@@ -1146,6 +1147,21 @@ void PikaServer::DoTimingTask() {
11461147
StatDiskUsage();
11471148
}
11481149

1150+
Status PikaServer::AutoCompactOldSST() {
1151+
static uint64_t last_compact_old_time = 0;
1152+
auto current_time = pstd::NowMicros();
1153+
if (current_time - last_compact_old_time < 90 * 1000 * 1000) {
1154+
return Status::OK();
1155+
}
1156+
1157+
last_compact_old_time = current_time;
1158+
1159+
for (const auto& db_item : dbs_) {
1160+
db_item.second->Compact(storage::kSST);
1161+
}
1162+
return Status::OK();
1163+
}
1164+
11491165
void PikaServer::StatDiskUsage() {
11501166
thread_local uint64_t last_update_time = 0;
11511167
auto current_time = pstd::NowMicros();

src/storage/include/storage/storage.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,9 +126,9 @@ struct ScoreMember {
126126

127127
enum BeforeOrAfter { Before, After };
128128

129-
enum DataType { kAll, kStrings, kHashes, kLists, kZSets, kSets, kStreams };
129+
enum DataType { kAll, kStrings, kHashes, kLists, kZSets, kSets, kStreams, kSST };
130130

131-
const char DataTypeTag[] = {'a', 'k', 'h', 'l', 'z', 's', 'x'};
131+
const char DataTypeTag[] = {'a', 'k', 'h', 'l', 'z', 's', 'x', 't'};
132132

133133
enum class OptionType {
134134
kDB,
@@ -150,7 +150,9 @@ enum Operation {
150150
kCleanSets,
151151
kCleanLists,
152152
kCleanStreams,
153-
kCompactRange
153+
kCompactRange,
154+
kCompactSST,
155+
kCompactOldSST
154156
};
155157

156158
struct BGTask {
@@ -1075,6 +1077,8 @@ class Storage {
10751077

10761078
Status Compact(const DataType& type, bool sync = false);
10771079
Status CompactRange(const DataType& type, const std::string& start, const std::string& end, bool sync = false);
1080+
Status CompactOldFiles(const DataType& type, bool sync = false);
1081+
Status DoCompactOldFiles(const DataType& type);
10781082
Status DoCompact(const DataType& type);
10791083
Status DoCompactRange(const DataType& type, const std::string& start, const std::string& end);
10801084

src/storage/src/redis.cc

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55

66
#include "src/redis.h"
77
#include <sstream>
8+
#include "rocksdb/column_family.h"
9+
#include "rocksdb/compaction_job_stats.h"
10+
#include "glog/logging.h" // 确保LOG宏可用
811

912
namespace storage {
1013

@@ -196,7 +199,67 @@ void Redis::SetCompactRangeOptions(const bool is_canceled) {
196199
default_compact_range_options_.canceled = new std::atomic<bool>(is_canceled);
197200
} else {
198201
default_compact_range_options_.canceled->store(is_canceled);
199-
}
202+
}
203+
}
204+
205+
uint64_t Redis::TableFileNameToNumber(const std::string& name) {
206+
uint64_t number = 0;
207+
uint64_t base = 1;
208+
int pos = static_cast<int>(name.find_last_of('.'));
209+
while (--pos >= 0 && name[pos] >= '0' && name[pos] <= '9') {
210+
number += (name[pos] - '0') * base;
211+
base *= 10;
212+
}
213+
return number;
214+
}
215+
216+
Status Redis::CompactOldFiles() {
217+
Status s;
218+
std::vector<rocksdb::ColumnFamilyHandle*> compact_handles_;
219+
for (auto handle : handles_) {
220+
compact_handles_.push_back(handle);
221+
}
222+
if (handles_.empty()) {
223+
compact_handles_.push_back(db_->DefaultColumnFamily());
224+
}
225+
for (auto handle : compact_handles_) {
226+
int rate = 0;
227+
while (rate < 70) {
228+
rate = 100;
229+
rocksdb::ColumnFamilyMetaData meta;
230+
rocksdb::CompactionOptions compact_options_;
231+
std::vector<std::string> input_file_names;
232+
db_->GetColumnFamilyMetaData(handle, &meta);
233+
uint64_t min_number = -1;
234+
int min_level = 0;
235+
std::string min_file = "999999.sst";
236+
std::string file_path;
237+
for (auto level_meta : meta.levels) {
238+
for (auto file_meta : level_meta.files) {
239+
file_path = file_meta.db_path;
240+
uint64_t number = 0;
241+
number = TableFileNameToNumber(file_meta.name);
242+
if (number < min_number) {
243+
min_file = file_meta.name;
244+
min_level = level_meta.level;
245+
min_number = number;
246+
}
247+
}
248+
}
249+
250+
if (min_level < 6) min_level++;
251+
252+
if (min_file != "999999.sst") {
253+
input_file_names.push_back(min_file);
254+
rocksdb::CompactionJobInfo job_info;
255+
db_->CompactFiles(compact_options_, handle, input_file_names, min_level, -1, nullptr, &job_info);
256+
LOG(WARNING) << "CompactFiles: " << handle->GetName()<< " : " << file_path << " : " << min_file << " : " << min_level << ", job_stats : " << job_info.stats.num_input_records << ": "<< job_info.stats.num_output_records;
257+
if (job_info.stats.num_input_records == 0) { break; }
258+
rate = job_info.stats.num_output_records * 100 / job_info.stats.num_input_records;
259+
}
260+
}
261+
}
262+
return s;
200263
}
201264

202265
} // namespace storage

src/storage/src/redis.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,9 @@ class Redis {
115115
Status SetSmallCompactionDurationThreshold(uint64_t small_compaction_duration_threshold);
116116
std::vector<rocksdb::ColumnFamilyHandle*> GetHandles(){ return handles_;};
117117
void GetRocksDBInfo(std::string &info, const char *prefix);
118+
Status CompactOldFiles();
119+
120+
static uint64_t TableFileNameToNumber(const std::string& name);
118121

119122
protected:
120123
Storage* const storage_;

src/storage/src/storage.cc

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1628,6 +1628,13 @@ Status Storage::StartBGThread() {
16281628

16291629
Status Storage::AddBGTask(const BGTask& bg_task) {
16301630
bg_tasks_mutex_.lock();
1631+
1632+
// 如果任务队列不为空且当前任务是 kCompactSST 类型,则直接返回,不添加到队列
1633+
if (!bg_tasks_queue_.empty() && bg_task.operation == kCompactSST) {
1634+
bg_tasks_mutex_.unlock();
1635+
return Status::OK();
1636+
}
1637+
16311638
if (bg_task.type == kAll) {
16321639
// if current task it is global compact,
16331640
// clear the bg_tasks_queue_;
@@ -1658,10 +1665,14 @@ Status Storage::RunBGTask() {
16581665

16591666
if (task.operation == kCleanAll) {
16601667
DoCompact(task.type);
1668+
} else if (task.operation == kCompactOldSST) {
1669+
DoCompactRange(task.type, "", "");
16611670
} else if (task.operation == kCompactRange) {
16621671
if (task.argv.size() == 2) {
16631672
DoCompactRange(task.type, task.argv.front(), task.argv.back());
16641673
}
1674+
} else if (task.operation == kCompactSST) {
1675+
DoCompactOldFiles(task.type);
16651676
}
16661677
}
16671678
return Status::OK();
@@ -1670,6 +1681,9 @@ Status Storage::RunBGTask() {
16701681
Status Storage::Compact(const DataType& type, bool sync) {
16711682
if (sync) {
16721683
return DoCompact(type);
1684+
} else if (type == kSST) {
1685+
LOG(WARNING) << "AddBGTask kCompactOldSST";
1686+
AddBGTask({type, kCompactOldSST});
16731687
} else {
16741688
AddBGTask({type, kCleanAll});
16751689
}
@@ -1722,6 +1736,48 @@ Status Storage::CompactRange(const DataType& type, const std::string& start, con
17221736
return Status::OK();
17231737
}
17241738

1739+
Status Storage::CompactOldFiles(const DataType& type, bool sync) {
1740+
if (sync) {
1741+
return DoCompactOldFiles(type);
1742+
} else {
1743+
AddBGTask({type, kCompactSST});
1744+
}
1745+
return Status::OK();
1746+
}
1747+
1748+
Status Storage::DoCompactOldFiles(const DataType& type) {
1749+
Status s;
1750+
if (type == kStrings) {
1751+
current_task_type_ = Operation::kCompactSST;
1752+
s = strings_db_->CompactOldFiles();
1753+
} else if (type == kHashes) {
1754+
current_task_type_ = Operation::kCompactSST;
1755+
s = hashes_db_->CompactOldFiles();
1756+
} else if (type == kSets) {
1757+
current_task_type_ = Operation::kCompactSST;
1758+
s = sets_db_->CompactOldFiles();
1759+
} else if (type == kZSets) {
1760+
current_task_type_ = Operation::kCompactSST;
1761+
s = zsets_db_->CompactOldFiles();
1762+
} else if (type == kLists) {
1763+
current_task_type_ = Operation::kCompactSST;
1764+
s = lists_db_->CompactOldFiles();
1765+
} else if (type == kStreams) {
1766+
current_task_type_ = Operation::kCompactSST;
1767+
s = streams_db_->CompactOldFiles();
1768+
} else {
1769+
current_task_type_ = Operation::kCompactSST;
1770+
s = strings_db_->CompactOldFiles();
1771+
s = hashes_db_->CompactOldFiles();
1772+
s = sets_db_->CompactOldFiles();
1773+
s = zsets_db_->CompactOldFiles();
1774+
s = lists_db_->CompactOldFiles();
1775+
s = streams_db_->CompactOldFiles();
1776+
}
1777+
current_task_type_ = Operation::kNone;
1778+
return s;
1779+
}
1780+
17251781
Status Storage::DoCompactRange(const DataType& type, const std::string& start, const std::string& end) {
17261782
Status s;
17271783
if (type == kStrings) {

0 commit comments

Comments
 (0)