Skip to content

feat: Enable batch import of SST files with S3/MinIO in Pika#3176

Merged
wangshao1 merged 3 commits intoOpenAtomFoundation:3.5-ospp-ingestfrom
byseea11:pika-ingest-v3.5
Oct 21, 2025
Merged

feat: Enable batch import of SST files with S3/MinIO in Pika#3176
wangshao1 merged 3 commits intoOpenAtomFoundation:3.5-ospp-ingestfrom
byseea11:pika-ingest-v3.5

Conversation

@byseea11
Copy link
Copy Markdown

修改内容概述

当前分支 pika-ingest-v3.5 相比于 3.5 分支,新增了 SST文件批量导入功能 (pika_batch_ingest),支持通过 S3/MinIO 等对象存储服务进行大规模数据的快速导入。该功能允许用户通过 manifest 文件批量导入大量 SST 文件到 Pika 数据库,并提供一个完整的批量数据生成和导入流水线系统。

主要更改

  1. 新增核心功能模块

    A. Manifest Ingest 功能

    • 新增 manifestingest 命令,用于从 S3/MinIO 下载 manifest 文件并导入对应的 SST 文件
    • 实现 ManifestIngestCmd 类,负责下载和导入外部 SST 文件
    • RedisStrings::SstExtendIngest 方法中实现 SST 扩展导入逻辑

    B. S3 服务模块

    • 新增 src/ingest 目录,包含完整的 S3 服务支持
    • 实现 S3Service 类,封装 AWS S3 客户端和传输管理器
    • 实现 SstDownloader 类,用于下载 manifest 和 SST 文件
  2. 配置文件更新

    A. 主配置文件

    • conf/pika.conf 中新增 ingest-conf-path 配置项,指定 ingest 配置文件路径
    • 保留 s3-conf-path 配置项,用于 S3 访问配置

    B. S3 配置文件

    • 新增 conf/s3.conf:定义 S3/MinIO 连接参数

      • endpoint:S3 或 MinIO 服务地址
      • region:区域设置
      • bucket:存储桶名称
      • access_key / secret_key:访问凭证
      • 传输参数:transfer_threadsmax_inflight
      • 重试相关配置

    C. Ingest 配置文件

    • conf/ingest.conf:定义 ingest 过程中的 RocksDB 参数配置
    • 包括导入期和恢复期两套参数,用于优化导入性能和恢复正常运行状态
  3. 新增工具链

    A. pika_batch_ingest 工具

    • 完整的批量数据生成和导入系统
    • 支持数据生成 → JSON 转 SST → 上传至 S3 → 自动导入的完整流水线
    • 包含多个模块:Mock(数据生成)、Exchange(SST 转换)、S3Put(上传)、iAgent(自动导入)

    B. Shell 脚本集合

    • 提供多个自动化脚本:run.shmock.shexchange.shs3put.shiagent.sh
    • 支持一键运行完整导入流程
  4. 代码架构改动

    A. PikaServer 更新

    • PikaServer 中新增 S3Service 成员变量
    • 实现 InitS3()StopS3() 方法用于 S3 服务管理
    • 在主启动流程中初始化 S3 服务

    B. 存储层扩展

    • storage 模块中新增 SstExtendIngest 接口
    • 实现动态 RocksDB 参数调整,导入时使用激进参数优化性能,导入完成后恢复生产参数

    C. 新增 proto 定义

    • 创建 manifest.proto 定义 manifest 文件格式
    • 用于描述 SST 文件清单信息
  5. 性能优化特性

    A. 激进导入参数

    • 导入期启用高性能参数:更高的 L0 阈值、禁用自动压缩等
    • 导入完成后自动恢复生产环境参数

    B. 并发处理

    • 支持多文件并行下载和导入
    • 使用计数器确保导入会话管理,避免参数冲突
  6. 集成测试

    • 添加 string_ingest.tcl 测试文件
    • 验证 manifestingest 命令的正确性和数据完整性
  7. 主要代码更新细节

    A. pika.cc

    • 在主启动流程中添加 S3 服务初始化逻辑
    • 检查 S3 配置并启动 S3 服务,失败则 fatal 退出

    B. pika_server.cc

    • 在析构函数中添加 StopS3() 调用,确保 S3 服务正确关闭
    • 实现 InitS3()StopS3() 方法

    C. pika_command.cc 和 pika_command.h

    • 在命令表中注册 manifestingest 命令
    • 定义命令名称常量

    D. pika_conf.cc 和 pika_conf.h

    • 添加 S3 和 ingest 配置路径的加载和访问方法
    • 在配置文件中读取 s3-conf-pathingest-conf-path 参数

    E. 全局变量声明更新

    • 将多个文件中的 extern std::unique_ptr<PikaServer> g_pika_server; 更改为 extern PikaServer* g_pika_server;
    • 这是为了兼容某些代码引用方式

