Skip to content

Commit 3a3760a

Browse files
committed
Fix timeout bug
1 parent fda56d8 commit 3a3760a

19 files changed

Lines changed: 617 additions & 259 deletions

conf/pika.conf

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,13 @@ replication-num : 0
153153
# The default value of consensus-level is 0, which means this feature is not enabled.
154154
consensus-level : 0
155155

156+
# consensus-batch-size: The maximum number of items in a consensus batch.
157+
consensus-batch-size : 1000
158+
159+
# consensus-timeout: The timeout in milliseconds for waiting for a batch ACK from a slave.
160+
# Default: 1500
161+
consensus-timeout : 1500
162+
156163
# The Prefix of dump file's name.
157164
# All the files that generated by command "bgsave" will be name with this prefix.
158165
dump-prefix :

include/pika_binlog.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ class Binlog : public pstd::noncopyable {
6161
* Set Producer pro_num and pro_offset with lock
6262
*/
6363
pstd::Status SetProducerStatus(uint32_t pro_num, uint64_t pro_offset, uint32_t term = 0, uint64_t index = 0);
64+
// Force sync data to disk
65+
pstd::Status Sync();
6466
// Need to hold Lock();
6567
pstd::Status Truncate(uint32_t pro_num, uint64_t pro_offset, uint64_t index);
6668

include/pika_conf.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -888,6 +888,9 @@ class PikaConf : public pstd::BaseConf {
888888
int ConfigRewriteSlaveOf();
889889
int ConfigRewriteReplicationID();
890890

891+
int consensus_batch_size() const;
892+
int consensus_timeout() const;
893+
891894
private:
892895
int port_ = 0;
893896
int slave_priority_ = 100;
@@ -1065,6 +1068,10 @@ class PikaConf : public pstd::BaseConf {
10651068

10661069
//Internal used metrics Persisted by pika.conf
10671070
std::unordered_set<std::string> internal_used_unfinished_full_sync_;
1071+
1072+
// Consensus configuration
1073+
int consensus_batch_size_;
1074+
int consensus_timeout_;
10681075
};
10691076

10701077
#endif

include/pika_consensus.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ class ConsensusCoordinator {
257257
pstd::Status AppendSlaveEntries(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute);
258258
pstd::Status CommitAppLog(const LogOffset& master_committed_id);
259259
pstd::Status UpdateCommittedID();
260-
pstd::Status ApplyBinlog(const std::shared_ptr<Cmd>& cmd_ptr);
260+
pstd::Status ApplyBinlog(const std::vector<Log::LogItem>& logs);
261261
pstd::Status ProcessCoordination();
262262

263263
LogOffset GetCommittedId() {
@@ -276,7 +276,10 @@ class ConsensusCoordinator {
276276
std::lock_guard l(committed_id_rwlock_);
277277
committed_id_ = offset;
278278
context_->UpdateAppliedIndex(committed_id_);
279+
committed_id_cv_.notify_all();
279280
}
281+
pstd::Mutex* GetCommittedIdMu() { return &committed_id_mu_; }
282+
pstd::CondVar* GetCommittedIdCv() { return &committed_id_cv_; }
280283

281284
private:
282285
pstd::Status PersistAppendBinlog(const std::shared_ptr<Cmd>& cmd_ptr, LogOffset& cur_offset);
@@ -285,6 +288,8 @@ class ConsensusCoordinator {
285288
std::shared_mutex is_consistency_rwlock_;
286289
bool is_consistency_ = false;
287290
std::shared_mutex committed_id_rwlock_;
291+
pstd::Mutex committed_id_mu_;
292+
pstd::CondVar committed_id_cv_;
288293
LogOffset committed_id_ = LogOffset();
289294
std::shared_mutex prepared_id__rwlock_;
290295
LogOffset prepared_id_ = LogOffset();

include/pika_define.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,14 @@ const int64_t kPoolSize = 1073741824;
354354
const std::string kBinlogPrefix = "write2file";
355355
const size_t kBinlogPrefixLen = 10;
356356

357+
/*
358+
* PIKA_BATCH_MAGIC: Core identifier for binlog batch processing.
359+
* - Master: Prefixes batched binlogs with this magic in SendBinlog
360+
* - Slave: Detects this magic in HandleBGWorkerWriteBinlog
361+
* to switch between batch and single-binlog parsing modes.
362+
*/
363+
const uint32_t PIKA_BATCH_MAGIC = 0x42544348; // "BTCH" in ASCII
364+
357365
const std::string kPikaMeta = "meta";
358366
const std::string kManifest = "manifest";
359367
const std::string kContext = "context";

include/pika_repl_client.h

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -88,14 +88,22 @@ class PikaReplClient {
8888
async_write_db_task_counts_[db_index].fetch_sub(incr_step, std::memory_order::memory_order_seq_cst);
8989
}
9090

91-
int32_t GetUnfinishedAsyncWriteDBTaskCount(const std::string& db_name) {
92-
int32_t db_index = db_name.back() - '0';
93-
assert(db_index >= 0 && db_index <= 7);
94-
return async_write_db_task_counts_[db_index].load(std::memory_order_seq_cst);
95-
}
91+
int32_t GetUnfinishedAsyncWriteDBTaskCount(const std::string& db_name);
92+
void SignalAsyncWriteDBTaskEnd(const std::string& db_name);
93+
void WaitForAsyncWriteDBTaskEnd(const std::string& db_name);
94+
95+
// unfinished_async_write_db_tasks related
96+
pstd::Mutex unfinished_async_write_db_tasks_mu_;
97+
std::unordered_map<std::string, int32_t> unfinished_async_write_db_tasks_;
98+
pstd::CondVar async_write_db_tasks_cond_;
99+
100+
// db_write_block_fds_ related
101+
pstd::Mutex db_write_block_fds_mu_;
102+
std::set<int> db_write_block_fds_;
96103

97104
private:
98-
size_t GetBinlogWorkerIndexByDBName(const std::string &db_name);
105+
size_t GetBinlogWorkerIndexByDBName(const std::string& db_name);
106+
size_t GetDBWorkerIndexByDBName(const std::string& db_name);
99107
size_t GetHashIndexByKey(const std::string& key);
100108
void UpdateNextAvail() { next_avail_ = (next_avail_ + 1) % static_cast<int32_t>(write_binlog_workers_.size()); }
101109

include/pika_rm.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,8 @@ class PikaReplicaManager {
207207
const std::shared_ptr<InnerMessage::InnerResponse>& res,
208208
const std::shared_ptr<net::PbConn>& conn, void* res_private_data);
209209
void ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const std::string& db_name);
210+
void SignalAsyncWriteDBTaskEnd(const std::string& db_name);
211+
void WaitForAsyncWriteDBTaskEnd(const std::string& db_name);
210212
void ScheduleReplClientBGTaskByDBName(net::TaskFunc , void* arg, const std::string &db_name);
211213
void ReplServerRemoveClientConn(int fd);
212214
void ReplServerUpdateClientConnMap(const std::string& ip_port, int fd);
@@ -244,7 +246,7 @@ class PikaReplicaManager {
244246
pstd::Mutex write_queue_mu_;
245247

246248
// every host owns a queue, the key is "ip + port"
247-
std::unordered_map<std::string, std::unordered_map<std::string, std::queue<WriteTask>>> write_queues_;
249+
std::unordered_map<std::string, std::unordered_map<std::string, std::queue<std::pair<WriteTask, uint64_t>>>> write_queues_;
248250
std::unique_ptr<PikaReplClient> pika_repl_client_;
249251
std::unique_ptr<PikaReplServer> pika_repl_server_;
250252
};

src/net/src/pb_conn.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ WriteStatus PbConn::SendReply() {
153153
if (item_len - write_buf_.item_pos_ != 0) {
154154
return kWriteHalf;
155155
}
156-
LOG(ERROR) << "write item success";
156+
//LOG(ERROR) << "write item success";
157157
}
158158
return kWriteAll;
159159
}

src/pika_binlog.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -469,3 +469,10 @@ Status Binlog::Truncate(uint32_t pro_num, uint64_t pro_offset, uint64_t index) {
469469

470470
return Status::OK();
471471
}
472+
473+
Status Binlog::Sync() {
474+
if (queue_) {
475+
return queue_->Sync();
476+
}
477+
return Status::Corruption("Logger not initialized");
478+
}

src/pika_conf.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -707,6 +707,11 @@ int PikaConf::Load() {
707707
rsync_timeout_ms_.store(tmp_rsync_timeout_ms);
708708
}
709709

710+
consensus_batch_size_ = 100;
711+
consensus_timeout_ = 1000; // 1s
712+
713+
GetConfInt("consensus-batch-size", &consensus_batch_size_);
714+
GetConfInt("consensus-timeout", &consensus_timeout_);
710715
return ret;
711716
}
712717

@@ -912,3 +917,7 @@ std::vector<rocksdb::CompressionType> PikaConf::compression_per_level() {
912917
}
913918
return types;
914919
}
920+
921+
int PikaConf::consensus_batch_size() const { return consensus_batch_size_; }
922+
923+
int PikaConf::consensus_timeout() const { return consensus_timeout_; }

0 commit comments

Comments
 (0)