Skip to content
Closed
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ aux_source_directory(src DIR_SRCS)

# # generate version
string(TIMESTAMP TS "%Y-%m-%d %H:%M:%S" UTC)
set(PIKA_BUILD_DATE "${TS}" CACHE STRING "the time we first built pika")
set(PIKA_BUILD_DATE "${TS}")

find_package(Git)

Expand Down
7 changes: 5 additions & 2 deletions codis/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@

export GO111MODULE=on

build-all: codis-dashboard codis-proxy codis-admin codis-ha codis-fe
build-all: codis-dashboard codis-proxy codis-admin codis-ha codis-fe generate-version

PRJ_ROOT=${CURDIR}


codis-deps:
@mkdir -p ./bin && go version

codis-dashboard: codis-deps
$(info build codis-dashboard)
@cd ${PRJ_ROOT}/cmd/dashboard && go mod tidy && go build -buildvcs=false -o ${PRJ_ROOT}/bin/codis-dashboard .
Expand All @@ -33,6 +32,10 @@ codis-fe: codis-deps
@cd ${PRJ_ROOT}/cmd/fe && go mod tidy && go build -buildvcs=false -o ${PRJ_ROOT}/bin/codis-fe .
@rm -rf ${PRJ_ROOT}/bin/assets && cp -rf ${PRJ_ROOT}/cmd/fe/assets ./bin/

generate-version:
$(info generate version)
@./version
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Added new target to generate version information

The new generate-version target executes a script located at ./version to generate version information for the build artifacts.


🏁 Script executed:

#!/bin/bash
# Check if the version script exists and is executable
if [ -f ./version ]; then
  echo "Version script exists"
  if [ -x ./version ]; then
    echo "Version script is executable"
  else
    echo "Version script is not executable"
  fi
else
  echo "Version script does not exist"
fi

Length of output: 91


Action Required: Missing 'version' Script File

The generate-version target in codis/Makefile (lines 35-37) calls ./version to generate version information. However, our verification script confirms that the ./version file does not exist in the repository. Please ensure that the missing script is added or update the Makefile target to reference the correct script.


