Skip to content

Commit 98e4821

Browse files
author
wuxianrong
committed
constst pika
1 parent d6c0db3 commit 98e4821

18 files changed

Lines changed: 202 additions & 98 deletions

CMakeLists.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -457,9 +457,9 @@ set(LZ4_INCLUDE_DIR ${INSTALL_INCLUDEDIR})
457457
ExternalProject_Add(zlib
458458
DEPENDS
459459
URL
460-
https://github.com/madler/zlib/releases/download/v1.2.13/zlib-1.2.13.tar.gz
460+
https://github.com/madler/zlib/releases/download/v1.3.1/zlib-1.3.1.tar.gz
461461
URL_HASH
462-
MD5=9b8aa094c4e5765dabf4da391f00d15c
462+
MD5=9855b6d802d7fe5b7bd5b196a2271655
463463
DOWNLOAD_NO_PROGRESS
464464
1
465465
UPDATE_COMMAND

conf/pika.conf

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ slow-cmd-thread-pool-size : 1
4242
admin-thread-pool-size : 2
4343

4444
# Slow cmd list e.g. hgetall, mset
45-
slow-cmd-list :
45+
slow-cmd-list :
4646

4747
# List of commands considered as administrative. These commands will be handled by the admin thread pool. Modify this list as needed.
4848
# Default commands: info, ping, monitor
@@ -95,7 +95,7 @@ proto-max-bulk-len : 512M
9595
# If <= 0, a proper value is automatically calculated.
9696
# (usually 1/8 of writer-buffer-size, rounded up to a multiple of 4KB)
9797
# Supported Units [K|M|G], arena-block-size default unit is in [bytes].
98-
arena-block-size :
98+
arena-block-size :
9999

100100
# Timeout of Pika's connection, counting down starts When there are no requests
101101
# on a connection (it enters sleep state), when the countdown reaches 0, the connection
@@ -109,12 +109,12 @@ timeout : 60
109109
# [NOTICE] If this admin password is the same as user password (including both being empty),
110110
# in this scenario, users are not subject to the restrictions imposed by the userblacklist.
111111
# PS: "user password" refers to value of the parameter below: userpass.
112-
requirepass :
112+
requirepass :
113113

114114
# Password for replication verify, used for authentication when a slave
115115
# connects to a master to request replication.
116116
# [NOTICE] The value of this parameter must match the "requirepass" setting on the master.
117-
masterauth :
117+
masterauth :
118118

119119
# The [password of user], which is empty by default.
120120
# [NOTICE] If this user password is the same as admin password (including both being empty),
@@ -174,7 +174,7 @@ command-batch-enabled : yes
174174

175175
# The Prefix of dump file's name.
176176
# All the files that generated by command "bgsave" will be name with this prefix.
177-
dump-prefix :
177+
dump-prefix :
178178

179179
# daemonize [yes | no].
180180
#daemonize : yes
@@ -555,13 +555,13 @@ cache-num : 16
555555
# cache-model 0:cache_none 1:cache_read
556556
cache-model : 1
557557
# cache-type: string, set, zset, list, hash, bit
558-
cache-type: string, set, zset, list, hash, bit
558+
cache-type : string, set, zset, list, hash, bit
559559

560560
# Set the maximum number of elements in the cache of the Set, list, Zset data types
561-
cache-value-item-max-size: 1024
561+
cache-value-item-max-size : 1024
562562

563563
# Sets the maximum number of bytes for Key when the String data type is updated in the cache
564-
max-key-size-in-cache: 1048576
564+
max-key-size-in-cache : 1048576
565565

566566
# Maximum number of keys in the zset redis cache
567567
# On the disk DB, a zset field may have many fields. In the memory cache, we limit the maximum
@@ -593,10 +593,10 @@ cache-maxmemory : 10737418240
593593
cache-maxmemory-policy : 1
594594

595595
# cache-maxmemory-samples
596-
cache-maxmemory-samples: 5
596+
cache-maxmemory-samples : 5
597597

598598
# cache-lfu-decay-time
599-
cache-lfu-decay-time: 1
599+
cache-lfu-decay-time : 1
600600

601601

602602
# is possible to manage access to Pub/Sub channels with ACL rules as well. The
@@ -650,12 +650,12 @@ cache-lfu-decay-time: 1
650650
# 'internal-used-unfinished-full-sync' is used to generate a metric 'is_eligible_for_master_election'
651651
# which serves for the scenario of codis-pika cluster reelection
652652
# You'd better [DO NOT MODIFY IT UNLESS YOU KNOW WHAT YOU ARE DOING]
653-
internal-used-unfinished-full-sync :
653+
internal-used-unfinished-full-sync :
654654

655655
# for wash data from 4.0.0 to 4.0.1
656656
# https://github.com/OpenAtomFoundation/pika/issues/2886
657657
# default value: true
658-
wash-data: true
658+
wash-data : true
659659

660660
# Pika automatic compact compact strategy, a complement to rocksdb compact.
661661
# Trigger the compact background task periodically according to `compact-interval`
@@ -690,3 +690,6 @@ dont-compact-sst-created-in-seconds : 20
690690
# According to the number of sst files in rocksdb,
691691
# compact every `compact-every-num-of-files` file.
692692
best-delete-min-ratio : 10
693+
# Generated by ReplicationID CONFIG REWRITE
694+
replication-id : d605afa1b464ddf3e571966482dd934ec7336a4bac49aa0a6b
695+
run-id : 58ec490577bb7defd64f6fe642d7609af67896b1

include/pika_binlog.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ class Binlog : public pstd::noncopyable {
5757
pstd::Status Put(const std::string& item, LogOffset *cur_logoffset,std::string& binlog);
5858
pstd::Status IsOpened();
5959
pstd::Status GetProducerStatus(uint32_t* filenum, uint64_t* pro_offset, uint32_t* term = nullptr, uint64_t* logic_id = nullptr);
60+
pstd::WritableFile* GetQueue() { return queue_.get(); }
6061
/*
6162
* Set Producer pro_num and pro_offset with lock
6263
*/

include/pika_consensus.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -288,15 +288,15 @@ class ConsensusCoordinator {
288288
context_->UpdateAppliedIndex(committed_id_);
289289
}
290290
notification_counter_.fetch_add(1);
291-
LOG(INFO) << "SetCommittedId: Updated to " << offset.ToString();
291+
//LOG(INFO) << "Master SetCommittedId: Updated to " << offset.ToString();
292292
}
293293

294294
private:
295295
pstd::Status PersistAppendBinlog(const std::shared_ptr<Cmd>& cmd_ptr, LogOffset& cur_offset);
296296

297297
private:
298298
std::shared_mutex is_consistency_rwlock_;
299-
bool is_consistency_ = false;
299+
bool is_consistency_ = true;
300300
std::shared_mutex committed_id_rwlock_;
301301
LogOffset committed_id_ = LogOffset();
302302
std::atomic<uint64_t> notification_counter_{0};

include/pika_rm.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,9 @@
3333
#define kBinlogSendBatchNum 100
3434

3535
// unit seconds
36-
#define kSendKeepAliveTimeout (2 * 1000000)
37-
#define kRecvKeepAliveTimeout (20 * 1000000)
36+
// WXR
37+
#define kSendKeepAliveTimeout (100 * 1000000)
38+
#define kRecvKeepAliveTimeout (200 * 1000000)
3839

3940

4041
class SyncDB {
@@ -302,7 +303,7 @@ class PikaReplicaManager {
302303

303304
// Last committed ID for RocksDB thread processing
304305
LogOffset last_committed_id_;
305-
std::mutex last_committed_id_mutex_;
306+
std::shared_mutex last_committed_id_mutex_;
306307

307308
// Background thread processing methods
308309
void StartCommandQueueThread();

src/net/src/bg_thread.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include "net/include/bg_thread.h"
77
#include <cstdlib>
88
#include <mutex>
9+
#include <glog/logging.h>
910

1011
namespace net {
1112

@@ -79,7 +80,7 @@ void* BGThread::ThreadMain() {
7980
while (!should_stop()) {
8081
std::unique_lock lock(mu_);
8182

82-
rsignal_.wait(lock, [this]() { return !queue_.empty() || !timer_queue_.empty() || should_stop(); });
83+
rsignal_.wait(lock, [this]() { return !queue_.empty() || should_stop(); });
8384

8485
if (should_stop()) {
8586
break;

src/net/src/pb_conn.cc

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,10 @@ ReadStatus PbConn::GetRequest() {
4242
case kHeader: {
4343
int quickack = 1;
4444
ssize_t nread = read(fd(), rbuf_ + cur_pos_, COMMAND_HEADER_LENGTH - cur_pos_);
45-
setsockopt(fd(), IPPROTO_TCP, TCP_QUICKACK, &quickack, sizeof(quickack));
45+
#ifdef __linux__
46+
int quickack = 1;
47+
setsockopt(fd(), IPPROTO_TCP, TCP_QUICKACK, &quickack, sizeof(quickack));
48+
#endif
4649
if (nread == -1) {
4750
if (errno == EAGAIN) {
4851
return kReadHalf;
@@ -82,7 +85,10 @@ ReadStatus PbConn::GetRequest() {
8285
// read msg body
8386
ssize_t nread = read(fd(), rbuf_ + cur_pos_, remain_packet_len_);
8487
int quickack = 1;
85-
setsockopt(fd(), IPPROTO_TCP, TCP_QUICKACK, &quickack, sizeof(quickack));
88+
#ifdef __linux__
89+
int quickack = 1;
90+
setsockopt(fd(), IPPROTO_TCP, TCP_QUICKACK, &quickack, sizeof(quickack));
91+
#endif
8692
if (nread == -1) {
8793
if (errno == EAGAIN) {
8894
return kReadHalf;
@@ -131,7 +137,7 @@ WriteStatus PbConn::SendReply() {
131137
while (item_len - write_buf_.item_pos_ > 0) {
132138
nwritten = write(fd(), item.data() + write_buf_.item_pos_, item_len - write_buf_.item_pos_);
133139
if (nwritten <= 0) {
134-
LOG(ERROR) << "nwritten less than 0";
140+
//LOG(ERROR) << "nwritten less than 0";
135141
break;
136142
}
137143
g_network_statistic->IncrReplOutputBytes(nwritten);

src/pika_auxiliary_thread.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,13 @@ void* PikaAuxiliaryThread::ThreadMain() {
3535
g_pika_server->CheckLeaderProtectedMode();
3636

3737
// TODO(whoiami) timeout
38+
// 将增量数据写入 Write_queue_
3839
s = g_pika_server->TriggerSendBinlogSync();
3940
if (!s.ok()) {
4041
LOG(WARNING) << s.ToString();
4142
}
4243
// send to peer
44+
// 将 Write_queue 中的数据发送给从节点
4345
int res = g_pika_server->SendToPeer();
4446
if (res == 0) {
4547
// sleep 100 ms

src/pika_binlog.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,9 @@ Status Binlog::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n, int*
298298
if (s.ok()) {
299299
s = queue_->Append(pstd::Slice(ptr, n));
300300
if (s.ok()) {
301-
s = queue_->Flush();
301+
//LOG(INFO) << "EmitPhysicalRecord Flush";
302+
//s = queue_->Sync();
303+
//s = queue_->Flush();
302304
}
303305
}
304306
block_offset_ += static_cast<int32_t>(kHeaderSize + n);

src/pika_client_conn.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,12 +224,13 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
224224
if (c_ptr->is_write() && g_pika_conf->command_batch_enabled()) {
225225
// Get the appropriate SyncMasterDB for command batching
226226
auto sync_db = g_pika_rm->GetSyncMasterDBByName(DBInfo(c_ptr->db_name()));
227+
// 查看 DB 是不是 Master DB
227228
if (sync_db) {
228229
auto command_collector = sync_db->GetCommandCollector();
229230
if (command_collector) {
230231
// Create callback to handle command completion
231232
auto callback = [this, c_ptr](const LogOffset& offset, pstd::Status status) {
232-
LOG(INFO) << "Command completed";
233+
//LOG(INFO) << "Command completed";
233234
auto pc = dynamic_cast<PikaClientConn*>(c_ptr->GetConn().get());
234235
if (pc) {
235236
auto resp_ptr = c_ptr->GetResp();

0 commit comments

Comments
 (0)