Skip to content

Commit bf41bf1

Browse files
committed
feat: 独立dump + 即时清理机制,解决Pika全量同步孤儿文件问题:
1. 每个Slave独占一个dump目录(dump-YYYYMMDD-NN格式) 2. 传输完成后立即清理文件,释放磁盘空间 3. 限制并发dump数量为3个 4. 完善dump完整性检查和占用管理 备注:1.未兼容多database场景 2.多slave同时全量同步时,清理存在异常 3.当天多次手动同步时孤儿文件清理能力下降
1 parent 974276b commit bf41bf1

14 files changed

Lines changed: 938 additions & 62 deletions

File tree

CMakeLists.txt

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,13 @@ endif()
88
set(CMAKE_CXX_STANDARD 17)
99
project(pika)
1010
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
11-
enable_testing()
11+
12+
# Option to control whether tests are built
13+
option(BUILD_TESTS "Build tests" ON)
14+
15+
if(BUILD_TESTS)
16+
enable_testing()
17+
endif()
1218

1319
if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
1420
# using Clang
@@ -760,6 +766,7 @@ if (USE_PIKA_TOOLS)
760766
add_subdirectory(tools)
761767
endif()
762768
aux_source_directory(src DIR_SRCS)
769+
list(REMOVE_ITEM DIR_SRCS "src/build_version.cc")
763770

764771
# # generate version
765772
string(TIMESTAMP TS "%Y-%m-%d %H:%M:%S")

conf/pika.conf

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -496,7 +496,7 @@ default-slot-num : 1024
496496

497497
# rate limiter bandwidth, units in bytes, default 1024GB/s (No limit)
498498
# [Support Dynamically changeable] sending 'rate-limiter-bandwidth' to a running pika changes its value dynamically
499-
#rate-limiter-bandwidth : 1099511627776
499+
rate-limiter-bandwidth : 109951162
500500

501501
#rate-limiter-refill-period-us : 100000
502502
#
@@ -505,7 +505,7 @@ default-slot-num : 1024
505505
# if auto_tuned is true: Enables dynamic adjustment of rate limit within the range
506506
#`[rate-limiter-bandwidth / 20, rate-limiter-bandwidth]`, according to the recent demand for background I/O.
507507
# Rate limiter auto-tuning: see https://rocksdb.org/blog/2017/12/18/17-auto-tuned-rate-limiter.html. The default value is true.
508-
#rate-limiter-auto-tuned : yes
508+
rate-limiter-auto-tuned : no
509509

510510
################################## RocksDB Blob Configure #####################
511511
# rocksdb blob configure
@@ -673,7 +673,7 @@ internal-used-unfinished-full-sync :
673673
# for washing data when upgrading from 4.0.0 to 4.0.1
674674
# https://github.com/OpenAtomFoundation/pika/issues/2886
675675
# default value: true
676-
wash-data: true
676+
wash-data: false
677677

678678
# Pika automatic compact compact strategy, a complement to rocksdb compact.
679679
# Trigger the compact background task periodically according to `compact-interval`

