Skip to content

Commit 854eebf

Browse files
committed
Fix the low qps bug
1 parent 3a3760a commit 854eebf

12 files changed

Lines changed: 184 additions & 171 deletions

conf/pika.conf

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -153,13 +153,6 @@ replication-num : 0
153153
# The default value of consensus-level is 0, which means this feature is not enabled.
154154
consensus-level : 0
155155

156-
# consensus-batch-size: The maximum number of items in a consensus batch.
157-
consensus-batch-size : 1000
158-
159-
# consensus-timeout: The timeout in milliseconds for waiting for a batch ACK from a slave.
160-
# Default: 1500
161-
consensus-timeout : 1500
162-
163156
# The Prefix of dump file's name.
164157
# All the files that generated by command "bgsave" will be name with this prefix.
165158
dump-prefix :
@@ -320,6 +313,11 @@ write-binlog : yes
320313
# Supported Units [K|M|G], binlog-file-size default unit is in [bytes] and the default value is 100M.
321314
binlog-file-size : 104857600
322315

316+
# The interval (in number of logs) for forcing a disk flush of the binlog.
317+
# A value of 1 means fsync for every log. A higher value improves performance at the cost of durability.
318+
# Default: 100
319+
binlog-fsync-interval : 1
320+
323321
# Automatically triggers a small compaction according to statistics
324322
# Use the cache to store up to 'max-cache-statistic-keys' keys
325323
# If 'max-cache-statistic-keys' set to '0', that means turn off the statistics function

include/pika_client_conn.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
#include <bitset>
1010
#include <utility>
11+
#include <future>
1112

1213
#include "acl.h"
1314
#include "include/pika_command.h"
@@ -52,6 +53,13 @@ class PikaClientConn : public net::RedisConn {
5253
bool cache_miss_in_rtc_;
5354
};
5455

