Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,8 @@ max-rsync-parallel-num : 4
# The synchronization mode of Pika primary/secondary replication is determined by ReplicationID. ReplicationID in one replication_cluster are the same
# replication-id :

keys-analysis: no

###################
## Cache Settings
###################
Expand Down
10 changes: 10 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return write_binlog_;
}
// Whether big-key analysis is enabled (config item "keys-analysis").
// Takes the config lock in shared mode, like the sibling getters.
bool keys_analysis() {
  std::shared_lock guard(rwlock_);
  return keys_analysis_.load();
}
int thread_num() {
std::shared_lock l(rwlock_);
return thread_num_;
Expand Down Expand Up @@ -498,6 +502,11 @@ class PikaConf : public pstd::BaseConf {
TryPushDiffCommands("write-binlog", value);
write_binlog_ = value == "yes";
}
// Enable/disable big-key analysis at runtime, recording the change so a
// later CONFIG REWRITE persists it to the conf file.
void SetKeysAnalysis(const bool value) {
  std::lock_guard guard(rwlock_);
  const std::string str_value = value ? "yes" : "no";
  TryPushDiffCommands("keys-analysis", str_value);
  keys_analysis_ = value;
}
void SetMaxCacheStatisticKeys(const int value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("max-cache-statistic-keys", std::to_string(value));
Expand Down Expand Up @@ -941,6 +950,7 @@ class PikaConf : public pstd::BaseConf {
std::atomic<int> slowlog_log_slower_than_;
std::atomic<bool> slotmigrate_;
std::atomic<int> binlog_writer_num_;
std::atomic<bool> keys_analysis_ = false;
int slowlog_max_len_ = 0;
int expire_logs_days_ = 0;
int expire_logs_nums_ = 0;
Expand Down
22 changes: 22 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1404,8 +1404,10 @@ void InfoCmd::InfoData(std::string& info) {
uint64_t total_background_errors = 0;
uint64_t total_memtable_usage = 0;
uint64_t total_table_reader_usage = 0;
// Number of distinct big keys aggregated across all DBs (stray ";;" removed).
uint64_t total_big_key_count = 0;
uint64_t memtable_usage = 0;
uint64_t table_reader_usage = 0;
uint64_t big_key_count = 0;
std::shared_lock db_rwl(g_pika_server->dbs_rw_);
for (const auto& db_item : g_pika_server->dbs_) {
if (!db_item.second) {
Expand All @@ -1417,6 +1419,8 @@ void InfoCmd::InfoData(std::string& info) {
db_item.second->storage()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_CUR_SIZE_ALL_MEM_TABLES, &memtable_usage);
db_item.second->storage()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_ESTIMATE_TABLE_READER_MEM, &table_reader_usage);
db_item.second->storage()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_BACKGROUND_ERRORS, &background_errors);
db_item.second->storage()->GetBigKeyStatistics();
total_big_key_count += big_key_count;
db_item.second->DBUnlockShared();
total_memtable_usage += memtable_usage;
total_table_reader_usage += table_reader_usage;
Expand All @@ -1436,6 +1440,7 @@ void InfoCmd::InfoData(std::string& info) {
tmp_stream << "db_tablereader_usage:" << total_table_reader_usage << "\r\n";
tmp_stream << "db_fatal:" << (total_background_errors != 0 ? "1" : "0") << "\r\n";
tmp_stream << "db_fatal_msg:" << (total_background_errors != 0 ? db_fatal_msg_stream.str() : "nullptr") << "\r\n";
tmp_stream << "big_key_count:" << total_big_key_count << "\r\n";

info.append(tmp_stream.str());
}
Expand Down Expand Up @@ -2288,6 +2293,12 @@ void ConfigCmd::ConfigGet(std::string& ret) {
g_pika_conf->acl_pubsub_default() ? EncodeString(&config_body, "allchannels")
: EncodeString(&config_body, "resetchannels");
}

// CONFIG GET keys-analysis: report the flag as "yes"/"no" so the reply is
// symmetric with what CONFIG SET accepts, the conf file (L: keys-analysis: no),
// and what ConfigRewrite persists — not as a raw 0/1 number.
if (pstd::stringmatch(pattern.data(), "keys-analysis", 1)) {
  elements += 2;
  EncodeString(&config_body, "keys-analysis");
  EncodeString(&config_body, g_pika_conf->keys_analysis() ? "yes" : "no");
}

std::stringstream resp;
resp << "*" << std::to_string(elements) << "\r\n" << config_body;
Expand Down Expand Up @@ -2478,6 +2489,17 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
g_pika_conf->SetSlowCmdPool(SlowCmdPool);
g_pika_server->SetSlowCmdThreadPoolFlag(SlowCmdPool);
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "keys-analysis") {
  // Accept only the literal strings "yes"/"no", mirroring the other
  // boolean config items.
  bool keys_analysis_enabled = false;
  if (value == "yes") {
    keys_analysis_enabled = true;
  } else if (value == "no") {
    keys_analysis_enabled = false;
  } else {
    res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'keys-analysis'\r\n");
    return;
  }
  g_pika_conf->SetKeysAnalysis(keys_analysis_enabled);
  // Every successful CONFIG SET branch must answer the client; without this
  // reply the command appears to hang on success.
  res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "slowlog-log-slower-than") {
if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival < 0) {
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slowlog-log-slower-than'\r\n");
Expand Down
6 changes: 6 additions & 0 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,11 @@ int PikaConf::Load() {
}
GetConfStr("pidfile", &pidfile_);

// keys-analysis: big-key statistics collection. Enabled only when the conf
// file explicitly says "yes" — matches the conf default ("no") and the
// member initializer (false). The previous code tested the unrelated
// write-binlog variable `wb` instead of `ka`.
std::string ka;
GetConfStr("keys-analysis", &ka);
keys_analysis_ = ka == "yes";

// db sync
GetConfStr("db-sync-path", &db_sync_path_);
db_sync_path_ = db_sync_path_.empty() ? "./dbsync/" : db_sync_path_;
Expand Down Expand Up @@ -766,6 +771,7 @@ int PikaConf::ConfigRewrite() {
SetConfInt("slave-priority", slave_priority_);
SetConfStr("log-net-activities", log_net_activities_ ? "yes" : "no");
SetConfStr("write-binlog", write_binlog_ ? "yes" : "no");
SetConfStr("keys-analysis", keys_analysis_ ? "yes" : "no");
SetConfStr("run-id", run_id_);
SetConfStr("replication-id", replication_id_);
SetConfInt("max-cache-statistic-keys", max_cache_statistic_keys_);
Expand Down
1 change: 1 addition & 0 deletions src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,7 @@ class Storage {
Status EnableAutoCompaction(const OptionType& option_type,
const std::string& db_type, const std::unordered_map<std::string, std::string>& options);
void GetRocksDBInfo(std::string& info);
size_t GetBigKeyStatistics();

private:
std::unique_ptr<RedisStrings> strings_db_;
Expand Down
26 changes: 26 additions & 0 deletions src/storage/src/redis.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,30 @@ void Redis::SetCompactRangeOptions(const bool is_canceled) {
}
}

// Record an access to `key` and emit a rate-limited log line when the key
// qualifies as a "big key" (element count above kBigKeyThreshold).
//
// The access counter is updated on EVERY big-key access; only the log line
// is throttled. (Gating the counter behind the time window would silently
// drop counts, and advancing the timestamp on small-key accesses would
// suppress the first genuine big-key log for up to a full interval.)
void Redis::CheckBigKeyAndLog(const std::string& key, uint64_t size) {
  // A key is considered "big" when its element count exceeds this threshold.
  static const uint64_t kBigKeyThreshold = 10000;
  // Minimum interval between log lines per thread: 60 s, in microseconds.
  static const uint64_t kLogInterval = 60 * 1000 * 1000;

  if (size <= kBigKeyThreshold) {
    return;  // not a big key: no counting, and do not touch the log timer
  }

  // Always account the access; copy the count out to keep the critical
  // section minimal and to log without holding the mutex.
  int access_count = 0;
  {
    std::unique_lock<std::shared_mutex> write_lock(big_key_access_mutex_);
    access_count = ++big_key_access_count_[key];
  }

  // Per-thread throttle so a hot big key cannot flood the log.
  thread_local uint64_t last_log_time = 0;
  uint64_t current_time = pstd::NowMicros();
  if (current_time - last_log_time >= kLogInterval) {
    last_log_time = current_time;
    LOG(INFO) << "[BIGKEY DETECTED] Key: " << key
              << ", Size: " << size
              << ", Access Count: " << access_count;
  }
}

// Number of distinct keys flagged as big keys so far for this instance.
// Thread-safe: acquires the statistics mutex in shared (read) mode.
size_t Redis::GetBigKeyStatistics() {
  std::shared_lock<std::shared_mutex> guard(big_key_access_mutex_);
  return big_key_access_count_.size();
}

} // namespace storage
7 changes: 7 additions & 0 deletions src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
#include <memory>
#include <string>
#include <vector>
#include <iostream>

#include <glog/logging.h>

#include "rocksdb/db.h"
#include "rocksdb/slice.h"
Expand Down Expand Up @@ -115,6 +118,8 @@ class Redis {
Status SetSmallCompactionDurationThreshold(uint64_t small_compaction_duration_threshold);
std::vector<rocksdb::ColumnFamilyHandle*> GetHandles(){ return handles_;};
void GetRocksDBInfo(std::string &info, const char *prefix);
void CheckBigKeyAndLog(const std::string& key, uint64_t size);
size_t GetBigKeyStatistics();

protected:
Storage* const storage_;
Expand All @@ -141,6 +146,8 @@ class Redis {
Status UpdateSpecificKeyStatistics(const std::string& key, uint64_t count);
Status UpdateSpecificKeyDuration(const std::string& key, uint64_t duration);
Status AddCompactKeyTaskIfNeeded(const std::string& key, uint64_t count, uint64_t duration);
std::unordered_map<std::string, int> big_key_access_count_;
std::shared_mutex big_key_access_mutex_;
};

} // namespace storage
Expand Down
22 changes: 22 additions & 0 deletions src/storage/src/redis_hashes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ Status RedisHashes::HDel(const Slice& key, const std::vector<std::string>& field
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.count() == 0) {
*ret = 0;
return Status::OK();
Expand Down Expand Up @@ -266,6 +267,7 @@ Status RedisHashes::HGet(const Slice& key, const Slice& field, std::string* valu
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_hashes_meta_value.count() == 0) {
Expand All @@ -290,6 +292,7 @@ Status RedisHashes::HGetall(const Slice& key, std::vector<FieldValue>* fvs) {
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_hashes_meta_value.count() == 0) {
Expand Down Expand Up @@ -321,6 +324,7 @@ Status RedisHashes::HGetallWithTTL(const Slice& key, std::vector<FieldValue>* fv
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.count() == 0) {
return Status::NotFound();
} else if (parsed_hashes_meta_value.IsStale()) {
Expand Down Expand Up @@ -368,6 +372,7 @@ Status RedisHashes::HIncrby(const Slice& key, const Slice& field, int64_t value,
char meta_value_buf[4] = {0};
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.count() == 0) {
version = parsed_hashes_meta_value.UpdateVersion();
parsed_hashes_meta_value.set_count(1);
Expand Down Expand Up @@ -443,6 +448,7 @@ Status RedisHashes::HIncrbyfloat(const Slice& key, const Slice& field, const Sli
char meta_value_buf[4] = {0};
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.count() == 0) {
version = parsed_hashes_meta_value.UpdateVersion();
parsed_hashes_meta_value.set_count(1);
Expand Down Expand Up @@ -509,6 +515,7 @@ Status RedisHashes::HKeys(const Slice& key, std::vector<std::string>* fields) {
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_hashes_meta_value.count() == 0) {
Expand All @@ -535,6 +542,7 @@ Status RedisHashes::HLen(const Slice& key, int32_t* ret) {
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale()) {
*ret = 0;
return Status::NotFound("Stale");
Expand Down Expand Up @@ -563,6 +571,7 @@ Status RedisHashes::HMGet(const Slice& key, const std::vector<std::string>& fiel
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if ((is_stale = parsed_hashes_meta_value.IsStale()) || parsed_hashes_meta_value.count() == 0) {
for (size_t idx = 0; idx < fields.size(); ++idx) {
vss->push_back({std::string(), Status::NotFound()});
Expand Down Expand Up @@ -613,6 +622,7 @@ Status RedisHashes::HMSet(const Slice& key, const std::vector<FieldValue>& fvs)
char meta_value_buf[4] = {0};
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.count() == 0) {
version = parsed_hashes_meta_value.InitialMetaValue();
if (!parsed_hashes_meta_value.check_set_count(static_cast<int32_t>(filtered_fvs.size()))) {
Expand Down Expand Up @@ -673,6 +683,7 @@ Status RedisHashes::HSet(const Slice& key, const Slice& field, const Slice& valu
char meta_value_buf[4] = {0};
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.count() == 0) {
version = parsed_hashes_meta_value.InitialMetaValue();
parsed_hashes_meta_value.set_count(1);
Expand Down Expand Up @@ -731,6 +742,7 @@ Status RedisHashes::HSetnx(const Slice& key, const Slice& field, const Slice& va
char meta_value_buf[4] = {0};
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.count() == 0) {
version = parsed_hashes_meta_value.InitialMetaValue();
parsed_hashes_meta_value.set_count(1);
Expand Down Expand Up @@ -782,6 +794,7 @@ Status RedisHashes::HVals(const Slice& key, std::vector<std::string>* values) {
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_hashes_meta_value.count() == 0) {
Expand Down Expand Up @@ -832,6 +845,7 @@ Status RedisHashes::HScan(const Slice& key, int64_t cursor, const std::string& p
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.count() == 0) {
*next_cursor = 0;
return Status::NotFound();
Expand Down Expand Up @@ -896,6 +910,7 @@ Status RedisHashes::HScanx(const Slice& key, const std::string& start_field, con
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.count() == 0) {
*next_field = "";
return Status::NotFound();
Expand Down Expand Up @@ -954,6 +969,7 @@ Status RedisHashes::PKHScanRange(const Slice& key, const Slice& field_start, con
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.count() == 0) {
return Status::NotFound();
} else {
Expand Down Expand Up @@ -1013,6 +1029,7 @@ Status RedisHashes::PKHRScanRange(const Slice& key, const Slice& field_start, co
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.count() == 0) {
return Status::NotFound();
} else {
Expand Down Expand Up @@ -1163,6 +1180,7 @@ Status RedisHashes::Expire(const Slice& key, int32_t ttl) {
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_hashes_meta_value.count() == 0) {
Expand All @@ -1186,6 +1204,7 @@ Status RedisHashes::Del(const Slice& key) {
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_hashes_meta_value.count() == 0) {
Expand Down Expand Up @@ -1281,6 +1300,7 @@ Status RedisHashes::Expireat(const Slice& key, int32_t timestamp) {
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_hashes_meta_value.count() == 0) {
Expand All @@ -1303,6 +1323,7 @@ Status RedisHashes::Persist(const Slice& key) {
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_hashes_meta_value.count() == 0) {
Expand All @@ -1325,6 +1346,7 @@ Status RedisHashes::TTL(const Slice& key, int64_t* timestamp) {
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale()) {
*timestamp = -2;
return Status::NotFound("Stale");
Expand Down
Loading
Loading