技术实现要点

  1. 两阶段参数管理:导入时使用激进配置(高阈值、禁用压缩)以提高性能,导入完成后自动恢复生产配置
  2. S3 集成:支持从 S3/MinIO 下载 manifest 文件,解析后批量下载对应的 SST 文件
  3. 安全导入:验证 checksums,支持阻塞刷新等选项,确保数据完整性
  4. 自动会话管理:使用原子计数器跟踪并发导入会话,确保参数恢复的正确性
  5. 流水线系统:提供从数据生成到导入的完整自动化工具链
  6. 系统集成:在 Pika 主流程中正确初始化和关闭 S3 服务,确保系统完整运行
  7. 命令注册:将 manifestingest 命令正确注册到命令表中,使其可以在 Redis 协议中使用
  8. 配置管理:提供完善的 S3 和 ingest 配置文件,支持多种 S3 兼容存储服务

此功能主要面向大规模数据导入场景,如数据迁移、冷热数据切换等,显著提高了大数据量导入的效率和可靠性。

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Sep 30, 2025

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

✨ Finishing touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions github-actions Bot added ✏️ Feature New feature or request 📒 Documentation Improvements or additions to documentation labels Sep 30, 2025
@byseea11 byseea11 closed this Sep 30, 2025
@byseea11 byseea11 reopened this Sep 30, 2025
@wangshao1 wangshao1 changed the base branch from 3.5 to 3.5-ospp-ingest October 21, 2025 02:06
Comment thread conf/pika_master.conf Outdated
@@ -0,0 +1,717 @@
###########################
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个文件是你跑demo的时候的配置文件吧?在pr里删掉吧。

Comment thread include/pika_server.h
extern std::unique_ptr<PikaConf> g_pika_conf;

// Forward declarations
class PikaMigrateThread;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个类你应该没用到吧?为什么这里需要前置声明?

Copy link
Copy Markdown
Author

@byseea11 byseea11 Oct 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

在623行有一个 std::unique_ptr<PikaMigrateThread> pika_migrate_thread_; 的成员变量,使用了PikaMigrateThread。

Comment thread src/pika.cc Outdated

// === InitS3 ===
std::string s3_conf = g_pika_conf->s3_conf_path();
auto s3_st = g_pika_server->InitS3(s3_conf);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里建议加一个判断,如果没有s3相关配置,就不启动batch import功能。对应manifestcmd的处理逻辑直接返回not supported就可以。
因为不是所有的线上集群都需要开启这个功能,现在这种做法会导致pika强依赖一个s3组件。

Comment thread src/pika_kv.cc Outdated

// 下载所有 SST
SstDownloader* downloader = g_pika_server->s3()->Downloader();
rocksdb::Status s_ = downloader->DownloadAllFiles(key_, sst_files_path_);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个s_是不是跟类成员变量重复了?

@wangshao1 wangshao1 merged commit e15b426 into OpenAtomFoundation:3.5-ospp-ingest Oct 21, 2025
2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

📒 Documentation Improvements or additions to documentation ✏️ Feature New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants