Skip to content

Commit b07682c

Browse files
committed
New Architecture
1 parent 46a398d commit b07682c

7 files changed

Lines changed: 635 additions & 198 deletions

File tree

include/pika_command_collector.h

Lines changed: 5 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
* 3. Send commands in batches to the consensus coordinator with batch-level synchronization
3737
* 4. Support asynchronous callback notification of command processing results
3838
* 5. Track performance metrics for batch processing
39-
* 6. Provide intelligent retry mechanisms for failed batches
39+
4040
*/
4141
class PikaCommandCollector {
4242
public:
@@ -46,14 +46,13 @@ class PikaCommandCollector {
4646
/**
4747
* @brief constructor
4848
* @param coordinator consensus coordinator reference
49-
* @param batch_size batch size (number of commands)
50-
* @param batch_max_wait_time forced flush interval (milliseconds)
49+
* @param batch_max_wait_time maximum wait time in milliseconds
5150
*/
5251
// Constructor with raw pointer (original)
53-
PikaCommandCollector(ConsensusCoordinator* coordinator, size_t batch_size = 100, int batch_max_wait_time = 5);
52+
PikaCommandCollector(ConsensusCoordinator* coordinator, int batch_max_wait_time = 5);
5453

5554
// Constructor with shared_ptr (for compatibility with make_shared calls)
56-
PikaCommandCollector(std::shared_ptr<ConsensusCoordinator> coordinator, size_t batch_size = 100, int batch_max_wait_time = 5);
55+
PikaCommandCollector(std::shared_ptr<ConsensusCoordinator> coordinator, int batch_max_wait_time = 5);
5756

5857
~PikaCommandCollector();
5958

@@ -84,12 +83,6 @@ class PikaCommandCollector {
8483
*/
8584
size_t PendingCommands() const;
8685

87-
/**
88-
* @brief Set the batch size
89-
* @param batch_size batch size
90-
*/
91-
void SetBatchSize(size_t batch_size);
92-
9386
/**
9487
* @brief Set the batch max wait time
9588
* @param batch_max_wait_time maximum wait time in milliseconds
@@ -103,10 +96,6 @@ class PikaCommandCollector {
10396
std::pair<uint64_t, uint64_t> GetBatchStats() const;
10497

10598
/**
106-
* @brief Get average batch processing time in milliseconds
107-
* @return Average processing time or nullopt if no batches processed
108-
*/
109-
std::optional<double> GetAverageBatchTime() const;
11099
111100
private:
112101
@@ -131,55 +120,33 @@ class PikaCommandCollector {
131120
*/
132121
void HandleConflict(const std::shared_ptr<Cmd>& cmd_ptr);
133122

134-
/**
135-
* @brief Retry batch processing commands
136-
* @param commands List of commands to retry
137-
* @param callbacks Corresponding callback function list
138-
* @param priority Priority level for the retry (higher means more urgent)
139-
* @return Whether the commands were successfully requeued
140-
*/
141-
bool RetryBatch(const std::vector<std::shared_ptr<Cmd>>& commands,
142-
const std::vector<CommandCallback>& callbacks,
143-
int priority = 100);
144123

145124
private:
146125
//Consensus coordinator reference
147126
ConsensusCoordinator* coordinator_;
148127

149128
// Batch processing configuration
150-
std::atomic<size_t> batch_size_;
151129
std::atomic<int> batch_max_wait_time_;
130+
std::chrono::time_point<std::chrono::steady_clock> batch_start_time_;
152131

153-
// Retry configuration
154-
std::atomic<int> max_retry_attempts_{3};
155-
std::atomic<int> retry_backoff_ms_{50};
156-
157132
// Command collection and processing
158133
mutable std::mutex mutex_;
159134

160135
// Pending command queue and callbacks
161136
std::list<std::pair<std::shared_ptr<Cmd>, CommandCallback>> pending_commands_;
162137

163-
// Priority queue for retries
164-
std::deque<std::tuple<int, std::vector<std::shared_ptr<Cmd>>, std::vector<CommandCallback>>> retry_queue_;
165-
166138
// Command key mapping, used to handle same-key conflicts
167139
std::unordered_map<std::string, std::list<std::pair<std::shared_ptr<Cmd>, CommandCallback>>::iterator> key_map_;
168140

169141
// Batch statistics
170142
std::atomic<uint64_t> total_processed_{0};
171143
std::atomic<uint64_t> total_batches_{0};
172-
std::atomic<uint64_t> total_retries_{0};
173144
std::atomic<uint64_t> total_conflicts_{0};
174-
std::atomic<uint64_t> total_batch_time_ms_{0};
175-
std::chrono::time_point<std::chrono::steady_clock> batch_start_time_;
176145

177146
// Performance tracking
178147
struct BatchMetrics {
179148
uint64_t batch_size;
180-
uint64_t processing_time_ms;
181149
uint64_t wait_time_ms;
182-
bool successful;
183150
};
184151

185152
// Circular buffer for recent batch metrics

include/pika_command_queue.h

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
2+
// This source code is licensed under the BSD-style license found in the
3+
// LICENSE file in the root directory of this source tree. An additional grant
4+
// of patent rights can be found in the PATENTS file in the same directory.
5+
6+
#ifndef PIKA_COMMAND_QUEUE_H_
7+
#define PIKA_COMMAND_QUEUE_H_
8+
9+
#include <memory>
10+
#include <queue>
11+
#include <vector>
12+
#include <string>
13+
#include <functional>
14+
#include <atomic>
15+
#include <condition_variable>
16+
#include <mutex>
17+
18+
#include "pstd/include/pstd_mutex.h"
19+
#include "pstd/include/env.h"
20+
#include "include/pika_command.h"
21+
#include "include/pika_define.h"
22+
#include "pstd/include/env.h"
23+
#include "include/pika_define.h"
24+
25+
// Callback function type for command completion notification
26+
using CommandCallback = std::function<void(const LogOffset&, pstd::Status)>;
27+
28+
// Structure representing a batch of commands
29+
struct CommandBatch {
30+
std::vector<std::shared_ptr<Cmd>> commands;
31+
std::vector<CommandCallback> callbacks;
32+
uint64_t batch_id;
33+
uint64_t create_time;
34+
std::string db_name;
35+
std::vector<LogOffset> binlog_offsets; // Binlog offsets for each command
36+
37+
CommandBatch(const std::vector<std::shared_ptr<Cmd>>& cmds,
38+
const std::vector<CommandCallback>& cbs,
39+
const std::string& db)
40+
: commands(cmds), callbacks(cbs), db_name(db) {
41+
static std::atomic<uint64_t> next_id{1};
42+
batch_id = next_id.fetch_add(1);
43+
create_time = pstd::NowMicros();
44+
}
45+
46+
bool Empty() const {
47+
return commands.empty();
48+
}
49+
50+
size_t Size() const {
51+
return commands.size();
52+
}
53+
};
54+
55+
// Thread-safe command queue for batched command processing
56+
class CommandQueue {
57+
public:
58+
explicit CommandQueue(size_t max_size);
59+
~CommandQueue();
60+
61+
// Enqueue a command batch (blocking if queue is full)
62+
bool EnqueueBatch(std::shared_ptr<CommandBatch> batch);
63+
64+
// Dequeue a command batch (blocking if queue is empty)
65+
std::shared_ptr<CommandBatch> DequeueBatch();
66+
67+
// Dequeue all available batches (non-blocking)
68+
std::vector<std::shared_ptr<CommandBatch>> DequeueAllBatches();
69+
70+
// Get current queue size
71+
size_t Size() const;
72+
73+
// Check if queue is empty
74+
bool Empty() const;
75+
76+
// Shutdown the queue
77+
void Shutdown();
78+
79+
private:
80+
std::queue<std::shared_ptr<CommandBatch>> cmd_queue_;
81+
mutable std::mutex queue_mutex_;
82+
std::condition_variable queue_cv_;
83+
size_t max_size_;
84+
std::atomic<bool> shutdown_{false};
85+
};
86+
87+
#endif // PIKA_COMMAND_QUEUE_H_

include/pika_rm.h

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@
1212
#include <string>
1313
#include <unordered_map>
1414
#include <vector>
15+
#include <thread>
16+
#include <atomic>
17+
#include <condition_variable>
18+
#include <mutex>
1519

1620
#include "pstd/include/pstd_status.h"
1721

@@ -23,6 +27,7 @@
2327
#include "include/pika_stable_log.h"
2428
#include "include/rsync_client.h"
2529
#include "include/pika_command_collector.h"
30+
#include "include/pika_command_queue.h"
2631

2732
#define kBinlogSendPacketNum 40
2833
#define kBinlogSendBatchNum 100
@@ -121,7 +126,7 @@ class SyncMasterDB : public SyncDB {
121126
pstd::Status UpdateCommittedID();
122127
pstd::Status CommitAppLog(const LogOffset& master_committed_id);
123128
pstd::Status Truncate(const LogOffset& offset);
124-
pstd::Status WaitForSlaveAcks(const LogOffset& target_offset, int timeout_ms);
129+
// pstd::Status WaitForSlaveAcks(const LogOffset& target_offset, int timeout_ms);
125130
};
126131

127132
class SyncSlaveDB : public SyncDB {
@@ -242,6 +247,14 @@ class PikaReplicaManager {
242247
return pika_repl_client_->GetUnfinishedAsyncWriteDBTaskCount(db_name);
243248
}
244249

250+
// Command Queue related methods
251+
void EnqueueCommandBatch(std::shared_ptr<CommandBatch> batch);
252+
std::shared_ptr<CommandBatch> DequeueCommandBatch();
253+
size_t GetCommandQueueSize() const;
254+
bool IsCommandQueueEmpty() const;
255+
// CommittedID notification for RocksDB thread
256+
void NotifyCommittedID(const LogOffset& committed_id);
257+
245258
private:
246259
void InitDB();
247260
pstd::Status SelectLocalIp(const std::string& remote_ip, int remote_port, std::string* local_ip);
@@ -273,6 +286,41 @@ class PikaReplicaManager {
273286
std::shared_mutex is_consistency_rwlock_;
274287
bool is_consistency_ = true;
275288
std::shared_mutex committed_id_rwlock_;
289+
290+
// Command queue for collected batches
291+
std::unique_ptr<CommandQueue> command_queue_;
292+
293+
// Background thread for processing command queue
294+
std::unique_ptr<std::thread> command_queue_thread_;
295+
std::atomic<bool> command_queue_running_{false};
296+
std::mutex command_queue_mutex_;
297+
std::condition_variable command_queue_cv_;
298+
299+
// RocksDB background thread for Put operations and client responses
300+
std::unique_ptr<std::thread> rocksdb_back_thread_;
301+
std::atomic<bool> rocksdb_thread_running_{false};
302+
std::mutex rocksdb_thread_mutex_;
303+
std::condition_variable rocksdb_thread_cv_;
304+
305+
// Pending batches waiting for CommittedID
306+
std::queue<std::shared_ptr<CommandBatch>> pending_batches_;
307+
std::mutex pending_batches_mutex_;
308+
309+
// Last committed ID for RocksDB thread processing
310+
LogOffset last_committed_id_;
311+
std::mutex last_committed_id_mutex_;
312+
313+
// Background thread processing methods
314+
void StartCommandQueueThread();
315+
void StopCommandQueueThread();
316+
void CommandQueueLoop();
317+
void ProcessCommandBatches(const std::vector<std::shared_ptr<CommandBatch>>& batches);
318+
319+
// RocksDB background thread methods
320+
void StartRocksDBThread();
321+
void StopRocksDBThread();
322+
void RocksDBThreadLoop();
323+
void ProcessCommittedBatches(const LogOffset& committed_id);
276324
};
277325

278326
#endif // PIKA_RM_H

0 commit comments

Comments
 (0)