@@ -770,15 +770,66 @@ void PikaServer::SetFirstMetaSync(bool v) {
770770}
771771
772772void PikaServer::ScheduleClientPool (net::TaskFunc func, void * arg, bool is_slow_cmd, bool is_admin_cmd) {
773- if (is_slow_cmd && g_pika_conf->slow_cmd_pool ()) {
774- pika_slow_cmd_thread_pool_->Schedule (func, arg);
775- return ;
776- }
773+ // Admin commands always go to admin thread pool
777774 if (is_admin_cmd) {
778775 pika_admin_cmd_thread_pool_->Schedule (func, arg);
779776 return ;
780777 }
781- pika_client_processor_->SchedulePool (func, arg);
778+
779+ // Check if thread pool borrowing is enabled
780+ bool borrow_enable = g_pika_conf->threadpool_borrow_enable ();
781+ bool slow_cmd_pool_enable = g_pika_conf->slow_cmd_pool ();
782+
783+ if (!borrow_enable || !slow_cmd_pool_enable) {
784+ // Original logic: direct routing without borrowing
785+ if (is_slow_cmd && slow_cmd_pool_enable) {
786+ pika_slow_cmd_thread_pool_->Schedule (func, arg);
787+ return ;
788+ }
789+ pika_client_processor_->SchedulePool (func, arg);
790+ return ;
791+ }
792+
793+ // Borrowing is enabled, check queue status and decide
794+ size_t slow_queue_size = SlowCmdThreadPoolCurQueueSize ();
795+ size_t slow_max_queue = SlowCmdThreadPoolMaxQueueSize ();
796+ size_t fast_queue_size = ClientProcessorThreadPoolCurQueueSize ();
797+ size_t fast_max_queue = ClientProcessorThreadPoolMaxQueueSize ();
798+
799+ int borrow_threshold_percent = g_pika_conf->threadpool_borrow_threshold_percent ();
800+ int idle_threshold_percent = g_pika_conf->threadpool_idle_threshold_percent ();
801+
802+ // Calculate thresholds
803+ size_t slow_borrow_threshold = slow_max_queue * borrow_threshold_percent / 100 ;
804+ size_t fast_borrow_threshold = fast_max_queue * borrow_threshold_percent / 100 ;
805+ size_t slow_idle_threshold = slow_max_queue * idle_threshold_percent / 100 ;
806+ size_t fast_idle_threshold = fast_max_queue * idle_threshold_percent / 100 ;
807+
808+ if (is_slow_cmd) {
809+ // This is a slow command
810+ if (slow_queue_size >= slow_borrow_threshold && fast_queue_size <= fast_idle_threshold) {
811+ // Slow pool is busy and fast pool is idle, borrow from fast pool
812+ pika_client_processor_->SchedulePool (func, arg);
813+ LOG (INFO) << " Slow cmd borrows fast pool (slow_queue: " << slow_queue_size
814+ << " /" << slow_max_queue << " , fast_queue: " << fast_queue_size
815+ << " /" << fast_max_queue << " )" ;
816+ } else {
817+ // Normal case: use slow pool
818+ pika_slow_cmd_thread_pool_->Schedule (func, arg);
819+ }
820+ } else {
821+ // This is a fast command
822+ if (fast_queue_size >= fast_borrow_threshold && slow_queue_size <= slow_idle_threshold) {
823+ // Fast pool is busy and slow pool is idle, borrow from slow pool
824+ pika_slow_cmd_thread_pool_->Schedule (func, arg);
825+ LOG (INFO) << " Fast cmd borrows slow pool (fast_queue: " << fast_queue_size
826+ << " /" << fast_max_queue << " , slow_queue: " << slow_queue_size
827+ << " /" << slow_max_queue << " )" ;
828+ } else {
829+ // Normal case: use fast pool
830+ pika_client_processor_->SchedulePool (func, arg);
831+ }
832+ }
782833}
783834
784835size_t PikaServer::ClientProcessorThreadPoolCurQueueSize () {
0 commit comments