66#define PIKA_CONSENSUS_H_
77
88#include < utility>
9+ #include < condition_variable>
10+ #include < unordered_map>
911
1012#include " include/pika_binlog_transverter.h"
1113#include " include/pika_client_conn.h"
@@ -170,7 +172,10 @@ class ConsensusCoordinator {
170172
171173 SyncProgress& SyncPros () { return sync_pros_; }
172174 std::shared_ptr<StableLog> StableLogger () { return stable_logger_; }
173- std::shared_ptr<MemLog> MemLogger () { return mem_logger_; }
175+ // Expose the internally maintained mem_logger_ object to the outside world.
176+ std::shared_ptr<MemLog> MemLogger () { return mem_logger_; }
177+ // Allows an external user to inject a new StableLog instance into the current object and replace the original one
178+ void SetStableLogger (std::shared_ptr<StableLog> logger) { stable_logger_ = logger; }
174179
175180 LogOffset committed_index () {
176181 std::lock_guard lock (index_mu_);
@@ -186,35 +191,20 @@ class ConsensusCoordinator {
186191 };
187192 static int InitCmd (net::RedisParser* parser, const net::RedisCmdArgsType& argv);
188193
189- std::string ToStringStatus () {
190- std::stringstream tmp_stream;
191- {
192- std::lock_guard lock (index_mu_);
193- tmp_stream << " Committed_index: " << committed_index_.ToString () << " \r\n " ;
194- }
195- tmp_stream << " Context: "
196- << " \r\n "
197- << context_->ToString ();
198- {
199- std::shared_lock lock (term_rwlock_);
200- tmp_stream << " Term: " << term_ << " \r\n " ;
201- }
202- tmp_stream << " Mem_logger size: " << mem_logger_->Size () << " last offset "
203- << mem_logger_->last_offset ().ToString () << " \r\n " ;
204- tmp_stream << " Stable_logger first offset " << stable_logger_->first_offset ().ToString () << " \r\n " ;
205- LogOffset log_status;
206- stable_logger_->Logger ()->GetProducerStatus (&(log_status.b_offset .filenum ), &(log_status.b_offset .offset ),
207- &(log_status.l_offset .term ), &(log_status.l_offset .index ));
208- tmp_stream << " Physical Binlog Status: " << log_status.ToString () << " \r\n " ;
209- return tmp_stream.str ();
210- }
194+ std::string ToStringStatus ();
195+ // Called after committed_id_ is updated, waking up all threads waiting for the offset
196+ void NotifyLogCommitted (const LogOffset& offset);
197+ // Block until the specified offset ≤ the current committed_id_, or return false if the timeout expires.
198+ bool WaitLogCommitted (const LogOffset& offset, std::chrono::milliseconds timeout);
211199
212200 private:
213201 pstd::Status TruncateTo (const LogOffset& offset);
214202
215203 pstd::Status InternalAppendLog (const std::shared_ptr<Cmd>& cmd_ptr);
216204 pstd::Status InternalAppendBinlog (const std::shared_ptr<Cmd>& cmd_ptr);
217205 void InternalApply (const MemLog::LogItem& log);
206+ // Batch processing version
207+ void InternalApplyFollower (const std::vector<std::shared_ptr<Cmd>>& cmds);
218208 void InternalApplyFollower (const std::shared_ptr<Cmd>& cmd_ptr);
219209
220210 pstd::Status GetBinlogOffset (const BinlogOffset& start_offset, LogOffset* log_offset);
@@ -274,21 +264,27 @@ class ConsensusCoordinator {
274264 }
275265 void SetCommittedId (const LogOffset& offset) {
276266 std::lock_guard l (committed_id_rwlock_);
277- committed_id_ = offset;
278- context_->UpdateAppliedIndex (committed_id_);
267+ if (offset > committed_id_) {
268+ committed_id_ = offset; // Update in-memory committed_id_
269+ context_->UpdateAppliedIndex (committed_id_); // Persist to Context so it survives restart
270+ log_commit_cv_.notify_all (); // Wake up any threads waiting in WaitLogCommitted()
271+ }
279272 }
280273
281274 private:
282275 pstd::Status PersistAppendBinlog (const std::shared_ptr<Cmd>& cmd_ptr, LogOffset& cur_offset);
283276
284- private:
277+ std::shared_ptr<Log> logs_;
285278 std::shared_mutex is_consistency_rwlock_;
286279 bool is_consistency_ = false ;
287280 std::shared_mutex committed_id_rwlock_;
288281 LogOffset committed_id_ = LogOffset();
289282 std::shared_mutex prepared_id__rwlock_;
290283 LogOffset prepared_id_ = LogOffset();
291- std::shared_ptr<Log> logs_;
284+ // used to notify that the log has been submitted
285+ std::condition_variable_any log_commit_cv_;
286+ // Used to track threads waiting for a specific log to be submitted
287+ std::mutex waiting_threads_mu_;
292288};
293289
294290#endif // INCLUDE_PIKA_CONSENSUS_H_
0 commit comments