Skip to content

Commit c467b47

Browse files
Mixficsolwuxianrong
andauthored
fix: Fixed an issue with asynchronous loading (OpenAtomFoundation#3037)
* Fixed an issue with asynchronous loading * code format * add list * Fixed Hsetnx data consistency issues * add rpush * fix hsetnx commadn --------- Co-authored-by: wuxianrong <[email protected]>
1 parent cf60090 commit c467b47

12 files changed

Lines changed: 241 additions & 29 deletions

File tree

include/pika_cache.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<
9191
rocksdb::Status Appendxx(std::string& key, std::string& value);
9292
rocksdb::Status GetRange(std::string& key, int64_t start, int64_t end, std::string* value);
9393
rocksdb::Status SetRangexx(std::string& key, int64_t start, std::string& value);
94+
rocksdb::Status SetRangeIfKeyExist(std::string& key, int64_t start, std::string &value);
9495
rocksdb::Status Strlen(std::string& key, int32_t* len);
9596

9697
// Hash Commands
@@ -112,6 +113,7 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<
112113
rocksdb::Status HIncrbyfloatxx(std::string& key, std::string& field, long double value);
113114
rocksdb::Status HLen(std::string& key, uint64_t* len);
114115
rocksdb::Status HStrlen(std::string& key, std::string& field, uint64_t* len);
116+
rocksdb::Status HMSetIfKeyExist(std::string& key, std::vector<storage::FieldValue> &fvs);
115117

116118
// List Commands
117119
rocksdb::Status LIndex(std::string& key, int64_t index, std::string* element);
@@ -126,10 +128,12 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<
126128
rocksdb::Status LTrim(std::string& key, int64_t start, int64_t stop);
127129
rocksdb::Status RPop(std::string& key, std::string* element);
128130
rocksdb::Status RPush(std::string& key, std::vector<std::string> &values);
131+
rocksdb::Status RPushIfKeyExist(std::string& key, std::vector<std::string> &values);
129132
rocksdb::Status RPushx(std::string& key, std::vector<std::string> &values);
130133
rocksdb::Status RPushnx(std::string& key, std::vector<std::string> &values, int64_t ttl);
131134
rocksdb::Status RPushnxWithoutTTL(std::string& key, std::vector<std::string> &values);
132-
135+
rocksdb::Status LPushIfKeyExist(std::string& key, std::vector<std::string> &values);
136+
133137
// Set Commands
134138
rocksdb::Status SAdd(std::string& key, std::vector<std::string>& members);
135139
rocksdb::Status SAddIfKeyExist(std::string& key, std::vector<std::string>& members);

src/cache/include/cache.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,16 @@ class RedisCache {
7070
Status Append(std::string& key, std::string &value);
7171
Status GetRange(std::string& key, int64_t start, int64_t end, std::string *value);
7272
Status SetRange(std::string& key, int64_t start, std::string &value);
73+
Status SetRangeIfKeyExist(std::string& key, int64_t start, std::string &value);
7374
Status Strlen(std::string& key, int32_t *len);
7475

7576
// Hash Commands
7677
Status HDel(std::string& key, std::vector<std::string> &fields);
7778
Status HSetIfKeyExist(std::string& key, std::string &field, std::string &value);
7879
Status HSetnx(std::string& key, std::string &field, std::string &value);
80+
Status HSetnxIfKeyExist(std::string& key, std::string &field, std::string &value);
7981
Status HMSet(std::string& key, std::vector<storage::FieldValue> &fvs);
82+
Status HMSetIfKeyExist(std::string& key, std::vector<storage::FieldValue> &fvs);
8083
Status HGet(std::string& key, std::string &field, std::string *value);
8184
Status HMGet(std::string& key,
8285
std::vector<std::string> &fields,
@@ -96,6 +99,7 @@ class RedisCache {
9699
std::string &pivot, std::string &value);
97100
Status LLen(std::string& key, uint64_t *len);
98101
Status LPop(std::string& key, std::string *element);
102+
Status LPushIfKeyExist(std::string& key, std::vector<std::string> &values);
99103
Status LPush(std::string& key, std::vector<std::string> &values);
100104
Status LPushx(std::string& key, std::vector<std::string> &values);
101105
Status LRange(std::string& key, int64_t start, int64_t stop, std::vector<std::string> *values);
@@ -104,10 +108,12 @@ class RedisCache {
104108
Status LTrim(std::string& key, int64_t start, int64_t stop);
105109
Status RPop(std::string& key, std::string *element);
106110
Status RPush(std::string& key, std::vector<std::string> &values);
111+
Status RPushIfKeyExist(std::string& key, std::vector<std::string> &values);
107112
Status RPushx(std::string& key, std::vector<std::string> &values);
108113

109114
// Set Commands
110115
Status SAdd(std::string& key, std::vector<std::string> &members);
116+
Status SAddIfKeyExist(std::string& key, std::vector<std::string> &members);
111117
Status SCard(std::string& key, uint64_t *len);
112118
Status SIsmember(std::string& key, std::string& member);
113119
Status SMembers(std::string& key, std::vector<std::string> *members);
@@ -116,6 +122,7 @@ class RedisCache {
116122

117123
// Zset Commands
118124
Status ZAdd(std::string& key, std::vector<storage::ScoreMember> &score_members);
125+
Status ZAddIfKeyExist(std::string& key, std::vector<storage::ScoreMember> &score_members);
119126
Status ZCard(std::string& key, uint64_t *len);
120127
Status ZCount(std::string& key, std::string &min, std::string &max, uint64_t *len);
121128
Status ZIncrby(std::string& key, std::string& member, double increment);
@@ -151,6 +158,7 @@ class RedisCache {
151158
Status ZPopMax(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members);
152159

153160
// Bit Commands
161+
Status SetBitIfKeyExist(std::string& key, size_t offset, int64_t value);
154162
Status SetBit(std::string& key, size_t offset, int64_t value);
155163
Status GetBit(std::string& key, size_t offset, int64_t *value);
156164
Status BitCount(std::string& key, int64_t start, int64_t end, int64_t *value, bool have_offset);

src/cache/src/bit.cc

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,25 @@ Status RedisCache::SetBit(std::string& key, size_t offset, int64_t value) {
1515
return Status::Corruption("[error] Free memory faild !");
1616
}
1717

18+
// createObject is a function in redis, the init ref count of robj is 1
19+
robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
20+
DEFER {
21+
DecrObjectsRefCount(kobj);
22+
};
23+
int ret = RcSetBit(cache_, kobj, offset, value);
24+
if (C_OK != ret) {
25+
return Status::Corruption("RcSetBit failed");
26+
}
27+
28+
return Status::OK();
29+
}
30+
31+
Status RedisCache::SetBitIfKeyExist(std::string& key, size_t offset, int64_t value) {
32+
int res = RcFreeMemoryIfNeeded(cache_);
33+
if (C_OK != res) {
34+
return Status::Corruption("[error] Free memory faild !");
35+
}
36+
1837
if (!Exists(key)) {
1938
return Status::NotFound("key not exist");
2039
}

src/cache/src/hash.cc

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,27 @@ Status RedisCache::HSetIfKeyExist(std::string& key, std::string &field, std::str
5454
return Status::OK();
5555
}
5656

57+
Status RedisCache::HSetnxIfKeyExist(std::string& key, std::string &field, std::string &value) {
58+
if (C_OK != RcFreeMemoryIfNeeded(cache_)) {
59+
return Status::Corruption("[error] Free memory faild !");
60+
}
61+
62+
if (!Exists(key)) {
63+
return Status::NotFound("key not exist");
64+
}
65+
robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
66+
robj *fobj = createObject(OBJ_STRING, sdsnewlen(field.data(), field.size()));
67+
robj *vobj = createObject(OBJ_STRING, sdsnewlen(value.data(), value.size()));
68+
DEFER {
69+
DecrObjectsRefCount(kobj, fobj, vobj);
70+
};
71+
if (C_OK != RcHSetnx(cache_, kobj, fobj, vobj)) {
72+
return Status::Corruption("RcHSetnx failed");
73+
}
74+
75+
return Status::OK();
76+
}
77+
5778
Status RedisCache::HSetnx(std::string& key, std::string &field, std::string &value) {
5879
if (C_OK != RcFreeMemoryIfNeeded(cache_)) {
5980
return Status::Corruption("[error] Free memory faild !");
@@ -72,11 +93,11 @@ Status RedisCache::HSetnx(std::string& key, std::string &field, std::string &val
7293
return Status::OK();
7394
}
7495

75-
Status RedisCache::HMSet(std::string& key, std::vector<storage::FieldValue> &fvs) {
96+
Status RedisCache::HMSetIfKeyExist(std::string& key, std::vector<storage::FieldValue> &fvs) {
7697
int res = RcFreeMemoryIfNeeded(cache_);
7798
if (C_OK != res) {
7899
return Status::Corruption("[error] Free memory faild !");
79-
}
100+
}
80101

81102
if (!Exists(key)) {
82103
return Status::NotFound("key not exist");
@@ -99,6 +120,30 @@ Status RedisCache::HMSet(std::string& key, std::vector<storage::FieldValue> &fvs
99120
return Status::OK();
100121
}
101122

123+
Status RedisCache::HMSet(std::string& key, std::vector<storage::FieldValue> &fvs) {
124+
int res = RcFreeMemoryIfNeeded(cache_);
125+
if (C_OK != res) {
126+
return Status::Corruption("[error] Free memory faild !");
127+
}
128+
129+
robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
130+
unsigned int items_size = fvs.size() * 2;
131+
robj **items = (robj **)zcallocate(sizeof(robj *) * items_size);
132+
for (unsigned int i = 0; i < fvs.size(); ++i) {
133+
items[i * 2] = createObject(OBJ_STRING, sdsnewlen(fvs[i].field.data(), fvs[i].field.size()));
134+
items[i * 2 + 1] = createObject(OBJ_STRING, sdsnewlen(fvs[i].value.data(), fvs[i].value.size()));
135+
}
136+
DEFER {
137+
FreeObjectList(items, items_size);
138+
DecrObjectsRefCount(kobj);
139+
};
140+
int ret = RcHMSet(cache_, kobj, items, items_size);
141+
if (C_OK != ret) {
142+
return Status::Corruption("RcHMSet failed");
143+
}
144+
return Status::OK();
145+
}
146+
102147
Status RedisCache::HGet(std::string& key, std::string &field, std::string *value) {
103148
sds val;
104149
robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));

src/cache/src/list.cc

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ Status RedisCache::LPop(std::string& key, std::string *element) {
9393
return Status::OK();
9494
}
9595

96-
Status RedisCache::LPush(std::string& key, std::vector<std::string> &values) {
96+
Status RedisCache::LPushIfKeyExist(std::string& key, std::vector<std::string> &values) {
9797
int ret = RcFreeMemoryIfNeeded(cache_);
9898
if (C_OK != ret) {
9999
return Status::Corruption("[error] Free memory faild !");
@@ -119,6 +119,29 @@ Status RedisCache::LPush(std::string& key, std::vector<std::string> &values) {
119119
return Status::OK();
120120
}
121121

122+
Status RedisCache::LPush(std::string& key, std::vector<std::string> &values) {
123+
int ret = RcFreeMemoryIfNeeded(cache_);
124+
if (C_OK != ret) {
125+
return Status::Corruption("[error] Free memory faild !");
126+
}
127+
128+
robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
129+
robj **vals = (robj **)zcallocate(sizeof(robj *) * values.size());
130+
for (unsigned int i = 0; i < values.size(); ++i) {
131+
vals[i] = createObject(OBJ_STRING, sdsnewlen(values[i].data(), values[i].size()));
132+
}
133+
DEFER {
134+
FreeObjectList(vals, values.size());
135+
DecrObjectsRefCount(kobj);
136+
};
137+
int res = RcLPush(cache_, kobj, vals, values.size());
138+
if (C_OK != res) {
139+
return Status::Corruption("RcLPush failed");
140+
}
141+
142+
return Status::OK();
143+
}
144+
122145
Status RedisCache::LPushx(std::string& key, std::vector<std::string> &values) {
123146
int ret = RcFreeMemoryIfNeeded(cache_);
124147
if (C_OK != ret) {
@@ -242,7 +265,7 @@ Status RedisCache::RPop(std::string& key, std::string *element) {
242265
return Status::OK();
243266
}
244267

245-
Status RedisCache::RPush(std::string& key, std::vector<std::string> &values) {
268+
Status RedisCache::RPushIfKeyExist(std::string& key, std::vector<std::string> &values) {
246269
int res = RcFreeMemoryIfNeeded(cache_);
247270
if (C_OK != res) {
248271
return Status::Corruption("[error] Free memory faild !");
@@ -267,6 +290,29 @@ Status RedisCache::RPush(std::string& key, std::vector<std::string> &values) {
267290
return Status::OK();
268291
}
269292

293+
Status RedisCache::RPush(std::string& key, std::vector<std::string> &values) {
294+
int res = RcFreeMemoryIfNeeded(cache_);
295+
if (C_OK != res) {
296+
return Status::Corruption("[error] Free memory faild !");
297+
}
298+
299+
robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
300+
robj **vals = (robj **)zcallocate(sizeof(robj *) * values.size());
301+
for (unsigned int i = 0; i < values.size(); ++i) {
302+
vals[i] = createObject(OBJ_STRING, sdsnewlen(values[i].data(), values[i].size()));
303+
}
304+
DEFER {
305+
FreeObjectList(vals, values.size());
306+
DecrObjectsRefCount(kobj);
307+
};
308+
int ret = RcRPush(cache_, kobj, vals, values.size());
309+
if (C_OK != ret) {
310+
return Status::Corruption("RcRPush failed");
311+
}
312+
313+
return Status::OK();
314+
}
315+
270316
Status RedisCache::RPushx(std::string& key, std::vector<std::string> &values) {
271317
int res = RcFreeMemoryIfNeeded(cache_);
272318
if (C_OK != res) {

src/cache/src/set.cc

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88

99
namespace cache {
1010

11-
Status RedisCache::SAdd(std::string& key, std::vector<std::string> &members) {
11+
12+
Status RedisCache::SAddIfKeyExist(std::string& key, std::vector<std::string> &members) {
1213
int ret = RcFreeMemoryIfNeeded(cache_);
1314
if (C_OK != ret) {
1415
return Status::Corruption("[error] Free memory faild !");
@@ -34,6 +35,29 @@ Status RedisCache::SAdd(std::string& key, std::vector<std::string> &members) {
3435
return Status::OK();
3536
}
3637

38+
Status RedisCache::SAdd(std::string& key, std::vector<std::string> &members) {
39+
int ret = RcFreeMemoryIfNeeded(cache_);
40+
if (C_OK != ret) {
41+
return Status::Corruption("[error] Free memory faild !");
42+
}
43+
44+
robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
45+
robj **vals = (robj **)zcallocate(sizeof(robj *) * members.size());
46+
for (unsigned int i = 0; i < members.size(); ++i) {
47+
vals[i] = createObject(OBJ_STRING, sdsnewlen(members[i].data(), members[i].size()));
48+
}
49+
DEFER {
50+
FreeObjectList(vals, members.size());
51+
DecrObjectsRefCount(kobj);
52+
};
53+
int res = RcSAdd(cache_, kobj, vals, members.size());
54+
if (C_OK != res) {
55+
return Status::Corruption("RcSAdd failed");
56+
}
57+
58+
return Status::OK();
59+
}
60+
3761
Status RedisCache::SCard(std::string& key, uint64_t *len) {
3862
robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
3963
DEFER {

src/cache/src/string.cc

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ Status RedisCache::GetRange(std::string& key, int64_t start, int64_t end, std::s
255255
return Status::OK();
256256
}
257257

258-
Status RedisCache::SetRange(std::string& key, int64_t start, std::string &value) {
258+
Status RedisCache::SetRangeIfKeyExist(std::string& key, int64_t start, std::string &value) {
259259
if (C_OK != RcFreeMemoryIfNeeded(cache_)) {
260260
return Status::Corruption("[error] Free memory faild !");
261261
}
@@ -277,6 +277,25 @@ Status RedisCache::SetRange(std::string& key, int64_t start, std::string &value)
277277
return Status::OK();
278278
}
279279

280+
Status RedisCache::SetRange(std::string& key, int64_t start, std::string &value) {
281+
if (C_OK != RcFreeMemoryIfNeeded(cache_)) {
282+
return Status::Corruption("[error] Free memory faild !");
283+
}
284+
285+
uint64_t ret = 0;
286+
robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
287+
robj *vobj = createObject(OBJ_STRING, sdsnewlen(value.data(), value.size()));
288+
DEFER {
289+
DecrObjectsRefCount(kobj, vobj);
290+
};
291+
int res = RcSetRange(cache_, kobj, start, vobj, reinterpret_cast<unsigned long *>(&ret));
292+
if (C_OK != res) {
293+
return Status::Corruption("SetRange failed!");
294+
}
295+
296+
return Status::OK();
297+
}
298+
280299
Status RedisCache::Strlen(std::string& key, int32_t *len) {
281300
robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
282301
DEFER {

src/cache/src/zset.cc

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
namespace cache {
1010

11-
Status RedisCache::ZAdd(std::string& key, std::vector<storage::ScoreMember> &score_members) {
11+
Status RedisCache::ZAddIfKeyExist(std::string& key, std::vector<storage::ScoreMember> &score_members) {
1212
int res = RcFreeMemoryIfNeeded(cache_);
1313
if (C_OK != res) {
1414
return Status::Corruption("[error] Free memory faild !");
@@ -37,6 +37,32 @@ Status RedisCache::ZAdd(std::string& key, std::vector<storage::ScoreMember> &sco
3737
return Status::OK();
3838
}
3939

40+
Status RedisCache::ZAdd(std::string& key, std::vector<storage::ScoreMember> &score_members) {
41+
int res = RcFreeMemoryIfNeeded(cache_);
42+
if (C_OK != res) {
43+
return Status::Corruption("[error] Free memory faild !");
44+
}
45+
46+
robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
47+
unsigned int items_size = score_members.size() * 2;
48+
robj **items = (robj **)zcallocate(sizeof(robj *) * items_size);
49+
for (unsigned int i = 0; i < score_members.size(); ++i) {
50+
items[i * 2] = createStringObjectFromLongDouble(score_members[i].score, 0);
51+
items[i * 2 + 1] =
52+
createObject(OBJ_STRING, sdsnewlen(score_members[i].member.data(), score_members[i].member.size()));
53+
}
54+
DEFER {
55+
FreeObjectList(items, items_size);
56+
DecrObjectsRefCount(kobj);
57+
};
58+
int ret = RcZAdd(cache_, kobj, items, items_size);
59+
if (C_OK != ret) {
60+
return Status::Corruption("RcZAdd failed");
61+
}
62+
63+
return Status::OK();
64+
}
65+
4066
Status RedisCache::ZCard(std::string& key, uint64_t *len) {
4167
robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
4268
DEFER {

0 commit comments

Comments
 (0)