@@ -39,7 +39,8 @@ PikaClientConn::PikaClientConn(int fd, const std::string& ip_port, net::Thread*
3939}
4040
4141std::shared_ptr<Cmd> PikaClientConn::DoCmd (const PikaCmdArgsType& argv, const std::string& opt,
42- const std::shared_ptr<std::string>& resp_ptr, bool cache_miss_in_rtc) {
42+ const std::shared_ptr<std::string>& resp_ptr, bool cache_miss_in_rtc,
43+ int pipeline_idx, int pipeline_total) {
4344 // Get command info
4445 std::shared_ptr<Cmd> c_ptr = g_pika_cmd_table_manager->GetCmd (opt);
4546 if (!c_ptr) {
@@ -226,13 +227,14 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
226227 (*cmdstat_map)[opt].cmd_time_consuming .fetch_add (time_stat_->total_time ());
227228
228229 if (g_pika_conf->slowlog_slower_than () >= 0 ) {
229- ProcessSlowlog (argv, c_ptr);
230+ ProcessSlowlog (argv, c_ptr, pipeline_idx, pipeline_total );
230231 }
231232
232233 return c_ptr;
233234}
234235
235- void PikaClientConn::ProcessSlowlog (const PikaCmdArgsType& argv, std::shared_ptr<Cmd> c_ptr) {
236+ void PikaClientConn::ProcessSlowlog (const PikaCmdArgsType& argv, std::shared_ptr<Cmd> c_ptr,
237+ int pipeline_idx, int pipeline_total) {
236238 if (time_stat_->total_time () > g_pika_conf->slowlog_slower_than ()) {
237239 g_pika_server->SlowlogPushEntry (argv, time_stat_->start_ts () / 1000000 , time_stat_->total_time ());
238240 if (g_pika_conf->slowlog_write_errorlog ()) {
@@ -257,6 +259,7 @@ void PikaClientConn::ProcessSlowlog(const PikaCmdArgsType& argv, std::shared_ptr
257259 << " , before_queue_time(ms): " << time_stat_->before_queue_time () / 1000
258260 << " , queue_time(ms): " << time_stat_->queue_time () / 1000
259261 << " , process_time(ms): " << time_stat_->process_time () / 1000
262+ << " , pipeline: " << pipeline_idx << " /" << pipeline_total
260263 << " , " << c_ptr->StagesDurationSummary (true /* skip zero counter*/ )
261264 << " , " << rocksdb::get_perf_context ()->ToString (true );
262265 }
@@ -358,10 +361,11 @@ void PikaClientConn::DoBackgroundTask(void* arg) {
358361
359362void PikaClientConn::BatchExecRedisCmd (const std::vector<net::RedisCmdArgsType>& argvs, bool cache_miss_in_rtc) {
360363 resp_num.store (static_cast <int32_t >(argvs.size ()));
361- for (const auto & argv : argvs) {
364+ for (auto idx = 0 ; idx < argvs.size (); idx++) {
365+ const auto & argv = argvs[idx];
362366 std::shared_ptr<std::string> resp_ptr = std::make_shared<std::string>();
363367 resp_array.push_back (resp_ptr);
364- ExecRedisCmd (argv, resp_ptr, cache_miss_in_rtc);
368+ ExecRedisCmd (argv, resp_ptr, cache_miss_in_rtc, idx + 1 , argvs. size () );
365369 }
366370 time_stat_->process_done_ts_ = pstd::NowMicros ();
367371 TryWriteResp ();
@@ -541,7 +545,7 @@ void PikaClientConn::ExitTxn() {
541545}
542546
543547void PikaClientConn::ExecRedisCmd (const PikaCmdArgsType& argv, std::shared_ptr<std::string>& resp_ptr,
544- bool cache_miss_in_rtc) {
548+ bool cache_miss_in_rtc, int pipeline_idx, int pipeline_total ) {
545549 // get opt
546550 std::string opt = argv[0 ];
547551 pstd::StringToLower (opt);
@@ -552,7 +556,7 @@ void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<s
552556 }
553557 }
554558
555- std::shared_ptr<Cmd> cmd_ptr = DoCmd (argv, opt, resp_ptr, cache_miss_in_rtc);
559+ std::shared_ptr<Cmd> cmd_ptr = DoCmd (argv, opt, resp_ptr, cache_miss_in_rtc, pipeline_idx, pipeline_total );
556560 *resp_ptr = std::move (cmd_ptr->res ().message ());
557561 resp_num--;
558562}
0 commit comments