Skip to content

Commit 6b8762d

Browse files
committed
Change to parallel execution
1 parent 3a3760a commit 6b8762d

17 files changed

Lines changed: 554 additions & 244 deletions

conf/pika.conf

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,11 +154,16 @@ replication-num : 0
154154
consensus-level : 0
155155

156156
# consensus-batch-size: The maximum number of items in a consensus batch.
157+
# Default: 1000
157158
consensus-batch-size : 1000
158159

159-
# consensus-timeout: The timeout in milliseconds for waiting for a batch ACK from a slave.
160-
# Default: 1500
161-
consensus-timeout : 1500
160+
# consensus-timeout: The maximum time in milliseconds that the master node waits during the batch sending process.
161+
# Default: 10
162+
consensus-timeout : 10
163+
164+
# replication-ack-timeout: The timeout in milliseconds for waiting for a batch ACK from a slave.
165+
# Default: 5000
166+
replication-ack-timeout : 5000
162167

163168
# The Prefix of dump file's name.
164169
# All the files generated by the command "bgsave" will be named with this prefix.
@@ -320,6 +325,11 @@ write-binlog : yes
320325
# Supported Units [K|M|G], binlog-file-size default unit is in [bytes] and the default value is 100M.
321326
binlog-file-size : 104857600
322327

328+
# The interval (in number of logs) for forcing a disk flush of the binlog.
329+
# A value of 1 means fsync for every log. A higher value improves performance at the cost of durability.
330+
# Default: 100
331+
binlog-fsync-interval : 100
332+
323333
# Automatically triggers a small compaction according to statistics
324334
# Use the cache to store up to 'max-cache-statistic-keys' keys
325335
# If 'max-cache-statistic-keys' is set to '0', the statistics function is turned off.

include/pika_client_conn.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
#include <bitset>
1010
#include <utility>
11+
#include <future>
1112

1213
#include "acl.h"
1314
#include "include/pika_command.h"
@@ -52,6 +53,13 @@ class PikaClientConn : public net::RedisConn {
5253
bool cache_miss_in_rtc_;
5354
};
5455