include/pika_server.h

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,40 @@ class PikaServer : public pstd::noncopyable {
212212
pstd::Status GetDumpMeta(const std::string& db_name, std::vector<std::string>* files, std::string* snapshot_uuid);
213213
void TryDBSync(const std::string& ip, int port, const std::string& db_name, int32_t top);
214214

215+
/*
216+
* Rsync snapshot tracking (for orphan file cleanup protection)
217+
*/
218+
void RegisterRsyncSnapshot(const std::string& snapshot_uuid);
219+
void UnregisterRsyncSnapshot(const std::string& snapshot_uuid);
220+
bool IsRsyncSnapshotActive(const std::string& snapshot_uuid);
221+
std::set<std::string> GetActiveRsyncSnapshots();
222+
223+
/*
224+
* Rsync file transfer tracking (for safe orphan file cleanup during sync)
225+
*/
226+
void RegisterRsyncTransferringFile(const std::string& snapshot_uuid, const std::string& filename);
227+
void UnregisterRsyncTransferringFile(const std::string& snapshot_uuid, const std::string& filename);
228+
bool IsRsyncFileTransferring(const std::string& snapshot_uuid, const std::string& filename);
229+
std::set<std::string> GetRsyncTransferringFiles(const std::string& snapshot_uuid);
230+
231+
/*
232+
* Dump ownership management (Scheme A: each slave has exclusive dump)
233+
*/
234+
bool MarkDumpInUse(const std::string& snapshot_uuid, const std::string& conn_id, const std::string& dump_path);
235+
void ReleaseDump(const std::string& snapshot_uuid);
236+
bool IsDumpInUse(const std::string& snapshot_uuid) const;
237+
std::string GetDumpPathBySnapshot(const std::string& snapshot_uuid) const;
238+
size_t GetActiveDumpCount() const;
239+
static constexpr size_t kMaxConcurrentDumps = 3; // Max concurrent dumps allowed
240+
241+
/*
242+
* Delayed file cleanup for orphan SST files (Scheme A)
243+
* Files are scheduled for cleanup 10 minutes after transfer completes
244+
* to allow for retries and ensure data consistency
245+
*/
246+
void ScheduleFileForCleanup(const std::string& filepath, int delay_seconds);
247+
void ProcessPendingCleanupFiles();
248+
215249
/*
216250
* Keyscan used
217251
*/
@@ -498,6 +532,13 @@ class PikaServer : public pstd::noncopyable {
498532
*/
499533
void DisableCompact();
500534

535+
/*
536+
* Utility function to ensure directory exists
537+
* Returns true if directory exists or was created successfully
538+
* Handles the special case where CreatePath returns 0 for both success and "already exists"
539+
*/
540+
static bool EnsureDirExists(const std::string& path, mode_t mode = 0755);
541+
501542
/*
502543
* lastsave used
503544
*/
@@ -605,6 +646,41 @@ class PikaServer : public pstd::noncopyable {
605646
std::unique_ptr<PikaRsyncService> pika_rsync_service_;
606647
std::unique_ptr<rsync::RsyncServer> rsync_server_;
607648

649+
/*
650+
* Rsync snapshot tracking used (for orphan file cleanup protection)
651+
*/
652+
std::set<std::string> active_rsync_snapshots_;
653+
std::mutex active_rsync_snapshots_mutex_;
654+
655+
/*
656+
* Rsync file transfer tracking used (for safe orphan file cleanup during sync)
657+
* Tracks which files are currently being transferred for each snapshot
658+
*/
659+
std::map<std::string, std::set<std::string>> rsync_transferring_files_;
660+
std::mutex rsync_transferring_files_mutex_;
661+
662+
/*
663+
* Dump ownership tracking used (Scheme A: each slave has exclusive dump)
664+
* snapshot_uuid -> {connection id, dump path}
665+
*/
666+
struct DumpOwnerInfo {
667+
std::string conn_id;
668+
std::string dump_path;
669+
};
670+
std::map<std::string, DumpOwnerInfo> dump_owners_;
671+
mutable std::mutex dump_owners_mutex_;
672+
673+
/*
674+
* Pending cleanup tracking for delayed file deletion (Scheme A)
675+
* filepath -> {cleanup_time}
676+
*/
677+
struct PendingCleanupInfo {
678+
std::string filepath;
679+
time_t cleanup_time;
680+
};
681+
std::map<std::string, PendingCleanupInfo> pending_cleanup_files_;
682+
mutable std::mutex pending_cleanup_mutex_;
683+
608684
/*
609685
* Pubsub used
610686
*/

include/rsync_server.h

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,30 @@ class RsyncServerConn : public net::PbConn {
5454
int DealMessage() override;
5555
static void HandleMetaRsyncRequest(void* arg);
5656
static void HandleFileRsyncRequest(void* arg);
57+
58+
// Snapshot tracking for orphan file cleanup protection
59+
void RegisterSnapshot(const std::string& snapshot_uuid);
60+
void UnregisterSnapshot();
61+
std::string GetSnapshotUuid() const { return snapshot_uuid_; }
62+
63+
// File transfer tracking for safe orphan file cleanup during sync
64+
void AddTransferringFile(const std::string& filename);
65+
// Remove the file from transfer tracking; optionally clean it up if the transfer is complete (is_eof=true)
66+
void RemoveTransferringFile(const std::string& filename, bool is_eof = false);
67+
bool IsFileTransferring(const std::string& filename) const;
68+
std::set<std::string> GetTransferringFiles() const;
69+
// Global check if a file is being transferred by any connection
70+
static bool IsFileTransferringGlobally(const std::string& snapshot_uuid, const std::string& filename);
71+
72+
// Public member for dump ownership tracking (Scheme A)
73+
std::string conn_id_; // Connection ID for dump ownership tracking
74+
5775
private:
5876
std::vector<std::shared_ptr<RsyncReader> > readers_;
59-
std::mutex mu_;
77+
mutable std::mutex mu_;
6078
void* data_ = nullptr;
79+
std::string snapshot_uuid_; // Current snapshot being synced
80+
std::set<std::string> transferring_files_; // Files currently being read
6181
};
6282

6383
class RsyncServerThread : public net::HolyThread {

src/pika_db.cc

Lines changed: 103 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
// LICENSE file in the root directory of this source tree. An additional grant
44
// of patent rights can be found in the PATENTS file in the same directory.
55

6+
#include <sys/stat.h>
67
#include <fstream>
78
#include <utility>
89

@@ -300,8 +301,9 @@ bool DB::RunBgsaveEngine() {
300301
LOG(INFO) << db_name_ << " bgsave_info: path=" << info.path << ", filenum=" << info.offset.b_offset.filenum
301302
<< ", offset=" << info.offset.b_offset.offset;
302303

303-
// Backup to tmp dir
304-
rocksdb::Status s = bgsave_engine_->CreateNewBackup(info.path);
304+
// Use SetBackupContentAndCreate to minimize time window between GetLiveFiles and CreateCheckpoint
305+
// This reduces the chance of compaction occurring and creating orphan files
306+
rocksdb::Status s = bgsave_engine_->SetBackupContentAndCreate(info.path);
305307

306308
if (!s.ok()) {
307309
LOG(WARNING) << db_name_ << " create new backup failed :" << s.ToString();
@@ -324,29 +326,61 @@ void DB::FinishBgsave() {
324326
}
325327

326328
// Prepare engine, need bgsave_protector protect
329+
// Scheme A: Each slave has exclusive dump, so we need unique dump directories
327330
bool DB::InitBgsaveEnv() {
328331
std::lock_guard l(bgsave_protector_);
329332
// Prepare for bgsave dir
330333
bgsave_info_.start_time = time(nullptr);
331334
char s_time[32];
332335
int len = static_cast<int32_t>(strftime(s_time, sizeof(s_time), "%Y%m%d%H%M%S", localtime(&bgsave_info_.start_time)));
333336
bgsave_info_.s_start_time.assign(s_time, len);
334-
std::string time_sub_path = g_pika_conf->bgsave_prefix() + std::string(s_time, 8);
335-
bgsave_info_.path = g_pika_conf->bgsave_path() + time_sub_path + "/" + bgsave_sub_path_;
336-
if (!pstd::DeleteDirIfExist(bgsave_info_.path)) {
337-
LOG(WARNING) << db_name_ << " remove exist bgsave dir failed";
337+
338+
// Scheme A: Use unique directory name with sequence number
339+
// Format: dump-YYYYMMDD-NN/db_name where NN is sequence number
340+
std::string base_path = g_pika_conf->bgsave_path();
341+
std::string date_str(s_time, 8);
342+
std::string prefix = g_pika_conf->bgsave_prefix() + date_str;
343+
344+
// Find first available sequence number
345+
int seq = 0;
346+
std::string time_sub_path;
347+
std::string full_path;
348+
do {
349+
time_sub_path = prefix + "-" + std::to_string(seq);
350+
full_path = base_path + time_sub_path + "/" + bgsave_sub_path_;
351+
seq++;
352+
} while (pstd::FileExists(full_path) && seq < 1000); // Max 1000 dumps per day
353+
354+
if (seq >= 1000) {
355+
LOG(ERROR) << db_name_ << " too many dump directories for today";
338356
return false;
339357
}
340-
pstd::CreatePath(bgsave_info_.path, 0755);
341-
// Prepare for failed dir
342-
if (!pstd::DeleteDirIfExist(bgsave_info_.path + "_FAILED")) {
343-
LOG(WARNING) << db_name_ << " remove exist fail bgsave dir failed :";
358+
359+
bgsave_info_.path = full_path;
360+
LOG(INFO) << db_name_ << " preparing bgsave dir: " << bgsave_info_.path;
361+
362+
// Note: In Scheme A, we don't delete existing directories
363+
// because other slaves may be using them
364+
// Just create the new path
365+
if (!PikaServer::EnsureDirExists(bgsave_info_.path, 0755)) {
366+
LOG(WARNING) << db_name_ << " create bgsave dir failed: " << bgsave_info_.path
367+
<< ", errno=" << errno << ", error=" << strerror(errno);
368+
// Clear the path on failure to avoid using invalid path in GetDumpMeta
369+
bgsave_info_.path.clear();
344370
return false;
345371
}
372+
373+
// Prepare for failed dir
374+
std::string failed_dir = bgsave_info_.path + "_FAILED";
375+
if (pstd::FileExists(failed_dir)) {
376+
pstd::DeleteDirIfExist(failed_dir);
377+
}
346378
return true;
347379
}
348380

349381
// Prepare bgsave env, need bgsave_protector protect
382+
// Note: SetBackupContent is now done in RunBgsaveEngine using SetBackupContentAndCreate
383+
// to minimize time window between GetLiveFiles and CreateCheckpoint
350384
bool DB::InitBgsaveEngine() {
351385
bgsave_engine_.reset();
352386
rocksdb::Status s = storage::BackupEngine::Open(storage().get(), bgsave_engine_, g_pika_conf->db_instance_num());
@@ -371,11 +405,7 @@ bool DB::InitBgsaveEngine() {
371405
std::lock_guard l(bgsave_protector_);
372406
bgsave_info_.offset = bgsave_offset;
373407
}
374-
s = bgsave_engine_->SetBackupContent();
375-
if (!s.ok()) {
376-
LOG(WARNING) << db_name_ << " set backup content failed " << s.ToString();
377-
return false;
378-
}
408+
// SetBackupContent is now done in RunBgsaveEngine to minimize time window
379409
}
380410
return true;
381411
}
@@ -390,25 +420,73 @@ void DB::Init() {
390420

391421
void DB::GetBgSaveMetaData(std::vector<std::string>* fileNames, std::string* snapshot_uuid) {
392422
const std::string dbPath = bgsave_info().path;
423+
size_t total_sst_files = 0;
424+
size_t orphan_sst_files = 0;
425+
426+
LOG(INFO) << "[GetBgSaveMetaData] Starting scan, dbPath=" << dbPath;
427+
428+
// dbPath is already the specific DB path (e.g., .../dump/dump-20260302-0/db0, per the dump-YYYYMMDD-NN naming scheme)
429+
// We need to scan its subdirectories (0, 1, 2 for rocksdb instances)
430+
std::vector<std::string> subDirs;
431+
int ret = pstd::GetChildren(dbPath, subDirs);
432+
LOG(INFO) << "[GetBgSaveMetaData] GetChildren for dbPath returned " << ret
433+
<< ", subDirs count=" << subDirs.size();
434+
if (ret) {
435+
LOG(WARNING) << "[GetBgSaveMetaData] Failed to read dbPath: " << dbPath;
436+
return;
437+
}
393438

394-
int db_instance_num = g_pika_conf->db_instance_num();
395-
for (int index = 0; index < db_instance_num; index++) {
396-
std::string instPath = dbPath + ((dbPath.back() != '/') ? "/" : "") + std::to_string(index);
397-
if (!pstd::FileExists(instPath)) {
398-
continue ;
439+
for (const std::string& subDir : subDirs) {
440+
std::string instPath = dbPath + "/" + subDir;
441+
// Skip if not exists or is a file (not directory)
442+
// Note: IsDir returns 0 for directory, 1 for file, -1 for error
443+
if (!pstd::FileExists(instPath) || pstd::IsDir(instPath) != 0) {
444+
continue;
399445
}
400446

401447
std::vector<std::string> tmpFileNames;
402-
int ret = pstd::GetChildren(instPath, tmpFileNames);
448+
ret = pstd::GetChildren(instPath, tmpFileNames);
403449
if (ret) {
404-
LOG(WARNING) << dbPath << " read dump meta files failed, path " << instPath;
405-
return;
450+
LOG(WARNING) << "[GetBgSaveMetaData] Failed to read instPath: " << instPath;
451+
continue;
406452
}
407453

408-
for (const std::string fileName : tmpFileNames) {
409-
fileNames -> push_back(std::to_string(index) + "/" + fileName);
454+
for (const std::string& fileName : tmpFileNames) {
455+
std::string fullPath = instPath + "/" + fileName;
456+
struct stat st;
457+
// Check if file exists and get its stat
458+
if (stat(fullPath.c_str(), &st) != 0) {
459+
// File doesn't exist, skip it
460+
LOG(WARNING) << "[GetBgSaveMetaData] File does not exist: " << fullPath;
461+
continue;
462+
}
463+
464+
// Check if it's an SST file and if it's an orphan (Links=1)
465+
if (fileName.size() > 4 && fileName.substr(fileName.size() - 4) == ".sst") {
466+
total_sst_files++;
467+
if (st.st_nlink == 1) {
468+
// This is an orphan file, but we need to include it in the meta
469+
// to ensure data consistency. The file will be cleaned up after
470+
// a delay to allow for retries.
471+
orphan_sst_files++;
472+
LOG(INFO) << "[GetBgSaveMetaData] Including orphan SST file: " << fullPath
473+
<< ", size=" << st.st_size;
474+
// NOTE: We no longer skip orphan files here. They will be included
475+
// in the file list and cleaned up with a delay after transfer.
476+
}
477+
}
478+
// Construct relative path like "0/xxx.sst" or "1/xxx.sst"
479+
fileNames->push_back(subDir + "/" + fileName);
410480
}
411481
}
482+
483+
if (orphan_sst_files > 0) {
484+
LOG(INFO) << "[GetBgSaveMetaData] Summary for " << dbPath
485+
<< ": total_sst=" << total_sst_files
486+
<< ", orphan_included=" << orphan_sst_files
487+
<< ", returned=" << fileNames->size();
488+
}
489+
412490
fileNames->push_back(kBgsaveInfoFile);
413491
pstd::Status s = GetBgSaveUUID(snapshot_uuid);
414492
if (!s.ok()) {

src/pika_rm.cc

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
#include <netinet/in.h>
1111
#include <sys/socket.h>
1212

13+
#include <ctime>
14+
#include <map>
1315
#include <utility>
1416

1517
#include "net/include/net_cli.h"
@@ -507,7 +509,16 @@ pstd::Status SyncSlaveDB::ActivateRsync() {
507509
if (!rsync_cli_->IsIdle()) {
508510
return s;
509511
}
510-
LOG(WARNING) << "Slave DB: " << DBName() << " Activating Rsync ... (retry count:" << rsync_init_retry_count_ << ")";
512+
// Rate limiting for retry logs - only log once per 30 seconds to reduce noise
513+
static std::map<std::string, time_t> last_retry_log_time;
514+
time_t now = time(nullptr);
515+
std::string db_key = DBName();
516+
bool should_log = (last_retry_log_time.find(db_key) == last_retry_log_time.end() ||
517+
now - last_retry_log_time[db_key] >= 30);
518+
if (should_log) {
519+
LOG(WARNING) << "Slave DB: " << DBName() << " Activating Rsync ... (retry count:" << rsync_init_retry_count_ << ")";
520+
last_retry_log_time[db_key] = now;
521+
}
511522
if (rsync_cli_->Init()) {
512523
rsync_init_retry_count_ = 0;
513524
rsync_cli_->Start();

0 commit comments

Comments
 (0)