Skip to content

Commit e744786

Browse files
committed
feature: migrate tools support pika v3.5.0
1 parent cd92a46 commit e744786

16 files changed

Lines changed: 1228 additions & 3 deletions

conf/pika.conf

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,16 @@ sync-window-size : 9000
289289
# Supported Units [K|M|G]. Its default unit is in [bytes] and its default value is 268435456(256MB). The value range is [64MB, 1GB].
290290
max-conn-rbuf-size : 268435456
291291

292+
###################
293+
## Migrate Settings
294+
###################
295+
296+
target-redis-host : 127.0.0.1
297+
target-redis-port : 6379
298+
target-redis-pwd :
299+
300+
sync-batch-num : 100
301+
redis-sender-num : 10
292302

293303
###############################################################################
294304
#! Critical Settings !#

include/migrator_thread.h

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
#ifndef MIGRATOR_THREAD_H_
2+
#define MIGRATOR_THREAD_H_
3+
4+
#include <atomic>
#include <iostream>
5+
#include <mutex>
6+
7+
#include "storage/storage.h"
8+
#include "net/include/redis_cli.h"
9+
10+
#include "include/pika_sender.h"
11+
12+
class MigratorThread : public net::Thread {
13+
public:
14+
MigratorThread(std::shared_ptr<storage::Storage> storage_, std::vector<std::shared_ptr<PikaSender>> *senders, int type, int thread_num) :
15+
storage_(storage_),
16+
should_exit_(false),
17+
senders_(senders),
18+
type_(type),
19+
thread_num_(thread_num),
20+
thread_index_(0),
21+
num_(0) {
22+
}
23+
24+
virtual ~ MigratorThread();
25+
26+
int64_t num() {
27+
std::lock_guard<std::mutex> l(num_mutex_);
28+
return num_;
29+
}
30+
31+
void Stop() {
32+
should_exit_ = true;
33+
}
34+
35+
private:
36+
void PlusNum() {
37+
std::lock_guard<std::mutex> l(num_mutex_);
38+
++num_;
39+
}
40+
41+
void DispatchKey(const std::string &command, const std::string& key = "");
42+
43+
void MigrateDB();
44+
void MigrateStringsDB();
45+
void MigrateListsDB();
46+
void MigrateHashesDB();
47+
void MigrateSetsDB();
48+
void MigrateZsetsDB();
49+
50+
virtual void *ThreadMain();
51+
52+
private:
53+
std::shared_ptr<storage::Storage> storage_;
54+
bool should_exit_;
55+
56+
std::vector<std::shared_ptr<PikaSender>> *senders_;
57+
int type_;
58+
int thread_num_;
59+
int thread_index_;
60+
61+
int64_t num_;
62+
std::mutex num_mutex_;
63+
};
64+
65+
#endif
66+

