Skip to content

Commit 3ad1fc0

Browse files
committed
fast-slow-separate
1 parent f030ae5 commit 3ad1fc0

4 files changed

Lines changed: 233 additions & 57 deletions

File tree

include/pika_define.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,7 @@ const int64_t CACHE_LOAD_NUM_ONE_TIME = 256;
422422
*/
423423
enum TaskPoolType {
424424
kFastCmdPool,
425-
kSlowCmdPool
425+
kSlowCmdPool,
426+
kAdminCmdPool
426427
};
427428
#endif

include/pika_server.h

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ void DoBgslotsreload(void* arg);
8484
// threadpool metrics
8585
struct ThreadPoolMetrics {
8686
std::atomic<uint64_t> tasks_scheduled{0};
87-
std::atomic<uint64_t> tasks_completed{0};
87+
std::atomic<uint64_t> active_tasks{0};
8888
std::atomic<uint64_t> borrow_attempts{0};
8989

9090
// latency(1ms, 5ms, 10ms, 50ms, 100ms, 500ms, 1s, 5s, >5s)
@@ -567,6 +567,7 @@ class PikaServer : public pstd::noncopyable {
567567
void AutoDeleteExpiredDump();
568568
void AutoUpdateNetworkMetric();
569569
void PrintThreadPoolQueueStatus();
570+
void AutoUpdateIdlePoolEMA();
570571
void StatDiskUsage();
571572
int64_t GetLastSaveTime(const std::string& dump_dir);
572573

@@ -708,22 +709,31 @@ class PikaServer : public pstd::noncopyable {
708709
PoolLatencyStats slow_pool_stats_;
709710

710711
// EMA configuration parameters (atomic for dynamic updates)
711-
std::atomic<uint32_t> ema_alpha_numerator_{5}; // EMA alpha numerator (alpha = 5/100 = 0.05)
712-
std::atomic<uint32_t> ema_alpha_denominator_{100}; // EMA alpha denominator
713-
std::atomic<uint64_t> fast_busy_threshold_us_{2000}; // Fast pool busy threshold (2ms)
714-
std::atomic<uint64_t> fast_idle_threshold_us_{500}; // Fast pool idle threshold (0.5ms)
715-
std::atomic<uint64_t> slow_busy_threshold_us_{5000}; // Slow pool busy threshold (5ms)
716-
std::atomic<uint64_t> slow_idle_threshold_us_{1000}; // Slow pool idle threshold (1ms)
712+
std::atomic<uint32_t> ema_alpha_numerator_{5};
713+
std::atomic<uint32_t> ema_alpha_denominator_{100};
714+
std::atomic<uint64_t> fast_busy_threshold_us_{2000};
715+
std::atomic<uint64_t> fast_idle_threshold_us_{500};
716+
std::atomic<uint64_t> slow_busy_threshold_us_{5000};
717+
std::atomic<uint64_t> slow_idle_threshold_us_{1000};
717718

718719
// Get EMA queue wait time (us)
719720
uint64_t GetEMAQueueWait(TaskPoolType pool_type) const;
721+
722+
// EMA decay helper functions
723+
uint64_t CalculateDecayedEMA(uint64_t old_ema, uint64_t last_update, uint64_t now);
724+
void UpdateSinglePoolEMA(PoolLatencyStats& stats, uint64_t new_sample, uint64_t now);
725+
void DecaySinglePool(PoolLatencyStats& stats, uint64_t now);
720726

721727
// Busy/Idle determination based on EMA (internal helpers)
722728
bool IsFastPoolBusyByEMA() const;
723729
bool IsFastPoolIdleByEMA() const;
724730
bool IsSlowPoolBusyByEMA() const;
725731
bool IsSlowPoolIdleByEMA() const;
726732

733+
// Borrow idle window
734+
std::atomic<uint64_t> threadpool_idle_window_us_{1000000};
735+
std::atomic<uint64_t> fast_pool_last_active_us_{0};
736+
std::atomic<uint64_t> slow_pool_last_active_us_{0};
727737
/*
728738
* acl
729739
*/

src/pika_client_conn.cc

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,6 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
233233
: g_pika_server->GetFastPoolMetrics();
234234
if (metrics) {
235235
metrics->RecordLatency(duration_us);
236-
metrics->tasks_completed.fetch_add(1, std::memory_order_relaxed);
237236
}
238237
if (g_pika_conf->slowlog_slower_than() >= 0) {
239238
ProcessSlowlog(argv, c_ptr, pipeline_idx, pipeline_total);
@@ -351,15 +350,28 @@ void PikaClientConn::DoBackgroundTask(void* arg) {
351350
std::unique_ptr<BgTaskArg> bg_arg(static_cast<BgTaskArg*>(arg));
352351
std::shared_ptr<PikaClientConn> conn_ptr = bg_arg->conn_ptr;
353352
conn_ptr->task_pool_type_ = bg_arg->pool_type;
354-
353+
354+
ThreadPoolMetrics* metrics = nullptr;
355+
if (bg_arg->pool_type == TaskPoolType::kFastCmdPool) {
356+
metrics = g_pika_server->GetFastPoolMetrics();
357+
} else if (bg_arg->pool_type == TaskPoolType::kSlowCmdPool) {
358+
metrics = g_pika_server->GetSlowPoolMetrics();
359+
}
360+
361+
if (metrics) {
362+
metrics->active_tasks.fetch_add(1, std::memory_order_relaxed);
363+
}
364+
DEFER {
365+
if (metrics) {
366+
metrics->active_tasks.fetch_sub(1, std::memory_order_relaxed);
367+
}
368+
};
355369
// Record dequeue time and calculate queue wait time
356370
uint64_t now = pstd::NowMicros();
357371
conn_ptr->time_stat_->dequeue_ts_ = now;
358372
uint64_t queue_wait = now - conn_ptr->time_stat_->enqueue_ts_;
359-
360373
// Update EMA statistics for queue wait time
361374
g_pika_server->UpdateQueueWaitStats(bg_arg->pool_type, queue_wait);
362-
363375
if (conn_ptr->IsClose()) {
364376
LOG(INFO) << "conn " << conn_ptr->ip_port() << " has already been closed, skip processing command";
365377
return;

0 commit comments

Comments
 (0)