Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,8 @@ ExternalProject_Add(rocksdb
-DPORTABLE=1
BUILD_COMMAND
make -j${CPU_CORE}
INSTALL_COMMAND
make install
)

ExternalProject_Add(rediscache
Expand Down
10 changes: 10 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,16 @@ disable_auto_compactions : false
# RocksDB max_subcompactions: increasing this value can speed up the execution of a single compaction task.
# It is recommended to increase its value if large compactions are found in your instance.
max-subcompactions : 1

# Progressive compaction configuration
# Enable progressive compaction (yes/no)
# This feature automatically compacts old SST files in the background
enable-progressive-compact : no

# Progressive compaction interval (seconds)
# Specifies the time interval between progressive compaction runs
# Minimum value is 60 seconds, default is 90 seconds
progressive-compact-interval : 90
# The minimum disk usage ratio for checking resume.
# If the disk usage ratio is lower than min-check-resume-ratio, it will not check resume, only higher will check resume.
# Its default value is 0.7.
Expand Down
49 changes: 48 additions & 1 deletion include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,26 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return rate_limiter_mode_;
}

// Reports whether progressive compaction is switched on.
// Reads the enable_auto_compact_old_sst_ flag while holding rwlock_ in shared mode.
bool enable_progressive_compact() {
  std::shared_lock<std::shared_mutex> read_guard(rwlock_);
  const bool enabled = enable_auto_compact_old_sst_;
  return enabled;
}

// Returns the interval, in seconds, between progressive compaction runs.
// The result is int64_t, the same type as auto_compact_old_sst_interval_, so a
// configured value larger than INT_MAX is no longer truncated on the way out.
// Reads the member while holding rwlock_ in shared mode.
int64_t progressive_compact_interval() {
  std::shared_lock<std::shared_mutex> read_guard(rwlock_);
  return auto_compact_old_sst_interval_;
}

// Non-const accessor for the old-SST auto compaction switch.
// Takes rwlock_ in shared mode for the duration of the read.
bool enable_auto_compact_old_sst() {
  std::shared_lock<std::shared_mutex> read_guard(rwlock_);
  const bool enabled = enable_auto_compact_old_sst_;
  return enabled;
}

