@@ -998,33 +998,43 @@ Status ConsensusCoordinator::ApplyBinlog(const std::vector<Log::LogItem>& logs)
998998
999999Status ConsensusCoordinator::SendBinlog (std::shared_ptr<SlaveNode> slave_ptr, std::string db_name) {
10001000 std::vector<WriteTask> tasks;
1001- LogOffset prev_offset = slave_ptr->sent_offset ;
1002- LOG (INFO) << " SendBinlog: logs_->LastOffset()=" << logs_->LastOffset ().ToString ()
1003- << " , slave_ptr->sent_offset=" << slave_ptr->sent_offset .ToString ();
1004- // Check if there are new log entries that need to be sent to the slave
1005- if (logs_->LastOffset () >= slave_ptr->acked_offset ) {
1006- LOG (INFO) << " SendBinlog: logs_->Size()=" << logs_->Size ();
1007- // Find the index of the log entry corresponding to the slave's acknowledged offset
1008- int index = logs_->FindOffset (slave_ptr->acked_offset );
1009- LOG (INFO) << " SendBinlog: index=" << index;
1010- if (index < logs_->Size ()) {
1011- for (int i = index; i < logs_->Size (); ++i) {
1012- const Log::LogItem& item = logs_->At (i);
1013-
1014- slave_ptr->SetLastSendTime (pstd::NowMicros ());
1015-
1016- RmNode rm_node (slave_ptr->Ip (), slave_ptr->Port (), slave_ptr->DBName (), slave_ptr->SessionId ());
1001+ if (!g_pika_server->IsConsistency ()) {
1002+ return Status::OK ();
1003+ }
1004+ std::string ip = slave_ptr->Ip ();
1005+ int port = slave_ptr->Port ();
1006+ int32_t session_id = slave_ptr->SessionId ();
1007+
1008+ LogOffset last_sent = slave_ptr->sent_offset ;
1009+ if (logs_->LastOffset () > last_sent) {
1010+ int send_start_index = logs_->FindOffset (last_sent);
1011+ if (send_start_index < 0 ) {
1012+ LOG (WARNING) << " Binlog offset not found, maybe purged. last_sent: " << last_sent.ToString ();
1013+ return Status::Corruption (" cant find the file_num" );
1014+ }
1015+
1016+ if (send_start_index < logs_->Size () && logs_->At (send_start_index).offset == last_sent) {
1017+ send_start_index++;
1018+ }
1019+
1020+ if (send_start_index < logs_->Size ()) {
1021+ LogOffset prev_offset = send_start_index > 0 ? logs_->At (send_start_index - 1 ).offset : LogOffset ();
1022+ for (int i = send_start_index; i < logs_->Size (); ++i) {
1023+ const auto & item = logs_->At (i);
1024+ RmNode rm_node (ip, port, db_name, session_id);
10171025 WriteTask task (rm_node, BinlogChip (item.offset , item.binlog_ ), prev_offset, GetCommittedId ());
10181026 tasks.emplace_back (std::move (task));
10191027
10201028 prev_offset = item.offset ;
1021- slave_ptr->sent_offset = item.offset ;
10221029 }
10231030 }
10241031 }
10251032
10261033 if (!tasks.empty ()) {
1027- g_pika_rm->ProduceWriteQueue (slave_ptr->Ip (), slave_ptr->Port (), db_name, tasks);
1034+ g_pika_rm->ProduceWriteQueue (ip, port, db_name, tasks);
1035+ slave_ptr->sent_offset = tasks.back ().binlog_chip_ .offset_ ;
1036+ LOG (INFO) << " SendBinlog tasks to queue, slave: " << ip << " :" << port << " tasks: " << tasks.size ()
1037+ << " new sent_offset: " << slave_ptr->sent_offset .ToString ();
10281038 }
10291039 return Status::OK ();
10301040}
0 commit comments