Skip to content

Commit c3797a7

Browse files
committed
fix_fast
1 parent 2ac82cb commit c3797a7

4 files changed

Lines changed: 182 additions & 10 deletions

File tree

include/pika_admin.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,7 @@ class InfoCmd : public Cmd {
295295
const static std::string kDebugSection;
296296
const static std::string kCommandStatsSection;
297297
const static std::string kCacheSection;
298+
const static std::string kThreadpoolSection;
298299

299300
void DoInitial() override;
300301
void Clear() override {

include/pika_server.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,13 @@ class PikaServer : public pstd::noncopyable {
194194
size_t ClientProcessorThreadPoolMaxQueueSize();
195195
size_t SlowCmdThreadPoolCurQueueSize();
196196
size_t SlowCmdThreadPoolMaxQueueSize();
197+
198+
/*
199+
* Thread pool dynamic resize
200+
*/
201+
bool ResizeFastCmdThreadPool(size_t new_size);
202+
bool ResizeSlowCmdThreadPool(size_t new_size);
203+
void GetThreadPoolInfo(std::string* info);
197204

198205
/*
199206
* BGSave used

src/pika_admin.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -884,6 +884,7 @@ const std::string InfoCmd::kRocksDBSection = "rocksdb";
884884
const std::string InfoCmd::kDebugSection = "debug";
885885
const std::string InfoCmd::kCommandStatsSection = "commandstats";
886886
const std::string InfoCmd::kCacheSection = "cache";
887+
const std::string InfoCmd::kThreadpoolSection = "threadpool";
887888

888889

889890
const std::string ClientCmd::KILLTYPE_NORMAL = "normal";
@@ -969,6 +970,8 @@ void InfoCmd::DoInitial() {
969970
info_section_ = kInfoCommandStats;
970971
} else if (strcasecmp(argv_[1].data(), kCacheSection.data()) == 0) {
971972
info_section_ = kInfoCache;
973+
} else if (strcasecmp(argv_[1].data(), kThreadpoolSection.data()) == 0) {
974+
info_section_ = kInfoThreadpool;
972975
} else {
973976
info_section_ = kInfoErr;
974977
}
@@ -1010,6 +1013,8 @@ void InfoCmd::Do() {
10101013
info.append("\r\n");
10111014
InfoCache(info, db_);
10121015
info.append("\r\n");
1016+
g_pika_server->GetThreadPoolInfo(&info);
1017+
info.append("\r\n");
10131018
InfoCPU(info);
10141019
info.append("\r\n");
10151020
InfoReplication(info);
@@ -1054,6 +1059,9 @@ void InfoCmd::Do() {
10541059
case kInfoCache:
10551060
InfoCache(info, db_);
10561061
break;
1062+
case kInfoThreadpool:
1063+
g_pika_server->GetThreadPoolInfo(&info);
1064+
break;
10571065
default:
10581066
// kInfoErr is nothing
10591067
break;

src/pika_server.cc

Lines changed: 166 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -790,6 +790,34 @@ void PikaServer::ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_
790790
return;
791791
}
792792

793+
// Extract key from command to determine key affinity
794+
// This ensures commands on the same key always go to the same thread pool
795+
auto bg_arg = static_cast<PikaClientConn::BgTaskArg*>(arg);
796+
bool key_affinity_to_slow = false;
797+
bool has_key = false;
798+
799+
if (bg_arg && !bg_arg->redis_cmds.empty() && !bg_arg->redis_cmds[0].empty()) {
800+
const auto& cmd_args = bg_arg->redis_cmds[0];
801+
std::string cmd_name = cmd_args[0];
802+
pstd::StringToLower(cmd_name);
803+
804+
// Extract key based on command type
805+
// Most commands have key as the first argument after command name
806+
std::string key;
807+
if (cmd_args.size() > 1) {
808+
// For most commands, key is argv[1]
809+
// For special commands like MGET, MSET, we use the first key
810+
key = cmd_args[1];
811+
has_key = true;
812+
813+
// Use simple hash to determine key affinity
814+
// Same key will always have same affinity (fast or slow pool)
815+
std::hash<std::string> hasher;
816+
size_t hash_value = hasher(key);
817+
key_affinity_to_slow = (hash_value % 2 == 1);
818+
}
819+
}
820+
793821
// Borrowing is enabled, check queue status and decide
794822
size_t slow_queue_size = SlowCmdThreadPoolCurQueueSize();
795823
size_t slow_max_queue = SlowCmdThreadPoolMaxQueueSize();
@@ -805,26 +833,45 @@ void PikaServer::ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_
805833
size_t slow_idle_threshold = slow_max_queue * idle_threshold_percent / 100;
806834
size_t fast_idle_threshold = fast_max_queue * idle_threshold_percent / 100;
807835

836+
// Decision logic considering both command type and key affinity
808837
if (is_slow_cmd) {
809838
// This is a slow command
810-
if (slow_queue_size >= slow_borrow_threshold && fast_queue_size <= fast_idle_threshold) {
811-
// Slow pool is busy and fast pool is idle, borrow from fast pool
839+
if (has_key && !key_affinity_to_slow) {
840+
// Key affinity is to fast pool, must use fast pool to maintain order
812841
pika_client_processor_->SchedulePool(func, arg);
813-
LOG(INFO) << "Slow cmd borrows fast pool (slow_queue: " << slow_queue_size
814-
<< "/" << slow_max_queue << ", fast_queue: " << fast_queue_size
815-
<< "/" << fast_max_queue << ")";
842+
} else if (slow_queue_size >= slow_borrow_threshold && fast_queue_size <= fast_idle_threshold) {
843+
// Slow pool is busy and fast pool is idle
844+
if (has_key && key_affinity_to_slow) {
845+
// Key belongs to slow pool, cannot borrow
846+
pika_slow_cmd_thread_pool_->Schedule(func, arg);
847+
} else {
848+
// Can borrow from fast pool
849+
pika_client_processor_->SchedulePool(func, arg);
850+
LOG(INFO) << "Slow cmd borrows fast pool (slow_queue: " << slow_queue_size
851+
<< "/" << slow_max_queue << ", fast_queue: " << fast_queue_size
852+
<< "/" << fast_max_queue << ")";
853+
}
816854
} else {
817855
// Normal case: use slow pool
818856
pika_slow_cmd_thread_pool_->Schedule(func, arg);
819857
}
820858
} else {
821859
// This is a fast command
822-
if (fast_queue_size >= fast_borrow_threshold && slow_queue_size <= slow_idle_threshold) {
823-
// Fast pool is busy and slow pool is idle, borrow from slow pool
860+
if (has_key && key_affinity_to_slow) {
861+
// Key affinity is to slow pool, must use slow pool to maintain order
824862
pika_slow_cmd_thread_pool_->Schedule(func, arg);
825-
LOG(INFO) << "Fast cmd borrows slow pool (fast_queue: " << fast_queue_size
826-
<< "/" << fast_max_queue << ", slow_queue: " << slow_queue_size
827-
<< "/" << slow_max_queue << ")";
863+
} else if (fast_queue_size >= fast_borrow_threshold && slow_queue_size <= slow_idle_threshold) {
864+
// Fast pool is busy and slow pool is idle
865+
if (has_key && !key_affinity_to_slow) {
866+
// Key belongs to fast pool, cannot borrow
867+
pika_client_processor_->SchedulePool(func, arg);
868+
} else {
869+
// Can borrow from slow pool
870+
pika_slow_cmd_thread_pool_->Schedule(func, arg);
871+
LOG(INFO) << "Fast cmd borrows slow pool (fast_queue: " << fast_queue_size
872+
<< "/" << fast_max_queue << ", slow_queue: " << slow_queue_size
873+
<< "/" << slow_max_queue << ")";
874+
}
828875
} else {
829876
// Normal case: use fast pool
830877
pika_client_processor_->SchedulePool(func, arg);
@@ -862,6 +909,115 @@ size_t PikaServer::SlowCmdThreadPoolMaxQueueSize() {
862909
return pika_slow_cmd_thread_pool_->max_queue_size();
863910
}
864911

912+
bool PikaServer::ResizeFastCmdThreadPool(size_t new_size) {
913+
if (new_size == 0 || new_size > 1024) {
914+
LOG(WARNING) << "Invalid fast cmd thread pool size: " << new_size << ", must be between 1 and 1024";
915+
return false;
916+
}
917+
918+
size_t old_size = g_pika_conf->thread_pool_size();
919+
if (new_size == old_size) {
920+
LOG(INFO) << "Fast cmd thread pool size unchanged: " << new_size;
921+
return true;
922+
}
923+
924+
LOG(INFO) << "Resizing fast cmd thread pool from " << old_size << " to " << new_size;
925+
926+
// Update config
927+
g_pika_conf->SetThreadPoolSize(static_cast<int>(new_size));
928+
929+
// Stop old thread pool gracefully
930+
LOG(INFO) << "Waiting for old fast cmd thread pool tasks to complete...";
931+
pika_client_processor_->Stop();
932+
933+
// Create and start new thread pool
934+
pika_client_processor_ = std::make_unique<PikaClientProcessor>(new_size, 100000);
935+
int ret = pika_client_processor_->Start();
936+
if (ret != net::kSuccess) {
937+
LOG(ERROR) << "Failed to start new fast cmd thread pool: " << ret;
938+
return false;
939+
}
940+
941+
LOG(INFO) << "Successfully resized fast cmd thread pool to " << new_size;
942+
return true;
943+
}
944+
945+
bool PikaServer::ResizeSlowCmdThreadPool(size_t new_size) {
946+
if (new_size == 0 || new_size > 1024) {
947+
LOG(WARNING) << "Invalid slow cmd thread pool size: " << new_size << ", must be between 1 and 1024";
948+
return false;
949+
}
950+
951+
size_t old_size = g_pika_conf->slow_cmd_thread_pool_size();
952+
if (new_size == old_size) {
953+
LOG(INFO) << "Slow cmd thread pool size unchanged: " << new_size;
954+
return true;
955+
}
956+
957+
LOG(INFO) << "Resizing slow cmd thread pool from " << old_size << " to " << new_size;
958+
959+
// Update config
960+
g_pika_conf->SetLowLevelThreadPoolSize(static_cast<int>(new_size));
961+
962+
// Stop old thread pool gracefully
963+
LOG(INFO) << "Waiting for old slow cmd thread pool tasks to complete...";
964+
while (SlowCmdThreadPoolCurQueueSize() != 0) {
965+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
966+
}
967+
pika_slow_cmd_thread_pool_->stop_thread_pool();
968+
969+
// Create and start new thread pool
970+
pika_slow_cmd_thread_pool_ = std::make_unique<net::ThreadPool>(new_size, 100000);
971+
int ret = pika_slow_cmd_thread_pool_->start_thread_pool();
972+
if (ret != net::kSuccess) {
973+
LOG(ERROR) << "Failed to start new slow cmd thread pool: " << ret;
974+
return false;
975+
}
976+
977+
LOG(INFO) << "Successfully resized slow cmd thread pool to " << new_size;
978+
return true;
979+
}
980+
981+
void PikaServer::GetThreadPoolInfo(std::string* info) {
982+
std::stringstream tmp_stream;
983+
984+
// Fast cmd thread pool info
985+
size_t fast_pool_size = g_pika_conf->thread_pool_size();
986+
size_t fast_queue_size = ClientProcessorThreadPoolCurQueueSize();
987+
size_t fast_max_queue = ClientProcessorThreadPoolMaxQueueSize();
988+
double fast_usage = fast_max_queue > 0 ? (fast_queue_size * 100.0 / fast_max_queue) : 0.0;
989+
990+
tmp_stream << "# Threadpool\r\n";
991+
tmp_stream << "fast_cmd_pool_size:" << fast_pool_size << "\r\n";
992+
tmp_stream << "fast_cmd_pool_queue_size:" << fast_queue_size << "\r\n";
993+
tmp_stream << "fast_cmd_pool_max_queue_size:" << fast_max_queue << "\r\n";
994+
tmp_stream << "fast_cmd_pool_usage:" << std::fixed << std::setprecision(2) << fast_usage << "%\r\n";
995+
996+
// Slow cmd thread pool info
997+
if (g_pika_conf->slow_cmd_pool()) {
998+
size_t slow_pool_size = g_pika_conf->slow_cmd_thread_pool_size();
999+
size_t slow_queue_size = SlowCmdThreadPoolCurQueueSize();
1000+
size_t slow_max_queue = SlowCmdThreadPoolMaxQueueSize();
1001+
double slow_usage = slow_max_queue > 0 ? (slow_queue_size * 100.0 / slow_max_queue) : 0.0;
1002+
1003+
tmp_stream << "slow_cmd_pool_size:" << slow_pool_size << "\r\n";
1004+
tmp_stream << "slow_cmd_pool_queue_size:" << slow_queue_size << "\r\n";
1005+
tmp_stream << "slow_cmd_pool_max_queue_size:" << slow_max_queue << "\r\n";
1006+
tmp_stream << "slow_cmd_pool_usage:" << std::fixed << std::setprecision(2) << slow_usage << "%\r\n";
1007+
} else {
1008+
tmp_stream << "slow_cmd_pool_size:0\r\n";
1009+
tmp_stream << "slow_cmd_pool_queue_size:0\r\n";
1010+
tmp_stream << "slow_cmd_pool_max_queue_size:0\r\n";
1011+
tmp_stream << "slow_cmd_pool_usage:0.00%\r\n";
1012+
}
1013+
1014+
// Admin cmd thread pool info
1015+
size_t admin_pool_size = g_pika_conf->admin_thread_pool_size();
1016+
tmp_stream << "admin_cmd_pool_size:" << admin_pool_size << "\r\n";
1017+
1018+
info->append(tmp_stream.str());
1019+
}
1020+
8651021
void PikaServer::BGSaveTaskSchedule(net::TaskFunc func, void* arg) {
8661022
bgsave_thread_.StartThread();
8671023
bgsave_thread_.Schedule(func, arg);

0 commit comments

Comments
 (0)