Skip to content

Commit 6c271c7

Browse files
wangshao1wangshaoyi
andauthored
summary different stages duration && add rocksdb perfcontext (OpenAtomFoundation#3036)
Co-authored-by: wangshaoyi <[email protected]>
1 parent c467b47 commit 6c271c7

12 files changed

Lines changed: 452 additions & 14 deletions

include/pika_client_conn.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ class PikaClientConn : public net::RedisConn {
130130
std::shared_ptr<Cmd> DoCmd(const PikaCmdArgsType& argv, const std::string& opt,
131131
const std::shared_ptr<std::string>& resp_ptr, bool cache_miss_in_rtc);
132132

133-
void ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t do_duration);
133+
void ProcessSlowlog(const PikaCmdArgsType& argv, std::shared_ptr<Cmd> c_ptr);
134134
void ProcessMonitor(const PikaCmdArgsType& argv);
135135

136136
void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<std::string>& resp_ptr, bool cache_miss_in_rtc);

include/pika_command.h

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,16 @@
1717
#include "net/include/net_conn.h"
1818
#include "net/include/redis_conn.h"
1919
#include "pstd/include/pstd_string.h"
20+
#include "pstd/include/stage_timer.h"
2021

2122
#include "net/src/dispatch_thread.h"
2223

24+
// Declare and set start time of the timer
25+
#define STAGE_TIMER_GUARD(metric, enabled) \
26+
pstd::StageTimer stage_timer_##metric( \
27+
&metric, enabled); \
28+
stage_timer_##metric.Start();
29+
2330
class SyncMasterDB;
2431
class SyncSlaveDB;
2532
class DB;
@@ -544,7 +551,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
544551
bool IsNeedReadCache() const;
545552
bool IsNeedCacheDo() const;
546553
bool HashtagIsConsistent(const std::string& lhs, const std::string& rhs) const;
547-
uint64_t GetDoDuration() const { return do_duration_; };
554+
virtual std::string StagesDurationSummary(bool exclude_zero_value) const;
548555
std::shared_ptr<DB> GetDB() const { return db_; };
549556
uint32_t AclCategory() const;
550557
void AddAclCategory(uint32_t aclCategory);
@@ -599,7 +606,13 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
599606
std::weak_ptr<net::NetConn> conn_;
600607
std::weak_ptr<std::string> resp_;
601608
CmdStage stage_ = kNone;
602-
uint64_t do_duration_ = 0;
609+
610+
uint64_t acquire_lock_duration_ms = 0;
611+
uint64_t command_duration_ms = 0;
612+
uint64_t binlog_duration_ms = 0;
613+
uint64_t storage_duration_ms = 0;
614+
uint64_t cache_duration_ms = 0;
615+
603616
uint32_t cmdId_ = 0;
604617
uint32_t aclCategory_ = 0;
605618
bool cache_missed_in_rtc_{false};

include/pika_conf.h

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -744,6 +744,30 @@ class PikaConf : public pstd::BaseConf {
744744
rsync_timeout_ms_.store(value);
745745
}
746746

