Skip to content
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,4 @@ pkg
!codis/cmd/fe/assets/**

tests/tmp
.claude
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,7 @@ if (USE_PIKA_TOOLS)
add_subdirectory(tools)
endif()
aux_source_directory(src DIR_SRCS)
list(REMOVE_ITEM DIR_SRCS "src/build_version.cc")

# # generate version
string(TIMESTAMP TS "%Y-%m-%d %H:%M:%S")
Expand Down
27 changes: 23 additions & 4 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -677,8 +677,9 @@ wash-data: true

# Pika automatic compaction strategy, a complement to rocksdb compact.
# Trigger the compact background task periodically according to `compact-interval`
# Can choose `full-compact` or `obd-compact`.
# Can choose `full-compact`, `obd-compact` or `incremental-compact`.
# obd-compact https://github.com/OpenAtomFoundation/pika/issues/2255
# incremental-compact: incremental compact using CompactFiles, processes a small number of oldest SST files each time
compaction-strategy : obd-compact

# For OBD_Compact
Expand All @@ -704,7 +705,25 @@ force-compact-min-delete-ratio : 10
# compact every `compact-every-num-of-files` file.
dont-compact-sst-created-in-seconds : 20

# For OBD_Compact
# According to the number of sst files in rocksdb,
# For OBD_Compact
# According to the number of sst files in rocksdb,
# compact every `compact-every-num-of-files` file.
best-delete-min-ratio : 10
best-delete-min-ratio : 10

# ============================================
# For incremental-compact (when compaction-strategy = incremental-compact)
# ============================================
# Execution interval in seconds
incremental-compact-interval : 60

# Maximum number of files to compact per run
incremental-compact-max-files : 1

# Maximum execution time per run in milliseconds
incremental-compact-max-time-ms : 1000

# Compression rate threshold (%), continue processing if rate is below this value
incremental-compact-min-rate : 70

# Minimum file age in seconds to be considered for compaction
incremental-compact-min-file-age : 60
55 changes: 54 additions & 1 deletion include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ class PikaConf : public pstd::BaseConf {
enum CompactionStrategy {
NONE,
FullCompact,
OldestOrBestDeleteRatioSstCompact
OldestOrBestDeleteRatioSstCompact,
IncrementalCompact
};
PikaConf(const std::string& path);
~PikaConf() override = default;
Expand Down Expand Up @@ -147,6 +148,26 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return compaction_strategy_;
}
// Returns the current `incremental-compact-interval` setting (execution interval,
// in seconds). The member is read while a std::shared_lock on rwlock_ is held.
int incremental_compact_interval() {
  std::shared_lock read_guard(rwlock_);
  const int interval = incremental_compact_interval_;
  return interval;
}
// Returns the current `incremental-compact-max-files` setting (maximum number of
// files to compact per run). The member is read while a std::shared_lock on
// rwlock_ is held.
int incremental_compact_max_files() {
  std::shared_lock read_guard(rwlock_);
  const int max_files = incremental_compact_max_files_;
  return max_files;
}
// Returns the current `incremental-compact-max-time-ms` setting (maximum execution
// time per run, in milliseconds). The member is read while a std::shared_lock on
// rwlock_ is held.
int incremental_compact_max_time_ms() {
  std::shared_lock read_guard(rwlock_);
  const int max_time_ms = incremental_compact_max_time_ms_;
  return max_time_ms;
}
// Returns the current `incremental-compact-min-rate` setting (compression rate
// threshold, in percent). The member is read while a std::shared_lock on rwlock_
// is held.
int incremental_compact_min_rate() {
  std::shared_lock read_guard(rwlock_);
  const int min_rate = incremental_compact_min_rate_;
  return min_rate;
}
// Returns the current `incremental-compact-min-file-age` setting (minimum file age,
// in seconds, for a file to be considered for compaction). The member is read
// while a std::shared_lock on rwlock_ is held.
int incremental_compact_min_file_age() {
  std::shared_lock read_guard(rwlock_);
  const int min_file_age = incremental_compact_min_file_age_;
  return min_file_age;
}
bool disable_auto_compactions() {
std::shared_lock l(rwlock_);
return disable_auto_compactions_;
Expand Down Expand Up @@ -728,6 +749,31 @@ class PikaConf : public pstd::BaseConf {
TryPushDiffCommands("disable_auto_compactions", value);
disable_auto_compactions_ = value == "true";
}
// Stores a new `incremental-compact-interval` value (execution interval, in seconds).
// With a std::lock_guard held on rwlock_, the config key and the decimal string form
// of `value` are handed to TryPushDiffCommands, then the member is overwritten.
// No range validation is performed here.
void SetIncrementalCompactInterval(const int value) {
  std::lock_guard write_guard(rwlock_);
  const std::string value_text = std::to_string(value);
  TryPushDiffCommands("incremental-compact-interval", value_text);
  incremental_compact_interval_ = value;
}
// Stores a new `incremental-compact-max-files` value (maximum number of files to
// compact per run). With a std::lock_guard held on rwlock_, the config key and the
// decimal string form of `value` are handed to TryPushDiffCommands, then the member
// is overwritten. No range validation is performed here.
void SetIncrementalCompactMaxFiles(const int value) {
  std::lock_guard write_guard(rwlock_);
  const std::string value_text = std::to_string(value);
  TryPushDiffCommands("incremental-compact-max-files", value_text);
  incremental_compact_max_files_ = value;
}
// Stores a new `incremental-compact-max-time-ms` value (maximum execution time per
// run, in milliseconds). With a std::lock_guard held on rwlock_, the config key and
// the decimal string form of `value` are handed to TryPushDiffCommands, then the
// member is overwritten. No range validation is performed here.
void SetIncrementalCompactMaxTimeMs(const int value) {
  std::lock_guard write_guard(rwlock_);
  const std::string value_text = std::to_string(value);
  TryPushDiffCommands("incremental-compact-max-time-ms", value_text);
  incremental_compact_max_time_ms_ = value;
}
// Stores a new `incremental-compact-min-rate` value (compression rate threshold, in
// percent). With a std::lock_guard held on rwlock_, the config key and the decimal
// string form of `value` are handed to TryPushDiffCommands, then the member is
// overwritten. No range validation is performed here.
void SetIncrementalCompactMinRate(const int value) {
  std::lock_guard write_guard(rwlock_);
  const std::string value_text = std::to_string(value);
  TryPushDiffCommands("incremental-compact-min-rate", value_text);
  incremental_compact_min_rate_ = value;
}
// Stores a new `incremental-compact-min-file-age` value (minimum file age, in
// seconds). With a std::lock_guard held on rwlock_, the config key and the decimal
// string form of `value` are handed to TryPushDiffCommands, then the member is
// overwritten. No range validation is performed here.
void SetIncrementalCompactMinFileAge(const int value) {
  std::lock_guard write_guard(rwlock_);
  const std::string value_text = std::to_string(value);
  TryPushDiffCommands("incremental-compact-min-file-age", value_text);
  incremental_compact_min_file_age_ = value;
}
void SetMaxSubcompactions(const int& value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("max-subcompactions", std::to_string(value));
Expand Down Expand Up @@ -1040,6 +1086,13 @@ class PikaConf : public pstd::BaseConf {
int best_delete_min_ratio_;
CompactionStrategy compaction_strategy_;

// for incremental-compact
int incremental_compact_interval_ = 60;
int incremental_compact_max_files_ = 1;
int incremental_compact_max_time_ms_ = 1000;
int incremental_compact_min_rate_ = 70;
int incremental_compact_min_file_age_ = 60;

int64_t resume_check_interval_ = 60; // seconds
int64_t least_free_disk_to_resume_ = 268435456; // 256 MB
double min_check_resume_ratio_ = 0.7;
Expand Down
1 change: 1 addition & 0 deletions include/pika_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {
void Compact(const storage::DataType& type);
void CompactRange(const storage::DataType& type, const std::string& start, const std::string& end);
void LongestNotCompactionSstCompact(const storage::DataType& type);
void IncrementalCompact(const storage::DataType& type);

void SetCompactRangeOptions(const bool is_canceled);

Expand Down
2 changes: 2 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ enum TaskType {
kBgSave,
kCompactRangeAll,
kCompactOldestOrBestDeleteRatioSst,
kIncrementalCompact,
};

struct TaskArg {
Expand Down Expand Up @@ -550,6 +551,7 @@ class PikaServer : public pstd::noncopyable {
*/
bool have_scheduled_crontask_ = false;
struct timeval last_check_compact_time_;
struct timeval last_incremental_compact_time_ = {0, 0};