include/pika_conf.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,14 @@ class PikaConf : public pstd::BaseConf {
346346
// Returns the value currently loaded from max_conn_rbuf_size_.
int max_conn_rbuf_size() {
  const int current = max_conn_rbuf_size_.load();
  return current;
}
347347
// Returns the value currently loaded from consensus_level_.
int consensus_level() {
  const int current = consensus_level_.load();
  return current;
}
348348
// Returns the value currently loaded from replication_num_.
int replication_num() {
  const int current = replication_num_.load();
  return current;
}
349+
350+
// Returns a copy of target_redis_host_.
// NOTE(review): read without a lock, unlike some other getters in this class —
// confirm the value is never modified after configuration load.
std::string target_redis_host() {
  return std::string(target_redis_host_);
}
351+
// Returns target_redis_port_.
// NOTE(review): read without a lock — confirm it is never modified after configuration load.
int target_redis_port() {
  const int port = target_redis_port_;
  return port;
}
352+
// Returns a copy of target_redis_pwd_.
// NOTE(review): read without a lock — confirm it is never modified after configuration load.
std::string target_redis_pwd() {
  return std::string(target_redis_pwd_);
}
353+
// Returns sync_batch_num_.
// NOTE(review): read without a lock — confirm it is never modified after configuration load.
int sync_batch_num() {
  const int batch = sync_batch_num_;
  return batch;
}
354+
// Returns redis_sender_num_.
// NOTE(review): read without a lock — confirm it is never modified after configuration load.
int redis_sender_num() {
  const int senders = redis_sender_num_;
  return senders;
}
355+
356+
349357
int rate_limiter_mode() {
350358
std::shared_lock l(rwlock_);
351359
return rate_limiter_mode_;
@@ -925,6 +933,13 @@ class PikaConf : public pstd::BaseConf {
925933
std::map<std::string, std::string> diff_commands_;
926934
void TryPushDiffCommands(const std::string& command, const std::string& value);
927935

936+
// migrate configure items
937+
std::string target_redis_host_;
938+
int target_redis_port_;
939+
std::string target_redis_pwd_;
940+
int sync_batch_num_;
941+
int redis_sender_num_;
942+
928943
//
929944
// Critical configure items
930945
//

include/pika_repl_bgworker.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ class PikaReplBgWorker {
4747
net::BGThread bg_thread_;
4848
static int HandleWriteBinlog(net::RedisParser* parser, const net::RedisCmdArgsType& argv);
4949
static void ParseBinlogOffset(const InnerMessage::BinlogOffset& pb_offset, LogOffset* offset);
50+
static void ParseAndSendPikaCommand(const std::shared_ptr<Cmd>& c_ptr);
5051
};
5152

5253
#endif // PIKA_REPL_BGWROKER_H_

include/pika_sender.h

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
#ifndef PIKA_SENDER_H_
2+
#define PIKA_SENDER_H_
3+
4+
#include <atomic>
5+
#include <thread>
6+
#include <chrono>
7+
#include <iostream>
8+
#include <queue>
9+
10+
#include "net/include/bg_thread.h"
11+
#include "net/include/net_cli.h"
12+
#include "net/include/redis_cli.h"
13+
14+
class PikaSender : public net::Thread {
15+
public:
16+
PikaSender(std::string ip, int64_t port, std::string password);
17+
virtual ~PikaSender();
18+
void LoadKey(const std::string &cmd);
19+
void Stop();
20+
21+
int64_t elements() { return elements_; }
22+
23+
void SendCommand(std::string &command, const std::string &key);
24+
int QueueSize();
25+
void ConnectRedis();
26+
27+
private:
28+
net::NetCli *cli_;
29+
pstd::CondVar wsignal_;
30+
pstd::CondVar rsignal_;
31+
std::mutex signal_mutex;
32+
std::mutex keys_queue_mutex_;
33+
std::queue<std::string> keys_queue_;
34+
std::string ip_;
35+
int port_;
36+
std::string password_;
37+
std::atomic<bool> should_exit_;
38+
int64_t elements_;
39+
40+
virtual void *ThreadMain();
41+
};
42+
43+
#endif

include/pika_server.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
#include "include/pika_statistic.h"
4747
#include "include/pika_transaction.h"
4848
#include "include/rsync_server.h"
49+
#include "include/redis_sender.h"
4950

5051
extern std::unique_ptr<PikaConf> g_pika_conf;
5152

@@ -309,6 +310,12 @@ class PikaServer : public pstd::noncopyable {
309310

310311
pstd::Status GetCmdRouting(std::vector<net::RedisCmdArgsType>& redis_cmds, std::vector<Node>* dst, bool* all_local);
311312

313+
/*
314+
* migrate used
315+
*/
316+
int SendRedisCommand(const std::string& command, const std::string& key);
317+
void RetransmitData(const std::string& path);
318+
312319
// info debug use
313320
void ServerStatus(std::string* info);
314321

@@ -617,6 +624,11 @@ class PikaServer : public pstd::noncopyable {
617624
*/
618625
std::unique_ptr<PikaAuxiliaryThread> pika_auxiliary_thread_;
619626

627+
/*
628+
* migrate to redis used
629+
*/
630+
std::vector<std::unique_ptr<RedisSender>> redis_senders_;
631+
620632
/*
621633
* Async slotsMgrt use
622634
*/

include/redis_sender.h

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
#ifndef REDIS_SENDER_H_
2+
#define REDIS_SENDER_H_
3+
4+
#include <atomic>
5+
#include <thread>
6+
#include <chrono>
7+
#include <iostream>
8+
#include <queue>
9+
10+
#include "pika_repl_bgworker.h"
11+
#include "net/include/net_cli.h"
12+
#include "net/include/redis_cli.h"
13+
14+
class RedisSender : public net::Thread {
15+
public:
16+
RedisSender(int id, std::string ip, int64_t port, std::string password);
17+
virtual ~RedisSender();
18+
void Stop(void);
19+
int64_t elements() {
20+
return elements_;
21+
}
22+
23+
void SendRedisCommand(const std::string &command);
24+
25+
private:
26+
int SendCommand(std::string &command);
27+
void ConnectRedis();
28+
size_t commandQueueSize() {
29+
std::lock_guard l(keys_mutex_);
30+
return commands_queue_.size();
31+
}
32+
33+
private:
34+
int id_;
35+
std::shared_ptr<net::NetCli> cli_;
36+
pstd::CondVar rsignal_;
37+
pstd::CondVar wsignal_;
38+
pstd::Mutex signal_mutex_;
39+
pstd::Mutex keys_mutex_;
40+
std::queue<std::string> commands_queue_;
41+
std::string ip_;
42+
int port_;
43+
std::string password_;
44+
bool should_exit_;
45+
int32_t cnt_;
46+
int64_t elements_;
47+
std::atomic<time_t> last_write_time_;
48+
49+
virtual void *ThreadMain();
50+
};
51+
52+
#endif

pika-migrate.md

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
## Pika3.5到Redis迁移工具
2+
3+
### 适用版本:
4+
Pika 3.5, 单机模式且只支持单db
5+
6+
### 功能
7+
将Pika中的数据在线迁移到Pika、Redis(支持全量、增量同步)
8+
9+
### 开发背景:
10+
之前Pika项目官方提供的pika\_to\_redis工具仅支持离线将Pika的DB中的数据迁移到Pika、Redis, 且无法增量同步. 本工具实际上就是一个特殊的Pika, 只不过成为从库之后, 内部会将从主库获取到的数据转发给Redis, 同时支持增量同步, 从而实现热迁功能.
11+
12+
### 热迁原理
13+
1. pika-port通过dbsync请求获取主库当前全量db数据, 以及当前db数据所对应的binlog点位
14+
2. 获取到主库当前全量db数据之后, 扫描db, 将db中的数据转发给Redis
15+
3. 通过之前获取的binlog的点位向主库进行增量同步, 在增量同步的过程中, 将从主库获取到的binlog重组成Redis命令, 转发给Redis
16+
17+
### 新增配置项
18+
```
19+
###################
20+
## Migrate Settings
21+
###################
22+
23+
target-redis-host : 127.0.0.1
24+
target-redis-port : 6379
25+
target-redis-pwd : abc
26+
27+
sync-batch-num : 100
28+
redis-sender-num : 10
29+
```
30+
31+
### 步骤
32+
1. 考虑到pika-port在将全量数据写入到Redis这段时间可能耗时很长, 导致主库原先binlog点位已经被清理, 我们首先在主库上执行`config set expire-logs-nums 10000`, 让主库保留10000个Binlog文件(Binlog文件占用磁盘空间, 可以根据实际情况确定保留binlog的数量), 确保后续该工具请求增量同步的时候, 对应的Binlog文件还存在.
33+
2. 修改该工具配置文件的`target-redis-host, target-redis-port, target-redis-pwd, sync-batch-num, redis-sender-num`配置项(`sync-batch-num`是该工具接收到主库的全量数据之后, 为了提升转发效率, 将`sync-batch-num`个数据一起打包发送给Redis, 此外该工具内部可以指定`redis-sender-num`个线程用于转发命令, 命令通过Key的哈希值被分配到不同的线程中, 所以无需担心多线程发送导致的数据错乱的问题)
34+
3. 使用`pika -c pika.conf`命令启动该工具, 查看日志是否有报错信息
35+
4. 向该工具执行`slaveof ip port force`向主库请求同步, 观察是否有报错信息
36+
5. 在确认主从关系建立成功之后(此时pika-port同时也在向目标Redis转发数据了)通过向主库执行`info Replication`查看主从同步延迟(可在主库写入一个特殊的Key, 然后看在Redis侧是否可以立马获取到, 来判断是否数据已经基本同步完毕)
37+
38+
### 注意事项
39+
1. Pika支持不同数据结构采用同名Key, 但是Redis不支持, 所以在有同Key数据的场景下, 以第一个迁移到Redis的数据结构为准, 其他同Key数据结构会丢失
40+
2. 该工具只支持对单机模式且只使用单DB的Pika进行热迁移, 如果是集群模式, 或者是多DB场景, 工具会报错并且退出.
41+
3. 为了避免由于主库Binlog被清理导致该工具触发多次全量同步向Redis写入脏数据, 工具自身做了保护, 在第二次触发全量同步时会报错退出.
42+
43+

0 commit comments

Comments
 (0)