Skip to content

Commit cbd57eb

Browse files
author
wuxianrong
committed
The data backup and recovery functions have been added
1 parent 3e19283 commit cbd57eb

30 files changed

Lines changed: 2281 additions & 61 deletions

.github/workflows/pika.yml

Lines changed: 52 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ env:
1212
ARTIFACT_PIKA_NAME: artifact-pika
1313

1414
jobs:
15+
1516
build_on_ubuntu:
1617
# The CMake configure and build commands are platform-agnostic and should work equally well on Windows or Mac.
1718
# You can convert this to a matrix build if you need cross-platform coverage.
@@ -41,12 +42,32 @@ jobs:
4142
- name: Install Deps
4243
run: |
4344
sudo apt-get update
44-
sudo apt-get install -y autoconf libprotobuf-dev protobuf-compiler clang-tidy
45+
sudo apt-get install -y autoconf libprotobuf-dev protobuf-compiler clang-tidy gcc-11 g++-11 zlib1g-dev
46+
47+
- name: Patch glibc headers for GCC compatibility
48+
run: |
49+
# Workaround for __has_attribute compatibility issue between newer glibc and GCC
50+
# Create a compatibility header that will be included before system headers
51+
cat > /tmp/glibc_compat_patch.h << 'EOF'
52+
/* Workaround for GCC __has_attribute compatibility */
53+
#if defined(__GNUC__) && __GNUC__ >= 11 && !defined(__clang__)
54+
#ifdef __glibc_has_attribute
55+
#undef __glibc_has_attribute
56+
#endif
57+
#define __glibc_has_attribute(attr) __has_attribute(attr)
58+
#endif
59+
EOF
60+
# Prepend the workaround to sys/cdefs.h
61+
if [ -f /usr/include/sys/cdefs.h ]; then
62+
sudo cp /usr/include/sys/cdefs.h /tmp/cdefs.h.backup
63+
cat /tmp/glibc_compat_patch.h /tmp/cdefs.h.backup | sudo tee /usr/include/sys/cdefs.h > /dev/null
64+
elif [ -f /usr/include/x86_64-linux-gnu/sys/cdefs.h ]; then
65+
sudo cp /usr/include/x86_64-linux-gnu/sys/cdefs.h /tmp/cdefs.h.backup
66+
cat /tmp/glibc_compat_patch.h /tmp/cdefs.h.backup | sudo tee /usr/include/x86_64-linux-gnu/sys/cdefs.h > /dev/null
67+
fi
4568
4669
- name: Configure CMake
47-
# Configure CMake in a 'build' subdirectory. `CMAKE_BUILD_TYPE` is only required if you are using a single-configuration generator such as make.
48-
# See https://cmake.org/cmake/help/latest/variable/CMAKE_BUILD_TYPE.html?highlight=cmake_build_type
49-
run: cmake -B build -DCMAKE_BUILD_TYPE=${{ env.BUILD_TYPE }} -DUSE_PIKA_TOOLS=ON -DCMAKE_CXX_FLAGS_DEBUG=-fsanitize=address -D CMAKE_C_COMPILER_LAUNCHER=ccache -D CMAKE_CXX_COMPILER_LAUNCHER=ccache
70+
run: cmake -B build -DCMAKE_BUILD_TYPE=${{ env.BUILD_TYPE }} -DUSE_PIKA_TOOLS=ON -DCMAKE_CXX_FLAGS_DEBUG=-fsanitize=address -D CMAKE_C_COMPILER_LAUNCHER=ccache -D CMAKE_CXX_COMPILER_LAUNCHER=ccache -DCMAKE_C_COMPILER=gcc-11 -DCMAKE_CXX_COMPILER=g++-11
5071

5172
- name: Build
5273
# Build your program with the given configuration
@@ -166,7 +187,7 @@ jobs:
166187
- name: Install deps
167188
run: |
168189
dnf update -y
169-
dnf install -y bash cmake wget git autoconf gcc perl-Digest-SHA tcl which tar g++ tar epel-release gcc-c++ libstdc++-devel gcc-toolset-13 binutils
190+
dnf install -y bash cmake wget git autoconf gcc perl-Digest-SHA tcl which tar g++ tar epel-release gcc-c++ libstdc++-devel gcc-toolset-12 binutils openssl openssl-devel zlib-devel
170191
dnf clean all
171192
rm -rf /var/cache/dnf
172193
@@ -180,9 +201,28 @@ jobs:
180201
with:
181202
fetch-depth: 1
182203

