Skip to content

Commit 4cca7bf

Browse files
wangshao1wuxianrong
authored andcommitted
cherry-pick 3
1 parent 0d6550c commit 4cca7bf

12 files changed

Lines changed: 842 additions & 151 deletions

include/pika_client_conn.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ class PikaClientConn : public net::RedisConn {
130130
std::shared_ptr<Cmd> DoCmd(const PikaCmdArgsType& argv, const std::string& opt,
131131
const std::shared_ptr<std::string>& resp_ptr, bool cache_miss_in_rtc);
132132

133-
void ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t do_duration);
133+
void ProcessSlowlog(const PikaCmdArgsType& argv, std::shared_ptr<Cmd> c_ptr);
134134
void ProcessMonitor(const PikaCmdArgsType& argv);
135135

136136
void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<std::string>& resp_ptr, bool cache_miss_in_rtc);

include/pika_command.h

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,16 @@
1717
#include "net/include/net_conn.h"
1818
#include "net/include/redis_conn.h"
1919
#include "pstd/include/pstd_string.h"
20+
#include "pstd/include/stage_timer.h"
2021

2122
#include "net/src/dispatch_thread.h"
2223

24+
// Declare and set start time of the timer
25+
#define STAGE_TIMER_GUARD(metric, enabled) \
26+
pstd::StageTimer stage_timer_##metric( \
27+
&metric, enabled); \
28+
stage_timer_##metric.Start();
29+
2330
class SyncMasterDB;
2431
class SyncSlaveDB;
2532
class DB;
@@ -552,7 +559,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
552559
bool IsNeedReadCache() const;
553560
bool IsNeedCacheDo() const;
554561
bool HashtagIsConsistent(const std::string& lhs, const std::string& rhs) const;
555-
uint64_t GetDoDuration() const { return do_duration_; };
562+
virtual std::string StagesDurationSummary(bool exclude_zero_value) const;
556563
std::shared_ptr<DB> GetDB() const { return db_; };
557564
uint32_t AclCategory() const;
558565
void AddAclCategory(uint32_t aclCategory);
@@ -607,7 +614,13 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
607614
std::weak_ptr<net::NetConn> conn_;
608615
std::weak_ptr<std::string> resp_;
609616
CmdStage stage_ = kNone;
610-
uint64_t do_duration_ = 0;
617+
618+
uint64_t acquire_lock_duration_ms = 0;
619+
uint64_t command_duration_ms = 0;
620+
uint64_t binlog_duration_ms = 0;
621+
uint64_t storage_duration_ms = 0;
622+
uint64_t cache_duration_ms = 0;
623+
611624
uint32_t cmdId_ = 0;
612625
uint32_t aclCategory_ = 0;
613626
bool cache_missed_in_rtc_{false};

include/pika_conf.h

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -858,6 +858,30 @@ class PikaConf : public pstd::BaseConf {
858858
rsync_timeout_ms_.store(value);
859859
}
860860