56+
struct ParallelTask {
57+
std::vector<std::shared_ptr<std::string>> resps;
58+
std::vector<std::future<void>> futures;
59+
std::atomic<size_t> completed_count{0};
60+
size_t total_tasks{0};
61+
};
62+
5563
struct TxnStateBitMask {
5664
public:
5765
static constexpr uint8_t Start = 0;
@@ -72,6 +80,7 @@ class PikaClientConn : public net::RedisConn {
7280
void BatchExecRedisCmd(const std::vector<net::RedisCmdArgsType>& argvs, bool cache_miss_in_rtc);
7381
int DealMessage(const net::RedisCmdArgsType& argv, std::string* response) override { return 0; }
7482
static void DoBackgroundTask(void* arg);
83+
static void ParallelExecRedisCmd(void* arg);
7584

7685
bool IsPubSub() { return is_pubsub_; }
7786
void SetIsPubSub(bool is_pubsub) { is_pubsub_ = is_pubsub; }

include/pika_conf.h

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -749,6 +749,8 @@ class PikaConf : public pstd::BaseConf {
749749
rsync_timeout_ms_.store(value);
750750
}
751751

752+
int binlog_fsync_interval() const;
753+
752754
void SetProtoMaxBulkLen(const int64_t value) {
753755
std::lock_guard l(rwlock_);
754756
TryPushDiffCommands("proto-max-bulk-len", std::to_string(value));
@@ -888,9 +890,6 @@ class PikaConf : public pstd::BaseConf {
888890
int ConfigRewriteSlaveOf();
889891
int ConfigRewriteReplicationID();
890892

891-
int consensus_batch_size() const;
892-
int consensus_timeout() const;
893-
894893
private:
895894
int port_ = 0;
896895
int slave_priority_ = 100;
@@ -1069,9 +1068,8 @@ class PikaConf : public pstd::BaseConf {
10691068
//Internal used metrics Persisted by pika.conf
10701069
std::unordered_set<std::string> internal_used_unfinished_full_sync_;
10711070

1072-
// Consensus configuration
1073-
int consensus_batch_size_;
1074-
int consensus_timeout_;
1071+
// Binlog fsync interval
1072+
int binlog_fsync_interval_;
10751073
};
10761074

10771075
#endif

include/pika_consensus.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,7 @@ class ConsensusCoordinator {
294294
std::shared_mutex prepared_id__rwlock_;
295295
LogOffset prepared_id_ = LogOffset();
296296
std::shared_ptr<Log> logs_;
297+
int binlog_fsync_counter_ = 0;
297298
};
298299

299300
#endif // INCLUDE_PIKA_CONSENSUS_H_

include/pika_define.h

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -354,14 +354,6 @@ const int64_t kPoolSize = 1073741824;
354354
const std::string kBinlogPrefix = "write2file";
355355
const size_t kBinlogPrefixLen = 10;
356356

357-
/*
358-
* PIKA_BATCH_MAGIC: Core identifier for binlog batch processing.
359-
* - Master: Prefixes batched binlogs with this magic in SendBinlog
360-
* - Slave: Detects this magic in HandleBGWorkerWriteBinlog
361-
* to switch between batch and single-binlog parsing modes.
362-
*/
363-
const uint32_t PIKA_BATCH_MAGIC = 0x42544348; // "BTCH" in ASCII
364-
365357
const std::string kPikaMeta = "meta";
366358
const std::string kManifest = "manifest";
367359
const std::string kContext = "context";

include/pika_repl_bgworker.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ class PikaReplBgWorker {
4242
net::RedisParser redis_parser_;
4343
std::string ip_port_;
4444
std::string db_name_;
45+
int binlog_fsync_counter_ = 0;
4546

4647
private:
4748
net::BGThread bg_thread_;

include/pika_rm.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,8 +245,15 @@ class PikaReplicaManager {
245245

246246
pstd::Mutex write_queue_mu_;
247247

248+
// db_name -> a queue of write task
249+
using DBWriteTaskQueue = std::map<std::string, std::queue<WriteTask>>;
250+
// ip:port -> a map of DBWriteTaskQueue
251+
using SlaveWriteTaskQueue = std::map<std::string, DBWriteTaskQueue>;
252+
248253
// every host owns a queue, the key is "ip + port"
249-
std::unordered_map<std::string, std::unordered_map<std::string, std::queue<std::pair<WriteTask, uint64_t>>>> write_queues_;
254+
SlaveWriteTaskQueue write_queues_;
255+
256+
// client for replica
250257
std::unique_ptr<PikaReplClient> pika_repl_client_;
251258
std::unique_ptr<PikaReplServer> pika_repl_server_;
252259
};

src/pika_client_conn.cc

Lines changed: 60 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "net/src/dispatch_thread.h"
2020
#include "net/src/worker_thread.h"
2121
#include "src/pstd/include/scope_record_lock.h"
22+
#include <future>
2223

2324
#include "rocksdb/perf_context.h"
2425
#include "rocksdb/iostats_context.h"
@@ -55,6 +56,8 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
5556
c_ptr->SetConn(shared_from_this());
5657
c_ptr->SetResp(resp_ptr);
5758

59+
LOG(INFO) << "PikaClientConn::DoCmd command: " << c_ptr->name() << ", keys: " << c_ptr->current_key().size();
60+
5861
// Check authed
5962
if (AuthRequired()) { // the user is not authed, need to do auth
6063
if (!(c_ptr->flag() & kCmdFlagsNoAuth)) {
@@ -357,14 +360,64 @@ void PikaClientConn::DoBackgroundTask(void* arg) {
357360
}
358361

359362
void PikaClientConn::BatchExecRedisCmd(const std::vector<net::RedisCmdArgsType>& argvs, bool cache_miss_in_rtc) {
360-
resp_num.store(static_cast<int32_t>(argvs.size()));
361-
for (const auto& argv : argvs) {
362-
std::shared_ptr<std::string> resp_ptr = std::make_shared<std::string>();
363-
resp_array.push_back(resp_ptr);
364-
ExecRedisCmd(argv, resp_ptr, cache_miss_in_rtc);
363+
if (argvs.empty()) {
364+
return;
365365
}
366-
time_stat_->process_done_ts_ = pstd::NowMicros();
367-
TryWriteResp();
366+
if (argvs.size() > 1) {
367+
auto task = std::make_shared<ParallelTask>();
368+
task->total_tasks = argvs.size();
369+
task->resps.resize(argvs.size());
370+
371+
for (size_t i = 0; i < argvs.size(); ++i) {
372+
task->resps[i] = std::make_shared<std::string>();
373+
std::promise<void> promise;
374+
task->futures.push_back(promise.get_future());
375+
376+
g_pika_server->ScheduleClientPool(&PikaClientConn::ParallelExecRedisCmd, new std::tuple(shared_from_this(), argvs[i], task, i, std::move(promise), cache_miss_in_rtc), false, false);
377+
}
378+
379+
for (auto& f : task->futures) {
380+
f.get();
381+
}
382+
383+
for (const auto& resp : task->resps) {
384+
WriteResp(*resp);
385+
}
386+
if (write_completed_cb_) {
387+
write_completed_cb_();
388+
write_completed_cb_ = nullptr;
389+
}
390+
NotifyEpoll(true);
391+
} else {
392+
resp_num.store(static_cast<int32_t>(argvs.size()));
393+
for (const auto& argv : argvs) {
394+
std::shared_ptr<std::string> resp_ptr = std::make_shared<std::string>();
395+
resp_array.push_back(resp_ptr);
396+
ExecRedisCmd(argv, resp_ptr, cache_miss_in_rtc);
397+
}
398+
time_stat_->process_done_ts_ = pstd::NowMicros();
399+
TryWriteResp();
400+
}
401+
}
402+
403+
void PikaClientConn::ParallelExecRedisCmd(void* arg) {
404+
auto* task_args = static_cast<std::tuple<std::shared_ptr<PikaClientConn>, net::RedisCmdArgsType, std::shared_ptr<ParallelTask>, size_t, std::promise<void>, bool>*>(arg);
405+
auto [conn, argv, task, index, promise, cache_miss_in_rtc] = std::move(*task_args);
406+
delete task_args;
407+
408+
std::string opt = argv[0];
409+
pstd::StringToLower(opt);
410+
if (opt == kClusterPrefix) {
411+
if (argv.size() >= 2) {
412+
opt += argv[1];
413+
pstd::StringToLower(opt);
414+
}
415+
}
416+
417+
std::shared_ptr<Cmd> cmd_ptr = conn->DoCmd(argv, opt, task->resps[index], cache_miss_in_rtc);
418+
*(task->resps[index]) = std::move(cmd_ptr->res().message());
419+
420+
promise.set_value();
368421
}
369422

370423
bool PikaClientConn::ReadCmdInCache(const net::RedisCmdArgsType& argv, const std::string& opt) {

src/pika_conf.cc

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,10 @@ int PikaConf::Load() {
528528
if (binlog_file_size_ < 1024 || static_cast<int64_t>(binlog_file_size_) > (1024LL * 1024 * 1024)) {
529529
binlog_file_size_ = 100 * 1024 * 1024; // 100M
530530
}
531+
GetConfInt("binlog-fsync-interval", &binlog_fsync_interval_);
532+
if (binlog_fsync_interval_ < 0) {
533+
binlog_fsync_interval_ = 0;
534+
}
531535
GetConfStr("pidfile", &pidfile_);
532536

533537
// db sync
@@ -707,11 +711,6 @@ int PikaConf::Load() {
707711
rsync_timeout_ms_.store(tmp_rsync_timeout_ms);
708712
}
709713

710-
consensus_batch_size_ = 100;
711-
consensus_timeout_ = 1000; // 1s
712-
713-
GetConfInt("consensus-batch-size", &consensus_batch_size_);
714-
GetConfInt("consensus-timeout", &consensus_timeout_);
715714
return ret;
716715
}
717716

@@ -918,6 +917,4 @@ std::vector<rocksdb::CompressionType> PikaConf::compression_per_level() {
918917
return types;
919918
}
920919

921-
int PikaConf::consensus_batch_size() const { return consensus_batch_size_; }
922-
923-
int PikaConf::consensus_timeout() const { return consensus_timeout_; }
920+
int PikaConf::binlog_fsync_interval() const { return binlog_fsync_interval_; }

0 commit comments

Comments
 (0)