747+
int RocksDBPerfLevel() const {
748+
return rocksdb_perf_level_.load();
749+
}
750+
751+
bool UpdateRocksDBPerfLevel(int perf_level) {
752+
if (perf_level >= 6 || perf_level < 0) {
753+
return false;
754+
}
755+
rocksdb_perf_level_.store(perf_level);
756+
return true;
757+
}
758+
759+
int RocksDBPerfPercent() const {
760+
return rocksdb_perf_percent_.load();
761+
}
762+
763+
bool UpdateRocksDBPerfPercent(int percent) {
764+
if (percent > 100 || percent < 0) {
765+
return false;
766+
}
767+
rocksdb_perf_percent_.store(percent);
768+
return true;
769+
}
770+
747771
void SetAclPubsubDefault(const std::string& value) {
748772
std::lock_guard l(rwlock_);
749773
TryPushDiffCommands("acl-pubsub-default", value);
@@ -984,6 +1008,21 @@ class PikaConf : public pstd::BaseConf {
9841008
int max_rsync_parallel_num_ = kMaxRsyncParallelNum;
9851009
std::atomic_int64_t rsync_timeout_ms_ = 1000;
9861010

1011+
/*
1012+
kUninitialized = 0, // unknown setting
1013+
kDisable = 1, // disable perf stats
1014+
kEnableCount = 2, // enable only count stats
1015+
kEnableTimeExceptForMutex = 3, // Other than count stats, also enable time
1016+
// stats except for mutexes
1017+
// Other than time, also measure CPU time counters. Still don't measure
1018+
// time (neither wall time nor CPU time) for mutexes.
1019+
kEnableTimeAndCPUTimeExceptForMutex = 4,
1020+
kEnableTime = 5, // enable count and time stats
1021+
kOutOfBounds = 6 // N.B. Must always be the last value!
1022+
*/
1023+
std::atomic_int rocksdb_perf_level_ = 2;
1024+
std::atomic_int rocksdb_perf_percent_ = 10;
1025+
9871026
//Internal used metrics Persisted by pika.conf
9881027
std::unordered_set<std::string> internal_used_unfinished_full_sync_;
9891028
};

src/pika_admin.cc

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2196,6 +2196,18 @@ void ConfigCmd::ConfigGet(std::string& ret) {
21962196
EncodeNumber(&config_body, g_pika_conf->throttle_bytes_per_second());
21972197
}
21982198

2199+
if (pstd::stringmatch(pattern.data(), "rocksdb-perf-level", 1) != 0) {
2200+
elements += 2;
2201+
EncodeString(&config_body, "rocksdb-perf-level");
2202+
EncodeNumber(&config_body, g_pika_conf->RocksDBPerfLevel());
2203+
}
2204+
2205+
if (pstd::stringmatch(pattern.data(), "rocksdb-perf-percent", 1) != 0) {
2206+
elements += 2;
2207+
EncodeString(&config_body, "rocksdb-perf-percent");
2208+
EncodeNumber(&config_body, g_pika_conf->RocksDBPerfPercent());
2209+
}
2210+
21992211
if (pstd::stringmatch(pattern.data(), "max-rsync-parallel-num", 1) != 0) {
22002212
elements += 2;
22012213
EncodeString(&config_body, "max-rsync-parallel-num");
@@ -2740,6 +2752,32 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
27402752
}
27412753
g_pika_conf->SetArenaBlockSize(static_cast<int>(ival));
27422754
res_.AppendStringRaw("+OK\r\n");
2755+
} else if (set_item == "rocksdb-perf-level") {
2756+
if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival <= 0) {
2757+
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'rocksdb-perf-level'\r\n");
2758+
return;
2759+
}
2760+
bool success = g_pika_conf->UpdateRocksDBPerfLevel(int(ival));
2761+
LOG(INFO) << "update rocksdb-perf-level to " << ival
2762+
<< (success ? " success" : " failed");
2763+
if (!success) {
2764+
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'rocksdb-perf-level', should between 1 and 5\r\n");
2765+
return;
2766+
}
2767+
res_.AppendStringRaw("+OK\r\n");
2768+
} else if (set_item == "rocksdb-perf-percent") {
2769+
if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival <= 0) {
2770+
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'rocksdb-perf-percent'\r\n");
2771+
return;
2772+
}
2773+
bool success = g_pika_conf->UpdateRocksDBPerfPercent(int(ival));
2774+
LOG(INFO) << "update rocksdb-perf-percent to " << ival
2775+
<< (success ? " success" : " failed");
2776+
if (!success) {
2777+
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'rocksdb-perf-percent', should between 0 and 100\r\n");
2778+
return;
2779+
}
2780+
res_.AppendStringRaw("+OK\r\n");
27432781
} else if (set_item == "throttle-bytes-per-second") {
27442782
if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival <= 0) {
27452783
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'throttle-bytes-per-second'\r\n");

src/pika_client_conn.cc

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@
2020
#include "net/src/worker_thread.h"
2121
#include "src/pstd/include/scope_record_lock.h"
2222

23+
#include "rocksdb/perf_context.h"
24+
#include "rocksdb/iostats_context.h"
25+
#include "util/random.h"
26+
2327
extern std::unique_ptr<PikaConf> g_pika_conf;
2428
extern PikaServer* g_pika_server;
2529
extern std::unique_ptr<PikaReplicaManager> g_pika_rm;
@@ -46,6 +50,7 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
4650
}
4751
return tmp_ptr;
4852
}
53+
4954
c_ptr->SetCacheMissedInRtc(cache_miss_in_rtc);
5055
c_ptr->SetConn(shared_from_this());
5156
c_ptr->SetResp(resp_ptr);
@@ -202,21 +207,32 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
202207
}
203208
}
204209

210+
211+
// set rocksdb perflevel based on RocksDBPerfLevel and RocksDBPerfPercent
212+
int rocksdb_perf_level = 2;
213+
if (rocksdb::Random::GetTLSInstance()->PercentTrue(g_pika_conf->RocksDBPerfPercent())) {
214+
rocksdb_perf_level = g_pika_conf->RocksDBPerfLevel();
215+
}
216+
rocksdb::SetPerfLevel(rocksdb::PerfLevel(rocksdb_perf_level));
217+
218+
// Perform some operations
219+
rocksdb::get_perf_context()->Reset();
205220
// Process Command
206221
c_ptr->Execute();
222+
207223
time_stat_->process_done_ts_ = pstd::NowMicros();
208224
auto cmdstat_map = g_pika_cmd_table_manager->GetCommandStatMap();
209225
(*cmdstat_map)[opt].cmd_count.fetch_add(1);
210226
(*cmdstat_map)[opt].cmd_time_consuming.fetch_add(time_stat_->total_time());
211227