204+
- name: Patch glibc headers for GCC compatibility
205+
run: |
206+
# Workaround for __has_attribute compatibility issue between newer glibc and GCC
207+
# Create a compatibility header that will be included before system headers
208+
cat > /tmp/glibc_compat_patch.h << 'EOF'
209+
/* Workaround for GCC __has_attribute compatibility */
210+
#if defined(__GNUC__) && __GNUC__ >= 11 && !defined(__clang__)
211+
#ifdef __glibc_has_attribute
212+
#undef __glibc_has_attribute
213+
#endif
214+
#define __glibc_has_attribute(attr) __has_attribute(attr)
215+
#endif
216+
EOF
217+
# Prepend the workaround to sys/cdefs.h
218+
if [ -f /usr/include/sys/cdefs.h ]; then
219+
cp /usr/include/sys/cdefs.h /tmp/cdefs.h.backup
220+
cat /tmp/glibc_compat_patch.h /tmp/cdefs.h.backup > /usr/include/sys/cdefs.h
221+
fi
222+
183223
- name: Configure CMake
184224
run: |
185-
source /opt/rh/gcc-toolset-13/enable
225+
source /opt/rh/gcc-toolset-12/enable
186226
cmake -B build -DCMAKE_BUILD_TYPE=${{ env.BUILD_TYPE }} -DUSE_PIKA_TOOLS=ON -DCMAKE_CXX_FLAGS_DEBUG=-fsanitize=address .
187227
188228
- uses: actions/cache@v3
@@ -197,7 +237,7 @@ jobs:
197237

198238
- name: Build
199239
run: |
200-
source /opt/rh/gcc-toolset-13/enable
240+
source /opt/rh/gcc-toolset-12/enable
201241
cmake --build build --config ${{ env.BUILD_TYPE }}
202242
203243
- name: Cleanup
@@ -292,13 +332,14 @@ jobs:
292332
- name: Install Deps
293333
run: |
294334
brew list --versions cmake && brew uninstall --ignore-dependencies --force cmake || true
295-
brew install gcc@13 automake cmake make binutils
335+
brew install gcc@11 automake cmake make binutils
296336
297337
- name: Configure CMake
338+
# Use GCC 11 to avoid potential __has_attribute compatibility issues with GCC 13
298339
run: |
299-
GCC_PREFIX=$(brew --prefix gcc@13)
300-
export CC=$GCC_PREFIX/bin/gcc-13
301-
cmake -B build -DCMAKE_C_COMPILER=$GCC_PREFIX/bin/gcc-13 -DUSE_PIKA_TOOLS=ON -DCMAKE_BUILD_TYPE=${{ env.BUILD_TYPE }} -DCMAKE_CXX_FLAGS_DEBUG=-fsanitize=address -D CMAKE_C_COMPILER_LAUNCHER=ccache -D CMAKE_CXX_COMPILER_LAUNCHER=ccache
340+
GCC_PREFIX=$(brew --prefix gcc@11)
341+
export CC=$GCC_PREFIX/bin/gcc-11
342+
cmake -B build -DCMAKE_C_COMPILER=$GCC_PREFIX/bin/gcc-11 -DUSE_PIKA_TOOLS=ON -DCMAKE_BUILD_TYPE=${{ env.BUILD_TYPE }} -DCMAKE_CXX_FLAGS_DEBUG=-fsanitize=address -D CMAKE_C_COMPILER_LAUNCHER=ccache -D CMAKE_CXX_COMPILER_LAUNCHER=ccache
302343
303344
- name: Build
304345
run: |