clean:
$(info ...Clean Start!)
@rm -rf bin
13 changes: 5 additions & 8 deletions codis/pkg/topom/topom_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,14 +381,11 @@ func (s *Topom) tryFixReplicationRelationship(group *models.Group, groupServer *
}
}
} else {
// skip if it has right replication relationship
if state.Replication.GetMasterAddr() == curMasterAddr {
return nil
}

// current server is slave, execute the command `slaveof [new master ip] [new master port] force`
if err = updateMasterToNewOneForcefully(groupServer.Addr, curMasterAddr, s.config.ProductAuth); err != nil {
return err
if state.Replication.GetMasterAddr() != curMasterAddr {
// current server is slave, execute the command `slaveof [new master ip] [new master port]`
if err = updateMasterToNewOne(groupServer.Addr, curMasterAddr, s.config.ProductAuth); err != nil {
return err
}
}
}

Expand Down
27 changes: 23 additions & 4 deletions codis/pkg/topom/topom_sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,36 @@ func (s *Topom) checkAndUpdateGroupServerState(conf *Config, group *models.Group
}

// Start the election master node
if groupServer.State == models.GroupServerStateOffline && isGroupMaster(state, group) {
*masterOfflineGroups = append(*masterOfflineGroups, group)
} else {
*slaveOfflineGroups = append(*slaveOfflineGroups, group)
// Currently, both primary and secondary nodes have subjective and objective logics.
// If it is subjective, we will not perform any operation. If more than 10 probe counts
// fail, it is defined as objective logics, If it is an objective offline, we will add the
// node to masterOfflineGroups or slaveOfflineGroups respectively, and then notify the Proxy
// to change the meta information
if groupServer.State == models.GroupServerStateOffline {
if isGroupMaster(state, group) {
*masterOfflineGroups = append(*masterOfflineGroups, group)
} else {
*slaveOfflineGroups = append(*slaveOfflineGroups, group)
}
}
}
} else {
if groupServer.State == models.GroupServerStateOffline {
*recoveredGroupServers = append(*recoveredGroupServers, state)
// update GroupServer to GroupServerStateNormal state later
} else {
// This may contains any of following condition:
// 1. groupServer.State is Normal
// 2. groupServer.State is GroupServerStateSubjectiveOffline and is Master
// 3. groupServer.State is GroupServerStateSubjectiveOffline and is Slave
// for condition 3, if current server's previous state is SubjectiveOffline
// and has been added to slaveofflinegroups before,
// should also resync mappings to proxy to enable replicationgroup
if groupServer.State == models.GroupServerStateSubjectiveOffline &&
!isGroupMaster(state, group) &&
group.OutOfSync {
*recoveredGroupServers = append(*recoveredGroupServers, state)
}
// Update the offset information of the state and role nodes
groupServer.State = models.GroupServerStateNormal
groupServer.ReCallTimes = 0
Expand Down
4 changes: 2 additions & 2 deletions codis/pkg/utils/redis/sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ func (i *InfoReplication) UnmarshalJSON(b []byte) error {
}

i.Role = kvmap["role"]
i.MasterPort = kvmap["master_host"]
i.MasterHost = kvmap["master_port"]
i.MasterPort = kvmap["master_port"]
i.MasterHost = kvmap["master_host"]
Comment on lines +109 to +110
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fixed critical field assignment bug in UnmarshalJSON

This change corrects a critical bug where master host and port values were previously swapped during JSON unmarshalling. The fix ensures proper assignment of host and port values, preventing potential connection issues between master and slave servers.

i.MasterLinkStatus = kvmap["master_link_status"]
i.IsEligibleForMasterElection = kvmap["is_eligible_for_master_election"] == "true"

Expand Down
6 changes: 6 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,12 @@ cache-model : 1
# cache-type: string, set, zset, list, hash, bit
cache-type: string, set, zset, list, hash, bit

# Set the maximum number of elements in the cache of the Set, list, Zset data types
cache-value-item-max-size: 1024

# Sets the maximum number of bytes for Key when the String data type is updated in the cache
max-key-size-in-cache: 1048576

# Maximum number of keys in the zset redis cache
# On the disk DB, a zset field may have many fields. In the memory cache, we limit the maximum
# number of keys that can exist in a zset, which is zset-zset-cache-field-num-per-key, with a
Expand Down
6 changes: 5 additions & 1 deletion include/pika_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<
rocksdb::Status Appendxx(std::string& key, std::string& value);
rocksdb::Status GetRange(std::string& key, int64_t start, int64_t end, std::string* value);
rocksdb::Status SetRangexx(std::string& key, int64_t start, std::string& value);
rocksdb::Status SetRangeIfKeyExist(std::string& key, int64_t start, std::string &value);
rocksdb::Status Strlen(std::string& key, int32_t* len);

// Hash Commands
Expand All @@ -112,6 +113,7 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<
rocksdb::Status HIncrbyfloatxx(std::string& key, std::string& field, long double value);
rocksdb::Status HLen(std::string& key, uint64_t* len);
rocksdb::Status HStrlen(std::string& key, std::string& field, uint64_t* len);
rocksdb::Status HMSetIfKeyExist(std::string& key, std::vector<storage::FieldValue> &fvs);

// List Commands
rocksdb::Status LIndex(std::string& key, int64_t index, std::string* element);
Expand All @@ -126,10 +128,12 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<
rocksdb::Status LTrim(std::string& key, int64_t start, int64_t stop);
rocksdb::Status RPop(std::string& key, std::string* element);
rocksdb::Status RPush(std::string& key, std::vector<std::string> &values);
rocksdb::Status RPushIfKeyExist(std::string& key, std::vector<std::string> &values);
rocksdb::Status RPushx(std::string& key, std::vector<std::string> &values);
rocksdb::Status RPushnx(std::string& key, std::vector<std::string> &values, int64_t ttl);
rocksdb::Status RPushnxWithoutTTL(std::string& key, std::vector<std::string> &values);

rocksdb::Status LPushIfKeyExist(std::string& key, std::vector<std::string> &values);

// Set Commands
rocksdb::Status SAdd(std::string& key, std::vector<std::string>& members);
rocksdb::Status SAddIfKeyExist(std::string& key, std::vector<std::string>& members);
Expand Down
2 changes: 1 addition & 1 deletion include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class PikaClientConn : public net::RedisConn {
std::shared_ptr<Cmd> DoCmd(const PikaCmdArgsType& argv, const std::string& opt,
const std::shared_ptr<std::string>& resp_ptr, bool cache_miss_in_rtc);

void ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t do_duration);
void ProcessSlowlog(const PikaCmdArgsType& argv, std::shared_ptr<Cmd> c_ptr);
void ProcessMonitor(const PikaCmdArgsType& argv);

void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<std::string>& resp_ptr, bool cache_miss_in_rtc);
Expand Down
19 changes: 16 additions & 3 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,16 @@
#include "net/include/net_conn.h"
#include "net/include/redis_conn.h"
#include "pstd/include/pstd_string.h"
#include "pstd/include/stage_timer.h"

#include "net/src/dispatch_thread.h"

// Declare and set start time of the timer
#define STAGE_TIMER_GUARD(metric, enabled) \
pstd::StageTimer stage_timer_##metric( \
&metric, enabled); \
stage_timer_##metric.Start();