861+
int RocksDBPerfLevel() const {
862+
return rocksdb_perf_level_.load();
863+
}
864+
865+
bool UpdateRocksDBPerfLevel(int perf_level) {
866+
if (perf_level >= 6 || perf_level < 0) {
867+
return false;
868+
}
869+
rocksdb_perf_level_.store(perf_level);
870+
return true;
871+
}
872+
873+
int RocksDBPerfPercent() const {
874+
return rocksdb_perf_percent_.load();
875+
}
876+
877+
bool UpdateRocksDBPerfPercent(int percent) {
878+
if (percent > 100 || percent < 0) {
879+
return false;
880+
}
881+
rocksdb_perf_percent_.store(percent);
882+
return true;
883+
}
884+
861885
void SetAclPubsubDefault(const std::string& value) {
862886
std::lock_guard l(rwlock_);
863887
TryPushDiffCommands("acl-pubsub-default", value);
@@ -1121,6 +1145,21 @@ class PikaConf : public pstd::BaseConf {
11211145
int max_rsync_parallel_num_ = kMaxRsyncParallelNum;
11221146
std::atomic_int64_t rsync_timeout_ms_ = 1000;
11231147

1148+
/*
1149+
kUninitialized = 0, // unknown setting
1150+
kDisable = 1, // disable perf stats
1151+
kEnableCount = 2, // enable only count stats
1152+
kEnableTimeExceptForMutex = 3, // Other than count stats, also enable time
1153+
// stats except for mutexes
1154+
// Other than time, also measure CPU time counters. Still don't measure
1155+
// time (neither wall time nor CPU time) for mutexes.
1156+
kEnableTimeAndCPUTimeExceptForMutex = 4,
1157+
kEnableTime = 5, // enable count and time stats
1158+
kOutOfBounds = 6 // N.B. Must always be the last value!
1159+
*/
1160+
std::atomic_int rocksdb_perf_level_ = 2;
1161+
std::atomic_int rocksdb_perf_percent_ = 10;
1162+
11241163
//Internal used metrics Persisted by pika.conf
11251164
std::unordered_set<std::string> internal_used_unfinished_full_sync_;
11261165

src/pika_admin.cc

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2198,6 +2198,18 @@ void ConfigCmd::ConfigGet(std::string& ret) {
21982198
EncodeNumber(&config_body, g_pika_conf->throttle_bytes_per_second());
21992199
}
22002200

2201+
if (pstd::stringmatch(pattern.data(), "rocksdb-perf-level", 1) != 0) {
2202+
elements += 2;
2203+
EncodeString(&config_body, "rocksdb-perf-level");
2204+
EncodeNumber(&config_body, g_pika_conf->RocksDBPerfLevel());
2205+
}
2206+
2207+
if (pstd::stringmatch(pattern.data(), "rocksdb-perf-percent", 1) != 0) {
2208+
elements += 2;
2209+
EncodeString(&config_body, "rocksdb-perf-percent");
2210+
EncodeNumber(&config_body, g_pika_conf->RocksDBPerfPercent());
2211+
}
2212+
22012213
if (pstd::stringmatch(pattern.data(), "max-rsync-parallel-num", 1) != 0) {
22022214
elements += 2;
22032215
EncodeString(&config_body, "max-rsync-parallel-num");
@@ -2864,6 +2876,32 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
28642876
}
28652877
g_pika_conf->SetArenaBlockSize(static_cast<int>(ival));
28662878
res_.AppendStringRaw("+OK\r\n");
2879+
} else if (set_item == "rocksdb-perf-level") {
2880+
if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival <= 0) {
2881+
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'rocksdb-perf-level'\r\n");
2882+
return;
2883+
}
2884+
bool success = g_pika_conf->UpdateRocksDBPerfLevel(int(ival));
2885+
LOG(INFO) << "update rocksdb-perf-level to " << ival
2886+
<< (success ? " success" : " failed");
2887+
if (!success) {
2888+
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'rocksdb-perf-level', should between 1 and 5\r\n");
2889+
return;
2890+
}
2891+
res_.AppendStringRaw("+OK\r\n");
2892+
} else if (set_item == "rocksdb-perf-percent") {
2893+
if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival <= 0) {
2894+
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'rocksdb-perf-percent'\r\n");
2895+
return;
2896+
}
2897+
bool success = g_pika_conf->UpdateRocksDBPerfPercent(int(ival));
2898+
LOG(INFO) << "update rocksdb-perf-percent to " << ival
2899+
<< (success ? " success" : " failed");
2900+
if (!success) {
2901+
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'rocksdb-perf-percent', should between 0 and 100\r\n");
2902+
return;
2903+
}
2904+
res_.AppendStringRaw("+OK\r\n");
28672905
} else if (set_item == "throttle-bytes-per-second") {
28682906
if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival <= 0) {
28692907
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'throttle-bytes-per-second'\r\n");

src/pika_client_conn.cc

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@
2020
#include "net/src/worker_thread.h"
2121
#include "src/pstd/include/scope_record_lock.h"
2222

23+
#include "rocksdb/perf_context.h"
24+
#include "rocksdb/iostats_context.h"
25+
#include "util/random.h"
26+
2327
extern std::unique_ptr<PikaConf> g_pika_conf;
2428
extern PikaServer* g_pika_server;
2529
extern std::unique_ptr<PikaReplicaManager> g_pika_rm;
@@ -46,6 +50,7 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
4650
}
4751
return tmp_ptr;
4852
}
53+
4954
c_ptr->SetCacheMissedInRtc(cache_miss_in_rtc);
5055
c_ptr->SetConn(shared_from_this());
5156
c_ptr->SetResp(resp_ptr);
@@ -202,21 +207,32 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
202207
}
203208
}
204209

210+
211+
// set rocksdb perflevel based on RocksDBPerfLevel and RocksDBPerfPercent
212+
int rocksdb_perf_level = 2;
213+
if (rocksdb::Random::GetTLSInstance()->PercentTrue(g_pika_conf->RocksDBPerfPercent())) {
214+
rocksdb_perf_level = g_pika_conf->RocksDBPerfLevel();
215+
}
216+
rocksdb::SetPerfLevel(rocksdb::PerfLevel(rocksdb_perf_level));
217+
218+
// Perform some operations
219+
rocksdb::get_perf_context()->Reset();
205220
// Process Command
206221
c_ptr->Execute();
222+
207223
time_stat_->process_done_ts_ = pstd::NowMicros();
208224
auto cmdstat_map = g_pika_cmd_table_manager->GetCommandStatMap();
209225
(*cmdstat_map)[opt].cmd_count.fetch_add(1);
210226
(*cmdstat_map)[opt].cmd_time_consuming.fetch_add(time_stat_->total_time());
211227

