@@ -842,11 +842,12 @@ Status ConsensusCoordinator::PersistAppendBinlog(const std::shared_ptr<Cmd>& cmd
842842 return s;
843843 }
844844
845- // Force flush to ensure data persistence on master
846- s = stable_logger_->Logger ()->Sync ();
847- if (!s.ok ()) {
848- LOG (WARNING) << " Failed to sync binlog to disk on master: " << s.ToString ();
849- return s;
845+ if (++binlog_fsync_counter_ % g_pika_conf->binlog_fsync_interval () == 0 ) {
846+ s = stable_logger_->Logger ()->Sync ();
847+ if (!s.ok ()) {
848+ LOG (WARNING) << " Failed to sync binlog to disk on master: " << s.ToString ();
849+ return s;
850+ }
850851 }
851852
852853 // If successful, append the log entry to the logs
@@ -998,33 +999,39 @@ Status ConsensusCoordinator::ApplyBinlog(const std::vector<Log::LogItem>& logs)
998999
9991000Status ConsensusCoordinator::SendBinlog (std::shared_ptr<SlaveNode> slave_ptr, std::string db_name) {
10001001 std::vector<WriteTask> tasks;
1001- LogOffset prev_offset = slave_ptr->sent_offset ;
1002- LOG (INFO) << " SendBinlog: logs_->LastOffset()=" << logs_->LastOffset ().ToString ()
1003- << " , slave_ptr->sent_offset=" << slave_ptr->sent_offset .ToString ();
1004- // Check if there are new log entries that need to be sent to the slave
1005- if (logs_->LastOffset () >= slave_ptr->acked_offset ) {
1006- LOG (INFO) << " SendBinlog: logs_->Size()=" << logs_->Size ();
1007- // Find the index of the log entry corresponding to the slave's acknowledged offset
1008- int index = logs_->FindOffset (slave_ptr->acked_offset );
1009- LOG (INFO) << " SendBinlog: index=" << index;
1010- if (index < logs_->Size ()) {
1011- for (int i = index; i < logs_->Size (); ++i) {
1012- const Log::LogItem& item = logs_->At (i);
1013-
1014- slave_ptr->SetLastSendTime (pstd::NowMicros ());
1015-
1016- RmNode rm_node (slave_ptr->Ip (), slave_ptr->Port (), slave_ptr->DBName (), slave_ptr->SessionId ());
1017- WriteTask task (rm_node, BinlogChip (item.offset , item.binlog_ ), prev_offset, GetCommittedId ());
1018- tasks.emplace_back (std::move (task));
1019-
1020- prev_offset = item.offset ;
1021- slave_ptr->sent_offset = item.offset ;
1022- }
1002+ if (!g_pika_server->IsConsistency ()) {
1003+ return Status::OK ();
1004+ }
1005+ std::string ip = slave_ptr->Ip ();
1006+ int port = slave_ptr->Port ();
1007+ int32_t session_id = slave_ptr->SessionId ();
1008+
1009+ LogOffset last_sent = slave_ptr->sent_offset ;
1010+ if (logs_->LastOffset () > last_sent) {
1011+ int send_start_index = logs_->FindOffset (last_sent);
1012+ if (send_start_index < 0 ) {
1013+ LOG (WARNING) << " Binlog offset not found, maybe purged. last_sent: " << last_sent.ToString ();
1014+ return Status::Corruption (" cant find the file_num" );
1015+ }
1016+
1017+ if (send_start_index < logs_->Size () && logs_->At (send_start_index).offset == last_sent) {
1018+ send_start_index++;
1019+ }
1020+
1021+ if (send_start_index < logs_->Size ()) {
1022+ LogOffset prev_offset = send_start_index > 0 ? logs_->At (send_start_index - 1 ).offset : LogOffset ();
1023+ const auto & item = logs_->At (send_start_index);
1024+ RmNode rm_node (ip, port, db_name, session_id);
1025+ WriteTask task (rm_node, BinlogChip (item.offset , item.binlog_ ), prev_offset, GetCommittedId ());
1026+ tasks.emplace_back (std::move (task));
10231027 }
10241028 }
10251029
10261030 if (!tasks.empty ()) {
1027- g_pika_rm->ProduceWriteQueue (slave_ptr->Ip (), slave_ptr->Port (), db_name, tasks);
1031+ g_pika_rm->ProduceWriteQueue (ip, port, db_name, tasks);
1032+ slave_ptr->sent_offset = tasks.back ().binlog_chip_ .offset_ ;
1033+ LOG (INFO) << " SendBinlog tasks to queue, slave: " << ip << " :" << port << " tasks: " << tasks.size ()
1034+ << " new sent_offset: " << slave_ptr->sent_offset .ToString ();
10281035 }
10291036 return Status::OK ();
10301037}
0 commit comments