Skip to content

Commit 8eb49c5

Browse files
committed
Fix thread safety
1 parent eca840d commit 8eb49c5

2 files changed

Lines changed: 31 additions & 3 deletions

File tree

src/storage/src/redis.cc

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@ Redis::Redis(Storage* const s, const DataType& type)
2626
}
2727

2828
Redis::~Redis() {
29+
stop_big_keys_log_ = true;
30+
std::lock_guard<std::mutex> lock(bg_thread_mutex_);
31+
bg_thread_cond_.notify_all();
32+
std::unique_lock<std::mutex> lock(bg_thread_mutex_);
33+
bg_thread_exit_cond_.wait_for(lock, std::chrono::milliseconds(200),
34+
[this]() { return this->bg_threads_count_.load() == 0; });
2935
std::vector<rocksdb::ColumnFamilyHandle*> tmp_handles = handles_;
3036
handles_.clear();
3137
for (auto handle : tmp_handles) {
@@ -36,7 +42,7 @@ Redis::~Redis() {
3642
if (default_compact_range_options_.canceled) {
3743
delete default_compact_range_options_.canceled;
3844
}
39-
stop_big_keys_log_ = true;
45+
4046
}
4147

4248
Status Redis::GetScanStartPoint(const Slice& key, const Slice& pattern, int64_t cursor, std::string* start_point) {
@@ -252,7 +258,9 @@ void Redis::CheckAndRecordBigKeys(
252258
}
253259
void Redis::StartBigKeysExpireCheckThread() {
254260
std::thread([this]() {
255-
while (!stop_big_keys_log_) {
261+
std::lock_guard<std::mutex> lock(bg_thread_mutex_);
262+
this->bg_threads_count_++;
263+
while (!this->stop_big_keys_log_.load()) {
256264
{
257265
std::lock_guard<std::mutex> lock(big_keys_mutex_);
258266
std::vector<std::string> expired_keys;
@@ -270,12 +278,21 @@ void Redis::StartBigKeysExpireCheckThread() {
270278
}
271279
}
272280
}
281+
std::unique_lock<std::mutex> lock(bg_thread_mutex_);
282+
bg_thread_cond_.wait_for(lock,
283+
std::chrono::minutes(storage_->bigkeys_log_interval_),
284+
[this]() { return this->stop_big_keys_log_.load(); });
285+
std::lock_guard<std::mutex> lock(bg_thread_mutex_);
286+
this->bg_threads_count_--;
287+
bg_thread_exit_cond_.notify_all();
273288
std::this_thread::sleep_for(std::chrono::minutes(storage_->bigkeys_log_interval_));
274289
}
275290
}).detach();
276291
}
277292
void Redis::StartBigKeysLogThread() {
278293
std::thread([this]() {
294+
std::lock_guard<std::mutex> lock(bg_thread_mutex_);
295+
this->bg_threads_count_++;
279296
while (!stop_big_keys_log_) {
280297
{
281298
uint32_t current_log_interval = storage_->bigkeys_log_interval_;
@@ -306,6 +323,13 @@ void Redis::StartBigKeysLogThread() {
306323
}
307324
}
308325
}
326+
std::unique_lock<std::mutex> lock(bg_thread_mutex_);
327+
bg_thread_cond_.wait_for(lock,
328+
std::chrono::minutes(storage_->bigkeys_log_interval_),
329+
[this]() { return this->stop_big_keys_log_.load(); });
330+
std::lock_guard<std::mutex> lock(bg_thread_mutex_);
331+
this->bg_threads_count_--;
332+
bg_thread_exit_cond_.notify_all();
309333
std::this_thread::sleep_for(std::chrono::minutes(storage_->bigkeys_log_interval_));
310334
}
311335
}).detach();

src/storage/src/redis.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
#include "src/lru_cache.h"
2121
#include "src/mutex_impl.h"
2222
#include "storage/storage.h"
23-
23+
#include <condition_variable>
2424
namespace storage {
2525
using Status = rocksdb::Status;
2626
using Slice = rocksdb::Slice;
@@ -132,6 +132,10 @@ class Redis {
132132
std::unordered_map<std::string, BigKeyInfo> big_keys_info_map_;
133133
uint64_t big_keys_threshold_;
134134
std::atomic<bool> stop_big_keys_log_{false};
135+
std::mutex bg_thread_mutex_;
136+
std::condition_variable bg_thread_cond_;
137+
std::condition_variable bg_thread_exit_cond_;
138+
std::atomic<int> bg_threads_count_{0};
135139
void CheckAndRecordBigKeys(const std::string& key, DataType type, uint64_t member_size, uint64_t key_length = 0, uint64_t value_length = 0, bool is_delete = false);
136140
void StartBigKeysLogThread();
137141
void StartBigKeysExpireCheckThread();

0 commit comments

Comments
 (0)