212228
if (g_pika_conf->slowlog_slower_than() >= 0) {
213-
ProcessSlowlog(argv, c_ptr->GetDoDuration());
229+
ProcessSlowlog(argv, c_ptr);
214230
}
215231

216232
return c_ptr;
217233
}
218234

219-
void PikaClientConn::ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t do_duration) {
235+
void PikaClientConn::ProcessSlowlog(const PikaCmdArgsType& argv, std::shared_ptr<Cmd> c_ptr) {
220236
if (time_stat_->total_time() > g_pika_conf->slowlog_slower_than()) {
221237
g_pika_server->SlowlogPushEntry(argv, time_stat_->start_ts() / 1000000, time_stat_->total_time());
222238
if (g_pika_conf->slowlog_write_errorlog()) {
@@ -241,7 +257,8 @@ void PikaClientConn::ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t do_dur
241257
<< ", before_queue_time(ms): " << time_stat_->before_queue_time() / 1000
242258
<< ", queue_time(ms): " << time_stat_->queue_time() / 1000
243259
<< ", process_time(ms): " << time_stat_->process_time() / 1000
244-
<< ", cmd_time(ms): " << do_duration / 1000;
260+
<< ", " << c_ptr->StagesDurationSummary(true /*skip zero counter*/)
261+
<< ", " << rocksdb::get_perf_context()->ToString(true);
245262
}
246263
}
247264
}

src/pika_command.cc

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
// of patent rights can be found in the PATENTS file in the same directory.
55

66
#include <memory>
7+
#include <sstream>
78
#include <utility>
89

910
#include <glog/logging.h>
@@ -859,23 +860,22 @@ void Cmd::ProcessCommand(const HintKeys& hint_keys) {
859860
}
860861

861862
void Cmd::InternalProcessCommand(const HintKeys& hint_keys) {
863+
uint64_t start_us = pstd::NowMicros();
862864
pstd::lock::MultiRecordLock record_lock(db_->LockMgr());
863865
if (is_write()) {
864866
record_lock.Lock(current_key());
865867
}
866-
uint64_t start_us = 0;
867-
if (g_pika_conf->slowlog_slower_than() >= 0) {
868-
start_us = pstd::NowMicros();
869-
}
870868

871869
if (!IsSuspend()) {
872870
db_->DBLockShared();
873871
}
874872

873+
uint64_t before_do_command_us = pstd::NowMicros();
874+
this->acquire_lock_duration_ms = (before_do_command_us - start_us) / 1000;
875875
DoCommand(hint_keys);
876-
if (g_pika_conf->slowlog_slower_than() >= 0) {
877-
do_duration_ += pstd::NowMicros() - start_us;
878-
}
876+
877+
uint64_t before_do_binlog_us = pstd::NowMicros();
878+
this->command_duration_ms = (before_do_binlog_us - before_do_command_us) / 1000;
879879
DoBinlog();
880880

881881
if (!IsSuspend()) {
@@ -884,6 +884,9 @@ void Cmd::InternalProcessCommand(const HintKeys& hint_keys) {
884884
if (is_write()) {
885885
record_lock.Unlock(current_key());
886886
}
887+
888+
uint64_t end_us = pstd::NowMicros();
889+
this->binlog_duration_ms = (end_us - before_do_binlog_us) / 1000;
887890
}
888891

889892
void Cmd::DoCommand(const HintKeys& hint_keys) {
@@ -968,6 +971,23 @@ void Cmd::DoBinlog() {
968971
}
969972
}
970973

974+
#define PIKA_STAGE_DURATION_OUTPUT(duration) \
975+
if (!exclude_zero_value || duration > 0) { \
976+
ss << #duration << " = " << duration << ", "; \
977+
}
978+
979+
std::string Cmd::StagesDurationSummary(bool exclude_zero_value) const {
980+
std::ostringstream ss;
981+
PIKA_STAGE_DURATION_OUTPUT(acquire_lock_duration_ms);
982+
PIKA_STAGE_DURATION_OUTPUT(command_duration_ms);
983+
PIKA_STAGE_DURATION_OUTPUT(binlog_duration_ms);
984+
PIKA_STAGE_DURATION_OUTPUT(storage_duration_ms);
985+
PIKA_STAGE_DURATION_OUTPUT(cache_duration_ms);
986+
std::string str = ss.str();
987+
str.erase(str.find_last_not_of(", ") + 1);
988+
return str;
989+
}
990+
971991
bool Cmd::hasFlag(uint32_t flag) const { return (flag_ & flag); }
972992
bool Cmd::is_read() const { return (flag_ & kCmdFlagsRead); }
973993
bool Cmd::is_write() const { return (flag_ & kCmdFlagsWrite); }

0 commit comments

Comments
 (0)