1+ // Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
2+ // This source code is licensed under the BSD-style license found in the
3+ // LICENSE file in the root directory of this source tree. An additional grant
4+ // of patent rights can be found in the PATENTS file in the same directory.
5+
6+ #ifndef PIKA_COMMAND_COLLECTOR_H_
7+ #define PIKA_COMMAND_COLLECTOR_H_
8+
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <vector>

#include "include/pika_command.h"
#include "include/pika_define.h"
#include "pstd/include/pstd_status.h"

#include "include/pika_consensus.h"
29+
/**
 * @brief PikaCommandCollector collects write commands and processes them in batches.
 *
 * Main functions:
 * 1. Collect write commands and process them in optimized batches once the threshold is reached
 * 2. Handle conflicts on the same key (a later command overwrites an earlier command)
 * 3. Send commands in batches to the consensus coordinator with batch-level synchronization
 * 4. Support asynchronous callback notification of command processing results
 * 5. Track performance metrics for batch processing
 * 6. Provide intelligent retry mechanisms for failed batches
 */
41+ class PikaCommandCollector {
42+ public:
43+ // Callback function type after command processing is completed
44+ using CommandCallback = std::function<void (const LogOffset& offset, pstd::Status status)>;
45+
46+ /* *
47+ * @brief constructor
48+ * @param coordinator consensus coordinator reference
49+ * @param batch_size batch size (number of commands)
50+ * @param batch_max_wait_time forced flush interval (milliseconds)
51+ */
52+ // Constructor with raw pointer (original)
53+ PikaCommandCollector (ConsensusCoordinator* coordinator, size_t batch_size = 100 , int batch_max_wait_time = 5 );
54+
55+ // Constructor with shared_ptr (for compatibility with make_shared calls)
56+ PikaCommandCollector (std::shared_ptr<ConsensusCoordinator> coordinator, size_t batch_size = 100 , int batch_max_wait_time = 5 );
57+
58+
59+ ~PikaCommandCollector ();
60+
61+ /* *
62+ * @brief Add command to collector
63+ * @param cmd_ptr command pointer
64+ * @param callback callback function after processing is completed
65+ * @return whether the addition was successful
66+ */
67+ bool AddCommand (const std::shared_ptr<Cmd>& cmd_ptr, CommandCallback callback);
68+
69+ /* *
70+ * @brief Called periodically by external systems to process batches
71+ * @param force Force processing even if batch is not full or timeout not reached
72+ * @return Number of commands processed
73+ */
74+
75+ /* *
76+ * @brief Immediately process all currently collected commands
77+ * @return The number of commands processed
78+ */
79+ size_t FlushCommands (bool force = false );
80+
81+
82+ /* *
83+ * @brief Get the current number of pending commands
84+ * @return number of commands
85+ */
86+ size_t PendingCommands () const ;
87+
88+ /* *
89+ * @brief Set the batch size
90+ * @param batch_size batch size
91+ */
92+ void SetBatchSize (size_t batch_size);
93+
94+ /* *
95+ * @brief Set the batch max wait time
96+ * @param batch_max_wait_time maximum wait time in milliseconds
97+ */
98+ void SetBatchMaxWaitTime (int batch_max_wait_time);
99+
100+ /* *
101+ * @brief Get batch processing statistics
102+ * @return Pair of (total_processed_commands, total_batches)
103+ */
104+ std::pair<uint64_t , uint64_t > GetBatchStats () const ;
105+
106+ /* *
107+ * @brief Get average batch processing time in milliseconds
108+ * @return Average processing time or nullopt if no batches processed
109+ */
110+ std::optional<double > GetAverageBatchTime () const ;
111+
112+ private:
113+
114+ /* *
115+ * @brief batch processing command
116+ * @param batch command batch
117+ * @return Whether the processing is successful
118+ */
119+ pstd::Status ProcessBatch (const std::vector<std::shared_ptr<Cmd>>& commands,
120+ const std::vector<CommandCallback>& callbacks);
121+
122+ /* *
123+ * @brief Check for conflicts based on command type and key name
124+ * @param cmd_ptr command pointer
125+ * @return true if there is a conflict (should be replaced), false if there is no conflict
126+ */
127+ bool CheckConflict (const std::shared_ptr<Cmd>& cmd_ptr) const ;
128+
129+ /* *
130+ * @brief Handle key conflicts and remove conflicting commands
131+ * @param cmd_ptr new command
132+ */
133+ void HandleConflict (const std::shared_ptr<Cmd>& cmd_ptr);
134+
135+ /* *
136+ * @brief Retry batch processing commands
137+ * @param commands List of commands to retry
138+ * @param callbacks Corresponding callback function list
139+ * @param priority Priority level for the retry (higher means more urgent)
140+ * @return Whether the commands were successfully requeued
141+ */
142+ bool RetryBatch (const std::vector<std::shared_ptr<Cmd>>& commands,
143+ const std::vector<CommandCallback>& callbacks,
144+ int priority = 100 );
145+
146+ private:
147+ // Consensus coordinator reference
148+ ConsensusCoordinator* coordinator_;
149+
150+ // Batch processing configuration
151+ std::atomic<size_t > batch_size_;
152+ std::atomic<int > batch_max_wait_time_;
153+
154+ // Retry configuration
155+ std::atomic<int > max_retry_attempts_{3 };
156+ std::atomic<int > retry_backoff_ms_{50 };
157+
158+ // Command collection and processing
159+ mutable std::mutex mutex_;
160+
161+ // Pending command queue and callbacks
162+ std::list<std::pair<std::shared_ptr<Cmd>, CommandCallback>> pending_commands_;
163+
164+ // Priority queue for retries
165+ std::deque<std::tuple<int , std::vector<std::shared_ptr<Cmd>>, std::vector<CommandCallback>>> retry_queue_;
166+
167+ // Command key mapping, used to handle same-key conflicts
168+ std::unordered_map<std::string, std::list<std::pair<std::shared_ptr<Cmd>, CommandCallback>>::iterator> key_map_;
169+
170+ // Batch statistics
171+ std::atomic<uint64_t > total_processed_{0 };
172+ std::atomic<uint64_t > total_batches_{0 };
173+ std::atomic<uint64_t > total_retries_{0 };
174+ std::atomic<uint64_t > total_conflicts_{0 };
175+ std::atomic<uint64_t > total_batch_time_ms_{0 };
176+ std::chrono::time_point<std::chrono::steady_clock> batch_start_time_;
177+
178+ // Performance tracking
179+ struct BatchMetrics {
180+ uint64_t batch_size;
181+ uint64_t processing_time_ms;
182+ uint64_t wait_time_ms;
183+ bool successful;
184+ };
185+
186+ // Circular buffer for recent batch metrics
187+ static constexpr size_t kMetricsBufferSize = 100 ;
188+ std::vector<BatchMetrics> recent_metrics_;
189+ std::mutex metrics_mutex_;
190+ };
191+
192+ #endif // PIKA_COMMAND_COLLECTOR_H_
0 commit comments