Skip to content

Commit 45d39a7

Browse files
wangshao1wangshaoyi
andauthored
fix rediscache race condition when read api meets rehashing (#3088)
Co-authored-by: wangshaoyi <[email protected]>
1 parent 0709b88 commit 45d39a7

2 files changed

Lines changed: 133 additions & 103 deletions

File tree

include/pika_cache.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<
133133
rocksdb::Status RPushnx(std::string& key, std::vector<std::string> &values, int64_t ttl);
134134
rocksdb::Status RPushnxWithoutTTL(std::string& key, std::vector<std::string> &values);
135135
rocksdb::Status LPushIfKeyExist(std::string& key, std::vector<std::string> &values);
136-
136+
137137
// Set Commands
138138
rocksdb::Status SAdd(std::string& key, std::vector<std::string>& members);
139139
rocksdb::Status SAddIfKeyExist(std::string& key, std::vector<std::string>& members);

src/pika_cache.cc

Lines changed: 132 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -141,54 +141,69 @@ std::map<storage::DataType, int64_t> PikaCache::TTL(std::string &key, std::map<s
141141
std::map<storage::DataType, int64_t> ret;
142142
int64_t timestamp = 0;
143143

144-
std::string CacheKeyPrefixK = PCacheKeyPrefixK + key;
145-
int cache_indexk = CacheIndex(CacheKeyPrefixK);
146-
s = caches_[cache_indexk]->TTL(CacheKeyPrefixK, &timestamp);
147-
if (s.ok() || s.IsNotFound()) {
148-
ret[storage::DataType::kStrings] = timestamp;
149-
} else if (!s.IsNotFound()) {
150-
ret[storage::DataType::kStrings] = -3;
151-
(*type_status)[storage::DataType::kStrings] = s;
152-
}
153-
154-
std::string CacheKeyPrefixH = PCacheKeyPrefixH + key;
155-
int cache_indexh = CacheIndex(CacheKeyPrefixH);
156-
s = caches_[cache_indexh]->TTL(CacheKeyPrefixH, &timestamp);
157-
if (s.ok() || s.IsNotFound()) {
158-
ret[storage::DataType::kHashes] = timestamp;
159-
} else if (!s.IsNotFound()) {
160-
ret[storage::DataType::kHashes] = -3;
161-
(*type_status)[storage::DataType::kHashes] = s;
162-
}
163-
164-
std::string CacheKeyPrefixL = PCacheKeyPrefixL + key;
165-
int cache_indexl = CacheIndex(CacheKeyPrefixL);
166-
s = caches_[cache_indexl]->TTL(CacheKeyPrefixL, &timestamp);
167-
if (s.ok() || s.IsNotFound()) {
168-
ret[storage::DataType::kLists] = timestamp;
169-
} else if (!s.IsNotFound()) {
170-
ret[storage::DataType::kLists] = -3;
171-
(*type_status)[storage::DataType::kLists] = s;
172-
}
173-
174-
std::string CacheKeyPrefixS = PCacheKeyPrefixS + key;
175-
int cache_indexs = CacheIndex(CacheKeyPrefixS);
176-
s = caches_[cache_indexs]->TTL(CacheKeyPrefixS, &timestamp);
177-
if (s.ok() || s.IsNotFound()) {
178-
ret[storage::DataType::kSets] = timestamp;
179-
} else if (!s.IsNotFound()) {
180-
ret[storage::DataType::kSets] = -3;
181-
(*type_status)[storage::DataType::kSets] = s;
182-
}
183-
184-
std::string CacheKeyPrefixZ = PCacheKeyPrefixZ + key;
185-
int cache_indexz = CacheIndex(CacheKeyPrefixZ);
186-
s = caches_[cache_indexz]->TTL(CacheKeyPrefixZ, &timestamp);
187-
if (s.ok() || s.IsNotFound()) {
188-
ret[storage::DataType::kZSets] = timestamp;
189-
} else if (!s.IsNotFound()) {
190-
ret[storage::DataType::kZSets] = -3;
191-
(*type_status)[storage::DataType::kZSets] = s;
144+
{
145+
std::string CacheKeyPrefixK = PCacheKeyPrefixK + key;
146+
int cache_indexk = CacheIndex(CacheKeyPrefixK);
147+
std::lock_guard lm(*cache_mutexs_[cache_indexk]);
148+
s = caches_[cache_indexk]->TTL(CacheKeyPrefixK, &timestamp);
149+
if (s.ok() || s.IsNotFound()) {
150+
ret[storage::DataType::kStrings] = timestamp;
151+
} else if (!s.IsNotFound()) {
152+
ret[storage::DataType::kStrings] = -3;
153+
(*type_status)[storage::DataType::kStrings] = s;
154+
}
155+
}
156+
157+
{
158+
std::string CacheKeyPrefixH = PCacheKeyPrefixH + key;
159+
int cache_indexh = CacheIndex(CacheKeyPrefixH);
160+
std::lock_guard lm(*cache_mutexs_[cache_indexh]);
161+
s = caches_[cache_indexh]->TTL(CacheKeyPrefixH, &timestamp);
162+
if (s.ok() || s.IsNotFound()) {
163+
ret[storage::DataType::kHashes] = timestamp;
164+
} else if (!s.IsNotFound()) {
165+
ret[storage::DataType::kHashes] = -3;
166+
(*type_status)[storage::DataType::kHashes] = s;
167+
}
168+
}
169+
170+
{
171+
std::string CacheKeyPrefixL = PCacheKeyPrefixL + key;
172+
int cache_indexl = CacheIndex(CacheKeyPrefixL);
173+
std::lock_guard lm(*cache_mutexs_[cache_indexl]);
174+
s = caches_[cache_indexl]->TTL(CacheKeyPrefixL, &timestamp);
175+
if (s.ok() || s.IsNotFound()) {
176+
ret[storage::DataType::kLists] = timestamp;
177+
} else if (!s.IsNotFound()) {
178+
ret[storage::DataType::kLists] = -3;
179+
(*type_status)[storage::DataType::kLists] = s;
180+
}
181+
}
182+
183+
{
184+
std::string CacheKeyPrefixS = PCacheKeyPrefixS + key;
185+
int cache_indexs = CacheIndex(CacheKeyPrefixS);
186+
std::lock_guard lm(*cache_mutexs_[cache_indexs]);
187+
s = caches_[cache_indexs]->TTL(CacheKeyPrefixS, &timestamp);
188+
if (s.ok() || s.IsNotFound()) {
189+
ret[storage::DataType::kSets] = timestamp;
190+
} else if (!s.IsNotFound()) {
191+
ret[storage::DataType::kSets] = -3;
192+
(*type_status)[storage::DataType::kSets] = s;
193+
}
194+
}
195+
196+
{
197+
std::string CacheKeyPrefixZ = PCacheKeyPrefixZ + key;
198+
int cache_indexz = CacheIndex(CacheKeyPrefixZ);
199+
std::lock_guard lm(*cache_mutexs_[cache_indexz]);
200+
s = caches_[cache_indexz]->TTL(CacheKeyPrefixZ, &timestamp);
201+
if (s.ok() || s.IsNotFound()) {
202+
ret[storage::DataType::kZSets] = timestamp;
203+
} else if (!s.IsNotFound()) {
204+
ret[storage::DataType::kZSets] = -3;
205+
(*type_status)[storage::DataType::kZSets] = s;
206+
}
192207
}
193208
return ret;
194209
}
@@ -226,68 +241,83 @@ Status PikaCache::GetType(const std::string& key, bool single, std::vector<std::
226241

227242
Status s;
228243
std::string value;
229-
std::string CacheKeyPrefixK = PCacheKeyPrefixK + key;
230-
int cache_indexk = CacheIndex(CacheKeyPrefixK);
231-
s = caches_[cache_indexk]->Get(CacheKeyPrefixK, &value);
232-
if (s.ok()) {
233-
types.emplace_back("string");
234-
} else if (!s.IsNotFound()) {
235-
return s;
236-
}
237-
if (single && !types.empty()) {
238-
return s;
244+
{
245+
std::string CacheKeyPrefixK = PCacheKeyPrefixK + key;
246+
int cache_indexk = CacheIndex(CacheKeyPrefixK);
247+
std::lock_guard lm(*cache_mutexs_[cache_indexk]);
248+
s = caches_[cache_indexk]->Get(CacheKeyPrefixK, &value);
249+
if (s.ok()) {
250+
types.emplace_back("string");
251+
} else if (!s.IsNotFound()) {
252+
return s;
253+
}
254+
if (single && !types.empty()) {
255+
return s;
256+
}
239257
}
240258

241-
uint64_t hashes_len = 0;
242-
std::string CacheKeyPrefixH = PCacheKeyPrefixH + key;
243-
int cache_indexh = CacheIndex(CacheKeyPrefixH);
244-
s = caches_[cache_indexh]->HLen(CacheKeyPrefixH, &hashes_len);
245-
if (s.ok() && hashes_len != 0) {
246-
types.emplace_back("hash");
247-
} else if (!s.IsNotFound()) {
248-
return s;
249-
}
250-
if (single && !types.empty()) {
251-
return s;
259+
{
260+
uint64_t hashes_len = 0;
261+
std::string CacheKeyPrefixH = PCacheKeyPrefixH + key;
262+
int cache_indexh = CacheIndex(CacheKeyPrefixH);
263+
std::lock_guard lm(*cache_mutexs_[cache_indexh]);
264+
s = caches_[cache_indexh]->HLen(CacheKeyPrefixH, &hashes_len);
265+
if (s.ok() && hashes_len != 0) {
266+
types.emplace_back("hash");
267+
} else if (!s.IsNotFound()) {
268+
return s;
269+
}
270+
if (single && !types.empty()) {
271+
return s;
272+
}
252273
}
253274

254-
uint64_t lists_len = 0;
255-
std::string CacheKeyPrefixL = PCacheKeyPrefixL + key;
256-
int cache_indexl = CacheIndex(CacheKeyPrefixL);
257-
s = caches_[cache_indexl]->LLen(CacheKeyPrefixL, &lists_len);
258-
if (s.ok() && lists_len != 0) {
259-
types.emplace_back("list");
260-
} else if (!s.IsNotFound()) {
261-
return s;
262-
}
263-
if (single && !types.empty()) {
264-
return s;
275+
{
276+
uint64_t lists_len = 0;
277+
std::string CacheKeyPrefixL = PCacheKeyPrefixL + key;
278+
int cache_indexl = CacheIndex(CacheKeyPrefixL);
279+
std::lock_guard lm(*cache_mutexs_[cache_indexl]);
280+
s = caches_[cache_indexl]->LLen(CacheKeyPrefixL, &lists_len);
281+
if (s.ok() && lists_len != 0) {
282+
types.emplace_back("list");
283+
} else if (!s.IsNotFound()) {
284+
return s;
285+
}
286+
if (single && !types.empty()) {
287+
return s;
288+
}
265289
}
266290

267-
uint64_t zsets_size = 0;
268-
std::string CacheKeyPrefixZ = PCacheKeyPrefixZ + key;
269-
int cache_indexz = CacheIndex(CacheKeyPrefixZ);
270-
s = caches_[cache_indexz]->ZCard(CacheKeyPrefixZ, &zsets_size);
271-
if (s.ok() && zsets_size != 0) {
272-
types.emplace_back("zset");
273-
} else if (!s.IsNotFound()) {
274-
return s;
275-
}
276-
if (single && !types.empty()) {
277-
return s;
291+
{
292+
uint64_t zsets_size = 0;
293+
std::string CacheKeyPrefixZ = PCacheKeyPrefixZ + key;
294+
int cache_indexz = CacheIndex(CacheKeyPrefixZ);
295+
std::lock_guard lm(*cache_mutexs_[cache_indexz]);
296+
s = caches_[cache_indexz]->ZCard(CacheKeyPrefixZ, &zsets_size);
297+
if (s.ok() && zsets_size != 0) {
298+
types.emplace_back("zset");
299+
} else if (!s.IsNotFound()) {
300+
return s;
301+
}
302+
if (single && !types.empty()) {
303+
return s;
304+
}
278305
}
279306

280-
uint64_t sets_size = 0;
281-
std::string CacheKeyPrefixS = PCacheKeyPrefixS + key;
282-
int cache_indexs = CacheIndex(CacheKeyPrefixS);
283-
s = caches_[cache_indexs]->SCard(CacheKeyPrefixS, &sets_size);
284-
if (s.ok() && sets_size != 0) {
285-
types.emplace_back("set");
286-
} else if (!s.IsNotFound()) {
287-
return s;
288-
}
289-
if (single && types.empty()) {
290-
types.emplace_back("none");
307+
{
308+
uint64_t sets_size = 0;
309+
std::string CacheKeyPrefixS = PCacheKeyPrefixS + key;
310+
int cache_indexs = CacheIndex(CacheKeyPrefixS);
311+
std::lock_guard lm(*cache_mutexs_[cache_indexs]);
312+
s = caches_[cache_indexs]->SCard(CacheKeyPrefixS, &sets_size);
313+
if (s.ok() && sets_size != 0) {
314+
types.emplace_back("set");
315+
} else if (!s.IsNotFound()) {
316+
return s;
317+
}
318+
if (single && types.empty()) {
319+
types.emplace_back("none");
320+
}
291321
}
292322
return Status::OK();
293323
}

0 commit comments

Comments
 (0)