class SyncMasterDB;
class SyncSlaveDB;
class DB;
Expand Down Expand Up @@ -532,7 +539,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
// used for execute multikey command into different slots
virtual void Split(const HintKeys& hint_keys) = 0;
virtual void Merge() = 0;
virtual bool IsTooLargeKey(const int &max_sz) { return false; }
virtual bool IsTooLargeKey(const size_t &max_sz) { return false; }

int8_t SubCmdIndex(const std::string& cmdName); // if the command no subCommand,return -1;

Expand All @@ -552,7 +559,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
bool IsNeedReadCache() const;
bool IsNeedCacheDo() const;
bool HashtagIsConsistent(const std::string& lhs, const std::string& rhs) const;
uint64_t GetDoDuration() const { return do_duration_; };
virtual std::string StagesDurationSummary(bool exclude_zero_value) const;
std::shared_ptr<DB> GetDB() const { return db_; };
uint32_t AclCategory() const;
void AddAclCategory(uint32_t aclCategory);
Expand Down Expand Up @@ -607,7 +614,13 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
std::weak_ptr<net::NetConn> conn_;
std::weak_ptr<std::string> resp_;
CmdStage stage_ = kNone;
uint64_t do_duration_ = 0;

uint64_t acquire_lock_duration_ms = 0;
uint64_t command_duration_ms = 0;
uint64_t binlog_duration_ms = 0;
uint64_t storage_duration_ms = 0;
uint64_t cache_duration_ms = 0;

uint32_t cmdId_ = 0;
uint32_t aclCategory_ = 0;
bool cache_missed_in_rtc_{false};
Expand Down
73 changes: 71 additions & 2 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "pstd/include/pstd_string.h"

#include "acl.h"
#include "cache/include/config.h"
#include "include/pika_define.h"
#include "rocksdb/compression_type.h"

Expand Down Expand Up @@ -858,6 +859,54 @@ class PikaConf : public pstd::BaseConf {
rsync_timeout_ms_.store(value);
}

int RocksDBPerfLevel() const {
return rocksdb_perf_level_.load();
}

int CacheValueItemMaxSize() const {
return cache_value_item_max_size_.load();
}

bool UpdateCacheValueItemMaxSize(int size) {
if (size >= MAX_CACHE_ITEMS_SIZE || size <= 0) {
return false;
}
cache_value_item_max_size_.store(size);
return true;
}

size_t MaxKeySizeInCache() const {
return max_key_size_in_cache_.load();
}

bool UpdateMaxKeySizeInCache(size_t size) {
if (size >= MAX_CACHE_MAX_KEY_SIZE || size <= 0) {
return false;
}
max_key_size_in_cache_.store(size);
return true;
}

bool UpdateRocksDBPerfLevel(int perf_level) {
if (perf_level >= 6 || perf_level < 0) {
return false;
}
rocksdb_perf_level_.store(perf_level);
return true;
}

int RocksDBPerfPercent() const {
return rocksdb_perf_percent_.load();
}

bool UpdateRocksDBPerfPercent(int percent) {
if (percent > 100 || percent < 0) {
return false;
}
rocksdb_perf_percent_.store(percent);
return true;
}

