Skip to content

Commit d1a606e

Browse files
Z-G-H1brother-jinchejinge
authored
feat: Fast slow separate (#3213)
* feat:Automated fast/slow command separation, help me Co-authored-by: chejinge <[email protected]> Co-authored-by: chejinge <[email protected]>
1 parent e33bb92 commit d1a606e

12 files changed

Lines changed: 934 additions & 12 deletions

File tree

conf/pika.conf

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,30 @@ thread-pool-size : 12
3333
# When set to no, they are not separated.
3434
slow-cmd-pool : no
3535

36+
# Enable thread pool borrowing mechanism, which allows threads to help each other when one pool is busy
37+
# and another is idle. This can improve resource utilization and reduce command latency under uneven load.
38+
# [yes | no] - The default value is yes.
39+
threadpool-borrow-enable : yes
40+
41+
# The queue length threshold (as a percentage of max queue size) at which a thread pool is considered busy
42+
# and may borrow threads from another pool. The value range is [1-99], default is 80.
43+
threadpool-borrow-threshold-percent : 80
44+
45+
# The queue length threshold (as a percentage of max queue size) at which a thread pool is considered idle
46+
# and may lend threads to another pool. The value range is [1-99], default is 20.
47+
threadpool-idle-threshold-percent : 20
48+
49+
# --- EMA queue-wait based load signals (used together with queue percent) ---
50+
# EMA alpha = numerator / denominator. Smaller alpha => smoother (less sensitive to spikes).
51+
threadpool-ema-alpha-numerator : 5
52+
threadpool-ema-alpha-denominator : 100
53+
54+
# Busy/idle thresholds based on EMA(queue_wait_us). Unit: microseconds.
55+
threadpool-fast-busy-threshold : 2000
56+
threadpool-fast-idle-threshold : 500
57+
threadpool-slow-busy-threshold : 5000
58+
threadpool-slow-idle-threshold : 1000
59+
3660
# Size of the low-level thread pool. The threads within this pool
3761
# are dedicated to handling slow user requests.
3862
slow-cmd-thread-pool-size : 1

include/pika_admin.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,8 @@ class InfoCmd : public Cmd {
267267
kInfoAll,
268268
kInfoDebug,
269269
kInfoCommandStats,
270-
kInfoCache
270+
kInfoCache,
271+
kInfoThreadpool
271272
};
272273
InfoCmd(const std::string& name, int arity, uint32_t flag) : Cmd(name, arity, flag) {}
273274
void Do() override;
@@ -295,6 +296,7 @@ class InfoCmd : public Cmd {
295296
const static std::string kDebugSection;
296297
const static std::string kCommandStatsSection;
297298
const static std::string kCacheSection;
299+
const static std::string kThreadpoolSection;
298300

299301
void DoInitial() override;
300302
void Clear() override {

include/pika_binlog.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class Version final : public pstd::noncopyable {
3737

3838
void debug() {
3939
std::shared_lock l(rwlock_);
40-
printf("Current pro_num %u pro_offset %llu\n", pro_num_, pro_offset_);
40+
printf("Current pro_num %u pro_offset %lu\n", pro_num_, pro_offset_);
4141
}
4242

4343
private:

include/pika_client_conn.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ struct TimeStat {
4141
class PikaClientConn : public net::RedisConn {
4242
public:
4343
using WriteCompleteCallback = std::function<void()>;
44-
4544
struct BgTaskArg {
4645
std::shared_ptr<Cmd> cmd_ptr;
4746
std::shared_ptr<PikaClientConn> conn_ptr;
@@ -50,6 +49,7 @@ class PikaClientConn : public net::RedisConn {
5049
LogOffset offset;
5150
std::string db_name;
5251
bool cache_miss_in_rtc_;
52+
TaskPoolType pool_type;
5353
};
5454

5555
struct TxnStateBitMask {
@@ -126,6 +126,7 @@ class PikaClientConn : public net::RedisConn {
126126

127127
bool authenticated_ = false;
128128
std::shared_ptr<User> user_;
129+
TaskPoolType task_pool_type_ = kFastCmdPool;
129130

130131
std::shared_ptr<Cmd> DoCmd(const PikaCmdArgsType& argv, const std::string& opt,
131132
const std::shared_ptr<std::string>& resp_ptr, bool cache_miss_in_rtc,

include/pika_conf.h

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,19 @@ class PikaConf : public pstd::BaseConf {
168168
std::shared_lock l(rwlock_);
169169
return slow_cmd_pool_;
170170
}
171+
bool threadpool_borrow_enable() {
172+
std::shared_lock l(rwlock_);
173+
return threadpool_borrow_enable_;
174+
}
175+
int threadpool_borrow_threshold_percent() {
176+
std::shared_lock l(rwlock_);
177+
return threadpool_borrow_threshold_percent_;
178+
}
179+
int threadpool_idle_threshold_percent() {
180+
std::shared_lock l(rwlock_);
181+
return threadpool_idle_threshold_percent_;
182+
}
183+
171184
std::string server_id() {
172185
std::shared_lock l(rwlock_);
173186
return server_id_;
@@ -481,6 +494,71 @@ class PikaConf : public pstd::BaseConf {
481494
std::lock_guard l(rwlock_);
482495
admin_thread_pool_size_ = value;
483496
}
497+
void SetThreadPoolBorrowEnable(const bool value) {
498+
std::lock_guard l(rwlock_);
499+
threadpool_borrow_enable_ = value;
500+
}
501+
502+
void SetThreadPoolBorrowThresholdPercent(const int value) {
503+
std::lock_guard l(rwlock_);
504+
threadpool_borrow_threshold_percent_ = value;
505+
}
506+
507+
void SetThreadPoolIdleThresholdPercent(const int value) {
508+
std::lock_guard l(rwlock_);
509+
threadpool_idle_threshold_percent_ = value;
510+
}
511+
512+
// Getters for EMA configuration
513+
uint32_t threadpool_ema_alpha_numerator() {
514+
std::shared_lock l(rwlock_);
515+
return threadpool_ema_alpha_numerator_;
516+
}
517+
uint32_t threadpool_ema_alpha_denominator() {
518+
std::shared_lock l(rwlock_);
519+
return threadpool_ema_alpha_denominator_;
520+
}
521+
uint64_t threadpool_fast_busy_threshold() {
522+
std::shared_lock l(rwlock_);
523+
return threadpool_fast_busy_threshold_;
524+
}
525+
uint64_t threadpool_fast_idle_threshold() {
526+
std::shared_lock l(rwlock_);
527+
return threadpool_fast_idle_threshold_;
528+
}
529+
uint64_t threadpool_slow_busy_threshold() {
530+
std::shared_lock l(rwlock_);
531+
return threadpool_slow_busy_threshold_;
532+
}
533+
uint64_t threadpool_slow_idle_threshold() {
534+
std::shared_lock l(rwlock_);
535+
return threadpool_slow_idle_threshold_;
536+
}
537+
538+
// Setters for EMA configuration
539+
void SetThreadPoolEmaAlpha(uint32_t numerator, uint32_t denominator) {
540+
std::lock_guard l(rwlock_);
541+
if (denominator > 0 && numerator <= denominator) {
542+
threadpool_ema_alpha_numerator_ = numerator;
543+
threadpool_ema_alpha_denominator_ = denominator;
544+
}
545+
}
546+
void SetThreadPoolFastBusyThreshold(uint64_t value) {
547+
std::lock_guard l(rwlock_);
548+
threadpool_fast_busy_threshold_ = value;
549+
}
550+
void SetThreadPoolFastIdleThreshold(uint64_t value) {
551+
std::lock_guard l(rwlock_);
552+
threadpool_fast_idle_threshold_ = value;
553+
}
554+
void SetThreadPoolSlowBusyThreshold(uint64_t value) {
555+
std::lock_guard l(rwlock_);
556+
threadpool_slow_busy_threshold_ = value;
557+
}
558+
void SetThreadPoolSlowIdleThreshold(uint64_t value) {
559+
std::lock_guard l(rwlock_);
560+
threadpool_slow_idle_threshold_ = value;
561+
}
484562

485563
void SetSlaveof(const std::string& value) {
486564
std::lock_guard l(rwlock_);
@@ -961,6 +1039,19 @@ class PikaConf : public pstd::BaseConf {
9611039
std::string bgsave_prefix_;
9621040
std::string pidfile_;
9631041
std::atomic<bool> slow_cmd_pool_;
1042+
1043+
// Thread pool task borrowing configuration
1044+
bool threadpool_borrow_enable_ = true;
1045+
int threadpool_borrow_threshold_percent_ = 80;
1046+
int threadpool_idle_threshold_percent_ = 20;
1047+
1048+
// EMA configuration
1049+
uint32_t threadpool_ema_alpha_numerator_{5};
1050+
uint32_t threadpool_ema_alpha_denominator_{100};
1051+
uint64_t threadpool_fast_busy_threshold_{2000}; // us
1052+
uint64_t threadpool_fast_idle_threshold_{500}; // us
1053+
uint64_t threadpool_slow_busy_threshold_{5000}; // us
1054+
uint64_t threadpool_slow_idle_threshold_{1000}; // us
9641055

9651056
std::string compression_;
9661057
std::string compression_per_level_;

include/pika_define.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -417,4 +417,12 @@ const int64_t CACHE_LOAD_QUEUE_MAX_SIZE = 2048;
417417
const int64_t CACHE_VALUE_ITEM_MAX_SIZE = 2048;
418418
const int64_t CACHE_LOAD_NUM_ONE_TIME = 256;
419419

420+
/*
421+
* cmd pool type
422+
*/
423+
// Identifies which command thread pool a task is assigned to.
// Unscoped enum: the enumerators are used unqualified elsewhere in this
// commit (e.g. PikaClientConn::task_pool_type_ defaults to kFastCmdPool).
enum TaskPoolType {
424+
// Fast command pool; the default for a client connection.
kFastCmdPool,
425+
// Slow command pool (presumably the pool sized by slow-cmd-thread-pool-size - confirm).
kSlowCmdPool,
426+
// Admin command pool (presumably the pool sized via admin_thread_pool_size_ - confirm).
kAdminCmdPool
427+
};
420428
#endif

include/pika_server.h

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
#endif
1616
#include <memory>
1717
#include <set>
18+
#include <array>
19+
#include <chrono>
20+
#include <unordered_map>
1821

1922
#include "src/cache/include/config.h"
2023
#include "net/include/bg_thread.h"
@@ -85,6 +88,20 @@ struct TaskArg {
8588
void DoBgslotscleanup(void* arg);
8689
void DoBgslotsreload(void* arg);
8790

91+
// threadpool metrics
92+
// Counters describing one command thread pool. Every field is an atomic that
// starts at zero. The member functions are only declared here; their
// definitions are outside this view.
struct ThreadPoolMetrics {
93+
// Presumably the number of tasks handed to the pool - confirm against the .cc file.
std::atomic<uint64_t> tasks_scheduled{0};
94+
// Presumably the number of tasks currently executing - confirm.
std::atomic<uint64_t> active_tasks{0};
95+
// Presumably the number of times borrowing from the other pool was attempted - confirm.
std::atomic<uint64_t> borrow_attempts{0};
96+
97+
// Latency histogram: nine counters for the ranges labelled
// 1ms, 5ms, 10ms, 50ms, 100ms, 500ms, 1s, 5s, >5s.
// The exact bucket boundaries are decided by RecordLatency (not shown here).
98+
std::array<std::atomic<uint64_t>, 9> latency_buckets{};
99+
100+
// Records one latency sample; latency_us is presumably in microseconds (per its name).
void RecordLatency(uint64_t latency_us);
101+
// Returns the metrics rendered as a string, labelled with pool_name (format defined elsewhere).
std::string ExportMetrics(const std::string& pool_name) const;
102+
// Presumably zeroes every counter - confirm against the definition.
void Reset();
103+
};
104+
88105
class PikaServer : public pstd::noncopyable {
89106
public:
90107
PikaServer();
@@ -201,6 +218,13 @@ class PikaServer : public pstd::noncopyable {
201218
size_t ClientProcessorThreadPoolMaxQueueSize();
202219
size_t SlowCmdThreadPoolCurQueueSize();
203220
size_t SlowCmdThreadPoolMaxQueueSize();
221+
222+
/*
223+
* Thread pool dynamic resize
224+
*/
225+
bool ResizeFastCmdThreadPool(size_t new_size);
226+
bool ResizeSlowCmdThreadPool(size_t new_size);
227+
void GetThreadPoolInfo(std::string* info);
204228

205229
/*
206230
* BGSave used
@@ -502,6 +526,22 @@ class PikaServer : public pstd::noncopyable {
502526
void ProcessCronTask();
503527
double HitRatio();
504528
void SetLogNetActivities(bool value);
529+
530+
/*
531+
* threadpool borrow
532+
*/
533+
bool IsSlowPoolBusy();
534+
bool IsSlowPoolIdle();
535+
bool IsFastPoolBusy();
536+
bool IsFastPoolIdle();
537+
TaskPoolType DecidePoolType(bool is_slow_cmd);
538+
std::string GetEnhancedThreadPoolMetrics();
539+
void ResetThreadPoolMetrics();
540+
void UpdateQueueWaitStats(TaskPoolType pool_type, uint64_t queue_wait_us);
541+
void LoadThreadPoolConfig();
542+
// Returns a non-owning pointer to the slow pool's metrics. Ownership stays with
// slow_pool_metrics_ (a unique_ptr); the result is null if that member is unset.
ThreadPoolMetrics* GetSlowPoolMetrics() { return slow_pool_metrics_.get(); }
543+
// Returns a non-owning pointer to the fast pool's metrics. Ownership stays with
// fast_pool_metrics_ (a unique_ptr); the result is null if that member is unset.
ThreadPoolMetrics* GetFastPoolMetrics() { return fast_pool_metrics_.get(); }
544+
505545
/*
506546
* disable compact
507547
*/
@@ -540,6 +580,7 @@ class PikaServer : public pstd::noncopyable {
540580
void AutoDeleteExpiredDump();
541581
void AutoUpdateNetworkMetric();
542582
void PrintThreadPoolQueueStatus();
583+
void AutoUpdateIdlePoolEMA();
543584
void StatDiskUsage();
544585
int64_t GetLastSaveTime(const std::string& dump_dir);
545586

@@ -662,7 +703,50 @@ class PikaServer : public pstd::noncopyable {
662703
* lastsave used
663704
*/
664705
int64_t lastsave_ = 0;
706+
707+
/*
708+
* metrics used
709+
*/
710+
std::unique_ptr<ThreadPoolMetrics> fast_pool_metrics_;
711+
std::unique_ptr<ThreadPoolMetrics> slow_pool_metrics_;
712+
713+
/*
714+
* EMA statistics for queue wait time (used together with queue percent)
715+
*/
716+
// Queue-wait EMA state for a single pool. PikaServer keeps one instance per
// pool (fast_pool_stats_ and slow_pool_stats_). Both fields are atomics that
// start at zero.
struct PoolLatencyStats {
717+
std::atomic<uint64_t> ema_queue_wait_us_{0}; // Exponential moving average of queue wait time (us)
718+
std::atomic<uint64_t> last_update_us_{0}; // Time of the last EMA update (us); clock source not shown here
719+
};
665720

721+
PoolLatencyStats fast_pool_stats_;
722+
PoolLatencyStats slow_pool_stats_;
723+
724+
// EMA configuration parameters (atomic for dynamic updates)
725+
std::atomic<uint32_t> ema_alpha_numerator_{5};
726+
std::atomic<uint32_t> ema_alpha_denominator_{100};
727+
std::atomic<uint64_t> fast_busy_threshold_us_{2000};
728+
std::atomic<uint64_t> fast_idle_threshold_us_{500};
729+
std::atomic<uint64_t> slow_busy_threshold_us_{5000};
730+
std::atomic<uint64_t> slow_idle_threshold_us_{1000};
731+
732+
// Get EMA queue wait time (us)
733+
uint64_t GetEMAQueueWait(TaskPoolType pool_type) const;
734+
735+
// EMA decay helper functions
736+
uint64_t CalculateDecayedEMA(uint64_t old_ema, uint64_t last_update, uint64_t now);
737+
void UpdateSinglePoolEMA(PoolLatencyStats& stats, uint64_t new_sample, uint64_t now);
738+
void DecaySinglePool(PoolLatencyStats& stats, uint64_t now);
739+
740+
// Busy/Idle determination based on EMA (internal helpers)
741+
bool IsFastPoolBusyByEMA() const;
742+
bool IsFastPoolIdleByEMA() const;
743+
bool IsSlowPoolBusyByEMA() const;
744+
bool IsSlowPoolIdleByEMA() const;
745+
746+
// Borrow idle window
747+
std::atomic<uint64_t> threadpool_idle_window_us_{1000000};
748+
std::atomic<uint64_t> fast_pool_last_active_us_{0};
749+
std::atomic<uint64_t> slow_pool_last_active_us_{0};
666750
/*
667751
* acl
668752
*/

0 commit comments

Comments
 (0)