Skip to content

Commit 014bc67

Browse files
committed
Modified some log levels of redisSender and pikaSender,
Updated the logic of connecting to Redis in redisSender and pikaSender
1 parent e744786 commit 014bc67

4 files changed

Lines changed: 116 additions & 141 deletions

File tree

include/pika_sender.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ class PikaSender : public net::Thread {
2323
void SendCommand(std::string &command, const std::string &key);
2424
int QueueSize();
2525
void ConnectRedis();
26-
26+
bool Authenticate();
2727
private:
28-
net::NetCli *cli_;
28+
std::shared_ptr<net::NetCli> cli_;
2929
pstd::CondVar wsignal_;
3030
pstd::CondVar rsignal_;
3131
std::mutex signal_mutex;

include/redis_sender.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ class RedisSender : public net::Thread {
2929
std::lock_guard l(keys_mutex_);
3030
return commands_queue_.size();
3131
}
32+
bool Authenticate();
3233

3334
private:
3435
int id_;

src/pika_sender.cc

Lines changed: 57 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,17 @@
88
#include <glog/logging.h>
99

1010
PikaSender::PikaSender(std::string ip, int64_t port, std::string password):
11-
cli_(NULL),
1211
ip_(ip),
1312
port_(port),
1413
password_(password),
1514
should_exit_(false),
1615
elements_(0)
1716
{
17+
cli_ = std::shared_ptr<net::NetCli>(net::NewRedisCli());
18+
cli_->set_connect_timeout(1000);
19+
cli_->set_send_timeout(10000);
20+
cli_->set_recv_timeout(10000);
21+
ConnectRedis();
1822
}
1923

2024
PikaSender::~PikaSender() {
@@ -29,81 +33,70 @@ void PikaSender::Stop() {
2933
should_exit_.store(true);
3034
wsignal_.notify_all();
3135
rsignal_.notify_all();
32-
LOG(INFO) << "PikaSender received stop signal";
3336
}
3437

3538
void PikaSender::ConnectRedis() {
36-
while (cli_ == NULL) {
37-
// Connect to redis
38-
cli_ = net::NewRedisCli();
39-
cli_->set_connect_timeout(1000);
39+
40+
while (true) {
4041
pstd::Status s = cli_->Connect(ip_, port_);
4142
if (!s.ok()) {
42-
delete cli_;
43-
cli_ = NULL;
4443
LOG(WARNING) << "Can not connect to " << ip_ << ":" << port_ << ", status: " << s.ToString();
44+
std::this_thread::sleep_for(std::chrono::seconds(1));
4545
continue;
4646
} else {
4747
// Connect success
48+
if (!Authenticate()) {
49+
cli_->Close();
50+
std::this_thread::sleep_for(std::chrono::seconds(1));
51+
continue;
52+
}
53+
break;
54+
}
55+
}
56+
}
57+
58+
bool PikaSender::Authenticate() {
59+
if (!password_.empty()) {
60+
net::RedisCmdArgsType argv, resp;
61+
std::string cmd;
4862

49-
// Authentication
50-
if (!password_.empty()) {
51-
net::RedisCmdArgsType argv, resp;
52-
std::string cmd;
53-
54-
argv.push_back("AUTH");
55-
argv.push_back(password_);
56-
net::SerializeRedisCommand(argv, &cmd);
57-
pstd::Status s = cli_->Send(&cmd);
58-
59-
if (s.ok()) {
60-
s = cli_->Recv(&resp);
61-
if (resp[0] == "OK") {
62-
} else {
63-
LOG(FATAL) << "Connect to redis(" << ip_ << ":" << port_ << ") Invalid password";
64-
cli_->Close();
65-
delete cli_;
66-
cli_ = NULL;
67-
should_exit_ = true;
68-
return;
69-
}
70-
} else {
71-
LOG(WARNING) << "send auth failed: " << s.ToString();
72-
cli_->Close();
73-
delete cli_;
74-
cli_ = NULL;
75-
continue;
76-
}
63+
argv.push_back("AUTH");
64+
argv.push_back(password_);
65+
net::SerializeRedisCommand(argv, &cmd);
66+
pstd::Status s = cli_->Send(&cmd);
67+
68+
if (s.ok()) {
69+
s = cli_->Recv(&resp);
70+
if (resp[0] == "OK") {
71+
return true;
7772
} else {
78-
// If forget to input password
79-
net::RedisCmdArgsType argv, resp;
80-
std::string cmd;
81-
82-
argv.push_back("PING");
83-
net::SerializeRedisCommand(argv, &cmd);
84-
pstd::Status s = cli_->Send(&cmd);
85-
86-
if (s.ok()) {
87-
s = cli_->Recv(&resp);
88-
if (s.ok()) {
89-
if (resp[0] == "NOAUTH Authentication required.") {
90-
LOG(FATAL) << "Ping redis(" << ip_ << ":" << port_ << ") NOAUTH Authentication required";
91-
cli_->Close();
92-
delete cli_;
93-
cli_ = NULL;
94-
should_exit_ = true;
95-
return;
96-
}
97-
} else {
98-
LOG(WARNING) << "Recv failed: " << s.ToString();
99-
cli_->Close();
100-
delete cli_;
101-
cli_ = NULL;
102-
}
103-
}
73+
LOG(ERROR) << "Connect to redis(" << ip_ << ":" << port_ << ") Invalid password";
74+
return false;
10475
}
76+
} else {
77+
LOG(WARNING) << "send auth failed: " << s.ToString();
78+
return false;
79+
}
80+
} else {
81+
net::RedisCmdArgsType argv, resp;
82+
std::string cmd;
83+
84+
argv.push_back("PING");
85+
net::SerializeRedisCommand(argv, &cmd);
86+
pstd::Status s = cli_->Send(&cmd);
87+
88+
if (s.ok()) {
89+
s = cli_->Recv(&resp);
90+
if (s.ok() && resp[0] == "NOAUTH Authentication required.") {
91+
LOG(ERROR) << "Ping redis(" << ip_ << ":" << port_ << ") NOAUTH Authentication required";
92+
return false;
93+
}
94+
} else {
95+
LOG(WARNING) << "Recv failed: " << s.ToString();
96+
return false;
10597
}
10698
}
99+
return true;
107100
}
108101

109102
void PikaSender::LoadKey(const std::string &key) {
@@ -123,9 +116,7 @@ void PikaSender::SendCommand(std::string &command, const std::string &key) {
123116
elements_--;
124117
LoadKey(key);
125118
cli_->Close();
126-
LOG(INFO) << s.ToString().data();
127-
delete cli_;
128-
cli_ = NULL;
119+
LOG(WARNING) << s.ToString();
129120
ConnectRedis();
130121
}else{
131122
cli_->Recv(NULL);
@@ -158,15 +149,9 @@ void *PikaSender::ThreadMain() {
158149
}
159150
wsignal_.notify_one();
160151
SendCommand(key, key);
161-
162152
}
163153

164-
165-
if (cli_) {
166-
cli_->Close();
167-
delete cli_;
168-
cli_ = NULL;
169-
}
154+
cli_->Close();
170155
LOG(INFO) << "PikaSender thread complete";
171156
return NULL;
172157
}

src/redis_sender.cc

Lines changed: 56 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
#include <glog/logging.h>
1313

14-
static time_t kCheckDiff = 1;
14+
static time_t kCheckDiff = 5;
1515

1616
RedisSender::RedisSender(int id, std::string ip, int64_t port, std::string password):
1717
id_(id),
@@ -24,82 +24,75 @@ RedisSender::RedisSender(int id, std::string ip, int64_t port, std::string passw
2424
elements_(0) {
2525

2626
last_write_time_ = ::time(NULL);
27+
cli_ = std::shared_ptr<net::NetCli>(net::NewRedisCli());
28+
cli_->set_connect_timeout(1000);
29+
cli_->set_recv_timeout(10000);
30+
cli_->set_send_timeout(10000);
2731
}
2832

2933
RedisSender::~RedisSender() {
30-
LOG(INFO) << "RedisSender thread " << id_ << " exit!!!";
3134
}
3235

3336
void RedisSender::ConnectRedis() {
34-
while (cli_ == NULL) {
35-
// Connect to redis
36-
cli_ = std::shared_ptr<net::NetCli>(net::NewRedisCli());
37-
cli_->set_connect_timeout(1000);
38-
cli_->set_recv_timeout(10000);
39-
cli_->set_send_timeout(10000);
37+
while (true) {
4038
pstd::Status s = cli_->Connect(ip_, port_);
4139
if (!s.ok()) {
4240
LOG(WARNING) << "Can not connect to " << ip_ << ":" << port_ << ", status: " << s.ToString();
43-
cli_ = NULL;
44-
sleep(3);
4541
continue;
4642
} else {
4743
// Connect success
4844
LOG(INFO) << "RedisSender thread " << id_ << "Connect to redis(" << ip_ << ":" << port_ << ") success";
49-
// Authentication
50-
if (!password_.empty()) {
51-
net::RedisCmdArgsType argv, resp;
52-
std::string cmd;
53-
54-
argv.push_back("AUTH");
55-
argv.push_back(password_);
56-
net::SerializeRedisCommand(argv, &cmd);
57-
pstd::Status s = cli_->Send(&cmd);
58-
59-
if (s.ok()) {
60-
s = cli_->Recv(&resp);
61-
if (resp[0] == "OK") {
62-
} else {
63-
LOG(FATAL) << "Connect to redis(" << ip_ << ":" << port_ << ") Invalid password";
64-
cli_->Close();
65-
cli_ = NULL;
66-
should_exit_ = true;
67-
return;
68-
}
69-
} else {
70-
LOG(WARNING) << "send auth failed: " << s.ToString();
71-
cli_->Close();
72-
cli_ = NULL;
73-
continue;
74-
}
45+
if (!Authenticate()) {
46+
cli_->Close();
47+
continue;
48+
}
49+
break;
50+
}
51+
}
52+
}
53+
54+
bool RedisSender::Authenticate() {
55+
if (!password_.empty()) {
56+
net::RedisCmdArgsType argv, resp;
57+
std::string cmd;
58+
59+
argv.push_back("AUTH");
60+
argv.push_back(password_);
61+
net::SerializeRedisCommand(argv, &cmd);
62+
pstd::Status s = cli_->Send(&cmd);
63+
64+
if (s.ok()) {
65+
s = cli_->Recv(&resp);
66+
if (resp[0] == "OK") {
67+
return true;
7568
} else {
76-
// If forget to input password
77-
net::RedisCmdArgsType argv, resp;
78-
std::string cmd;
79-
80-
argv.push_back("PING");
81-
net::SerializeRedisCommand(argv, &cmd);
82-
pstd::Status s = cli_->Send(&cmd);
83-
84-
if (s.ok()) {
85-
s = cli_->Recv(&resp);
86-
if (s.ok()) {
87-
if (resp[0] == "NOAUTH Authentication required.") {
88-
LOG(FATAL) << "Ping redis(" << ip_ << ":" << port_ << ") NOAUTH Authentication required";
89-
cli_->Close();
90-
cli_ = NULL;
91-
should_exit_ = true;
92-
return;
93-
}
94-
} else {
95-
LOG(WARNING) << s.ToString();
96-
cli_->Close();
97-
cli_ = NULL;
98-
}
99-
}
69+
LOG(ERROR) << "Connect to redis(" << ip_ << ":" << port_ << ") Invalid password";
70+
return false;
10071
}
72+
} else {
73+
LOG(WARNING) << "send auth failed: " << s.ToString();
74+
return false;
75+
}
76+
} else {
77+
net::RedisCmdArgsType argv, resp;
78+
std::string cmd;
79+
80+
argv.push_back("PING");
81+
net::SerializeRedisCommand(argv, &cmd);
82+
pstd::Status s = cli_->Send(&cmd);
83+
84+
if (s.ok()) {
85+
s = cli_->Recv(&resp);
86+
if (s.ok() && resp[0] == "NOAUTH Authentication required.") {
87+
LOG(ERROR) << "Ping redis(" << ip_ << ":" << port_ << ") NOAUTH Authentication required";
88+
return false;
89+
}
90+
} else {
91+
LOG(WARNING) << s.ToString();
92+
return false;
10193
}
10294
}
95+
return true;
10396
}
10497

10598
void RedisSender::Stop() {
@@ -122,11 +115,8 @@ void RedisSender::SendRedisCommand(const std::string &command) {
122115
int RedisSender::SendCommand(std::string &command) {
123116
time_t now = ::time(NULL);
124117
if (kCheckDiff < now - last_write_time_) {
125-
int ret = cli_->CheckAliveness();
126-
if (ret < 0) {
127-
cli_ = NULL;
118+
cli_->Close();
128119
ConnectRedis();
129-
}
130120
last_write_time_ = now;
131121
}
132122

@@ -140,10 +130,9 @@ int RedisSender::SendCommand(std::string &command) {
140130
}
141131

142132
cli_->Close();
143-
cli_ = NULL;
144133
ConnectRedis();
145134
} while(++idx < 3);
146-
LOG(WARNING) << "RedisSender " << id_ << " fails to send redis command " << command << ", times: " << idx << ", error: " << "send command failed";
135+
LOG(ERROR) << "RedisSender " << id_ << " fails to send redis command " << command << ", times: " << idx << ", error: " << "send command failed";
147136
return -1;
148137
}
149138

@@ -182,7 +171,7 @@ void *RedisSender::ThreadMain() {
182171
}
183172

184173
LOG(INFO) << "RedisSender thread " << id_ << " complete";
185-
cli_ = NULL;
174+
cli_->Close();
186175
return NULL;
187176
}
188177

0 commit comments

Comments
 (0)