Skip to content

Commit 0629948

Browse files
committed
fix: 优化渐进式compact;todo-检查配置能否生效
1 parent 8103862 commit 0629948

4 files changed

Lines changed: 82 additions & 19 deletions

File tree

src/pika_db.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,9 @@ void DB::IncrementalCompact(const storage::DataType& type) {
209209
storage_->IncrementalCompact(type,
210210
g_pika_conf->incremental_compact_max_files(),
211211
g_pika_conf->incremental_compact_max_time_ms(),
212-
g_pika_conf->incremental_compact_min_rate());
212+
g_pika_conf->incremental_compact_min_rate(),
213+
g_pika_conf->incremental_compact_target_level(),
214+
g_pika_conf->incremental_compact_min_file_age());
213215
}
214216

215217
void DB::DoKeyScan(void* arg) {

src/storage/include/storage/storage.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1105,11 +1105,14 @@ class Storage {
11051105
* @param max_files: 单次最多处理文件数
11061106
* @param max_time_ms: 单次最大执行时间
11071107
* @param min_rate: 压缩率阈值,低于此值继续处理
1108+
* @param target_level: 目标 level,-1 表示当前 level + 1
1109+
* @param min_file_age: 文件最小年龄(秒)
11081110
* @param sync: 是否同步执行
11091111
* @return Status
11101112
*/
11111113
Status IncrementalCompact(const DataType &type, int max_files = 1, int max_time_ms = 1000,
1112-
int min_rate = 70, bool sync = false);
1114+
int min_rate = 70, int target_level = -1, int min_file_age = 60,
1115+
bool sync = false);
11131116

11141117
Status SetMaxCacheStatisticKeys(uint32_t max_cache_statistic_keys);
11151118
Status SetSmallCompactionThreshold(uint32_t small_compaction_threshold);

src/storage/src/redis.cc

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,29 @@ Status Redis::LongestNotCompactionSstCompact(const DataType& option_type, std::v
438438
return Status::OK();
439439
}
440440

441+
// Helper function to extract table file number from filename
442+
// e.g., "000123.sst" -> 123
443+
static uint64_t ExtractFileNumber(const std::string& name) {
444+
uint64_t number = 0;
445+
uint64_t base = 1;
446+
size_t pos = name.find_last_of('.');
447+
if (pos == std::string::npos) {
448+
return 0;
449+
}
450+
// Move backwards from '.' to find the digits
451+
while (pos > 0) {
452+
--pos;
453+
char c = name[pos];
454+
if (c >= '0' && c <= '9') {
455+
number += (c - '0') * base;
456+
base *= 10;
457+
} else {
458+
break;
459+
}
460+
}
461+
return number;
462+
}
463+
441464
Status Redis::IncrementalCompact(const DataType& option_type, std::vector<Status>* compact_result_vec,
442465
const ColumnFamilyType& type, int max_files, int max_time_ms,
443466
int min_rate, int target_level, int min_file_age) {
@@ -461,17 +484,19 @@ Status Redis::IncrementalCompact(const DataType& option_type, std::vector<Status
461484
compact_result_vec->clear();
462485
}
463486

464-
// 3. 记录开始时间
465-
auto start_time = std::chrono::steady_clock::now();
466487
int64_t now_sec = std::time(nullptr);
467488

468489
for (auto idx : handleIdxVec) {
490+
// 每个 CF 独立计时和配额
491+
auto cf_start_time = std::chrono::steady_clock::now();
469492
int processed = 0;
493+
470494
while (processed < max_files) {
471-
// 3.1 检查超时
472-
auto elapsed = std::chrono::steady_clock::now() - start_time;
495+
// 3.1 检查该 CF 的超时
496+
auto elapsed = std::chrono::steady_clock::now() - cf_start_time;
473497
if (std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count() >= max_time_ms) {
474-
LOG(INFO) << "IncrementalCompact timeout, processed " << processed << " files";
498+
LOG(INFO) << "IncrementalCompact timeout for cf=" << handles_[idx]->GetName()
499+
<< ", processed " << processed << " files";
475500
break;
476501
}
477502

@@ -484,14 +509,19 @@ Status Redis::IncrementalCompact(const DataType& option_type, std::vector<Status
484509
uint64_t oldest_number = UINT64_MAX;
485510

486511
for (const auto& level_meta : meta.levels) {
512+
// FIX: 跳过 L0,让 RocksDB 自动处理
513+
if (level_meta.level == 0) {
514+
continue;
515+
}
516+
487517
for (const auto& file_meta : level_meta.files) {
488518
// 跳过太新的文件
489519
if (file_meta.file_creation_time > 0 &&
490520
(now_sec - file_meta.file_creation_time) < min_file_age) {
491521
continue;
492522
}
493523

494-
uint64_t number = TableFileNameToNumber(file_meta.name);
524+
uint64_t number = ExtractFileNumber(file_meta.name);
495525
if (number < oldest_number) {
496526
oldest_number = number;
497527
oldest_file = file_meta.db_path + "/" + file_meta.name;
@@ -504,10 +534,22 @@ Status Redis::IncrementalCompact(const DataType& option_type, std::vector<Status
504534
break; // 没有符合条件的文件
505535
}
506536

507-
// 3.3 使用 CompactFiles 进行 compact
537+
// FIX: 跳过 L6 文件(没有上层了,避免 L6→L6 无效重写)
538+
if (oldest_level >= 6) {
539+
LOG(INFO) << "IncrementalCompact skip L6 file: " << oldest_file;
540+
break;
541+
}
542+
543+
// 3.3 使用 CompactFiles 进行 compact(只处理 L1-L5)
508544
std::vector<std::string> input_files{oldest_file};
509545
rocksdb::CompactionOptions compact_options;
510-
int dest_level = (target_level >= 0) ? target_level : oldest_level + 1;
546+
// 目标层 = 当前层 + 1(L1→L2, L2→L3, ... L5→L6)
547+
int dest_level = oldest_level + 1;
548+
549+
LOG(INFO) << "IncrementalCompact start: file=" << oldest_file
550+
<< ", cf=" << handles_[idx]->GetName()
551+
<< ", from_level=" << oldest_level
552+
<< ", to_level=" << dest_level;
511553

512554
rocksdb::CompactionJobInfo job_info;
513555
Status s = db_->CompactFiles(compact_options, handles_[idx],

src/storage/src/storage.cc

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1730,10 +1730,20 @@ Status Storage::RunBGTask() {
17301730
} else if (task.operation == kCompactOldestOrBestDeleteRatioSst) {
17311731
LongestNotCompactionSstCompact(task.type, true);
17321732
} else if (task.operation == kIncrementalCompact) {
1733-
IncrementalCompact(task.type,
1734-
g_pika_conf->incremental_compact_max_files(),
1735-
g_pika_conf->incremental_compact_max_time_ms(),
1736-
g_pika_conf->incremental_compact_min_rate(), true);
1733+
// Parse parameters from argv, use defaults if not provided
1734+
int max_files = 1;
1735+
int max_time_ms = 1000;
1736+
int min_rate = 70;
1737+
int target_level = -1;
1738+
int min_file_age = 60;
1739+
1740+
if (task.argv.size() > 0) max_files = std::atoi(task.argv[0].c_str());
1741+
if (task.argv.size() > 1) max_time_ms = std::atoi(task.argv[1].c_str());
1742+
if (task.argv.size() > 2) min_rate = std::atoi(task.argv[2].c_str());
1743+
if (task.argv.size() > 3) target_level = std::atoi(task.argv[3].c_str());
1744+
if (task.argv.size() > 4) min_file_age = std::atoi(task.argv[4].c_str());
1745+
1746+
IncrementalCompact(task.type, max_files, max_time_ms, min_rate, target_level, min_file_age, true);
17371747
} else if (task.operation == kCompactRange) {
17381748
if (task.argv.size() == 1) {
17391749
DoCompactSpecificKey(task.type, task.argv[0]);
@@ -1760,21 +1770,20 @@ Status Storage::LongestNotCompactionSstCompact(const DataType &type, bool sync)
17601770
}
17611771
return s;
17621772
} else {
1763-
AddBGTask({type, kIncrementalCompact});
1773+
AddBGTask({type, kCompactOldestOrBestDeleteRatioSst});
17641774
}
17651775
return Status::OK();
17661776
}
17671777

17681778
Status Storage::IncrementalCompact(const DataType &type, int max_files, int max_time_ms,
1769-
int min_rate, bool sync) {
1779+
int min_rate, int target_level, int min_file_age, bool sync) {
17701780
if (sync) {
17711781
Status s;
17721782
for (const auto& inst : insts_) {
17731783
std::vector<rocksdb::Status> compact_result_vec;
17741784
s = inst->IncrementalCompact(type, &compact_result_vec, storage::kMetaAndData,
17751785
max_files, max_time_ms, min_rate,
1776-
g_pika_conf->incremental_compact_target_level(),
1777-
g_pika_conf->incremental_compact_min_file_age());
1786+
target_level, min_file_age);
17781787
for (auto compact_result : compact_result_vec) {
17791788
if (!compact_result.ok()) {
17801789
LOG(ERROR) << compact_result.ToString();
@@ -1783,7 +1792,14 @@ Status Storage::IncrementalCompact(const DataType &type, int max_files, int max_
17831792
}
17841793
return s;
17851794
} else {
1786-
AddBGTask({type, kCompactOldestOrBestDeleteRatioSst});
1795+
// Pass parameters via argv for async execution
1796+
AddBGTask({type, kIncrementalCompact, {
1797+
std::to_string(max_files),
1798+
std::to_string(max_time_ms),
1799+
std::to_string(min_rate),
1800+
std::to_string(target_level),
1801+
std::to_string(min_file_age)
1802+
}});
17871803
}
17881804
return Status::OK();
17891805
}

0 commit comments

Comments
 (0)