Skip to content

Commit 5c1cb48

Browse files
committed
Simplify the number of files in tools/pika-migrate
1 parent 014bc67 commit 5c1cb48

135 files changed

Lines changed: 53013 additions & 197 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

conf/pika.conf

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -289,16 +289,6 @@ sync-window-size : 9000
289289
# Supported Units [K|M|G]. Its default unit is in [bytes] and its default value is 268435456(256MB). The value range is [64MB, 1GB].
290290
max-conn-rbuf-size : 268435456
291291

292-
###################
293-
## Migrate Settings
294-
###################
295-
296-
target-redis-host : 127.0.0.1
297-
target-redis-port : 6379
298-
target-redis-pwd :
299-
300-
sync-batch-num : 100
301-
redis-sender-num : 10
302292

303293
#######################################################################E#######
304294
#! Critical Settings !#

include/pika_conf.h

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -346,14 +346,6 @@ class PikaConf : public pstd::BaseConf {
346346
int max_conn_rbuf_size() { return max_conn_rbuf_size_.load(); }
347347
int consensus_level() { return consensus_level_.load(); }
348348
int replication_num() { return replication_num_.load(); }
349-
350-
std::string target_redis_host() { return target_redis_host_; }
351-
int target_redis_port() { return target_redis_port_; }
352-
std::string target_redis_pwd() { return target_redis_pwd_; }
353-
int sync_batch_num() { return sync_batch_num_; }
354-
int redis_sender_num() { return redis_sender_num_; }
355-
356-
357349
int rate_limiter_mode() {
358350
std::shared_lock l(rwlock_);
359351
return rate_limiter_mode_;
@@ -933,13 +925,6 @@ class PikaConf : public pstd::BaseConf {
933925
std::map<std::string, std::string> diff_commands_;
934926
void TryPushDiffCommands(const std::string& command, const std::string& value);
935927

936-
// migrate configure items
937-
std::string target_redis_host_;
938-
int target_redis_port_;
939-
std::string target_redis_pwd_;
940-
int sync_batch_num_;
941-
int redis_sender_num_;
942-
943928
//
944929
// Critical configure items
945930
//

include/pika_repl_bgworker.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ class PikaReplBgWorker {
4747
net::BGThread bg_thread_;
4848
static int HandleWriteBinlog(net::RedisParser* parser, const net::RedisCmdArgsType& argv);
4949
static void ParseBinlogOffset(const InnerMessage::BinlogOffset& pb_offset, LogOffset* offset);
50-
static void ParseAndSendPikaCommand(const std::shared_ptr<Cmd>& c_ptr);
5150
};
5251

5352
#endif // PIKA_REPL_BGWROKER_H_

include/pika_server.h

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
#include "include/pika_statistic.h"
4747
#include "include/pika_transaction.h"
4848
#include "include/rsync_server.h"
49-
#include "include/redis_sender.h"
5049

5150
extern std::unique_ptr<PikaConf> g_pika_conf;
5251

@@ -310,12 +309,6 @@ class PikaServer : public pstd::noncopyable {
310309

311310
pstd::Status GetCmdRouting(std::vector<net::RedisCmdArgsType>& redis_cmds, std::vector<Node>* dst, bool* all_local);
312311

313-
/*
314-
* migrate used
315-
*/
316-
int SendRedisCommand(const std::string& command, const std::string& key);
317-
void RetransmitData(const std::string& path);
318-
319312
// info debug use
320313
void ServerStatus(std::string* info);
321314

@@ -624,11 +617,6 @@ class PikaServer : public pstd::noncopyable {
624617
*/
625618
std::unique_ptr<PikaAuxiliaryThread> pika_auxiliary_thread_;
626619

627-
/*
628-
* migrate to redis used
629-
*/
630-
std::vector<std::unique_ptr<RedisSender>> redis_senders_;
631-
632620
/*
633621
* Async slotsMgrt use
634622
*/

src/pika_conf.cc

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -207,12 +207,6 @@ int PikaConf::Load() {
207207

208208
if (classic_mode_.load()) {
209209
GetConfInt("databases", &databases_);
210-
211-
// pika-migrate-tool only support 1 db
212-
if (databases_ != 1) {
213-
LOG(FATAL) << "pika-migrate-tool only support 1 db";
214-
}
215-
216210
if (databases_ < 1 || databases_ > MAX_DB_NUM) {
217211
LOG(FATAL) << "config databases error, limit [1 ~ 8], the actual is: " << databases_;
218212
}
@@ -633,22 +627,6 @@ int PikaConf::Load() {
633627
sync_window_size_.store(tmp_sync_window_size);
634628
}
635629

636-
// redis-migrate conifg args
637-
target_redis_host_ = "127.0.0.1";
638-
GetConfStr("target-redis-host", &target_redis_host_);
639-
640-
target_redis_port_ = 6379;
641-
GetConfInt("target-redis-port", &target_redis_port_);
642-
643-
target_redis_pwd_ = "";
644-
GetConfStr("target-redis-pwd" , &target_redis_pwd_);
645-
646-
sync_batch_num_ = 100;
647-
GetConfInt("sync-batch-num", &sync_batch_num_);
648-
649-
redis_sender_num_ = 8;
650-
GetConfInt("redis-sender-num", &redis_sender_num_);
651-
652630
// max conn rbuf size
653631
int tmp_max_conn_rbuf_size = PIKA_MAX_CONN_RBUF;
654632
GetConfIntHuman("max-conn-rbuf-size", &tmp_max_conn_rbuf_size);

src/pika_db.cc

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -473,9 +473,6 @@ bool DB::TryUpdateMasterOffset() {
473473
<< ", master_ip: " << master_ip << ", master_port: " << master_port << ", filenum: " << filenum
474474
<< ", offset: " << offset << ", term: " << term << ", index: " << index;
475475

476-
// Retransmit Data to target redis
477-
g_pika_server->RetransmitData(dbsync_path_);
478-
479476
pstd::DeleteFile(info_path);
480477
if (!ChangeDb(dbsync_path_)) {
481478
LOG(WARNING) << "DB: " << db_name_ << ", Failed to change db";

src/pika_repl_bgworker.cc

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,6 @@ void PikaReplBgWorker::WriteDBInSyncWay(const std::shared_ptr<Cmd>& c_ptr) {
231231
&& PIKA_CACHE_NONE != g_pika_conf->cache_mode()
232232
&& c_ptr->GetDB()->cache()->CacheStatus() == PIKA_CACHE_STATUS_OK) {
233233
if (c_ptr->is_write()) {
234-
ParseAndSendPikaCommand(c_ptr);
235234
c_ptr->DoThroughDB();
236235
if (c_ptr->IsNeedUpdateCache()) {
237236
c_ptr->DoUpdateCache();
@@ -240,7 +239,6 @@ void PikaReplBgWorker::WriteDBInSyncWay(const std::shared_ptr<Cmd>& c_ptr) {
240239
LOG(WARNING) << "It is impossbile to reach here";
241240
}
242241
} else {
243-
ParseAndSendPikaCommand(c_ptr);
244242
c_ptr->Do();
245243
}
246244
if (!c_ptr->IsSuspend()) {
@@ -276,33 +274,3 @@ void PikaReplBgWorker::WriteDBInSyncWay(const std::shared_ptr<Cmd>& c_ptr) {
276274
}
277275
}
278276
}
279-
280-
void PikaReplBgWorker::ParseAndSendPikaCommand(const std::shared_ptr<Cmd>& c_ptr) {
281-
const PikaCmdArgsType& argv = c_ptr->argv();
282-
if (!strcasecmp(argv[0].data(), "pksetexat")) {
283-
if (argv.size() != 4) {
284-
LOG(WARNING) << "find invaild command, command size: " << argv.size();
285-
return;
286-
} else {
287-
std::string key = argv[1];
288-
int timestamp = std::atoi(argv[2].data());
289-
std::string value = argv[3];
290-
291-
int seconds = timestamp - time(NULL);
292-
PikaCmdArgsType tmp_argv;
293-
tmp_argv.push_back("setex");
294-
tmp_argv.push_back(key);
295-
tmp_argv.push_back(std::to_string(seconds));
296-
tmp_argv.push_back(value);
297-
298-
std::string command;
299-
net::SerializeRedisCommand(tmp_argv, &command);
300-
g_pika_server->SendRedisCommand(command, key);
301-
}
302-
} else {
303-
std::string key = argv.size() >= 2 ? argv[1] : argv[0];
304-
std::string command;
305-
net::SerializeRedisCommand(argv, &command);
306-
g_pika_server->SendRedisCommand(command, key);
307-
}
308-
}

src/pika_repl_client_conn.cc

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,6 @@ void PikaReplClientConn::HandleMetaSyncResponse(void* arg) {
154154
LOG(INFO) << "Finish to handle meta sync response";
155155
}
156156

157-
static bool alerady_full_sync = false;
158157
void PikaReplClientConn::HandleDBSyncResponse(void* arg) {
159158
std::unique_ptr<ReplClientTaskArg> task_arg(static_cast<ReplClientTaskArg*>(arg));
160159
std::shared_ptr<net::PbConn> conn = task_arg->conn;
@@ -179,16 +178,12 @@ void PikaReplClientConn::HandleDBSyncResponse(void* arg) {
179178
return;
180179
}
181180

182-
if (!alerady_full_sync) {
183-
alerady_full_sync = true;
184-
} else {
185-
LOG(FATAL) << "DBSyncResponse should only be called once";
186-
}
187-
188181
slave_db->SetMasterSessionId(session_id);
182+
189183
slave_db->StopRsync();
190184
slave_db->SetReplState(ReplState::kWaitDBSync);
191185
LOG(INFO) << "DB: " << db_name << " Need Wait To Sync";
186+
192187
//now full sync is starting, add an unfinished full sync count
193188
g_pika_conf->AddInternalUsedUnfinishedFullSync(slave_db->DBName());
194189
}

src/pika_server.cc

Lines changed: 1 addition & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@
2424
#include "include/pika_monotonic_time.h"
2525
#include "include/pika_rm.h"
2626
#include "include/pika_server.h"
27-
#include "include/pika_sender.h"
28-
#include "include/migrator_thread.h"
2927

3028
using pstd::Status;
3129
extern PikaServer* g_pika_server;
@@ -103,15 +101,6 @@ PikaServer::PikaServer()
103101
}
104102
}
105103

106-
// Create redis sender
107-
for (int i = 0; i < g_pika_conf->redis_sender_num(); ++i) {
108-
redis_senders_.emplace_back(std::make_unique<RedisSender>(int(i),
109-
g_pika_conf->target_redis_host(),
110-
g_pika_conf->target_redis_port(),
111-
g_pika_conf->target_redis_pwd()));
112-
}
113-
114-
115104
acl_ = std::make_unique<::Acl>();
116105
SetSlowCmdThreadPoolFlag(g_pika_conf->slow_cmd_pool());
117106
bgsave_thread_.set_thread_name("PikaServer::bgsave_thread_");
@@ -140,10 +129,7 @@ PikaServer::~PikaServer() {
140129
bgsave_thread_.StopThread();
141130
key_scan_thread_.StopThread();
142131
pika_migrate_thread_->StopThread();
143-
for (size_t i = 0; i < redis_senders_.size(); ++i) {
144-
redis_senders_[i]->Stop();
145-
}
146-
redis_senders_.clear();
132+
147133
dbs_.clear();
148134

149135
LOG(INFO) << "PikaServer " << pthread_self() << " exit!!!";
@@ -223,15 +209,6 @@ void PikaServer::Start() {
223209
<< (ret == net::kCreateThreadError ? ": create thread error " : ": other error");
224210
}
225211

226-
for (size_t i = 0; i < redis_senders_.size(); ++i) {
227-
ret = redis_senders_[i]->StartThread();
228-
if (ret != net::kSuccess) {
229-
dbs_.clear();
230-
LOG(FATAL) << "Start RedisSender Error: " << ret
231-
<< (ret == net::kCreateThreadError ? ": create thread error " : ": other error");
232-
}
233-
}
234-
235212
time(&start_time_s_);
236213
LOG(INFO) << "Pika Server going to start";
237214
rsync_server_->Start();
@@ -1565,77 +1542,6 @@ Status PikaServer::GetCmdRouting(std::vector<net::RedisCmdArgsType>& redis_cmds,
15651542
return Status::OK();
15661543
}
15671544

1568-
int PikaServer::SendRedisCommand(const std::string& command, const std::string& key) {
1569-
// Send command
1570-
size_t idx = std::hash<std::string>()(key) % redis_senders_.size();
1571-
redis_senders_[idx]->SendRedisCommand(command);
1572-
return 0;
1573-
}
1574-
1575-
void PikaServer::RetransmitData(const std::string& path) {
1576-
std::shared_ptr<storage::Storage> storage_ = std::make_shared<storage::Storage>();
1577-
rocksdb::Status s = storage_->Open(g_pika_server->storage_options(), path);
1578-
1579-
if (!s.ok()) {
1580-
LOG(FATAL) << "open received database error: " << s.ToString();
1581-
return;
1582-
}
1583-
1584-
// Init SenderThread
1585-
int thread_num = g_pika_conf->redis_sender_num();
1586-
std::string target_host = g_pika_conf->target_redis_host();
1587-
int target_port = g_pika_conf->target_redis_port();
1588-
std::string target_pwd = g_pika_conf->target_redis_pwd();
1589-
1590-
LOG(INFO) << "open received database success, start retransmit data to redis("
1591-
<< target_host << ":" << target_port << ")";
1592-
1593-
1594-
std::vector<std::shared_ptr<PikaSender>> pika_senders;
1595-
std::vector<std::shared_ptr<MigratorThread>> migrators;
1596-
1597-
for (int i = 0; i < thread_num; i++) {
1598-
pika_senders.emplace_back(std::make_shared<PikaSender>(target_host, target_port, target_pwd));
1599-
}
1600-
migrators.emplace_back(std::make_shared<MigratorThread>(storage_, &pika_senders, storage::kStrings, thread_num));
1601-
migrators.emplace_back(std::make_shared<MigratorThread>(storage_, &pika_senders, storage::kLists, thread_num));
1602-
migrators.emplace_back(std::make_shared<MigratorThread>(storage_, &pika_senders, storage::kHashes, thread_num));
1603-
migrators.emplace_back(std::make_shared<MigratorThread>(storage_, &pika_senders, storage::kSets, thread_num));
1604-
migrators.emplace_back(std::make_shared<MigratorThread>(storage_, &pika_senders, storage::kZSets, thread_num));
1605-
1606-
for (size_t i = 0; i < pika_senders.size(); i++) {
1607-
pika_senders[i]->StartThread();
1608-
}
1609-
for (size_t i = 0; i < migrators.size(); i++) {
1610-
migrators[i]->StartThread();
1611-
}
1612-
1613-
for (size_t i = 0; i < migrators.size(); i++) {
1614-
migrators[i]->JoinThread();
1615-
}
1616-
for (size_t i = 0; i < pika_senders.size(); i++) {
1617-
pika_senders[i]->Stop();
1618-
}
1619-
for (size_t i = 0; i < pika_senders.size(); i++) {
1620-
pika_senders[i]->JoinThread();
1621-
}
1622-
1623-
int64_t replies = 0, records = 0;
1624-
for (size_t i = 0; i < migrators.size(); i++) {
1625-
records += migrators[i]->num();
1626-
}
1627-
migrators.clear();
1628-
for (size_t i = 0; i < pika_senders.size(); i++) {
1629-
replies += pika_senders[i]->elements();
1630-
}
1631-
pika_senders.clear();
1632-
1633-
LOG(INFO) << "=============== Retransmit Finish =====================";
1634-
LOG(INFO) << "Total records : " << records << " have been Scaned";
1635-
LOG(INFO) << "Total replies : " << replies << " received from redis server";
1636-
LOG(INFO) << "=======================================================";
1637-
}
1638-
16391545
void PikaServer::ServerStatus(std::string* info) {
16401546
std::stringstream tmp_stream;
16411547
size_t q_size = ClientProcessorThreadPoolCurQueueSize();

tools/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ add_subdirectory(./benchmark_client)
33
add_subdirectory(./binlog_sender)
44
add_subdirectory(./manifest_generator)
55
add_subdirectory(./rdb_to_pika)
6+
add_subdirectory(./pika_migrate)
67
add_subdirectory(./pika_to_txt)
78
add_subdirectory(./txt_to_pika)
89
add_subdirectory(./pika-port/pika_port_3)

0 commit comments

Comments
 (0)