-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathpika_client_conn.h
More file actions
150 lines (116 loc) · 5.03 KB
/
pika_client_conn.h
File metadata and controls
150 lines (116 loc) · 5.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#ifndef PIKA_CLIENT_CONN_H_
#define PIKA_CLIENT_CONN_H_
#include <bitset>
#include <utility>
#include "acl.h"
#include "include/pika_command.h"
#include "include/pika_define.h"
// TODO: stat time costing in write out data to connfd
struct TimeStat {
TimeStat() = default;
void Reset() {
enqueue_ts_ = dequeue_ts_ = 0;
process_done_ts_ = 0;
before_queue_ts_ = 0;
}
uint64_t start_ts() const { return enqueue_ts_; }
uint64_t total_time() const { return process_done_ts_ > enqueue_ts_ ? process_done_ts_ - enqueue_ts_ : 0; }
uint64_t queue_time() const { return dequeue_ts_ > enqueue_ts_ ? dequeue_ts_ - enqueue_ts_ : 0; }
uint64_t process_time() const { return process_done_ts_ > dequeue_ts_ ? process_done_ts_ - dequeue_ts_ : 0; }
uint64_t before_queue_time() const { return process_done_ts_ > dequeue_ts_ ? before_queue_ts_ - enqueue_ts_ : 0; }
uint64_t enqueue_ts_;
uint64_t dequeue_ts_;
uint64_t before_queue_ts_;
uint64_t process_done_ts_;
};
class PikaClientConn : public net::RedisConn {
public:
using WriteCompleteCallback = std::function<void()>;
struct BgTaskArg {
std::shared_ptr<Cmd> cmd_ptr;
std::shared_ptr<PikaClientConn> conn_ptr;
std::vector<net::RedisCmdArgsType> redis_cmds;
std::shared_ptr<std::string> resp_ptr;
LogOffset offset;
std::string db_name;
bool cache_miss_in_rtc_;
};
struct TxnStateBitMask {
public:
static constexpr uint8_t Start = 0;
static constexpr uint8_t InitCmdFailed = 1;
static constexpr uint8_t WatchFailed = 2;
static constexpr uint8_t Execing = 3;
};
PikaClientConn(int fd, const std::string& ip_port, net::Thread* server_thread, net::NetMultiplexer* mpx,
const net::HandleType& handle_type, int max_conn_rbuf_size);
~PikaClientConn() = default;
bool IsInterceptedByRTC(std::string& opt);
void ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>& argvs, bool async, std::string* response) override;
bool ReadCmdInCache(const net::RedisCmdArgsType& argv, const std::string& opt);
void BatchExecRedisCmd(const std::vector<net::RedisCmdArgsType>& argvs, bool cache_miss_in_rtc);
int DealMessage(const net::RedisCmdArgsType& argv, std::string* response) override { return 0; }
static void DoBackgroundTask(void* arg);
bool IsPubSub() { return is_pubsub_; }
void SetIsPubSub(bool is_pubsub) { is_pubsub_ = is_pubsub; }
void SetCurrentDb(const std::string& db_name) { current_db_ = db_name; }
void SetWriteCompleteCallback(WriteCompleteCallback cb) { write_completed_cb_ = std::move(cb); }
const std::string& GetCurrentTable() override { return current_db_; }
void DoAuth(const std::shared_ptr<User>& user);
void UnAuth(const std::shared_ptr<User>& user);
bool IsAuthed() const;
void InitUser();
bool AuthRequired() const;
std::string UserName() const;
// Txn
std::queue<std::shared_ptr<Cmd>> GetTxnCmdQue();
void PushCmdToQue(std::shared_ptr<Cmd> cmd);
void ClearTxnCmdQue();
void SetTxnWatchFailState(bool is_failed);
void SetTxnInitFailState(bool is_failed);
void SetTxnStartState(bool is_start);
void AddKeysToWatch(const std::vector<std::string>& db_keys);
void RemoveWatchedKeys();
void SetTxnFailedFromKeys(const std::vector<std::string>& db_keys);
void SetTxnFailedIfKeyExists(const std::string target_db_name = "");
void ExitTxn();
bool IsInTxn();
bool IsTxnInitFailed();
bool IsTxnWatchFailed();
bool IsTxnExecing(void);
net::ServerThread* server_thread() { return server_thread_; }
void ClientInfoToString(std::string* info, const std::string& cmdName);
std::atomic<int> resp_num;
std::vector<std::shared_ptr<std::string>> resp_array;
std::shared_ptr<TimeStat> time_stat_;
private:
net::ServerThread* const server_thread_;
std::string current_db_;
WriteCompleteCallback write_completed_cb_;
bool is_pubsub_ = false;
std::queue<std::shared_ptr<Cmd>> txn_cmd_que_;
std::bitset<16> txn_state_;
std::unordered_set<std::string> watched_db_keys_;
std::mutex txn_state_mu_;
bool authenticated_ = false;
std::shared_ptr<User> user_;
std::shared_ptr<Cmd> DoCmd(const PikaCmdArgsType& argv, const std::string& opt,
const std::shared_ptr<std::string>& resp_ptr, bool cache_miss_in_rtc);
void ProcessSlowlog(const PikaCmdArgsType& argv, std::shared_ptr<Cmd> c_ptr);
void ProcessMonitor(const PikaCmdArgsType& argv);
void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<std::string>& resp_ptr, bool cache_miss_in_rtc);
void TryWriteResp();
};
struct ClientInfo {
int fd;
std::string ip_port;
int64_t last_interaction = 0;
std::shared_ptr<PikaClientConn> conn;
};
extern bool AddrCompare(const ClientInfo& lhs, const ClientInfo& rhs);
extern bool IdleCompare(const ClientInfo& lhs, const ClientInfo& rhs);
#endif