CMakeLists.txt

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,15 @@ elseif (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
2222
endif()
2323
endif()
2424

25-
link_directories("/opt/rh/gcc-toolset-13/root/lib/gcc/x86_64-redhat-linux/13")
25+
# Link directories for gcc-toolset on RHEL/Rocky/CentOS
26+
# Support gcc-toolset-11, gcc-toolset-12 and gcc-toolset-13
27+
if(EXISTS "/opt/rh/gcc-toolset-13/root/lib/gcc/x86_64-redhat-linux/13")
28+
link_directories("/opt/rh/gcc-toolset-13/root/lib/gcc/x86_64-redhat-linux/13")
29+
elseif(EXISTS "/opt/rh/gcc-toolset-12/root/lib/gcc/x86_64-redhat-linux/12")
30+
link_directories("/opt/rh/gcc-toolset-12/root/lib/gcc/x86_64-redhat-linux/12")
31+
elseif(EXISTS "/opt/rh/gcc-toolset-11/root/lib/gcc/x86_64-redhat-linux/11")
32+
link_directories("/opt/rh/gcc-toolset-11/root/lib/gcc/x86_64-redhat-linux/11")
33+
endif()
2634

2735
############# You should enable sanitizer if you are developing pika #############
2836
# Uncomment the following two lines to enable AddressSanitizer to detect memory leaks and other memory-related bugs.
@@ -648,6 +656,10 @@ else()
648656
endif()
649657
set(LEVELDB_INCLUDE_DIR ${INSTALL_INCLUDEDIR})
650658

659+
# Workaround for __has_attribute compatibility issue with newer GCC and glibc
660+
# The issue is that newer glibc uses __glibc_has_attribute which conflicts with GCC's __has_attribute
661+
# We need to set an include path that has a fixed version of the problematic header
662+
651663
ExternalProject_Add(brpc
652664
DEPENDS
653665
gflags
@@ -657,13 +669,14 @@ ExternalProject_Add(brpc
657669
snappy
658670
zlib
659671
URL
660-
https://github.com/apache/brpc/archive/refs/tags/1.6.0.tar.gz
672+
https://github.com/apache/brpc/archive/refs/tags/1.11.0.tar.gz
661673
URL_HASH
662-
MD5=0d37cea25bd006e89806f461ef7e39ba
674+
MD5=f55e582fb8032768f9070865b48e892d
663675
DOWNLOAD_NO_PROGRESS
664676
1
665677
UPDATE_COMMAND
666678
""
679+
667680
LOG_CONFIGURE
668681
1
669682
LOG_BUILD
@@ -677,6 +690,8 @@ ExternalProject_Add(brpc
677690
-DCMAKE_INSTALL_PREFIX=${STAGED_INSTALL_PREFIX}
678691
-DCMAKE_BUILD_TYPE=${LIB_BUILD_TYPE}
679692
-DCMAKE_PREFIX_PATH=${CMAKE_PREFIX_PATH}
693+
-DCMAKE_C_COMPILER=${CMAKE_C_COMPILER}
694+
-DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}
680695
-DWITH_GLOG=ON
681696
-DWITH_SNAPPY=ON
682697
-DBUILD_SHARED_LIBS=OFF
@@ -724,6 +739,8 @@ ExternalProject_Add(braft
724739
-DCMAKE_INSTALL_PREFIX=${STAGED_INSTALL_PREFIX}
725740
-DCMAKE_BUILD_TYPE=${LIB_BUILD_TYPE}
726741
-DCMAKE_PREFIX_PATH=${CMAKE_PREFIX_PATH}
742+
-DCMAKE_C_COMPILER=${CMAKE_C_COMPILER}
743+
-DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}
727744
-DWITH_GLOG=ON
728745
-DBUILD_SHARED_LIBS=OFF
729746
-DBUILD_UNIT_TESTS=OFF

docs/raft_implementation_review.md

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
# Pika Raft 模式实现 Review 文档
2+
3+
## 1. 概述
4+
5+
Pika 的 Raft 模式基于 [braft](https://github.com/baidu/braft) 实现分布式一致性,使用 brpc 进行节点间通信。
6+
7+
**核心特性**
8+
- 强一致性保证
9+
- 自动 Leader 选举
10+
- 基于 RocksDB Checkpoint 的快照机制
11+
12+
## 2. 架构概览
13+
14+
```
15+
┌─────────────────────────────────────────────────────┐
16+
│ Pika Server │
17+
├─────────────────────────────────────────────────────┤
18+
│ Client Request → RaftManager → PikaRaftNode │
19+
│ ↓ │
20+
│ braft::Node │
21+
│ ↓ │
22+
│ PikaStateMachine │
23+
│ ↓ │
24+
│ Storage::OnBinlogWrite() │
25+
│ ↓ │
26+
│ RocksDB │
27+
└─────────────────────────────────────────────────────┘
28+
```
29+
30+
## 3. 核心组件
31+
32+
| 组件 | 文件 | 职责 |
33+
|------|------|------|
34+
| `RaftManager` | [praft.h](../src/praft/include/praft/praft.h) | 管理多个 Raft 节点(每个 DB 一个) |
35+
| `PikaRaftNode` | [praft.h](../src/praft/include/praft/praft.h) | 封装 braft::Node,提供日志追加接口 |
36+
| `PikaStateMachine` | [praft.cc](../src/praft/src/praft.cc) | 实现状态机,处理日志应用和快照 |
37+
| `PPosixFileSystemAdaptor` | [psnapshot.cc](../src/praft/src/psnapshot.cc) | 快照文件系统适配器 |
38+
39+
## 4. 数据流
40+
41+
### 写入流程
42+
1. 客户端发送写命令
43+
2. Storage 层构建 Binlog(Protobuf 格式)
44+
3. 通过 `PikaRaftNode::AppendLog()` 提交到 Raft
45+
4. braft 复制日志到多数节点
46+
5. 提交后 `PikaStateMachine::on_apply()` 被调用
47+
6. 调用 `Storage::OnBinlogWrite()` 写入 RocksDB
48+
49+
### Binlog 格式
50+
51+
定义在 [binlog.proto](../src/praft/src/binlog.proto)
52+
- 支持数据类型:Strings, Hashes, Lists, Sets, ZSets, Streams
53+
- 操作类型:Put, Delete
54+
55+
## 5. 快照机制
56+
57+
- 使用 RocksDB Checkpoint 创建快照
58+
- 快照恢复点基于 `GetSmallestFlushedLogIndex()`
59+
- 通过 `LogIndexOfColumnFamilies` 追踪各 CF 的日志应用进度
60+
61+
## 6. 配置选项
62+
63+
| 配置项 | 默认值 | 说明 |
64+
|--------|--------|------|
65+
| `raft_enabled` | false | 是否启用 Raft |
66+
| `raft_group_id` | "" | Raft Group ID |
67+
| `raft_election_timeout_ms` | 1000 | 选举超时(ms) |
68+
| `raft_snapshot_interval_s` | 3600 | 快照间隔(s) |
69+
70+
## 7. Review 重点
71+
72+
### 正确性
73+
- **日志幂等性**: `IsApplied()` 检查是否已应用,防止重复应用
74+
- **快照一致性**: 使用最小 flushed_log_index 确定快照点
75+
- **WAL 禁用**: Raft 日志提供持久性保证,RocksDB WAL 被禁用
76+
77+
### 性能
78+
- 同步写入等待 Raft 提交(可优化为批量提交)
79+
- Binlog 使用 Protobuf 序列化
80+
81+
### 线程安全
82+
- `LogIndexOfColumnFamilies` 使用 mutex 保护
83+
- braft 保证 `on_apply()` 顺序调用
84+
85+
## 8. 关键代码位置
86+
87+
| 功能 | 文件 |
88+
|------|------|
89+
| 日志应用 | [praft.cc](../src/praft/src/praft.cc) - `on_apply()` |
90+
| Binlog 处理 | [storage.cc](../src/storage/src/storage.cc) - `OnBinlogWrite()` |
91+
| Log Index 追踪 | [log_index.cc](../src/storage/src/log_index.cc) |
92+
| 快照创建 | [psnapshot.cc](../src/praft/src/psnapshot.cc) |
93+
| Raft 命令 | [pika_raft.cc](../src/pika_raft.cc) |

include/pika_db.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {
9494
std::shared_ptr<storage::Storage> storage() const;
9595
void GetBgSaveMetaData(std::vector<std::string>* fileNames, std::string* snapshot_uuid);
9696
void BgSaveDB();
97+
pstd::Status CreateCheckpoint(const std::string& checkpoint_dir);
98+
pstd::Status LoadDBFromCheckpoint(const std::string& checkpoint_dir);
9799
void SetBinlogIoError();
98100
void SetBinlogIoErrorrelieve();
99101
bool IsBinlogIoError();

include/pika_server.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ enum TaskType {
7171
kCompactRangeSets,
7272
kCompactRangeZSets,
7373
kCompactRangeList,
74+
kLoadDBFromCheckpoint,
75+
kCreateCheckpoint,
7476
};
7577

7678
struct TaskArg {

src/pika_db.cc

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,75 @@ void DB::BgSaveDB() {
6767
g_pika_server->BGSaveTaskSchedule(&DoBgSave, static_cast<void*>(bg_task_arg));
6868
}
6969

70+
pstd::Status DB::CreateCheckpoint(const std::string& checkpoint_dir) {
71+
std::string checkpoint_sub_path = checkpoint_dir;
72+
if (!checkpoint_sub_path.empty() && checkpoint_sub_path.back() != '/') {
73+
checkpoint_sub_path.push_back('/');
74+
}
75+
checkpoint_sub_path += db_name_;
76+
77+
if (!pstd::FileExists(checkpoint_sub_path)) {
78+
if (pstd::CreatePath(checkpoint_sub_path, 0755) != 0) {
79+
return Status::IOError("Failed to create checkpoint path", checkpoint_sub_path);
80+
}
81+
}
82+
83+
std::shared_lock guard(dbs_rw_);
84+
auto tasks = storage_->CreateCheckpoint(checkpoint_sub_path);
85+
for (auto& task : tasks) {
86+
auto status = task.get();
87+
if (!status.ok()) {
88+
return Status::Corruption("Create checkpoint failed: " + status.ToString());
89+
}
90+
}
91+
return Status::OK();
92+
}
93+
94+
pstd::Status DB::LoadDBFromCheckpoint(const std::string& checkpoint_dir) {
95+
std::string checkpoint_sub_path = checkpoint_dir;
96+
if (!checkpoint_sub_path.empty() && checkpoint_sub_path.back() != '/') {
97+
checkpoint_sub_path.push_back('/');
98+
}
99+
checkpoint_sub_path += db_name_;
100+
101+
if (!pstd::FileExists(checkpoint_sub_path)) {
102+
return Status::NotFound("Checkpoint dir does not exist: " + checkpoint_sub_path);
103+
}
104+
105+
std::lock_guard<std::shared_mutex> guard(dbs_rw_);
106+
opened_ = false;
107+
108+
auto old_storage = storage_;
109+
storage_.reset();
110+
if (old_storage) {
111+
old_storage->Close();
112+
}
113+
114+
storage_ = std::make_shared<storage::Storage>();
115+
auto checkpoint_tasks = storage_->LoadCheckpoint(checkpoint_sub_path, db_path_);
116+
for (auto& task : checkpoint_tasks) {
117+
auto status = task.get();
118+
if (!status.ok()) {
119+
storage_.reset();
120+
return Status::Corruption("Load checkpoint failed: " + status.ToString());
121+
}
122+
}
123+
124+
storage::StorageOptions storage_options = g_pika_server->storage_options();
125+
auto open_status = storage_->Open(storage_options, db_path_);
126+
if (!open_status.ok()) {
127+
storage_.reset();
128+
return Status::Corruption("Storage open failed: " + open_status.ToString());
129+
}
130+
131+
if (!g_pika_conf->raft_enabled()) {
132+
storage_->DisableWal(false);
133+
}
134+
135+
opened_ = true;
136+
return Status::OK();
137+
}
138+
70139
void DB::SetBinlogIoError() { return binlog_io_error_.store(true); }
71140
void DB::SetBinlogIoErrorrelieve() { return binlog_io_error_.store(false); }
72141
bool DB::IsBinlogIoError() { return binlog_io_error_.load(); }
@@ -206,6 +275,11 @@ bool DB::FlushDBWithoutLock() {
206275
}
207276

208277
LOG(INFO) << db_name_ << " Delete old db...";
278+
// First close the storage properly to cancel background work and wait for threads
279+
// This prevents crash when RocksDB background threads access destroyed resources
280+
if (storage_) {
281+
storage_->Close();
282+
}
209283
storage_.reset();
210284

211285
std::string dbpath = db_path_;

0 commit comments

Comments
 (0)