diff --git a/include/pika_client_conn.h b/include/pika_client_conn.h index bc4c28db6a..27852e1bac 100644 --- a/include/pika_client_conn.h +++ b/include/pika_client_conn.h @@ -128,12 +128,13 @@ class PikaClientConn : public net::RedisConn { std::shared_ptr user_; std::shared_ptr DoCmd(const PikaCmdArgsType& argv, const std::string& opt, - const std::shared_ptr& resp_ptr, bool cache_miss_in_rtc); + const std::shared_ptr& resp_ptr, bool cache_miss_in_rtc, + int pipeline_idx, int pipeline_total); - void ProcessSlowlog(const PikaCmdArgsType& argv, std::shared_ptr c_ptr); + void ProcessSlowlog(const PikaCmdArgsType& argv, std::shared_ptr c_ptr, int pipeline_idx, int pipeline_total); void ProcessMonitor(const PikaCmdArgsType& argv); - void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr& resp_ptr, bool cache_miss_in_rtc); + void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr& resp_ptr, bool cache_miss_in_rtc, int pipeline_idx, int pipeline_total); void TryWriteResp(); }; diff --git a/src/pika_client_conn.cc b/src/pika_client_conn.cc index a6cd5ec62f..bb21f2ebfb 100644 --- a/src/pika_client_conn.cc +++ b/src/pika_client_conn.cc @@ -39,7 +39,8 @@ PikaClientConn::PikaClientConn(int fd, const std::string& ip_port, net::Thread* } std::shared_ptr PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const std::string& opt, - const std::shared_ptr& resp_ptr, bool cache_miss_in_rtc) { + const std::shared_ptr& resp_ptr, bool cache_miss_in_rtc, + int pipeline_idx, int pipeline_total) { // Get command info std::shared_ptr c_ptr = g_pika_cmd_table_manager->GetCmd(opt); if (!c_ptr) { @@ -226,13 +227,14 @@ std::shared_ptr PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st (*cmdstat_map)[opt].cmd_time_consuming.fetch_add(time_stat_->total_time()); if (g_pika_conf->slowlog_slower_than() >= 0) { - ProcessSlowlog(argv, c_ptr); + ProcessSlowlog(argv, c_ptr, pipeline_idx, pipeline_total); } return c_ptr; } -void PikaClientConn::ProcessSlowlog(const PikaCmdArgsType& argv, std::shared_ptr c_ptr) { +void PikaClientConn::ProcessSlowlog(const PikaCmdArgsType& argv, std::shared_ptr c_ptr, + int pipeline_idx, int pipeline_total) { if (time_stat_->total_time() > g_pika_conf->slowlog_slower_than()) { g_pika_server->SlowlogPushEntry(argv, time_stat_->start_ts() / 1000000, time_stat_->total_time()); if (g_pika_conf->slowlog_write_errorlog()) { @@ -257,6 +259,7 @@ void PikaClientConn::ProcessSlowlog(const PikaCmdArgsType& argv, std::shared_ptr << ", before_queue_time(ms): " << time_stat_->before_queue_time() / 1000 << ", queue_time(ms): " << time_stat_->queue_time() / 1000 << ", process_time(ms): " << time_stat_->process_time() / 1000 + << ", pipeline: " << pipeline_idx << "/" << pipeline_total << ", " << c_ptr->StagesDurationSummary(true /*skip zero counter*/) << ", " << rocksdb::get_perf_context()->ToString(true); } @@ -358,10 +361,11 @@ void PikaClientConn::DoBackgroundTask(void* arg) { void PikaClientConn::BatchExecRedisCmd(const std::vector& argvs, bool cache_miss_in_rtc) { resp_num.store(static_cast(argvs.size())); - for (const auto& argv : argvs) { + for (auto idx = 0; idx < argvs.size(); idx++) { + const auto& argv = argvs[idx]; std::shared_ptr resp_ptr = std::make_shared(); resp_array.push_back(resp_ptr); - ExecRedisCmd(argv, resp_ptr, cache_miss_in_rtc); + ExecRedisCmd(argv, resp_ptr, cache_miss_in_rtc, idx + 1, argvs.size()); } time_stat_->process_done_ts_ = pstd::NowMicros(); TryWriteResp(); @@ -541,7 +545,7 @@ void PikaClientConn::ExitTxn() { } void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr& resp_ptr, - bool cache_miss_in_rtc) { + bool cache_miss_in_rtc, int pipeline_idx, int pipeline_total) { // get opt std::string opt = argv[0]; pstd::StringToLower(opt); @@ -552,7 +556,7 @@ void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr cmd_ptr = DoCmd(argv, opt, resp_ptr, cache_miss_in_rtc); + std::shared_ptr cmd_ptr = DoCmd(argv, opt, resp_ptr, cache_miss_in_rtc, pipeline_idx, pipeline_total); *resp_ptr = std::move(cmd_ptr->res().message()); resp_num--; }