
Commit a16ff96

Fixed multithreading bug
1 parent 6b8762d commit a16ff96

7 files changed

Lines changed: 178 additions & 42 deletions


conf/pika.conf

Lines changed: 4 additions & 4 deletions
@@ -154,12 +154,12 @@ replication-num : 0
 consensus-level : 0

 # consensus-batch-size: The maximum number of items in a consensus batch.
-# Default: 1000
-consensus-batch-size : 1000
+# Default: 100
+consensus-batch-size : 100

 # consensus-timeout: The maximum waiting time during the master node batch sending process.
-# Default: 10
-consensus-timeout : 10
+# Default: 5
+consensus-timeout : 5

 # replication-ack-timeout: The timeout in milliseconds for waiting for a batch ACK from a slave.
 # Default: 5000

include/pika_consensus.h

Lines changed: 30 additions & 1 deletion
@@ -275,13 +275,36 @@ class ConsensusCoordinator {
     prepared_id_ = offset;
   }
   void SetCommittedId(const LogOffset& offset) {
+    // Record desired committed id (from quorum/ACK) first
+    LogOffset fsynced_snapshot;
+    {
+      std::shared_lock fs_lock(fsynced_id_rwlock_);
+      fsynced_snapshot = last_fsynced_id_;
+    }
+    // If desired commit goes beyond fsynced, proactively trigger fsync
+    if (offset > fsynced_snapshot) {
+      std::lock_guard<pstd::Mutex> lk(sync_mu_);
+      needs_sync_.store(true);
+      sync_cv_.notify_one();
+    }
     std::lock_guard l(committed_id_rwlock_);
-    committed_id_ = offset;
+    if (offset > desired_committed_id_) {
+      desired_committed_id_ = offset;
+    }
+    LogOffset target = desired_committed_id_;
+    if (target > fsynced_snapshot) {
+      target = fsynced_snapshot;
+    }
+    if (target > committed_id_) {
+      committed_id_ = target;
     context_->UpdateAppliedIndex(committed_id_);
     committed_id_cv_.notify_all();
+    }
   }
   pstd::Mutex* GetCommittedIdMu() { return &committed_id_mu_; }
   pstd::CondVar* GetCommittedIdCv() { return &committed_id_cv_; }
+  // force next SendBinlog call to send immediately, bypassing coalesce wait
+  void TriggerImmediateSend() { immediate_send_once_.store(true); }

  private:
   void SyncBinlogLoop();

@@ -296,17 +319,23 @@ class ConsensusCoordinator {
   std::thread sync_thread_;
   pstd::Mutex promises_mu_;
   std::vector<std::promise<pstd::Status>> sync_promises_;
+  // one-shot switch to force immediate send on next SendBinlog
+  std::atomic<bool> immediate_send_once_{false};

   std::shared_mutex is_consistency_rwlock_;
   bool is_consistency_ = false;
   std::shared_mutex committed_id_rwlock_;
   pstd::Mutex committed_id_mu_;
   pstd::CondVar committed_id_cv_;
   LogOffset committed_id_ = LogOffset();
+  LogOffset desired_committed_id_ = LogOffset();
   std::shared_mutex prepared_id__rwlock_;
   LogOffset prepared_id_ = LogOffset();
   std::shared_ptr<Log> logs_;
   int binlog_fsync_counter_ = 0;
+  // Track last fsynced offset to gate commit advancement
+  std::shared_mutex fsynced_id_rwlock_;
+  LogOffset last_fsynced_id_ = LogOffset();
 };

 #endif // INCLUDE_PIKA_CONSENSUS_H_
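
Note on the header change above: committed_id_ no longer jumps straight to the quorum-acked offset; it is capped at last_fsynced_id_, so an entry is only reported committed once it is durable in the local binlog. A minimal standalone sketch of that invariant, with illustrative names and no locking (not the actual Pika types):

    #include <algorithm>
    #include <cassert>
    #include <cstdint>

    // Illustrative stand-in for LogOffset: a single monotonically increasing index.
    using Offset = uint64_t;

    struct CommitGate {
      Offset desired = 0;    // highest offset acknowledged by the quorum
      Offset fsynced = 0;    // highest offset durably written to the local binlog
      Offset committed = 0;  // what we expose as committed / applied

      // Called from the ACK path (SetCommittedId plays this role in the patch).
      void OnQuorumAck(Offset offset) {
        desired = std::max(desired, offset);
        Advance();
      }

      // Called after the fsync loop persists up to `offset` (SyncBinlogLoop in the patch).
      void OnFsynced(Offset offset) {
        fsynced = std::max(fsynced, offset);
        Advance();
      }

     private:
      void Advance() {
        // Core invariant of the change: never commit past what is on disk.
        Offset target = std::min(desired, fsynced);
        if (target > committed) {
          committed = target;
        }
      }
    };

    int main() {
      CommitGate gate;
      gate.OnQuorumAck(10);  // quorum is ahead of the local fsync
      assert(gate.committed == 0);
      gate.OnFsynced(7);     // commit advances only up to the fsynced point
      assert(gate.committed == 7);
      gate.OnFsynced(12);    // now the earlier quorum ack can fully commit
      assert(gate.committed == 10);
      return 0;
    }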

include/pika_rm.h

Lines changed: 12 additions & 0 deletions
@@ -256,6 +256,18 @@ class PikaReplicaManager {
   // client for replica
   std::unique_ptr<PikaReplClient> pika_repl_client_;
   std::unique_ptr<PikaReplServer> pika_repl_server_;
+
+  // one-shot switch to force immediate send on next SendBinlog
+  std::atomic<bool> immediate_send_once_{false};
+
+  // consumer thread for write queue
+  std::thread bg_thread_;
+  pstd::CondVar bg_cv_;
+  std::atomic<bool> bg_thread_should_stop_{false};
+
+  std::shared_mutex is_consistency_rwlock_;
+  bool is_consistency_ = false;
+  std::shared_mutex committed_id_rwlock_;
 };

 #endif // PIKA_RM_H

src/pika_consensus.cc

Lines changed: 65 additions & 14 deletions
@@ -390,7 +390,16 @@ Status ConsensusCoordinator::UpdateSlave(const std::string& ip, int port, const
   }
   {
     std::lock_guard l(slave_ptr->slave_mu);
-    slave_ptr->acked_offset = end;
+    // Treat this ACK as confirming everything before and including end
+    LogOffset updated_offset;
+    // Use empty start to indicate from the beginning of the window
+    slave_ptr->sync_win.Update(SyncWinItem(LogOffset()), SyncWinItem(end), &updated_offset);
+    if (!(updated_offset == LogOffset())) {
+      slave_ptr->acked_offset = updated_offset;
+    } else {
+      // Fallback to end if window was empty or no progress detected
+      slave_ptr->acked_offset = end;
+    }
     sync_pros_.AddMatchIndex(ip, port, slave_ptr->acked_offset);
     // LOG(INFO) << "PacificA slave ip: " << ip << ", port :" << port << ",slave acked_offset "
     //           << slave_ptr->acked_offset.ToString();

@@ -835,20 +844,44 @@ bool ConsensusCoordinator::checkFinished(const LogOffset& offset) {
 void ConsensusCoordinator::SyncBinlogLoop() {
   while (!thread_stop_.load()) {
     std::unique_lock<pstd::Mutex> lock(sync_mu_);
-    // timed wait to allow coalescing multiple appends
-    auto coalesce = std::chrono::milliseconds(g_pika_conf->consensus_timeout());
-    sync_cv_.wait_for(lock, coalesce, [this] { return needs_sync_.load() || thread_stop_.load(); });
-
+    // Wait until there is at least one pending append
+    sync_cv_.wait(lock, [this] { return needs_sync_.load() || thread_stop_.load(); });
     if (thread_stop_.load()) {
       break;
     }
-    if (!needs_sync_.load()) {
-      continue;
-    }
+    // Coalesce multiple appends in the next timeout window
+    auto coalesce = std::chrono::milliseconds(g_pika_conf->consensus_timeout());
+    lock.unlock();
+    std::this_thread::sleep_for(coalesce);
+    lock.lock();

     needs_sync_.store(false);
     pstd::Status s = stable_logger_->Logger()->Sync();

+    // Record fsynced offset (not beyond prepared_id_)
+    {
+      std::shared_lock prep_lock(prepared_id__rwlock_);
+      std::lock_guard fs_lock(fsynced_id_rwlock_);
+      if (prepared_id_ > last_fsynced_id_) {
+        last_fsynced_id_ = prepared_id_;
+      }
+    }
+
+    // After fsync, try to advance committed_id up to min(desired, fsynced)
+    {
+      std::shared_lock fs_lock(fsynced_id_rwlock_);
+      std::lock_guard commit_lock(committed_id_rwlock_);
+      LogOffset target = desired_committed_id_;
+      if (target > last_fsynced_id_) {
+        target = last_fsynced_id_;
+      }
+      if (target > committed_id_) {
+        committed_id_ = target;
+        context_->UpdateAppliedIndex(committed_id_);
+        committed_id_cv_.notify_all();
+      }
+    }
+
     std::lock_guard<pstd::Mutex> guard(promises_mu_);
     for (auto& p : sync_promises_) {
       p.set_value(s);
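
The SyncBinlogLoop change above switches from a single timed wait to "block until the first append arrives, then sleep one consensus-timeout window before fsyncing", so one fsync absorbs every append that lands inside the window. A minimal standalone sketch of that coalescing pattern, using plain std:: primitives rather than the pstd wrappers (illustrative only, not the Pika code):

    #include <atomic>
    #include <chrono>
    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <thread>

    // Coalescing flusher: wait for the first pending append,
    // then sleep one window so later appends share the same fsync.
    std::mutex mu;
    std::condition_variable cv;
    std::atomic<bool> needs_sync{false};
    std::atomic<bool> stop{false};

    void FlushLoop(std::chrono::milliseconds window) {
      while (!stop.load()) {
        std::unique_lock<std::mutex> lock(mu);
        cv.wait(lock, [] { return needs_sync.load() || stop.load(); });
        if (stop.load()) break;
        lock.unlock();
        std::this_thread::sleep_for(window);  // coalesce appends arriving in this window
        lock.lock();
        needs_sync.store(false);
        std::cout << "fsync covering all appends in the last window\n";  // stand-in for Logger()->Sync()
      }
    }

    int main() {
      std::thread flusher(FlushLoop, std::chrono::milliseconds(5));
      for (int i = 0; i < 3; ++i) {  // producers: mark dirty and wake the flusher
        { std::lock_guard<std::mutex> lk(mu); needs_sync.store(true); }
        cv.notify_one();
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
      }
      std::this_thread::sleep_for(std::chrono::milliseconds(20));
      stop.store(true);
      cv.notify_one();
      flusher.join();
      return 0;
    }
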
@@ -920,7 +953,9 @@ Status ConsensusCoordinator::AppendSlaveEntries(const std::shared_ptr<Cmd>& cmd_
               << " cur last index " << last_index.l_offset.index;
     return Status::OK();
   }
+  auto start_us = pstd::NowMicros();
   Status s = PersistAppendBinlog(cmd_ptr);
+  auto end_us = pstd::NowMicros();
   if (!s.ok()) {
     return s;
   }

@@ -961,7 +996,7 @@ Status ConsensusCoordinator::UpdateCommittedID() {
   LogOffset slave_prepared_id = LogOffset();

   for (const auto& slave : slaves) {
-    if (slave.second->slave_state == kSlaveBinlogSync) {
+    if (slave.second->slave_state == kSlaveBinlogSync || slave.second->slave_state == SlaveState::KCandidate) {
       if (slave_prepared_id == LogOffset()) {
         slave_prepared_id = slave.second->acked_offset;
       } else if (slave.second->acked_offset < slave_prepared_id) {

@@ -1029,8 +1064,9 @@ pstd::Status ConsensusCoordinator::SendBinlog(const std::shared_ptr<SlaveNode>&
     return Status::OK();
   }

+  // Gate: allow only one in-flight batch until ACK clears the sync window
   int batch_size = g_pika_conf->consensus_batch_size();
-  for (int i = start_index; i < logs_->Size() && tasks.size() < batch_size; ++i) {
+  for (int i = start_index; i < logs_->Size() && static_cast<int>(tasks.size()) < batch_size; ++i) {
     const auto& item = logs_->At(i);
     tasks.emplace_back(RmNode(slave_ptr->Ip(), slave_ptr->Port(), db_name, slave_ptr->SessionId()),
                        BinlogChip(item.offset, item.binlog_), item.offset, committed_index);

@@ -1045,13 +1081,21 @@ pstd::Status ConsensusCoordinator::SendBinlog(const std::shared_ptr<SlaveNode>&
   // decide if we should send now based on size or timeout window
   bool size_triggered = (static_cast<int>(tasks.size()) >= batch_size);
   bool timeout_triggered = false;
-  if (slave_ptr->pending_since_us_ == 0 && !size_triggered) {
+
+  // one-shot immediate send to close current window
+  bool force_now = immediate_send_once_.exchange(false);
+
+  if (slave_ptr->pending_since_us_ == 0 && !size_triggered && !force_now) {
     // start pending window and wait for more logs or timeout
     slave_ptr->pending_since_us_ = now;
     return Status::OK();
   }
-  if (slave_ptr->pending_since_us_ > 0) {
-    timeout_triggered = (now - slave_ptr->pending_since_us_) >= (static_cast<uint64_t>(g_pika_conf->consensus_timeout()) * 1000ULL);
+  if (!size_triggered) {
+    if (force_now) {
+      timeout_triggered = true;
+    } else if (slave_ptr->pending_since_us_ > 0) {
+      timeout_triggered = (now - slave_ptr->pending_since_us_) >= (static_cast<uint64_t>(g_pika_conf->consensus_timeout()) * 1000ULL);
+    }
   }
   if (!size_triggered && !timeout_triggered) {
     return Status::OK();

@@ -1068,14 +1112,21 @@ pstd::Status ConsensusCoordinator::SendBinlog(const std::shared_ptr<SlaveNode>&
   std::vector<WriteTask> final_tasks_to_send;
   final_tasks_to_send.push_back(batched_task);
   g_pika_rm->ProduceWriteQueue(slave_ptr->Ip(), slave_ptr->Port(), db_name, final_tasks_to_send);
+  // Immediately consume the write queue to send over network
+  // g_pika_rm->ConsumeWriteQueue();

   // Update slave node's state
   slave_ptr->sent_offset = last_task.binlog_chip_.offset_;
+  // Track every log item so ACK can consume the window in order
   for (const auto& task : tasks) {
-    slave_ptr->sync_win.Push(SyncWinItem(task.binlog_chip_.offset_));
+    slave_ptr->sync_win.Push(SyncWinItem(task.binlog_chip_.offset_, task.binlog_chip_.binlog_.size()));
   }
   // reset pending timer after sending
   slave_ptr->pending_since_us_ = 0;
+  // start ACK timeout tracking for this in-flight batch
+  if (slave_ptr->ack_timeout_start_time_us_ == 0) {
+    slave_ptr->ack_timeout_start_time_us_ = now;
+  }

   // trigger fsync coalesced with network send
   {
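
Taken together, the SendBinlog edits make the send decision fire on any of three triggers: the batch is full, the pending window has timed out, or a committing writer set the one-shot force flag. A simplified sketch of that predicate (hypothetical helper and simplified types, not the actual function):

    #include <cassert>
    #include <cstdint>

    // Simplified version of the SendBinlog trigger logic from the patch.
    // pending_since_us == 0 means no batch window is currently open.
    struct SendDecision {
      bool send_now;     // flush the accumulated tasks to the slave
      bool open_window;  // start the pending window and return without sending
    };

    SendDecision ShouldSend(uint64_t now_us, uint64_t pending_since_us, uint64_t timeout_us,
                            int pending_tasks, int batch_size, bool force_now) {
      bool size_triggered = pending_tasks >= batch_size;
      if (pending_since_us == 0 && !size_triggered && !force_now) {
        return {false, true};  // first item of a new window: wait for more
      }
      bool timeout_triggered = false;
      if (!size_triggered) {
        if (force_now) {
          timeout_triggered = true;  // a committing writer asked to close the window
        } else if (pending_since_us > 0) {
          timeout_triggered = (now_us - pending_since_us) >= timeout_us;
        }
      }
      return {size_triggered || timeout_triggered, false};
    }

    int main() {
      assert(ShouldSend(1000, 0, 5000, 1, 100, false).open_window);  // batch not full: open a window
      assert(ShouldSend(1000, 500, 5000, 1, 100, true).send_now);    // force flag closes the window
      assert(ShouldSend(7000, 1000, 5000, 1, 100, false).send_now);  // window timed out
      return 0;
    }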

src/pika_rm.cc

Lines changed: 59 additions & 20 deletions
@@ -532,6 +532,7 @@ Status SyncMasterDB::AppendCandidateBinlog(const std::string& ip, int port, cons
 }

 pstd::Status SyncMasterDB::SyncBinlogAndWait() {
+  g_pika_rm->WakeUpBinlogSync();
   return coordinator_.SyncAndWait();
 }

@@ -540,7 +541,6 @@ Status SyncMasterDB::ConsensusProposeLog(const std::shared_ptr<Cmd>& cmd_ptr) {
   if (!coordinator_.GetISConsistency()) {
     return coordinator_.ProposeLog(cmd_ptr);
   }
-  //LOG(INFO) << "Master DB (" << db_info_.db_name_ << ") ConsensusProposeLog";

   // Batch append without immediate waiting to allow high concurrency
   Status s = coordinator_.AppendEntries(cmd_ptr);  // Append the log entry to the coordinator

@@ -557,42 +557,63 @@ Status SyncMasterDB::ConsensusProposeLog(const std::shared_ptr<Cmd>& cmd_ptr) {
   //   // TODO: the sleep wait is temporarily commented out here; 50ms is too long and hurts the write path. Switch to a condition-variable wakeup later.
   //   //std::this_thread::sleep_for(std::chrono::milliseconds(50));
   // }
-  // Batch-wait policy: only wait once per consensus_timeout window or after enough appends
-  static thread_local uint64_t window_start_us = 0;
-  static thread_local int accepted_since_window = 0;
+  // Per-DB global batching window across threads
+  struct WindowState {
+    std::atomic<uint64_t> start_us{0};
+    std::atomic<int> accepted{0};
+  };
+  static std::unordered_map<std::string, WindowState> g_db_windows;
+  static pstd::Mutex g_db_windows_mu;
+
+  WindowState* ws = nullptr;
+  {
+    std::lock_guard<pstd::Mutex> lk(g_db_windows_mu);
+    ws = &g_db_windows[db_info_.db_name_];
+  }
+
   const uint64_t now_us = pstd::NowMicros();
   const uint64_t timeout_us = static_cast<uint64_t>(g_pika_conf->consensus_timeout()) * 1000ULL;
   const int min_batch_wait = std::max(50, g_pika_conf->consensus_batch_size());

-  if (window_start_us == 0) {
-    window_start_us = now_us;
-    accepted_since_window = 0;
+  uint64_t expected0 = 0;
+  if (ws->start_us.compare_exchange_strong(expected0, now_us)) {
+    ws->accepted.store(0, std::memory_order_relaxed);
   }

-  accepted_since_window++;
-  bool window_elapsed = (now_us - window_start_us) >= timeout_us;
-  bool enough_accumulated = accepted_since_window >= min_batch_wait;
+  ws->accepted.fetch_add(1, std::memory_order_relaxed);
+  uint64_t leader_election_token = ws->start_us.load(std::memory_order_relaxed);
+  bool window_elapsed = (now_us - leader_election_token) >= timeout_us;
+  bool enough_accumulated = ws->accepted.load(std::memory_order_relaxed) >= min_batch_wait;

   if (!window_elapsed && !enough_accumulated) {
-    // do not wait this time; let caller return fast to accept more writes
     return Status::OK();
   }
-  // Wait for consensus to be achieved using condition variable (once per window or batch)
+
+  // Attempt to close the current window and become the leader for this batch
+  if (ws->start_us.compare_exchange_strong(leader_election_token, 0)) {
+    // Success, we are the leader. Reset count and trigger send.
+    ws->accepted.store(0, std::memory_order_relaxed);
+    coordinator_.TriggerImmediateSend();
+    g_pika_rm->WakeUpBinlogSync();
+  } else {
+    // Lost the election. Another thread will handle the batch.
+    // My command will be in the *next* batch. So, just return OK.
+    return Status::OK();
+  }
+
+  // Block once per window: wait until committed_id catches prepared_id (end of current window)
   pstd::Mutex* mu = coordinator_.GetCommittedIdMu();
   pstd::CondVar* cv = coordinator_.GetCommittedIdCv();
   std::unique_lock<pstd::Mutex> lock(*mu);

   auto timeout = std::chrono::seconds(10);
-  LogOffset offset = coordinator_.GetPreparedId();
-  while (offset > coordinator_.GetCommittedId()) {
-    if (cv->wait_for(lock, timeout) == std::cv_status::timeout) {
-      return Status::Timeout("No consistency achieved within 10 seconds");
-    }
+  LogOffset window_end = coordinator_.GetPreparedId();
+  while (window_end > coordinator_.GetCommittedId()) {
+    if (cv->wait_for(lock, timeout) == std::cv_status::timeout) {
+      return Status::Timeout("No consistency achieved within 10 seconds");
+    }
   }

-  // reset window
-  window_start_us = pstd::NowMicros();
-  accepted_since_window = 0;
   return Status::OK();
 }
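
The ConsensusProposeLog rewrite above drops the thread_local window counters, which let every writer thread run its own batching window, in favor of one shared per-DB window: each writer bumps a shared counter, and when the window fills or times out, exactly one writer wins a compare-and-swap on the window start, triggers the send, and blocks for the commit, while the rest return immediately. A standalone sketch of that CAS-based batch-leader pattern (illustrative names, not the Pika code):

    #include <atomic>
    #include <chrono>
    #include <cstdint>
    #include <iostream>
    #include <thread>
    #include <vector>

    // Shared batching window: the thread that resets start_us back to 0 via CAS
    // becomes the "leader" for the batch and is the only one that flushes/waits.
    struct BatchWindow {
      std::atomic<uint64_t> start_us{0};
      std::atomic<int> accepted{0};
    };

    uint64_t NowMicros() {
      return std::chrono::duration_cast<std::chrono::microseconds>(
                 std::chrono::steady_clock::now().time_since_epoch()).count();
    }

    // Returns true if the calling thread became the batch leader.
    bool Propose(BatchWindow& w, uint64_t timeout_us, int min_batch) {
      uint64_t now = NowMicros();
      uint64_t expected = 0;
      if (w.start_us.compare_exchange_strong(expected, now)) {
        w.accepted.store(0);  // this thread opened a fresh window
      }
      w.accepted.fetch_add(1);
      uint64_t token = w.start_us.load();
      bool elapsed = (now - token) >= timeout_us;
      bool full = w.accepted.load() >= min_batch;
      if (!elapsed && !full) return false;          // return fast, batch keeps filling
      if (w.start_us.compare_exchange_strong(token, 0)) {
        w.accepted.store(0);
        return true;                                // leader: would trigger send and wait here
      }
      return false;                                 // lost the race; the leader handles this batch
    }

    int main() {
      BatchWindow window;
      std::atomic<int> leaders{0};
      std::vector<std::thread> writers;
      for (int i = 0; i < 8; ++i) {
        writers.emplace_back([&] {
          for (int j = 0; j < 1000; ++j) {
            if (Propose(window, /*timeout_us=*/1000, /*min_batch=*/4)) leaders.fetch_add(1);
          }
        });
      }
      for (auto& t : writers) t.join();
      std::cout << "batches led: " << leaders.load() << "\n";
      return 0;
    }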

@@ -745,6 +766,7 @@ PikaReplicaManager::PikaReplicaManager() {
   pika_repl_client_ = std::make_unique<PikaReplClient>(3000, 60);
   pika_repl_server_ = std::make_unique<PikaReplServer>(ips, port, 3000);
   InitDB();
+  bg_thread_should_stop_.store(false);
 }

 void PikaReplicaManager::Start() {

@@ -760,11 +782,27 @@ void PikaReplicaManager::Start() {
     LOG(FATAL) << "Start Repl Server Error: " << ret
                << (ret == net::kCreateThreadError ? ": create thread error " : ": other error");
   }
+
+  bg_thread_ = std::thread([this]() {
+    while (!bg_thread_should_stop_.load()) {
+      int consumed_count = ConsumeWriteQueue();
+      if (consumed_count == 0) {
+        std::unique_lock<pstd::Mutex> lock(write_queue_mu_);
+        bg_cv_.wait_for(lock, std::chrono::milliseconds(100),
+                        [this] { return bg_thread_should_stop_.load() || !write_queues_.empty(); });
+      }
+    }
+  });
 }

 void PikaReplicaManager::Stop() {
   pika_repl_client_->Stop();
   pika_repl_server_->Stop();
+  bg_thread_should_stop_.store(true);
+  bg_cv_.notify_one();
+  if (bg_thread_.joinable()) {
+    bg_thread_.join();
+  }
 }

 bool PikaReplicaManager::CheckMasterSyncFinished() {

@@ -800,6 +838,7 @@ void PikaReplicaManager::ProduceWriteQueue(const std::string& ip, int port, std:
   for (auto& task : tasks) {
     write_queues_[index][db_name].push(task);
   }
+  bg_cv_.notify_one();
 }

 int PikaReplicaManager::ConsumeWriteQueue() {
