Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,4 @@ pkg
!codis/cmd/fe/assets/**

tests/tmp
.claude
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,7 @@ if (USE_PIKA_TOOLS)
add_subdirectory(tools)
endif()
aux_source_directory(src DIR_SRCS)
list(REMOVE_ITEM DIR_SRCS "src/build_version.cc")

# # generate version
string(TIMESTAMP TS "%Y-%m-%d %H:%M:%S")
Expand Down
27 changes: 23 additions & 4 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -677,8 +677,9 @@ wash-data: true

# Pika automatic compact compact strategy, a complement to rocksdb compact.
# Trigger the compact background task periodically according to `compact-interval`
# Can choose `full-compact` or `obd-compact`.
# Can choose `full-compact`, `obd-compact` or `progressive-compact`.
# obd-compact https://github.com/OpenAtomFoundation/pika/issues/2255
# progressive-compact: progressive compact using CompactFiles, processes a small number of oldest SST files each time
compaction-strategy : obd-compact

# For OBD_Compact
Expand All @@ -704,7 +705,25 @@ force-compact-min-delete-ratio : 10
# compact every `compact-every-num-of-files` file.
dont-compact-sst-created-in-seconds : 20

# For OBD_Compact
# According to the number of sst files in rocksdb,
# For OBD_Compact
# According to the number of sst files in rocksdb,
# compact every `compact-every-num-of-files` file.
best-delete-min-ratio : 10
best-delete-min-ratio : 10

# ============================================
# For progressive-compact (when compaction-strategy = progressive-compact)
# ============================================
# Execution interval in seconds
progressive-compact-interval : 60

# Maximum number of files to compact per run
progressive-compact-max-files : 1

# Maximum execution time per run in milliseconds
progressive-compact-max-time-ms : 1000

# Compression rate threshold (%), continue processing if rate is below this value
progressive-compact-min-rate : 70

# Minimum file age in seconds to be considered for compaction
progressive-compact-min-file-age : 60
68 changes: 67 additions & 1 deletion include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ class PikaConf : public pstd::BaseConf {
enum CompactionStrategy {
NONE,
FullCompact,
OldestOrBestDeleteRatioSstCompact
OldestOrBestDeleteRatioSstCompact,
ProgressiveCompact
};
PikaConf(const std::string& path);
~PikaConf() override = default;
Expand Down Expand Up @@ -147,6 +148,26 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return compaction_strategy_;
}
int progressive_compact_interval() {
std::shared_lock l(rwlock_);
return progressive_compact_interval_;
}
int progressive_compact_max_files() {
std::shared_lock l(rwlock_);
return progressive_compact_max_files_;
}
int progressive_compact_max_time_ms() {
std::shared_lock l(rwlock_);
return progressive_compact_max_time_ms_;
}
int progressive_compact_min_rate() {
std::shared_lock l(rwlock_);
return progressive_compact_min_rate_;
}
int progressive_compact_min_file_age() {
std::shared_lock l(rwlock_);
return progressive_compact_min_file_age_;
}
bool disable_auto_compactions() {
std::shared_lock l(rwlock_);
return disable_auto_compactions_;
Expand Down Expand Up @@ -723,11 +744,49 @@ class PikaConf : public pstd::BaseConf {
TryPushDiffCommands("compact-interval", value);
compact_interval_ = value;
}
void SetCompactionStrategy(const std::string& value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("compaction-strategy", value);
if (value == "full-compact") {
compaction_strategy_ = FullCompact;
} else if (value == "obd-compact") {
compaction_strategy_ = OldestOrBestDeleteRatioSstCompact;
} else if (value == "progressive-compact") {
compaction_strategy_ = ProgressiveCompact;
} else {
compaction_strategy_ = NONE;
}
}
void SetDisableAutoCompaction(const std::string& value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("disable_auto_compactions", value);
disable_auto_compactions_ = value == "true";
}
void SetProgressiveCompactInterval(const int value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("progressive-compact-interval", std::to_string(value));
progressive_compact_interval_ = value;
}
void SetProgressiveCompactMaxFiles(const int value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("progressive-compact-max-files", std::to_string(value));
progressive_compact_max_files_ = value;
}
void SetProgressiveCompactMaxTimeMs(const int value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("progressive-compact-max-time-ms", std::to_string(value));
progressive_compact_max_time_ms_ = value;
}
void SetProgressiveCompactMinRate(const int value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("progressive-compact-min-rate", std::to_string(value));
progressive_compact_min_rate_ = value;
}
void SetProgressiveCompactMinFileAge(const int value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("progressive-compact-min-file-age", std::to_string(value));
progressive_compact_min_file_age_ = value;
}
void SetMaxSubcompactions(const int& value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("max-subcompactions", std::to_string(value));
Expand Down Expand Up @@ -1040,6 +1099,13 @@ class PikaConf : public pstd::BaseConf {
int best_delete_min_ratio_;
CompactionStrategy compaction_strategy_;

// for progressive-compact
int progressive_compact_interval_ = 60;
int progressive_compact_max_files_ = 1;
int progressive_compact_max_time_ms_ = 1000;
int progressive_compact_min_rate_ = 70;
int progressive_compact_min_file_age_ = 60;

int64_t resume_check_interval_ = 60; // seconds
int64_t least_free_disk_to_resume_ = 268435456; // 256 MB
double min_check_resume_ratio_ = 0.7;
Expand Down
1 change: 1 addition & 0 deletions include/pika_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {
void Compact(const storage::DataType& type);
void CompactRange(const storage::DataType& type, const std::string& start, const std::string& end);
void LongestNotCompactionSstCompact(const storage::DataType& type);
void ProgressiveCompact(const storage::DataType& type);

void SetCompactRangeOptions(const bool is_canceled);

Expand Down
2 changes: 2 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ enum TaskType {
kBgSave,
kCompactRangeAll,
kCompactOldestOrBestDeleteRatioSst,
kProgressiveCompact,
};

struct TaskArg {
Expand Down Expand Up @@ -550,6 +551,7 @@ class PikaServer : public pstd::noncopyable {
*/
bool have_scheduled_crontask_ = false;
struct timeval last_check_compact_time_;
struct timeval last_progressive_compact_time_ = {0, 0};

