@@ -541,12 +541,7 @@ Status SyncMasterDB::ConsensusProposeLog(const std::shared_ptr<Cmd>& cmd_ptr) {
541541 if (!coordinator_.GetISConsistency ()) {
542542 return coordinator_.ProposeLog (cmd_ptr);
543543 }
544-
545- // Batch append without immediate waiting to allow high concurrency
546- Status s = coordinator_.AppendEntries (cmd_ptr); // Append the log entry to the coordinator
547- if (!s.ok ()) {
548- return s;
549- }
544+
550545
551546 // Wait for consensus to be achieved within 10 seconds
552547 // while (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - start).count() < 10) {
@@ -559,6 +554,7 @@ Status SyncMasterDB::ConsensusProposeLog(const std::shared_ptr<Cmd>& cmd_ptr) {
559554 // }
560555 // Per-DB global batching window across threads
561556 struct WindowState {
557+ pstd::Mutex mu;
562558 std::atomic<uint64_t > start_us{0 };
563559 std::atomic<int > accepted{0 };
564560 };
@@ -570,7 +566,20 @@ Status SyncMasterDB::ConsensusProposeLog(const std::shared_ptr<Cmd>& cmd_ptr) {
570566 std::lock_guard<pstd::Mutex> lk (g_db_windows_mu);
571567 ws = &g_db_windows[db_info_.db_name_ ];
572568 }
573-
569+ // Batch append without immediate waiting to allow high concurrency
570+ // Status s = coordinator_.AppendEntries(cmd_ptr); // Append the log entry to the coordinator
571+ // if (!s.ok()) {
572+ // return s;
573+ // }
574+ LogOffset my_offset;
575+ {
576+ std::lock_guard<pstd::Mutex> lk (ws->mu );
577+ Status s = coordinator_.AppendEntries (cmd_ptr);
578+ if (!s.ok ()) {
579+ return s;
580+ }
581+ my_offset = coordinator_.GetPreparedId ();
582+ }
574583 const uint64_t now_us = pstd::NowMicros ();
575584 const uint64_t timeout_us = static_cast <uint64_t >(g_pika_conf->consensus_timeout ()) * 1000ULL ;
576585 const int min_batch_wait = std::max (50 , g_pika_conf->consensus_batch_size ());
@@ -585,20 +594,12 @@ Status SyncMasterDB::ConsensusProposeLog(const std::shared_ptr<Cmd>& cmd_ptr) {
585594 bool window_elapsed = (now_us - leader_election_token) >= timeout_us;
586595 bool enough_accumulated = ws->accepted .load (std::memory_order_relaxed) >= min_batch_wait;
587596
588- if (!window_elapsed && !enough_accumulated) {
589- return Status::OK ();
590- }
591-
592- // Attempt to close the current window and become the leader for this batch
593- if (ws->start_us .compare_exchange_strong (leader_election_token, 0 )) {
594- // Success, we are the leader. Reset count and trigger send.
595- ws->accepted .store (0 , std::memory_order_relaxed);
596- coordinator_.TriggerImmediateSend ();
597- g_pika_rm->WakeUpBinlogSync ();
598- } else {
599- // Lost the election. Another thread will handle the batch.
600- // My command will be in the *next* batch. So, just return OK.
601- return Status::OK ();
597+ if (window_elapsed || enough_accumulated) {
598+ if (ws->start_us .compare_exchange_strong (leader_election_token, 0 )) {
599+ ws->accepted .store (0 , std::memory_order_relaxed);
600+ coordinator_.TriggerImmediateSend ();
601+ g_pika_rm->WakeUpBinlogSync ();
602+ }
602603 }
603604
604605 // Block once per window: wait until committed_id catches prepared_id (end of current window)
@@ -607,9 +608,12 @@ Status SyncMasterDB::ConsensusProposeLog(const std::shared_ptr<Cmd>& cmd_ptr) {
607608 std::unique_lock<pstd::Mutex> lock (*mu);
608609
609610 auto timeout = std::chrono::seconds (10 );
610- LogOffset window_end = coordinator_.GetPreparedId ();
611- while (window_end > coordinator_.GetCommittedId ()) {
611+ // LogOffset window_end = coordinator_.GetPreparedId();
612+ while (my_offset > coordinator_.GetCommittedId ()) {
612613 if (cv->wait_for (lock, timeout) == std::cv_status::timeout) {
614+ LOG (WARNING) << " Cmd wait for consensus timeout, db: " << db_info_.db_name_
615+ << " my_offset: " << my_offset.ToString ()
616+ << " committed_id: " << coordinator_.GetCommittedId ().ToString ();
613617 return Status::Timeout (" No consistency achieved within 10 seconds" );
614618 }
615619 }
0 commit comments