Skip to content

Commit e75170f

Browse files
author
wuxianrong
committed
Add command overwriting in Raft mode
1 parent 0a646cb commit e75170f

34 files changed

Lines changed: 2584 additions & 709 deletions

include/pika_bit.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ class BitSetCmd : public Cmd {
6363
std::string key_;
6464
int64_t bit_offset_;
6565
int64_t on_;
66+
int32_t bit_val_ = 0; // For async mode
6667
rocksdb::Status s_;
6768
void Clear() override {
6869
key_ = "";
@@ -169,6 +170,7 @@ class BitOpCmd : public Cmd {
169170
rocksdb::Status s_;
170171
std::vector<std::string> src_keys_;
171172
storage::BitOpType op_;
173+
int64_t result_length_ = 0; // For async mode
172174
void Clear() override {
173175
dest_key_ = "";
174176
src_keys_.clear();

include/pika_command.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -621,6 +621,11 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
621621
uint32_t aclCategory_ = 0;
622622
bool cache_missed_in_rtc_{false};
623623

624+
// Raft async mode helper functions
625+
bool IsRaftLeader() const;
626+
bool IsRaftEnabled() const;
627+
bool ShouldUseAsyncMode() const;
628+
624629
private:
625630
virtual void DoInitial() = 0;
626631
virtual void Clear(){};

include/pika_geo.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ class GeoAddCmd : public Cmd {
6767
private:
6868
std::string key_;
6969
std::vector<GeoPoint> pos_;
70+
rocksdb::Status s_;
71+
int32_t count_ = 0;
7072
void DoInitial() override;
7173
};
7274

include/pika_hash.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ class HSetCmd : public Cmd {
108108
std::string key_, field_, value_;
109109
std::vector<std::string> fields_;
110110
std::vector<storage::FieldValue> fields_values_;
111+
int32_t ret_ = 0; // For async mode: 1 if field is new, 0 if updated
111112
void DoInitial() override;
112113
rocksdb::Status s_;
113114
};
@@ -154,6 +155,7 @@ class HIncrbyCmd : public Cmd {
154155
private:
155156
std::string key_, field_;
156157
int64_t by_ = 0;
158+
int64_t new_value_ = 0; // For async mode: result after increment
157159
void DoInitial() override;
158160
rocksdb::Status s_;
159161
};
@@ -176,6 +178,7 @@ class HIncrbyfloatCmd : public Cmd {
176178

177179
private:
178180
std::string key_, field_, by_;
181+
std::string new_value_; // For async mode: result after increment
179182
void DoInitial() override;
180183
rocksdb::Status s_;
181184
};
@@ -291,6 +294,7 @@ class HSetnxCmd : public Cmd {
291294

292295
private:
293296
std::string key_, field_, value_;
297+
int32_t ret_ = 0; // For async mode: 1 if field was set, 0 if already exists
294298
void DoInitial() override;
295299
rocksdb::Status s_;
296300
};

include/pika_hyperloglog.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ class PfAddCmd : public Cmd {
2727
private:
2828
std::string key_;
2929
std::vector<std::string> values_;
30+
bool update_ = false;
3031
void DoInitial() override;
3132
void Clear() override { values_.clear(); }
3233
};

include/pika_kv.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ class DelCmd : public Cmd {
9292
private:
9393
std::vector<std::string> keys_;
9494
int64_t split_res_ = 0;
95+
int64_t deleted_count_ = 0; // For async mode: number of deleted keys
9596
void DoInitial() override;
9697
rocksdb::Status s_;
9798
};
@@ -236,6 +237,7 @@ class GetsetCmd : public Cmd {
236237
private:
237238
std::string key_;
238239
std::string new_value_;
240+
std::string old_value_; // For async mode
239241
void DoInitial() override;
240242
rocksdb::Status s_;
241243
};
@@ -260,6 +262,7 @@ class AppendCmd : public Cmd {
260262
std::string key_;
261263
std::string value_;
262264
std::string new_value_;
265+
int32_t new_len_ = 0; // For async mode
263266
void DoInitial() override;
264267
rocksdb::Status s_;
265268
int32_t expired_timestamp_sec_ = 0;
@@ -519,6 +522,7 @@ class SetrangeCmd : public Cmd {
519522
std::string key_;
520523
int64_t offset_ = 0;
521524
std::string value_;
525+
int32_t new_len_ = 0; // For async mode
522526
void DoInitial() override;
523527
rocksdb::Status s_;
524528
};

include/pika_list.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ class LInsertCmd : public Cmd {
5959
storage::BeforeOrAfter dir_{storage::After};
6060
std::string pivot_;
6161
std::string value_;
62+
int64_t llen_ = 0; // For async mode: list length after insert
6263
void DoInitial() override;
6364
rocksdb::Status s_;
6465
};
@@ -149,6 +150,7 @@ class LPopCmd : public Cmd {
149150
private:
150151
std::string key_;
151152
std::int64_t count_ = 1;
153+
std::vector<std::string> elements_; // For async mode: popped elements
152154
void DoInitial() override;
153155
rocksdb::Status s_;
154156
};
@@ -171,6 +173,7 @@ class LPushCmd : public BlockingBaseCmd {
171173
private:
172174
std::string key_;
173175
std::vector<std::string> values_;
176+
uint64_t llen_ = 0; // For async mode: list length after push
174177
rocksdb::Status s_;
175178
void DoInitial() override;
176179
void Clear() override { values_.clear(); }
@@ -194,6 +197,7 @@ class LPushxCmd : public Cmd {
194197

195198
private:
196199
std::string key_;
200+
uint64_t llen_ = 0; // For async mode: list length after push
197201
rocksdb::Status s_;
198202
std::vector<std::string> values_;
199203
void DoInitial() override;
@@ -244,6 +248,7 @@ class LRemCmd : public Cmd {
244248
std::string key_;
245249
int64_t count_ = 0;
246250
std::string value_;
251+
uint64_t removed_count_ = 0; // For async mode: number of elements removed
247252
rocksdb::Status s_;
248253
void DoInitial() override;
249254
};
@@ -334,6 +339,7 @@ class RPopCmd : public Cmd {
334339
private:
335340
std::string key_;
336341
std::int64_t count_ = 1;
342+
std::vector<std::string> elements_; // For async mode: popped elements
337343
void DoInitial() override;
338344
rocksdb::Status s_;
339345
};
@@ -400,6 +406,7 @@ class RPushCmd : public BlockingBaseCmd {
400406
private:
401407
std::string key_;
402408
std::vector<std::string> values_;
409+
uint64_t llen_ = 0; // For async mode: list length after push
403410
rocksdb::Status s_;
404411
void DoInitial() override;
405412
void Clear() override { values_.clear(); }
@@ -425,6 +432,7 @@ class RPushxCmd : public Cmd {
425432
std::string key_;
426433
std::string value_;
427434
std::vector<std::string> values_;
435+
uint64_t llen_ = 0; // For async mode: list length after push
428436
rocksdb::Status s_;
429437
void DoInitial() override;
430438
};

include/pika_set.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ class SAddCmd : public Cmd {
3232
private:
3333
std::string key_;
3434
std::vector<std::string> members_;
35+
int32_t added_count_ = 0; // For async mode: number of members added
3536
rocksdb::Status s_;
3637
void DoInitial() override;
3738
};
@@ -220,6 +221,7 @@ class SUnionstoreCmd : public SetOperationCmd {
220221

221222
private:
222223
void DoInitial() override;
224+
int32_t result_count_ = 0; // For async mode: number of members in result
223225
rocksdb::Status s_;
224226
};
225227

@@ -249,6 +251,7 @@ class SInterstoreCmd : public SetOperationCmd {
249251

250252
private:
251253
void DoInitial() override;
254+
int32_t result_count_ = 0; // For async mode: number of members in result
252255
rocksdb::Status s_;
253256
};
254257

@@ -301,6 +304,7 @@ class SDiffstoreCmd : public SetOperationCmd {
301304
Cmd* Clone() override { return new SDiffstoreCmd(*this); }
302305

303306
private:
307+
int32_t result_count_ = 0; // For async mode: number of members in result
304308
rocksdb::Status s_;
305309
void DoInitial() override;
306310
};
@@ -337,6 +341,7 @@ class SMoveCmd : public Cmd {
337341
std::shared_ptr<SRemCmd> srem_cmd_;
338342
std::shared_ptr<SAddCmd> sadd_cmd_;
339343
int32_t move_success_{0};
344+
rocksdb::Status s_;
340345
};
341346

342347
class SRandmemberCmd : public Cmd {

include/pika_stream.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ class XAddCmd : public Cmd {
4040
std::string key_;
4141
storage::StreamAddTrimArgs args_;
4242
int field_pos_{0};
43+
std::string serialized_message_;
44+
rocksdb::Status s_;
4345

4446
void DoInitial() override;
4547
};
@@ -57,6 +59,7 @@ class XDelCmd : public Cmd {
5759
private:
5860
std::string key_;
5961
std::vector<storage::streamID> ids_;
62+
rocksdb::Status s_;
6063

6164
void DoInitial() override;
6265
void Clear() override { ids_.clear(); }
@@ -133,6 +136,7 @@ class XTrimCmd : public Cmd {
133136
private:
134137
std::string key_;
135138
storage::StreamAddTrimArgs args_;
139+
rocksdb::Status s_;
136140

137141
void DoInitial() override;
138142
};

include/pika_zset.h

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ class ZAddCmd : public Cmd {
3333
private:
3434
std::string key_;
3535
std::vector<storage::ScoreMember> score_members;
36+
int32_t added_count_ = 0; // For async mode
3637
rocksdb::Status s_;
3738
void DoInitial() override;
3839
};
@@ -103,7 +104,8 @@ class ZIncrbyCmd : public Cmd {
103104
private:
104105
std::string key_, member_;
105106
double by_ = .0f;
106-
double score_ = .0f;
107+
double score_ = .0f; // For async mode: result after increment
108+
rocksdb::Status s_;
107109
void DoInitial() override;
108110
};
109111

@@ -333,6 +335,7 @@ class ZUnionstoreCmd : public ZsetUIstoreParentCmd {
333335

334336
private:
335337
void DoInitial() override;
338+
int32_t result_count_ = 0; // For async mode
336339
// used for write binlog
337340
std::map<std::string, double> value_to_dest_;
338341
rocksdb::Status s_;
@@ -352,6 +355,7 @@ class ZInterstoreCmd : public ZsetUIstoreParentCmd {
352355

353356
private:
354357
void DoInitial() override;
358+
int32_t result_count_ = 0; // For async mode
355359
rocksdb::Status s_;
356360
// used for write binlog
357361
std::vector<storage::ScoreMember> value_to_dest_;
@@ -561,6 +565,7 @@ class ZRemrangebyscoreCmd : public Cmd {
561565
std::string key_, min_, max_;
562566
double min_score_ = 0, max_score_ = 0;
563567
bool left_close_ = true, right_close_ = true;
568+
int32_t deleted_count_ = 0; // For async mode
564569
rocksdb::Status s_;
565570
void DoInitial() override;
566571
void Clear() override { left_close_ = right_close_ = true; }
@@ -586,6 +591,7 @@ class ZRemrangebylexCmd : public Cmd {
586591
std::string key_, min_, max_;
587592
std::string min_member_, max_member_;
588593
bool left_close_ = true, right_close_ = true;
594+
int32_t deleted_count_ = 0; // For async mode
589595
rocksdb::Status s_;
590596
void DoInitial() override;
591597
void Clear() override { left_close_ = right_close_ = true; }
@@ -611,6 +617,8 @@ class ZPopmaxCmd : public Cmd {
611617
void DoInitial() override;
612618
std::string key_;
613619
int64_t count_ = 0;
620+
std::vector<storage::ScoreMember> score_members_; // For async mode: popped members
621+
rocksdb::Status s_;
614622
};
615623

616624
class ZPopminCmd : public Cmd {
@@ -633,6 +641,8 @@ class ZPopminCmd : public Cmd {
633641
void DoInitial() override;
634642
std::string key_;
635643
int64_t count_ = 0;
644+
std::vector<storage::ScoreMember> score_members_; // For async mode: popped members
645+
rocksdb::Status s_;
636646
};
637647

638648
#endif

0 commit comments

Comments
 (0)