56+
// Shared state for a group of tasks, presumably those run via ParallelExecRedisCmd.
// How the fields are populated is not visible in this header; the per-field notes
// below are assumptions to confirm against the .cc file.
// The std::atomic member makes this struct non-copyable and non-movable.
struct ParallelTask {
57+
// Response buffers (presumably one per task — TODO confirm).
std::vector<std::shared_ptr<std::string>> resps;
58+
// Futures for the outstanding tasks (presumably waited on by the submitter — TODO confirm).
std::vector<std::future<void>> futures;
59+
// Atomic counter, starts at 0 (presumably the number of finished tasks — TODO confirm).
std::atomic<size_t> completed_count{0};
60+
// Starts at 0 (presumably the number of tasks in the group — TODO confirm).
size_t total_tasks{0};
61+
};
62+
5563
struct TxnStateBitMask {
5664
public:
5765
static constexpr uint8_t Start = 0;
@@ -72,6 +80,7 @@ class PikaClientConn : public net::RedisConn {
7280
void BatchExecRedisCmd(const std::vector<net::RedisCmdArgsType>& argvs, bool cache_miss_in_rtc);
7381
int DealMessage(const net::RedisCmdArgsType& argv, std::string* response) override { return 0; }
7482
static void DoBackgroundTask(void* arg);
83+
static void ParallelExecRedisCmd(void* arg);
7584

7685
bool IsPubSub() { return is_pubsub_; }
7786
void SetIsPubSub(bool is_pubsub) { is_pubsub_ = is_pubsub; }

include/pika_conf.h

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,18 @@ class PikaConf : public pstd::BaseConf {
350350
int max_conn_rbuf_size() { return max_conn_rbuf_size_.load(); }
351351
int consensus_level() { return consensus_level_.load(); }
352352
int replication_num() { return replication_num_.load(); }
353+
// Returns the configured consensus-batch-size (documented in pika.conf as the maximum
// number of items in a consensus batch). Reads under a shared lock on rwlock_.
int consensus_batch_size() {
354+
std::shared_lock l(rwlock_);
355+
return consensus_batch_size_;
356+
}
357+
// Returns the configured consensus-timeout. PikaAuxiliaryThread interprets the value
// as milliseconds. Reads under a shared lock on rwlock_.
int consensus_timeout() {
358+
std::shared_lock l(rwlock_);
359+
return consensus_timeout_;
360+
}
361+
// Returns the configured replication-ack-timeout (documented in pika.conf as the
// timeout in milliseconds for a batch ACK from a slave). Reads under a shared lock on rwlock_.
int replication_ack_timeout() {
362+
std::shared_lock l(rwlock_);
363+
return replication_ack_timeout_;
364+
}
353365
int rate_limiter_mode() {
354366
std::shared_lock l(rwlock_);
355367
return rate_limiter_mode_;
@@ -665,6 +677,21 @@ class PikaConf : public pstd::BaseConf {
665677
TryPushDiffCommands("max-conn-rbuf-size", std::to_string(value));
666678
max_conn_rbuf_size_.store(value);
667679
}
680+
// Updates consensus_batch_size_ under an exclusive lock on rwlock_ and passes the new
// value to TryPushDiffCommands under the key "consensus-batch-size".
// No range validation is performed here.
void SetConsensusBatchSize(const int value) {
681+
std::lock_guard l(rwlock_);
682+
TryPushDiffCommands("consensus-batch-size", std::to_string(value));
683+
consensus_batch_size_ = value;
684+
}
685+
// Updates consensus_timeout_ under an exclusive lock on rwlock_ and passes the new
// value to TryPushDiffCommands under the key "consensus-timeout".
// No range validation is performed here; PikaAuxiliaryThread falls back to 10ms
// when the stored value is <= 0.
void SetConsensusTimeout(const int value) {
685+
std::lock_guard l(rwlock_);
686+
TryPushDiffCommands("consensus-timeout", std::to_string(value));
687+
consensus_timeout_ = value;
688+
}
690+
// Updates replication_ack_timeout_ under an exclusive lock on rwlock_ and passes the
// new value to TryPushDiffCommands under the key "replication-ack-timeout".
// No range validation is performed here.
void SetReplicationAckTimeout(const int value) {
690+
std::lock_guard l(rwlock_);
691+
TryPushDiffCommands("replication-ack-timeout", std::to_string(value));
692+
replication_ack_timeout_ = value;
693+
}
668695
void SetMaxCacheFiles(const int& value) {
669696
std::lock_guard l(rwlock_);
670697
TryPushDiffCommands("max-cache-files", std::to_string(value));
@@ -748,6 +775,12 @@ class PikaConf : public pstd::BaseConf {
748775
TryPushDiffCommands("rsync-timeout-ms", std::to_string(value));
749776
rsync_timeout_ms_.store(value);
750777
}
778+
779+
// Updates binlog_fsync_size_ under an exclusive lock on rwlock_ and passes the new
// value to TryPushDiffCommands under the key "binlog-fsync-size".
// NOTE(review): conf/pika.conf documents this option as "binlog-fsync-interval"
// (a number of logs, default 100), whereas this setter uses the key
// "binlog-fsync-size" and the member defaults to 8 * 1024 * 1024. Confirm which
// name and unit are intended before relying on config rewrite for this option.
void SetBinlogFsyncSize(const int value) {
780+
std::lock_guard l(rwlock_);
781+
TryPushDiffCommands("binlog-fsync-size", std::to_string(value));
782+
binlog_fsync_size_ = value;
783+
}
751784

752785
void SetProtoMaxBulkLen(const int64_t value) {
753786
std::lock_guard l(rwlock_);
@@ -888,9 +921,6 @@ class PikaConf : public pstd::BaseConf {
888921
int ConfigRewriteSlaveOf();
889922
int ConfigRewriteReplicationID();
890923

891-
int consensus_batch_size() const;
892-
int consensus_timeout() const;
893-
894924
private:
895925
int port_ = 0;
896926
int slave_priority_ = 100;
@@ -982,11 +1012,14 @@ class PikaConf : public pstd::BaseConf {
9821012
int64_t rate_limiter_refill_period_us_ = 0;
9831013
int64_t rate_limiter_fairness_ = 0;
9841014
bool rate_limiter_auto_tuned_ = true;
1015+
int replication_ack_timeout_ = 5000;
9851016

9861017
std::atomic<int> sync_window_size_;
9871018
std::atomic<int> max_conn_rbuf_size_;
9881019
std::atomic<int> consensus_level_;
9891020
std::atomic<int> replication_num_;
1021+
int consensus_batch_size_ = 1000;
1022+
// Batch-send wait in milliseconds. The default matches the value documented for
// consensus-timeout in conf/pika.conf (10); PikaAuxiliaryThread uses this value as
// its idle wait before triggering a binlog send.
int consensus_timeout_ = 10;
9901023

9911024
std::string network_interface_;
9921025

@@ -1050,6 +1083,7 @@ class PikaConf : public pstd::BaseConf {
10501083
int throttle_bytes_per_second_ = 200 << 20; // 200MB/s
10511084
int max_rsync_parallel_num_ = kMaxRsyncParallelNum;
10521085
std::atomic_int64_t rsync_timeout_ms_ = 1000;
1086+
int binlog_fsync_size_ = 8 * 1024 * 1024;
10531087

10541088
/*
10551089
kUninitialized = 0, // unknown setting
@@ -1068,10 +1102,6 @@ class PikaConf : public pstd::BaseConf {
10681102

10691103
//Internal used metrics Persisted by pika.conf
10701104
std::unordered_set<std::string> internal_used_unfinished_full_sync_;
1071-
1072-
// Consensus configuration
1073-
int consensus_batch_size_;
1074-
int consensus_timeout_;
10751105
};
10761106

10771107
#endif

include/pika_consensus.h

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include "include/pika_slave_node.h"
1414
#include "include/pika_stable_log.h"
1515
#include "pstd/include/env.h"
16+
#include <future>
1617

1718
class Context : public pstd::noncopyable {
1819
public:
@@ -129,7 +130,7 @@ class Log {
129130

130131
Log();
131132
int Size();
132-
void AppendLog(const LogItem& item);
133+
pstd::Status AppendLog(const std::shared_ptr<StableLog>& stable_log, const std::shared_ptr<Cmd>& cmd_ptr);
133134
LogOffset LastOffset();
134135
LogOffset FirstOffset();
135136
LogItem At(int index);
@@ -154,6 +155,7 @@ class ConsensusCoordinator {
154155
// invoked by dbsync process
155156
pstd::Status Reset(const LogOffset& offset);
156157

158+
pstd::Status SyncAndWait();
157159
pstd::Status ProposeLog(const std::shared_ptr<Cmd>& cmd_ptr);
158160
pstd::Status UpdateSlave(const std::string& ip, int port, const LogOffset& start, const LogOffset& end);
159161
pstd::Status AddSlaveNode(const std::string& ip, int port, int session_id);
@@ -249,10 +251,10 @@ class ConsensusCoordinator {
249251
public:
250252
void InitContext() { context_->Init(); }
251253
bool checkFinished(const LogOffset& offset);
252-
pstd::Status AppendEntries(const std::shared_ptr<Cmd>& cmd_ptr, LogOffset& cur_logoffset);
254+
pstd::Status AppendEntries(const std::shared_ptr<Cmd>& cmd_ptr);
253255
void SetConsistency(bool is_consistency);
254256
bool GetISConsistency();
255-
pstd::Status SendBinlog(std::shared_ptr<SlaveNode> slave_ptr, std::string db_name);
257+
pstd::Status SendBinlog(const std::shared_ptr<SlaveNode>& slave_ptr, const std::string& db_name);
256258
pstd::Status Truncate(const LogOffset& offset);
257259
pstd::Status AppendSlaveEntries(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute);
258260
pstd::Status CommitAppLog(const LogOffset& master_committed_id);
@@ -282,9 +284,19 @@ class ConsensusCoordinator {
282284
pstd::CondVar* GetCommittedIdCv() { return &committed_id_cv_; }
283285

284286
private:
285-
pstd::Status PersistAppendBinlog(const std::shared_ptr<Cmd>& cmd_ptr, LogOffset& cur_offset);
287+
void SyncBinlogLoop();
288+
pstd::Status PersistAppendBinlog(const std::shared_ptr<Cmd>& cmd_ptr);
286289

287290
private:
291+
// For async sync
292+
pstd::Mutex sync_mu_;
293+
pstd::CondVar sync_cv_;
294+
std::atomic<bool> needs_sync_ = false;
295+
std::atomic<bool> thread_stop_ = false;
296+
std::thread sync_thread_;
297+
pstd::Mutex promises_mu_;
298+
std::vector<std::promise<pstd::Status>> sync_promises_;
299+
288300
std::shared_mutex is_consistency_rwlock_;
289301
bool is_consistency_ = false;
290302
std::shared_mutex committed_id_rwlock_;
@@ -294,6 +306,7 @@ class ConsensusCoordinator {
294306
std::shared_mutex prepared_id__rwlock_;
295307
LogOffset prepared_id_ = LogOffset();
296308
std::shared_ptr<Log> logs_;
309+
int binlog_fsync_counter_ = 0;
297310
};
298311

299312
#endif // INCLUDE_PIKA_CONSENSUS_H_

include/pika_repl_bgworker.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ class PikaReplBgWorker {
4242
net::RedisParser redis_parser_;
4343
std::string ip_port_;
4444
std::string db_name_;
45+
int binlog_fsync_counter_ = 0;
4546

4647
private:
4748
net::BGThread bg_thread_;

include/pika_repl_server_conn.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ class PikaReplServerConn : public net::PbConn {
2121
PikaReplServerConn(int fd, const std::string& ip_port, net::Thread* thread, void* worker_specific_data,
2222
net::NetMultiplexer* mpx);
2323
~PikaReplServerConn() override;
24-
2524
static void HandleMetaSyncRequest(void* arg);
2625
static void HandleTrySyncRequest(void* arg);
2726

include/pika_rm.h

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#define kSendKeepAliveTimeout (2 * 1000000)
3131
#define kRecvKeepAliveTimeout (20 * 1000000)
3232

33+
std::string CreateBatchFromTasks(const std::vector<WriteTask>& tasks);
3334

3435
class SyncDB {
3536
public:
@@ -83,11 +84,11 @@ class SyncMasterDB : public SyncDB {
8384
return coordinator_.StableLogger()->Logger();
8485
}
8586

87+
std::shared_ptr<SlaveNode> GetSlaveNode(const std::string& ip, int port);
8688
private:
8789
// invoker need to hold slave_mu_
8890
pstd::Status ReadBinlogFileToWq(const std::shared_ptr<SlaveNode>& slave_ptr);
8991

90-
std::shared_ptr<SlaveNode> GetSlaveNode(const std::string& ip, int port);
9192
std::unordered_map<std::string, std::shared_ptr<SlaveNode>> GetAllSlaveNodes();
9293

9394
pstd::Mutex session_mu_;
@@ -112,8 +113,7 @@ class SyncMasterDB : public SyncDB {
112113
pstd::Status UpdateCommittedID();
113114
pstd::Status CommitAppLog(const LogOffset& master_committed_id);
114115
pstd::Status Truncate(const LogOffset& offset);
115-
116-
116+
pstd::Status SyncBinlogAndWait();
117117
};
118118

119119
class SyncSlaveDB : public SyncDB {
@@ -245,8 +245,15 @@ class PikaReplicaManager {
245245

246246
pstd::Mutex write_queue_mu_;
247247

248+
// db_name -> a queue of write task
249+
using DBWriteTaskQueue = std::map<std::string, std::queue<WriteTask>>;
250+
// ip:port -> the DBWriteTaskQueue for that slave
251+
using SlaveWriteTaskQueue = std::map<std::string, DBWriteTaskQueue>;
252+
248253
// every host owns a queue, the key is "ip + port"
249-
std::unordered_map<std::string, std::unordered_map<std::string, std::queue<std::pair<WriteTask, uint64_t>>>> write_queues_;
254+
SlaveWriteTaskQueue write_queues_;
255+
256+
// client for replica
250257
std::unique_ptr<PikaReplClient> pika_repl_client_;
251258
std::unique_ptr<PikaReplServer> pika_repl_server_;
252259
};

include/pika_slave_node.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,12 @@ class SlaveNode : public RmNode {
7878
pstd::Status Update(const LogOffset& start, const LogOffset& end, LogOffset* updated_offset);
7979

8080
pstd::Mutex slave_mu;
81+
82+
// For replication batch
83+
std::vector<WriteTask> task_buffer_;
84+
uint64_t buffer_start_time_us_ = 0;
85+
uint64_t ack_timeout_start_time_us_ = 0;
86+
uint64_t pending_since_us_ = 0;
8187
};
8288

8389
#endif // PIKA_SLAVE_NODE_H

src/pika_auxiliary_thread.cc

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include "include/pika_define.h"
99
#include "include/pika_rm.h"
1010
#include "include/pika_server.h"
11+
#include "include/pika_conf.h"
1112

1213
extern PikaServer* g_pika_server;
1314
extern std::unique_ptr<PikaReplicaManager> g_pika_rm;
@@ -34,17 +35,23 @@ void* PikaAuxiliaryThread::ThreadMain() {
3435

3536
g_pika_server->CheckLeaderProtectedMode();
3637

37-
// TODO(whoiami) timeout
38-
s = g_pika_server->TriggerSendBinlogSync();
39-
if (!s.ok()) {
40-
LOG(WARNING) << s.ToString();
41-
}
42-
// send to peer
38+
// send to peer first if there are queued packets
4339
int res = g_pika_server->SendToPeer();
4440
if (res == 0) {
45-
// sleep 100 ms
41+
// idle: wait for a short period (consensus-timeout) or notification
4642
std::unique_lock lock(mu_);
47-
cv_.wait_for(lock, 100ms);
43+
auto to = std::chrono::milliseconds(g_pika_conf->consensus_timeout());
44+
if (to.count() <= 0) {
45+
to = 10ms;
46+
}
47+
cv_.wait_for(lock, to);
48+
// after wait, trigger replication send once (size/timeout gating will decide to send or not)
49+
s = g_pika_server->TriggerSendBinlogSync();
50+
if (!s.ok()) {
51+
LOG(WARNING) << s.ToString();
52+
}
53+
// consume what may have been produced by trigger
54+
g_pika_server->SendToPeer();
4855
} else {
4956
// LOG_EVERY_N(INFO, 1000) << "Consume binlog number " << res;
5057
}

0 commit comments

Comments
 (0)