/*
* ResumeDB used
Expand Down
65 changes: 65 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2012,6 +2012,31 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeString(&config_body, "disable_auto_compactions");
EncodeString(&config_body, g_pika_conf->disable_auto_compactions() ? "true" : "false");
}
if (pstd::stringmatch(pattern.data(), "incremental-compact-interval", 1) != 0) {
elements += 2;
EncodeString(&config_body, "incremental-compact-interval");
EncodeNumber(&config_body, g_pika_conf->incremental_compact_interval());
}
if (pstd::stringmatch(pattern.data(), "incremental-compact-max-files", 1) != 0) {
elements += 2;
EncodeString(&config_body, "incremental-compact-max-files");
EncodeNumber(&config_body, g_pika_conf->incremental_compact_max_files());
}
if (pstd::stringmatch(pattern.data(), "incremental-compact-max-time-ms", 1) != 0) {
elements += 2;
EncodeString(&config_body, "incremental-compact-max-time-ms");
EncodeNumber(&config_body, g_pika_conf->incremental_compact_max_time_ms());
}
if (pstd::stringmatch(pattern.data(), "incremental-compact-min-rate", 1) != 0) {
elements += 2;
EncodeString(&config_body, "incremental-compact-min-rate");
EncodeNumber(&config_body, g_pika_conf->incremental_compact_min_rate());
}
if (pstd::stringmatch(pattern.data(), "incremental-compact-min-file-age", 1) != 0) {
elements += 2;
EncodeString(&config_body, "incremental-compact-min-file-age");
EncodeNumber(&config_body, g_pika_conf->incremental_compact_min_file_age());
}
if (pstd::stringmatch(pattern.data(), "network-interface", 1) != 0) {
elements += 2;
EncodeString(&config_body, "network-interface");
Expand Down Expand Up @@ -2345,6 +2370,11 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
"compact-cron",
"compact-interval",
"disable_auto_compactions",
"incremental-compact-interval",
"incremental-compact-max-files",
"incremental-compact-max-time-ms",
"incremental-compact-min-rate",
"incremental-compact-min-file-age",
"slave-priority",
"sync-window-size",
"slow-cmd-list",
Expand Down Expand Up @@ -2563,6 +2593,41 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
}
g_pika_conf->SetDisableAutoCompaction(value);
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "incremental-compact-interval") {
if (pstd::string2int(value.data(), value.size(), &ival) == 0 || ival <= 0) {
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'incremental-compact-interval'\r\n");
return;
}
g_pika_conf->SetIncrementalCompactInterval(static_cast<int>(ival));
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "incremental-compact-max-files") {
if (pstd::string2int(value.data(), value.size(), &ival) == 0 || ival <= 0) {
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'incremental-compact-max-files'\r\n");
return;
}
g_pika_conf->SetIncrementalCompactMaxFiles(static_cast<int>(ival));
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "incremental-compact-max-time-ms") {
if (pstd::string2int(value.data(), value.size(), &ival) == 0 || ival <= 0) {
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'incremental-compact-max-time-ms'\r\n");
return;
}
g_pika_conf->SetIncrementalCompactMaxTimeMs(static_cast<int>(ival));
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "incremental-compact-min-rate") {
if (pstd::string2int(value.data(), value.size(), &ival) == 0 || ival <= 0 || ival > 100) {
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'incremental-compact-min-rate'\r\n");
return;
}
g_pika_conf->SetIncrementalCompactMinRate(static_cast<int>(ival));
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "incremental-compact-min-file-age") {
if (pstd::string2int(value.data(), value.size(), &ival) == 0 || ival < 0) {
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'incremental-compact-min-file-age'\r\n");
return;
}
g_pika_conf->SetIncrementalCompactMinFileAge(static_cast<int>(ival));
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "rate-limiter-bandwidth") {
int64_t new_bandwidth = 0;
if (pstd::string2int(value.data(), value.size(), &new_bandwidth) == 0 || new_bandwidth <= 0) {
Expand Down
33 changes: 33 additions & 0 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -341,10 +341,34 @@ int PikaConf::Load() {
compaction_strategy_ = FullCompact;
} else if (cs_ == "obd-compact") {
compaction_strategy_ = OldestOrBestDeleteRatioSstCompact;
} else if (cs_ == "incremental-compact") {
compaction_strategy_ = IncrementalCompact;
} else {
compaction_strategy_ = NONE;
}

// for incremental-compact
GetConfInt("incremental-compact-interval", &incremental_compact_interval_);
if (incremental_compact_interval_ <= 0) {
incremental_compact_interval_ = 60;
}
GetConfInt("incremental-compact-max-files", &incremental_compact_max_files_);
if (incremental_compact_max_files_ <= 0) {
incremental_compact_max_files_ = 1;
}
GetConfInt("incremental-compact-max-time-ms", &incremental_compact_max_time_ms_);
if (incremental_compact_max_time_ms_ <= 0) {
incremental_compact_max_time_ms_ = 1000;
}
GetConfInt("incremental-compact-min-rate", &incremental_compact_min_rate_);
if (incremental_compact_min_rate_ <= 0 || incremental_compact_min_rate_ > 100) {
incremental_compact_min_rate_ = 70;
}
GetConfInt("incremental-compact-min-file-age", &incremental_compact_min_file_age_);
if (incremental_compact_min_file_age_ < 0) {
incremental_compact_min_file_age_ = 60;
}

// least-free-disk-resume-size
GetConfInt64Human("least-free-disk-resume-size", &least_free_disk_to_resume_);
if (least_free_disk_to_resume_ <= 0) {
Expand Down Expand Up @@ -893,10 +917,19 @@ int PikaConf::ConfigRewrite() {
compaction_strategy_ = FullCompact;
} else if (cs_ == "obd-compact") {
compaction_strategy_ = OldestOrBestDeleteRatioSstCompact;
} else if (cs_ == "incremental-compact") {
compaction_strategy_ = IncrementalCompact;
} else {
compaction_strategy_ = NONE;
}

// for incremental-compact config update
SetConfInt("incremental-compact-interval", incremental_compact_interval_);
SetConfInt("incremental-compact-max-files", incremental_compact_max_files_);
SetConfInt("incremental-compact-max-time-ms", incremental_compact_max_time_ms_);
SetConfInt("incremental-compact-min-rate", incremental_compact_min_rate_);
SetConfInt("incremental-compact-min-file-age", incremental_compact_min_file_age_);

SetConfStr("disable_auto_compactions", disable_auto_compactions_ ? "true" : "false");
SetConfStr("cache-type", scachetype);
SetConfInt64("least-free-disk-resume-size", least_free_disk_to_resume_);
Expand Down
12 changes: 12 additions & 0 deletions src/pika_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,18 @@ void DB::LongestNotCompactionSstCompact(const storage::DataType& type) {
storage_->LongestNotCompactionSstCompact(type);
}

// Requests an incremental compaction of `type` from this DB's storage.
// Holds a std::lock_guard on dbs_rw_ for the whole call. When the DB is not opened
// nothing happens; otherwise the current incremental-compact limits are read from
// g_pika_conf and forwarded to storage_->IncrementalCompact. The Status returned by
// the storage layer is not inspected.
void DB::IncrementalCompact(const storage::DataType& type) {
  std::lock_guard db_guard(dbs_rw_);
  if (opened_) {
    const int max_files = g_pika_conf->incremental_compact_max_files();
    const int max_time_ms = g_pika_conf->incremental_compact_max_time_ms();
    const int min_rate = g_pika_conf->incremental_compact_min_rate();
    const int min_file_age = g_pika_conf->incremental_compact_min_file_age();
    storage_->IncrementalCompact(type, max_files, max_time_ms, min_rate, min_file_age);
  }
}

void DB::DoKeyScan(void* arg) {
std::unique_ptr <BgTaskArg> bg_task_arg(static_cast<BgTaskArg*>(arg));
bg_task_arg->db->RunKeyScan();
Expand Down
15 changes: 15 additions & 0 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,9 @@ Status PikaServer::DoSameThingEveryDB(const TaskType& type) {
case TaskType::kCompactOldestOrBestDeleteRatioSst:
db_item.second->LongestNotCompactionSstCompact(storage::DataType::kAll);
break;
case TaskType::kIncrementalCompact:
db_item.second->IncrementalCompact(storage::DataType::kAll);
break;
default:
break;
}
Expand Down Expand Up @@ -1234,6 +1237,18 @@ void PikaServer::AutoCompactRange() {
DoSameThingEveryDB(TaskType::kCompactAll);
} else if (g_pika_conf->compaction_strategy() == PikaConf::OldestOrBestDeleteRatioSstCompact) {
DoSameThingEveryDB(TaskType::kCompactOldestOrBestDeleteRatioSst);
} else if (g_pika_conf->compaction_strategy() == PikaConf::IncrementalCompact) {
struct timeval now;
gettimeofday(&now, nullptr);
int interval = g_pika_conf->incremental_compact_interval();
if (interval <= 0) {
return;
}
if (last_incremental_compact_time_.tv_sec == 0 ||
now.tv_sec - last_incremental_compact_time_.tv_sec >= interval) {
gettimeofday(&last_incremental_compact_time_, nullptr);
DoSameThingEveryDB(TaskType::kIncrementalCompact);
}
}
Comment thread
chenbt-hz marked this conversation as resolved.
}

Expand Down
15 changes: 15 additions & 0 deletions src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ enum Operation {
kCleanAll,
kCompactRange,
kCompactOldestOrBestDeleteRatioSst,
kIncrementalCompact,
};

struct BGTask {
Expand Down Expand Up @@ -1098,6 +1099,20 @@ class Storage {
*/
Status LongestNotCompactionSstCompact(const DataType &type, bool sync = false);

/**
* IncrementalCompact: incremental compaction; each run processes only a small number of the oldest SST files
* @param type: data type
* @param max_files: maximum number of files processed in a single run
* @param max_time_ms: maximum execution time of a single run
* @param min_rate: compression rate threshold; processing continues while the rate is below this value
* @param min_file_age: minimum file age (seconds)
* @param sync: whether to execute synchronously
* @return Status
*/
Status IncrementalCompact(const DataType &type, int max_files = 1, int max_time_ms = 1000,
int min_rate = 70, int min_file_age = 60,
bool sync = false);

Status SetMaxCacheStatisticKeys(uint32_t max_cache_statistic_keys);
Status SetSmallCompactionThreshold(uint32_t small_compaction_threshold);
Status SetSmallCompactionDurationThreshold(uint32_t small_compaction_duration_threshold);
Expand Down
Loading
Loading