Skip to content
Closed
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -738,8 +738,8 @@ endif()
aux_source_directory(src DIR_SRCS)

# # generate version
string(TIMESTAMP TS "%Y-%m-%d %H:%M:%S" UTC)
set(PIKA_BUILD_DATE "${TS}" CACHE STRING "the time we first built pika")
string(TIMESTAMP TS "%Y-%m-%d %H:%M:%S")
set(PIKA_BUILD_DATE "${TS}")

find_package(Git)

Expand Down
8 changes: 5 additions & 3 deletions codis/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@

export GO111MODULE=on

build-all: codis-dashboard codis-proxy codis-admin codis-ha codis-fe
build-all: generate-version codis-dashboard codis-proxy codis-admin codis-ha codis-fe

PRJ_ROOT=${CURDIR}

generate-version:
$(info generate version)
@mkdir -p ./bin && ./version

codis-deps:
@mkdir -p ./bin && go version

@go version
codis-dashboard: codis-deps
$(info build codis-dashboard)
@cd ${PRJ_ROOT}/cmd/dashboard && go mod tidy && go build -buildvcs=false -o ${PRJ_ROOT}/bin/codis-dashboard .
Expand Down
7 changes: 5 additions & 2 deletions codis/cmd/dashboard/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,11 @@ Options:
return

case d["--version"].(bool):
fmt.Println("version:", utils.Version)
fmt.Println("compile:", utils.Compile)
fmt.Printf("-----------Codis Dashboard----------\n")
fmt.Println("codis_version:", utils.Version)
fmt.Println("codis_git_sha:", utils.Gitsha)
fmt.Println("codis_build_compile_date:", utils.Compile)
fmt.Println("go version:", utils.GoVersion)
return

}
Expand Down
7 changes: 5 additions & 2 deletions codis/cmd/fe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,11 @@ Options:
}

if d["--version"].(bool) {
fmt.Println("version:", utils.Version)
fmt.Println("compile:", utils.Compile)
fmt.Printf("-----------Codis FE----------\n")
fmt.Println("codis_version:", utils.Version)
fmt.Println("codis_git_sha:", utils.Gitsha)
fmt.Println("codis_build_compile_date:", utils.Compile)
fmt.Println("go version:", utils.GoVersion)
return
}

Expand Down
7 changes: 5 additions & 2 deletions codis/cmd/ha/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,11 @@ Options:
}

if d["--version"].(bool) {
fmt.Println("version:", utils.Version)
fmt.Println("compile:", utils.Compile)
fmt.Printf("-----------Codis HA----------\n")
fmt.Println("codis_version:", utils.Version)
fmt.Println("codis_git_sha:", utils.Gitsha)
fmt.Println("codis_build_compile_date:", utils.Compile)
fmt.Println("go version:", utils.GoVersion)
return
}

Expand Down
7 changes: 5 additions & 2 deletions codis/cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,11 @@ Options:
return