// Non-const accessor for the old-SST auto compaction interval, in seconds.
// Returns int64_t (the member's own type, and the type returned by the const
// overload of this accessor) so the value is not narrowed to int; callers that
// scale it to microseconds therefore compute in 64 bits.
// Takes rwlock_ in shared mode for the duration of the read.
int64_t auto_compact_old_sst_interval() {
  std::shared_lock<std::shared_mutex> read_guard(rwlock_);
  return auto_compact_old_sst_interval_;
}
int64_t rate_limiter_bandwidth() {
std::shared_lock l(rwlock_);
return rate_limiter_bandwidth_;
Expand Down Expand Up @@ -721,6 +741,29 @@ class PikaConf : public pstd::BaseConf {
max_compaction_bytes_ = value;
}

// Progressive compaction methods
// Const accessor for the old-SST auto compaction switch.
// rwlock_ is locked in shared mode while the flag is read.
bool enable_auto_compact_old_sst() const {
  std::shared_lock<std::shared_mutex> read_guard(rwlock_);
  const bool enabled = enable_auto_compact_old_sst_;
  return enabled;
}

// Turns old-SST auto compaction on or off.
// Under an exclusive lock on rwlock_, records the change under the
// "enable-progressive-compact" key via TryPushDiffCommands, then updates the flag.
void SetEnableAutoCompactOldSst(bool value) {
  std::lock_guard<std::shared_mutex> write_guard(rwlock_);
  const char* const text = value ? "yes" : "no";
  TryPushDiffCommands("enable-progressive-compact", text);
  enable_auto_compact_old_sst_ = value;
}

// Const accessor for the old-SST auto compaction interval, in seconds.
// rwlock_ is locked in shared mode while the value is read.
int64_t auto_compact_old_sst_interval() const {
  std::shared_lock<std::shared_mutex> read_guard(rwlock_);
  const int64_t seconds = auto_compact_old_sst_interval_;
  return seconds;
}

// Updates the old-SST auto compaction interval (seconds).
// Under an exclusive lock on rwlock_, records the change under the
// "progressive-compact-interval" key via TryPushDiffCommands, then stores the value.
// No range check is performed here; callers validate the value beforehand.
void SetAutoCompactOldSstInterval(int64_t value) {
  std::lock_guard<std::shared_mutex> write_guard(rwlock_);
  const std::string text = std::to_string(value);
  TryPushDiffCommands("progressive-compact-interval", text);
  auto_compact_old_sst_interval_ = value;
}

void SetLogNetActivities(std::string& value) {
TryPushDiffCommands("log-net-activities", value);
if (value == "yes") {
Expand Down Expand Up @@ -1005,6 +1048,10 @@ class PikaConf : public pstd::BaseConf {
bool write_binlog_ = false;
int target_file_size_base_ = 0;
int64_t max_compaction_bytes_ = 0;

// Progressive compaction configuration
bool enable_auto_compact_old_sst_ = false;
int64_t auto_compact_old_sst_interval_ = 90; // seconds
int binlog_file_size_ = 0;

// cache
Expand Down Expand Up @@ -1041,7 +1088,7 @@ class PikaConf : public pstd::BaseConf {
std::string blob_compression_type_ = "none";

std::unique_ptr<PikaMeta> local_meta_;
std::shared_mutex rwlock_;
mutable std::shared_mutex rwlock_;

// Rsync Rate limiting configuration
int throttle_bytes_per_second_ = 200 << 20; // 200MB/s
Expand Down
2 changes: 2 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ enum TaskType {
kCompactRangeSets,
kCompactRangeZSets,
kCompactRangeList,
kCompactOldSST,
};

struct TaskArg {
Expand Down Expand Up @@ -522,6 +523,7 @@ class PikaServer : public pstd::noncopyable {
*/
void DoTimingTask();
void AutoCompactRange();
pstd::Status AutoCompactOldSST();
void AutoBinlogPurge();
void AutoServerlogPurge();
void AutoDeleteExpiredDump();
Expand Down
27 changes: 27 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2005,6 +2005,16 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeString(&config_body, "disable_auto_compactions");
EncodeString(&config_body, g_pika_conf->disable_auto_compactions() ? "true" : "false");
}
if (pstd::stringmatch(pattern.data(), "enable_auto_compact_old_sst", 1) != 0) {
elements += 2;
EncodeString(&config_body, "enable_auto_compact_old_sst");
EncodeString(&config_body, g_pika_conf->enable_auto_compact_old_sst() ? "yes" : "no");
}
if (pstd::stringmatch(pattern.data(), "auto-compact-old-sst-interval", 1) != 0) {
elements += 2;
EncodeString(&config_body, "auto-compact-old-sst-interval");
EncodeString(&config_body, std::to_string(g_pika_conf->auto_compact_old_sst_interval()));
}
if (pstd::stringmatch(pattern.data(), "network-interface", 1) != 0) {
elements += 2;
EncodeString(&config_body, "network-interface");
Expand Down Expand Up @@ -2326,6 +2336,8 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
"compact-cron",
"compact-interval",
"disable_auto_compactions",
"enable_auto_compact_old_sst",
"auto-compact-old-sst-interval",
"slave-priority",
"sync-window-size",
"slow-cmd-list",
Expand Down Expand Up @@ -2539,6 +2551,21 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
}
g_pika_conf->SetDisableAutoCompaction(value);
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "enable_auto_compact_old_sst") {
if (value != "yes" && value != "no") {
res_.AppendStringRaw("-ERR invalid enable_auto_compact_old_sst (yes or no)\r\n");
return;
}
g_pika_conf->SetEnableAutoCompactOldSst(value == "yes");
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "auto-compact-old-sst-interval") {
int64_t interval = 0;
if (pstd::string2int(value.data(), value.size(), &interval) == 0 || interval < 60) {
res_.AppendStringRaw("-ERR invalid auto-compact-old-sst-interval (must be >= 60 seconds)\r\n");
return;
}
g_pika_conf->SetAutoCompactOldSstInterval(interval);
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "rate-limiter-bandwidth") {
int64_t new_bandwidth = 0;
if (pstd::string2int(value.data(), value.size(), &new_bandwidth) == 0 || new_bandwidth <= 0) {
Expand Down
32 changes: 32 additions & 0 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,20 @@ int PikaConf::Load() {
// disable_auto_compactions
GetConfBool("disable_auto_compactions", &disable_auto_compactions_);

// Progressive compaction configuration
std::string enable_auto_compact_old_sst_str;
GetConfStr("enable_auto_compact_old_sst", &enable_auto_compact_old_sst_str);
if (enable_auto_compact_old_sst_str == "yes") {
enable_auto_compact_old_sst_ = true;
} else {
enable_auto_compact_old_sst_ = false;
}

GetConfInt64("auto-compact-old-sst-interval", &auto_compact_old_sst_interval_);
if (auto_compact_old_sst_interval_ < 60) { // Minimum 60 seconds
auto_compact_old_sst_interval_ = 90;
}

small_compaction_threshold_ = 5000;
GetConfInt("small-compaction-threshold", &small_compaction_threshold_);
if (small_compaction_threshold_ < 0) {
Expand All @@ -430,6 +444,22 @@ int PikaConf::Load() {
} else if (small_compaction_duration_threshold_ >= 1000000) {
small_compaction_duration_threshold_ = 1000000;
}

// Progressive compact configuration
enable_auto_compact_old_sst_ = false;
std::string enable_pc;
GetConfStr("enable-progressive-compact", &enable_pc);
if (enable_pc == "yes") {
enable_auto_compact_old_sst_ = true;
}

auto_compact_old_sst_interval_ = 90;
int interval;
GetConfInt("progressive-compact-interval", &interval);
auto_compact_old_sst_interval_ = interval;
if (auto_compact_old_sst_interval_ < 1) {
auto_compact_old_sst_interval_ = 90;
}

GetConfInt("max-background-flushes", &max_background_flushes_);
if (max_background_flushes_ <= 0 && max_background_flushes_ != -1) {
Expand Down Expand Up @@ -773,6 +803,8 @@ int PikaConf::ConfigRewrite() {
SetConfStr("compact-cron", compact_cron_);
SetConfStr("compact-interval", compact_interval_);
SetConfStr("disable_auto_compactions", disable_auto_compactions_ ? "true" : "false");
SetConfStr("enable-progressive-compact", enable_auto_compact_old_sst_ ? "yes" : "no");
SetConfInt64("progressive-compact-interval", auto_compact_old_sst_interval_);
SetConfStr("cache-type", scachetype);
SetConfInt64("least-free-disk-resume-size", least_free_disk_to_resume_);
SetConfInt64("manually-resume-interval", resume_check_interval_);
Expand Down
25 changes: 25 additions & 0 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,9 @@ Status PikaServer::DoSameThingSpecificDB(const std::set<std::string>& dbs, const
case TaskType::kCompactRangeList:
db_item.second->CompactRange(storage::DataType::kLists, arg.argv[0], arg.argv[1]);
break;
case TaskType::kCompactOldSST:
db_item.second->Compact(storage::DataType::kSST);
break;
default:
break;
}
Expand Down Expand Up @@ -1126,6 +1129,7 @@ int PikaServer::ClientPubSubChannelPatternSize(const std::shared_ptr<NetConn>& c
void PikaServer::DoTimingTask() {
// Maybe schedule compactrange
AutoCompactRange();
AutoCompactOldSST();
// Purge serverlog
AutoServerlogPurge();
// Purge binlog
Expand All @@ -1146,6 +1150,27 @@ void PikaServer::DoTimingTask() {
StatDiskUsage();
}

// Periodically asks every DB to compact its old SST files.
//
// Does nothing when enable_auto_compact_old_sst is off, or when less than
// auto_compact_old_sst_interval seconds have passed since the previous run.
// The timestamp of the previous run starts at 0, so the first call made with
// the feature enabled triggers a run immediately.
//
// Returns Status::OK() in every case; the results of the per-DB Compact calls
// are not inspected.
Status PikaServer::AutoCompactOldSST() {
  if (!g_pika_conf->enable_auto_compact_old_sst()) {
    return Status::OK();
  }

  static uint64_t last_compact_old_time = 0;
  auto current_time = pstd::NowMicros();

  // Widen to 64 bits BEFORE converting seconds to microseconds. Multiplying the
  // raw getter result by 1000 * 1000 in int overflows for intervals above
  // roughly 2147 seconds.
  const auto interval_seconds = static_cast<uint64_t>(g_pika_conf->auto_compact_old_sst_interval());
  const uint64_t interval_micros = interval_seconds * 1000 * 1000;

  if (current_time - last_compact_old_time < interval_micros) {
    return Status::OK();
  }

  last_compact_old_time = current_time;

  for (const auto& db_item : dbs_) {
    db_item.second->Compact(storage::kSST);
  }
  return Status::OK();
}

void PikaServer::StatDiskUsage() {
thread_local uint64_t last_update_time = 0;
auto current_time = pstd::NowMicros();
Expand Down
10 changes: 7 additions & 3 deletions src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ struct ScoreMember {

enum BeforeOrAfter { Before, After };

enum DataType { kAll, kStrings, kHashes, kLists, kZSets, kSets, kStreams };
// Categories of data handled by the storage layer. Enumerator values are the
// implicit 0-based positions, so the order below must not change.
enum DataType {
  kAll,      // 0
  kStrings,  // 1
  kHashes,   // 2
  kLists,    // 3
  kZSets,    // 4
  kSets,     // 5
  kStreams,  // 6
  kSST       // 7
};
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个写法看着有点特别,是为了让渐进式支持动态开关么?


const char DataTypeTag[] = {'a', 'k', 'h', 'l', 'z', 's', 'x'};
// Single-character tags, eight entries in total.
// NOTE(review): presumably indexed by DataType value (same entry count and
// order as the enum) — confirm against the code that reads this table.
const char DataTypeTag[] = {
    'a',
    'k',
    'h',
    'l',
    'z',
    's',
    'x',
    't',
};

enum class OptionType {
kDB,
Expand All @@ -150,7 +150,9 @@ enum Operation {
kCleanSets,
kCleanLists,
kCleanStreams,
kCompactRange
kCompactRange,
kCompactSST,
kCompactOldSST
};

struct BGTask {
Expand Down Expand Up @@ -1075,6 +1077,8 @@ class Storage {

Status Compact(const DataType& type, bool sync = false);
Status CompactRange(const DataType& type, const std::string& start, const std::string& end, bool sync = false);
Status CompactOldFiles(const DataType& type, bool sync = false);
Status DoCompactOldFiles(const DataType& type);
Status DoCompact(const DataType& type);
Status DoCompactRange(const DataType& type, const std::string& start, const std::string& end);

Expand Down
65 changes: 64 additions & 1 deletion src/storage/src/redis.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

#include "src/redis.h"
#include <sstream>
#include "rocksdb/db.h"
#include "rocksdb/compaction_job_stats.h"
#include "glog/logging.h" // 确保LOG宏可用

namespace storage {

Expand Down Expand Up @@ -196,7 +199,67 @@ void Redis::SetCompactRangeOptions(const bool is_canceled) {
default_compact_range_options_.canceled = new std::atomic<bool>(is_canceled);
} else {
default_compact_range_options_.canceled->store(is_canceled);
}
}
}

// Extracts the numeric part of a table file name.
//
// Reads the run of decimal digits that ends immediately before the last '.'
// in `name` (for example "/000123.sst" yields 123) and returns its value.
// Returns 0 when `name` has no '.', or when the character before the last '.'
// is not a digit.
uint64_t Redis::TableFileNameToNumber(const std::string& name) {
  const std::string::size_type dot = name.find_last_of('.');
  if (dot == std::string::npos) {
    return 0;
  }

  uint64_t value = 0;
  uint64_t place = 1;
  // Walk backwards from the character just before the dot, least significant
  // digit first, stopping at the first non-digit.
  for (std::string::size_type idx = dot; idx > 0; --idx) {
    const char ch = name[idx - 1];
    if (ch < '0' || ch > '9') {
      break;
    }
    value += (ch - '0') * place;
    place *= 10;
  }
  return value;
}

// Compacts the oldest SST file of each column family, one file at a time.
//
// For every handle in handles_ (or the default column family when handles_ is
// empty) the SST file with the smallest file number is located and handed to
// CompactFiles, targeting the level below the one it currently lives on; a
// file already on the bottom level is rewritten in place. The step is repeated
// for the same column family while a compaction keeps fewer than 70% of its
// input records. It stops for that column family as soon as there is no file,
// a compaction reports zero input records, or CompactFiles fails.
//
// Returns OK when every CompactFiles call succeeded; otherwise the status of
// the most recent failed call. Remaining column families are still processed
// after a failure.
Status Redis::CompactOldFiles() {
  // Keep compacting a column family while output/input records (percent) stays below this.
  constexpr int kKeepGoingBelowRate = 70;

  Status s;
  std::vector<rocksdb::ColumnFamilyHandle*> compact_handles = handles_;
  if (compact_handles.empty()) {
    compact_handles.push_back(db_->DefaultColumnFamily());
  }

  for (auto* handle : compact_handles) {
    int rate = 0;
    while (rate < kKeepGoingBelowRate) {
      rate = 100;

      rocksdb::ColumnFamilyMetaData meta;
      db_->GetColumnFamilyMetaData(handle, &meta);

      // Find the file with the smallest file number across all levels.
      bool found = false;
      uint64_t min_number = 0;
      int min_level = 0;
      std::string min_file;
      std::string min_file_path;
      for (const auto& level_meta : meta.levels) {
        for (const auto& file_meta : level_meta.files) {
          const uint64_t number = TableFileNameToNumber(file_meta.name);
          if (!found || number < min_number) {
            found = true;
            min_number = number;
            min_level = level_meta.level;
            min_file = file_meta.name;
            min_file_path = file_meta.db_path;
          }
        }
      }
      if (!found) {
        break;  // Column family has no SST files.
      }

      // Push the file one level down unless it is already on the bottom level.
      // The bottom level comes from the metadata instead of assuming 7 levels.
      //
      // NOTE(review): a reviewer asked whether L0 files should be left to
      // RocksDB's background compaction, and noted that rewriting bottom-level
      // files is I/O heavy; their strategy is to skip those and leave them to
      // an off-peak full compaction (still being tested). Not implemented here.
      const int bottom_level = static_cast<int>(meta.levels.size()) - 1;
      const int output_level = (min_level < bottom_level) ? min_level + 1 : min_level;

      const std::vector<std::string> input_file_names{min_file};
      rocksdb::CompactionOptions compact_options;
      rocksdb::CompactionJobInfo job_info;
      const Status compact_status =
          db_->CompactFiles(compact_options, handle, input_file_names, output_level, -1, nullptr, &job_info);
      if (!compact_status.ok()) {
        // Surface the failure instead of silently reporting OK.
        LOG(WARNING) << "CompactFiles failed: " << handle->GetName() << " : " << min_file_path << " : " << min_file
                     << " : " << output_level << ", status : " << compact_status.ToString();
        s = compact_status;
        break;
      }

      LOG(WARNING) << "CompactFiles: " << handle->GetName()<< " : " << min_file_path << " : " << min_file << " : " << output_level << ", job_stats : " << job_info.stats.num_input_records << ": "<< job_info.stats.num_output_records;
      if (job_info.stats.num_input_records == 0) {
        break;
      }
      rate = static_cast<int>(job_info.stats.num_output_records * 100 / job_info.stats.num_input_records);
    }
  }
  return s;
}

} // namespace storage
3 changes: 3 additions & 0 deletions src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ class Redis {
Status SetSmallCompactionDurationThreshold(uint64_t small_compaction_duration_threshold);
std::vector<rocksdb::ColumnFamilyHandle*> GetHandles(){ return handles_;};
void GetRocksDBInfo(std::string &info, const char *prefix);
Status CompactOldFiles();

static uint64_t TableFileNameToNumber(const std::string& name);

protected:
Storage* const storage_;
Expand Down
Loading
Loading