diff --git a/.gitignore b/.gitignore index ab567194a1..7518e2e548 100644 --- a/.gitignore +++ b/.gitignore @@ -74,3 +74,4 @@ pkg !codis/cmd/fe/assets/** tests/tmp +.claude \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index b0c54e0607..7877cc7ea0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -760,6 +760,7 @@ if (USE_PIKA_TOOLS) add_subdirectory(tools) endif() aux_source_directory(src DIR_SRCS) +list(REMOVE_ITEM DIR_SRCS "src/build_version.cc") # # generate version string(TIMESTAMP TS "%Y-%m-%d %H:%M:%S") diff --git a/conf/pika.conf b/conf/pika.conf index 4f51f9cdbd..b06f4f5162 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -677,8 +677,9 @@ wash-data: true # Pika automatic compact compact strategy, a complement to rocksdb compact. # Trigger the compact background task periodically according to `compact-interval` -# Can choose `full-compact` or `obd-compact`. +# Can choose `full-compact`, `obd-compact` or `incremental-compact`. # obd-compact https://github.com/OpenAtomFoundation/pika/issues/2255 +# incremental-compact: incremental compact using CompactFiles, processes a small number of oldest SST files each time compaction-strategy : obd-compact # For OBD_Compact @@ -704,7 +705,25 @@ force-compact-min-delete-ratio : 10 # compact every `compact-every-num-of-files` file. dont-compact-sst-created-in-seconds : 20 -# For OBD_Compact -# According to the number of sst files in rocksdb, +# For OBD_Compact +# According to the number of sst files in rocksdb, # compact every `compact-every-num-of-files` file. 
-best-delete-min-ratio : 10 \ No newline at end of file +best-delete-min-ratio : 10 + +# ============================================ +# For incremental-compact (when compaction-strategy = incremental-compact) +# ============================================ +# Execution interval in seconds +incremental-compact-interval : 60 + +# Maximum number of files to compact per run +incremental-compact-max-files : 1 + +# Maximum execution time per run in milliseconds +incremental-compact-max-time-ms : 1000 + +# Compression rate threshold (%), continue processing if rate is below this value +incremental-compact-min-rate : 70 + +# Minimum file age in seconds to be considered for compaction +incremental-compact-min-file-age : 60 \ No newline at end of file diff --git a/include/pika_conf.h b/include/pika_conf.h index e3c1519be2..12e1665c82 100644 --- a/include/pika_conf.h +++ b/include/pika_conf.h @@ -33,7 +33,8 @@ class PikaConf : public pstd::BaseConf { enum CompactionStrategy { NONE, FullCompact, - OldestOrBestDeleteRatioSstCompact + OldestOrBestDeleteRatioSstCompact, + IncrementalCompact }; PikaConf(const std::string& path); ~PikaConf() override = default; @@ -147,6 +148,26 @@ class PikaConf : public pstd::BaseConf { std::shared_lock l(rwlock_); return compaction_strategy_; } + int incremental_compact_interval() { + std::shared_lock l(rwlock_); + return incremental_compact_interval_; + } + int incremental_compact_max_files() { + std::shared_lock l(rwlock_); + return incremental_compact_max_files_; + } + int incremental_compact_max_time_ms() { + std::shared_lock l(rwlock_); + return incremental_compact_max_time_ms_; + } + int incremental_compact_min_rate() { + std::shared_lock l(rwlock_); + return incremental_compact_min_rate_; + } + int incremental_compact_min_file_age() { + std::shared_lock l(rwlock_); + return incremental_compact_min_file_age_; + } bool disable_auto_compactions() { std::shared_lock l(rwlock_); return disable_auto_compactions_; @@ -728,6 +749,31 @@ class 
PikaConf : public pstd::BaseConf { TryPushDiffCommands("disable_auto_compactions", value); disable_auto_compactions_ = value == "true"; } + void SetIncrementalCompactInterval(const int value) { + std::lock_guard l(rwlock_); + TryPushDiffCommands("incremental-compact-interval", std::to_string(value)); + incremental_compact_interval_ = value; + } + void SetIncrementalCompactMaxFiles(const int value) { + std::lock_guard l(rwlock_); + TryPushDiffCommands("incremental-compact-max-files", std::to_string(value)); + incremental_compact_max_files_ = value; + } + void SetIncrementalCompactMaxTimeMs(const int value) { + std::lock_guard l(rwlock_); + TryPushDiffCommands("incremental-compact-max-time-ms", std::to_string(value)); + incremental_compact_max_time_ms_ = value; + } + void SetIncrementalCompactMinRate(const int value) { + std::lock_guard l(rwlock_); + TryPushDiffCommands("incremental-compact-min-rate", std::to_string(value)); + incremental_compact_min_rate_ = value; + } + void SetIncrementalCompactMinFileAge(const int value) { + std::lock_guard l(rwlock_); + TryPushDiffCommands("incremental-compact-min-file-age", std::to_string(value)); + incremental_compact_min_file_age_ = value; + } void SetMaxSubcompactions(const int& value) { std::lock_guard l(rwlock_); TryPushDiffCommands("max-subcompactions", std::to_string(value)); @@ -1040,6 +1086,13 @@ class PikaConf : public pstd::BaseConf { int best_delete_min_ratio_; CompactionStrategy compaction_strategy_; + // for incremental-compact + int incremental_compact_interval_ = 60; + int incremental_compact_max_files_ = 1; + int incremental_compact_max_time_ms_ = 1000; + int incremental_compact_min_rate_ = 70; + int incremental_compact_min_file_age_ = 60; + int64_t resume_check_interval_ = 60; // seconds int64_t least_free_disk_to_resume_ = 268435456; // 256 MB double min_check_resume_ratio_ = 0.7; diff --git a/include/pika_db.h b/include/pika_db.h index 3dfe3b69f5..f40a171a6e 100644 --- a/include/pika_db.h +++ 
b/include/pika_db.h @@ -133,6 +133,7 @@ class DB : public std::enable_shared_from_this, public pstd::noncopyable { void Compact(const storage::DataType& type); void CompactRange(const storage::DataType& type, const std::string& start, const std::string& end); void LongestNotCompactionSstCompact(const storage::DataType& type); + void IncrementalCompact(const storage::DataType& type); void SetCompactRangeOptions(const bool is_canceled); diff --git a/include/pika_server.h b/include/pika_server.h index df75229188..a78c5e290f 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -60,6 +60,7 @@ enum TaskType { kBgSave, kCompactRangeAll, kCompactOldestOrBestDeleteRatioSst, + kIncrementalCompact, }; struct TaskArg { @@ -550,6 +551,7 @@ class PikaServer : public pstd::noncopyable { */ bool have_scheduled_crontask_ = false; struct timeval last_check_compact_time_; + struct timeval last_incremental_compact_time_ = {0, 0}; /* * ResumeDB used diff --git a/src/pika_admin.cc b/src/pika_admin.cc index 944c65f9ec..c98aa897f8 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -2012,6 +2012,31 @@ void ConfigCmd::ConfigGet(std::string& ret) { EncodeString(&config_body, "disable_auto_compactions"); EncodeString(&config_body, g_pika_conf->disable_auto_compactions() ? 
"true" : "false"); } + if (pstd::stringmatch(pattern.data(), "incremental-compact-interval", 1) != 0) { + elements += 2; + EncodeString(&config_body, "incremental-compact-interval"); + EncodeNumber(&config_body, g_pika_conf->incremental_compact_interval()); + } + if (pstd::stringmatch(pattern.data(), "incremental-compact-max-files", 1) != 0) { + elements += 2; + EncodeString(&config_body, "incremental-compact-max-files"); + EncodeNumber(&config_body, g_pika_conf->incremental_compact_max_files()); + } + if (pstd::stringmatch(pattern.data(), "incremental-compact-max-time-ms", 1) != 0) { + elements += 2; + EncodeString(&config_body, "incremental-compact-max-time-ms"); + EncodeNumber(&config_body, g_pika_conf->incremental_compact_max_time_ms()); + } + if (pstd::stringmatch(pattern.data(), "incremental-compact-min-rate", 1) != 0) { + elements += 2; + EncodeString(&config_body, "incremental-compact-min-rate"); + EncodeNumber(&config_body, g_pika_conf->incremental_compact_min_rate()); + } + if (pstd::stringmatch(pattern.data(), "incremental-compact-min-file-age", 1) != 0) { + elements += 2; + EncodeString(&config_body, "incremental-compact-min-file-age"); + EncodeNumber(&config_body, g_pika_conf->incremental_compact_min_file_age()); + } if (pstd::stringmatch(pattern.data(), "network-interface", 1) != 0) { elements += 2; EncodeString(&config_body, "network-interface"); @@ -2345,6 +2370,11 @@ void ConfigCmd::ConfigSet(std::shared_ptr db) { "compact-cron", "compact-interval", "disable_auto_compactions", + "incremental-compact-interval", + "incremental-compact-max-files", + "incremental-compact-max-time-ms", + "incremental-compact-min-rate", + "incremental-compact-min-file-age", "slave-priority", "sync-window-size", "slow-cmd-list", @@ -2563,6 +2593,41 @@ void ConfigCmd::ConfigSet(std::shared_ptr db) { } g_pika_conf->SetDisableAutoCompaction(value); res_.AppendStringRaw("+OK\r\n"); + } else if (set_item == "incremental-compact-interval") { + if (pstd::string2int(value.data(), 
value.size(), &ival) == 0 || ival <= 0) { + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'incremental-compact-interval'\r\n"); + return; + } + g_pika_conf->SetIncrementalCompactInterval(static_cast<int>(ival)); + res_.AppendStringRaw("+OK\r\n"); + } else if (set_item == "incremental-compact-max-files") { + if (pstd::string2int(value.data(), value.size(), &ival) == 0 || ival <= 0) { + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'incremental-compact-max-files'\r\n"); + return; + } + g_pika_conf->SetIncrementalCompactMaxFiles(static_cast<int>(ival)); + res_.AppendStringRaw("+OK\r\n"); + } else if (set_item == "incremental-compact-max-time-ms") { + if (pstd::string2int(value.data(), value.size(), &ival) == 0 || ival <= 0) { + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'incremental-compact-max-time-ms'\r\n"); + return; + } + g_pika_conf->SetIncrementalCompactMaxTimeMs(static_cast<int>(ival)); + res_.AppendStringRaw("+OK\r\n"); + } else if (set_item == "incremental-compact-min-rate") { + if (pstd::string2int(value.data(), value.size(), &ival) == 0 || ival <= 0 || ival > 100) { + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'incremental-compact-min-rate'\r\n"); + return; + } + g_pika_conf->SetIncrementalCompactMinRate(static_cast<int>(ival)); + res_.AppendStringRaw("+OK\r\n"); + } else if (set_item == "incremental-compact-min-file-age") { + if (pstd::string2int(value.data(), value.size(), &ival) == 0 || ival < 0) { + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'incremental-compact-min-file-age'\r\n"); + return; + } + g_pika_conf->SetIncrementalCompactMinFileAge(static_cast<int>(ival)); + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "rate-limiter-bandwidth") { int64_t new_bandwidth = 0; if (pstd::string2int(value.data(), value.size(), &new_bandwidth) == 0 || new_bandwidth <= 0) { diff --git a/src/pika_conf.cc
b/src/pika_conf.cc index b7bf82e647..4a0199182f 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -341,10 +341,34 @@ int PikaConf::Load() { compaction_strategy_ = FullCompact; } else if (cs_ == "obd-compact") { compaction_strategy_ = OldestOrBestDeleteRatioSstCompact; + } else if (cs_ == "incremental-compact") { + compaction_strategy_ = IncrementalCompact; } else { compaction_strategy_ = NONE; } + // for incremental-compact + GetConfInt("incremental-compact-interval", &incremental_compact_interval_); + if (incremental_compact_interval_ <= 0) { + incremental_compact_interval_ = 60; + } + GetConfInt("incremental-compact-max-files", &incremental_compact_max_files_); + if (incremental_compact_max_files_ <= 0) { + incremental_compact_max_files_ = 1; + } + GetConfInt("incremental-compact-max-time-ms", &incremental_compact_max_time_ms_); + if (incremental_compact_max_time_ms_ <= 0) { + incremental_compact_max_time_ms_ = 1000; + } + GetConfInt("incremental-compact-min-rate", &incremental_compact_min_rate_); + if (incremental_compact_min_rate_ <= 0 || incremental_compact_min_rate_ > 100) { + incremental_compact_min_rate_ = 70; + } + GetConfInt("incremental-compact-min-file-age", &incremental_compact_min_file_age_); + if (incremental_compact_min_file_age_ < 0) { + incremental_compact_min_file_age_ = 60; + } + // least-free-disk-resume-size GetConfInt64Human("least-free-disk-resume-size", &least_free_disk_to_resume_); if (least_free_disk_to_resume_ <= 0) { @@ -893,10 +917,19 @@ int PikaConf::ConfigRewrite() { compaction_strategy_ = FullCompact; } else if (cs_ == "obd-compact") { compaction_strategy_ = OldestOrBestDeleteRatioSstCompact; + } else if (cs_ == "incremental-compact") { + compaction_strategy_ = IncrementalCompact; } else { compaction_strategy_ = NONE; } + // for incremental-compact config update + SetConfInt("incremental-compact-interval", incremental_compact_interval_); + SetConfInt("incremental-compact-max-files", incremental_compact_max_files_); + 
SetConfInt("incremental-compact-max-time-ms", incremental_compact_max_time_ms_); + SetConfInt("incremental-compact-min-rate", incremental_compact_min_rate_); + SetConfInt("incremental-compact-min-file-age", incremental_compact_min_file_age_); + SetConfStr("disable_auto_compactions", disable_auto_compactions_ ? "true" : "false"); SetConfStr("cache-type", scachetype); SetConfInt64("least-free-disk-resume-size", least_free_disk_to_resume_); diff --git a/src/pika_db.cc b/src/pika_db.cc index f3d52fdec3..f91c41883d 100644 --- a/src/pika_db.cc +++ b/src/pika_db.cc @@ -201,6 +201,18 @@ void DB::LongestNotCompactionSstCompact(const storage::DataType& type) { storage_->LongestNotCompactionSstCompact(type); } +void DB::IncrementalCompact(const storage::DataType& type) { + std::lock_guard rwl(dbs_rw_); + if (!opened_) { + return; + } + storage_->IncrementalCompact(type, + g_pika_conf->incremental_compact_max_files(), + g_pika_conf->incremental_compact_max_time_ms(), + g_pika_conf->incremental_compact_min_rate(), + g_pika_conf->incremental_compact_min_file_age()); +} + void DB::DoKeyScan(void* arg) { std::unique_ptr bg_task_arg(static_cast(arg)); bg_task_arg->db->RunKeyScan(); diff --git a/src/pika_server.cc b/src/pika_server.cc index 7b8941b32d..d81152f119 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -520,6 +520,9 @@ Status PikaServer::DoSameThingEveryDB(const TaskType& type) { case TaskType::kCompactOldestOrBestDeleteRatioSst: db_item.second->LongestNotCompactionSstCompact(storage::DataType::kAll); break; + case TaskType::kIncrementalCompact: + db_item.second->IncrementalCompact(storage::DataType::kAll); + break; default: break; } @@ -1234,6 +1237,18 @@ void PikaServer::AutoCompactRange() { DoSameThingEveryDB(TaskType::kCompactAll); } else if (g_pika_conf->compaction_strategy() == PikaConf::OldestOrBestDeleteRatioSstCompact) { DoSameThingEveryDB(TaskType::kCompactOldestOrBestDeleteRatioSst); + } else if (g_pika_conf->compaction_strategy() == 
PikaConf::IncrementalCompact) { + struct timeval now; + gettimeofday(&now, nullptr); + int interval = g_pika_conf->incremental_compact_interval(); + if (interval <= 0) { + return; + } + if (last_incremental_compact_time_.tv_sec == 0 || + now.tv_sec - last_incremental_compact_time_.tv_sec >= interval) { + gettimeofday(&last_incremental_compact_time_, nullptr); + DoSameThingEveryDB(TaskType::kIncrementalCompact); + } } } diff --git a/src/storage/include/storage/storage.h b/src/storage/include/storage/storage.h index dd41b3ea94..f17e72bca2 100644 --- a/src/storage/include/storage/storage.h +++ b/src/storage/include/storage/storage.h @@ -167,6 +167,7 @@ enum Operation { kCleanAll, kCompactRange, kCompactOldestOrBestDeleteRatioSst, + kIncrementalCompact, }; struct BGTask { @@ -1098,6 +1099,20 @@ class Storage { */ Status LongestNotCompactionSstCompact(const DataType &type, bool sync = false); + /** + * IncrementalCompact: 渐进式 compact,每次只处理少量最老的 SST 文件 + * @param type: 数据类型 + * @param max_files: 单次最多处理文件数 + * @param max_time_ms: 单次最大执行时间 + * @param min_rate: 压缩率阈值,低于此值继续处理 + * @param min_file_age: 文件最小年龄(秒) + * @param sync: 是否同步执行 + * @return Status + */ + Status IncrementalCompact(const DataType &type, int max_files = 1, int max_time_ms = 1000, + int min_rate = 70, int min_file_age = 60, + bool sync = false); + Status SetMaxCacheStatisticKeys(uint32_t max_cache_statistic_keys); Status SetSmallCompactionThreshold(uint32_t small_compaction_threshold); Status SetSmallCompactionDurationThreshold(uint32_t small_compaction_duration_threshold); diff --git a/src/storage/src/redis.cc b/src/storage/src/redis.cc index 077fe15dd0..ad269e9905 100644 --- a/src/storage/src/redis.cc +++ b/src/storage/src/redis.cc @@ -3,7 +3,9 @@ // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. 
+#include #include +#include #include "rocksdb/env.h" @@ -194,8 +196,10 @@ Status Redis::SetMaxCacheStatisticKeys(size_t max_cache_statistic_keys) { /* * compactrange no longer supports compact for a single data type + * */ Status Redis::CompactRange(const rocksdb::Slice* begin, const rocksdb::Slice* end) { + db_->CompactRange(default_compact_range_options_, begin, end); db_->CompactRange(default_compact_range_options_, handles_[kHashesDataCF], begin, end); db_->CompactRange(default_compact_range_options_, handles_[kSetsDataCF], begin, end); @@ -203,6 +207,7 @@ Status Redis::CompactRange(const rocksdb::Slice* begin, const rocksdb::Slice* en db_->CompactRange(default_compact_range_options_, handles_[kZsetsDataCF], begin, end); db_->CompactRange(default_compact_range_options_, handles_[kZsetsScoreCF], begin, end); db_->CompactRange(default_compact_range_options_, handles_[kStreamsDataCF], begin, end); + return Status::OK(); } @@ -266,9 +271,9 @@ void SelectColumnFamilyHandles(const DataType& option_type, const ColumnFamilyTy Status Redis::LongestNotCompactionSstCompact(const DataType& option_type, std::vector* compact_result_vec, const ColumnFamilyType& type) { bool no_compact = false; - bool to_comapct = true; - if (!in_compact_flag_.compare_exchange_weak(no_compact, to_comapct, std::memory_order_relaxed, - std::memory_order_relaxed)) { + bool to_compact = true; + if (!in_compact_flag_.compare_exchange_strong(no_compact, to_compact, std::memory_order_relaxed, + std::memory_order_relaxed)) { return Status::Busy("compact running"); } @@ -310,17 +315,6 @@ Status Redis::LongestNotCompactionSstCompact(const DataType& option_type, std::v // clear deleted sst file records because we use them in different cf listener_.Clear(); - // The main goal of compaction was reclaimed the disk space and removed - // the tombstone. It seems that compaction scheduler was unnecessary here when - // the live files was too few, Hard code to 1 here. 
- if (props.size() <= 1) { - // LOG(WARNING) << "LongestNotCompactionSstCompact " << handles_[idx]->GetName() << " only one file"; - if (compact_result_vec) { - compact_result_vec->push_back(Status::OK()); - } - continue; - } - size_t max_files_to_compact = 1; const StorageOptions& storageOptions = storage_->GetStorageOptions(); if (props.size() / storageOptions.compact_param_.compact_every_num_of_files_ > max_files_to_compact) { @@ -388,7 +382,12 @@ if (file_creation_time < static_cast<uint64_t>(now / 1000 - storageOptions.compact_param_.force_compact_file_age_seconds_) && delete_ratio >= force_compact_min_ratio) { - compact_result = db_->CompactRange(default_compact_range_options_, &start_key, &stop_key); + compact_result = db_->CompactRange(default_compact_range_options_, handles_[idx], &start_key, &stop_key); + if (!compact_result.ok()) { + LOG(WARNING) << handles_[idx]->GetName() + << " force CompactRange failed: " << compact_result.ToString() + << " file=" << file_path; + } if (--max_files_to_compact == 0) { break; } @@ -436,6 +435,149 @@ return Status::OK(); } +// Helper function to extract table file number from filename +// e.g., "000123.sst" -> 123 +static uint64_t ExtractFileNumber(const std::string& name) { + uint64_t number = 0; + uint64_t base = 1; + size_t pos = name.find_last_of('.'); + if (pos == std::string::npos) { + return 0; + } + // Move backwards from '.' to find the digits + while (pos > 0) { + --pos; + char c = name[pos]; + if (c >= '0' && c <= '9') { + number += (c - '0') * base; + base *= 10; + } else { + break; + } + } + return number; +} + +Status Redis::IncrementalCompact(const DataType& option_type, std::vector<Status>* compact_result_vec, + const ColumnFamilyType& type, int max_files, int max_time_ms, + int min_rate, int min_file_age) { + // 1.
并发控制 + bool no_compact = false; + bool to_compact = true; + if (!in_compact_flag_.compare_exchange_strong(no_compact, to_compact, std::memory_order_relaxed, + std::memory_order_relaxed)) { + return Status::Busy("compact running"); + } + DEFER { in_compact_flag_.store(false); }; + + // 2. 选择 Column Family + std::vector<int> handleIdxVec; + SelectColumnFamilyHandles(option_type, type, handleIdxVec); + if (handleIdxVec.empty()) { + return Status::Corruption("Invalid data type"); + } + + if (compact_result_vec) { + compact_result_vec->clear(); + } + + int64_t now_sec = std::time(nullptr); + + for (auto idx : handleIdxVec) { + // 每个 CF 独立计时和配额 + auto cf_start_time = std::chrono::steady_clock::now(); + int processed = 0; + + while (processed < max_files) { + // 3.1 检查该 CF 的超时 + auto elapsed = std::chrono::steady_clock::now() - cf_start_time; + if (std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count() >= max_time_ms) { + LOG(INFO) << "IncrementalCompact timeout for cf=" << handles_[idx]->GetName() + << ", processed " << processed << " files"; + break; + } + + // 3.2 获取元数据,找最老的文件 + rocksdb::ColumnFamilyMetaData meta; + db_->GetColumnFamilyMetaData(handles_[idx], &meta); + + std::string oldest_file; + int oldest_level = 0; + uint64_t oldest_number = UINT64_MAX; + + for (const auto& level_meta : meta.levels) { + // FIX: 跳过 L0 和 L6,L0 让 RocksDB 自动处理,L6 没有下一层不参与 incremental compact + if (level_meta.level == 0 || level_meta.level >= 6) { + continue; + } + + for (const auto& file_meta : level_meta.files) { + // 跳过太新的文件 + if (file_meta.file_creation_time > 0 && + (now_sec - file_meta.file_creation_time) < min_file_age) { + continue; + } + + uint64_t number = ExtractFileNumber(file_meta.name); + if (number < oldest_number) { + oldest_number = number; + oldest_file = file_meta.db_path + "/" + file_meta.name; + oldest_level = level_meta.level; + } + } + } + + if (oldest_file.empty()) { + break; // 没有符合条件的文件 + } + + // 3.3 使用 CompactFiles 进行 compact(只处理 L1-L5) + std::vector<std::string>
input_files{oldest_file}; + rocksdb::CompactionOptions compact_options; + // 目标层 = 当前层 + 1(L1→L2, L2→L3, ... L5→L6) + int dest_level = oldest_level + 1; + + LOG(INFO) << "IncrementalCompact start: file=" << oldest_file + << ", cf=" << handles_[idx]->GetName() + << ", from_level=" << oldest_level + << ", to_level=" << dest_level; + + rocksdb::CompactionJobInfo job_info; + Status s = db_->CompactFiles(compact_options, handles_[idx], + input_files, dest_level, -1, + nullptr, &job_info); + + if (!s.ok()) { + LOG(WARNING) << "IncrementalCompact failed for file " << oldest_file + << ": " << s.ToString(); + if (compact_result_vec) { + compact_result_vec->push_back(s); + } + break; + } + + // 3.4 检查压缩率,决定是否继续 + if (job_info.stats.num_input_records > 0) { + int rate = job_info.stats.num_output_records * 100 / job_info.stats.num_input_records; + LOG(INFO) << "IncrementalCompact " << oldest_file << " rate=" << rate << "%"; + + if (rate >= min_rate) { + // most records were retained, i.e. little garbage reclaimed: further compaction is unprofitable, stop + break; + } + } + + processed++; + } + + if (compact_result_vec && (compact_result_vec->empty() || compact_result_vec->back().ok())) { + compact_result_vec->push_back(Status::OK()); + } + } + + return Status::OK(); +} + Status Redis::SetSmallCompactionThreshold(uint64_t small_compaction_threshold) { small_compaction_threshold_ = small_compaction_threshold; return Status::OK(); diff --git a/src/storage/src/redis.h b/src/storage/src/redis.h index 54c6e10d46..3c451b0cde 100644 --- a/src/storage/src/redis.h +++ b/src/storage/src/redis.h @@ -110,6 +110,11 @@ class Redis { virtual Status LongestNotCompactionSstCompact(const DataType& option_type, std::vector<Status>* compact_result_vec, const ColumnFamilyType& type = kMetaAndData); + virtual Status IncrementalCompact(const DataType& option_type, std::vector<Status>* compact_result_vec, + const ColumnFamilyType& type = kMetaAndData, + int max_files = 1, int max_time_ms = 1000, + int min_rate = 70, int min_file_age = 60); + virtual Status GetProperty(const std::string& property,
uint64_t* out); Status ScanKeyNum(std::vector<KeyInfo>* key_info); @@ -520,7 +525,7 @@ class Redis { rocksdb::WriteOptions default_write_options_; rocksdb::ReadOptions default_read_options_; rocksdb::CompactRangeOptions default_compact_range_options_; - std::atomic<bool> in_compact_flag_; + std::atomic<bool> in_compact_flag_{false}; OBDSstListener listener_; // listening created sst file while compacting in OBD-compact // For Scan diff --git a/src/storage/src/storage.cc b/src/storage/src/storage.cc index cc7ca864f0..bf2018114c 100644 --- a/src/storage/src/storage.cc +++ b/src/storage/src/storage.cc @@ -1697,7 +1697,7 @@ Status Storage::StartBGThread() { Status Storage::AddBGTask(const BGTask& bg_task) { bg_tasks_mutex_.lock(); - if (bg_task.type == DataType::kAll) { + if (bg_task.operation == kCleanAll) { // if current task it is global compact, // clear the bg_tasks_queue_; std::queue<BGTask> empty_queue; @@ -1729,6 +1729,19 @@ Status Storage::RunBGTask() { DoCompactRange(task.type, "", ""); } else if (task.operation == kCompactOldestOrBestDeleteRatioSst) { LongestNotCompactionSstCompact(task.type, true); + } else if (task.operation == kIncrementalCompact) { + // Parse parameters from argv, use defaults if not provided + int max_files = 1; + int max_time_ms = 1000; + int min_rate = 70; + int min_file_age = 60; + + if (task.argv.size() > 0) max_files = std::atoi(task.argv[0].c_str()); + if (task.argv.size() > 1) max_time_ms = std::atoi(task.argv[1].c_str()); + if (task.argv.size() > 2) min_rate = std::atoi(task.argv[2].c_str()); + if (task.argv.size() > 3) min_file_age = std::atoi(task.argv[3].c_str()); + + IncrementalCompact(task.type, max_files, max_time_ms, min_rate, min_file_age, true); } else if (task.operation == kCompactRange) { if (task.argv.size() == 1) { DoCompactSpecificKey(task.type, task.argv[0]); @@ -1760,6 +1773,34 @@ Status Storage::LongestNotCompactionSstCompact(const DataType &type, bool sync) return Status::OK(); } +Status Storage::IncrementalCompact(const DataType &type,
int max_files, int max_time_ms, + int min_rate, int min_file_age, bool sync) { + if (sync) { + Status s; + for (const auto& inst : insts_) { + std::vector<Status> compact_result_vec; + s = inst->IncrementalCompact(type, &compact_result_vec, storage::kMetaAndData, + max_files, max_time_ms, min_rate, + min_file_age); + for (auto compact_result : compact_result_vec) { + if (!compact_result.ok()) { + LOG(ERROR) << compact_result.ToString(); + } + } + } + return s; + } else { + // Pass parameters via argv for async execution + AddBGTask({type, kIncrementalCompact, { + std::to_string(max_files), + std::to_string(max_time_ms), + std::to_string(min_rate), + std::to_string(min_file_age) + }}); + } + return Status::OK(); +} + Status Storage::Compact(const DataType& type, bool sync) { if (sync) { return DoCompactRange(type, "", "");