case d["--version"].(bool):
fmt.Println("version:", utils.Version)
fmt.Println("compile:", utils.Compile)
fmt.Printf("-----------Codis Proxy----------\n")
fmt.Println("codis_version:", utils.Version)
fmt.Println("codis_git_sha:", utils.Gitsha)
fmt.Println("codis_build_compile_date:", utils.Compile)
fmt.Println("go version:", utils.GoVersion)
return

}
Expand Down
13 changes: 5 additions & 8 deletions codis/pkg/topom/topom_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,14 +381,11 @@ func (s *Topom) tryFixReplicationRelationship(group *models.Group, groupServer *
}
}
} else {
// skip if it has right replication relationship
if state.Replication.GetMasterAddr() == curMasterAddr {
return nil
}

// current server is slave, execute the command `slaveof [new master ip] [new master port] force`
if err = updateMasterToNewOneForcefully(groupServer.Addr, curMasterAddr, s.config.ProductAuth); err != nil {
return err
if state.Replication.GetMasterAddr() != curMasterAddr {
// current server is slave, execute the command `slaveof [new master ip] [new master port]`
if err = updateMasterToNewOne(groupServer.Addr, curMasterAddr, s.config.ProductAuth); err != nil {
return err
}
}
}

Expand Down
27 changes: 23 additions & 4 deletions codis/pkg/topom/topom_sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,36 @@ func (s *Topom) checkAndUpdateGroupServerState(conf *Config, group *models.Group
}

// Start the election master node
if groupServer.State == models.GroupServerStateOffline && isGroupMaster(state, group) {
*masterOfflineGroups = append(*masterOfflineGroups, group)
} else {
*slaveOfflineGroups = append(*slaveOfflineGroups, group)
// Currently, both primary and secondary nodes have subjective and objective logics.
// If it is subjective, we will not perform any operation. If more than 10 probe counts
// fail, it is defined as objective logics, If it is an objective offline, we will add the
// node to masterOfflineGroups or slaveOfflineGroups respectively, and then notify the Proxy
// to change the meta information
if groupServer.State == models.GroupServerStateOffline {
if isGroupMaster(state, group) {
*masterOfflineGroups = append(*masterOfflineGroups, group)
} else {
*slaveOfflineGroups = append(*slaveOfflineGroups, group)
}
}
}
} else {
if groupServer.State == models.GroupServerStateOffline {
*recoveredGroupServers = append(*recoveredGroupServers, state)
// update GroupServer to GroupServerStateNormal state later
} else {
// This may contains any of following condition:
// 1. groupServer.State is Normal
// 2. groupServer.State is GroupServerStateSubjectiveOffline and is Master
// 3. groupServer.State is GroupServerStateSubjectiveOffline and is Slave
// for condition 3, if current server's previous state is SubjectiveOffline
// and has been added to slaveofflinegroups before,
// should also resync mappings to proxy to enable replicationgroup
if groupServer.State == models.GroupServerStateSubjectiveOffline &&
!isGroupMaster(state, group) &&
group.OutOfSync {
*recoveredGroupServers = append(*recoveredGroupServers, state)
}
// Update the offset information of the state and role nodes
groupServer.State = models.GroupServerStateNormal
groupServer.ReCallTimes = 0
Expand Down
4 changes: 2 additions & 2 deletions codis/pkg/utils/redis/sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ func (i *InfoReplication) UnmarshalJSON(b []byte) error {
}

i.Role = kvmap["role"]
i.MasterPort = kvmap["master_host"]
i.MasterHost = kvmap["master_port"]
i.MasterPort = kvmap["master_port"]
i.MasterHost = kvmap["master_host"]
Comment on lines +109 to +110
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fixed critical field assignment bug in UnmarshalJSON

This change corrects a critical bug where master host and port values were previously swapped during JSON unmarshalling. The fix ensures proper assignment of host and port values, preventing potential connection issues between master and slave servers.

i.MasterLinkStatus = kvmap["master_link_status"]
i.IsEligibleForMasterElection = kvmap["is_eligible_for_master_election"] == "true"

Expand Down
22 changes: 14 additions & 8 deletions codis/version
Original file line number Diff line number Diff line change
@@ -1,30 +1,36 @@
#!/bin/bash

version=`git log --date=iso --pretty=format:"%cd @%H" -1`
CODIS_MAJOR=3
CODIS_MINOR=5
CODIS_PATCH=4

gitsha=`git log --pretty=format:"%H" -1`
if [ $? -ne 0 ]; then
version="unknown version"
gitsha ="unknown version gitsha"
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix syntax error in error handling.

There's a syntax error in the fallback assignment for gitsha - there should not be a space between the variable name and the equals sign in bash.

-    gitsha ="unknown version gitsha"
+    gitsha="unknown version gitsha"
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
gitsha ="unknown version gitsha"
gitsha="unknown version gitsha"

fi

compile=`date +"%F %T %z"`" by "`go version`
compile=`date +"%F %T %Z"`
if [ $? -ne 0 ]; then
compile="unknown datetime"
fi

describe=`git describe --tags 2>/dev/null`
if [ $? -eq 0 ]; then
version="${version} @${describe}"
goversion=$(go version | sed 's/go version //')
if [ $? -ne 0 ]; then
compile="unknown go version"
fi
Comment on lines +17 to 20
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Error handling sets the wrong variable.

When the go version command fails, the error handler incorrectly sets compile instead of goversion.

goversion=$(go version | sed 's/go version //')
if [ $? -ne 0 ]; then
-    compile="unknown go version"
+    goversion="unknown go version"
fi
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
goversion=$(go version | sed 's/go version //')
if [ $? -ne 0 ]; then
compile="unknown go version"
fi
goversion=$(go version | sed 's/go version //')
if [ $? -ne 0 ]; then
goversion="unknown go version"
fi


cat << EOF | gofmt > pkg/utils/version.go
package utils

const (
Version = "$version"
Version = "$CODIS_MAJOR.$CODIS_MINOR.$CODIS_PATCH"
Gitsha = "$gitsha"
Compile = "$compile"
GoVersion = "$goversion"
)
EOF

cat << EOF > bin/version
version = $version
gitsha = $gitsha
compile = $compile
EOF
6 changes: 6 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,12 @@ cache-model : 1
# cache-type: string, set, zset, list, hash, bit
cache-type: string, set, zset, list, hash, bit

# Set the maximum number of elements in the cache of the Set, list, Zset data types
cache-value-item-max-size: 1024

# Sets the maximum number of bytes for Key when the String data type is updated in the cache
max-key-size-in-cache: 1048576

# Maximum number of keys in the zset redis cache
# On the disk DB, a zset field may have many fields. In the memory cache, we limit the maximum
# number of keys that can exist in a zset, which is zset-zset-cache-field-num-per-key, with a
Expand Down
10 changes: 7 additions & 3 deletions include/pika_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<
rocksdb::Status Appendxx(std::string& key, std::string& value);
rocksdb::Status GetRange(std::string& key, int64_t start, int64_t end, std::string* value);
rocksdb::Status SetRangexx(std::string& key, int64_t start, std::string& value);
rocksdb::Status SetRangeIfKeyExist(std::string& key, int64_t start, std::string &value);
rocksdb::Status Strlen(std::string& key, int32_t* len);

// Hash Commands
Expand All @@ -112,6 +113,7 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<
rocksdb::Status HIncrbyfloatxx(std::string& key, std::string& field, long double value);
rocksdb::Status HLen(std::string& key, uint64_t* len);
rocksdb::Status HStrlen(std::string& key, std::string& field, uint64_t* len);
rocksdb::Status HMSetIfKeyExist(std::string& key, std::vector<storage::FieldValue> &fvs);

// List Commands
rocksdb::Status LIndex(std::string& key, int64_t index, std::string* element);
Expand All @@ -126,16 +128,18 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<
rocksdb::Status LTrim(std::string& key, int64_t start, int64_t stop);
rocksdb::Status RPop(std::string& key, std::string* element);
rocksdb::Status RPush(std::string& key, std::vector<std::string> &values);
rocksdb::Status RPushIfKeyExist(std::string& key, std::vector<std::string> &values);
rocksdb::Status RPushx(std::string& key, std::vector<std::string> &values);
rocksdb::Status RPushnx(std::string& key, std::vector<std::string> &values, int64_t ttl);
rocksdb::Status RPushnxWithoutTTL(std::string& key, std::vector<std::string> &values);

rocksdb::Status LPushIfKeyExist(std::string& key, std::vector<std::string> &values);

// Set Commands
rocksdb::Status SAdd(std::string& key, std::vector<std::string>& members);
rocksdb::Status SAddIfKeyExist(std::string& key, std::vector<std::string>& members);
rocksdb::Status SAddnx(std::string& key, std::vector<std::string>& members, int64_t ttl);
rocksdb::Status SAddnxWithoutTTL(std::string& key, std::vector<std::string>& members);
rocksdb::Status SCard(std::string& key, uint64_t* len);
rocksdb::Status SCard(const std::string& key, uint64_t* len);
rocksdb::Status SIsmember(std::string& key, std::string& member);
rocksdb::Status SMembers(std::string& key, std::vector<std::string>* members);
rocksdb::Status SRem(std::string& key, std::vector<std::string>& members);
Expand All @@ -146,7 +150,7 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<
rocksdb::Status ZAddIfKeyExist(std::string& key, std::vector<storage::ScoreMember>& score_members);
rocksdb::Status ZAddnx(std::string& key, std::vector<storage::ScoreMember>& score_members, int64_t ttl);
rocksdb::Status ZAddnxWithoutTTL(std::string& key, std::vector<storage::ScoreMember>& score_members);
rocksdb::Status ZCard(std::string& key, uint32_t* len, const std::shared_ptr<DB>& db);
rocksdb::Status ZCard(const std::string& key, uint32_t* len, const std::shared_ptr<DB>& db);
rocksdb::Status ZCount(std::string& key, std::string& min, std::string& max, uint64_t* len, ZCountCmd* cmd);
rocksdb::Status ZIncrby(std::string& key, std::string& member, double increment);
rocksdb::Status ZIncrbyIfKeyExist(std::string& key, std::string& member, double increment, ZIncrbyCmd* cmd, const std::shared_ptr<DB>& db);
Expand Down
2 changes: 1 addition & 1 deletion include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class PikaClientConn : public net::RedisConn {
std::shared_ptr<Cmd> DoCmd(const PikaCmdArgsType& argv, const std::string& opt,
const std::shared_ptr<std::string>& resp_ptr, bool cache_miss_in_rtc);

void ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t do_duration);
void ProcessSlowlog(const PikaCmdArgsType& argv, std::shared_ptr<Cmd> c_ptr);
void ProcessMonitor(const PikaCmdArgsType& argv);

void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<std::string>& resp_ptr, bool cache_miss_in_rtc);
Expand Down
19 changes: 16 additions & 3 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,16 @@
#include "net/include/net_conn.h"
#include "net/include/redis_conn.h"
#include "pstd/include/pstd_string.h"
#include "pstd/include/stage_timer.h"

#include "net/src/dispatch_thread.h"

// Declare and set start time of the timer
#define STAGE_TIMER_GUARD(metric, enabled) \
pstd::StageTimer stage_timer_##metric( \
&metric, enabled); \
stage_timer_##metric.Start();

