Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,24 @@ ExternalProject_Add(rocksdb
make -j${CPU_CORE}
)

ExternalProject_Add(prometheus_cpp
URL
https://github.com/jupp0r/prometheus-cpp/releases/download/v1.2.4/prometheus-cpp-with-submodules.tar.gz
CMAKE_ARGS
-DBUILD_SHARED_LIBS=OFF
-DENABLE_PUSH=OFF
-DENABLE_COMPRESSION=OFF
-DCMAKE_INSTALL_LIBDIR=${INSTALL_LIBDIR}
-DCMAKE_INSTALL_INCLUDEDIR=${INSTALL_INCLUDEDIR}
BUILD_ALWAYS
1
BUILD_COMMAND
make -j${CPU_CORE}
)

set(PROMETHEUS_CPP_CORE_LIB ${INSTALL_LIBDIR}/libprometheus-cpp-core.a)
set(PROMETHEUS_CPP_PULL_LIB ${INSTALL_LIBDIR}/libprometheus-cpp-pull.a)

ExternalProject_Add(rediscache
URL
https://github.com/pikiwidb/rediscache/archive/refs/tags/v1.0.7.tar.gz
Expand Down Expand Up @@ -822,6 +840,8 @@ target_link_libraries(${PROJECT_NAME}
${LIB_PROTOBUF}
${LIB_GFLAGS}
${LIB_FMT}
${PROMETHEUS_CPP_PULL_LIB}
${PROMETHEUS_CPP_CORE_LIB}
libsnappy.a
libzstd.a
liblz4.a
Expand Down
4 changes: 2 additions & 2 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ admin-thread-pool-size : 2
slow-cmd-list :

# List of commands considered as administrative. These commands will be handled by the admin thread pool. Modify this list as needed.
# Default commands: info, ping, monitor
# Default commands: info, ping, monitor, auth
# This parameter is only supported by the CONFIG GET command and not by CONFIG SET.
admin-cmd-list : info, ping, monitor
admin-cmd-list : info, ping, monitor, auth

# The number of threads to write DB in slaveNode when replicating.
# It's preferable to set slave's sync-thread-num value close to master's thread-pool-size.
Expand Down
6 changes: 6 additions & 0 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,8 @@ class InfoCmd : public Cmd {
kInfoAll,
kInfoDebug,
kInfoCommandStats,
kInfoSlowCommand,
kInfoCommandP99,
kInfoCache
};
InfoCmd(const std::string& name, int arity, uint32_t flag) : Cmd(name, arity, flag) {}
Expand Down Expand Up @@ -294,6 +296,8 @@ class InfoCmd : public Cmd {
const static std::string kRocksDBSection;
const static std::string kDebugSection;
const static std::string kCommandStatsSection;
const static std::string kCommandP99Section;
const static std::string kSlowCommandSection;
const static std::string kCacheSection;

void DoInitial() override;
Expand All @@ -314,6 +318,8 @@ class InfoCmd : public Cmd {
void InfoRocksDB(std::string& info);
void InfoDebug(std::string& info);
void InfoCommandStats(std::string& info);
void InfoCommandP99(std::string& info);
void InfoSlowCommand(std::string& info);
void InfoCache(std::string& info, std::shared_ptr<DB> db);

std::string CacheStatusToString(int status);
Expand Down
2 changes: 1 addition & 1 deletion include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class PikaClientConn : public net::RedisConn {
std::shared_ptr<Cmd> DoCmd(const PikaCmdArgsType& argv, const std::string& opt,
const std::shared_ptr<std::string>& resp_ptr, bool cache_miss_in_rtc);

void ProcessSlowlog(const PikaCmdArgsType& argv, std::shared_ptr<Cmd> c_ptr);
void ProcessSlowlog(const PikaCmdArgsType& argv, std::shared_ptr<Cmd> c_ptr, const std::string& opt);
void ProcessMonitor(const PikaCmdArgsType& argv);

void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<std::string>& resp_ptr, bool cache_miss_in_rtc);
Expand Down
33 changes: 33 additions & 0 deletions include/pika_cmd_table_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,17 @@

#include <shared_mutex>
#include <thread>
#include <prometheus/exposer.h>
#include <prometheus/registry.h>
#include <prometheus/counter.h>
#include <prometheus/histogram.h>

#include "include/acl.h"
#include "include/pika_command.h"
#include "include/pika_data_distribution.h"

using namespace prometheus;

struct CommandStatistics {
CommandStatistics() = default;
CommandStatistics(const CommandStatistics& other) {
Expand All @@ -23,13 +29,31 @@ struct CommandStatistics {
std::atomic<uint64_t> cmd_time_consuming = 0;
};

struct HistogramData {
std::shared_ptr<prometheus::Registry> registry;
prometheus::Family<prometheus::Histogram>* family;
std::unordered_map<std::string, prometheus::Histogram*> histograms;

HistogramData() {
registry = std::make_shared<prometheus::Registry>();
family = &prometheus::BuildHistogram()
.Name("pika_command_duration_seconds")
.Help("Execution time of Pika commands in seconds")
.Register(*registry);
}

HistogramData(const HistogramData&) = delete;
HistogramData& operator=(const HistogramData&) = delete;
};

class PikaCmdTableManager {
friend AclSelector;

public:
PikaCmdTableManager();
virtual ~PikaCmdTableManager() = default;
void InitCmdTable(void);
void InitHistograms();
void RenameCommand(const std::string before, const std::string after);
std::shared_ptr<Cmd> GetCmd(const std::string& opt);
bool CmdExist(const std::string& cmd) const;
Expand All @@ -42,6 +66,11 @@ class PikaCmdTableManager {
* Info Commandstats used
*/
std::unordered_map<std::string, CommandStatistics>* GetCommandStatMap();
std::unordered_map<std::string, CommandStatistics> GetSlowCommandCount();
void UpdateSlowCommandCount(const std::string& opt);
void ResetCommandCount();
prometheus::Histogram& GetHistogram(const std::string& opt);
std::shared_ptr<HistogramData> GetHistogramsData();

private:
std::shared_ptr<Cmd> NewCommand(const std::string& opt);
Expand All @@ -60,5 +89,9 @@ class PikaCmdTableManager {
* Info Commandstats used
*/
std::unordered_map<std::string, CommandStatistics> cmdstat_map_;
std::unordered_map<std::string, CommandStatistics> slow_command_count_;
std::shared_mutex slow_command_mutex_;
std::mutex data_mutex_;
std::shared_ptr<HistogramData> data_;
};
#endif
1 change: 1 addition & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ class PikaServer : public pstd::noncopyable {
void ResetStat();
void incr_accumulative_connections();
void ResetLastSecQuerynum();
void ResetCommandCount();
void UpdateQueryNumAndExecCountDB(const std::string& db_name, const std::string& command, bool is_write);
std::unordered_map<std::string, uint64_t> ServerExecCountDB();
std::unordered_map<std::string, QpsStatistic> ServerAllDBStat();
Expand Down
83 changes: 83 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,8 @@ const std::string InfoCmd::kKeyspaceSection = "keyspace";
const std::string InfoCmd::kDataSection = "data";
const std::string InfoCmd::kRocksDBSection = "rocksdb";
const std::string InfoCmd::kDebugSection = "debug";
const std::string InfoCmd::kCommandP99Section = "commandp99";
const std::string InfoCmd::kSlowCommandSection = "slowcommand";
const std::string InfoCmd::kCommandStatsSection = "commandstats";
const std::string InfoCmd::kCacheSection = "cache";

Expand Down Expand Up @@ -967,6 +969,10 @@ void InfoCmd::DoInitial() {
info_section_ = kInfoDebug;
} else if (strcasecmp(argv_[1].data(), kCommandStatsSection.data()) == 0) {
info_section_ = kInfoCommandStats;
} else if (strcasecmp(argv_[1].data(), kCommandP99Section.data()) == 0) {
info_section_ = kInfoCommandP99;
} else if (strcasecmp(argv_[1].data(), kSlowCommandSection.data()) == 0) {
info_section_ = kInfoSlowCommand;
} else if (strcasecmp(argv_[1].data(), kCacheSection.data()) == 0) {
info_section_ = kInfoCache;
} else {
Expand Down Expand Up @@ -1008,6 +1014,10 @@ void InfoCmd::Do() {
info.append("\r\n");
InfoCommandStats(info);
info.append("\r\n");
InfoCommandP99(info);
info.append("\r\n");
InfoSlowCommand(info);
info.append("\r\n");
InfoCache(info, db_);
info.append("\r\n");
InfoCPU(info);
Expand Down Expand Up @@ -1051,6 +1061,12 @@ void InfoCmd::Do() {
case kInfoCommandStats:
InfoCommandStats(info);
break;
case kInfoCommandP99:
InfoCommandP99(info);
break;
case kInfoSlowCommand:
InfoSlowCommand(info);
break;
case kInfoCache:
InfoCache(info, db_);
break;
Expand Down Expand Up @@ -1476,6 +1492,73 @@ void InfoCmd::InfoDebug(std::string& info) {
g_pika_server->ServerStatus(&info);
}

void InfoCmd::InfoCommandP99(std::string& info) {
std::stringstream tmp_stream;
tmp_stream.precision(2);
tmp_stream.setf(std::ios::fixed);
tmp_stream << "# Commands P99" << "\r\n";
auto data = g_pika_cmd_table_manager->GetHistogramsData();
auto* histogram_family = data->family;
for (const auto& metric_family : histogram_family->Collect()) {
for (const auto& metric : metric_family.metric) {
std::string command_name;

for (const auto& label : metric.label) {
if (label.name == "command") {
command_name = label.value;
break;
}
}

double total_count = metric.histogram.sample_count;

if (command_name.empty()) {
tmp_stream << "Command: UNKNOWN\r\n";
} else {
tmp_stream << "Command: " << command_name << "\r\n";
}

double tp99_threshold = total_count * 0.99;
double tp999_threshold = total_count * 0.999;
double tp9999_threshold = total_count * 0.9999;
double tp99 = 0, tp999 = 0, tp9999 = 0;

for (const auto& bucket : metric.histogram.bucket) {
if (bucket.cumulative_count >= tp99_threshold && tp99 == 0) {
tp99 = bucket.upper_bound;
}
if (bucket.cumulative_count >= tp999_threshold && tp999 == 0) {
tp999 = bucket.upper_bound;
}
if (bucket.cumulative_count >= tp9999_threshold && tp9999 == 0) {
tp9999 = bucket.upper_bound;
break;
}
}
tmp_stream << "TP99 ms: " << tp99 << "\r\n";
tmp_stream << "TP999 ms: " << tp999 << "\r\n";
tmp_stream << "TP9999 ms: " << tp9999 << "\r\n";
tmp_stream << "----------------------\r\n";
}
}

info.append(tmp_stream.str());
}

void InfoCmd::InfoSlowCommand(std::string& info) {
std::stringstream tmp_stream;
tmp_stream.precision(2);
tmp_stream.setf(std::ios::fixed);
auto stats = g_pika_cmd_table_manager->GetSlowCommandCount();
tmp_stream << "# SlowCommand Count" << "\r\n";
for (auto iter : stats) {
if (iter.second.cmd_count != 0) {
tmp_stream << iter.first << ":slow_count=" << iter.second.cmd_count << "\r\n";
}
}
info.append(tmp_stream.str());
}

void InfoCmd::InfoCommandStats(std::string& info) {
std::stringstream tmp_stream;
tmp_stream.precision(2);
Expand Down
10 changes: 8 additions & 2 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
#include <glog/logging.h>
#include <utility>
#include <vector>
#include <prometheus/exposer.h>
#include <prometheus/registry.h>
#include <prometheus/counter.h>
#include <prometheus/histogram.h>

#include "include/pika_admin.h"
#include "include/pika_client_conn.h"
Expand Down Expand Up @@ -221,19 +225,21 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
c_ptr->Execute();

time_stat_->process_done_ts_ = pstd::NowMicros();
g_pika_cmd_table_manager->GetHistogram(opt).Observe(time_stat_->total_time() / 1000);
auto cmdstat_map = g_pika_cmd_table_manager->GetCommandStatMap();
(*cmdstat_map)[opt].cmd_count.fetch_add(1);
(*cmdstat_map)[opt].cmd_time_consuming.fetch_add(time_stat_->total_time());

if (g_pika_conf->slowlog_slower_than() >= 0) {
ProcessSlowlog(argv, c_ptr);
ProcessSlowlog(argv, c_ptr, opt);
}

return c_ptr;
}

void PikaClientConn::ProcessSlowlog(const PikaCmdArgsType& argv, std::shared_ptr<Cmd> c_ptr) {
void PikaClientConn::ProcessSlowlog(const PikaCmdArgsType& argv, std::shared_ptr<Cmd> c_ptr, const std::string& opt) {
if (time_stat_->total_time() > g_pika_conf->slowlog_slower_than()) {
g_pika_cmd_table_manager->UpdateSlowCommandCount(opt);
g_pika_server->SlowlogPushEntry(argv, time_stat_->start_ts() / 1000000, time_stat_->total_time());
if (g_pika_conf->slowlog_write_errorlog()) {
bool trim = false;
Expand Down
53 changes: 53 additions & 0 deletions src/pika_cmd_table_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,18 @@

extern std::unique_ptr<PikaConf> g_pika_conf;

void PikaCmdTableManager::ResetCommandCount() {
{
std::unique_lock<std::shared_mutex> write_lock(slow_command_mutex_);
slow_command_count_.clear();
}
std::atomic_store(&data_, std::make_shared<HistogramData>());
}

PikaCmdTableManager::PikaCmdTableManager() {
cmds_ = std::make_unique<CmdTable>();
cmds_->reserve(300);
std::atomic_store(&data_, std::make_shared<HistogramData>());
}

void PikaCmdTableManager::InitCmdTable(void) {
Expand Down Expand Up @@ -63,6 +72,50 @@ void PikaCmdTableManager::RenameCommand(const std::string before, const std::str
}
}

prometheus::Histogram& PikaCmdTableManager::GetHistogram(const std::string& opt) {
auto current_data = std::atomic_load(&data_);
{
auto it = current_data->histograms.find(opt);
if (it != current_data->histograms.end()) {
return *(it->second);
}
}

std::lock_guard<std::mutex> lock(data_mutex_);
auto& new_histogram = current_data->family->Add(
{{"command", opt}},
prometheus::Histogram::BucketBoundaries{0.5, 1, 2, 3, 5, 7, 10, 15, 20, 30, 40, 50, 65, 75, 85, 100, 125, 140, 150, 160, 175, 185, 200, 300, 400, 500, 750, 1000, 2000, 5000, 10000}
);
current_data->histograms[opt] = &new_histogram;
return new_histogram;
}

std::shared_ptr<HistogramData> PikaCmdTableManager::GetHistogramsData() {
return std::atomic_load(&data_);
}

void PikaCmdTableManager::UpdateSlowCommandCount(const std::string& opt) {
{
std::shared_lock<std::shared_mutex> read_lock(slow_command_mutex_);
if (slow_command_count_.find(opt) != slow_command_count_.end()) {
slow_command_count_[opt].cmd_count.fetch_add(1);
return;
}
}

{
std::unique_lock<std::shared_mutex> write_lock(slow_command_mutex_);
slow_command_count_[opt];
}

slow_command_count_[opt].cmd_count.fetch_add(1);
}

std::unordered_map<std::string, CommandStatistics> PikaCmdTableManager::GetSlowCommandCount() {
std::shared_lock<std::shared_mutex> lock(slow_command_mutex_);
return slow_command_count_;
}

std::unordered_map<std::string, CommandStatistics>* PikaCmdTableManager::GetCommandStatMap() {
return &cmdstat_map_;
}
Expand Down
Loading
Loading