/*
* ResumeDB used
Expand Down
95 changes: 95 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2012,6 +2012,51 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeString(&config_body, "disable_auto_compactions");
EncodeString(&config_body, g_pika_conf->disable_auto_compactions() ? "true" : "false");
}
if (pstd::stringmatch(pattern.data(), "progressive-compact-interval", 1) != 0) {
elements += 2;
EncodeString(&config_body, "progressive-compact-interval");
EncodeNumber(&config_body, g_pika_conf->progressive_compact_interval());
}
if (pstd::stringmatch(pattern.data(), "progressive-compact-max-files", 1) != 0) {
elements += 2;
EncodeString(&config_body, "progressive-compact-max-files");
EncodeNumber(&config_body, g_pika_conf->progressive_compact_max_files());
}
if (pstd::stringmatch(pattern.data(), "progressive-compact-max-time-ms", 1) != 0) {
elements += 2;
EncodeString(&config_body, "progressive-compact-max-time-ms");
EncodeNumber(&config_body, g_pika_conf->progressive_compact_max_time_ms());
}
if (pstd::stringmatch(pattern.data(), "progressive-compact-min-rate", 1) != 0) {
elements += 2;
EncodeString(&config_body, "progressive-compact-min-rate");
EncodeNumber(&config_body, g_pika_conf->progressive_compact_min_rate());
}
if (pstd::stringmatch(pattern.data(), "progressive-compact-min-file-age", 1) != 0) {
elements += 2;
EncodeString(&config_body, "progressive-compact-min-file-age");
EncodeNumber(&config_body, g_pika_conf->progressive_compact_min_file_age());
}
if (pstd::stringmatch(pattern.data(), "compaction-strategy", 1) != 0) {
elements += 2;
EncodeString(&config_body, "compaction-strategy");
std::string cs;
switch (g_pika_conf->compaction_strategy()) {
case PikaConf::FullCompact:
cs = "full-compact";
break;
case PikaConf::OldestOrBestDeleteRatioSstCompact:
cs = "obd-compact";
break;
case PikaConf::ProgressiveCompact:
cs = "progressive-compact";
break;
default:
cs = "none";
break;
}
EncodeString(&config_body, cs);
}
if (pstd::stringmatch(pattern.data(), "network-interface", 1) != 0) {
elements += 2;
EncodeString(&config_body, "network-interface");
Expand Down Expand Up @@ -2345,6 +2390,12 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
"compact-cron",
"compact-interval",
"disable_auto_compactions",
"progressive-compact-interval",
"progressive-compact-max-files",
"progressive-compact-max-time-ms",
"progressive-compact-min-rate",
"progressive-compact-min-file-age",
"compaction-strategy",
"slave-priority",
"sync-window-size",
"slow-cmd-list",
Expand Down Expand Up @@ -2563,6 +2614,50 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
}
g_pika_conf->SetDisableAutoCompaction(value);
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "progressive-compact-interval") {
if (pstd::string2int(value.data(), value.size(), &ival) == 0 || ival <= 0) {
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'progressive-compact-interval'\r\n");
return;
}
g_pika_conf->SetProgressiveCompactInterval(static_cast<int>(ival));
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "progressive-compact-max-files") {
if (pstd::string2int(value.data(), value.size(), &ival) == 0 || ival <= 0) {
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'progressive-compact-max-files'\r\n");
return;
}
g_pika_conf->SetProgressiveCompactMaxFiles(static_cast<int>(ival));
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "progressive-compact-max-time-ms") {
if (pstd::string2int(value.data(), value.size(), &ival) == 0 || ival <= 0) {
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'progressive-compact-max-time-ms'\r\n");
return;
}
g_pika_conf->SetProgressiveCompactMaxTimeMs(static_cast<int>(ival));
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "progressive-compact-min-rate") {
if (pstd::string2int(value.data(), value.size(), &ival) == 0 || ival <= 0 || ival > 100) {
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'progressive-compact-min-rate'\r\n");
return;
}
g_pika_conf->SetProgressiveCompactMinRate(static_cast<int>(ival));
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "progressive-compact-min-file-age") {
if (pstd::string2int(value.data(), value.size(), &ival) == 0 || ival < 0) {
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'progressive-compact-min-file-age'\r\n");
return;
}
g_pika_conf->SetProgressiveCompactMinFileAge(static_cast<int>(ival));
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "compaction-strategy") {
if (value != "full-compact" && value != "obd-compact" &&
value != "progressive-compact" && value != "none") {
res_.AppendStringRaw("-ERR Invalid argument \'" + value +
"\' for CONFIG SET 'compaction-strategy'\r\n");
return;
}
g_pika_conf->SetCompactionStrategy(value);
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "rate-limiter-bandwidth") {
int64_t new_bandwidth = 0;
if (pstd::string2int(value.data(), value.size(), &new_bandwidth) == 0 || new_bandwidth <= 0) {
Expand Down
45 changes: 38 additions & 7 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -341,10 +341,34 @@ int PikaConf::Load() {
compaction_strategy_ = FullCompact;
} else if (cs_ == "obd-compact") {
compaction_strategy_ = OldestOrBestDeleteRatioSstCompact;
} else if (cs_ == "progressive-compact") {
compaction_strategy_ = ProgressiveCompact;
} else {
compaction_strategy_ = NONE;
}

// for progressive-compact
GetConfInt("progressive-compact-interval", &progressive_compact_interval_);
if (progressive_compact_interval_ <= 0) {
progressive_compact_interval_ = 60;
}
GetConfInt("progressive-compact-max-files", &progressive_compact_max_files_);
if (progressive_compact_max_files_ <= 0) {
progressive_compact_max_files_ = 1;
}
GetConfInt("progressive-compact-max-time-ms", &progressive_compact_max_time_ms_);
if (progressive_compact_max_time_ms_ <= 0) {
progressive_compact_max_time_ms_ = 1000;
}
GetConfInt("progressive-compact-min-rate", &progressive_compact_min_rate_);
if (progressive_compact_min_rate_ <= 0 || progressive_compact_min_rate_ > 100) {
progressive_compact_min_rate_ = 70;
}
GetConfInt("progressive-compact-min-file-age", &progressive_compact_min_file_age_);
if (progressive_compact_min_file_age_ < 0) {
progressive_compact_min_file_age_ = 60;
}

// least-free-disk-resume-size
GetConfInt64Human("least-free-disk-resume-size", &least_free_disk_to_resume_);
if (least_free_disk_to_resume_ <= 0) {
Expand Down Expand Up @@ -888,14 +912,21 @@ int PikaConf::ConfigRewrite() {
}

std::string cs_;
SetConfStr("compaction-strategy", cs_);
if (cs_ == "full-compact") {
compaction_strategy_ = FullCompact;
} else if (cs_ == "obd-compact") {
compaction_strategy_ = OldestOrBestDeleteRatioSstCompact;
} else {
compaction_strategy_ = NONE;
if (compaction_strategy_ == FullCompact) {
cs_ = "full-compact";
} else if (compaction_strategy_ == OldestOrBestDeleteRatioSstCompact) {
cs_ = "obd-compact";
} else if (compaction_strategy_ == ProgressiveCompact) {
cs_ = "progressive-compact";
}
SetConfStr("compaction-strategy", cs_);

// for progressive-compact config update
SetConfInt("progressive-compact-interval", progressive_compact_interval_);
SetConfInt("progressive-compact-max-files", progressive_compact_max_files_);
SetConfInt("progressive-compact-max-time-ms", progressive_compact_max_time_ms_);
SetConfInt("progressive-compact-min-rate", progressive_compact_min_rate_);
SetConfInt("progressive-compact-min-file-age", progressive_compact_min_file_age_);

SetConfStr("disable_auto_compactions", disable_auto_compactions_ ? "true" : "false");
SetConfStr("cache-type", scachetype);
Expand Down
12 changes: 12 additions & 0 deletions src/pika_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,18 @@ void DB::LongestNotCompactionSstCompact(const storage::DataType& type) {
storage_->LongestNotCompactionSstCompact(type);
}

void DB::ProgressiveCompact(const storage::DataType& type) {
std::lock_guard rwl(dbs_rw_);
if (!opened_) {
return;
}
storage_->ProgressiveCompact(type,
g_pika_conf->progressive_compact_max_files(),
g_pika_conf->progressive_compact_max_time_ms(),
g_pika_conf->progressive_compact_min_rate(),
g_pika_conf->progressive_compact_min_file_age());
}

void DB::DoKeyScan(void* arg) {
std::unique_ptr <BgTaskArg> bg_task_arg(static_cast<BgTaskArg*>(arg));
bg_task_arg->db->RunKeyScan();
Expand Down
Loading
Loading