From 2ac82cb330030565488b0c1daf2f744d6fdbe912 Mon Sep 17 00:00:00 2001 From: chejinge Date: Thu, 27 Nov 2025 15:24:12 +0800 Subject: [PATCH 01/10] feat:Automated fast/slow command separation, help me --- conf/pika.conf | 13 +++++++ include/pika_conf.h | 17 +++++++++ src/pika_conf.cc | 14 +++++++ src/pika_server.cc | 61 ++++++++++++++++++++++++++++--- tools/pika_migrate/conf/pika.conf | 3 +- 5 files changed, 102 insertions(+), 6 deletions(-) diff --git a/conf/pika.conf b/conf/pika.conf index 5317fcf452..7d512b08ab 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -33,6 +33,19 @@ thread-pool-size : 12 # When set to no, they are not separated. slow-cmd-pool : no +# Enable thread pool borrowing mechanism, which allows threads to help each other when one pool is busy +# and another is idle. This can improve resource utilization and reduce command latency under uneven load. +# [yes | no] - The default value is yes. +threadpool-borrow-enable : yes + +# The queue length threshold (as a percentage of max queue size) at which a thread pool is considered busy +# and may borrow threads from another pool. The value range is [1-99], default is 80. +threadpool-borrow-threshold-percent : 80 + +# The queue length threshold (as a percentage of max queue size) at which a thread pool is considered idle +# and may lend threads to another pool. The value range is [1-99], default is 20. +threadpool-idle-threshold-percent : 20 + # Size of the low level thread pool, The threads within this pool # are dedicated to handling slow user requests. slow-cmd-thread-pool-size : 1 diff --git a/include/pika_conf.h b/include/pika_conf.h index 80d5abe8f0..b13b15fab5 100644 --- a/include/pika_conf.h +++ b/include/pika_conf.h @@ -168,6 +168,18 @@ class PikaConf : public pstd::BaseConf { std::shared_lock l(rwlock_); return slow_cmd_pool_; } + bool threadpool_borrow_enable() { + std::shared_lock l(rwlock_); + return threadpool_borrow_enable_; + } + int threadpool_borrow_threshold_percent() { + std::shared_lock l(rwlock_); + return threadpool_borrow_threshold_percent_; + } + int threadpool_idle_threshold_percent() { + std::shared_lock l(rwlock_); + return threadpool_idle_threshold_percent_; + } std::string server_id() { std::shared_lock l(rwlock_); return server_id_; @@ -942,6 +954,11 @@ class PikaConf : public pstd::BaseConf { std::string bgsave_prefix_; std::string pidfile_; std::atomic slow_cmd_pool_; + + // Thread pool task borrowing configuration + bool threadpool_borrow_enable_ = true; + int threadpool_borrow_threshold_percent_ = 80; + int threadpool_idle_threshold_percent_ = 20; std::string compression_; std::string compression_per_level_; diff --git a/src/pika_conf.cc b/src/pika_conf.cc index 80116aa847..f336e3e1f3 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -76,6 +76,20 @@ int PikaConf::Load() { GetConfStr("slow-cmd-pool", &slowcmdpool); slow_cmd_pool_.store(slowcmdpool == "yes" ? true : false); + std::string threadpool_borrow; + GetConfStr("threadpool-borrow-enable", &threadpool_borrow); + threadpool_borrow_enable_ = threadpool_borrow == "yes" ? true : false; + + GetConfInt("threadpool-borrow-threshold-percent", &threadpool_borrow_threshold_percent_); + if (threadpool_borrow_threshold_percent_ <= 0 || threadpool_borrow_threshold_percent_ >= 100) { + threadpool_borrow_threshold_percent_ = 80; + } + + GetConfInt("threadpool-idle-threshold-percent", &threadpool_idle_threshold_percent_); + if (threadpool_idle_threshold_percent_ <= 0 || threadpool_idle_threshold_percent_ >= 100) { + threadpool_idle_threshold_percent_ = 20; + } + int binlog_writer_num = 1; GetConfInt("binlog-writer-num", &binlog_writer_num); if (binlog_writer_num <= 0 || binlog_writer_num > 24) { diff --git a/src/pika_server.cc b/src/pika_server.cc index bbf444191d..ef794b532a 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -770,15 +770,66 @@ void PikaServer::SetFirstMetaSync(bool v) { } void PikaServer::ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd, bool is_admin_cmd) { - if (is_slow_cmd && g_pika_conf->slow_cmd_pool()) { - pika_slow_cmd_thread_pool_->Schedule(func, arg); - return; - } + // Admin commands always go to admin thread pool if (is_admin_cmd) { pika_admin_cmd_thread_pool_->Schedule(func, arg); return; } - pika_client_processor_->SchedulePool(func, arg); + + // Check if thread pool borrowing is enabled + bool borrow_enable = g_pika_conf->threadpool_borrow_enable(); + bool slow_cmd_pool_enable = g_pika_conf->slow_cmd_pool(); + + if (!borrow_enable || !slow_cmd_pool_enable) { + // Original logic: direct routing without borrowing + if (is_slow_cmd && slow_cmd_pool_enable) { + pika_slow_cmd_thread_pool_->Schedule(func, arg); + return; + } + pika_client_processor_->SchedulePool(func, arg); + return; + } + + // Borrowing is enabled, check queue status and decide + size_t slow_queue_size = SlowCmdThreadPoolCurQueueSize(); + size_t slow_max_queue = SlowCmdThreadPoolMaxQueueSize(); + size_t fast_queue_size = ClientProcessorThreadPoolCurQueueSize(); + size_t fast_max_queue = ClientProcessorThreadPoolMaxQueueSize(); + + int borrow_threshold_percent = g_pika_conf->threadpool_borrow_threshold_percent(); + int idle_threshold_percent = g_pika_conf->threadpool_idle_threshold_percent(); + + // Calculate thresholds + size_t slow_borrow_threshold = slow_max_queue * borrow_threshold_percent / 100; + size_t fast_borrow_threshold = fast_max_queue * borrow_threshold_percent / 100; + size_t slow_idle_threshold = slow_max_queue * idle_threshold_percent / 100; + size_t fast_idle_threshold = fast_max_queue * idle_threshold_percent / 100; + + if (is_slow_cmd) { + // This is a slow command + if (slow_queue_size >= slow_borrow_threshold && fast_queue_size <= fast_idle_threshold) { + // Slow pool is busy and fast pool is idle, borrow from fast pool + pika_client_processor_->SchedulePool(func, arg); + LOG(INFO) << "Slow cmd borrows fast pool (slow_queue: " << slow_queue_size + << "/" << slow_max_queue << ", fast_queue: " << fast_queue_size + << "/" << fast_max_queue << ")"; + } else { + // Normal case: use slow pool + pika_slow_cmd_thread_pool_->Schedule(func, arg); + } + } else { + // This is a fast command + if (fast_queue_size >= fast_borrow_threshold && slow_queue_size <= slow_idle_threshold) { + // Fast pool is busy and slow pool is idle, borrow from slow pool + pika_slow_cmd_thread_pool_->Schedule(func, arg); + LOG(INFO) << "Fast cmd borrows slow pool (fast_queue: " << fast_queue_size + << "/" << fast_max_queue << ", slow_queue: " << slow_queue_size + << "/" << slow_max_queue << ")"; + } else { + // Normal case: use fast pool + pika_client_processor_->SchedulePool(func, arg); + } + } } size_t PikaServer::ClientProcessorThreadPoolCurQueueSize() { diff --git a/tools/pika_migrate/conf/pika.conf b/tools/pika_migrate/conf/pika.conf index 7ca990005a..a680e7aebe 100644 --- a/tools/pika_migrate/conf/pika.conf +++ b/tools/pika_migrate/conf/pika.conf @@ -132,7 +132,7 @@ masterauth : # The [password of user], which is empty by default. # [NOTICE] If this user password is the same as admin password (including both being empty), # the value of this parameter will be ignored and all users are considered as administrators, -# in this scenario, users are not subject to the restrictions imposed by the userblacklist. +# in this scenario, users are not subject to the restrictions imposed by the userb lacklist. # PS: "admin password" refers to value of the parameter above: requirepass. # userpass : @@ -147,6 +147,7 @@ masterauth : # If set to 'classic', Pika will create multiple DBs whose number is the value of configure item "databases". instance-mode : classic + # The number of databases when Pika runs in classic mode. # The default database id is DB 0. You can select a different one on # a per-connection by using SELECT. The db id range is [0, 'databases' value -1]. From c3797a77fed3cfd8ca2e26922586f6a5565ffafe Mon Sep 17 00:00:00 2001 From: chejinge Date: Thu, 27 Nov 2025 16:35:12 +0800 Subject: [PATCH 02/10] fix_fast --- include/pika_admin.h | 1 + include/pika_server.h | 7 ++ src/pika_admin.cc | 8 ++ src/pika_server.cc | 176 +++++++++++++++++++++++++++++++++++++++--- 4 files changed, 182 insertions(+), 10 deletions(-) diff --git a/include/pika_admin.h b/include/pika_admin.h index de0ddd5a0f..e7b39e2d74 100644 --- a/include/pika_admin.h +++ b/include/pika_admin.h @@ -295,6 +295,7 @@ class InfoCmd : public Cmd { const static std::string kDebugSection; const static std::string kCommandStatsSection; const static std::string kCacheSection; + const static std::string kThreadpoolSection; void DoInitial() override; void Clear() override { diff --git a/include/pika_server.h b/include/pika_server.h index 81cda87b04..2e5c0b76b5 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -194,6 +194,13 @@ class PikaServer : public pstd::noncopyable { size_t ClientProcessorThreadPoolMaxQueueSize(); size_t SlowCmdThreadPoolCurQueueSize(); size_t SlowCmdThreadPoolMaxQueueSize(); + + /* + * Thread pool dynamic resize + */ + bool ResizeFastCmdThreadPool(size_t new_size); + bool ResizeSlowCmdThreadPool(size_t new_size); + void GetThreadPoolInfo(std::string* info); /* * BGSave used diff --git a/src/pika_admin.cc b/src/pika_admin.cc index 81d47bc43f..3201a5b3a4 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -884,6 +884,7 @@ const std::string InfoCmd::kRocksDBSection = "rocksdb"; const std::string InfoCmd::kDebugSection = "debug"; const std::string InfoCmd::kCommandStatsSection = "commandstats"; const std::string InfoCmd::kCacheSection = "cache"; +const std::string InfoCmd::kThreadpoolSection = "threadpool"; const std::string ClientCmd::KILLTYPE_NORMAL = "normal"; @@ -969,6 +970,8 @@ void InfoCmd::DoInitial() { info_section_ = kInfoCommandStats; } else if (strcasecmp(argv_[1].data(), kCacheSection.data()) == 0) { info_section_ = kInfoCache; + } else if (strcasecmp(argv_[1].data(), kThreadpoolSection.data()) == 0) { + info_section_ = kInfoThreadpool; } else { info_section_ = kInfoErr; } @@ -1010,6 +1013,8 @@ void InfoCmd::Do() { info.append("\r\n"); InfoCache(info, db_); info.append("\r\n"); + g_pika_server->GetThreadPoolInfo(&info); + info.append("\r\n"); InfoCPU(info); info.append("\r\n"); InfoReplication(info); @@ -1054,6 +1059,9 @@ void InfoCmd::Do() { case kInfoCache: InfoCache(info, db_); break; + case kInfoThreadpool: + g_pika_server->GetThreadPoolInfo(&info); + break; default: // kInfoErr is nothing break; diff --git a/src/pika_server.cc b/src/pika_server.cc index ef794b532a..7d88cd91c5 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -790,6 +790,34 @@ void PikaServer::ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_ return; } + // Extract key from command to determine key affinity + // This ensures commands on the same key always go to the same thread pool + auto bg_arg = static_cast(arg); + bool key_affinity_to_slow = false; + bool has_key = false; + + if (bg_arg && !bg_arg->redis_cmds.empty() && !bg_arg->redis_cmds[0].empty()) { + const auto& cmd_args = bg_arg->redis_cmds[0]; + std::string cmd_name = cmd_args[0]; + pstd::StringToLower(cmd_name); + + // Extract key based on command type + // Most commands have key as the first argument after command name + std::string key; + if (cmd_args.size() > 1) { + // For most commands, key is argv[1] + // For special commands like MGET, MSET, we use the first key + key = cmd_args[1]; + has_key = true; + + // Use simple hash to determine key affinity + // Same key will always have same affinity (fast or slow pool) + std::hash hasher; + size_t hash_value = hasher(key); + key_affinity_to_slow = (hash_value % 2 == 1); + } + } + // Borrowing is enabled, check queue status and decide size_t slow_queue_size = SlowCmdThreadPoolCurQueueSize(); size_t slow_max_queue = SlowCmdThreadPoolMaxQueueSize(); @@ -805,26 +833,45 @@ void PikaServer::ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_ size_t slow_idle_threshold = slow_max_queue * idle_threshold_percent / 100; size_t fast_idle_threshold = fast_max_queue * idle_threshold_percent / 100; + // Decision logic considering both command type and key affinity if (is_slow_cmd) { // This is a slow command - if (slow_queue_size >= slow_borrow_threshold && fast_queue_size <= fast_idle_threshold) { - // Slow pool is busy and fast pool is idle, borrow from fast pool + if (has_key && !key_affinity_to_slow) { + // Key affinity is to fast pool, must use fast pool to maintain order pika_client_processor_->SchedulePool(func, arg); - LOG(INFO) << "Slow cmd borrows fast pool (slow_queue: " << slow_queue_size - << "/" << slow_max_queue << ", fast_queue: " << fast_queue_size - << "/" << fast_max_queue << ")"; + } else if (slow_queue_size >= slow_borrow_threshold && fast_queue_size <= fast_idle_threshold) { + // Slow pool is busy and fast pool is idle + if (has_key && key_affinity_to_slow) { + // Key belongs to slow pool, cannot borrow + pika_slow_cmd_thread_pool_->Schedule(func, arg); + } else { + // Can borrow from fast pool + pika_client_processor_->SchedulePool(func, arg); + LOG(INFO) << "Slow cmd borrows fast pool (slow_queue: " << slow_queue_size + << "/" << slow_max_queue << ", fast_queue: " << fast_queue_size + << "/" << fast_max_queue << ")"; + } } else { // Normal case: use slow pool pika_slow_cmd_thread_pool_->Schedule(func, arg); } } else { // This is a fast command - if (fast_queue_size >= fast_borrow_threshold && slow_queue_size <= slow_idle_threshold) { - // Fast pool is busy and slow pool is idle, borrow from slow pool + if (has_key && key_affinity_to_slow) { + // Key affinity is to slow pool, must use slow pool to maintain order pika_slow_cmd_thread_pool_->Schedule(func, arg); - LOG(INFO) << "Fast cmd borrows slow pool (fast_queue: " << fast_queue_size - << "/" << fast_max_queue << ", slow_queue: " << slow_queue_size - << "/" << slow_max_queue << ")"; + } else if (fast_queue_size >= fast_borrow_threshold && slow_queue_size <= slow_idle_threshold) { + // Fast pool is busy and slow pool is idle + if (has_key && !key_affinity_to_slow) { + // Key belongs to fast pool, cannot borrow + pika_client_processor_->SchedulePool(func, arg); + } else { + // Can borrow from slow pool + pika_slow_cmd_thread_pool_->Schedule(func, arg); + LOG(INFO) << "Fast cmd borrows slow pool (fast_queue: " << fast_queue_size + << "/" << fast_max_queue << ", slow_queue: " << slow_queue_size + << "/" << slow_max_queue << ")"; + } } else { // Normal case: use fast pool pika_client_processor_->SchedulePool(func, arg); @@ -862,6 +909,115 @@ size_t PikaServer::SlowCmdThreadPoolMaxQueueSize() { return pika_slow_cmd_thread_pool_->max_queue_size(); } +bool PikaServer::ResizeFastCmdThreadPool(size_t new_size) { + if (new_size == 0 || new_size > 1024) { + LOG(WARNING) << "Invalid fast cmd thread pool size: " << new_size << ", must be between 1 and 1024"; + return false; + } + + size_t old_size = g_pika_conf->thread_pool_size(); + if (new_size == old_size) { + LOG(INFO) << "Fast cmd thread pool size unchanged: " << new_size; + return true; + } + + LOG(INFO) << "Resizing fast cmd thread pool from " << old_size << " to " << new_size; + + // Update config + g_pika_conf->SetThreadPoolSize(static_cast(new_size)); + + // Stop old thread pool gracefully + LOG(INFO) << "Waiting for old fast cmd thread pool tasks to complete..."; + pika_client_processor_->Stop(); + + // Create and start new thread pool + pika_client_processor_ = std::make_unique(new_size, 100000); + int ret = pika_client_processor_->Start(); + if (ret != net::kSuccess) { + LOG(ERROR) << "Failed to start new fast cmd thread pool: " << ret; + return false; + } + + LOG(INFO) << "Successfully resized fast cmd thread pool to " << new_size; + return true; +} + +bool PikaServer::ResizeSlowCmdThreadPool(size_t new_size) { + if (new_size == 0 || new_size > 1024) { + LOG(WARNING) << "Invalid slow cmd thread pool size: " << new_size << ", must be between 1 and 1024"; + return false; + } + + size_t old_size = g_pika_conf->slow_cmd_thread_pool_size(); + if (new_size == old_size) { + LOG(INFO) << "Slow cmd thread pool size unchanged: " << new_size; + return true; + } + + LOG(INFO) << "Resizing slow cmd thread pool from " << old_size << " to " << new_size; + + // Update config + g_pika_conf->SetLowLevelThreadPoolSize(static_cast(new_size)); + + // Stop old thread pool gracefully + LOG(INFO) << "Waiting for old slow cmd thread pool tasks to complete..."; + while (SlowCmdThreadPoolCurQueueSize() != 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + pika_slow_cmd_thread_pool_->stop_thread_pool(); + + // Create and start new thread pool + pika_slow_cmd_thread_pool_ = std::make_unique(new_size, 100000); + int ret = pika_slow_cmd_thread_pool_->start_thread_pool(); + if (ret != net::kSuccess) { + LOG(ERROR) << "Failed to start new slow cmd thread pool: " << ret; + return false; + } + + LOG(INFO) << "Successfully resized slow cmd thread pool to " << new_size; + return true; +} + +void PikaServer::GetThreadPoolInfo(std::string* info) { + std::stringstream tmp_stream; + + // Fast cmd thread pool info + size_t fast_pool_size = g_pika_conf->thread_pool_size(); + size_t fast_queue_size = ClientProcessorThreadPoolCurQueueSize(); + size_t fast_max_queue = ClientProcessorThreadPoolMaxQueueSize(); + double fast_usage = fast_max_queue > 0 ? (fast_queue_size * 100.0 / fast_max_queue) : 0.0; + + tmp_stream << "# Threadpool\r\n"; + tmp_stream << "fast_cmd_pool_size:" << fast_pool_size << "\r\n"; + tmp_stream << "fast_cmd_pool_queue_size:" << fast_queue_size << "\r\n"; + tmp_stream << "fast_cmd_pool_max_queue_size:" << fast_max_queue << "\r\n"; + tmp_stream << "fast_cmd_pool_usage:" << std::fixed << std::setprecision(2) << fast_usage << "%\r\n"; + + // Slow cmd thread pool info + if (g_pika_conf->slow_cmd_pool()) { + size_t slow_pool_size = g_pika_conf->slow_cmd_thread_pool_size(); + size_t slow_queue_size = SlowCmdThreadPoolCurQueueSize(); + size_t slow_max_queue = SlowCmdThreadPoolMaxQueueSize(); + double slow_usage = slow_max_queue > 0 ? (slow_queue_size * 100.0 / slow_max_queue) : 0.0; + + tmp_stream << "slow_cmd_pool_size:" << slow_pool_size << "\r\n"; + tmp_stream << "slow_cmd_pool_queue_size:" << slow_queue_size << "\r\n"; + tmp_stream << "slow_cmd_pool_max_queue_size:" << slow_max_queue << "\r\n"; + tmp_stream << "slow_cmd_pool_usage:" << std::fixed << std::setprecision(2) << slow_usage << "%\r\n"; + } else { + tmp_stream << "slow_cmd_pool_size:0\r\n"; + tmp_stream << "slow_cmd_pool_queue_size:0\r\n"; + tmp_stream << "slow_cmd_pool_max_queue_size:0\r\n"; + tmp_stream << "slow_cmd_pool_usage:0.00%\r\n"; + } + + // Admin cmd thread pool info + size_t admin_pool_size = g_pika_conf->admin_thread_pool_size(); + tmp_stream << "admin_cmd_pool_size:" << admin_pool_size << "\r\n"; + + info->append(tmp_stream.str()); +} + void PikaServer::BGSaveTaskSchedule(net::TaskFunc func, void* arg) { bgsave_thread_.StartThread(); bgsave_thread_.Schedule(func, arg); From c8a029ad280949f5e222522622f64cce1d662eb4 Mon Sep 17 00:00:00 2001 From: chejinge Date: Sun, 30 Nov 2025 17:56:39 +0800 Subject: [PATCH 03/10] fix fast --- include/pika_server.h | 138 ++++++++++++ src/pika_server.cc | 497 ++++++++++++++++++++++++++++++++++++++---- 2 files changed, 596 insertions(+), 39 deletions(-) diff --git a/include/pika_server.h b/include/pika_server.h index 2e5c0b76b5..971ffc8b7c 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -15,6 +15,9 @@ #endif #include #include +#include +#include +#include #include "src/cache/include/config.h" #include "net/include/bg_thread.h" @@ -78,6 +81,108 @@ struct TaskArg { void DoBgslotscleanup(void* arg); void DoBgslotsreload(void* arg); +// 命令分类器:基于命令执行时间自动分类快慢命令 +class CommandClassifier { +public: + CommandClassifier(uint64_t slow_threshold_us = 50000, uint64_t fast_threshold_us = 10000, + size_t window_size = 1000); + + // 记录命令执行时间并动态调整分类 + void RecordExecutionTime(const std::string& cmd_name, uint64_t duration_us); + + // 检查命令是否为慢命令 + bool IsSlowCommand(const std::string& cmd_name) const; + + // 将命令标记为慢命令 + void MarkAsSlowCommand(const std::string& cmd_name); + + // 将命令标记为快命令 + void MarkAsFastCommand(const std::string& cmd_name); + + // 获取所有命令的平均执行时间 + std::unordered_map GetCommandAvgTimes() const; + +private: + struct CommandStats { + uint64_t total_time = 0; + uint64_t count = 0; + std::vector recent_times; // 最近N次执行时间 + size_t time_index = 0; // 环形缓冲区索引 + bool initialized = false; // 是否已经收集了足够的样本 + + void AddTime(uint64_t time, size_t window_size); + double GetAverageTime() const; + double GetRecentAverageTime() const; + }; + + mutable std::mutex mutex_; + std::unordered_map cmd_stats_; + std::unordered_set slow_commands_; + uint64_t slow_threshold_us_; // 慢命令阈值(微秒) + uint64_t fast_threshold_us_; // 快命令阈值(微秒) + size_t window_size_; // 滑动窗口大小 +}; + +// 线程池监控指标 +struct ThreadPoolMetrics { + std::atomic tasks_scheduled{0}; + std::atomic tasks_completed{0}; + std::atomic queue_overflows{0}; + std::atomic borrow_attempts{0}; + std::atomic successful_borrows{0}; + + // 延迟分布统计(1ms, 5ms, 10ms, 50ms, 100ms, 500ms, 1s, 5s, >5s) + std::array, 9> latency_buckets{}; + + void RecordLatency(uint64_t latency_us); + std::string ExportMetrics(const std::string& pool_name) const; + void Reset(); +}; + +// 基于令牌桶的流量控制 +class RateLimiter { +public: + explicit RateLimiter(double rate_per_sec, double burst_size = 100.0); + + // 尝试获取令牌,成功返回true + bool TryAcquire(double tokens = 1.0); + + // 调整速率 + void SetRate(double rate_per_sec); + +private: + void Refill(); + + double rate_; // 每秒允许的令牌数 + double max_tokens_; // 令牌桶容量 + double tokens_; // 当前令牌数 + std::chrono::steady_clock::time_point last_update_; // 上次更新时间 + std::mutex mutex_; +}; + +// 一致性哈希实现,用于键亲和性 +class ConsistentHash { +public: + explicit ConsistentHash(int num_replicas = 100, int num_buckets = 1024); + + // 添加节点 + void AddNode(const std::string& node); + + // 移除节点 + void RemoveNode(const std::string& node); + + // 获取键对应的节点 + std::string GetNode(const std::string& key) const; + + // 计算键的哈希值 + size_t HashKey(const std::string& key) const; + +private: + int num_replicas_; + std::map ring_; + std::hash hasher_; +}; + class PikaServer : public pstd::noncopyable { public: PikaServer(); @@ -502,6 +607,30 @@ class PikaServer : public pstd::noncopyable { void ProcessCronTask(); double HitRatio(); void SetLogNetActivities(bool value); + + /* + * 改进的快慢命令分离相关方法 + */ + // 动态调整借用阈值 + void AdjustBorrowThresholds(); + + // 自适应命令分类 + bool IsDynamicSlowCommand(const std::string& cmd_name) const; + void RecordCommandExecutionTime(const std::string& cmd_name, uint64_t duration_us); + + // 设置流控 + void SetCommandRateLimit(double rate_per_sec); + bool CheckCommandRateLimit(); + + // 获取增强的监控指标 + std::string GetEnhancedThreadPoolMetrics() const; + + // 使用一致性哈希判断键亲和性 + bool IsKeyAffinityToSlow(const std::string& key) const; + + // 重置监控指标 + void ResetThreadPoolMetrics(); + /* * disable compact */ @@ -656,6 +785,15 @@ class PikaServer : public pstd::noncopyable { * lastsave used */ int64_t lastsave_ = 0; + + /* + * 改进的快慢命令分离相关成员 + */ + std::unique_ptr cmd_classifier_; + std::unique_ptr fast_pool_metrics_; + std::unique_ptr slow_pool_metrics_; + std::unique_ptr cmd_rate_limiter_; + std::unique_ptr key_hash_; /* * acl diff --git a/src/pika_server.cc b/src/pika_server.cc index 7d88cd91c5..ccb2e3c4c0 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -33,6 +33,248 @@ extern std::unique_ptr g_network_statistic; // QUEUE_SIZE_THRESHOLD_PERCENTAGE is used to represent a percentage value and should be within the range of 0 to 100. const size_t QUEUE_SIZE_THRESHOLD_PERCENTAGE = 75; +// CommandClassifier 实现 +void CommandClassifier::CommandStats::AddTime(uint64_t time, size_t window_size) { + total_time += time; + count++; + + if (recent_times.size() < window_size) { + recent_times.push_back(time); + } else { + if (!initialized) { + initialized = true; + } + // 替换最旧的记录 + total_time -= recent_times[time_index]; + recent_times[time_index] = time; + time_index = (time_index + 1) % window_size; + } +} + +double CommandClassifier::CommandStats::GetAverageTime() const { + return count > 0 ? static_cast(total_time) / count : 0.0; +} + +double CommandClassifier::CommandStats::GetRecentAverageTime() const { + if (!initialized && recent_times.empty()) { + return 0.0; + } + + size_t sample_count = initialized ? recent_times.size() : time_index; + if (sample_count == 0) { + return 0.0; + } + + uint64_t recent_total = 0; + for (size_t i = 0; i < sample_count; i++) { + recent_total += recent_times[i]; + } + + return static_cast(recent_total) / sample_count; +} + +CommandClassifier::CommandClassifier(uint64_t slow_threshold_us, uint64_t fast_threshold_us, size_t window_size) + : slow_threshold_us_(slow_threshold_us), + fast_threshold_us_(fast_threshold_us), + window_size_(window_size) {} + +void CommandClassifier::RecordExecutionTime(const std::string& cmd_name, uint64_t duration_us) { + std::lock_guard lock(mutex_); + + // 更新命令执行时间统计 + auto& stats = cmd_stats_[cmd_name]; + stats.AddTime(duration_us, window_size_); + + // 根据最近平均执行时间分类命令 + if (stats.initialized || stats.recent_times.size() >= window_size_ / 2) { + double recent_avg = stats.GetRecentAverageTime(); + + if (recent_avg > slow_threshold_us_) { + // 如果平均时间超过慢命令阈值,标记为慢命令 + if (slow_commands_.find(cmd_name) == slow_commands_.end()) { + LOG(INFO) << "Command '" << cmd_name << "' reclassified as SLOW (avg time: " + << recent_avg / 1000.0 << "ms)"; + slow_commands_.insert(cmd_name); + } + } else if (recent_avg < fast_threshold_us_ && stats.initialized) { + // 如果平均时间低于快命令阈值且有足够样本,可能标记为快命令 + if (slow_commands_.find(cmd_name) != slow_commands_.end()) { + LOG(INFO) << "Command '" << cmd_name << "' reclassified as FAST (avg time: " + << recent_avg / 1000.0 << "ms)"; + slow_commands_.erase(cmd_name); + } + } + } +} + +bool CommandClassifier::IsSlowCommand(const std::string& cmd_name) const { + std::lock_guard lock(mutex_); + return slow_commands_.find(cmd_name) != slow_commands_.end(); +} + +void CommandClassifier::MarkAsSlowCommand(const std::string& cmd_name) { + std::lock_guard lock(mutex_); + slow_commands_.insert(cmd_name); +} + +void CommandClassifier::MarkAsFastCommand(const std::string& cmd_name) { + std::lock_guard lock(mutex_); + slow_commands_.erase(cmd_name); +} + +std::unordered_map CommandClassifier::GetCommandAvgTimes() const { + std::lock_guard lock(mutex_); + std::unordered_map result; + + for (const auto& [cmd_name, stats] : cmd_stats_) { + result[cmd_name] = stats.GetAverageTime(); + } + + return result; +} + +// ThreadPoolMetrics 实现 +void ThreadPoolMetrics::RecordLatency(uint64_t latency_us) { + if (latency_us < 1000) { + latency_buckets[0]++; + } else if (latency_us < 5000) { + latency_buckets[1]++; + } else if (latency_us < 10000) { + latency_buckets[2]++; + } else if (latency_us < 50000) { + latency_buckets[3]++; + } else if (latency_us < 100000) { + latency_buckets[4]++; + } else if (latency_us < 500000) { + latency_buckets[5]++; + } else if (latency_us < 1000000) { + latency_buckets[6]++; + } else if (latency_us < 5000000) { + latency_buckets[7]++; + } else { + latency_buckets[8]++; + } +} + +std::string ThreadPoolMetrics::ExportMetrics(const std::string& pool_name) const { + std::stringstream ss; + + ss << "# TYPE pika_threadpool_tasks_scheduled counter\n" + << "pika_threadpool_tasks_scheduled{pool=\"" << pool_name << "\"} " << tasks_scheduled.load() << "\n" + << "# TYPE pika_threadpool_tasks_completed counter\n" + << "pika_threadpool_tasks_completed{pool=\"" << pool_name << "\"} " << tasks_completed.load() << "\n" + << "# TYPE pika_threadpool_queue_overflows counter\n" + << "pika_threadpool_queue_overflows{pool=\"" << pool_name << "\"} " << queue_overflows.load() << "\n" + << "# TYPE pika_threadpool_borrow_attempts counter\n" + << "pika_threadpool_borrow_attempts{pool=\"" << pool_name << "\"} " << borrow_attempts.load() << "\n" + << "# TYPE pika_threadpool_successful_borrows counter\n" + << "pika_threadpool_successful_borrows{pool=\"" << pool_name << "\"} " << successful_borrows.load() << "\n"; + + // 延迟分布 + ss << "# TYPE pika_threadpool_latency_buckets counter\n"; + const char* bucket_labels[] = { + "0_1ms", "1_5ms", "5_10ms", "10_50ms", "50_100ms", + "100_500ms", "500_1000ms", "1_5s", "over_5s" + }; + + for (size_t i = 0; i < latency_buckets.size(); i++) { + ss << "pika_threadpool_latency_bucket{pool=\"" << pool_name + << "\",bucket=\"" << bucket_labels[i] << "\"} " + << latency_buckets[i].load() << "\n"; + } + + return ss.str(); +} + +void ThreadPoolMetrics::Reset() { + tasks_scheduled.store(0); + tasks_completed.store(0); + queue_overflows.store(0); + borrow_attempts.store(0); + successful_borrows.store(0); + + for (auto& bucket : latency_buckets) { + bucket.store(0); + } +} + +// RateLimiter 实现 +RateLimiter::RateLimiter(double rate_per_sec, double burst_size) + : rate_(rate_per_sec), + max_tokens_(burst_size), + tokens_(burst_size), + last_update_(std::chrono::steady_clock::now()) {} + +void RateLimiter::Refill() { + auto now = std::chrono::steady_clock::now(); + double elapsed = std::chrono::duration(now - last_update_).count(); + double new_tokens = elapsed * rate_; + tokens_ = std::min(tokens_ + new_tokens, max_tokens_); + last_update_ = now; +} + +bool RateLimiter::TryAcquire(double tokens) { + std::lock_guard lock(mutex_); + Refill(); + + if (tokens_ >= tokens) { + tokens_ -= tokens; + return true; + } + return false; +} + +void RateLimiter::SetRate(double rate_per_sec) { + std::lock_guard lock(mutex_); + Refill(); // 先根据旧速率更新令牌 + rate_ = rate_per_sec; +} + +// ConsistentHash 实现 +ConsistentHash::ConsistentHash(int num_replicas, int num_buckets) + : num_replicas_(num_replicas) { + // 添加"fast"和"slow"两个节点 + AddNode("fast"); + AddNode("slow"); +} + +void ConsistentHash::AddNode(const std::string& node) { + for (int i = 0; i < num_replicas_; i++) { + std::string key = node + ":" + std::to_string(i); + size_t hash = hasher_(key); + ring_[hash] = node; + } +} + +void ConsistentHash::RemoveNode(const std::string& node) { + for (int i = 0; i < num_replicas_; i++) { + std::string key = node + ":" + std::to_string(i); + size_t hash = hasher_(key); + ring_.erase(hash); + } +} + +std::string ConsistentHash::GetNode(const std::string& key) const { + if (ring_.empty()) { + return "fast"; // 默认返回fast + } + + size_t hash = HashKey(key); + + // 找到第一个大于等于hash的节点 + auto it = ring_.lower_bound(hash); + if (it == ring_.end()) { + // 如果没有找到,则环绕到第一个节点 + return ring_.begin()->second; + } else { + return it->second; + } +} + +size_t ConsistentHash::HashKey(const std::string& key) const { + return hasher_(key); +} + void DoPurgeDir(void* arg) { std::unique_ptr path(static_cast(arg)); LOG(INFO) << "Delete dir: " << *path << " start"; @@ -47,7 +289,12 @@ PikaServer::PikaServer() last_check_compact_time_({0, 0}), last_check_resume_time_({0, 0}), repl_state_(PIKA_REPL_NO_CONNECT), - role_(PIKA_ROLE_SINGLE) { + role_(PIKA_ROLE_SINGLE), + cmd_classifier_(new CommandClassifier(50000, 10000, 1000)), + fast_pool_metrics_(new ThreadPoolMetrics()), + slow_pool_metrics_(new ThreadPoolMetrics()), + cmd_rate_limiter_(new RateLimiter(g_pika_conf->maxclients() * 2)), // 默认速率:最大客户端连接数的2倍 + key_hash_(new ConsistentHash(100, 1024)) { // Init server ip host if (!ServerInit()) { LOG(FATAL) << "ServerInit iotcl error"; @@ -770,55 +1017,81 @@ void PikaServer::SetFirstMetaSync(bool v) { } void PikaServer::ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd, bool is_admin_cmd) { - // Admin commands always go to admin thread pool - if (is_admin_cmd) { - pika_admin_cmd_thread_pool_->Schedule(func, arg); + // Apply rate limiting if enabled (only for non-admin commands) + if (!is_admin_cmd && !CheckCommandRateLimit()) { + // Rate limit exceeded, handle gracefully + auto bg_arg = static_cast(arg); + if (bg_arg) { + // Set error response for rate limiting + bg_arg->resp = std::make_shared("-ERR Rate limit exceeded, try again later\r\n"); + bg_arg->conn->WriteResp(*(bg_arg->resp)); + delete bg_arg; + } return; } - // Check if thread pool borrowing is enabled - bool borrow_enable = g_pika_conf->threadpool_borrow_enable(); - bool slow_cmd_pool_enable = g_pika_conf->slow_cmd_pool(); - - if (!borrow_enable || !slow_cmd_pool_enable) { - // Original logic: direct routing without borrowing - if (is_slow_cmd && slow_cmd_pool_enable) { - pika_slow_cmd_thread_pool_->Schedule(func, arg); - return; + // Admin commands always go to admin thread pool + if (is_admin_cmd) { + if (pika_admin_cmd_thread_pool_) { + pika_admin_cmd_thread_pool_->Schedule(func, arg); } - pika_client_processor_->SchedulePool(func, arg); return; } - - // Extract key from command to determine key affinity - // This ensures commands on the same key always go to the same thread pool + + // Extract command info for dynamic classification and key affinity auto bg_arg = static_cast(arg); - bool key_affinity_to_slow = false; + std::string cmd_name; + std::string key; bool has_key = false; + bool key_affinity_to_slow = false; if (bg_arg && !bg_arg->redis_cmds.empty() && !bg_arg->redis_cmds[0].empty()) { const auto& cmd_args = bg_arg->redis_cmds[0]; - std::string cmd_name = cmd_args[0]; - pstd::StringToLower(cmd_name); - - // Extract key based on command type - // Most commands have key as the first argument after command name - std::string key; - if (cmd_args.size() > 1) { - // For most commands, key is argv[1] - // For special commands like MGET, MSET, we use the first key - key = cmd_args[1]; - has_key = true; + if (!cmd_args.empty()) { + cmd_name = cmd_args[0]; + pstd::StringToLower(cmd_name); - // Use simple hash to determine key affinity - // Same key will always have same affinity (fast or slow pool) - std::hash hasher; - size_t hash_value = hasher(key); - key_affinity_to_slow = (hash_value % 2 == 1); + // Extract key for affinity routing + if (cmd_args.size() > 1) { + key = cmd_args[1]; + has_key = true; + // Use improved consistent hashing for key affinity + key_affinity_to_slow = IsKeyAffinityToSlow(key); + } + } + } + + // Check if command is dynamically classified as slow + // Override static classification if dynamic classification exists + if (!cmd_name.empty() && cmd_classifier_) { + bool dynamic_is_slow = IsDynamicSlowCommand(cmd_name); + if (dynamic_is_slow) { + is_slow_cmd = true; + } + } + + // Check if thread pool borrowing is enabled + bool borrow_enable = g_pika_conf->threadpool_borrow_enable(); + bool slow_cmd_pool_enable = g_pika_conf->slow_cmd_pool(); + + // Update metrics before scheduling + if (is_slow_cmd && slow_pool_metrics_) { + slow_pool_metrics_->tasks_scheduled++; + } else if (fast_pool_metrics_) { + fast_pool_metrics_->tasks_scheduled++; + } + + // If borrowing or slow pool is disabled, use simple routing + if (!borrow_enable || !slow_cmd_pool_enable) { + if (is_slow_cmd && slow_cmd_pool_enable) { + pika_slow_cmd_thread_pool_->Schedule(func, arg); + } else { + pika_client_processor_->SchedulePool(func, arg); } + return; } - // Borrowing is enabled, check queue status and decide + // Borrowing is enabled, check queue status size_t slow_queue_size = SlowCmdThreadPoolCurQueueSize(); size_t slow_max_queue = SlowCmdThreadPoolMaxQueueSize(); size_t fast_queue_size = ClientProcessorThreadPoolCurQueueSize(); @@ -833,9 +1106,9 @@ void PikaServer::ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_ size_t slow_idle_threshold = slow_max_queue * idle_threshold_percent / 100; size_t fast_idle_threshold = fast_max_queue * idle_threshold_percent / 100; - // Decision logic considering both command type and key affinity + // Enhanced decision logic considering dynamic classification and improved key affinity if (is_slow_cmd) { - // This is a slow command + // This is a slow command (either statically or dynamically classified) if (has_key && !key_affinity_to_slow) { // Key affinity is to fast pool, must use fast pool to maintain order pika_client_processor_->SchedulePool(func, arg); @@ -846,8 +1119,12 @@ void PikaServer::ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_ pika_slow_cmd_thread_pool_->Schedule(func, arg); } else { // Can borrow from fast pool + if (slow_pool_metrics_) { + slow_pool_metrics_->borrow_attempts++; + slow_pool_metrics_->successful_borrows++; + } pika_client_processor_->SchedulePool(func, arg); - LOG(INFO) << "Slow cmd borrows fast pool (slow_queue: " << slow_queue_size + LOG(INFO) << "Slow cmd " << cmd_name << " borrows fast pool (slow_queue: " << slow_queue_size << "/" << slow_max_queue << ", fast_queue: " << fast_queue_size << "/" << fast_max_queue << ")"; } @@ -867,8 +1144,12 @@ void PikaServer::ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_ pika_client_processor_->SchedulePool(func, arg); } else { // Can borrow from slow pool + if (fast_pool_metrics_) { + fast_pool_metrics_->borrow_attempts++; + fast_pool_metrics_->successful_borrows++; + } pika_slow_cmd_thread_pool_->Schedule(func, arg); - LOG(INFO) << "Fast cmd borrows slow pool (fast_queue: " << fast_queue_size + LOG(INFO) << "Fast cmd " << cmd_name << " borrows slow pool (fast_queue: " << fast_queue_size << "/" << fast_max_queue << ", slow_queue: " << slow_queue_size << "/" << slow_max_queue << ")"; } @@ -877,6 +1158,12 @@ void PikaServer::ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_ pika_client_processor_->SchedulePool(func, arg); } } + + // Periodically adjust borrow thresholds based on load (every 100 commands) + static int command_counter = 0; + if (++command_counter % 100 == 0) { + AdjustBorrowThresholds(); + } } size_t PikaServer::ClientProcessorThreadPoolCurQueueSize() { @@ -2108,3 +2395,135 @@ void PikaServer::CacheConfigInit(cache::CacheConfig& cache_cfg) { cache_cfg.lfu_decay_time = g_pika_conf->cache_lfu_decay_time(); } void PikaServer::SetLogNetActivities(bool value) { pika_dispatch_thread_->SetLogNetActivities(value); } + +// 改进的快慢命令分离相关方法实现 + +void PikaServer::AdjustBorrowThresholds() { + // 这个方法可以根据系统负载动态调整借用阈值 + // 当前使用简单的队列饱和度来调整 + + size_t fast_queue_size = ClientProcessorThreadPoolCurQueueSize(); + size_t fast_max_queue = ClientProcessorThreadPoolMaxQueueSize(); + size_t slow_queue_size = SlowCmdThreadPoolCurQueueSize(); + size_t slow_max_queue = SlowCmdThreadPoolMaxQueueSize(); + + if (fast_max_queue == 0 || slow_max_queue == 0) { + return; + } + + // 计算队列饱和度 + double fast_saturation = static_cast(fast_queue_size) / fast_max_queue; + double slow_saturation = static_cast(slow_queue_size) / slow_max_queue; + + int current_threshold = g_pika_conf->threadpool_borrow_threshold_percent(); + int new_threshold = current_threshold; + + // 如果两个线程池都很繁忙(饱和度>0.8),提高借用阈值(更难借用) + if (fast_saturation > 0.8 && slow_saturation > 0.8) { + new_threshold = std::min(90, current_threshold + 5); + } + // 如果两个线程池都很空闲(饱和度<0.3),降低借用阈值(更容易借用) + else if (fast_saturation < 0.3 && slow_saturation < 0.3) { + new_threshold = std::max(30, current_threshold - 5); + } + + if (new_threshold != current_threshold) { + g_pika_conf->SetThreadpoolBorrowThresholdPercent(new_threshold); + LOG(INFO) << "Adjusted borrow threshold from " << current_threshold + << "% to " << new_threshold << "% (fast_sat: " + << std::fixed << std::setprecision(2) << fast_saturation * 100 + << "%, slow_sat: " << slow_saturation * 100 << "%)"; + } +} + +bool PikaServer::IsDynamicSlowCommand(const std::string& cmd_name) const { + if (!cmd_classifier_) { + return false; + } + return cmd_classifier_->IsSlowCommand(cmd_name); +} + +void PikaServer::RecordCommandExecutionTime(const std::string& cmd_name, uint64_t duration_us) { + if (cmd_classifier_) { + cmd_classifier_->RecordExecutionTime(cmd_name, duration_us); + } + + // 同时记录到线程池指标中 + // 注意:这里需要知道命令是在哪个线程池执行的,简化起见我们根据是否是慢命令来判断 + if (IsDynamicSlowCommand(cmd_name) && slow_pool_metrics_) { + slow_pool_metrics_->RecordLatency(duration_us); + slow_pool_metrics_->tasks_completed++; + } else if (fast_pool_metrics_) { + fast_pool_metrics_->RecordLatency(duration_us); + fast_pool_metrics_->tasks_completed++; + } +} + +void PikaServer::SetCommandRateLimit(double rate_per_sec) { + if (cmd_rate_limiter_) { + cmd_rate_limiter_->SetRate(rate_per_sec); + LOG(INFO) << "Command rate limit set to " << rate_per_sec << " commands/sec"; + } +} + +bool PikaServer::CheckCommandRateLimit() { + if (!cmd_rate_limiter_) { + return true; // 如果没有限流器,总是允许 + } + return cmd_rate_limiter_->TryAcquire(1.0); +} + +std::string PikaServer::GetEnhancedThreadPoolMetrics() const { + std::stringstream ss; + + // 基本线程池信息 + GetThreadPoolInfo(&ss.str()); + + // 增强的指标(Prometheus格式) + ss << "\n# Enhanced Thread Pool Metrics\n"; + + if (fast_pool_metrics_) { + ss << fast_pool_metrics_->ExportMetrics("fast"); + } + + if (slow_pool_metrics_ && g_pika_conf->slow_cmd_pool()) { + ss << slow_pool_metrics_->ExportMetrics("slow"); + } + + // 命令分类统计 + if (cmd_classifier_) { + ss << "\n# Command Classification\n"; + auto cmd_avg_times = cmd_classifier_->GetCommandAvgTimes(); + for (const auto& [cmd_name, avg_time] : cmd_avg_times) { + bool is_slow = cmd_classifier_->IsSlowCommand(cmd_name); + ss << "command_avg_time{cmd=\"" << cmd_name + << "\",type=\"" << (is_slow ? "slow" : "fast") << "\"} " + << avg_time / 1000.0 << " # milliseconds\n"; + } + } + + return ss.str(); +} + +bool PikaServer::IsKeyAffinityToSlow(const std::string& key) const { + if (!key_hash_) { + // 如果没有一致性哈希,使用简单的哈希 + std::hash hasher; + return (hasher(key) % 2) == 1; + } + + std::string node = key_hash_->GetNode(key); + return node == "slow"; +} + +void PikaServer::ResetThreadPoolMetrics() { + if (fast_pool_metrics_) { + fast_pool_metrics_->Reset(); + } + + if (slow_pool_metrics_) { + slow_pool_metrics_->Reset(); + } + + LOG(INFO) << "Thread pool metrics reset"; +} From 699c9bba70698683e8ccc51b126f59bdd63cf4e5 Mon Sep 17 00:00:00 2001 From: chejinge Date: Thu, 4 Dec 2025 15:00:20 +0800 Subject: [PATCH 04/10] fix --- include/pika_admin.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/include/pika_admin.h b/include/pika_admin.h index e7b39e2d74..2d1ee502b6 100644 --- a/include/pika_admin.h +++ b/include/pika_admin.h @@ -267,7 +267,8 @@ class InfoCmd : public Cmd { kInfoAll, kInfoDebug, kInfoCommandStats, - kInfoCache + kInfoCache, + kInfoThreadpool }; InfoCmd(const std::string& name, int arity, uint32_t flag) : Cmd(name, arity, flag) {} void Do() override; From 67cb8fe82ffc0235ca074920c1cc86989ef4bb97 Mon Sep 17 00:00:00 2001 From: chejinge Date: Thu, 4 Dec 2025 15:31:12 +0800 Subject: [PATCH 05/10] fix --- include/pika_binlog.h | 2 +- src/pika_server.cc | 18 ++++++++++-------- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/include/pika_binlog.h b/include/pika_binlog.h index a1462cf7f5..60e1790dcd 100644 --- a/include/pika_binlog.h +++ b/include/pika_binlog.h @@ -37,7 +37,7 @@ class Version final : public pstd::noncopyable { void debug() { std::shared_lock l(rwlock_); - printf("Current pro_num %u pro_offset %llu\n", pro_num_, pro_offset_); + printf("Current pro_num %u pro_offset %lu\n", pro_num_, pro_offset_); } private: diff --git a/src/pika_server.cc b/src/pika_server.cc index ccb2e3c4c0..87acc237ca 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include "net/include/net_cli.h" @@ -1023,8 +1024,8 @@ void PikaServer::ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_ auto bg_arg = static_cast(arg); if (bg_arg) { // Set error response for rate limiting - bg_arg->resp = std::make_shared("-ERR Rate limit exceeded, try again later\r\n"); - bg_arg->conn->WriteResp(*(bg_arg->resp)); + bg_arg->resp_ptr = std::make_shared("-ERR Rate limit exceeded, try again later\r\n"); + bg_arg->conn_ptr->WriteResp(*(bg_arg->resp_ptr)); delete bg_arg; } return; @@ -2427,9 +2428,10 @@ void PikaServer::AdjustBorrowThresholds() { new_threshold = std::max(30, current_threshold - 5); } + // Note: Currently we don't have a SetThreadpoolBorrowThresholdPercent method in PikaConf + // So we just log the recommendation instead of actually changing the threshold if (new_threshold != current_threshold) { - g_pika_conf->SetThreadpoolBorrowThresholdPercent(new_threshold); - LOG(INFO) << "Adjusted borrow threshold from " << current_threshold + LOG(INFO) << "Recommend adjusting borrow threshold from " << current_threshold << "% to " << new_threshold << "% (fast_sat: " << std::fixed << std::setprecision(2) << fast_saturation * 100 << "%, slow_sat: " << slow_saturation * 100 << "%)"; @@ -2474,10 +2476,11 @@ bool PikaServer::CheckCommandRateLimit() { } std::string PikaServer::GetEnhancedThreadPoolMetrics() const { - std::stringstream ss; + std::string info; + const_cast(this)->GetThreadPoolInfo(&info); - // 基本线程池信息 - GetThreadPoolInfo(&ss.str()); + std::stringstream ss; + ss << info; // 增强的指标(Prometheus格式) ss << "\n# Enhanced Thread Pool Metrics\n"; @@ -2507,7 +2510,6 @@ std::string PikaServer::GetEnhancedThreadPoolMetrics() const { bool PikaServer::IsKeyAffinityToSlow(const std::string& key) const { if (!key_hash_) { - // 如果没有一致性哈希,使用简单的哈希 std::hash hasher; return (hasher(key) % 2) == 1; } From 0db02171785612422ba4150ab4dddc2e7cdd2128 Mon Sep 17 00:00:00 2001 From: chejinge Date: Thu, 4 Dec 2025 16:10:56 +0800 Subject: [PATCH 06/10] fix --- .github/workflows/pika.yml | 102 ++++++++++++++++++++++++++++++++++--- 1 file changed, 95 insertions(+), 7 deletions(-) diff --git a/.github/workflows/pika.yml b/.github/workflows/pika.yml index ecc33cdff2..3d23e65752 100644 --- a/.github/workflows/pika.yml +++ b/.github/workflows/pika.yml @@ -19,6 +19,14 @@ jobs: runs-on: ubuntu-latest steps: + - name: Free Disk Space + run: | + sudo rm -rf /usr/share/dotnet + sudo rm -rf /opt/ghc + sudo rm -rf /usr/local/share/boost + sudo rm -rf "$AGENT_TOOLSDIRECTORY" + df -h + - uses: actions/checkout@v4 - name: Set up Go @@ -45,9 +53,11 @@ jobs: # Build your program with the given configuration run: cmake --build build --config ${{ env.BUILD_TYPE }} - - name: Cleanup + - name: Cleanup Build Trees run: | rm -rf ./buildtrees + rm -rf ./deps + df -h - uses: actions/upload-artifact@v4 with: @@ -64,6 +74,14 @@ jobs: working-directory: ${{ github.workspace }} run: ./pikatests.sh all clean + - name: Cleanup Build Artifacts + run: | + find ./build -name "*.o" -type f -delete + find ./build -name "*.a" -type f -delete + rm -rf ./build/CMakeFiles + rm -rf ./build/_deps + df -h + # master on port 9221, slave on port 9231, all with 2 db - name: Start codis, pika master and pika slave working-directory: ${{ github.workspace }}/build @@ -82,10 +100,21 @@ jobs: chmod +x integrate_test.sh sh integrate_test.sh - build_on_rocky: + - name: Cleanup Test Data + if: always() + working-directory: ${{ github.workspace }}/build + run: | + pkill -9 pika || true + pkill -9 codis || true + rm -rf master_data slave_data rename_data acl1_data acl2_data acl3_data + rm -rf codis_data_1 codis_data_2 + rm -rf *.conf *.conf.bak + df -h + + build_on_centos: runs-on: ubuntu-latest container: - image: rockylinux:9 + image: cheniujh/pika-centos7-ci:v5 steps: - name: set up mirror @@ -166,14 +195,25 @@ jobs: with: go-version: 1.19 + - name: Free Disk Space + run: | + rm -rf /usr/share/dotnet + rm -rf /opt/ghc + rm -rf /usr/local/share/boost + find / -type f -name "*.log" -delete 2>/dev/null || true + find / -type f -name "*.tmp" -delete 2>/dev/null || true + find / -name '*cache*' -type d -exec rm -rf {} + 2>/dev/null || true + find / -name '*.bak' -type f -delete 2>/dev/null || true + df -h + - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v1 with: fetch-depth: 0 - name: Configure CMake run: | - source /opt/rh/gcc-toolset-13/enable + source /opt/rh/devtoolset-10/enable cmake -B build -DCMAKE_BUILD_TYPE=${{ env.BUILD_TYPE }} -DUSE_PIKA_TOOLS=ON -DCMAKE_CXX_FLAGS_DEBUG=-fsanitize=address . - uses: actions/cache@v3 @@ -188,12 +228,17 @@ jobs: - name: Build run: | - source /opt/rh/gcc-toolset-13/enable + source /opt/rh/devtoolset-10/enable cmake --build build --config ${{ env.BUILD_TYPE }} - - name: Cleanup + - name: Cleanup Build Trees run: | rm -rf ./buildtrees + rm -rf ./deps + # Clean build intermediate files but keep binaries + find ./build -name "*.o" -type f -delete || true + find ./build -name "*.a" -type f -delete || true + df -h - name: Test working-directory: ${{ github.workspace }}/build @@ -203,6 +248,40 @@ jobs: working-directory: ${{ github.workspace }} run: ./pikatests.sh all clean + - name: Cleanup After Unit Test + run: | + # Clean up test data to free space before integration tests + rm -rf ./log* ./db* ./dump* ./dbsync* || true + df -h + + - name: Extreme Disk Cleanup + run: | + + rm -rf /usr/local/share/* || true + rm -rf /usr/share/doc/* || true + rm -rf /usr/share/man/* || true + rm -rf /var/cache/* || true + + find ${{ github.workspace }} -name "*.o" -type f -delete || true + find ${{ github.workspace }} -name "*.a" -type f -delete || true + find ${{ github.workspace }} -name "*.la" -type f -delete || true + find ${{ github.workspace }} -name "*.so" -type f -delete || true + find ${{ github.workspace }} -name "*.pyc" -type f -delete || true + + rm -rf ${{ github.workspace }}/.git || true + + df -h + + echo "Largest directories:" + du -h --max-depth=2 / 2>/dev/null | sort -hr | head -20 + + - name: Create Log Directories + run: | + mkdir -p /__w/pikiwidb/pikiwidb/codis/admin/../log + mkdir -p /__w/pikiwidb/pikiwidb/log + mkdir -p ./bin || true + df -h + - name: Start codis, pika master and pika slave working-directory: ${{ github.workspace }}/build run: | @@ -220,6 +299,15 @@ jobs: chmod +x integrate_test.sh sh integrate_test.sh + - name: Cleanup Test Data + if: always() + working-directory: ${{ github.workspace }}/build + run: | + rm -rf master_data slave_data rename_data acl1_data acl2_data acl3_data + rm -rf codis_data_1 codis_data_2 + rm -rf *.conf *.conf.bak + df -h + build_on_macos: runs-on: macos-13 From 2efae86056ff43afedc434a2372827ee9f649f53 Mon Sep 17 00:00:00 2001 From: chejinge Date: Thu, 4 Dec 2025 16:14:22 +0800 Subject: [PATCH 07/10] fix --- .github/workflows/pika.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/pika.yml b/.github/workflows/pika.yml index 3d23e65752..e2c495877e 100644 --- a/.github/workflows/pika.yml +++ b/.github/workflows/pika.yml @@ -187,8 +187,8 @@ jobs: - name: Install deps run: | - dnf update -y - dnf install -y bash cmake wget git autoconf gcc perl-Digest-SHA tcl which tar g++ tar epel-release gcc-c++ libstdc++-devel gcc-toolset-13 + yum update -y + yum install -y bash cmake wget git autoconf gcc perl-Digest-SHA tcl which tar tar epel-release gcc-c++ libstdc++-devel - name: Set up Go uses: actions/setup-go@v5 From 236a43bd879a4e8f22f082f31df195249641b014 Mon Sep 17 00:00:00 2001 From: chejinge Date: Thu, 4 Dec 2025 16:31:45 +0800 Subject: [PATCH 08/10] fix --- .github/workflows/pika.yml | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/.github/workflows/pika.yml b/.github/workflows/pika.yml index e2c495877e..4c4224e65f 100644 --- a/.github/workflows/pika.yml +++ b/.github/workflows/pika.yml @@ -190,10 +190,14 @@ jobs: yum update -y yum install -y bash cmake wget git autoconf gcc perl-Digest-SHA tcl which tar tar epel-release gcc-c++ libstdc++-devel - - name: Set up Go - uses: actions/setup-go@v5 - with: - go-version: 1.19 + - name: Set up Go manually + run: | + cd /tmp + wget https://go.dev/dl/go1.19.13.linux-amd64.tar.gz + tar -C /usr/local -xzf go1.19.13.linux-amd64.tar.gz + echo 'export PATH=$PATH:/usr/local/go/bin' >> ~/.bashrc + export PATH=$PATH:/usr/local/go/bin + go version - name: Free Disk Space run: | From 1429d789d0afa6271db9351cc1e3f0d1d02bd169 Mon Sep 17 00:00:00 2001 From: chejinge Date: Thu, 4 Dec 2025 16:37:45 +0800 Subject: [PATCH 09/10] fix --- .github/workflows/pika.yml | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/.github/workflows/pika.yml b/.github/workflows/pika.yml index 4c4224e65f..b50183cbb4 100644 --- a/.github/workflows/pika.yml +++ b/.github/workflows/pika.yml @@ -188,7 +188,17 @@ jobs: - name: Install deps run: | yum update -y - yum install -y bash cmake wget git autoconf gcc perl-Digest-SHA tcl which tar tar epel-release gcc-c++ libstdc++-devel + yum install -y bash wget git autoconf gcc perl-Digest-SHA tcl which tar epel-release gcc-c++ libstdc++-devel + + - name: Install CMake 3.22 + run: | + cd /tmp + wget https://github.com/Kitware/CMake/releases/download/v3.22.0/cmake-3.22.0-linux-x86_64.tar.gz + tar -zxvf cmake-3.22.0-linux-x86_64.tar.gz + mkdir -p /opt/cmake + cp -r cmake-3.22.0-linux-x86_64/* /opt/cmake/ + ln -sf /opt/cmake/bin/cmake /usr/bin/cmake + cmake --version - name: Set up Go manually run: | From 293c966cdb02991f0475be776d0cfe5afdcfb293 Mon Sep 17 00:00:00 2001 From: chejinge Date: Thu, 4 Dec 2025 16:41:37 +0800 Subject: [PATCH 10/10] fix --- .github/workflows/pika.yml | 121 ++++--------------------------------- 1 file changed, 13 insertions(+), 108 deletions(-) diff --git a/.github/workflows/pika.yml b/.github/workflows/pika.yml index b50183cbb4..6855392343 100644 --- a/.github/workflows/pika.yml +++ b/.github/workflows/pika.yml @@ -68,7 +68,7 @@ jobs: working-directory: ${{ github.workspace }}/build # Execute tests defined by the CMake configuration. # See https://cmake.org/cmake/help/latest/manual/ctest.1.html for more detail - run: ctest -C ${{ env.BUILD_TYPE }} + run: ctest -C ${{ env.BUILD_TYPE }} --verbose - name: Unit Test working-directory: ${{ github.workspace }} @@ -86,6 +86,7 @@ jobs: - name: Start codis, pika master and pika slave working-directory: ${{ github.workspace }}/build run: | + echo "hello" chmod +x ../tests/integration/start_master_and_slave.sh ../tests/integration/start_master_and_slave.sh chmod +x ../tests/integration/start_codis.sh @@ -111,104 +112,13 @@ jobs: rm -rf *.conf *.conf.bak df -h + build_on_centos: runs-on: ubuntu-latest container: image: cheniujh/pika-centos7-ci:v5 steps: - - name: set up mirror - run: | - rm -rf /etc/yum.repos.d/CentOS-Base.repo - cat > /etc/yum.repos.d/CentOS-Base.repo << EOL - [base] - name=CentOS-\$releasever - Base - baseurl=https://mirrors.aliyun.com/centos-vault/7.9.2009/os/\$basearch/ - gpgcheck=1 - gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-CentOS-7 - - [updates] - name=CentOS-\$releasever - Updates - baseurl=https://mirrors.aliyun.com/centos-vault/7.9.2009/updates/\$basearch/ - gpgcheck=1 - gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-CentOS-7 - - [extras] - name=CentOS-\$releasever - Extras - baseurl=https://mirrors.aliyun.com/centos-vault/7.9.2009/extras/\$basearch/ - gpgcheck=1 - gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-CentOS-7 - - [centosplus] - name=CentOS-\$releasever - Plus - baseurl=https://mirrors.aliyun.com/centos-vault/7.9.2009/centosplus/\$basearch/ - gpgcheck=1 - enabled=0 - gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-CentOS-7 - EOL - - cat > /etc/yum.repos.d/CentOS-SCLo-scl.repo << EOL - [centos-sclo-sclo] - name=CentOS-7 - SCLo sclo - baseurl=https://mirrors.aliyun.com/centos/7/sclo/x86_64/sclo/ - gpgcheck=1 - enabled=1 - gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-CentOS-SIG-SCLo - - [centos-sclo-sclo-source] - name=CentOS-7 - SCLo sclo Source - baseurl=https://mirrors.aliyun.com/centos/7/sclo/Source/sclo/ - gpgcheck=1 - enabled=0 - gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-CentOS-SIG-SCLo - EOL - - cat > /etc/yum.repos.d/CentOS-SCLo-scl-rh.repo << EOL - [centos-sclo-rh] - name=CentOS-7 - SCLo rh - baseurl=https://mirrors.aliyun.com/centos/7/sclo/x86_64/rh/ - gpgcheck=1 - enabled=1 - gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-CentOS-SIG-SCLo - - [centos-sclo-rh-source] - name=CentOS-7 - SCLo rh Source - baseurl=https://mirrors.aliyun.com/centos/7/sclo/Source/rh/ - gpgcheck=1 - enabled=0 - gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-CentOS-SIG-SCLo - EOL - - rpm --import https://www.centos.org/keys/RPM-GPG-KEY-CentOS-7 - rpm --import https://www.centos.org/keys/RPM-GPG-KEY-CentOS-SIG-SCLo - - yum clean all - yum makecache - - - name: Install deps - run: | - yum update -y - yum install -y bash wget git autoconf gcc perl-Digest-SHA tcl which tar epel-release gcc-c++ libstdc++-devel - - - name: Install CMake 3.22 - run: | - cd /tmp - wget https://github.com/Kitware/CMake/releases/download/v3.22.0/cmake-3.22.0-linux-x86_64.tar.gz - tar -zxvf cmake-3.22.0-linux-x86_64.tar.gz - mkdir -p /opt/cmake - cp -r cmake-3.22.0-linux-x86_64/* /opt/cmake/ - ln -sf /opt/cmake/bin/cmake /usr/bin/cmake - cmake --version - - - name: Set up Go manually - run: | - cd /tmp - wget https://go.dev/dl/go1.19.13.linux-amd64.tar.gz - tar -C /usr/local -xzf go1.19.13.linux-amd64.tar.gz - echo 'export PATH=$PATH:/usr/local/go/bin' >> ~/.bashrc - export PATH=$PATH:/usr/local/go/bin - go version - - name: Free Disk Space run: | rm -rf /usr/share/dotnet @@ -228,17 +138,7 @@ jobs: - name: Configure CMake run: | source /opt/rh/devtoolset-10/enable - cmake -B build -DCMAKE_BUILD_TYPE=${{ env.BUILD_TYPE }} -DUSE_PIKA_TOOLS=ON -DCMAKE_CXX_FLAGS_DEBUG=-fsanitize=address . - - - uses: actions/cache@v3 - with: - path: ${{ github.workspace }}/deps - key: ${{ runner.os }}-rocky-deps-${{ hashFiles('**/CMakeLists.txt') }} - - - uses: actions/cache@v3 - with: - path: ${{ github.workspace }}/buildtrees - key: ${{ runner.os }}-rocky-buildtrees-${{ hashFiles('**/CMakeLists.txt') }} + cmake -B build -DCMAKE_BUILD_TYPE=${{ env.BUILD_TYPE }} -DUSE_PIKA_TOOLS=ON -DCMAKE_CXX_FLAGS_DEBUG=-fsanitize=address - name: Build run: | @@ -270,7 +170,7 @@ jobs: - name: Extreme Disk Cleanup run: | - + rm -rf /usr/local/share/* || true rm -rf /usr/share/doc/* || true rm -rf /usr/share/man/* || true @@ -322,6 +222,7 @@ jobs: rm -rf *.conf *.conf.bak df -h + build_on_macos: runs-on: macos-13 @@ -359,6 +260,10 @@ jobs: cp deps/lib/libz.1.dylib tests/integration/ rm -rf ./buildtrees + - name: Test + working-directory: ${{ github.workspace }}/build + run: ctest -C ${{ env.BUILD_TYPE }} --verbose + - name: Unit Test working-directory: ${{ github.workspace }} run: | @@ -372,14 +277,14 @@ jobs: ./start_master_and_slave.sh chmod +x start_codis.sh ./start_codis.sh - + - name: Run Go E2E Tests working-directory: ${{ github.workspace }} run: | cd tests/integration/ chmod +x integrate_test.sh - # sh integrate_test.sh + # sh integrate_test.sh build_pika_image: name: Build Pika Docker image @@ -413,4 +318,4 @@ jobs: file: ./ci/Dockerfile push: false tags: ${{ steps.meta.outputs.tags }} - labels: ${{ steps.meta.outputs.labels }} + labels: ${{ steps.meta.outputs.labels }} \ No newline at end of file