From f2b8fbf1223f7523d06665f9b52ea0302acb2765 Mon Sep 17 00:00:00 2001 From: chenbt Date: Tue, 3 Feb 2026 17:51:38 +0800 Subject: [PATCH 01/11] fix: ttl error for hlen update cache --- src/pika_command.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pika_command.cc b/src/pika_command.cc index 93455644ef..1342efe2c9 100644 --- a/src/pika_command.cc +++ b/src/pika_command.cc @@ -417,7 +417,7 @@ void InitCmdTable(CmdTable* cmd_table) { cmd_table->insert(std::pair>(kCmdNameHKeys, std::move(hkeysptr))); ////HLenCmd std::unique_ptr hlenptr = - std::make_unique(kCmdNameHLen, 2, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsFast | kCmdFlagsReadCache); + std::make_unique(kCmdNameHLen, 2, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsDoThroughDB | kCmdFlagsFast | kCmdFlagsReadCache); cmd_table->insert(std::pair>(kCmdNameHLen, std::move(hlenptr))); ////HMgetCmd std::unique_ptr hmgetptr = From f2970d4337206b77a2fa293e5426767a3c6baacc Mon Sep 17 00:00:00 2001 From: chenbt Date: Mon, 9 Mar 2026 15:32:02 +0800 Subject: [PATCH 02/11] Revert "fix: ttl error for hlen update cache" This reverts commit 7be1b42f072b474afec46a99ee4f4e362a94906d. --- src/pika_command.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pika_command.cc b/src/pika_command.cc index 1342efe2c9..93455644ef 100644 --- a/src/pika_command.cc +++ b/src/pika_command.cc @@ -417,7 +417,7 @@ void InitCmdTable(CmdTable* cmd_table) { cmd_table->insert(std::pair>(kCmdNameHKeys, std::move(hkeysptr))); ////HLenCmd std::unique_ptr hlenptr = - std::make_unique(kCmdNameHLen, 2, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsDoThroughDB | kCmdFlagsFast | kCmdFlagsReadCache); + std::make_unique(kCmdNameHLen, 2, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsFast | kCmdFlagsReadCache); cmd_table->insert(std::pair>(kCmdNameHLen, std::move(hlenptr))); ////HMgetCmd std::unique_ptr hmgetptr = From 8103862d6d556db2405824a4a60e5d729421ad12 Mon Sep 17 00:00:00 2001 From: chenbt Date: Mon, 16 Mar 2026 12:00:51 +0800 Subject: [PATCH 03/11] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E6=B8=90?= =?UTF-8?q?=E8=BF=9B=E5=BC=8F=20compact=20(incremental-compact)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 compaction-strategy: incremental-compact - 使用 CompactFiles 每次处理少量最老的 SST 文件 - 可配置单次处理文件数、执行时间、压缩率阈值 - 与 dump 硬连接方案分离,独立测试 --- .claude/settings.local.json | 14 + .gitignore | 4 + check_delayed_cleanup.sh | 104 +++++ check_orphan_files.sh | 83 ++++ conf/pika.conf | 30 +- docs/pika_full_sync_solution.md | 587 ++++++++++++++++++++++++++ docs/sync_solution_analysis.md | 182 ++++++++ include/pika_conf.h | 35 +- include/pika_db.h | 1 + include/pika_server.h | 1 + src/pika_conf.cc | 35 ++ src/pika_db.cc | 11 + src/pika_server.cc | 5 + src/storage/include/storage/storage.h | 13 + src/storage/src/redis.cc | 106 +++++ src/storage/src/redis.h | 6 + src/storage/src/storage.cc | 28 ++ 17 files changed, 1240 insertions(+), 5 deletions(-) create mode 100644 .claude/settings.local.json create mode 100644 check_delayed_cleanup.sh create mode 100644 check_orphan_files.sh create mode 100644 docs/pika_full_sync_solution.md create mode 100644 docs/sync_solution_analysis.md diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 0000000000..020c30a5b1 --- /dev/null +++ b/.claude/settings.local.json @@ -0,0 +1,14 @@ +{ + "permissions": { + "allow": [ + "Bash(claude doctor:*)", + "WebFetch(domain:github.com)", + "Bash(ls -la:*)", + "Bash(git stash push:*)", + "Bash(git fetch:*)", + "Bash(git checkout:*)", + "Bash(git stash pop:*)", + "Bash(git add:*)" + ] + } +} diff --git a/.gitignore b/.gitignore index ab567194a1..80f26c7561 100644 --- a/.gitignore +++ b/.gitignore @@ -74,3 +74,7 @@ pkg !codis/cmd/fe/assets/** tests/tmp +./.claude +CLAUDE.md +./.claude +CLAUDE.md \ No newline at end of file diff --git a/check_delayed_cleanup.sh b/check_delayed_cleanup.sh new file mode 100644 index 0000000000..231716e1e1 --- /dev/null +++ b/check_delayed_cleanup.sh @@ -0,0 +1,104 @@ +#!/bin/bash +# 分析日志,找出被添加到延迟删除队列但实际没有执行删除的文件 + +LOG_FILE="${1:-/app/pika4/pika-9454/log/pika.INFO}" + +echo "==========================================" +echo "分析延迟清理队列执行情况" +echo "日志文件: $LOG_FILE" +echo "==========================================" + +# 提取所有 Scheduled 的文件(延迟删除调度) +echo "" +echo "步骤 1: 提取所有被调度的文件..." +grep "Scheduled file for delayed cleanup" "$LOG_FILE" 2>/dev/null | \ + sed 's/.*cleanup: //g' | \ + awk '{print $1}' | \ + grep "^/" | \ + sort -u > /tmp/scheduled_files.txt + +scheduled_count=$(wc -l < /tmp/scheduled_files.txt 2>/dev/null | tr -d ' ') +echo " 被调度的文件数: $scheduled_count" + +# 提取所有 Deleted 的文件(实际删除) +echo "" +echo "步骤 2: 提取所有实际删除的文件..." +grep "Deleted delayed cleanup file" "$LOG_FILE" 2>/dev/null | \ + sed 's/.*Deleted delayed cleanup file: //g' | \ + awk '{print $1}' | \ + grep "^/" | \ + sort -u > /tmp/deleted_files.txt + +deleted_count=$(wc -l < /tmp/deleted_files.txt 2>/dev/null | tr -d ' ') +echo " 实际删除的文件数: $deleted_count" + +# 如果没有任何记录,直接退出 +if [ "$scheduled_count" -eq 0 ] && [ "$deleted_count" -eq 0 ]; then + echo "" + echo "未找到任何延迟清理相关日志" + rm -f /tmp/scheduled_files.txt /tmp/deleted_files.txt + exit 0 +fi + +# 找出被调度但未删除的文件 +echo "" +echo "步骤 3: 找出被调度但未删除的文件..." +comm -23 /tmp/scheduled_files.txt /tmp/deleted_files.txt > /tmp/missing_files.txt + +missing_count=$(wc -l < /tmp/missing_files.txt 2>/dev/null | tr -d ' ') +echo " 被调度但未删除的文件数: $missing_count" + +if [ "$missing_count" -gt 0 ]; then + echo "" + echo "==========================================" + echo "被调度但未删除的文件列表:" + echo "==========================================" + + while IFS= read -r filepath; do + if [ -n "$filepath" ]; then + # 检查文件是否仍然存在 + if [ -f "$filepath" ]; then + # 获取文件大小和 nlink + size=$(stat -c %s "$filepath" 2>/dev/null || stat -f %z "$filepath" 2>/dev/null) + nlink=$(stat -c %h "$filepath" 2>/dev/null || stat -f %l "$filepath" 2>/dev/null) + + if command -v numfmt >/dev/null 2>&1; then + human_size=$(numfmt --to=iec-i --suffix=B "$size" 2>/dev/null) + else + human_size="${size} bytes" + fi + + echo " [仍存在] $filepath" + echo " 大小: $human_size, 硬链接数: $nlink" + + # 查找该文件的调度时间 + scheduled_time=$(grep "Scheduled file for delayed cleanup.*$filepath" "$LOG_FILE" | tail -1 | awk '{print $1}') + if [ -n "$scheduled_time" ]; then + echo " 调度时间: $scheduled_time" + fi + echo "" + else + echo " [已消失] $filepath (可能已被其他方式删除)" + fi + fi + done < /tmp/missing_files.txt +fi + +# 清理临时文件 +rm -f /tmp/scheduled_files.txt /tmp/deleted_files.txt /tmp/missing_files.txt + +echo "" +echo "==========================================" +echo "总结:" +echo " 调度文件: $scheduled_count" +echo " 删除文件: $deleted_count" +echo " 未删除文件: $missing_count" +if [ "$missing_count" -gt 0 ]; then + echo "" + echo "⚠️ 发现 $missing_count 个文件被调度但未删除!" + echo " 可能原因:" + echo " 1. 延迟时间未到 (600秒)" + echo " 2. 文件在删除时 nlink != 1 (不再是孤儿文件)" + echo " 3. ProcessPendingCleanupFiles 未执行或执行失败" +fi +echo "==========================================" diff --git a/check_orphan_files.sh b/check_orphan_files.sh new file mode 100644 index 0000000000..bec2e954ce --- /dev/null +++ b/check_orphan_files.sh @@ -0,0 +1,83 @@ +#!/bin/bash +# 检查 dump 目录下的所有孤儿文件(nlink=1 的 SST 文件)及其大小 + +DUMP_DIR="${1:-/app/pika4/pika-9454/dump}" + +echo "==========================================" +echo "扫描 Dump 目录下的孤儿文件 (nlink=1)" +echo "目录: $DUMP_DIR" +echo "==========================================" + +# 统计总文件数、孤儿文件数、总大小 +total_files=0 +orphan_files=0 +total_orphan_size=0 + +# 遍历所有 dump 子目录 +for dump_subdir in "$DUMP_DIR"/dump-*/; do + if [ -d "$dump_subdir" ]; then + echo "" + echo "检查目录: $dump_subdir" + echo "----------------------------------------" + + # 查找所有 .sst 文件并检查 nlink + find "$dump_subdir" -name "*.sst" -type f 2>/dev/null | while read -r file; do + total_files=$((total_files + 1)) + + # 获取硬链接数 + nlink=$(stat -c %h "$file" 2>/dev/null || stat -f %l "$file" 2>/dev/null) + + if [ "$nlink" -eq 1 ]; then + # 获取文件大小 + size=$(stat -c %s "$file" 2>/dev/null || stat -f %z "$file" 2>/dev/null) + + if command -v numfmt >/dev/null 2>&1; then + human_size=$(numfmt --to=iec-i --suffix=B "$size" 2>/dev/null) + else + human_size="${size} bytes" + fi + + echo "[孤儿文件] $file (大小: $human_size)" + fi + done + fi +done + +echo "" +echo "==========================================" +echo "正在统计总数..." +echo "==========================================" + +# 重新统计 +total_files=0 +orphan_files=0 +total_orphan_size=0 + +for dump_subdir in "$DUMP_DIR"/dump-*/; do + if [ -d "$dump_subdir" ]; then + find "$dump_subdir" -name "*.sst" -type f 2>/dev/null | while read -r file; do + total_files=$((total_files + 1)) + nlink=$(stat -c %h "$file" 2>/dev/null || stat -f %l "$file" 2>/dev/null) + if [ "$nlink" -eq 1 ]; then + size=$(stat -c %s "$file" 2>/dev/null || stat -f %z "$file" 2>/dev/null) + orphan_files=$((orphan_files + 1)) + total_orphan_size=$((total_orphan_size + size)) + echo "$size $file" + fi + done + fi +done > /tmp/orphan_list.txt + +orphan_files=$(wc -l < /tmp/orphan_list.txt 2>/dev/null || echo 0) +total_orphan_size=$(awk '{sum+=$1} END {print sum}' /tmp/orphan_list.txt 2>/dev/null || echo 0) + +echo "统计结果:" +echo " 孤儿文件数: $orphan_files" +if command -v numfmt >/dev/null 2>&1; then + echo " 孤儿文件总大小: $(numfmt --to=iec-i --suffix=B $total_orphan_size 2>/dev/null || echo ${total_orphan_size}bytes)" +else + echo " 孤儿文件总大小: $total_orphan_size bytes" +fi + +rm -f /tmp/orphan_list.txt +echo "==========================================" diff --git a/conf/pika.conf b/conf/pika.conf index 4f51f9cdbd..0977dab062 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -677,8 +677,9 @@ wash-data: true # Pika automatic compact compact strategy, a complement to rocksdb compact. # Trigger the compact background task periodically according to `compact-interval` -# Can choose `full-compact` or `obd-compact`. +# Can choose `full-compact`, `obd-compact` or `incremental-compact`. # obd-compact https://github.com/OpenAtomFoundation/pika/issues/2255 +# incremental-compact: incremental compact using CompactFiles, processes a small number of oldest SST files each time compaction-strategy : obd-compact # For OBD_Compact @@ -704,7 +705,28 @@ force-compact-min-delete-ratio : 10 # compact every `compact-every-num-of-files` file. dont-compact-sst-created-in-seconds : 20 -# For OBD_Compact -# According to the number of sst files in rocksdb, +# For OBD_Compact +# According to the number of sst files in rocksdb, # compact every `compact-every-num-of-files` file. -best-delete-min-ratio : 10 \ No newline at end of file +best-delete-min-ratio : 10 + +# ============================================ +# For incremental-compact (when compaction-strategy = incremental-compact) +# ============================================ +# Execution interval in seconds +incremental-compact-interval : 60 + +# Maximum number of files to compact per run +incremental-compact-max-files : 1 + +# Maximum execution time per run in milliseconds +incremental-compact-max-time-ms : 1000 + +# Compression rate threshold (%), continue processing if rate is below this value +incremental-compact-min-rate : 70 + +# Target level for compact files (-1 means current level + 1) +incremental-compact-target-level : -1 + +# Minimum file age in seconds to be considered for compaction +incremental-compact-min-file-age : 60 \ No newline at end of file diff --git a/docs/pika_full_sync_solution.md b/docs/pika_full_sync_solution.md new file mode 100644 index 0000000000..ec9a793082 --- /dev/null +++ b/docs/pika_full_sync_solution.md @@ -0,0 +1,587 @@ +# Pika 全量同步方案详解(Scheme A) + +## 文档说明 + +本文档详细描述 Pika 3.5.0+ 版本的全量同步机制(Scheme A),包括各场景下的完整流程、状态变化、数据流转以及已知问题。 + +**版本信息** +- 适用版本:Pika 3.5.0+ +- 方案名称:Scheme A(独立 Dump + 延迟清理) +- 最后更新:2026-03-06 + +--- + +## 1. 架构概述 + +### 1.1 核心设计 + +**Scheme A** 采用以下设计原则: + +1. **每个 Slave 独占一个 Dump 目录**:`dump-YYYYMMDD-NN/db_name` 格式 +2. **传输完成后延迟清理**:孤儿文件(nlink=1)传输完成后加入延迟清理队列(10分钟后删除) +3. **最大并发限制**:默认最多 3 个并发 dump +4. **细粒度文件保护**:传输中的文件受保护,防止被误删 +5. **统一清理入口**:所有孤儿文件清理通过 `RemoveTransferringFile` 统一处理 + +### 1.2 关键组件 + +| 组件 | 文件 | 职责 | +|------|------|------| +| RsyncServer | `rsync_server.cc` | 处理 Slave 文件同步请求 | +| RsyncServerConn | `rsync_server.cc` | 维护单个连接的状态 | +| PikaServer | `pika_server.cc` | 管理 Dump 占用、snapshot 注册 | +| DB | `pika_db.cc` | 管理 bgsave 和 dump 元数据 | + +### 1.3 关键数据结构 + +```cpp +// Dump 占用信息 +struct DumpOwnerInfo { + std::string conn_id; // 占用连接的 ID + std::string dump_path; // dump 目录路径 +}; +std::map dump_owners_; // snapshot_uuid -> 占用信息 + +// 传输中文件保护 +std::map> rsync_transferring_files_; // snapshot_uuid -> 文件集合 + +// 活跃 snapshot +std::set active_rsync_snapshots_; // 用于孤儿文件清理保护 +``` + +--- + +## 2. 单个 Slave 单 DB 全量同步流程 + +以 `db0` 为例,详细描述 Master 和 Slave 的状态变化。 + +### 2.1 流程时序图 + +``` +阶段1: 触发全量同步 +┌─────────────┐ ┌─────────────┐ +│ Slave │ │ Master │ +└──────┬──────┘ └──────┬──────┘ + │ │ + │ 1. 判断需要全量同步 │ + │ (repl_state: kTryConnect) │ + │ │ + │ 2. 发送 DBSync 请求 │ + │ ───────────────────────────────>│ + │ │ + │ 3. 检查是否正在 bgsave + │ (IsBgSaving()) + │ │ + │ 4. 如果不在 bgsave,触发 bgsave + │ (BgSaveDB()) + │ │ + │ 5. 返回 kErr(等待 bgsave) │ + │ <───────────────────────────────│ + │ │ + │ 6. 重试(循环) │ + │ ───────────────────────────────>│ + │ 7. 如果仍在 bgsave,返回 kErr + │ <───────────────────────────────│ + │ │ + +阶段2: bgsave 执行 +┌─────────────┐ ┌─────────────┐ +│ Background │ │ Master │ +│ Thread │ │ │ +└──────┬──────┘ └──────┬──────┘ + │ │ + │ 1. 创建 dump 目录 │ + │ (InitBgsaveEnv) │ + │ dump-20260305-0/db0 │ + │ │ + │ 2. 创建 RocksDB Checkpoint │ + │ (创建硬链接) │ + │ │ + │ 3. 生成 info 文件 │ + │ │ + │ 4. bgsave 完成 │ + │ (IsBgSaving() -> false) │ + │ │ + +阶段3: Meta 请求处理 +┌─────────────┐ ┌─────────────┐ +│ Slave │ │ Master │ +└──────┬──────┘ └──────┬──────┘ + │ │ + │ 1. 再次发送 DBSync 请求 │ + │ (循环重试后 bgsave 已完成) │ + │ ───────────────────────────────>│ + │ │ + │ 2. 获取文件列表 + │ (GetDumpMeta) + │ 扫描 dump-20260305-0/db0 + │ 生成 snapshot_uuid + │ │ + │ 3. 检查 dump 完整性 + │ 4. 检查是否已被占用 + │ 5. 检查并发限制 + │ 6. 标记 dump 为占用 + │ (MarkDumpInUse) + │ 7. 注册 snapshot + │ (RegisterSnapshot) + │ 8. 预注册所有文件 + │ (AddTransferringFile) + │ │ + │ 9. 返回 Meta 响应 │ + │ (snapshot_uuid + 文件列表) │ + │ <───────────────────────────────│ + │ │ + +阶段4: 文件传输 +┌─────────────┐ ┌─────────────┐ +│ Slave │ │ Master │ +└──────┬──────┘ └──────┬──────┘ + │ │ + │ 1. 多线程下载文件 │ + │ ─────────────────────────────> │ + │ │ + │ 2. 检查文件是否存在 + │ 3. 注册文件为传输中 + │ 4. 读取文件内容 + │ 5. 注销文件 + │ 6. 如果是最后一块(is_eof) + │ 检查是否为孤儿文件(nlink=1) + │ 如果是孤儿,加入延迟清理队列(10分钟) + │ │ + │ 7. 返回文件数据 │ + │ <───────────────────────────────│ + │ │ + │ (重复直到所有文件下载完成) │ + +阶段5: 清理 +┌─────────────┐ ┌─────────────┐ +│ Slave │ │ Master │ +└──────┬──────┘ └──────┬──────┘ + │ │ + │ 1. 下载完成,关闭连接 │ + │ ───────X────────────────────────>│ + │ │ + │ 2. 连接断开,析构 RsyncServerConn + │ 3. 释放 dump 占用 + │ (ReleaseDump) + │ 4. 注销 snapshot + │ (UnregisterSnapshot) + │ │ + │ 5. AutoDeleteExpiredDump 定时执行 + │ 处理延迟清理队列(ProcessPendingCleanupFiles) + │ 删除过期 dump 目录 + │ (注:CleanupOrphanSstFiles 已移除,延迟清理统一处理) + │ │ +``` + +### 2.2 Master 状态变化 + +| 阶段 | 状态 | 说明 | +|------|------|------| +| T0 | 无 dump | 初始状态 | +| T1 | bgsaving | 创建 dump-20260305-0/db0 | +| T2 | dump 可用 | bgsave 完成,等待 Meta 请求 | +| T3 | dump 占用 | 收到 Meta 请求,标记为占用 | +| T4 | 传输中 | 文件传输中,即时清理进行中 | +| T5 | dump 释放 | Slave 断开,释放占用 | +| T6 | dump 过期 | AutoDeleteExpiredDump 删除过期 dump | + +### 2.3 Slave 状态变化 + +| 阶段 | 状态 | 说明 | +|------|------|------| +| T0 | kTryConnect | 尝试连接 Master | +| T1 | kWaitDBSync | 等待 Master bgsave 完成 | +| T2 | kWaitDBSync | 获取文件列表,开始下载 | +| T3 | kWaitDBSync | 文件下载中 | +| T4 | kConnected | 全量同步完成,开始增量同步 | + +### 2.4 数据变化 + +**Master 磁盘占用变化**: + +| 时间点 | 数据目录 | Dump 目录 | 总计 | +|--------|----------|-----------|------| +| 初始 | 100GB | 0 | 100GB | +| bgsave 中 | 100GB | 0 (硬链接不占用) | 100GB | +| compaction 后 | 100GB | 部分孤儿文件 | 100GB + 孤儿文件 | +| 传输中 | 100GB | 100GB (dump) | 200GB | +| 传输完成 | 100GB | 孤儿文件延迟10分钟清理 | 100GB ~ 200GB | + +--- + +## 3. 多 Slave 同步流程 + +### 3.1 场景描述 +- Master 有 100GB 数据 +- Slave-1 先发起同步 +- Slave-2 在 Slave-1 同步过程中发起同步 + +### 3.2 流程时序 + +``` +时间线: +T0: + Slave-1 ──DBSync──> Master + Master: IsBgSaving? No + Master: 触发 BgSaveDB() + Master: 创建 dump-20260305-0/db0 + Slave-1 <──kErr─── Master (等待 bgsave) + +T30s: + Master: bgsave 完成 + Slave-1 ──DBSync──> Master + Master: 获取文件列表 (dump-0) + Master: MarkDumpInUse(dump-0, Slave-1) + Slave-1 <──文件列表── Master + Slave-1 开始下载... + +T31s: + Slave-2 ──DBSync──> Master + Master: IsDumpInUse(dump-0)? Yes (被 Slave-1 占用) + Master: 触发新的 BgSaveDB() + Master: 创建 dump-20260305-1/db0 + Slave-2 <──kErr─── Master (等待新 bgsave) + +T61s: + Master: 新 bgsave 完成 + Slave-2 ──DBSync──> Master + Master: MarkDumpInUse(dump-1, Slave-2) + Slave-2 <──文件列表── Master + Slave-2 开始下载... + +T120s: + Slave-1: 下载完成,断开连接 + Master: ReleaseDump(dump-0) + Master: 删除 dump-0 (AutoDeleteExpiredDump) + +T180s: + Slave-2: 下载完成,断开连接 + Master: ReleaseDump(dump-1) + Master: 删除 dump-1 +``` + +### 3.3 关键限制 + +- **最大并发 dump 数**:3 个(`kMaxConcurrentDumps = 3`) +- **超过限制**:返回 kErr,Slave 重试 + +--- + +## 4. 单 Slave 多 DB 同步流程 + +### 4.1 场景描述 +- Master 配置 3 个 DB:db0, db1, db2 +- 每个 DB 有独立的 RocksDB 实例(db-instance-num=3) +- Slave 同时同步所有 DB + +### 4.2 目录结构 + +``` +dump/dump-20260305-0/ +├── db0/ +│ ├── 0/ # RocksDB 实例 0 +│ │ ├── 000001.sst +│ │ └── 000002.sst +│ ├── 1/ # RocksDB 实例 1 +│ │ └── 000003.sst +│ ├── 2/ # RocksDB 实例 2 +│ │ └── 000004.sst +│ └── info # dump 元信息 +├── db1/ +│ ├── 0/ +│ ├── 1/ +│ ├── 2/ +│ └── info +└── db2/ + ├── 0/ + ├── 1/ + ├── 2/ + └── info +``` + +### 4.3 文件命名规则 + +- Slave 请求格式:`{rocksdb_instance}/{filename}` +- 示例:`0/000001.sst`, `1/000003.sst` +- **注意**:不包含 db0/db1/db2 前缀 + +### 4.4 同步流程 + +每个 DB 独立同步: + +1. Slave 发送 db0 的 DBSync 请求 +2. Master 返回 db0 的文件列表 +3. Slave 下载 db0 的所有文件 +4. 重复步骤 1-3 对 db1 和 db2 + +### 4.5 潜在问题 + +**问题:info 文件位置不一致** +- `AutoDeleteExpiredDump` 查找:`dump/dump-xxx/info` +- 实际位置:`dump/dump-xxx/db0/info` + +**已修复**:先尝试 `db0/info`,再回退到 `info` + +--- + +## 5. 多 Slave 多 DB 同步流程 + +这是 Scheme A 最复杂的场景,结合了多 Slave 和多 DB 的特点。 + +### 5.1 场景描述 +- Master:3 个 DB (db0, db1, db2) +- Slave-1:同步 db0, db1, db2 +- Slave-2:同步 db0, db1, db2 + +### 5.2 Dump 占用机制 + +**方案 A 设计**:每个 Slave 独占整个 dump 目录(包含所有 DB) + +``` +Slave-1 占用 dump-20260305-0: +├── db0 (传输中) +├── db1 (传输中) +└── db2 (传输中) + +Slave-2 占用 dump-20260305-1: +├── db0 (传输中) +├── db1 (传输中) +└── db2 (传输中) +``` + +### 5.3 占用检查 + +- 检查粒度:**整个 dump 目录** +- 如果一个 Slave 正在使用 dump-0,其他 Slave 不能使用 +- 触发新的 bgsave 创建 dump-1 + +### 5.4 潜在问题 + +**问题 1:DB 级别粒度 vs Dump 级别粒度** +- 当前设计:dump 级别占用 +- 如果 Slave-1 只同步 db0,dump-0 仍不能被 Slave-2 使用 +- 浪费磁盘空间 + +**问题 2:多 DB 的孤儿文件清理** +- `AutoDeleteExpiredDump` 只检查 `db0/info` +- 如果 db1 或 db2 还在传输,可能被误判为可清理 + +--- + +## 6. 孤儿文件清理机制(统一延迟清理) + +### 6.1 触发条件 + +孤儿文件:nlink=1 的 SST 文件(只被 dump 引用,不被 RocksDB 引用) + +**产生原因**: +- RocksDB compaction 删除旧 SST +- dump 中的硬链接变成孤儿 + +### 6.2 统一清理策略 + +**设计变更**:移除 `CleanupOrphanSstFiles` 函数,统一使用延迟清理队列 + +**新清理流程**: + +``` +1. 文件传输完成时(RemoveTransferringFile) + - 检查 is_eof=true(最后一块传输完成) + - stat 检查文件 nlink + - 如果 nlink=1(孤儿文件): + * 加入延迟清理队列(ScheduleFileForCleanup,延迟600秒) + * 记录日志 "Scheduled orphan file for cleanup" + - 如果 nlink=2(非孤儿): + * 不做处理,RocksDB 会管理生命周期 + +2. AutoDeleteExpiredDump 定时执行(每60秒) + - 调用 ProcessPendingCleanupFiles() + - 检查队列中到期的文件 + - 删除到期文件,记录日志 "Deleted delayed cleanup file" + - 同时检查并删除过期的 dump 目录 +``` + +### 6.3 保护机制 + +| 保护级别 | 说明 | 实现位置 | +|----------|------|----------| +| 传输中保护 | 传输中的文件不会被清理 | `rsync_transferring_files_` | +| 延迟保护 | 孤儿文件延迟10分钟删除,给 Slave 重试时间 | `ScheduleFileForCleanup(filepath, 600)` | +| nlink 检查 | 只清理孤儿文件(nlink=1),避免误删 | `stat` 检查 | + +### 6.4 时序说明 + +``` +T0: 文件传输完成(is_eof=true) + └─> RemoveTransferringFile 检查 nlink==1 + └─> ScheduleFileForCleanup(filepath, 600) 加入队列 + └─> 日志: "Scheduled orphan file for cleanup in 10min" + +T0+10min: AutoDeleteExpiredDump 定时执行 + └─> ProcessPendingCleanupFiles() + └─> 检查到期文件 + └─> 删除文件 + └─> 日志: "Deleted delayed cleanup file" +``` + +### 6.5 对比:旧方案 vs 新方案 + +| 方面 | 旧方案(CleanupOrphanSstFiles) | 新方案(统一延迟清理) | +|------|--------------------------------|----------------------| +| 触发时机 | 定时扫描所有 dump 目录 | 传输完成时即时检查 | +| 清理延迟 | 扫描周期不确定 | 固定延迟10分钟 | +| 竞争条件 | 与传输过程可能竞争 | 无竞争,统一入口 | +| 代码复杂度 | ~170行独立函数 | ~15行集成逻辑 | +| Slave 重试 | 可能失败(文件已被删) | 10分钟内可重试 | + +--- + +## 7. Bug 列表 + +### 7.1 已修复的 Bug + +| Bug | 影响 | 修复 | Commit | +|-----|------|------|--------| +| CreatePath 逻辑错误 | 无法创建 dump 目录 | 添加 EnsureDirExists 包装函数 | ee37a586 | +| 文件分片传输被提前删除 | Slave 重试失败 | 添加 is_eof 参数,只在传输完成时删除 | eb778848 | +| Snapshot 注册失败 | 文件被误删 | 修复 RegisterSnapshot 调用顺序 | 08ee1c36 | +| Info 文件路径错误 | 无法读取 snapshot_uuid | 先尝试 db0/info,再回退到 info | ad54f43a | +| 空 dump 目录返回空列表 | Slave 尝试下载不存在的文件 | 添加 filenames.empty() 检查 | 27c3a838 | +| 即时清理与延迟清理竞争 | Slave 重试时文件已被删 | 移除 CleanupOrphanSstFiles,统一使用延迟清理队列 | 当前版本 | +| CleanupOrphanSstFiles 误删传输中文件 | 同步失败 | 删除 CleanupOrphanSstFiles 函数 | 当前版本 | + +### 7.2 已移除的 Bug(通过架构调整修复) + +| Bug | 原影响 | 修复方式 | +|-----|--------|----------| +| CleanupOrphanSstFiles 竞争问题 | 与延迟清理队列竞争同一文件 | 移除 CleanupOrphanSstFiles 函数 | +| 即时清理导致重试失败 | Slave 30分钟后重试,文件已删 | 统一使用延迟10分钟清理 | + +### 7.3 待修复的 Bug + +| Bug | 影响 | 严重程度 | 修复方案 | +|-----|------|----------|----------| +| 多 DB 场景下孤儿文件清理粒度问题 | db1/db2 传输中可能被误判 | 中 | 检查所有 DB 的 info 文件 | +| 多 Slave 多 DB 的磁盘浪费 | 每个 Slave 独占整个 dump | 低 | 支持 DB 级别占用 | + +--- + +## 8. 待办事项 + +### 8.1 高优先级 + +- [x] **统一孤儿文件清理机制(已完成)** + - 移除 CleanupOrphanSstFiles 函数 + - 统一使用 RemoveTransferringFile + 延迟清理队列 + - 延迟10分钟,给 Slave 重试时间 + +- [ ] **修复多 DB 孤儿文件清理粒度问题** + - 当前只检查 db0/info + - 需要检查所有 DB 子目录 + - 如果任何 DB 在使用中,整个 dump 应被保护 + +### 8.2 中优先级 + +- [ ] **优化多 Slave 多 DB 的磁盘占用** + - 当前:每个 Slave 独占整个 dump + - 优化:支持 DB 级别占用 + - 影响:需要修改占用管理逻辑 + +- [ ] **完善监控指标** + - Dump 占用数量 + - 孤儿文件清理统计 + - 传输失败率 + +### 8.3 低优先级 + +- [ ] **支持动态调整并发限制** + - 当前:编译期常量 kMaxConcurrentDumps=3 + - 优化:支持配置热更新 + +- [ ] **Dump 压缩传输** + - 减少网络带宽 + - 权衡 CPU 和网络 + +--- + +## 9. 配置建议 + +### 9.1 关键配置项 + +```ini +# pika.conf + +# dump 目录前缀 +dump-prefix : dump- + +# dump 目录路径 +dump-path : ./dump/ + +# dump 过期时间(天) +# 0 表示永不过期 +dump-expire : 1 + +# RocksDB 实例数 +db-instance-num : 3 + +# 最大并发 dump 数(编译期配置) +# kMaxConcurrentDumps = 3 +``` + +### 9.2 部署建议 + +1. **磁盘空间**:预留 3 × 数据量 的空间 +2. **监控**:监控 dump 目录数量和磁盘使用率 +3. **日志**:关注 `[Rsync Meta]`、`[RsyncTransfer]`、`[Scheduled orphan file`、`[Deleted delayed cleanup` 日志 + +--- + +## 10. 附录 + +### 10.1 关键日志 + +```bash +# 查看 Meta 请求处理 +grep "Rsync Meta" log/pika.INFO + +# 查看文件传输 +grep "RsyncTransfer" log/pika.INFO + +# 查看孤儿文件延迟清理调度 +grep "Scheduled orphan file" log/pika.INFO + +# 查看延迟清理执行 +grep "Deleted delayed cleanup file" log/pika.INFO + +# 查看 dump 占用 +grep "DumpOwnership" log/pika.INFO + +# 查看错误 +grep "File no longer exists" log/pika.WARNING +``` + +### 10.2 状态码说明 + +| 状态码 | 含义 | 处理 | +|--------|------|------| +| kOk | 成功 | 继续处理 | +| kErr | 错误 | Slave 重试 | + +### 10.3 文件路径规范 + +| 类型 | 格式 | 示例 | +|------|------|------| +| Dump 目录 | dump-YYYYMMDD-NN/db_name | dump-20260305-0/db0 | +| RocksDB 实例 | {rocksdb_instance}/ | 0/, 1/, 2/ | +| SST 文件 | {instance}/{filename}.sst | 0/000001.sst | +| Info 文件 | db_name/info | db0/info | + +--- + +## 文档历史 + +| 版本 | 日期 | 修改内容 | +|------|------|----------| +| 1.0 | 2026-03-05 | 初始版本,整理 Scheme A 方案 | +| 1.1 | 2026-03-06 | 更新为统一延迟清理机制,移除 CleanupOrphanSstFiles | diff --git a/docs/sync_solution_analysis.md b/docs/sync_solution_analysis.md new file mode 100644 index 0000000000..279d2c926d --- /dev/null +++ b/docs/sync_solution_analysis.md @@ -0,0 +1,182 @@ +# Pika 主从同步方案变更分析 + +## 最近14次提交记录 + +1. **27c3a838** - fix: 修复 dump 目录为空时返回空文件列表导致同步失败 +2. **08ee1c36** - fix: 修复 snapshot 注册失败导致文件被误删 +3. **ad54f43a** - fix: 修复 AutoDeleteExpiredDump 中 info 文件路径问题 +4. **eb778848** - fix: 修复文件分片传输过程中被提前删除的问题 +5. **ee37a586** - fix: 添加 EnsureDirExists 包装函数修复 dump 目录创建失败 +6. **c3b83b5b** - feat: 实现方案A - 独立dump + 即时清理机制 +7. **e166b072** - fix: 修复RsyncServerConn析构函数死锁问题 +8. **6abdfd5b** - fix: 方案D完善 降低传输时的清理频率 +9. **403c6eb6** - fix: 方案D完善 - 处理传输中途文件丢失场景 +10. **733cd7ba** - fix: 修复全量同步过程中孤儿文件清理导致的文件丢失问题 +11. **4ed0ea47** - GetChildren添加IsDir保护防止传入文件路径崩溃 +12. **dc16f255** - 修复GetBgSaveMetaData目录遍历崩溃问题 +13. **2f3b8337** - 修复目录扫描层级问题 +14. **05248b00** - 修复Pika全量同步孤儿文件问题 + +--- + +## 当前最终方案 + +**方案A + 延迟清理 + 完整性检查**: +1. GetBgSaveMetaData 不过滤孤儿文件 +2. HandleMetaRsyncRequest 增加完整性检查(重新扫描对比) +3. RemoveTransferringFile 延迟10分钟清理 +4. AutoDeleteExpiredDump 调用 ProcessPendingCleanupFiles + +--- + +## 与当前方案不一致的代码分析 + +### 1. 即时清理代码(c3b83b5b 引入)- 需要修改 + +**位置**: src/rsync_server.cc:RemoveTransferringFile +**问题**: c3b83b5b 提交实现了即时清理,但当前方案改为延迟清理 +**状态**: 已在当前工作区修改为延迟清理 + +```cpp +// c3b83b5b 中的代码(即时清理): +if (stat(filepath.c_str(), &st) == 0 && st.st_nlink == 1) { + pstd::DeleteFile(filepath); // 立即删除 +} + +// 当前方案(延迟清理): +if (stat(filepath.c_str(), &st) == 0 && st.st_nlink == 1) { + g_pika_server->ScheduleFileForCleanup(filepath, 600); // 10分钟后删除 +} +``` + +--- + +### 2. CleanupOrphanSstFiles 函数 - 需要评估 + +**位置**: src/pika_server.cc:1523 +**问题**: 这个函数在 AutoDeleteExpiredDump 中被调用,用于清理孤儿文件 +**与延迟清理的关系**: +- CleanupOrphanSstFiles: 定时清理所有孤儿文件(非延迟) +- ProcessPendingCleanupFiles: 清理延迟队列中的文件 + +**建议**: 保留 CleanupOrphanSstFiles,但降低调用频率或仅在 dump 不再使用时调用 + +**原因**: +1. 延迟清理只处理传输完成的文件 +2. CleanupOrphanSstFiles 处理所有未被延迟清理覆盖的孤儿文件 +3. 两者可以共存,作为双保险 + +--- + +### 3. GetBgSaveMetaData 的孤儿文件过滤 - 已修改 + +**位置**: src/pika_db.cc:421 +**问题**: 之前提交过滤孤儿文件,当前方案不过滤 +**状态**: 已在当前工作区修改为不跳过 + +```cpp +// 之前(跳过): +if (st.st_nlink == 1) { + continue; // 跳过孤儿文件 +} + +// 当前(不跳过): +if (st.st_nlink == 1) { + // 记录但不跳过 + LOG(INFO) << "[GetBgSaveMetaData] Including orphan SST file: " << fullPath; +} +``` + +--- + +### 4. AutoDeleteExpiredDump 的 rate limiting - 需要关注 + +**位置**: src/pika_server.cc:1766 +**问题**: 当前工作区将 120 秒改为 600 秒 +**分析**: +```cpp +// HEAD~14: 120秒 +if (!active_rsync_snapshots_.empty() && (now - last_cleanup_time < 120)) + +// 当前工作区: 600秒 +if (!active_rsync_snapshots_.empty() && (now - last_cleanup_time < 600)) +``` +**建议**: 600秒(10分钟)与延迟清理时间一致,合理 + +--- + +### 5. rsync_transferring_files_ 保护机制 - 保留 + +**位置**: include/pika_server.h 和 src/pika_server.cc +**功能**: 跟踪正在传输的文件,防止被 CleanupOrphanSstFiles 误删 +**状态**: 应该保留,与延迟清理不冲突 + +--- + +### 6. 旧方案D的代码 - 可以删除 + +**相关提交**: +- 6abdfd5b: 方案D完善 降低传输时的清理频率 +- 403c6eb6: 方案D完善 - 处理传输中途文件丢失场景 +- 733cd7ba: 修复全量同步过程中孤儿文件清理导致的文件丢失问题 + +**分析**: 这些提交是在方案A之前的尝试,方案A已经替代了方案D +**但是**: 方案A的实现(c3b83b5b)是基于方案D的改进,不是完全替换 +**结论**: 不需要删除,方案A已经整合了这些修复 + +--- + +### 7. 早期bug修复 - 保留 + +**保留的提交**: +- e166b072: 修复RsyncServerConn析构函数死锁问题 +- ee37a586: 添加 EnsureDirExists 包装函数 +- ad54f43a: 修复 AutoDeleteExpiredDump 中 info 文件路径问题 +- 08ee1c36: 修复 snapshot 注册失败导致文件被误删 +- 27c3a838: 修复 dump 目录为空时返回空文件列表 + +**这些与当前方案不冲突,应该保留** + +--- + +## 需要删除或修改的代码清单 + +### 需要修改的代码 + +1. **无** - 当前工作区的修改已经覆盖了需要改的地方 + +### 可能需要删除/禁用的代码 + +1. **CleanupOrphanSstFiles 中的即时删除逻辑** + - 可选:改为仅在 dump 不再使用时调用 + - 或者降低调用频率 + +2. **检查是否有过时的配置项或调试代码** + - 检查 conf/pika.conf 是否有不需要的配置 + +### 建议保留但可能未使用的代码 + +1. **rsync_transferring_files_ 保护机制** + - 虽然延迟清理减少了即时删除的需求,但作为保护机制仍有用 + +2. **IsDumpInUse 和 dump_owners_** + - Scheme A 的核心机制,应该保留 + +--- + +## 总结 + +**当前工作区的修改已经覆盖了主要的不一致**: +1. GetBgSaveMetaData 不过滤孤儿文件 +2. RemoveTransferringFile 改为延迟清理 +3. HandleMetaRsyncRequest 增加完整性检查 +4. AutoDeleteExpiredDump 调用 ProcessPendingCleanupFiles + +**唯一需要评估的是**: CleanupOrphanSstFiles 是否还需要在同步期间调用 +建议: +- 选项1: 完全禁用 CleanupOrphanSstFiles(依赖延迟清理) +- 选项2: 仅在 dump 不再使用时调用 CleanupOrphanSstFiles(当前行为,可保留) + +--- + +文档生成时间: 2026-03-05 diff --git a/include/pika_conf.h b/include/pika_conf.h index e3c1519be2..e7aa8198db 100644 --- a/include/pika_conf.h +++ b/include/pika_conf.h @@ -33,7 +33,8 @@ class PikaConf : public pstd::BaseConf { enum CompactionStrategy { NONE, FullCompact, - OldestOrBestDeleteRatioSstCompact + OldestOrBestDeleteRatioSstCompact, + IncrementalCompact }; PikaConf(const std::string& path); ~PikaConf() override = default; @@ -147,6 +148,30 @@ class PikaConf : public pstd::BaseConf { std::shared_lock l(rwlock_); return compaction_strategy_; } + int incremental_compact_interval() { + std::shared_lock l(rwlock_); + return incremental_compact_interval_; + } + int incremental_compact_max_files() { + std::shared_lock l(rwlock_); + return incremental_compact_max_files_; + } + int incremental_compact_max_time_ms() { + std::shared_lock l(rwlock_); + return incremental_compact_max_time_ms_; + } + int incremental_compact_min_rate() { + std::shared_lock l(rwlock_); + return incremental_compact_min_rate_; + } + int incremental_compact_target_level() { + std::shared_lock l(rwlock_); + return incremental_compact_target_level_; + } + int incremental_compact_min_file_age() { + std::shared_lock l(rwlock_); + return incremental_compact_min_file_age_; + } bool disable_auto_compactions() { std::shared_lock l(rwlock_); return disable_auto_compactions_; @@ -1040,6 +1065,14 @@ class PikaConf : public pstd::BaseConf { int best_delete_min_ratio_; CompactionStrategy compaction_strategy_; + // for incremental-compact + int incremental_compact_interval_ = 60; + int incremental_compact_max_files_ = 1; + int incremental_compact_max_time_ms_ = 1000; + int incremental_compact_min_rate_ = 70; + int incremental_compact_target_level_ = -1; + int incremental_compact_min_file_age_ = 60; + int64_t resume_check_interval_ = 60; // seconds int64_t least_free_disk_to_resume_ = 268435456; // 256 MB double min_check_resume_ratio_ = 0.7; diff --git a/include/pika_db.h b/include/pika_db.h index 3dfe3b69f5..f40a171a6e 100644 --- a/include/pika_db.h +++ b/include/pika_db.h @@ -133,6 +133,7 @@ class DB : public std::enable_shared_from_this, public pstd::noncopyable { void Compact(const storage::DataType& type); void CompactRange(const storage::DataType& type, const std::string& start, const std::string& end); void LongestNotCompactionSstCompact(const storage::DataType& type); + void IncrementalCompact(const storage::DataType& type); void SetCompactRangeOptions(const bool is_canceled); diff --git a/include/pika_server.h b/include/pika_server.h index df75229188..0706af357f 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -60,6 +60,7 @@ enum TaskType { kBgSave, kCompactRangeAll, kCompactOldestOrBestDeleteRatioSst, + kIncrementalCompact, }; struct TaskArg { diff --git a/src/pika_conf.cc b/src/pika_conf.cc index b7bf82e647..052284642a 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -341,10 +341,35 @@ int PikaConf::Load() { compaction_strategy_ = FullCompact; } else if (cs_ == "obd-compact") { compaction_strategy_ = OldestOrBestDeleteRatioSstCompact; + } else if (cs_ == "incremental-compact") { + compaction_strategy_ = IncrementalCompact; } else { compaction_strategy_ = NONE; } + // for incremental-compact + GetConfInt("incremental-compact-interval", &incremental_compact_interval_); + if (incremental_compact_interval_ <= 0) { + incremental_compact_interval_ = 60; + } + GetConfInt("incremental-compact-max-files", &incremental_compact_max_files_); + if (incremental_compact_max_files_ <= 0) { + incremental_compact_max_files_ = 1; + } + GetConfInt("incremental-compact-max-time-ms", &incremental_compact_max_time_ms_); + if (incremental_compact_max_time_ms_ <= 0) { + incremental_compact_max_time_ms_ = 1000; + } + GetConfInt("incremental-compact-min-rate", &incremental_compact_min_rate_); + if (incremental_compact_min_rate_ <= 0 || incremental_compact_min_rate_ > 100) { + incremental_compact_min_rate_ = 70; + } + GetConfInt("incremental-compact-target-level", &incremental_compact_target_level_); + GetConfInt("incremental-compact-min-file-age", &incremental_compact_min_file_age_); + if (incremental_compact_min_file_age_ < 0) { + incremental_compact_min_file_age_ = 60; + } + // least-free-disk-resume-size GetConfInt64Human("least-free-disk-resume-size", &least_free_disk_to_resume_); if (least_free_disk_to_resume_ <= 0) { @@ -893,10 +918,20 @@ int PikaConf::ConfigRewrite() { compaction_strategy_ = FullCompact; } else if (cs_ == "obd-compact") { compaction_strategy_ = OldestOrBestDeleteRatioSstCompact; + } else if (cs_ == "incremental-compact") { + compaction_strategy_ = IncrementalCompact; } else { compaction_strategy_ = NONE; } + // for incremental-compact config update + SetConfInt("incremental-compact-interval", incremental_compact_interval_); + SetConfInt("incremental-compact-max-files", incremental_compact_max_files_); + SetConfInt("incremental-compact-max-time-ms", incremental_compact_max_time_ms_); + SetConfInt("incremental-compact-min-rate", incremental_compact_min_rate_); + SetConfInt("incremental-compact-target-level", incremental_compact_target_level_); + SetConfInt("incremental-compact-min-file-age", incremental_compact_min_file_age_); + SetConfStr("disable_auto_compactions", disable_auto_compactions_ ? "true" : "false"); SetConfStr("cache-type", scachetype); SetConfInt64("least-free-disk-resume-size", least_free_disk_to_resume_); diff --git a/src/pika_db.cc b/src/pika_db.cc index f3d52fdec3..6cef5c2f78 100644 --- a/src/pika_db.cc +++ b/src/pika_db.cc @@ -201,6 +201,17 @@ void DB::LongestNotCompactionSstCompact(const storage::DataType& type) { storage_->LongestNotCompactionSstCompact(type); } +void DB::IncrementalCompact(const storage::DataType& type) { + std::lock_guard rwl(dbs_rw_); + if (!opened_) { + return; + } + storage_->IncrementalCompact(type, + g_pika_conf->incremental_compact_max_files(), + g_pika_conf->incremental_compact_max_time_ms(), + g_pika_conf->incremental_compact_min_rate()); +} + void DB::DoKeyScan(void* arg) { std::unique_ptr bg_task_arg(static_cast(arg)); bg_task_arg->db->RunKeyScan(); diff --git a/src/pika_server.cc b/src/pika_server.cc index 7b8941b32d..63d3952aa7 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -520,6 +520,9 @@ Status PikaServer::DoSameThingEveryDB(const TaskType& type) { case TaskType::kCompactOldestOrBestDeleteRatioSst: db_item.second->LongestNotCompactionSstCompact(storage::DataType::kAll); break; + case TaskType::kIncrementalCompact: + db_item.second->IncrementalCompact(storage::DataType::kAll); + break; default: break; } @@ -1234,6 +1237,8 @@ void PikaServer::AutoCompactRange() { DoSameThingEveryDB(TaskType::kCompactAll); } else if (g_pika_conf->compaction_strategy() == PikaConf::OldestOrBestDeleteRatioSstCompact) { DoSameThingEveryDB(TaskType::kCompactOldestOrBestDeleteRatioSst); + } else if (g_pika_conf->compaction_strategy() == PikaConf::IncrementalCompact) { + DoSameThingEveryDB(TaskType::kIncrementalCompact); } } diff --git a/src/storage/include/storage/storage.h b/src/storage/include/storage/storage.h index dd41b3ea94..1de0194c7e 100644 --- a/src/storage/include/storage/storage.h +++ b/src/storage/include/storage/storage.h @@ -167,6 +167,7 @@ enum Operation { kCleanAll, kCompactRange, kCompactOldestOrBestDeleteRatioSst, + kIncrementalCompact, }; struct BGTask { @@ -1098,6 +1099,18 @@ class Storage { */ Status LongestNotCompactionSstCompact(const DataType &type, bool sync = false); + /** + * IncrementalCompact: 渐进式 compact,每次只处理少量最老的 SST 文件 + * @param type: 数据类型 + * @param max_files: 单次最多处理文件数 + * @param max_time_ms: 单次最大执行时间 + * @param min_rate: 压缩率阈值,低于此值继续处理 + * @param sync: 是否同步执行 + * @return Status + */ + Status IncrementalCompact(const DataType &type, int max_files = 1, int max_time_ms = 1000, + int min_rate = 70, bool sync = false); + Status SetMaxCacheStatisticKeys(uint32_t max_cache_statistic_keys); Status SetSmallCompactionThreshold(uint32_t small_compaction_threshold); Status SetSmallCompactionDurationThreshold(uint32_t small_compaction_duration_threshold); diff --git a/src/storage/src/redis.cc b/src/storage/src/redis.cc index 077fe15dd0..dd06efd724 100644 --- a/src/storage/src/redis.cc +++ b/src/storage/src/redis.cc @@ -3,7 +3,9 @@ // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. +#include #include +#include #include "rocksdb/env.h" @@ -436,6 +438,110 @@ Status Redis::LongestNotCompactionSstCompact(const DataType& option_type, std::v return Status::OK(); } +Status Redis::IncrementalCompact(const DataType& option_type, std::vector* compact_result_vec, + const ColumnFamilyType& type, int max_files, int max_time_ms, + int min_rate, int target_level, int min_file_age) { + // 1. 并发控制 + bool no_compact = false; + bool to_compact = true; + if (!in_compact_flag_.compare_exchange_weak(no_compact, to_compact, std::memory_order_relaxed, + std::memory_order_relaxed)) { + return Status::Busy("compact running"); + } + DEFER { in_compact_flag_.store(false); }; + + // 2. 选择 Column Family + std::vector handleIdxVec; + SelectColumnFamilyHandles(option_type, type, handleIdxVec); + if (handleIdxVec.empty()) { + return Status::Corruption("Invalid data type"); + } + + if (compact_result_vec) { + compact_result_vec->clear(); + } + + // 3. 记录开始时间 + auto start_time = std::chrono::steady_clock::now(); + int64_t now_sec = std::time(nullptr); + + for (auto idx : handleIdxVec) { + int processed = 0; + while (processed < max_files) { + // 3.1 检查超时 + auto elapsed = std::chrono::steady_clock::now() - start_time; + if (std::chrono::duration_cast(elapsed).count() >= max_time_ms) { + LOG(INFO) << "IncrementalCompact timeout, processed " << processed << " files"; + break; + } + + // 3.2 获取元数据,找最老的文件 + rocksdb::ColumnFamilyMetaData meta; + db_->GetColumnFamilyMetaData(handles_[idx], &meta); + + std::string oldest_file; + int oldest_level = 0; + uint64_t oldest_number = UINT64_MAX; + + for (const auto& level_meta : meta.levels) { + for (const auto& file_meta : level_meta.files) { + // 跳过太新的文件 + if (file_meta.file_creation_time > 0 && + (now_sec - file_meta.file_creation_time) < min_file_age) { + continue; + } + + uint64_t number = TableFileNameToNumber(file_meta.name); + if (number < oldest_number) { + oldest_number = number; + oldest_file = file_meta.db_path + "/" + file_meta.name; + oldest_level = level_meta.level; + } + } + } + + if (oldest_file.empty()) { + break; // 没有符合条件的文件 + } + + // 3.3 使用 CompactFiles 进行 compact + std::vector input_files{oldest_file}; + rocksdb::CompactionOptions compact_options; + int dest_level = (target_level >= 0) ? target_level : oldest_level + 1; + + rocksdb::CompactionJobInfo job_info; + Status s = db_->CompactFiles(compact_options, handles_[idx], + input_files, dest_level, -1, + nullptr, &job_info); + + if (!s.ok()) { + LOG(WARNING) << "IncrementalCompact failed for file " << oldest_file + << ": " << s.ToString(); + break; + } + + // 3.4 检查压缩率,决定是否继续 + if (job_info.stats.num_input_records > 0) { + int rate = job_info.stats.num_output_records * 100 / job_info.stats.num_input_records; + LOG(INFO) << "IncrementalCompact " << oldest_file << " rate=" << rate << "%"; + + if (rate >= min_rate) { + // 压缩效果好,暂停处理 + break; + } + } + + processed++; + } + + if (compact_result_vec) { + compact_result_vec->push_back(Status::OK()); + } + } + + return Status::OK(); +} + Status Redis::SetSmallCompactionThreshold(uint64_t small_compaction_threshold) { small_compaction_threshold_ = small_compaction_threshold; return Status::OK(); diff --git a/src/storage/src/redis.h b/src/storage/src/redis.h index 54c6e10d46..cdc3903f68 100644 --- a/src/storage/src/redis.h +++ b/src/storage/src/redis.h @@ -110,6 +110,12 @@ class Redis { virtual Status LongestNotCompactionSstCompact(const DataType& option_type, std::vector* compact_result_vec, const ColumnFamilyType& type = kMetaAndData); + virtual Status IncrementalCompact(const DataType& option_type, std::vector* compact_result_vec, + const ColumnFamilyType& type = kMetaAndData, + int max_files = 1, int max_time_ms = 1000, + int min_rate = 70, int target_level = -1, + int min_file_age = 60); + virtual Status GetProperty(const std::string& property, uint64_t* out); Status ScanKeyNum(std::vector* key_info); diff --git a/src/storage/src/storage.cc b/src/storage/src/storage.cc index cc7ca864f0..5db402b589 100644 --- a/src/storage/src/storage.cc +++ b/src/storage/src/storage.cc @@ -1729,6 +1729,11 @@ Status Storage::RunBGTask() { DoCompactRange(task.type, "", ""); } else if (task.operation == kCompactOldestOrBestDeleteRatioSst) { LongestNotCompactionSstCompact(task.type, true); + } else if (task.operation == kIncrementalCompact) { + IncrementalCompact(task.type, + g_pika_conf->incremental_compact_max_files(), + g_pika_conf->incremental_compact_max_time_ms(), + g_pika_conf->incremental_compact_min_rate(), true); } else if (task.operation == kCompactRange) { if (task.argv.size() == 1) { DoCompactSpecificKey(task.type, task.argv[0]); @@ -1754,6 +1759,29 @@ Status Storage::LongestNotCompactionSstCompact(const DataType &type, bool sync) } } return s; + } else { + AddBGTask({type, kIncrementalCompact}); + } + return Status::OK(); +} + +Status Storage::IncrementalCompact(const DataType &type, int max_files, int max_time_ms, + int min_rate, bool sync) { + if (sync) { + Status s; + for (const auto& inst : insts_) { + std::vector compact_result_vec; + s = inst->IncrementalCompact(type, &compact_result_vec, storage::kMetaAndData, + max_files, max_time_ms, min_rate, + g_pika_conf->incremental_compact_target_level(), + g_pika_conf->incremental_compact_min_file_age()); + for (auto compact_result : compact_result_vec) { + if (!compact_result.ok()) { + LOG(ERROR) << compact_result.ToString(); + } + } + } + return s; } else { AddBGTask({type, kCompactOldestOrBestDeleteRatioSst}); } From 06299484378c3295186d976f7b7bc8b7354be9a3 Mon Sep 17 00:00:00 2001 From: chenbt Date: Wed, 25 Mar 2026 14:16:22 +0800 Subject: [PATCH 04/11] =?UTF-8?q?=20fix:=20=E4=BC=98=E5=8C=96=E6=B8=90?= =?UTF-8?q?=E8=BF=9B=E5=BC=8Fcompact=EF=BC=9Btodo-=E6=A3=80=E6=9F=A5?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E8=83=BD=E5=90=A6=E7=94=9F=E6=95=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/pika_db.cc | 4 +- src/storage/include/storage/storage.h | 5 ++- src/storage/src/redis.cc | 58 +++++++++++++++++++++++---- src/storage/src/storage.cc | 34 +++++++++++----- 4 files changed, 82 insertions(+), 19 deletions(-) diff --git a/src/pika_db.cc b/src/pika_db.cc index 6cef5c2f78..1a1f1b0ea4 100644 --- a/src/pika_db.cc +++ b/src/pika_db.cc @@ -209,7 +209,9 @@ void DB::IncrementalCompact(const storage::DataType& type) { storage_->IncrementalCompact(type, g_pika_conf->incremental_compact_max_files(), g_pika_conf->incremental_compact_max_time_ms(), - g_pika_conf->incremental_compact_min_rate()); + g_pika_conf->incremental_compact_min_rate(), + g_pika_conf->incremental_compact_target_level(), + g_pika_conf->incremental_compact_min_file_age()); } void DB::DoKeyScan(void* arg) { diff --git a/src/storage/include/storage/storage.h b/src/storage/include/storage/storage.h index 1de0194c7e..7ee44717f8 100644 --- a/src/storage/include/storage/storage.h +++ b/src/storage/include/storage/storage.h @@ -1105,11 +1105,14 @@ class Storage { * @param max_files: 单次最多处理文件数 * @param max_time_ms: 单次最大执行时间 * @param min_rate: 压缩率阈值,低于此值继续处理 + * @param target_level: 目标 level,-1 表示当前 level + 1 + * @param min_file_age: 文件最小年龄(秒) * @param sync: 是否同步执行 * @return Status */ Status IncrementalCompact(const DataType &type, int max_files = 1, int max_time_ms = 1000, - int min_rate = 70, bool sync = false); + int min_rate = 70, int target_level = -1, int min_file_age = 60, + bool sync = false); Status SetMaxCacheStatisticKeys(uint32_t max_cache_statistic_keys); Status SetSmallCompactionThreshold(uint32_t small_compaction_threshold); diff --git a/src/storage/src/redis.cc b/src/storage/src/redis.cc index dd06efd724..f9bb5f436b 100644 --- a/src/storage/src/redis.cc +++ b/src/storage/src/redis.cc @@ -438,6 +438,29 @@ Status Redis::LongestNotCompactionSstCompact(const DataType& option_type, std::v return Status::OK(); } +// Helper function to extract table file number from filename +// e.g., "000123.sst" -> 123 +static uint64_t ExtractFileNumber(const std::string& name) { + uint64_t number = 0; + uint64_t base = 1; + size_t pos = name.find_last_of('.'); + if (pos == std::string::npos) { + return 0; + } + // Move backwards from '.' to find the digits + while (pos > 0) { + --pos; + char c = name[pos]; + if (c >= '0' && c <= '9') { + number += (c - '0') * base; + base *= 10; + } else { + break; + } + } + return number; +} + Status Redis::IncrementalCompact(const DataType& option_type, std::vector* compact_result_vec, const ColumnFamilyType& type, int max_files, int max_time_ms, int min_rate, int target_level, int min_file_age) { @@ -461,17 +484,19 @@ Status Redis::IncrementalCompact(const DataType& option_type, std::vectorclear(); } - // 3. 记录开始时间 - auto start_time = std::chrono::steady_clock::now(); int64_t now_sec = std::time(nullptr); for (auto idx : handleIdxVec) { + // 每个 CF 独立计时和配额 + auto cf_start_time = std::chrono::steady_clock::now(); int processed = 0; + while (processed < max_files) { - // 3.1 检查超时 - auto elapsed = std::chrono::steady_clock::now() - start_time; + // 3.1 检查该 CF 的超时 + auto elapsed = std::chrono::steady_clock::now() - cf_start_time; if (std::chrono::duration_cast(elapsed).count() >= max_time_ms) { - LOG(INFO) << "IncrementalCompact timeout, processed " << processed << " files"; + LOG(INFO) << "IncrementalCompact timeout for cf=" << handles_[idx]->GetName() + << ", processed " << processed << " files"; break; } @@ -484,6 +509,11 @@ Status Redis::IncrementalCompact(const DataType& option_type, std::vector 0 && @@ -491,7 +521,7 @@ Status Redis::IncrementalCompact(const DataType& option_type, std::vector= 6) { + LOG(INFO) << "IncrementalCompact skip L6 file: " << oldest_file; + break; + } + + // 3.3 使用 CompactFiles 进行 compact(只处理 L1-L5) std::vector input_files{oldest_file}; rocksdb::CompactionOptions compact_options; - int dest_level = (target_level >= 0) ? target_level : oldest_level + 1; + // 目标层 = 当前层 + 1(L1→L2, L2→L3, ... L5→L6) + int dest_level = oldest_level + 1; + + LOG(INFO) << "IncrementalCompact start: file=" << oldest_file + << ", cf=" << handles_[idx]->GetName() + << ", from_level=" << oldest_level + << ", to_level=" << dest_level; rocksdb::CompactionJobInfo job_info; Status s = db_->CompactFiles(compact_options, handles_[idx], diff --git a/src/storage/src/storage.cc b/src/storage/src/storage.cc index 5db402b589..8d7dcbd3be 100644 --- a/src/storage/src/storage.cc +++ b/src/storage/src/storage.cc @@ -1730,10 +1730,20 @@ Status Storage::RunBGTask() { } else if (task.operation == kCompactOldestOrBestDeleteRatioSst) { LongestNotCompactionSstCompact(task.type, true); } else if (task.operation == kIncrementalCompact) { - IncrementalCompact(task.type, - g_pika_conf->incremental_compact_max_files(), - g_pika_conf->incremental_compact_max_time_ms(), - g_pika_conf->incremental_compact_min_rate(), true); + // Parse parameters from argv, use defaults if not provided + int max_files = 1; + int max_time_ms = 1000; + int min_rate = 70; + int target_level = -1; + int min_file_age = 60; + + if (task.argv.size() > 0) max_files = std::atoi(task.argv[0].c_str()); + if (task.argv.size() > 1) max_time_ms = std::atoi(task.argv[1].c_str()); + if (task.argv.size() > 2) min_rate = std::atoi(task.argv[2].c_str()); + if (task.argv.size() > 3) target_level = std::atoi(task.argv[3].c_str()); + if (task.argv.size() > 4) min_file_age = std::atoi(task.argv[4].c_str()); + + IncrementalCompact(task.type, max_files, max_time_ms, min_rate, target_level, min_file_age, true); } else if (task.operation == kCompactRange) { if (task.argv.size() == 1) { DoCompactSpecificKey(task.type, task.argv[0]); @@ -1760,21 +1770,20 @@ Status Storage::LongestNotCompactionSstCompact(const DataType &type, bool sync) } return s; } else { - AddBGTask({type, kIncrementalCompact}); + AddBGTask({type, kCompactOldestOrBestDeleteRatioSst}); } return Status::OK(); } Status Storage::IncrementalCompact(const DataType &type, int max_files, int max_time_ms, - int min_rate, bool sync) { + int min_rate, int target_level, int min_file_age, bool sync) { if (sync) { Status s; for (const auto& inst : insts_) { std::vector compact_result_vec; s = inst->IncrementalCompact(type, &compact_result_vec, storage::kMetaAndData, max_files, max_time_ms, min_rate, - g_pika_conf->incremental_compact_target_level(), - g_pika_conf->incremental_compact_min_file_age()); + target_level, min_file_age); for (auto compact_result : compact_result_vec) { if (!compact_result.ok()) { LOG(ERROR) << compact_result.ToString(); @@ -1783,7 +1792,14 @@ Status Storage::IncrementalCompact(const DataType &type, int max_files, int max_ } return s; } else { - AddBGTask({type, kCompactOldestOrBestDeleteRatioSst}); + // Pass parameters via argv for async execution + AddBGTask({type, kIncrementalCompact, { + std::to_string(max_files), + std::to_string(max_time_ms), + std::to_string(min_rate), + std::to_string(target_level), + std::to_string(min_file_age) + }}); } return Status::OK(); } From 85c4e7009a52cca64df65a8af0cb470ed0bd4836 Mon Sep 17 00:00:00 2001 From: chenbt Date: Tue, 14 Apr 2026 16:24:23 +0800 Subject: [PATCH 05/11] =?UTF-8?q?feat=EF=BC=9A=E6=B5=8B=E8=AF=95=E9=AD=94?= =?UTF-8?q?=E6=94=B9=E4=BB=A3=E7=A0=81+=E5=8E=8B=E6=B5=8B=E5=B7=A5?= =?UTF-8?q?=E5=85=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CMakeLists.txt | 1 + src/storage/src/redis.cc | 38 +++++++++++++++++++++++++++----------- 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index b0c54e0607..7877cc7ea0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -760,6 +760,7 @@ if (USE_PIKA_TOOLS) add_subdirectory(tools) endif() aux_source_directory(src DIR_SRCS) +list(REMOVE_ITEM DIR_SRCS "src/build_version.cc") # # generate version string(TIMESTAMP TS "%Y-%m-%d %H:%M:%S") diff --git a/src/storage/src/redis.cc b/src/storage/src/redis.cc index f9bb5f436b..5db9e2a017 100644 --- a/src/storage/src/redis.cc +++ b/src/storage/src/redis.cc @@ -6,6 +6,7 @@ #include #include #include +#include #include "rocksdb/env.h" @@ -196,15 +197,35 @@ Status Redis::SetMaxCacheStatisticKeys(size_t max_cache_statistic_keys) { /* * compactrange no longer supports compact for a single data type + * + * 魔改版本:添加延迟放大并发竞争窗口,用于复现 SST 损坏问题 + * 注意:此修改仅用于测试环境,生产环境请勿使用 */ Status Redis::CompactRange(const rocksdb::Slice* begin, const rocksdb::Slice* end) { + // 随机延迟 0-50ms,让 7 个 CF 的启动时间错开但仍重叠 + std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 50)); + db_->CompactRange(default_compact_range_options_, begin, end); + + // 每个 CF 之间固定延迟 20ms,增加并发重叠度 + std::this_thread::sleep_for(std::chrono::milliseconds(20)); db_->CompactRange(default_compact_range_options_, handles_[kHashesDataCF], begin, end); + + std::this_thread::sleep_for(std::chrono::milliseconds(20)); db_->CompactRange(default_compact_range_options_, handles_[kSetsDataCF], begin, end); + + std::this_thread::sleep_for(std::chrono::milliseconds(20)); db_->CompactRange(default_compact_range_options_, handles_[kListsDataCF], begin, end); + + std::this_thread::sleep_for(std::chrono::milliseconds(20)); db_->CompactRange(default_compact_range_options_, handles_[kZsetsDataCF], begin, end); + + std::this_thread::sleep_for(std::chrono::milliseconds(20)); db_->CompactRange(default_compact_range_options_, handles_[kZsetsScoreCF], begin, end); + + std::this_thread::sleep_for(std::chrono::milliseconds(20)); db_->CompactRange(default_compact_range_options_, handles_[kStreamsDataCF], begin, end); + return Status::OK(); } @@ -312,18 +333,13 @@ Status Redis::LongestNotCompactionSstCompact(const DataType& option_type, std::v // clear deleted sst file records because we use them in different cf listener_.Clear(); - // The main goal of compaction was reclaimed the disk space and removed - // the tombstone. It seems that compaction scheduler was unnecessary here when - // the live files was too few, Hard code to 1 here. - if (props.size() <= 1) { - // LOG(WARNING) << "LongestNotCompactionSstCompact " << handles_[idx]->GetName() << " only one file"; - if (compact_result_vec) { - compact_result_vec->push_back(Status::OK()); - } - continue; - } + // 魔改:删除文件数量限制,只要有文件就 compact + // 原代码:if (props.size() <= 1) { ... continue; } + // 现在:注释掉这段逻辑,让 compact 更激进 - size_t max_files_to_compact = 1; + // size_t max_files_to_compact = 1; + // 魔改:每次处理 5 个文件,增加并发任务数 + size_t max_files_to_compact = 5; const StorageOptions& storageOptions = storage_->GetStorageOptions(); if (props.size() / storageOptions.compact_param_.compact_every_num_of_files_ > max_files_to_compact) { max_files_to_compact = props.size() / storageOptions.compact_param_.compact_every_num_of_files_; From f6ddc52bbe3ec4898ea1788f4e5becaf016aaebd Mon Sep 17 00:00:00 2001 From: chenbt Date: Wed, 15 Apr 2026 15:12:00 +0800 Subject: [PATCH 06/11] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8DL6=E8=B7=B3?= =?UTF-8?q?=E8=BF=87=E9=80=BB=E8=BE=91=E9=94=99=E8=AF=AF=EF=BC=9B=E6=96=B0?= =?UTF-8?q?=E5=A2=9E=E5=8A=A8=E6=80=81=E5=8F=82=E6=95=B0=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- conf/pika.conf | 3 -- include/pika_conf.h | 30 ++++++++++--- include/pika_server.h | 1 + src/pika_admin.cc | 65 +++++++++++++++++++++++++++ src/pika_conf.cc | 2 - src/pika_db.cc | 1 - src/pika_server.cc | 9 +++- src/storage/include/storage/storage.h | 3 +- src/storage/src/redis.cc | 20 ++------- src/storage/src/redis.h | 3 +- src/storage/src/storage.cc | 11 ++--- 11 files changed, 109 insertions(+), 39 deletions(-) diff --git a/conf/pika.conf b/conf/pika.conf index 0977dab062..b06f4f5162 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -725,8 +725,5 @@ incremental-compact-max-time-ms : 1000 # Compression rate threshold (%), continue processing if rate is below this value incremental-compact-min-rate : 70 -# Target level for compact files (-1 means current level + 1) -incremental-compact-target-level : -1 - # Minimum file age in seconds to be considered for compaction incremental-compact-min-file-age : 60 \ No newline at end of file diff --git a/include/pika_conf.h b/include/pika_conf.h index e7aa8198db..12e1665c82 100644 --- a/include/pika_conf.h +++ b/include/pika_conf.h @@ -164,10 +164,6 @@ class PikaConf : public pstd::BaseConf { std::shared_lock l(rwlock_); return incremental_compact_min_rate_; } - int incremental_compact_target_level() { - std::shared_lock l(rwlock_); - return incremental_compact_target_level_; - } int incremental_compact_min_file_age() { std::shared_lock l(rwlock_); return incremental_compact_min_file_age_; @@ -753,6 +749,31 @@ class PikaConf : public pstd::BaseConf { TryPushDiffCommands("disable_auto_compactions", value); disable_auto_compactions_ = value == "true"; } + void SetIncrementalCompactInterval(const int value) { + std::lock_guard l(rwlock_); + TryPushDiffCommands("incremental-compact-interval", std::to_string(value)); + incremental_compact_interval_ = value; + } + void SetIncrementalCompactMaxFiles(const int value) { + std::lock_guard l(rwlock_); + TryPushDiffCommands("incremental-compact-max-files", std::to_string(value)); + incremental_compact_max_files_ = value; + } + void SetIncrementalCompactMaxTimeMs(const int value) { + std::lock_guard l(rwlock_); + TryPushDiffCommands("incremental-compact-max-time-ms", std::to_string(value)); + incremental_compact_max_time_ms_ = value; + } + void SetIncrementalCompactMinRate(const int value) { + std::lock_guard l(rwlock_); + TryPushDiffCommands("incremental-compact-min-rate", std::to_string(value)); + incremental_compact_min_rate_ = value; + } + void SetIncrementalCompactMinFileAge(const int value) { + std::lock_guard l(rwlock_); + TryPushDiffCommands("incremental-compact-min-file-age", std::to_string(value)); + incremental_compact_min_file_age_ = value; + } void SetMaxSubcompactions(const int& value) { std::lock_guard l(rwlock_); TryPushDiffCommands("max-subcompactions", std::to_string(value)); @@ -1070,7 +1091,6 @@ class PikaConf : public pstd::BaseConf { int incremental_compact_max_files_ = 1; int incremental_compact_max_time_ms_ = 1000; int incremental_compact_min_rate_ = 70; - int incremental_compact_target_level_ = -1; int incremental_compact_min_file_age_ = 60; int64_t resume_check_interval_ = 60; // seconds diff --git a/include/pika_server.h b/include/pika_server.h index 0706af357f..a78c5e290f 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -551,6 +551,7 @@ class PikaServer : public pstd::noncopyable { */ bool have_scheduled_crontask_ = false; struct timeval last_check_compact_time_; + struct timeval last_incremental_compact_time_ = {0, 0}; /* * ResumeDB used diff --git a/src/pika_admin.cc b/src/pika_admin.cc index 944c65f9ec..c98aa897f8 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -2012,6 +2012,31 @@ void ConfigCmd::ConfigGet(std::string& ret) { EncodeString(&config_body, "disable_auto_compactions"); EncodeString(&config_body, g_pika_conf->disable_auto_compactions() ? "true" : "false"); } + if (pstd::stringmatch(pattern.data(), "incremental-compact-interval", 1) != 0) { + elements += 2; + EncodeString(&config_body, "incremental-compact-interval"); + EncodeNumber(&config_body, g_pika_conf->incremental_compact_interval()); + } + if (pstd::stringmatch(pattern.data(), "incremental-compact-max-files", 1) != 0) { + elements += 2; + EncodeString(&config_body, "incremental-compact-max-files"); + EncodeNumber(&config_body, g_pika_conf->incremental_compact_max_files()); + } + if (pstd::stringmatch(pattern.data(), "incremental-compact-max-time-ms", 1) != 0) { + elements += 2; + EncodeString(&config_body, "incremental-compact-max-time-ms"); + EncodeNumber(&config_body, g_pika_conf->incremental_compact_max_time_ms()); + } + if (pstd::stringmatch(pattern.data(), "incremental-compact-min-rate", 1) != 0) { + elements += 2; + EncodeString(&config_body, "incremental-compact-min-rate"); + EncodeNumber(&config_body, g_pika_conf->incremental_compact_min_rate()); + } + if (pstd::stringmatch(pattern.data(), "incremental-compact-min-file-age", 1) != 0) { + elements += 2; + EncodeString(&config_body, "incremental-compact-min-file-age"); + EncodeNumber(&config_body, g_pika_conf->incremental_compact_min_file_age()); + } if (pstd::stringmatch(pattern.data(), "network-interface", 1) != 0) { elements += 2; EncodeString(&config_body, "network-interface"); @@ -2345,6 +2370,11 @@ void ConfigCmd::ConfigSet(std::shared_ptr db) { "compact-cron", "compact-interval", "disable_auto_compactions", + "incremental-compact-interval", + "incremental-compact-max-files", + "incremental-compact-max-time-ms", + "incremental-compact-min-rate", + "incremental-compact-min-file-age", "slave-priority", "sync-window-size", "slow-cmd-list", @@ -2563,6 +2593,41 @@ void ConfigCmd::ConfigSet(std::shared_ptr db) { } g_pika_conf->SetDisableAutoCompaction(value); res_.AppendStringRaw("+OK\r\n"); + } else if (set_item == "incremental-compact-interval") { + if (pstd::string2int(value.data(), value.size(), &ival) == 0 || ival <= 0) { + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'incremental-compact-interval'\r\n"); + return; + } + g_pika_conf->SetIncrementalCompactInterval(static_cast(ival)); + res_.AppendStringRaw("+OK\r\n"); + } else if (set_item == "incremental-compact-max-files") { + if (pstd::string2int(value.data(), value.size(), &ival) == 0 || ival <= 0) { + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'incremental-compact-max-files'\r\n"); + return; + } + g_pika_conf->SetIncrementalCompactMaxFiles(static_cast(ival)); + res_.AppendStringRaw("+OK\r\n"); + } else if (set_item == "incremental-compact-max-time-ms") { + if (pstd::string2int(value.data(), value.size(), &ival) == 0 || ival <= 0) { + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'incremental-compact-max-time-ms'\r\n"); + return; + } + g_pika_conf->SetIncrementalCompactMaxTimeMs(static_cast(ival)); + res_.AppendStringRaw("+OK\r\n"); + } else if (set_item == "incremental-compact-min-rate") { + if (pstd::string2int(value.data(), value.size(), &ival) == 0 || ival <= 0 || ival > 100) { + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'incremental-compact-min-rate'\r\n"); + return; + } + g_pika_conf->SetIncrementalCompactMinRate(static_cast(ival)); + res_.AppendStringRaw("+OK\r\n"); + } else if (set_item == "incremental-compact-min-file-age") { + if (pstd::string2int(value.data(), value.size(), &ival) == 0 || ival < 0) { + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'incremental-compact-min-file-age'\r\n"); + return; + } + g_pika_conf->SetIncrementalCompactMinFileAge(static_cast(ival)); + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "rate-limiter-bandwidth") { int64_t new_bandwidth = 0; if (pstd::string2int(value.data(), value.size(), &new_bandwidth) == 0 || new_bandwidth <= 0) { diff --git a/src/pika_conf.cc b/src/pika_conf.cc index 052284642a..4a0199182f 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -364,7 +364,6 @@ int PikaConf::Load() { if (incremental_compact_min_rate_ <= 0 || incremental_compact_min_rate_ > 100) { incremental_compact_min_rate_ = 70; } - GetConfInt("incremental-compact-target-level", &incremental_compact_target_level_); GetConfInt("incremental-compact-min-file-age", &incremental_compact_min_file_age_); if (incremental_compact_min_file_age_ < 0) { incremental_compact_min_file_age_ = 60; @@ -929,7 +928,6 @@ int PikaConf::ConfigRewrite() { SetConfInt("incremental-compact-max-files", incremental_compact_max_files_); SetConfInt("incremental-compact-max-time-ms", incremental_compact_max_time_ms_); SetConfInt("incremental-compact-min-rate", incremental_compact_min_rate_); - SetConfInt("incremental-compact-target-level", incremental_compact_target_level_); SetConfInt("incremental-compact-min-file-age", incremental_compact_min_file_age_); SetConfStr("disable_auto_compactions", disable_auto_compactions_ ? "true" : "false"); diff --git a/src/pika_db.cc b/src/pika_db.cc index 1a1f1b0ea4..f91c41883d 100644 --- a/src/pika_db.cc +++ b/src/pika_db.cc @@ -210,7 +210,6 @@ void DB::IncrementalCompact(const storage::DataType& type) { g_pika_conf->incremental_compact_max_files(), g_pika_conf->incremental_compact_max_time_ms(), g_pika_conf->incremental_compact_min_rate(), - g_pika_conf->incremental_compact_target_level(), g_pika_conf->incremental_compact_min_file_age()); } diff --git a/src/pika_server.cc b/src/pika_server.cc index 63d3952aa7..1723337bce 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -1238,7 +1238,14 @@ void PikaServer::AutoCompactRange() { } else if (g_pika_conf->compaction_strategy() == PikaConf::OldestOrBestDeleteRatioSstCompact) { DoSameThingEveryDB(TaskType::kCompactOldestOrBestDeleteRatioSst); } else if (g_pika_conf->compaction_strategy() == PikaConf::IncrementalCompact) { - DoSameThingEveryDB(TaskType::kIncrementalCompact); + struct timeval now; + gettimeofday(&now, nullptr); + int interval = g_pika_conf->incremental_compact_interval(); + if (last_incremental_compact_time_.tv_sec == 0 || + now.tv_sec - last_incremental_compact_time_.tv_sec >= interval) { + gettimeofday(&last_incremental_compact_time_, nullptr); + DoSameThingEveryDB(TaskType::kIncrementalCompact); + } } } diff --git a/src/storage/include/storage/storage.h b/src/storage/include/storage/storage.h index 7ee44717f8..f17e72bca2 100644 --- a/src/storage/include/storage/storage.h +++ b/src/storage/include/storage/storage.h @@ -1105,13 +1105,12 @@ class Storage { * @param max_files: 单次最多处理文件数 * @param max_time_ms: 单次最大执行时间 * @param min_rate: 压缩率阈值,低于此值继续处理 - * @param target_level: 目标 level,-1 表示当前 level + 1 * @param min_file_age: 文件最小年龄(秒) * @param sync: 是否同步执行 * @return Status */ Status IncrementalCompact(const DataType &type, int max_files = 1, int max_time_ms = 1000, - int min_rate = 70, int target_level = -1, int min_file_age = 60, + int min_rate = 70, int min_file_age = 60, bool sync = false); Status SetMaxCacheStatisticKeys(uint32_t max_cache_statistic_keys); diff --git a/src/storage/src/redis.cc b/src/storage/src/redis.cc index 5db9e2a017..e0851d7aae 100644 --- a/src/storage/src/redis.cc +++ b/src/storage/src/redis.cc @@ -333,13 +333,7 @@ Status Redis::LongestNotCompactionSstCompact(const DataType& option_type, std::v // clear deleted sst file records because we use them in different cf listener_.Clear(); - // 魔改:删除文件数量限制,只要有文件就 compact - // 原代码:if (props.size() <= 1) { ... continue; } - // 现在:注释掉这段逻辑,让 compact 更激进 - - // size_t max_files_to_compact = 1; - // 魔改:每次处理 5 个文件,增加并发任务数 - size_t max_files_to_compact = 5; + size_t max_files_to_compact = 1; const StorageOptions& storageOptions = storage_->GetStorageOptions(); if (props.size() / storageOptions.compact_param_.compact_every_num_of_files_ > max_files_to_compact) { max_files_to_compact = props.size() / storageOptions.compact_param_.compact_every_num_of_files_; @@ -479,7 +473,7 @@ static uint64_t ExtractFileNumber(const std::string& name) { Status Redis::IncrementalCompact(const DataType& option_type, std::vector* compact_result_vec, const ColumnFamilyType& type, int max_files, int max_time_ms, - int min_rate, int target_level, int min_file_age) { + int min_rate, int min_file_age) { // 1. 并发控制 bool no_compact = false; bool to_compact = true; @@ -525,8 +519,8 @@ Status Redis::IncrementalCompact(const DataType& option_type, std::vector= 6) { continue; } @@ -550,12 +544,6 @@ Status Redis::IncrementalCompact(const DataType& option_type, std::vector= 6) { - LOG(INFO) << "IncrementalCompact skip L6 file: " << oldest_file; - break; - } - // 3.3 使用 CompactFiles 进行 compact(只处理 L1-L5) std::vector input_files{oldest_file}; rocksdb::CompactionOptions compact_options; diff --git a/src/storage/src/redis.h b/src/storage/src/redis.h index cdc3903f68..63c00491ff 100644 --- a/src/storage/src/redis.h +++ b/src/storage/src/redis.h @@ -113,8 +113,7 @@ class Redis { virtual Status IncrementalCompact(const DataType& option_type, std::vector* compact_result_vec, const ColumnFamilyType& type = kMetaAndData, int max_files = 1, int max_time_ms = 1000, - int min_rate = 70, int target_level = -1, - int min_file_age = 60); + int min_rate = 70, int min_file_age = 60); virtual Status GetProperty(const std::string& property, uint64_t* out); diff --git a/src/storage/src/storage.cc b/src/storage/src/storage.cc index 8d7dcbd3be..78e8e6b956 100644 --- a/src/storage/src/storage.cc +++ b/src/storage/src/storage.cc @@ -1734,16 +1734,14 @@ Status Storage::RunBGTask() { int max_files = 1; int max_time_ms = 1000; int min_rate = 70; - int target_level = -1; int min_file_age = 60; if (task.argv.size() > 0) max_files = std::atoi(task.argv[0].c_str()); if (task.argv.size() > 1) max_time_ms = std::atoi(task.argv[1].c_str()); if (task.argv.size() > 2) min_rate = std::atoi(task.argv[2].c_str()); - if (task.argv.size() > 3) target_level = std::atoi(task.argv[3].c_str()); - if (task.argv.size() > 4) min_file_age = std::atoi(task.argv[4].c_str()); + if (task.argv.size() > 3) min_file_age = std::atoi(task.argv[3].c_str()); - IncrementalCompact(task.type, max_files, max_time_ms, min_rate, target_level, min_file_age, true); + IncrementalCompact(task.type, max_files, max_time_ms, min_rate, min_file_age, true); } else if (task.operation == kCompactRange) { if (task.argv.size() == 1) { DoCompactSpecificKey(task.type, task.argv[0]); @@ -1776,14 +1774,14 @@ Status Storage::LongestNotCompactionSstCompact(const DataType &type, bool sync) } Status Storage::IncrementalCompact(const DataType &type, int max_files, int max_time_ms, - int min_rate, int target_level, int min_file_age, bool sync) { + int min_rate, int min_file_age, bool sync) { if (sync) { Status s; for (const auto& inst : insts_) { std::vector compact_result_vec; s = inst->IncrementalCompact(type, &compact_result_vec, storage::kMetaAndData, max_files, max_time_ms, min_rate, - target_level, min_file_age); + min_file_age); for (auto compact_result : compact_result_vec) { if (!compact_result.ok()) { LOG(ERROR) << compact_result.ToString(); @@ -1797,7 +1795,6 @@ Status Storage::IncrementalCompact(const DataType &type, int max_files, int max_ std::to_string(max_files), std::to_string(max_time_ms), std::to_string(min_rate), - std::to_string(target_level), std::to_string(min_file_age) }}); } From db019e1b79f3de5e6114598c8821c3405684fb43 Mon Sep 17 00:00:00 2001 From: chenbt Date: Wed, 15 Apr 2026 15:51:25 +0800 Subject: [PATCH 07/11] =?UTF-8?q?fix:=20=E5=B0=9D=E8=AF=95=E4=BF=AE?= =?UTF-8?q?=E5=A4=8Dobdcompact=20error=20=E6=BC=8F=E4=BC=A0=E4=BA=86=20han?= =?UTF-8?q?dles=5F[idx]=EF=BC=8C=E5=AF=BC=E8=87=B4=E9=9D=9EkMetaCF=20?= =?UTF-8?q?=E7=9A=84=20CF=20=E8=A7=A6=E5=8F=91=20force=20compact=E6=97=B6?= =?UTF-8?q?=EF=BC=8C=E5=AE=9E=E9=99=85=E8=A2=AB=20compact=20=E7=9A=84?= =?UTF-8?q?=E6=98=AF=20kMetaCF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/storage/src/redis.cc | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/storage/src/redis.cc b/src/storage/src/redis.cc index e0851d7aae..18fa9459f8 100644 --- a/src/storage/src/redis.cc +++ b/src/storage/src/redis.cc @@ -400,7 +400,12 @@ Status Redis::LongestNotCompactionSstCompact(const DataType& option_type, std::v if (file_creation_time < static_cast(now / 1000 - storageOptions.compact_param_.force_compact_file_age_seconds_) && delete_ratio >= force_compact_min_ratio) { - compact_result = db_->CompactRange(default_compact_range_options_, &start_key, &stop_key); + compact_result = db_->CompactRange(default_compact_range_options_, handles_[idx], &start_key, &stop_key); + if (!compact_result.ok()) { + LOG(WARNING) << handles_[idx]->GetName() + << " force CompactRange failed: " << compact_result.ToString() + << " file=" << file_path; + } if (--max_files_to_compact == 0) { break; } From 64ee3a6f97e48a9fb3c985725ad46216cb6e7141 Mon Sep 17 00:00:00 2001 From: chenbt Date: Fri, 17 Apr 2026 10:53:34 +0800 Subject: [PATCH 08/11] =?UTF-8?q?Revert=20"feat=EF=BC=9A=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E9=AD=94=E6=94=B9=E4=BB=A3=E7=A0=81"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/storage/src/redis.cc | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/src/storage/src/redis.cc b/src/storage/src/redis.cc index 18fa9459f8..198a7cf975 100644 --- a/src/storage/src/redis.cc +++ b/src/storage/src/redis.cc @@ -198,32 +198,15 @@ Status Redis::SetMaxCacheStatisticKeys(size_t max_cache_statistic_keys) { /* * compactrange no longer supports compact for a single data type * - * 魔改版本:添加延迟放大并发竞争窗口,用于复现 SST 损坏问题 - * 注意:此修改仅用于测试环境,生产环境请勿使用 */ Status Redis::CompactRange(const rocksdb::Slice* begin, const rocksdb::Slice* end) { - // 随机延迟 0-50ms,让 7 个 CF 的启动时间错开但仍重叠 - std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 50)); db_->CompactRange(default_compact_range_options_, begin, end); - - // 每个 CF 之间固定延迟 20ms,增加并发重叠度 - std::this_thread::sleep_for(std::chrono::milliseconds(20)); db_->CompactRange(default_compact_range_options_, handles_[kHashesDataCF], begin, end); - - std::this_thread::sleep_for(std::chrono::milliseconds(20)); db_->CompactRange(default_compact_range_options_, handles_[kSetsDataCF], begin, end); - - std::this_thread::sleep_for(std::chrono::milliseconds(20)); db_->CompactRange(default_compact_range_options_, handles_[kListsDataCF], begin, end); - - std::this_thread::sleep_for(std::chrono::milliseconds(20)); db_->CompactRange(default_compact_range_options_, handles_[kZsetsDataCF], begin, end); - - std::this_thread::sleep_for(std::chrono::milliseconds(20)); db_->CompactRange(default_compact_range_options_, handles_[kZsetsScoreCF], begin, end); - - std::this_thread::sleep_for(std::chrono::milliseconds(20)); db_->CompactRange(default_compact_range_options_, handles_[kStreamsDataCF], begin, end); return Status::OK(); From c90028d235174d259886cc8824e1eaa21933dc9c Mon Sep 17 00:00:00 2001 From: chenbt Date: Fri, 17 Apr 2026 11:41:40 +0800 Subject: [PATCH 09/11] =?UTF-8?q?fix:=20=E7=A7=BB=E9=99=A4=E4=B8=B4?= =?UTF-8?q?=E6=97=B6=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 6 +- check_delayed_cleanup.sh | 104 ------ check_orphan_files.sh | 83 ----- docs/pika_full_sync_solution.md | 587 -------------------------------- docs/sync_solution_analysis.md | 182 ---------- 5 files changed, 1 insertion(+), 961 deletions(-) delete mode 100644 check_delayed_cleanup.sh delete mode 100644 check_orphan_files.sh delete mode 100644 docs/pika_full_sync_solution.md delete mode 100644 docs/sync_solution_analysis.md diff --git a/.gitignore b/.gitignore index 80f26c7561..4749601897 100644 --- a/.gitignore +++ b/.gitignore @@ -73,8 +73,4 @@ pkg # include codis fe javascript lib files !codis/cmd/fe/assets/** -tests/tmp -./.claude -CLAUDE.md -./.claude -CLAUDE.md \ No newline at end of file +tests/tmp \ No newline at end of file diff --git a/check_delayed_cleanup.sh b/check_delayed_cleanup.sh deleted file mode 100644 index 231716e1e1..0000000000 --- a/check_delayed_cleanup.sh +++ /dev/null @@ -1,104 +0,0 @@ -#!/bin/bash -# 分析日志,找出被添加到延迟删除队列但实际没有执行删除的文件 - -LOG_FILE="${1:-/app/pika4/pika-9454/log/pika.INFO}" - -echo "==========================================" -echo "分析延迟清理队列执行情况" -echo "日志文件: $LOG_FILE" -echo "==========================================" - -# 提取所有 Scheduled 的文件(延迟删除调度) -echo "" -echo "步骤 1: 提取所有被调度的文件..." -grep "Scheduled file for delayed cleanup" "$LOG_FILE" 2>/dev/null | \ - sed 's/.*cleanup: //g' | \ - awk '{print $1}' | \ - grep "^/" | \ - sort -u > /tmp/scheduled_files.txt - -scheduled_count=$(wc -l < /tmp/scheduled_files.txt 2>/dev/null | tr -d ' ') -echo " 被调度的文件数: $scheduled_count" - -# 提取所有 Deleted 的文件(实际删除) -echo "" -echo "步骤 2: 提取所有实际删除的文件..." -grep "Deleted delayed cleanup file" "$LOG_FILE" 2>/dev/null | \ - sed 's/.*Deleted delayed cleanup file: //g' | \ - awk '{print $1}' | \ - grep "^/" | \ - sort -u > /tmp/deleted_files.txt - -deleted_count=$(wc -l < /tmp/deleted_files.txt 2>/dev/null | tr -d ' ') -echo " 实际删除的文件数: $deleted_count" - -# 如果没有任何记录,直接退出 -if [ "$scheduled_count" -eq 0 ] && [ "$deleted_count" -eq 0 ]; then - echo "" - echo "未找到任何延迟清理相关日志" - rm -f /tmp/scheduled_files.txt /tmp/deleted_files.txt - exit 0 -fi - -# 找出被调度但未删除的文件 -echo "" -echo "步骤 3: 找出被调度但未删除的文件..." -comm -23 /tmp/scheduled_files.txt /tmp/deleted_files.txt > /tmp/missing_files.txt - -missing_count=$(wc -l < /tmp/missing_files.txt 2>/dev/null | tr -d ' ') -echo " 被调度但未删除的文件数: $missing_count" - -if [ "$missing_count" -gt 0 ]; then - echo "" - echo "==========================================" - echo "被调度但未删除的文件列表:" - echo "==========================================" - - while IFS= read -r filepath; do - if [ -n "$filepath" ]; then - # 检查文件是否仍然存在 - if [ -f "$filepath" ]; then - # 获取文件大小和 nlink - size=$(stat -c %s "$filepath" 2>/dev/null || stat -f %z "$filepath" 2>/dev/null) - nlink=$(stat -c %h "$filepath" 2>/dev/null || stat -f %l "$filepath" 2>/dev/null) - - if command -v numfmt >/dev/null 2>&1; then - human_size=$(numfmt --to=iec-i --suffix=B "$size" 2>/dev/null) - else - human_size="${size} bytes" - fi - - echo " [仍存在] $filepath" - echo " 大小: $human_size, 硬链接数: $nlink" - - # 查找该文件的调度时间 - scheduled_time=$(grep "Scheduled file for delayed cleanup.*$filepath" "$LOG_FILE" | tail -1 | awk '{print $1}') - if [ -n "$scheduled_time" ]; then - echo " 调度时间: $scheduled_time" - fi - echo "" - else - echo " [已消失] $filepath (可能已被其他方式删除)" - fi - fi - done < /tmp/missing_files.txt -fi - -# 清理临时文件 -rm -f /tmp/scheduled_files.txt /tmp/deleted_files.txt /tmp/missing_files.txt - -echo "" -echo "==========================================" -echo "总结:" -echo " 调度文件: $scheduled_count" -echo " 删除文件: $deleted_count" -echo " 未删除文件: $missing_count" -if [ "$missing_count" -gt 0 ]; then - echo "" - echo "⚠️ 发现 $missing_count 个文件被调度但未删除!" - echo " 可能原因:" - echo " 1. 延迟时间未到 (600秒)" - echo " 2. 文件在删除时 nlink != 1 (不再是孤儿文件)" - echo " 3. ProcessPendingCleanupFiles 未执行或执行失败" -fi -echo "==========================================" diff --git a/check_orphan_files.sh b/check_orphan_files.sh deleted file mode 100644 index bec2e954ce..0000000000 --- a/check_orphan_files.sh +++ /dev/null @@ -1,83 +0,0 @@ -#!/bin/bash -# 检查 dump 目录下的所有孤儿文件(nlink=1 的 SST 文件)及其大小 - -DUMP_DIR="${1:-/app/pika4/pika-9454/dump}" - -echo "==========================================" -echo "扫描 Dump 目录下的孤儿文件 (nlink=1)" -echo "目录: $DUMP_DIR" -echo "==========================================" - -# 统计总文件数、孤儿文件数、总大小 -total_files=0 -orphan_files=0 -total_orphan_size=0 - -# 遍历所有 dump 子目录 -for dump_subdir in "$DUMP_DIR"/dump-*/; do - if [ -d "$dump_subdir" ]; then - echo "" - echo "检查目录: $dump_subdir" - echo "----------------------------------------" - - # 查找所有 .sst 文件并检查 nlink - find "$dump_subdir" -name "*.sst" -type f 2>/dev/null | while read -r file; do - total_files=$((total_files + 1)) - - # 获取硬链接数 - nlink=$(stat -c %h "$file" 2>/dev/null || stat -f %l "$file" 2>/dev/null) - - if [ "$nlink" -eq 1 ]; then - # 获取文件大小 - size=$(stat -c %s "$file" 2>/dev/null || stat -f %z "$file" 2>/dev/null) - - if command -v numfmt >/dev/null 2>&1; then - human_size=$(numfmt --to=iec-i --suffix=B "$size" 2>/dev/null) - else - human_size="${size} bytes" - fi - - echo "[孤儿文件] $file (大小: $human_size)" - fi - done - fi -done - -echo "" -echo "==========================================" -echo "正在统计总数..." -echo "==========================================" - -# 重新统计 -total_files=0 -orphan_files=0 -total_orphan_size=0 - -for dump_subdir in "$DUMP_DIR"/dump-*/; do - if [ -d "$dump_subdir" ]; then - find "$dump_subdir" -name "*.sst" -type f 2>/dev/null | while read -r file; do - total_files=$((total_files + 1)) - nlink=$(stat -c %h "$file" 2>/dev/null || stat -f %l "$file" 2>/dev/null) - if [ "$nlink" -eq 1 ]; then - size=$(stat -c %s "$file" 2>/dev/null || stat -f %z "$file" 2>/dev/null) - orphan_files=$((orphan_files + 1)) - total_orphan_size=$((total_orphan_size + size)) - echo "$size $file" - fi - done - fi -done > /tmp/orphan_list.txt - -orphan_files=$(wc -l < /tmp/orphan_list.txt 2>/dev/null || echo 0) -total_orphan_size=$(awk '{sum+=$1} END {print sum}' /tmp/orphan_list.txt 2>/dev/null || echo 0) - -echo "统计结果:" -echo " 孤儿文件数: $orphan_files" -if command -v numfmt >/dev/null 2>&1; then - echo " 孤儿文件总大小: $(numfmt --to=iec-i --suffix=B $total_orphan_size 2>/dev/null || echo ${total_orphan_size}bytes)" -else - echo " 孤儿文件总大小: $total_orphan_size bytes" -fi - -rm -f /tmp/orphan_list.txt -echo "==========================================" diff --git a/docs/pika_full_sync_solution.md b/docs/pika_full_sync_solution.md deleted file mode 100644 index ec9a793082..0000000000 --- a/docs/pika_full_sync_solution.md +++ /dev/null @@ -1,587 +0,0 @@ -# Pika 全量同步方案详解(Scheme A) - -## 文档说明 - -本文档详细描述 Pika 3.5.0+ 版本的全量同步机制(Scheme A),包括各场景下的完整流程、状态变化、数据流转以及已知问题。 - -**版本信息** -- 适用版本:Pika 3.5.0+ -- 方案名称:Scheme A(独立 Dump + 延迟清理) -- 最后更新:2026-03-06 - ---- - -## 1. 架构概述 - -### 1.1 核心设计 - -**Scheme A** 采用以下设计原则: - -1. **每个 Slave 独占一个 Dump 目录**:`dump-YYYYMMDD-NN/db_name` 格式 -2. **传输完成后延迟清理**:孤儿文件(nlink=1)传输完成后加入延迟清理队列(10分钟后删除) -3. **最大并发限制**:默认最多 3 个并发 dump -4. **细粒度文件保护**:传输中的文件受保护,防止被误删 -5. **统一清理入口**:所有孤儿文件清理通过 `RemoveTransferringFile` 统一处理 - -### 1.2 关键组件 - -| 组件 | 文件 | 职责 | -|------|------|------| -| RsyncServer | `rsync_server.cc` | 处理 Slave 文件同步请求 | -| RsyncServerConn | `rsync_server.cc` | 维护单个连接的状态 | -| PikaServer | `pika_server.cc` | 管理 Dump 占用、snapshot 注册 | -| DB | `pika_db.cc` | 管理 bgsave 和 dump 元数据 | - -### 1.3 关键数据结构 - -```cpp -// Dump 占用信息 -struct DumpOwnerInfo { - std::string conn_id; // 占用连接的 ID - std::string dump_path; // dump 目录路径 -}; -std::map dump_owners_; // snapshot_uuid -> 占用信息 - -// 传输中文件保护 -std::map> rsync_transferring_files_; // snapshot_uuid -> 文件集合 - -// 活跃 snapshot -std::set active_rsync_snapshots_; // 用于孤儿文件清理保护 -``` - ---- - -## 2. 单个 Slave 单 DB 全量同步流程 - -以 `db0` 为例,详细描述 Master 和 Slave 的状态变化。 - -### 2.1 流程时序图 - -``` -阶段1: 触发全量同步 -┌─────────────┐ ┌─────────────┐ -│ Slave │ │ Master │ -└──────┬──────┘ └──────┬──────┘ - │ │ - │ 1. 判断需要全量同步 │ - │ (repl_state: kTryConnect) │ - │ │ - │ 2. 发送 DBSync 请求 │ - │ ───────────────────────────────>│ - │ │ - │ 3. 检查是否正在 bgsave - │ (IsBgSaving()) - │ │ - │ 4. 如果不在 bgsave,触发 bgsave - │ (BgSaveDB()) - │ │ - │ 5. 返回 kErr(等待 bgsave) │ - │ <───────────────────────────────│ - │ │ - │ 6. 重试(循环) │ - │ ───────────────────────────────>│ - │ 7. 如果仍在 bgsave,返回 kErr - │ <───────────────────────────────│ - │ │ - -阶段2: bgsave 执行 -┌─────────────┐ ┌─────────────┐ -│ Background │ │ Master │ -│ Thread │ │ │ -└──────┬──────┘ └──────┬──────┘ - │ │ - │ 1. 创建 dump 目录 │ - │ (InitBgsaveEnv) │ - │ dump-20260305-0/db0 │ - │ │ - │ 2. 创建 RocksDB Checkpoint │ - │ (创建硬链接) │ - │ │ - │ 3. 生成 info 文件 │ - │ │ - │ 4. bgsave 完成 │ - │ (IsBgSaving() -> false) │ - │ │ - -阶段3: Meta 请求处理 -┌─────────────┐ ┌─────────────┐ -│ Slave │ │ Master │ -└──────┬──────┘ └──────┬──────┘ - │ │ - │ 1. 再次发送 DBSync 请求 │ - │ (循环重试后 bgsave 已完成) │ - │ ───────────────────────────────>│ - │ │ - │ 2. 获取文件列表 - │ (GetDumpMeta) - │ 扫描 dump-20260305-0/db0 - │ 生成 snapshot_uuid - │ │ - │ 3. 检查 dump 完整性 - │ 4. 检查是否已被占用 - │ 5. 检查并发限制 - │ 6. 标记 dump 为占用 - │ (MarkDumpInUse) - │ 7. 注册 snapshot - │ (RegisterSnapshot) - │ 8. 预注册所有文件 - │ (AddTransferringFile) - │ │ - │ 9. 返回 Meta 响应 │ - │ (snapshot_uuid + 文件列表) │ - │ <───────────────────────────────│ - │ │ - -阶段4: 文件传输 -┌─────────────┐ ┌─────────────┐ -│ Slave │ │ Master │ -└──────┬──────┘ └──────┬──────┘ - │ │ - │ 1. 多线程下载文件 │ - │ ─────────────────────────────> │ - │ │ - │ 2. 检查文件是否存在 - │ 3. 注册文件为传输中 - │ 4. 读取文件内容 - │ 5. 注销文件 - │ 6. 如果是最后一块(is_eof) - │ 检查是否为孤儿文件(nlink=1) - │ 如果是孤儿,加入延迟清理队列(10分钟) - │ │ - │ 7. 返回文件数据 │ - │ <───────────────────────────────│ - │ │ - │ (重复直到所有文件下载完成) │ - -阶段5: 清理 -┌─────────────┐ ┌─────────────┐ -│ Slave │ │ Master │ -└──────┬──────┘ └──────┬──────┘ - │ │ - │ 1. 下载完成,关闭连接 │ - │ ───────X────────────────────────>│ - │ │ - │ 2. 连接断开,析构 RsyncServerConn - │ 3. 释放 dump 占用 - │ (ReleaseDump) - │ 4. 注销 snapshot - │ (UnregisterSnapshot) - │ │ - │ 5. AutoDeleteExpiredDump 定时执行 - │ 处理延迟清理队列(ProcessPendingCleanupFiles) - │ 删除过期 dump 目录 - │ (注:CleanupOrphanSstFiles 已移除,延迟清理统一处理) - │ │ -``` - -### 2.2 Master 状态变化 - -| 阶段 | 状态 | 说明 | -|------|------|------| -| T0 | 无 dump | 初始状态 | -| T1 | bgsaving | 创建 dump-20260305-0/db0 | -| T2 | dump 可用 | bgsave 完成,等待 Meta 请求 | -| T3 | dump 占用 | 收到 Meta 请求,标记为占用 | -| T4 | 传输中 | 文件传输中,即时清理进行中 | -| T5 | dump 释放 | Slave 断开,释放占用 | -| T6 | dump 过期 | AutoDeleteExpiredDump 删除过期 dump | - -### 2.3 Slave 状态变化 - -| 阶段 | 状态 | 说明 | -|------|------|------| -| T0 | kTryConnect | 尝试连接 Master | -| T1 | kWaitDBSync | 等待 Master bgsave 完成 | -| T2 | kWaitDBSync | 获取文件列表,开始下载 | -| T3 | kWaitDBSync | 文件下载中 | -| T4 | kConnected | 全量同步完成,开始增量同步 | - -### 2.4 数据变化 - -**Master 磁盘占用变化**: - -| 时间点 | 数据目录 | Dump 目录 | 总计 | -|--------|----------|-----------|------| -| 初始 | 100GB | 0 | 100GB | -| bgsave 中 | 100GB | 0 (硬链接不占用) | 100GB | -| compaction 后 | 100GB | 部分孤儿文件 | 100GB + 孤儿文件 | -| 传输中 | 100GB | 100GB (dump) | 200GB | -| 传输完成 | 100GB | 孤儿文件延迟10分钟清理 | 100GB ~ 200GB | - ---- - -## 3. 多 Slave 同步流程 - -### 3.1 场景描述 -- Master 有 100GB 数据 -- Slave-1 先发起同步 -- Slave-2 在 Slave-1 同步过程中发起同步 - -### 3.2 流程时序 - -``` -时间线: -T0: - Slave-1 ──DBSync──> Master - Master: IsBgSaving? No - Master: 触发 BgSaveDB() - Master: 创建 dump-20260305-0/db0 - Slave-1 <──kErr─── Master (等待 bgsave) - -T30s: - Master: bgsave 完成 - Slave-1 ──DBSync──> Master - Master: 获取文件列表 (dump-0) - Master: MarkDumpInUse(dump-0, Slave-1) - Slave-1 <──文件列表── Master - Slave-1 开始下载... - -T31s: - Slave-2 ──DBSync──> Master - Master: IsDumpInUse(dump-0)? Yes (被 Slave-1 占用) - Master: 触发新的 BgSaveDB() - Master: 创建 dump-20260305-1/db0 - Slave-2 <──kErr─── Master (等待新 bgsave) - -T61s: - Master: 新 bgsave 完成 - Slave-2 ──DBSync──> Master - Master: MarkDumpInUse(dump-1, Slave-2) - Slave-2 <──文件列表── Master - Slave-2 开始下载... - -T120s: - Slave-1: 下载完成,断开连接 - Master: ReleaseDump(dump-0) - Master: 删除 dump-0 (AutoDeleteExpiredDump) - -T180s: - Slave-2: 下载完成,断开连接 - Master: ReleaseDump(dump-1) - Master: 删除 dump-1 -``` - -### 3.3 关键限制 - -- **最大并发 dump 数**:3 个(`kMaxConcurrentDumps = 3`) -- **超过限制**:返回 kErr,Slave 重试 - ---- - -## 4. 单 Slave 多 DB 同步流程 - -### 4.1 场景描述 -- Master 配置 3 个 DB:db0, db1, db2 -- 每个 DB 有独立的 RocksDB 实例(db-instance-num=3) -- Slave 同时同步所有 DB - -### 4.2 目录结构 - -``` -dump/dump-20260305-0/ -├── db0/ -│ ├── 0/ # RocksDB 实例 0 -│ │ ├── 000001.sst -│ │ └── 000002.sst -│ ├── 1/ # RocksDB 实例 1 -│ │ └── 000003.sst -│ ├── 2/ # RocksDB 实例 2 -│ │ └── 000004.sst -│ └── info # dump 元信息 -├── db1/ -│ ├── 0/ -│ ├── 1/ -│ ├── 2/ -│ └── info -└── db2/ - ├── 0/ - ├── 1/ - ├── 2/ - └── info -``` - -### 4.3 文件命名规则 - -- Slave 请求格式:`{rocksdb_instance}/{filename}` -- 示例:`0/000001.sst`, `1/000003.sst` -- **注意**:不包含 db0/db1/db2 前缀 - -### 4.4 同步流程 - -每个 DB 独立同步: - -1. Slave 发送 db0 的 DBSync 请求 -2. Master 返回 db0 的文件列表 -3. Slave 下载 db0 的所有文件 -4. 重复步骤 1-3 对 db1 和 db2 - -### 4.5 潜在问题 - -**问题:info 文件位置不一致** -- `AutoDeleteExpiredDump` 查找:`dump/dump-xxx/info` -- 实际位置:`dump/dump-xxx/db0/info` - -**已修复**:先尝试 `db0/info`,再回退到 `info` - ---- - -## 5. 多 Slave 多 DB 同步流程 - -这是 Scheme A 最复杂的场景,结合了多 Slave 和多 DB 的特点。 - -### 5.1 场景描述 -- Master:3 个 DB (db0, db1, db2) -- Slave-1:同步 db0, db1, db2 -- Slave-2:同步 db0, db1, db2 - -### 5.2 Dump 占用机制 - -**方案 A 设计**:每个 Slave 独占整个 dump 目录(包含所有 DB) - -``` -Slave-1 占用 dump-20260305-0: -├── db0 (传输中) -├── db1 (传输中) -└── db2 (传输中) - -Slave-2 占用 dump-20260305-1: -├── db0 (传输中) -├── db1 (传输中) -└── db2 (传输中) -``` - -### 5.3 占用检查 - -- 检查粒度:**整个 dump 目录** -- 如果一个 Slave 正在使用 dump-0,其他 Slave 不能使用 -- 触发新的 bgsave 创建 dump-1 - -### 5.4 潜在问题 - -**问题 1:DB 级别粒度 vs Dump 级别粒度** -- 当前设计:dump 级别占用 -- 如果 Slave-1 只同步 db0,dump-0 仍不能被 Slave-2 使用 -- 浪费磁盘空间 - -**问题 2:多 DB 的孤儿文件清理** -- `AutoDeleteExpiredDump` 只检查 `db0/info` -- 如果 db1 或 db2 还在传输,可能被误判为可清理 - ---- - -## 6. 孤儿文件清理机制(统一延迟清理) - -### 6.1 触发条件 - -孤儿文件:nlink=1 的 SST 文件(只被 dump 引用,不被 RocksDB 引用) - -**产生原因**: -- RocksDB compaction 删除旧 SST -- dump 中的硬链接变成孤儿 - -### 6.2 统一清理策略 - -**设计变更**:移除 `CleanupOrphanSstFiles` 函数,统一使用延迟清理队列 - -**新清理流程**: - -``` -1. 文件传输完成时(RemoveTransferringFile) - - 检查 is_eof=true(最后一块传输完成) - - stat 检查文件 nlink - - 如果 nlink=1(孤儿文件): - * 加入延迟清理队列(ScheduleFileForCleanup,延迟600秒) - * 记录日志 "Scheduled orphan file for cleanup" - - 如果 nlink=2(非孤儿): - * 不做处理,RocksDB 会管理生命周期 - -2. AutoDeleteExpiredDump 定时执行(每60秒) - - 调用 ProcessPendingCleanupFiles() - - 检查队列中到期的文件 - - 删除到期文件,记录日志 "Deleted delayed cleanup file" - - 同时检查并删除过期的 dump 目录 -``` - -### 6.3 保护机制 - -| 保护级别 | 说明 | 实现位置 | -|----------|------|----------| -| 传输中保护 | 传输中的文件不会被清理 | `rsync_transferring_files_` | -| 延迟保护 | 孤儿文件延迟10分钟删除,给 Slave 重试时间 | `ScheduleFileForCleanup(filepath, 600)` | -| nlink 检查 | 只清理孤儿文件(nlink=1),避免误删 | `stat` 检查 | - -### 6.4 时序说明 - -``` -T0: 文件传输完成(is_eof=true) - └─> RemoveTransferringFile 检查 nlink==1 - └─> ScheduleFileForCleanup(filepath, 600) 加入队列 - └─> 日志: "Scheduled orphan file for cleanup in 10min" - -T0+10min: AutoDeleteExpiredDump 定时执行 - └─> ProcessPendingCleanupFiles() - └─> 检查到期文件 - └─> 删除文件 - └─> 日志: "Deleted delayed cleanup file" -``` - -### 6.5 对比:旧方案 vs 新方案 - -| 方面 | 旧方案(CleanupOrphanSstFiles) | 新方案(统一延迟清理) | -|------|--------------------------------|----------------------| -| 触发时机 | 定时扫描所有 dump 目录 | 传输完成时即时检查 | -| 清理延迟 | 扫描周期不确定 | 固定延迟10分钟 | -| 竞争条件 | 与传输过程可能竞争 | 无竞争,统一入口 | -| 代码复杂度 | ~170行独立函数 | ~15行集成逻辑 | -| Slave 重试 | 可能失败(文件已被删) | 10分钟内可重试 | - ---- - -## 7. Bug 列表 - -### 7.1 已修复的 Bug - -| Bug | 影响 | 修复 | Commit | -|-----|------|------|--------| -| CreatePath 逻辑错误 | 无法创建 dump 目录 | 添加 EnsureDirExists 包装函数 | ee37a586 | -| 文件分片传输被提前删除 | Slave 重试失败 | 添加 is_eof 参数,只在传输完成时删除 | eb778848 | -| Snapshot 注册失败 | 文件被误删 | 修复 RegisterSnapshot 调用顺序 | 08ee1c36 | -| Info 文件路径错误 | 无法读取 snapshot_uuid | 先尝试 db0/info,再回退到 info | ad54f43a | -| 空 dump 目录返回空列表 | Slave 尝试下载不存在的文件 | 添加 filenames.empty() 检查 | 27c3a838 | -| 即时清理与延迟清理竞争 | Slave 重试时文件已被删 | 移除 CleanupOrphanSstFiles,统一使用延迟清理队列 | 当前版本 | -| CleanupOrphanSstFiles 误删传输中文件 | 同步失败 | 删除 CleanupOrphanSstFiles 函数 | 当前版本 | - -### 7.2 已移除的 Bug(通过架构调整修复) - -| Bug | 原影响 | 修复方式 | -|-----|--------|----------| -| CleanupOrphanSstFiles 竞争问题 | 与延迟清理队列竞争同一文件 | 移除 CleanupOrphanSstFiles 函数 | -| 即时清理导致重试失败 | Slave 30分钟后重试,文件已删 | 统一使用延迟10分钟清理 | - -### 7.3 待修复的 Bug - -| Bug | 影响 | 严重程度 | 修复方案 | -|-----|------|----------|----------| -| 多 DB 场景下孤儿文件清理粒度问题 | db1/db2 传输中可能被误判 | 中 | 检查所有 DB 的 info 文件 | -| 多 Slave 多 DB 的磁盘浪费 | 每个 Slave 独占整个 dump | 低 | 支持 DB 级别占用 | - ---- - -## 8. 待办事项 - -### 8.1 高优先级 - -- [x] **统一孤儿文件清理机制(已完成)** - - 移除 CleanupOrphanSstFiles 函数 - - 统一使用 RemoveTransferringFile + 延迟清理队列 - - 延迟10分钟,给 Slave 重试时间 - -- [ ] **修复多 DB 孤儿文件清理粒度问题** - - 当前只检查 db0/info - - 需要检查所有 DB 子目录 - - 如果任何 DB 在使用中,整个 dump 应被保护 - -### 8.2 中优先级 - -- [ ] **优化多 Slave 多 DB 的磁盘占用** - - 当前:每个 Slave 独占整个 dump - - 优化:支持 DB 级别占用 - - 影响:需要修改占用管理逻辑 - -- [ ] **完善监控指标** - - Dump 占用数量 - - 孤儿文件清理统计 - - 传输失败率 - -### 8.3 低优先级 - -- [ ] **支持动态调整并发限制** - - 当前:编译期常量 kMaxConcurrentDumps=3 - - 优化:支持配置热更新 - -- [ ] **Dump 压缩传输** - - 减少网络带宽 - - 权衡 CPU 和网络 - ---- - -## 9. 配置建议 - -### 9.1 关键配置项 - -```ini -# pika.conf - -# dump 目录前缀 -dump-prefix : dump- - -# dump 目录路径 -dump-path : ./dump/ - -# dump 过期时间(天) -# 0 表示永不过期 -dump-expire : 1 - -# RocksDB 实例数 -db-instance-num : 3 - -# 最大并发 dump 数(编译期配置) -# kMaxConcurrentDumps = 3 -``` - -### 9.2 部署建议 - -1. **磁盘空间**:预留 3 × 数据量 的空间 -2. **监控**:监控 dump 目录数量和磁盘使用率 -3. **日志**:关注 `[Rsync Meta]`、`[RsyncTransfer]`、`[Scheduled orphan file`、`[Deleted delayed cleanup` 日志 - ---- - -## 10. 附录 - -### 10.1 关键日志 - -```bash -# 查看 Meta 请求处理 -grep "Rsync Meta" log/pika.INFO - -# 查看文件传输 -grep "RsyncTransfer" log/pika.INFO - -# 查看孤儿文件延迟清理调度 -grep "Scheduled orphan file" log/pika.INFO - -# 查看延迟清理执行 -grep "Deleted delayed cleanup file" log/pika.INFO - -# 查看 dump 占用 -grep "DumpOwnership" log/pika.INFO - -# 查看错误 -grep "File no longer exists" log/pika.WARNING -``` - -### 10.2 状态码说明 - -| 状态码 | 含义 | 处理 | -|--------|------|------| -| kOk | 成功 | 继续处理 | -| kErr | 错误 | Slave 重试 | - -### 10.3 文件路径规范 - -| 类型 | 格式 | 示例 | -|------|------|------| -| Dump 目录 | dump-YYYYMMDD-NN/db_name | dump-20260305-0/db0 | -| RocksDB 实例 | {rocksdb_instance}/ | 0/, 1/, 2/ | -| SST 文件 | {instance}/{filename}.sst | 0/000001.sst | -| Info 文件 | db_name/info | db0/info | - ---- - -## 文档历史 - -| 版本 | 日期 | 修改内容 | -|------|------|----------| -| 1.0 | 2026-03-05 | 初始版本,整理 Scheme A 方案 | -| 1.1 | 2026-03-06 | 更新为统一延迟清理机制,移除 CleanupOrphanSstFiles | diff --git a/docs/sync_solution_analysis.md b/docs/sync_solution_analysis.md deleted file mode 100644 index 279d2c926d..0000000000 --- a/docs/sync_solution_analysis.md +++ /dev/null @@ -1,182 +0,0 @@ -# Pika 主从同步方案变更分析 - -## 最近14次提交记录 - -1. **27c3a838** - fix: 修复 dump 目录为空时返回空文件列表导致同步失败 -2. **08ee1c36** - fix: 修复 snapshot 注册失败导致文件被误删 -3. **ad54f43a** - fix: 修复 AutoDeleteExpiredDump 中 info 文件路径问题 -4. **eb778848** - fix: 修复文件分片传输过程中被提前删除的问题 -5. **ee37a586** - fix: 添加 EnsureDirExists 包装函数修复 dump 目录创建失败 -6. **c3b83b5b** - feat: 实现方案A - 独立dump + 即时清理机制 -7. **e166b072** - fix: 修复RsyncServerConn析构函数死锁问题 -8. **6abdfd5b** - fix: 方案D完善 降低传输时的清理频率 -9. **403c6eb6** - fix: 方案D完善 - 处理传输中途文件丢失场景 -10. **733cd7ba** - fix: 修复全量同步过程中孤儿文件清理导致的文件丢失问题 -11. **4ed0ea47** - GetChildren添加IsDir保护防止传入文件路径崩溃 -12. **dc16f255** - 修复GetBgSaveMetaData目录遍历崩溃问题 -13. **2f3b8337** - 修复目录扫描层级问题 -14. **05248b00** - 修复Pika全量同步孤儿文件问题 - ---- - -## 当前最终方案 - -**方案A + 延迟清理 + 完整性检查**: -1. GetBgSaveMetaData 不过滤孤儿文件 -2. HandleMetaRsyncRequest 增加完整性检查(重新扫描对比) -3. RemoveTransferringFile 延迟10分钟清理 -4. AutoDeleteExpiredDump 调用 ProcessPendingCleanupFiles - ---- - -## 与当前方案不一致的代码分析 - -### 1. 即时清理代码(c3b83b5b 引入)- 需要修改 - -**位置**: src/rsync_server.cc:RemoveTransferringFile -**问题**: c3b83b5b 提交实现了即时清理,但当前方案改为延迟清理 -**状态**: 已在当前工作区修改为延迟清理 - -```cpp -// c3b83b5b 中的代码(即时清理): -if (stat(filepath.c_str(), &st) == 0 && st.st_nlink == 1) { - pstd::DeleteFile(filepath); // 立即删除 -} - -// 当前方案(延迟清理): -if (stat(filepath.c_str(), &st) == 0 && st.st_nlink == 1) { - g_pika_server->ScheduleFileForCleanup(filepath, 600); // 10分钟后删除 -} -``` - ---- - -### 2. CleanupOrphanSstFiles 函数 - 需要评估 - -**位置**: src/pika_server.cc:1523 -**问题**: 这个函数在 AutoDeleteExpiredDump 中被调用,用于清理孤儿文件 -**与延迟清理的关系**: -- CleanupOrphanSstFiles: 定时清理所有孤儿文件(非延迟) -- ProcessPendingCleanupFiles: 清理延迟队列中的文件 - -**建议**: 保留 CleanupOrphanSstFiles,但降低调用频率或仅在 dump 不再使用时调用 - -**原因**: -1. 延迟清理只处理传输完成的文件 -2. CleanupOrphanSstFiles 处理所有未被延迟清理覆盖的孤儿文件 -3. 两者可以共存,作为双保险 - ---- - -### 3. GetBgSaveMetaData 的孤儿文件过滤 - 已修改 - -**位置**: src/pika_db.cc:421 -**问题**: 之前提交过滤孤儿文件,当前方案不过滤 -**状态**: 已在当前工作区修改为不跳过 - -```cpp -// 之前(跳过): -if (st.st_nlink == 1) { - continue; // 跳过孤儿文件 -} - -// 当前(不跳过): -if (st.st_nlink == 1) { - // 记录但不跳过 - LOG(INFO) << "[GetBgSaveMetaData] Including orphan SST file: " << fullPath; -} -``` - ---- - -### 4. AutoDeleteExpiredDump 的 rate limiting - 需要关注 - -**位置**: src/pika_server.cc:1766 -**问题**: 当前工作区将 120 秒改为 600 秒 -**分析**: -```cpp -// HEAD~14: 120秒 -if (!active_rsync_snapshots_.empty() && (now - last_cleanup_time < 120)) - -// 当前工作区: 600秒 -if (!active_rsync_snapshots_.empty() && (now - last_cleanup_time < 600)) -``` -**建议**: 600秒(10分钟)与延迟清理时间一致,合理 - ---- - -### 5. rsync_transferring_files_ 保护机制 - 保留 - -**位置**: include/pika_server.h 和 src/pika_server.cc -**功能**: 跟踪正在传输的文件,防止被 CleanupOrphanSstFiles 误删 -**状态**: 应该保留,与延迟清理不冲突 - ---- - -### 6. 旧方案D的代码 - 可以删除 - -**相关提交**: -- 6abdfd5b: 方案D完善 降低传输时的清理频率 -- 403c6eb6: 方案D完善 - 处理传输中途文件丢失场景 -- 733cd7ba: 修复全量同步过程中孤儿文件清理导致的文件丢失问题 - -**分析**: 这些提交是在方案A之前的尝试,方案A已经替代了方案D -**但是**: 方案A的实现(c3b83b5b)是基于方案D的改进,不是完全替换 -**结论**: 不需要删除,方案A已经整合了这些修复 - ---- - -### 7. 早期bug修复 - 保留 - -**保留的提交**: -- e166b072: 修复RsyncServerConn析构函数死锁问题 -- ee37a586: 添加 EnsureDirExists 包装函数 -- ad54f43a: 修复 AutoDeleteExpiredDump 中 info 文件路径问题 -- 08ee1c36: 修复 snapshot 注册失败导致文件被误删 -- 27c3a838: 修复 dump 目录为空时返回空文件列表 - -**这些与当前方案不冲突,应该保留** - ---- - -## 需要删除或修改的代码清单 - -### 需要修改的代码 - -1. **无** - 当前工作区的修改已经覆盖了需要改的地方 - -### 可能需要删除/禁用的代码 - -1. **CleanupOrphanSstFiles 中的即时删除逻辑** - - 可选:改为仅在 dump 不再使用时调用 - - 或者降低调用频率 - -2. **检查是否有过时的配置项或调试代码** - - 检查 conf/pika.conf 是否有不需要的配置 - -### 建议保留但可能未使用的代码 - -1. **rsync_transferring_files_ 保护机制** - - 虽然延迟清理减少了即时删除的需求,但作为保护机制仍有用 - -2. **IsDumpInUse 和 dump_owners_** - - Scheme A 的核心机制,应该保留 - ---- - -## 总结 - -**当前工作区的修改已经覆盖了主要的不一致**: -1. GetBgSaveMetaData 不过滤孤儿文件 -2. RemoveTransferringFile 改为延迟清理 -3. HandleMetaRsyncRequest 增加完整性检查 -4. AutoDeleteExpiredDump 调用 ProcessPendingCleanupFiles - -**唯一需要评估的是**: CleanupOrphanSstFiles 是否还需要在同步期间调用 -建议: -- 选项1: 完全禁用 CleanupOrphanSstFiles(依赖延迟清理) -- 选项2: 仅在 dump 不再使用时调用 CleanupOrphanSstFiles(当前行为,可保留) - ---- - -文档生成时间: 2026-03-05 From 63a13b77b21501a0460d99d14ac492b367fd966f Mon Sep 17 00:00:00 2001 From: chenbt Date: Fri, 17 Apr 2026 14:24:58 +0800 Subject: [PATCH 10/11] =?UTF-8?q?fix:=20ai=20review=20bug=20=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 3 ++- src/pika_server.cc | 3 +++ src/storage/src/redis.cc | 16 +++++++++------- src/storage/src/redis.h | 2 +- src/storage/src/storage.cc | 2 +- 5 files changed, 16 insertions(+), 10 deletions(-) diff --git a/.gitignore b/.gitignore index 4749601897..7518e2e548 100644 --- a/.gitignore +++ b/.gitignore @@ -73,4 +73,5 @@ pkg # include codis fe javascript lib files !codis/cmd/fe/assets/** -tests/tmp \ No newline at end of file +tests/tmp +.claude \ No newline at end of file diff --git a/src/pika_server.cc b/src/pika_server.cc index 1723337bce..d81152f119 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -1241,6 +1241,9 @@ void PikaServer::AutoCompactRange() { struct timeval now; gettimeofday(&now, nullptr); int interval = g_pika_conf->incremental_compact_interval(); + if (interval <= 0) { + return; + } if (last_incremental_compact_time_.tv_sec == 0 || now.tv_sec - last_incremental_compact_time_.tv_sec >= interval) { gettimeofday(&last_incremental_compact_time_, nullptr); diff --git a/src/storage/src/redis.cc b/src/storage/src/redis.cc index 198a7cf975..ad269e9905 100644 --- a/src/storage/src/redis.cc +++ b/src/storage/src/redis.cc @@ -6,7 +6,6 @@ #include #include #include -#include #include "rocksdb/env.h" @@ -272,9 +271,9 @@ void SelectColumnFamilyHandles(const DataType& option_type, const ColumnFamilyTy Status Redis::LongestNotCompactionSstCompact(const DataType& option_type, std::vector* compact_result_vec, const ColumnFamilyType& type) { bool no_compact = false; - bool to_comapct = true; - if (!in_compact_flag_.compare_exchange_weak(no_compact, to_comapct, std::memory_order_relaxed, - std::memory_order_relaxed)) { + bool to_compact = true; + if (!in_compact_flag_.compare_exchange_strong(no_compact, to_compact, std::memory_order_relaxed, + std::memory_order_relaxed)) { return Status::Busy("compact running"); } @@ -465,8 +464,8 @@ Status Redis::IncrementalCompact(const DataType& option_type, std::vectorpush_back(s); + } break; } @@ -568,7 +570,7 @@ Status Redis::IncrementalCompact(const DataType& option_type, std::vectorempty() || compact_result_vec->back().ok())) { compact_result_vec->push_back(Status::OK()); } } diff --git a/src/storage/src/redis.h b/src/storage/src/redis.h index 63c00491ff..3c451b0cde 100644 --- a/src/storage/src/redis.h +++ b/src/storage/src/redis.h @@ -525,7 +525,7 @@ class Redis { rocksdb::WriteOptions default_write_options_; rocksdb::ReadOptions default_read_options_; rocksdb::CompactRangeOptions default_compact_range_options_; - std::atomic in_compact_flag_; + std::atomic in_compact_flag_{false}; OBDSstListener listener_; // listening created sst file while compacting in OBD-compact // For Scan diff --git a/src/storage/src/storage.cc b/src/storage/src/storage.cc index 78e8e6b956..bf2018114c 100644 --- a/src/storage/src/storage.cc +++ b/src/storage/src/storage.cc @@ -1697,7 +1697,7 @@ Status Storage::StartBGThread() { Status Storage::AddBGTask(const BGTask& bg_task) { bg_tasks_mutex_.lock(); - if (bg_task.type == DataType::kAll) { + if (bg_task.operation == kCleanAll) { // if current task it is global compact, // clear the bg_tasks_queue_; std::queue empty_queue; From 8043759c3534f3fc387de017840b2698b630a55f Mon Sep 17 00:00:00 2001 From: chenbt Date: Fri, 17 Apr 2026 14:31:46 +0800 Subject: [PATCH 11/11] =?UTF-8?q?change:=20=E7=A7=BB=E9=99=A4=E5=A4=9A?= =?UTF-8?q?=E4=BD=99=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .claude/settings.local.json | 14 -------------- 1 file changed, 14 deletions(-) delete mode 100644 .claude/settings.local.json diff --git a/.claude/settings.local.json b/.claude/settings.local.json deleted file mode 100644 index 020c30a5b1..0000000000 --- a/.claude/settings.local.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "permissions": { - "allow": [ - "Bash(claude doctor:*)", - "WebFetch(domain:github.com)", - "Bash(ls -la:*)", - "Bash(git stash push:*)", - "Bash(git fetch:*)", - "Bash(git checkout:*)", - "Bash(git stash pop:*)", - "Bash(git add:*)" - ] - } -}