Skip to content

Commit d6c0db3

Browse files
committed
fix some bugs
1 parent aea793b commit d6c0db3

12 files changed

Lines changed: 160 additions & 165 deletions

include/pika_client_conn.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ class PikaClientConn : public net::RedisConn {
113113
std::vector<std::shared_ptr<std::string>> resp_array;
114114

115115
std::shared_ptr<TimeStat> time_stat_;
116-
116+
void TryWriteResp();
117117
private:
118118
net::ServerThread* const server_thread_;
119119
std::string current_db_;
@@ -134,7 +134,7 @@ class PikaClientConn : public net::RedisConn {
134134
void ProcessMonitor(const PikaCmdArgsType& argv);
135135

136136
void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<std::string>& resp_ptr, bool cache_miss_in_rtc);
137-
void TryWriteResp();
137+
// void TryWriteResp();
138138
};
139139

140140
struct ClientInfo {

include/pika_command_collector.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ class PikaCommandCollector {
6262
* @param callback callback function after processing is completed
6363
* @return whether the addition was successful
6464
*/
65-
bool AddCommand(const std::shared_ptr<Cmd>& cmd_ptr, CommandCallback callback);
65+
bool AddCommand(std::shared_ptr<Cmd> cmd_ptr, CommandCallback callback);
6666

6767
/**
6868
* @brief Set the batch max wait time

include/pika_kv.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ class SetCmd : public Cmd {
2424
res.push_back(key_);
2525
return res;
2626
}
27+
~SetCmd() {}
2728
void Do() override;
2829
void DoUpdateCache() override;
2930
void DoThroughDB() override;

src/pika_client_conn.cc

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -229,14 +229,16 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
229229
if (command_collector) {
230230
// Create callback to handle command completion
231231
auto callback = [this, c_ptr](const LogOffset& offset, pstd::Status status) {
232-
if (status.ok()) {
233-
// Command was successfully processed through the pipeline
234-
LOG(INFO) << "Command " << c_ptr->name() << " completed via CommandCollector";
235-
} else {
236-
// Set error response
237-
c_ptr->res().SetRes(CmdRes::kErrOther, "Command processing failed: " + status.ToString());
238-
LOG(ERROR) << "Command " << c_ptr->name() << " failed in CommandCollector: " << status.ToString();
239-
}
232+
LOG(INFO) << "Command completed";
233+
auto pc = dynamic_cast<PikaClientConn*>(c_ptr->GetConn().get());
234+
if (pc) {
235+
auto resp_ptr = c_ptr->GetResp();
236+
if (resp_ptr) {
237+
*resp_ptr = std::move(c_ptr->res().message());
238+
}
239+
pc->resp_num--;
240+
pc->TryWriteResp();
241+
}
240242
};
241243

242244
// Add command to collector for batch processing
@@ -591,8 +593,13 @@ void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<s
591593
}
592594

593595
std::shared_ptr<Cmd> cmd_ptr = DoCmd(argv, opt, resp_ptr, cache_miss_in_rtc);
594-
*resp_ptr = std::move(cmd_ptr->res().message());
595-
resp_num--;
596+
// *resp_ptr = std::move(cmd_ptr->res().message());
597+
// resp_num--;
598+
if (opt == kCmdNameSet) {
599+
} else {
600+
*resp_ptr = std::move(cmd_ptr->res().message());
601+
resp_num--;
602+
}
596603
}
597604

598605
std::queue<std::shared_ptr<Cmd>> PikaClientConn::GetTxnCmdQue() { return txn_cmd_que_; }

src/pika_command.cc

Lines changed: 30 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -952,35 +952,36 @@ bool Cmd::DoReadCommandInCache() {
952952

953953

954954
void Cmd::DoBinlog() {
955-
if (res().ok() && is_write() && g_pika_conf->write_binlog()) {
956-
std::shared_ptr<net::NetConn> conn_ptr = GetConn();
957-
std::shared_ptr<std::string> resp_ptr = GetResp();
958-
// Consider that dummy cmd appended by system, both conn and resp are null.
959-
if ((!conn_ptr || !resp_ptr) && (name_ != kCmdDummy)) {
960-
if (!conn_ptr) {
961-
LOG(WARNING) << sync_db_->SyncDBInfo().ToString() << " conn empty.";
962-
}
963-
if (!resp_ptr) {
964-
LOG(WARNING) << sync_db_->SyncDBInfo().ToString() << " resp empty.";
965-
}
966-
res().SetRes(CmdRes::kErrOther);
967-
return;
968-
}
969-
970-
Status s = sync_db_->ConsensusProposeLog(shared_from_this());
971-
if (!s.ok()) {
972-
if(g_pika_server->IsConsistency()&&s.IsTimeout()){
973-
res().SetRes(CmdRes::kConsistencyTimeout, "Timeout waiting for consistency");
974-
LOG(WARNING) << sync_db_->SyncDBInfo().ToString() << " Slave node consistency timeout"
975-
<< s.ToString();
976-
}else{
977-
LOG(WARNING) << sync_db_->SyncDBInfo().ToString() << " Writing binlog failed, maybe no space left on device "
978-
<< s.ToString();
979-
res().SetRes(CmdRes::kErrOther, s.ToString());
980-
}
981-
return;
982-
}
983-
}
955+
// if (res().ok() && is_write() && g_pika_conf->write_binlog()) {
956+
// std::shared_ptr<net::NetConn> conn_ptr = GetConn();
957+
// std::shared_ptr<std::string> resp_ptr = GetResp();
958+
// // Consider that dummy cmd appended by system, both conn and resp are null.
959+
// if ((!conn_ptr || !resp_ptr) && (name_ != kCmdDummy)) {
960+
// if (!conn_ptr) {
961+
// LOG(WARNING) << sync_db_->SyncDBInfo().ToString() << " conn empty.";
962+
// }
963+
// if (!resp_ptr) {
964+
// LOG(WARNING) << sync_db_->SyncDBInfo().ToString() << " resp empty.";
965+
// }
966+
// res().SetRes(CmdRes::kErrOther);
967+
// return;
968+
// }
969+
970+
// Status s = sync_db_->ConsensusProposeLog(shared_from_this());
971+
// if (!s.ok()) {
972+
// if(g_pika_server->IsConsistency()&&s.IsTimeout()){
973+
// res().SetRes(CmdRes::kConsistencyTimeout, "Timeout waiting for consistency");
974+
// LOG(WARNING) << sync_db_->SyncDBInfo().ToString() << " Slave node consistency timeout"
975+
// << s.ToString();
976+
// }else{
977+
// LOG(WARNING) << sync_db_->SyncDBInfo().ToString() << " Writing binlog failed, maybe no space left on device "
978+
// << s.ToString();
979+
// res().SetRes(CmdRes::kErrOther, s.ToString());
980+
// }
981+
// return;
982+
// }
983+
// }
984+
return;
984985
}
985986

986987
#define PIKA_STAGE_DURATION_OUTPUT(duration) \

src/pika_command_collector.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ PikaCommandCollector::~PikaCommandCollector() {
3030
<< " commands, " << total_batches_.load() << " batches";
3131
}
3232

33-
bool PikaCommandCollector::AddCommand(const std::shared_ptr<Cmd>& cmd_ptr, CommandCallback callback) {
33+
bool PikaCommandCollector::AddCommand(std::shared_ptr<Cmd> cmd_ptr, CommandCallback callback) {
3434
if (!cmd_ptr || !cmd_ptr->is_write()) {
3535
LOG(WARNING) << "Attempt to add non-write command to CommandCollector";
3636
return false;

src/pika_consensus.cc

Lines changed: 41 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -824,7 +824,6 @@ bool ConsensusCoordinator::checkFinished(const LogOffset& offset) {
824824
//// pacificA private:
825825

826826
Status ConsensusCoordinator::PersistAppendBinlog(const std::shared_ptr<Cmd>& cmd_ptr, LogOffset& cur_offset) {
827-
std::lock_guard l(order_mu_);
828827
std::string content = cmd_ptr->ToRedisProtocol();
829828
std::string binlog = std::string();
830829
LogOffset offset = LogOffset();
@@ -843,12 +842,16 @@ Status ConsensusCoordinator::PersistAppendBinlog(const std::shared_ptr<Cmd>& cmd
843842
// If successful, append the log entry to the logs
844843
// TODO: 这里logs_的appendlog操作和上边的stable_logger_->Logger()->Put不是原子的,可能导致offset大的先被追加到logs_中,
845844
// 多线程写入的时候窗口会对不上,最终主从断开连接。需要加逻辑保证原子性
846-
LOG(INFO) << "PersistAppendBinlog: About to append to logs_, current size=" << logs_->Size()
847-
<< ", offset=" << cur_offset.ToString() << ", cmd=" << cmd_ptr->name();
848-
logs_->AppendLog(Log::LogItem(cur_offset, cmd_ptr, binlog));
849-
LOG(INFO) << "PersistAppendBinlog: Successfully appended to logs_, new size=" << logs_->Size();
850-
851-
SetPreparedId(cur_offset);
845+
//LOG(INFO) << "PersistAppendBinlog: About to append to logs_, current size=" << logs_->Size()
846+
// << ", offset=" << cur_offset.ToString() << ", cmd=" << cmd_ptr->name();
847+
// logs_->AppendLog(Log::LogItem(cur_offset, cmd_ptr, binlog));
848+
//LOG(INFO) << "PersistAppendBinlog: Successfully appended to logs_, new size=" << logs_->Size();
849+
{
850+
std::lock_guard l(order_mu_);
851+
// Append to logs_ under order lock to maintain ordering
852+
logs_->AppendLog(Log::LogItem(cur_offset, cmd_ptr, binlog));
853+
SetPreparedId(cur_offset);
854+
}
852855

853856
return stable_logger_->Logger()->IsOpened();
854857
}
@@ -868,7 +871,7 @@ Status ConsensusCoordinator::AppendEntries(const std::shared_ptr<Cmd>& cmd_ptr,
868871
return s;
869872
}
870873

871-
g_pika_server->SignalAuxiliary();
874+
// g_pika_server->SignalAuxiliary();
872875
return Status::OK();
873876
}
874877
Status ConsensusCoordinator::AppendSlaveEntries(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute) {
@@ -922,6 +925,23 @@ Status ConsensusCoordinator::UpdateCommittedID() {
922925
}
923926
}
924927
}
928+
// if (!has_active_slaves) {
929+
// LogOffset master_prepared_id = GetPreparedId();
930+
// if (master_prepared_id.IsValid() && master_prepared_id >= GetCommittedId()) {
931+
// SetCommittedId(master_prepared_id);
932+
// LOG(INFO) << "PacificA update CommittedID (no active slaves): " << GetCommittedId().ToString()
933+
// << ", Total slaves: " << total_slaves
934+
// << ", kSlaveBinlogSync: " << binlog_sync_slaves
935+
// << ", Other states: " << other_state_slaves;
936+
// } else {
937+
// LOG(INFO) << "PacificA update CommittedID: No active slaves, keeping current CommittedID: " << GetCommittedId().ToString()
938+
// << ", Total slaves: " << total_slaves
939+
// << ", kSlaveBinlogSync: " << binlog_sync_slaves
940+
// << ", Other states: " << other_state_slaves;
941+
// }
942+
// // g_pika_rm->NotifyCommittedID(GetCommittedId());
943+
// return Status::OK();
944+
// }
925945
if (slave_prepared_id < GetCommittedId()) {
926946
LOG(WARNING) << "Error: slave_prepared_id (" << slave_prepared_id.ToString() << ") < master_committedId ("
927947
<< GetCommittedId().ToString() << ")";
@@ -953,7 +973,9 @@ Status ConsensusCoordinator::ProcessCoordination() {
953973
// Execute the operation of writing to DB
954974
Status ConsensusCoordinator::ApplyBinlog(const std::shared_ptr<Cmd>& cmd_ptr) {
955975
auto opt = cmd_ptr->argv()[0];
976+
LOG(INFO) << "[ApplyBinlog] Received command: " << opt << " for db: " << db_name_;
956977
if (pstd::StringToLower(opt) != kCmdNameFlushdb) {
978+
LOG(INFO) << "[ApplyBinlog] Scheduling async task for " << opt;
957979
InternalApplyFollower(cmd_ptr);
958980
} else {
959981
int32_t wait_ms = 250;
@@ -990,86 +1012,28 @@ void ConsensusCoordinator::BatchInternalApplyFollower(const std::vector<std::sha
9901012

9911013
Status ConsensusCoordinator::SendBinlog(std::shared_ptr<SlaveNode> slave_ptr, std::string db_name) {
9921014
std::vector<WriteTask> tasks;
993-
const int MAX_BATCH_SIZE = 100; // Maximum number of logs to send in a single batch
994-
995-
// Get current committed_id to ensure it's sent to the slave
996-
LogOffset current_committed_id = GetCommittedId();
997-
LOG(INFO) << "SendBinlog: [Thread " << std::this_thread::get_id() << "] Current committed_id: " << current_committed_id.ToString()
998-
<< ", sending to slave " << slave_ptr->Ip() << ":" << slave_ptr->Port()
999-
<< ", logs_ addr: " << logs_.get() << ", db_name: " << db_name_;
10001015

10011016
// Check if there are new log entries that need to be sent to the slave
1002-
LOG(INFO) << "SendBinlog: logs_->LastOffset()=" << logs_->LastOffset().ToString()
1003-
<< ", slave_ptr->acked_offset=" << slave_ptr->acked_offset.ToString()
1004-
<< ", logs_->Size()=" << logs_->Size();
1005-
1006-
if (logs_->Size() > 0 && logs_->LastOffset() >= slave_ptr->acked_offset) {
1017+
if (logs_->LastOffset() >= slave_ptr->acked_offset) {
10071018
// Find the index of the log entry corresponding to the slave's acknowledged offset
10081019
int index = logs_->FindOffset(slave_ptr->acked_offset);
1009-
int entries_to_send = logs_->Size() - index;
1010-
LOG(INFO) << "SendBinlog: Found " << entries_to_send << " new log entries to send, "
1011-
<< "starting from index " << index << " of " << logs_->Size();
1012-
10131020
if (index < logs_->Size()) {
1014-
// Send log entries in optimized batches
1015-
RmNode rm_node(slave_ptr->Ip(), slave_ptr->Port(), db_name, slave_ptr->SessionId());
1016-
1017-
// For large batches, use specialized batch handling
1018-
if (entries_to_send > MAX_BATCH_SIZE) {
1019-
LOG(INFO) << "SendBinlog: Using optimized batch sending for " << entries_to_send << " entries";
1020-
1021-
// Process in chunks of MAX_BATCH_SIZE
1022-
for (int batch_start = index; batch_start < logs_->Size(); batch_start += MAX_BATCH_SIZE) {
1023-
int batch_end = std::min(batch_start + MAX_BATCH_SIZE, logs_->Size());
1024-
std::vector<WriteTask> batch_tasks;
1025-
1026-
for (int i = batch_start; i < batch_end; ++i) {
1027-
Log::LogItem item = logs_->At(i);
1028-
WriteTask task(rm_node, BinlogChip(item.offset, item.binlog_), item.offset, current_committed_id);
1029-
batch_tasks.push_back(task);
1030-
}
1031-
1032-
g_pika_rm->ProduceWriteQueue(slave_ptr->Ip(), slave_ptr->Port(), db_name, batch_tasks);
1033-
LOG(INFO) << "SendBinlog: Sent batch " << (batch_start - index) / MAX_BATCH_SIZE + 1
1034-
<< " with " << (batch_end - batch_start) << " entries";
1035-
}
1036-
} else {
1037-
// Send all entries in a single batch
1038-
for (int i = index; i < logs_->Size(); ++i) {
1039-
Log::LogItem item = logs_->At(i);
1040-
WriteTask task(rm_node, BinlogChip(item.offset, item.binlog_), item.offset, current_committed_id);
1041-
tasks.push_back(task);
1042-
}
1021+
for (int i = index; i < logs_->Size(); ++i) {
1022+
const Log::LogItem& item = logs_->At(i);
1023+
1024+
slave_ptr->SetLastSendTime(pstd::NowMicros());
1025+
1026+
RmNode rm_node(slave_ptr->Ip(), slave_ptr->Port(), slave_ptr->DBName(), slave_ptr->SessionId());
1027+
WriteTask task(rm_node, BinlogChip(item.offset, item.binlog_), slave_ptr->sent_offset, GetCommittedId());
1028+
tasks.emplace_back(std::move(task));
1029+
1030+
slave_ptr->sent_offset = item.offset;
10431031
}
1044-
} else {
1045-
LOG(INFO) << "SendBinlog: No new log entries to send, index " << index << " is out of range (logs size: " << logs_->Size() << ")";
1046-
}
1047-
} else {
1048-
if (logs_->Size() == 0) {
1049-
LOG(INFO) << "SendBinlog: No logs available yet (logs_->Size()=0), will send empty binlog to maintain connection";
1050-
} else {
1051-
LOG(INFO) << "SendBinlog: Slave is already up to date, last offset: " << logs_->LastOffset().ToString()
1052-
<< ", slave acked offset: " << slave_ptr->acked_offset.ToString();
10531032
}
10541033
}
10551034

1056-
// Only send empty binlog if there are no actual log entries to send
1057-
// This prevents the deadlock where master waits for slave ACK and slave waits for master data
1058-
if (tasks.empty() && logs_->Size() == 0) {
1059-
// LOG(INFO) << "SendBinlog: Sending empty binlog with current committed_id: " << current_committed_id.ToString();
1060-
RmNode rm_node(slave_ptr->Ip(), slave_ptr->Port(), db_name, slave_ptr->SessionId());
1061-
// Create an empty WriteTask that includes the current committed_id
1062-
WriteTask empty_task(rm_node, BinlogChip(LogOffset(), ""), LogOffset(), current_committed_id);
1063-
tasks.push_back(empty_task);
1064-
}
1065-
1066-
// Send the tasks to the slave
10671035
if (!tasks.empty()) {
1068-
LOG(INFO) << "SendBinlog: Sending " << tasks.size() << " tasks to slave " << slave_ptr->Ip() << ":" << slave_ptr->Port();
1069-
extern std::unique_ptr<PikaReplicaManager> g_pika_rm;
10701036
g_pika_rm->ProduceWriteQueue(slave_ptr->Ip(), slave_ptr->Port(), db_name, tasks);
1071-
} else {
1072-
LOG(INFO) << "SendBinlog: No tasks to send to slave " << slave_ptr->Ip() << ":" << slave_ptr->Port();
10731037
}
10741038
return Status::OK();
10751039
}

src/pika_kv.cc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,20 @@
1515

1616
extern std::unique_ptr<PikaConf> g_pika_conf;
1717
/* SET key value [NX] [XX] [EX <seconds>] [PX <milliseconds>] */
18+
// SetCmd::~SetCmd() {
19+
// auto tmp_conn = GetConn();
20+
// if (!tmp_conn) {
21+
// return;
22+
// }
23+
24+
// auto pc = dynamic_cast<PikaClientConn*>(tmp_conn.get());
25+
// std::shared_ptr<std::string> resp_ptr = std::make_shared<std::string>();
26+
// *resp_ptr = std::move(res().message());
27+
// pc->resp_num--;
28+
// pc->resp_array.push_back(resp_ptr);
29+
// pc->TryWriteResp();
30+
// LOG(INFO) << "SetCmd::~SetCmd() is completed";
31+
// }
1832
void SetCmd::DoInitial() {
1933
if (!CheckArg(argv_.size())) {
2034
res_.SetRes(CmdRes::kWrongNum, kCmdNameSet);

src/pika_repl_server.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ extern PikaServer* g_pika_server;
1717
extern std::unique_ptr<PikaReplicaManager> g_pika_rm;
1818

1919
PikaReplServer::PikaReplServer(const std::set<std::string>& ips, int port, int cron_interval) {
20-
server_tp_ = std::make_unique<net::ThreadPool>(PIKA_REPL_SERVER_TP_SIZE, 100000, "PikaReplServer");
20+
server_tp_ = std::make_unique<net::ThreadPool>(1, 100000, "PikaReplServer");
2121
pika_repl_server_thread_ = std::make_unique<PikaReplServerThread>(ips, port, cron_interval);
2222
pika_repl_server_thread_->set_thread_name("PikaReplServer");
2323
}

0 commit comments

Comments
 (0)