212228
if (g_pika_conf->slowlog_slower_than() >= 0) {
213-
ProcessSlowlog(argv, c_ptr->GetDoDuration());
229+
ProcessSlowlog(argv, c_ptr);
214230
}
215231

216232
return c_ptr;
217233
}
218234

219-
void PikaClientConn::ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t do_duration) {
235+
void PikaClientConn::ProcessSlowlog(const PikaCmdArgsType& argv, std::shared_ptr<Cmd> c_ptr) {
220236
if (time_stat_->total_time() > g_pika_conf->slowlog_slower_than()) {
221237
g_pika_server->SlowlogPushEntry(argv, time_stat_->start_ts() / 1000000, time_stat_->total_time());
222238
if (g_pika_conf->slowlog_write_errorlog()) {
@@ -241,7 +257,8 @@ void PikaClientConn::ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t do_dur
241257
<< ", before_queue_time(ms): " << time_stat_->before_queue_time() / 1000
242258
<< ", queue_time(ms): " << time_stat_->queue_time() / 1000
243259
<< ", process_time(ms): " << time_stat_->process_time() / 1000
244-
<< ", cmd_time(ms): " << do_duration / 1000;
260+
<< ", " << c_ptr->StagesDurationSummary(true /*skip zero counter*/)
261+
<< ", " << rocksdb::get_perf_context()->ToString(true);
245262
}
246263
}
247264
}

src/pika_command.cc

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
// of patent rights can be found in the PATENTS file in the same directory.
55

66
#include <memory>
7+
#include <sstream>
78
#include <utility>
89

910
#include <glog/logging.h>
@@ -864,23 +865,22 @@ void Cmd::ProcessCommand(const HintKeys& hint_keys) {
864865
}
865866

866867
void Cmd::InternalProcessCommand(const HintKeys& hint_keys) {
868+
uint64_t start_us = pstd::NowMicros();
867869
pstd::lock::MultiRecordLock record_lock(db_->LockMgr());
868870
if (is_write()) {
869871
record_lock.Lock(current_key());
870872
}
871-
uint64_t start_us = 0;
872-
if (g_pika_conf->slowlog_slower_than() >= 0) {
873-
start_us = pstd::NowMicros();
874-
}
875873

876874
if (!IsSuspend()) {
877875
db_->DBLockShared();
878876
}
879877

878+
uint64_t before_do_command_us = pstd::NowMicros();
879+
this->acquire_lock_duration_ms = (before_do_command_us - start_us) / 1000;
880880
DoCommand(hint_keys);
881-
if (g_pika_conf->slowlog_slower_than() >= 0) {
882-
do_duration_ += pstd::NowMicros() - start_us;
883-
}
881+
882+
uint64_t before_do_binlog_us = pstd::NowMicros();
883+
this->command_duration_ms = (before_do_binlog_us - before_do_command_us) / 1000;
884884
DoBinlog();
885885

886886
if (!IsSuspend()) {
@@ -889,6 +889,9 @@ void Cmd::InternalProcessCommand(const HintKeys& hint_keys) {
889889
if (is_write()) {
890890
record_lock.Unlock(current_key());
891891
}
892+
893+
uint64_t end_us = pstd::NowMicros();
894+
this->binlog_duration_ms = (end_us - before_do_binlog_us) / 1000;
892895
}
893896

894897
void Cmd::DoCommand(const HintKeys& hint_keys) {
@@ -966,6 +969,23 @@ void Cmd::DoBinlog() {
966969
}
967970
}
968971

972+
#define PIKA_STAGE_DURATION_OUTPUT(duration) \
973+
if (!exclude_zero_value || duration > 0) { \
974+
ss << #duration << " = " << duration << ", "; \
975+
}
976+
977+
std::string Cmd::StagesDurationSummary(bool exclude_zero_value) const {
978+
std::ostringstream ss;
979+
PIKA_STAGE_DURATION_OUTPUT(acquire_lock_duration_ms);
980+
PIKA_STAGE_DURATION_OUTPUT(command_duration_ms);
981+
PIKA_STAGE_DURATION_OUTPUT(binlog_duration_ms);
982+
PIKA_STAGE_DURATION_OUTPUT(storage_duration_ms);
983+
PIKA_STAGE_DURATION_OUTPUT(cache_duration_ms);
984+
std::string str = ss.str();
985+
str.erase(str.find_last_not_of(", ") + 1);
986+
return str;
987+
}
988+
969989
bool Cmd::hasFlag(uint32_t flag) const { return (flag_ & flag); }
970990
bool Cmd::is_read() const { return (flag_ & kCmdFlagsRead); }
971991
bool Cmd::is_write() const { return (flag_ & kCmdFlagsWrite); }

0 commit comments

Comments
 (0)