Skip to content

Commit 977cb93

Browse files
author
caiyu
committed
add big key detection
1 parent e981599 commit 977cb93

27 files changed

Lines changed: 494 additions & 48 deletions

conf/pika.conf

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -527,6 +527,24 @@ max-rsync-parallel-num : 4
527527
# The synchronization mode of Pika primary/secondary replication is determined by ReplicationID. ReplicationID in one replication_cluster are the same
528528
# replication-id :
529529

530+
# The maximum number of big keys to output in 'info' and log output.
531+
# This controls how many big key entries are shown at most for each type.
532+
# Default: 10
533+
bigkeys_show_limit : 10
534+
535+
# The threshold for member count to trigger big key detection
536+
# Default: 10000
537+
bigkeys_member_threshold : 10000
538+
539+
# The threshold for key and value length (in bytes)
540+
# to trigger big key detection (for string type).
541+
# Default: 1048576 (1MB)
542+
bigkeys_key_value_length_threshold : 1048576
543+
544+
# The interval (in seconds) for outputting big key statistics to the log.
545+
# If set to 0, periodic big key logging is disabled.
546+
# Default: 60
547+
bigkeys_log_interval : 60
530548
###################
531549
## Cache Settings
532550
###################

include/pika_admin.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,9 @@ class InfoCmd : public Cmd {
267267
kInfoAll,
268268
kInfoDebug,
269269
kInfoCommandStats,
270-
kInfoCache
270+
kInfoCache,
271+
kInfoBigKeys
272+
271273
};
272274
InfoCmd(const std::string& name, int arity, uint32_t flag) : Cmd(name, arity, flag) {}
273275
void Do() override;
@@ -295,6 +297,7 @@ class InfoCmd : public Cmd {
295297
const static std::string kDebugSection;
296298
const static std::string kCommandStatsSection;
297299
const static std::string kCacheSection;
300+
const static std::string kBigKeysSection;
298301

299302
void DoInitial() override;
300303
void Clear() override {
@@ -304,6 +307,7 @@ class InfoCmd : public Cmd {
304307
}
305308

306309
void InfoServer(std::string& info);
310+
void InfoBigKeys(std::string& info);
307311
void InfoClients(std::string& info);
308312
void InfoStats(std::string& info);
309313
void InfoExecCount(std::string& info);

include/pika_conf.h

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,23 @@ class PikaConf : public pstd::BaseConf {
160160
std::shared_lock l(rwlock_);
161161
return binlog_writer_num_;
162162
}
163+
//big keys
164+
int bigkeys_show_limit() {
165+
std::shared_lock l(rwlock_);
166+
return bigkeys_show_limit_;
167+
}
168+
int bigkeys_member_threshold() {
169+
std::shared_lock l(rwlock_);
170+
return bigkeys_member_threshold_;
171+
}
172+
int bigkeys_key_value_length_threshold() {
173+
std::shared_lock l(rwlock_);
174+
return bigkeys_key_value_length_threshold_;
175+
}
176+
int bigkeys_log_interval() {
177+
std::shared_lock l(rwlock_);
178+
return bigkeys_log_interval_;
179+
}
163180
bool slotmigrate() {
164181
std::shared_lock l(rwlock_);
165182
return slotmigrate_;
@@ -729,7 +746,23 @@ class PikaConf : public pstd::BaseConf {
729746
log_net_activities_.store(false);
730747
}
731748
}
732-
749+
//big keys
750+
void SetBigkeysShowLimit(const int value) {
751+
std::lock_guard l(rwlock_);
752+
bigkeys_show_limit_ = value;
753+
}
754+
void SetBigkeysKeyValueLengthThreshold(const int value) {
755+
std::lock_guard l(rwlock_);
756+
bigkeys_key_value_length_threshold_ = value;
757+
}
758+
void SetBigkeysMemberCountThreshold(const int value) {
759+
std::lock_guard l(rwlock_);
760+
bigkeys_member_count_threshold_ = value;
761+
}
762+
void SetBigkeysLogInterval(const int value) {
763+
std::lock_guard l(rwlock_);
764+
bigkeys_log_interval_ = value;
765+
}
733766
// Rsync Rate limiting configuration
734767
void SetThrottleBytesPerSecond(const int value) {
735768
std::lock_guard l(rwlock_);
@@ -895,6 +928,11 @@ class PikaConf : public pstd::BaseConf {
895928
int thread_pool_size_ = 0;
896929
int slow_cmd_thread_pool_size_ = 0;
897930
int admin_thread_pool_size_ = 0;
931+
// Big key detection settings; the initialisers match the fallback values
// applied in PikaConf::Load().
int bigkeys_show_limit_ = 10;
int bigkeys_member_threshold_ = 10000;
// The name must carry the trailing underscore: the accessor, the setter,
// Load() and ConfigRewrite() all refer to bigkeys_key_value_length_threshold_.
int bigkeys_key_value_length_threshold_ = 1048576;
int bigkeys_log_interval_ = 60;
898936
std::unordered_set<std::string> slow_cmd_set_;
899937
// Because the exporter of Pika_exporter implements Auth authentication
900938
// with the Exporter of Pika, and the Exporter authenticates the Auth when

src/pika_admin.cc

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -884,7 +884,7 @@ const std::string InfoCmd::kRocksDBSection = "rocksdb";
884884
const std::string InfoCmd::kDebugSection = "debug";
885885
const std::string InfoCmd::kCommandStatsSection = "commandstats";
886886
const std::string InfoCmd::kCacheSection = "cache";
887-
887+
const std::string InfoCmd::kBigKeysSection = "bigkeys";
888888

889889
const std::string ClientCmd::KILLTYPE_NORMAL = "normal";
890890
const std::string ClientCmd::KILLTYPE_PUBSUB = "pubsub";
@@ -910,6 +910,8 @@ void InfoCmd::DoInitial() {
910910
keyspace_scan_dbs_ = g_pika_server->GetAllDBName();
911911
} else if (strcasecmp(argv_[1].data(), kServerSection.data()) == 0) {
912912
info_section_ = kInfoServer;
913+
} else if (strcasecmp(argv_[1].data(), kBigKeysSection.data()) == 0) {
  // Matched against the section-name constant, like the sibling branches,
  // so the name "bigkeys" is spelled in exactly one place.
  info_section_ = kInfoBigKeys;
913915
} else if (strcasecmp(argv_[1].data(), kClientsSection.data()) == 0) {
914916
info_section_ = kInfoClients;
915917
} else if (strcasecmp(argv_[1].data(), kStatsSection.data()) == 0) {
@@ -994,6 +996,9 @@ void InfoCmd::Do() {
994996
InfoReplication(info);
995997
info.append("\r\n");
996998
InfoKeyspace(info);
999+
info.append("\r\n");
1000+
InfoBigKeys(info);
1001+
info.append("\r\n");
9971002
break;
9981003
case kInfoAll:
9991004
InfoServer(info);
@@ -1017,6 +1022,9 @@ void InfoCmd::Do() {
10171022
InfoKeyspace(info);
10181023
info.append("\r\n");
10191024
InfoRocksDB(info);
1025+
info.append("\r\n");
1026+
InfoBigKeys(info);
1027+
info.append("\r\n");
10201028
break;
10211029
case kInfoServer:
10221030
InfoServer(info);
@@ -1054,6 +1062,9 @@ void InfoCmd::Do() {
10541062
case kInfoCache:
10551063
InfoCache(info, db_);
10561064
break;
1065+
case kInfoBigKeys:
1066+
InfoBigKeys(info);
1067+
break;
10571068
default:
10581069
// kInfoErr is nothing
10591070
break;
@@ -1094,7 +1105,21 @@ void InfoCmd::InfoServer(std::string& info) {
10941105

10951106
info.append(tmp_stream.str());
10961107
}
1097-
1108+
void InfoCmd::InfoBigKeys(std::string& info) {
1109+
std::stringstream tmp_stream;
1110+
std::shared_lock db_rwl(g_pika_server->dbs_rw_);
1111+
for (const auto& db_item : g_pika_server->dbs_) {
1112+
if (!db_item.second) {
1113+
continue;
1114+
}
1115+
std::vector<storage::BigKeyInfo> bigkeys;
1116+
db_item.second->storage()->GetBigKeyStatistics(&bigkeys);
1117+
std::string bigkey_info;
1118+
storage::FormatBigKeyStatistics(bigkeys, &bigkey_info);
1119+
tmp_stream << bigkey_info;
1120+
}
1121+
info.append(tmp_stream.str());
1122+
}
10981123
void InfoCmd::InfoClients(std::string& info) {
10991124
std::stringstream tmp_stream;
11001125
tmp_stream << "# Clients"

src/pika_conf.cc

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,22 @@ int PikaConf::Load() {
6161
if (root_connection_num_ < 0) {
6262
root_connection_num_ = 2;
6363
}
64-
64+
GetConfInt("bigkeys_show_limit", &bigkeys_show_limit_);
65+
if (bigkeys_show_limit_ <= 0) {
66+
bigkeys_show_limit_ = 10;
67+
}
68+
GetConfInt("bigkeys_key_value_length_threshold", &bigkeys_key_value_length_threshold_);
69+
if (bigkeys_key_value_length_threshold_ <= 0) {
70+
bigkeys_key_value_length_threshold_ = 1048576;
71+
}
72+
GetConfInt("bigkeys_member_threshold", &bigkeys_member_threshold_);
73+
if (bigkeys_member_threshold_ <= 0) {
74+
bigkeys_membert_threshold_ = 10000;
75+
}
76+
GetConfInt("bigkeys_log_interval", &bigkeys_log_interval_);
77+
if (bigkeys_log_interval_ < 0) {
78+
bigkeys_log_interval_ = 60;
79+
}
6580
std::string swe;
6681
GetConfStr("slowlog-write-errorlog", &swe);
6782
slowlog_write_errorlog_.store(swe == "yes" ? true : false);
@@ -805,6 +820,11 @@ int PikaConf::ConfigRewrite() {
805820
SetConfInt64("thread-migrate-keys-num", thread_migrate_keys_num_);
806821
// slaveof config item is special
807822
SetConfStr("slaveof", slaveof_);
823+
//big keys
824+
SetConfInt("bigkeys_show_limit", bigkeys_show_limit_);
825+
SetConfInt("bigkeys_key_value_length_threshold", bigkeys_key_value_length_threshold_);
826+
SetConfInt("bigkeys_member_threshold", bigkeys_member_threshold_);
827+
SetConfInt("bigkeys_log_interval", bigkeys_log_interval_);
808828
// cache config
809829
SetConfStr("cache-index-and-filter-blocks", cache_index_and_filter_blocks_ ? "yes" : "no");
810830
SetConfInt("cache-model", cache_mode_);

src/storage/include/storage/storage.h

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,15 @@ struct ScoreMember {
127127
enum BeforeOrAfter { Before, After };
128128

129129
enum DataType { kAll, kStrings, kHashes, kLists, kZSets, kSets, kStreams };
130-
130+
//Big Keys INFO struct
131+
struct BigKeyInfo {
132+
std::string key;
133+
DataType type;
134+
uint64_t key_size;
135+
uint64_t value_size;
136+
uint64_t last_update_time;
137+
};
138+
void FormatBigKeyStatistics(const std::vector<BigKeyInfo>& bigkeys, std::string* out);
131139
const char DataTypeTag[] = {'a', 'k', 'h', 'l', 'z', 's', 'x'};
132140

133141
enum class OptionType {
@@ -1100,7 +1108,7 @@ class Storage {
11001108
Status EnableAutoCompaction(const OptionType& option_type,
11011109
const std::string& db_type, const std::unordered_map<std::string, std::string>& options);
11021110
void GetRocksDBInfo(std::string& info);
1103-
1111+
void GetBigKeyStatistics(std::vector<BigKeyInfo>* bigkeys);
11041112
private:
11051113
std::unique_ptr<RedisStrings> strings_db_;
11061114
std::unique_ptr<RedisHashes> hashes_db_;

src/storage/src/redis.cc

Lines changed: 63 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55

66
#include "src/redis.h"
77
#include <sstream>
8-
8+
#include <thread>
9+
#include <fstream>
10+
#include <glog/logging.h>
911
namespace storage {
1012

1113
Redis::Redis(Storage* const s, const DataType& type)
@@ -20,6 +22,7 @@ Redis::Redis(Storage* const s, const DataType& type)
2022
default_compact_range_options_.exclusive_manual_compaction = false;
2123
default_compact_range_options_.change_level = true;
2224
handles_.clear();
25+
StartBigKeysLogThread();
2326
}
2427

2528
Redis::~Redis() {
@@ -33,6 +36,7 @@ Redis::~Redis() {
3336
if (default_compact_range_options_.canceled) {
3437
delete default_compact_range_options_.canceled;
3538
}
39+
stop_big_keys_log_ = true;
3640
}
3741

3842
Status Redis::GetScanStartPoint(const Slice& key, const Slice& pattern, int64_t cursor, std::string* start_point) {
@@ -109,7 +113,16 @@ Status Redis::SetOptions(const OptionType& option_type, const std::unordered_map
109113
}
110114
return s;
111115
}
112-
116+
// Maps a DataType to the lowercase name printed in big key log lines.
// kAll, and any value outside the enum, maps to "unknown".
inline const char* DataTypeName(DataType type) {
  switch (type) {
    case kStrings: return "string";
    case kHashes:  return "hash";
    case kLists:   return "list";
    case kSets:    return "set";
    case kZSets:   return "zset";
    // kStreams is a member of DataType, so it gets a real name too.
    case kStreams: return "stream";
    default:       return "unknown";
  }
}
113126
void Redis::GetRocksDBInfo(std::string &info, const char *prefix) {
114127
std::ostringstream string_stream;
115128
string_stream << "#" << prefix << "RocksDB" << "\r\n";
@@ -198,5 +211,52 @@ void Redis::SetCompactRangeOptions(const bool is_canceled) {
198211
default_compact_range_options_.canceled->store(is_canceled);
199212
}
200213
}
201-
214+
//big keys
215+
void Redis::CheckAndRecordBigKeys(const std::string& key, DataType type, uint64_t key_size, uint64_t value_size) {
216+
bool is_bigkey = false;
217+
if (type == kStrings) {
218+
is_bigkey = (key_size >= BIGKEYS_THRESHOLD || value_size >= BIGKEYS_THRESHOLD);
219+
} else {
220+
is_bigkey = (key_size >= BIGKEYS_THRESHOLD);
221+
}
222+
if (is_bigkey) {
223+
std::lock_guard<std::mutex> lock(big_keys_mutex_);
224+
BigKeyInfo info;
225+
info.key = key;
226+
info.type = type;
227+
info.key_size = key_size;
228+
info.value_size = value_size;
229+
info.last_update_time = static_cast<uint64_t>(time(nullptr));
230+
big_keys_info_map_[key] = info;
231+
}
232+
}
233+
void Redis::StartBigKeysLogThread() {
234+
std::thread([this]() {
235+
while (!stop_big_keys_log_) {
236+
{
237+
std::lock_guard<std::mutex> lock(big_keys_mutex_);
238+
std::ofstream ofs("log/pika.WARNING", std::ios::app);
239+
if (ofs.is_open()) {
240+
auto now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
241+
char time_buf[32];
242+
std::strftime(time_buf, sizeof(time_buf), "%Y-%m-%d %H:%M:%S", std::localtime(&now));
243+
for (const auto& kv : big_keys_info_map_) {
244+
ofs << "[BigKey] key=" << kv.first
245+
<< " type=" << DataTypeName(kv.second.type)
246+
<< " key_size=" << kv.second.key_size;
247+
if (kv.second.type == DataType::kStrings) {
248+
ofs << " value_size=" << kv.second.value_size;
249+
}
250+
ofs << " Updated Time=" << kv.second.last_update_time;
251+
ofs << " Real time=" << time_buf << std::endl;
252+
}
253+
ofs.close();
254+
} else {
255+
LOG(WARNING) << "Failed to open log/pika.WARNING for writing!";
256+
}
257+
}
258+
std::this_thread::sleep_for(std::chrono::minutes(1));
259+
}
260+
}).detach();
261+
}
202262
} // namespace storage

src/storage/src/redis.h

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
#ifndef SRC_REDIS_H_
77
#define SRC_REDIS_H_
8-
8+
#define BIGKEYS_THRESHOLD 10000
99
#include <memory>
1010
#include <string>
1111
#include <vector>
@@ -24,7 +24,8 @@
2424
namespace storage {
2525
using Status = rocksdb::Status;
2626
using Slice = rocksdb::Slice;
27-
27+
//Big Keys INFO struct
28+
struct BigKeyInfo;
2829
class Redis {
2930
public:
3031
Redis(Storage* storage, const DataType& type);
@@ -115,7 +116,7 @@ class Redis {
115116
Status SetSmallCompactionDurationThreshold(uint64_t small_compaction_duration_threshold);
116117
std::vector<rocksdb::ColumnFamilyHandle*> GetHandles(){ return handles_;};
117118
void GetRocksDBInfo(std::string &info, const char *prefix);
118-
119+
virtual void GetBigKeyStatistics(std::vector<BigKeyInfo>* bigkeys) = 0;
119120
protected:
120121
Storage* const storage_;
121122
DataType type_;
@@ -126,6 +127,13 @@ class Redis {
126127
rocksdb::WriteOptions default_write_options_;
127128
rocksdb::ReadOptions default_read_options_;
128129
rocksdb::CompactRangeOptions default_compact_range_options_;
130+
// big keys
131+
std::mutex big_keys_mutex_;
132+
std::unordered_map<std::string, BigKeyInfo> big_keys_info_map_;
133+
uint64_t big_keys_threshold_;
134+
std::atomic<bool> stop_big_keys_log_ = false;
135+
void CheckAndRecordBigKeys(const std::string& key, DataType type, uint64_t value_size, uint64_t key_size = 0);
136+
void StartBigKeysLogThread();
129137

130138
// For Scan
131139
std::unique_ptr<LRUCache<std::string, std::string>> scan_cursors_store_;

0 commit comments

Comments
 (0)