void SetAclPubsubDefault(const std::string& value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("acl-pubsub-default", value);
Expand Down Expand Up @@ -935,6 +984,7 @@ class PikaConf : public pstd::BaseConf {
int zset_cache_start_direction() { return zset_cache_start_direction_; }
int zset_cache_field_num_per_key() { return zset_cache_field_num_per_key_; }
int max_key_size_in_cache() { return max_key_size_in_cache_; }
int value_item_max_size_in_cache() { return cache_value_item_max_size_; }
int cache_maxmemory_policy() { return cache_maxmemory_policy_; }
int cache_maxmemory_samples() { return cache_maxmemory_samples_; }
int cache_lfu_decay_time() { return cache_lfu_decay_time_; }
Expand All @@ -952,7 +1002,10 @@ class PikaConf : public pstd::BaseConf {
int slow_cmd_thread_pool_size_ = 0;
int admin_thread_pool_size_ = 0;
std::unordered_set<std::string> slow_cmd_set_;
std::unordered_set<std::string> admin_cmd_set_ = {"info", "ping", "monitor"};
// Because the exporter of Pika_exporter implements Auth authentication
// with the Exporter of Pika, and the Exporter authenticates the Auth when
// users connect to Pika, the Auth is added to the management command thread pool
std::unordered_set<std::string> admin_cmd_set_ = {"info", "ping", "monitor", "auth"};
int sync_thread_num_ = 0;
int sync_binlog_thread_num_ = 0;
int expire_dump_days_ = 3;
Expand Down Expand Up @@ -1096,7 +1149,8 @@ class PikaConf : public pstd::BaseConf {
std::atomic_int cache_bit_ = 1;
std::atomic_int zset_cache_start_direction_ = 0;
std::atomic_int zset_cache_field_num_per_key_ = 512;
std::atomic_int max_key_size_in_cache_ = 512;
std::atomic_int cache_value_item_max_size_ = 1024;
std::atomic_size_t max_key_size_in_cache_ = 1024 * 1024;
std::atomic_int cache_maxmemory_policy_ = 1;
std::atomic_int cache_maxmemory_samples_ = 5;
std::atomic_int cache_lfu_decay_time_ = 1;
Expand All @@ -1121,6 +1175,21 @@ class PikaConf : public pstd::BaseConf {
int max_rsync_parallel_num_ = kMaxRsyncParallelNum;
std::atomic_int64_t rsync_timeout_ms_ = 1000;

/*
kUninitialized = 0, // unknown setting
kDisable = 1, // disable perf stats
kEnableCount = 2, // enable only count stats
kEnableTimeExceptForMutex = 3, // Other than count stats, also enable time
// stats except for mutexes
// Other than time, also measure CPU time counters. Still don't measure
// time (neither wall time nor CPU time) for mutexes.
kEnableTimeAndCPUTimeExceptForMutex = 4,
kEnableTime = 5, // enable count and time stats
kOutOfBounds = 6 // N.B. Must always be the last value!
*/
std::atomic_int rocksdb_perf_level_ = 2;
std::atomic_int rocksdb_perf_percent_ = 10;

//Internal used metrics Persisted by pika.conf
std::unordered_set<std::string> internal_used_unfinished_full_sync_;

Expand Down
2 changes: 1 addition & 1 deletion include/pika_hash.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class HGetCmd : public Cmd {
void DoUpdateCache() override;
void Split(const HintKeys& hint_keys) override {};
void Merge() override {};
bool IsTooLargeKey(const int &max_sz) override { return key_.size() > static_cast<uint32_t>(max_sz); }
bool IsTooLargeKey(const size_t &max_sz) override { return key_.size() > max_sz; }
Cmd* Clone() override { return new HGetCmd(*this); }

private:
Expand Down
4 changes: 2 additions & 2 deletions include/pika_kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class SetCmd : public Cmd {
void DoThroughDB() override;
void Split(const HintKeys& hint_keys) override{};
void Merge() override{};
bool IsTooLargeKey(const int& max_sz) override { return key_.size() > static_cast<uint32_t>(max_sz); }
bool IsTooLargeKey(const size_t &max_sz) override { return key_.size() > max_sz; }
Cmd* Clone() override { return new SetCmd(*this); }

private:
Expand Down Expand Up @@ -65,7 +65,7 @@ class GetCmd : public Cmd {
void ReadCache() override;
void Split(const HintKeys& hint_keys) override{};
void Merge() override{};
bool IsTooLargeKey(const int &max_sz) override { return key_.size() > static_cast<uint32_t>(max_sz); }
bool IsTooLargeKey(const size_t &max_sz) override { return key_.size() > max_sz; }
Cmd* Clone() override { return new GetCmd(*this); }

private:
Expand Down
Loading
Loading