Skip to content

Commit f6ddc52

Browse files
committed
fix: 修复L6跳过逻辑错误;新增动态参数配置
1 parent 85c4e70 commit f6ddc52

11 files changed

Lines changed: 109 additions & 39 deletions

File tree

conf/pika.conf

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -725,8 +725,5 @@ incremental-compact-max-time-ms : 1000
725725
# Compression rate threshold (%), continue processing if rate is below this value
726726
incremental-compact-min-rate : 70
727727

728-
# Target level for compact files (-1 means current level + 1)
729-
incremental-compact-target-level : -1
730-
731728
# Minimum file age in seconds to be considered for compaction
732729
incremental-compact-min-file-age : 60

include/pika_conf.h

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -164,10 +164,6 @@ class PikaConf : public pstd::BaseConf {
164164
std::shared_lock l(rwlock_);
165165
return incremental_compact_min_rate_;
166166
}
167-
int incremental_compact_target_level() {
168-
std::shared_lock l(rwlock_);
169-
return incremental_compact_target_level_;
170-
}
171167
int incremental_compact_min_file_age() {
172168
std::shared_lock l(rwlock_);
173169
return incremental_compact_min_file_age_;
@@ -753,6 +749,31 @@ class PikaConf : public pstd::BaseConf {
753749
TryPushDiffCommands("disable_auto_compactions", value);
754750
disable_auto_compactions_ = value == "true";
755751
}
752+
void SetIncrementalCompactInterval(const int value) {
753+
std::lock_guard l(rwlock_);
754+
TryPushDiffCommands("incremental-compact-interval", std::to_string(value));
755+
incremental_compact_interval_ = value;
756+
}
757+
void SetIncrementalCompactMaxFiles(const int value) {
758+
std::lock_guard l(rwlock_);
759+
TryPushDiffCommands("incremental-compact-max-files", std::to_string(value));
760+
incremental_compact_max_files_ = value;
761+
}
762+
void SetIncrementalCompactMaxTimeMs(const int value) {
763+
std::lock_guard l(rwlock_);
764+
TryPushDiffCommands("incremental-compact-max-time-ms", std::to_string(value));
765+
incremental_compact_max_time_ms_ = value;
766+
}
767+
void SetIncrementalCompactMinRate(const int value) {
768+
std::lock_guard l(rwlock_);
769+
TryPushDiffCommands("incremental-compact-min-rate", std::to_string(value));
770+
incremental_compact_min_rate_ = value;
771+
}
772+
void SetIncrementalCompactMinFileAge(const int value) {
773+
std::lock_guard l(rwlock_);
774+
TryPushDiffCommands("incremental-compact-min-file-age", std::to_string(value));
775+
incremental_compact_min_file_age_ = value;
776+
}
756777
void SetMaxSubcompactions(const int& value) {
757778
std::lock_guard l(rwlock_);
758779
TryPushDiffCommands("max-subcompactions", std::to_string(value));
@@ -1070,7 +1091,6 @@ class PikaConf : public pstd::BaseConf {
10701091
int incremental_compact_max_files_ = 1;
10711092
int incremental_compact_max_time_ms_ = 1000;
10721093
int incremental_compact_min_rate_ = 70;
1073-
int incremental_compact_target_level_ = -1;
10741094
int incremental_compact_min_file_age_ = 60;
10751095

10761096
int64_t resume_check_interval_ = 60; // seconds

include/pika_server.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -551,6 +551,7 @@ class PikaServer : public pstd::noncopyable {
551551
*/
552552
bool have_scheduled_crontask_ = false;
553553
struct timeval last_check_compact_time_;
554+
struct timeval last_incremental_compact_time_ = {0, 0};
554555

555556
/*
556557
* ResumeDB used

src/pika_admin.cc

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2012,6 +2012,31 @@ void ConfigCmd::ConfigGet(std::string& ret) {
20122012
EncodeString(&config_body, "disable_auto_compactions");
20132013
EncodeString(&config_body, g_pika_conf->disable_auto_compactions() ? "true" : "false");
20142014
}
2015+
if (pstd::stringmatch(pattern.data(), "incremental-compact-interval", 1) != 0) {
2016+
elements += 2;
2017+
EncodeString(&config_body, "incremental-compact-interval");
2018+
EncodeNumber(&config_body, g_pika_conf->incremental_compact_interval());
2019+
}
2020+
if (pstd::stringmatch(pattern.data(), "incremental-compact-max-files", 1) != 0) {
2021+
elements += 2;
2022+
EncodeString(&config_body, "incremental-compact-max-files");
2023+
EncodeNumber(&config_body, g_pika_conf->incremental_compact_max_files());
2024+
}
2025+
if (pstd::stringmatch(pattern.data(), "incremental-compact-max-time-ms", 1) != 0) {
2026+
elements += 2;
2027+
EncodeString(&config_body, "incremental-compact-max-time-ms");
2028+
EncodeNumber(&config_body, g_pika_conf->incremental_compact_max_time_ms());
2029+
}
2030+
if (pstd::stringmatch(pattern.data(), "incremental-compact-min-rate", 1) != 0) {
2031+
elements += 2;
2032+
EncodeString(&config_body, "incremental-compact-min-rate");
2033+
EncodeNumber(&config_body, g_pika_conf->incremental_compact_min_rate());
2034+
}
2035+
if (pstd::stringmatch(pattern.data(), "incremental-compact-min-file-age", 1) != 0) {
2036+
elements += 2;
2037+
EncodeString(&config_body, "incremental-compact-min-file-age");
2038+
EncodeNumber(&config_body, g_pika_conf->incremental_compact_min_file_age());
2039+
}
20152040
if (pstd::stringmatch(pattern.data(), "network-interface", 1) != 0) {
20162041
elements += 2;
20172042
EncodeString(&config_body, "network-interface");
@@ -2345,6 +2370,11 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
23452370
"compact-cron",
23462371
"compact-interval",
23472372
"disable_auto_compactions",
2373+
"incremental-compact-interval",
2374+
"incremental-compact-max-files",
2375+
"incremental-compact-max-time-ms",
2376+
"incremental-compact-min-rate",
2377+
"incremental-compact-min-file-age",
23482378
"slave-priority",
23492379
"sync-window-size",
23502380
"slow-cmd-list",
@@ -2563,6 +2593,41 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
25632593
}
25642594
g_pika_conf->SetDisableAutoCompaction(value);
25652595
res_.AppendStringRaw("+OK\r\n");
2596+
} else if (set_item == "incremental-compact-interval") {
2597+
if (pstd::string2int(value.data(), value.size(), &ival) == 0 || ival <= 0) {
2598+
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'incremental-compact-interval'\r\n");
2599+
return;
2600+
}
2601+
g_pika_conf->SetIncrementalCompactInterval(static_cast<int>(ival));
2602+
res_.AppendStringRaw("+OK\r\n");
2603+
} else if (set_item == "incremental-compact-max-files") {
2604+
if (pstd::string2int(value.data(), value.size(), &ival) == 0 || ival <= 0) {
2605+
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'incremental-compact-max-files'\r\n");
2606+
return;
2607+
}
2608+
g_pika_conf->SetIncrementalCompactMaxFiles(static_cast<int>(ival));
2609+
res_.AppendStringRaw("+OK\r\n");
2610+
} else if (set_item == "incremental-compact-max-time-ms") {
2611+
if (pstd::string2int(value.data(), value.size(), &ival) == 0 || ival <= 0) {
2612+
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'incremental-compact-max-time-ms'\r\n");
2613+
return;
2614+
}
2615+
g_pika_conf->SetIncrementalCompactMaxTimeMs(static_cast<int>(ival));
2616+
res_.AppendStringRaw("+OK\r\n");
2617+
} else if (set_item == "incremental-compact-min-rate") {
2618+
if (pstd::string2int(value.data(), value.size(), &ival) == 0 || ival <= 0 || ival > 100) {
2619+
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'incremental-compact-min-rate'\r\n");
2620+
return;
2621+
}
2622+
g_pika_conf->SetIncrementalCompactMinRate(static_cast<int>(ival));
2623+
res_.AppendStringRaw("+OK\r\n");
2624+
} else if (set_item == "incremental-compact-min-file-age") {
2625+
if (pstd::string2int(value.data(), value.size(), &ival) == 0 || ival < 0) {
2626+
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'incremental-compact-min-file-age'\r\n");
2627+
return;
2628+
}
2629+
g_pika_conf->SetIncrementalCompactMinFileAge(static_cast<int>(ival));
2630+
res_.AppendStringRaw("+OK\r\n");
25662631
} else if (set_item == "rate-limiter-bandwidth") {
25672632
int64_t new_bandwidth = 0;
25682633
if (pstd::string2int(value.data(), value.size(), &new_bandwidth) == 0 || new_bandwidth <= 0) {

src/pika_conf.cc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,6 @@ int PikaConf::Load() {
364364
if (incremental_compact_min_rate_ <= 0 || incremental_compact_min_rate_ > 100) {
365365
incremental_compact_min_rate_ = 70;
366366
}
367-
GetConfInt("incremental-compact-target-level", &incremental_compact_target_level_);
368367
GetConfInt("incremental-compact-min-file-age", &incremental_compact_min_file_age_);
369368
if (incremental_compact_min_file_age_ < 0) {
370369
incremental_compact_min_file_age_ = 60;
@@ -929,7 +928,6 @@ int PikaConf::ConfigRewrite() {
929928
SetConfInt("incremental-compact-max-files", incremental_compact_max_files_);
930929
SetConfInt("incremental-compact-max-time-ms", incremental_compact_max_time_ms_);
931930
SetConfInt("incremental-compact-min-rate", incremental_compact_min_rate_);
932-
SetConfInt("incremental-compact-target-level", incremental_compact_target_level_);
933931
SetConfInt("incremental-compact-min-file-age", incremental_compact_min_file_age_);
934932

935933
SetConfStr("disable_auto_compactions", disable_auto_compactions_ ? "true" : "false");

src/pika_db.cc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,6 @@ void DB::IncrementalCompact(const storage::DataType& type) {
210210
g_pika_conf->incremental_compact_max_files(),
211211
g_pika_conf->incremental_compact_max_time_ms(),
212212
g_pika_conf->incremental_compact_min_rate(),
213-
g_pika_conf->incremental_compact_target_level(),
214213
g_pika_conf->incremental_compact_min_file_age());
215214
}
216215

src/pika_server.cc

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1238,7 +1238,14 @@ void PikaServer::AutoCompactRange() {
12381238
} else if (g_pika_conf->compaction_strategy() == PikaConf::OldestOrBestDeleteRatioSstCompact) {
12391239
DoSameThingEveryDB(TaskType::kCompactOldestOrBestDeleteRatioSst);
12401240
} else if (g_pika_conf->compaction_strategy() == PikaConf::IncrementalCompact) {
1241-
DoSameThingEveryDB(TaskType::kIncrementalCompact);
1241+
struct timeval now;
1242+
gettimeofday(&now, nullptr);
1243+
int interval = g_pika_conf->incremental_compact_interval();
1244+
if (last_incremental_compact_time_.tv_sec == 0 ||
1245+
now.tv_sec - last_incremental_compact_time_.tv_sec >= interval) {
1246+
gettimeofday(&last_incremental_compact_time_, nullptr);
1247+
DoSameThingEveryDB(TaskType::kIncrementalCompact);
1248+
}
12421249
}
12431250
}
12441251

src/storage/include/storage/storage.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1105,13 +1105,12 @@ class Storage {
11051105
* @param max_files: 单次最多处理文件数
11061106
* @param max_time_ms: 单次最大执行时间
11071107
* @param min_rate: 压缩率阈值,低于此值继续处理
1108-
* @param target_level: 目标 level,-1 表示当前 level + 1
11091108
* @param min_file_age: 文件最小年龄(秒)
11101109
* @param sync: 是否同步执行
11111110
* @return Status
11121111
*/
11131112
Status IncrementalCompact(const DataType &type, int max_files = 1, int max_time_ms = 1000,
1114-
int min_rate = 70, int target_level = -1, int min_file_age = 60,
1113+
int min_rate = 70, int min_file_age = 60,
11151114
bool sync = false);
11161115

11171116
Status SetMaxCacheStatisticKeys(uint32_t max_cache_statistic_keys);

src/storage/src/redis.cc

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -333,13 +333,7 @@ Status Redis::LongestNotCompactionSstCompact(const DataType& option_type, std::v
333333
// clear deleted sst file records because we use them in different cf
334334
listener_.Clear();
335335

336-
// 魔改:删除文件数量限制,只要有文件就 compact
337-
// 原代码:if (props.size() <= 1) { ... continue; }
338-
// 现在:注释掉这段逻辑,让 compact 更激进
339-
340-
// size_t max_files_to_compact = 1;
341-
// 魔改:每次处理 5 个文件,增加并发任务数
342-
size_t max_files_to_compact = 5;
336+
size_t max_files_to_compact = 1;
343337
const StorageOptions& storageOptions = storage_->GetStorageOptions();
344338
if (props.size() / storageOptions.compact_param_.compact_every_num_of_files_ > max_files_to_compact) {
345339
max_files_to_compact = props.size() / storageOptions.compact_param_.compact_every_num_of_files_;
@@ -479,7 +473,7 @@ static uint64_t ExtractFileNumber(const std::string& name) {
479473

480474
Status Redis::IncrementalCompact(const DataType& option_type, std::vector<Status>* compact_result_vec,
481475
const ColumnFamilyType& type, int max_files, int max_time_ms,
482-
int min_rate, int target_level, int min_file_age) {
476+
int min_rate, int min_file_age) {
483477
// 1. 并发控制
484478
bool no_compact = false;
485479
bool to_compact = true;
@@ -525,8 +519,8 @@ Status Redis::IncrementalCompact(const DataType& option_type, std::vector<Status
525519
uint64_t oldest_number = UINT64_MAX;
526520

527521
for (const auto& level_meta : meta.levels) {
528-
// FIX: 跳过 L0让 RocksDB 自动处理
529-
if (level_meta.level == 0) {
522+
// FIX: 跳过 L0 和 L6,L0 让 RocksDB 自动处理,L6 没有下一层不参与 incremental compact
523+
if (level_meta.level == 0 || level_meta.level >= 6) {
530524
continue;
531525
}
532526

@@ -550,12 +544,6 @@ Status Redis::IncrementalCompact(const DataType& option_type, std::vector<Status
550544
break; // 没有符合条件的文件
551545
}
552546

553-
// FIX: 跳过 L6 文件(没有上层了,避免 L6→L6 无效重写)
554-
if (oldest_level >= 6) {
555-
LOG(INFO) << "IncrementalCompact skip L6 file: " << oldest_file;
556-
break;
557-
}
558-
559547
// 3.3 使用 CompactFiles 进行 compact(只处理 L1-L5)
560548
std::vector<std::string> input_files{oldest_file};
561549
rocksdb::CompactionOptions compact_options;

src/storage/src/redis.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,7 @@ class Redis {
113113
virtual Status IncrementalCompact(const DataType& option_type, std::vector<Status>* compact_result_vec,
114114
const ColumnFamilyType& type = kMetaAndData,
115115
int max_files = 1, int max_time_ms = 1000,
116-
int min_rate = 70, int target_level = -1,
117-
int min_file_age = 60);
116+
int min_rate = 70, int min_file_age = 60);
118117

119118
virtual Status GetProperty(const std::string& property, uint64_t* out);
120119

0 commit comments

Comments
 (0)