Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,30 @@ thread-pool-size : 12
# When set to no, they are not separated.
slow-cmd-pool : no

# Enable thread pool borrowing mechanism, which allows threads to help each other when one pool is busy
# and another is idle. This can improve resource utilization and reduce command latency under uneven load.
# [yes | no] - The default value is yes.
threadpool-borrow-enable : yes

# The queue length threshold (as a percentage of max queue size) at which a thread pool is considered busy
# and may borrow threads from another pool. The value range is [1-99], default is 80.
threadpool-borrow-threshold-percent : 80

# The queue length threshold (as a percentage of max queue size) at which a thread pool is considered idle
# and may lend threads to another pool. The value range is [1-99], default is 20.
threadpool-idle-threshold-percent : 20

# --- EMA queue-wait based load signals (used together with queue percent) ---
# EMA alpha = numerator / denominator. Smaller alpha => smoother (less sensitive to spikes).
threadpool-ema-alpha-numerator : 5
threadpool-ema-alpha-denominator : 100

# Busy/idle thresholds based on EMA(queue_wait_us). Unit: microseconds.
threadpool-fast-busy-threshold : 2000
threadpool-fast-idle-threshold : 500
threadpool-slow-busy-threshold : 5000
threadpool-slow-idle-threshold : 1000

# Size of the low-level thread pool. The threads within this pool
# are dedicated to handling slow user requests.
slow-cmd-thread-pool-size : 1
Expand Down
4 changes: 3 additions & 1 deletion include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ class InfoCmd : public Cmd {
kInfoAll,
kInfoDebug,
kInfoCommandStats,
kInfoCache
kInfoCache,
kInfoThreadpool
};
InfoCmd(const std::string& name, int arity, uint32_t flag) : Cmd(name, arity, flag) {}
void Do() override;
Expand Down Expand Up @@ -295,6 +296,7 @@ class InfoCmd : public Cmd {
const static std::string kDebugSection;
const static std::string kCommandStatsSection;
const static std::string kCacheSection;
const static std::string kThreadpoolSection;

void DoInitial() override;
void Clear() override {
Expand Down
2 changes: 1 addition & 1 deletion include/pika_binlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class Version final : public pstd::noncopyable {

// Debug helper: prints the current produce position of the binlog
// (file number + offset) to stdout. Takes a shared (read) lock so it
// can run concurrently with other readers but not with writers.
void debug() {
  std::shared_lock l(rwlock_);
  // PRIu64 keeps the format portable: "%llu" vs "%lu" for a 64-bit
  // offset differs between LP64 (Linux/macOS) and LLP64 (Windows).
  // The cast is a no-op on the expected uint64_t offset and keeps the
  // call well-defined if the member's width ever changes.
  printf("Current pro_num %u pro_offset %" PRIu64 "\n", pro_num_,
         static_cast<uint64_t>(pro_offset_));
}

private:
Expand Down
3 changes: 2 additions & 1 deletion include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ struct TimeStat {
class PikaClientConn : public net::RedisConn {
public:
using WriteCompleteCallback = std::function<void()>;

struct BgTaskArg {
std::shared_ptr<Cmd> cmd_ptr;
std::shared_ptr<PikaClientConn> conn_ptr;
Expand All @@ -50,6 +49,7 @@ class PikaClientConn : public net::RedisConn {
LogOffset offset;
std::string db_name;
bool cache_miss_in_rtc_;
TaskPoolType pool_type;
};

struct TxnStateBitMask {
Expand Down Expand Up @@ -126,6 +126,7 @@ class PikaClientConn : public net::RedisConn {

bool authenticated_ = false;
std::shared_ptr<User> user_;
TaskPoolType task_pool_type_ = kFastCmdPool;

std::shared_ptr<Cmd> DoCmd(const PikaCmdArgsType& argv, const std::string& opt,
const std::shared_ptr<std::string>& resp_ptr, bool cache_miss_in_rtc,
Expand Down
91 changes: 91 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,19 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return slow_cmd_pool_;
}
// Whether the fast/slow thread pools may lend worker threads to each
// other when one is busy and the other idle (thread-safe read).
bool threadpool_borrow_enable() {
  std::shared_lock guard(rwlock_);
  return threadpool_borrow_enable_;
}
// Queue-length percentage (of max queue size) above which a pool is
// treated as busy and may borrow threads (thread-safe read).
int threadpool_borrow_threshold_percent() {
  std::shared_lock guard(rwlock_);
  return threadpool_borrow_threshold_percent_;
}
// Queue-length percentage (of max queue size) below which a pool is
// treated as idle and may lend threads out (thread-safe read).
int threadpool_idle_threshold_percent() {
  std::shared_lock guard(rwlock_);
  return threadpool_idle_threshold_percent_;
}

std::string server_id() {
std::shared_lock l(rwlock_);
return server_id_;
Expand Down Expand Up @@ -481,6 +494,71 @@ class PikaConf : public pstd::BaseConf {
std::lock_guard l(rwlock_);
admin_thread_pool_size_ = value;
}
// Enables/disables thread borrowing between the fast and slow pools
// (thread-safe write).
void SetThreadPoolBorrowEnable(const bool value) {
  std::lock_guard l(rwlock_);
  threadpool_borrow_enable_ = value;
}

// Sets the busy threshold as a percentage of max queue size. The
// documented valid range (pika.conf) is [1, 99]; out-of-range values
// are ignored, mirroring the validate-or-ignore policy used by
// SetThreadPoolEmaAlpha.
void SetThreadPoolBorrowThresholdPercent(const int value) {
  std::lock_guard l(rwlock_);
  if (value >= 1 && value <= 99) {
    threadpool_borrow_threshold_percent_ = value;
  }
}

// Sets the idle threshold as a percentage of max queue size. Valid
// range is [1, 99]; out-of-range values are ignored.
void SetThreadPoolIdleThresholdPercent(const int value) {
  std::lock_guard l(rwlock_);
  if (value >= 1 && value <= 99) {
    threadpool_idle_threshold_percent_ = value;
  }
}

// --- Getters for the EMA (queue-wait) load-signal configuration ---
// Each takes a shared lock so reads are safe against concurrent
// configuration rewrites.
uint32_t threadpool_ema_alpha_numerator() {
  std::shared_lock guard(rwlock_);
  return threadpool_ema_alpha_numerator_;
}
uint32_t threadpool_ema_alpha_denominator() {
  std::shared_lock guard(rwlock_);
  return threadpool_ema_alpha_denominator_;
}
// Busy/idle cut-offs applied to EMA(queue_wait), in microseconds.
uint64_t threadpool_fast_busy_threshold() {
  std::shared_lock guard(rwlock_);
  return threadpool_fast_busy_threshold_;
}
uint64_t threadpool_fast_idle_threshold() {
  std::shared_lock guard(rwlock_);
  return threadpool_fast_idle_threshold_;
}
uint64_t threadpool_slow_busy_threshold() {
  std::shared_lock guard(rwlock_);
  return threadpool_slow_busy_threshold_;
}
uint64_t threadpool_slow_idle_threshold() {
  std::shared_lock guard(rwlock_);
  return threadpool_slow_idle_threshold_;
}

// --- Setters for the EMA (queue-wait) load-signal configuration ---
// Atomically updates both alpha components under the write lock.
// Invalid combinations (zero denominator, or numerator > denominator,
// i.e. alpha outside (0, 1]) are silently ignored so a bad CONFIG SET
// cannot corrupt the smoothing factor.
void SetThreadPoolEmaAlpha(uint32_t numerator, uint32_t denominator) {
  std::lock_guard guard(rwlock_);
  if (denominator == 0 || numerator > denominator) {
    return;  // reject invalid alpha, keep the current value
  }
  threadpool_ema_alpha_numerator_ = numerator;
  threadpool_ema_alpha_denominator_ = denominator;
}
// The four threshold setters below store raw microsecond values.
void SetThreadPoolFastBusyThreshold(uint64_t value) {
  std::lock_guard guard(rwlock_);
  threadpool_fast_busy_threshold_ = value;
}
void SetThreadPoolFastIdleThreshold(uint64_t value) {
  std::lock_guard guard(rwlock_);
  threadpool_fast_idle_threshold_ = value;
}
void SetThreadPoolSlowBusyThreshold(uint64_t value) {
  std::lock_guard guard(rwlock_);
  threadpool_slow_busy_threshold_ = value;
}
void SetThreadPoolSlowIdleThreshold(uint64_t value) {
  std::lock_guard guard(rwlock_);
  threadpool_slow_idle_threshold_ = value;
}

void SetSlaveof(const std::string& value) {
std::lock_guard l(rwlock_);
Expand Down Expand Up @@ -961,6 +1039,19 @@ class PikaConf : public pstd::BaseConf {
std::string bgsave_prefix_;
std::string pidfile_;
std::atomic<bool> slow_cmd_pool_;

// Thread pool task borrowing configuration
bool threadpool_borrow_enable_ = true;
int threadpool_borrow_threshold_percent_ = 80;
int threadpool_idle_threshold_percent_ = 20;

// EMA configuration
uint32_t threadpool_ema_alpha_numerator_{5};
uint32_t threadpool_ema_alpha_denominator_{100};
uint64_t threadpool_fast_busy_threshold_{2000}; // us
uint64_t threadpool_fast_idle_threshold_{500}; // us
uint64_t threadpool_slow_busy_threshold_{5000}; // us
uint64_t threadpool_slow_idle_threshold_{1000}; // us

std::string compression_;
std::string compression_per_level_;
Expand Down
8 changes: 8 additions & 0 deletions include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -417,4 +417,12 @@ const int64_t CACHE_LOAD_QUEUE_MAX_SIZE = 2048;
const int64_t CACHE_VALUE_ITEM_MAX_SIZE = 2048;
const int64_t CACHE_LOAD_NUM_ONE_TIME = 256;

/*
 * Identifies which worker thread pool a command task is dispatched to.
 */
enum TaskPoolType {
  kFastCmdPool,   // default pool for ordinary (fast) commands
  kSlowCmdPool,   // pool dedicated to commands classified as slow
  kAdminCmdPool   // pool for admin commands (sized by admin-thread-pool-size)
};
#endif
84 changes: 84 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
#endif
#include <memory>
#include <set>
#include <array>
#include <chrono>
#include <unordered_map>

#include "src/cache/include/config.h"
#include "net/include/bg_thread.h"
Expand Down Expand Up @@ -85,6 +88,20 @@ struct TaskArg {
void DoBgslotscleanup(void* arg);
void DoBgslotsreload(void* arg);

// Per-pool runtime metrics for the worker thread pools. All counters
// are atomics so worker threads can update them without taking a lock.
struct ThreadPoolMetrics {
  std::atomic<uint64_t> tasks_scheduled{0};  // total tasks ever enqueued
  std::atomic<uint64_t> active_tasks{0};     // tasks currently executing
  std::atomic<uint64_t> borrow_attempts{0};  // cross-pool borrow attempts

  // Latency histogram buckets (1ms, 5ms, 10ms, 50ms, 100ms, 500ms,
  // 1s, 5s, >5s).
  std::array<std::atomic<uint64_t>, 9> latency_buckets{};

  // Folds one observed latency sample (in microseconds) into the
  // matching histogram bucket.
  void RecordLatency(uint64_t latency_us);
  // Serializes the counters as human-readable text, labelled with
  // `pool_name` (presumably for the INFO output — defined elsewhere).
  std::string ExportMetrics(const std::string& pool_name) const;
  // Zeroes every counter and every bucket.
  void Reset();
};

class PikaServer : public pstd::noncopyable {
public:
PikaServer();
Expand Down Expand Up @@ -201,6 +218,13 @@ class PikaServer : public pstd::noncopyable {
size_t ClientProcessorThreadPoolMaxQueueSize();
size_t SlowCmdThreadPoolCurQueueSize();
size_t SlowCmdThreadPoolMaxQueueSize();

/*
* Thread pool dynamic resize
*/
bool ResizeFastCmdThreadPool(size_t new_size);
bool ResizeSlowCmdThreadPool(size_t new_size);
void GetThreadPoolInfo(std::string* info);

/*
* BGSave used
Expand Down Expand Up @@ -502,6 +526,22 @@ class PikaServer : public pstd::noncopyable {
void ProcessCronTask();
double HitRatio();
void SetLogNetActivities(bool value);

/*
* threadpool borrow
*/
bool IsSlowPoolBusy();
bool IsSlowPoolIdle();
bool IsFastPoolBusy();
bool IsFastPoolIdle();
TaskPoolType DecidePoolType(bool is_slow_cmd);
std::string GetEnhancedThreadPoolMetrics();
void ResetThreadPoolMetrics();
void UpdateQueueWaitStats(TaskPoolType pool_type, uint64_t queue_wait_us);
void LoadThreadPoolConfig();
ThreadPoolMetrics* GetSlowPoolMetrics() { return slow_pool_metrics_.get(); }
ThreadPoolMetrics* GetFastPoolMetrics() { return fast_pool_metrics_.get(); }

/*
* disable compact
*/
Expand Down Expand Up @@ -540,6 +580,7 @@ class PikaServer : public pstd::noncopyable {
void AutoDeleteExpiredDump();
void AutoUpdateNetworkMetric();
void PrintThreadPoolQueueStatus();
void AutoUpdateIdlePoolEMA();
void StatDiskUsage();
int64_t GetLastSaveTime(const std::string& dump_dir);

Expand Down Expand Up @@ -662,7 +703,50 @@ class PikaServer : public pstd::noncopyable {
* lastsave used
*/
int64_t lastsave_ = 0;

/*
* metrics used
*/
std::unique_ptr<ThreadPoolMetrics> fast_pool_metrics_;
std::unique_ptr<ThreadPoolMetrics> slow_pool_metrics_;

/*
 * Exponential-moving-average statistics of task queue wait time for one
 * thread pool. Used together with the queue-length percentage to decide
 * a pool's busy/idle state. Atomic members allow lock-free updates from
 * worker threads.
 */
struct PoolLatencyStats {
  std::atomic<uint64_t> ema_queue_wait_us_{0};  // smoothed queue wait time, microseconds
  std::atomic<uint64_t> last_update_us_{0};     // timestamp of the last EMA update, microseconds
};

PoolLatencyStats fast_pool_stats_;
PoolLatencyStats slow_pool_stats_;

// EMA configuration parameters (atomic for dynamic updates)
std::atomic<uint32_t> ema_alpha_numerator_{5};
std::atomic<uint32_t> ema_alpha_denominator_{100};
std::atomic<uint64_t> fast_busy_threshold_us_{2000};
std::atomic<uint64_t> fast_idle_threshold_us_{500};
std::atomic<uint64_t> slow_busy_threshold_us_{5000};
std::atomic<uint64_t> slow_idle_threshold_us_{1000};

// Get EMA queue wait time (us)
uint64_t GetEMAQueueWait(TaskPoolType pool_type) const;

// EMA decay helper functions
uint64_t CalculateDecayedEMA(uint64_t old_ema, uint64_t last_update, uint64_t now);
void UpdateSinglePoolEMA(PoolLatencyStats& stats, uint64_t new_sample, uint64_t now);
void DecaySinglePool(PoolLatencyStats& stats, uint64_t now);

// Busy/Idle determination based on EMA (internal helpers)
bool IsFastPoolBusyByEMA() const;
bool IsFastPoolIdleByEMA() const;
bool IsSlowPoolBusyByEMA() const;
bool IsSlowPoolIdleByEMA() const;

// Borrow idle window
std::atomic<uint64_t> threadpool_idle_window_us_{1000000};
std::atomic<uint64_t> fast_pool_last_active_us_{0};
std::atomic<uint64_t> slow_pool_last_active_us_{0};
/*
* acl
*/
Expand Down
Loading