class SyncMasterDB;
class SyncSlaveDB;
class DB;
Expand Down Expand Up @@ -532,7 +539,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
// used for execute multikey command into different slots
virtual void Split(const HintKeys& hint_keys) = 0;
virtual void Merge() = 0;
virtual bool IsTooLargeKey(const int &max_sz) { return false; }
virtual bool IsTooLargeKey(const size_t &max_sz) { return false; }

int8_t SubCmdIndex(const std::string& cmdName); // if the command no subCommand,return -1;

Expand All @@ -552,7 +559,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
bool IsNeedReadCache() const;
bool IsNeedCacheDo() const;
bool HashtagIsConsistent(const std::string& lhs, const std::string& rhs) const;
uint64_t GetDoDuration() const { return do_duration_; };
virtual std::string StagesDurationSummary(bool exclude_zero_value) const;
std::shared_ptr<DB> GetDB() const { return db_; };
uint32_t AclCategory() const;
void AddAclCategory(uint32_t aclCategory);
Expand Down Expand Up @@ -607,7 +614,13 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
std::weak_ptr<net::NetConn> conn_;
std::weak_ptr<std::string> resp_;
CmdStage stage_ = kNone;
uint64_t do_duration_ = 0;

uint64_t acquire_lock_duration_ms = 0;
uint64_t command_duration_ms = 0;
uint64_t binlog_duration_ms = 0;
uint64_t storage_duration_ms = 0;
uint64_t cache_duration_ms = 0;

uint32_t cmdId_ = 0;
uint32_t aclCategory_ = 0;
bool cache_missed_in_rtc_{false};
Expand Down
Loading
Loading