refactor(storage): MemTable 采用双表(active/dirty)模式消除 Flush 数据竞争#24
Conversation
- 抽取 SkipList 类型封装跳表状态(size/level/head) - MemTable 维护 active+dirty 两张表,类似 sync.Map 的 read/dirty 机制 - Flush 时原子交换 active→dirty,创建新 active,I/O 在锁外执行不阻塞写入 - Get 查找顺序: active → dirty(不可变快照) → SSTable - Engine 移除冗余锁,变为薄封装层,同步完全由 MemTable 负责 - 重命名 MAXL→maxLevel, P→probability(Go 命名规范)
📝 WalkthroughWalkthroughThe PR refactors storage layer concurrency by moving synchronization from Engine to MemTable. Engine loses its RWMutex and becomes a thin delegating wrapper. MemTable gains a double-buffer design with active/dirty skip lists, skip-list operations migrated onto SkipList itself, and a rewritten Flush that swaps buffers and persists outside locks. ChangesStorage Concurrency Refactor
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Tip 💬 Introducing Slack Agent: The best way for teams to turn conversations into code.Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.
Built for teams:
One agent for your entire SDLC. Right inside Slack. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
storage/zstorage/memtable.go (1)
400-411:⚠️ Potential issue | 🔴 Critical | ⚡ Quick winData race:
WriteSSTableiterates active without synchronization.After releasing
RLock, concurrentPut()calls can modify the sameSkipListviainsert()whilecollectAllEntry(active)is iterating over it. This can cause corrupted iteration or crashes.Unlike
Flush(), which correctly swapsactive → dirtyto create an immutable snapshot, this method reads from a live, mutable structure.🔧 Proposed fix: use the same swap pattern as Flush
func (m *MemTable) WriteSSTable() error { m.mu.RLock() - active := m.active + entries := collectAllEntry(m.active) m.mu.RUnlock() - err := m.sst.writeToSSTable(collectAllEntry(active)) + err := m.sst.writeToSSTable(entries) select { case m.compactCh <- true: default: } return err }Alternatively, if you need a true snapshot without blocking writes, apply the same swap pattern used in
Flush().🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@storage/zstorage/memtable.go` around lines 400 - 411, WriteSSTable currently grabs and releases m.mu.RLock then calls collectAllEntry(active) which can race with concurrent Put() mutations of the SkipList; change WriteSSTable to take the same immutable-snapshot swap used by Flush: acquire m.mu.Lock, swap m.active into a new local variable (e.g., dirty or snapshot) and replace m.active with an empty SkipList, release the lock, then call collectAllEntry(snapshot) and m.sst.writeToSSTable; keep the compactCh send logic unchanged so writers are not blocked.
🧹 Nitpick comments (2)
storage/engine.go (1)
53-62: 💤 Low valueErrors from
Put/DeleteinapplyWorkerare silently dropped.This is likely pre-existing behavior, but errors returned from
e.Put()ande.Delete()are ignored, which could mask failures in the apply pipeline.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@storage/engine.go` around lines 53 - 62, The applyWorker currently ignores return values from e.Put and e.Delete; update Engine.applyWorker to capture and handle errors from those calls (e.g., err := e.Put(...)/e.Delete(...), then if err != nil handle it). Handle errors by logging using Engine's logger (or return them on a new engine-level channel such as e.applyErrCh so the caller can observe/handle failures), and consider whether the worker should retry, stop processing, or propagate the error; ensure you reference the applyCh consumer in applyWorker and the Put/Delete calls when adding the error handling.storage/zstorage/memtable.go (1)
334-338: ⚖️ Poor tradeoffConsider retaining dirty reference on flush error for recovery.
If
writeToSSTablefails, the method returns early without settingdirty = nil, which is correct for allowingGetto still find data. However, the data indirtyis effectively lost since it's not in the newactiveeither. Consider whether to copy entries back toactiveor implement retry logic.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@storage/zstorage/memtable.go` around lines 334 - 338, The flush currently returns on error from m.sst.writeToSSTable(allEntries) which leaves the staged entries neither in the new SST nor in m.active; to avoid data loss, on write error restore the entries into the in-memory store instead of dropping them: when writeToSSTable returns an error, merge allEntries (or m.dirty) back into m.active (or leave m.dirty intact) while holding the same mutex used by Get, so lookups still succeed; alternatively implement a bounded retry loop around m.sst.writeToSSTable before abandoning the merge; reference m.sst.writeToSSTable, m.dirty, m.active, and Get to locate where to perform the merge/ retry and ensure proper locking.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@storage/zstorage/memtable.go`:
- Around line 221-235: The Delete method in MemTable (MemTable.Delete) removes
keys from the active memtable via m.active.delete but does not persist a
tombstone to the write-ahead log (WAL), so deletes are lost on recovery; update
Delete to append a delete marker to the WAL before mutating the in-memory table
(mirror the Put flow that writes to WAL first), e.g. call the existing WAL write
method for deletions (or add one) and ensure the WAL entry is flushed/checked
for errors before calling m.active.delete; keep error handling consistent with
Put so a failed WAL write prevents the in-memory delete.
---
Outside diff comments:
In `@storage/zstorage/memtable.go`:
- Around line 400-411: WriteSSTable currently grabs and releases m.mu.RLock then
calls collectAllEntry(active) which can race with concurrent Put() mutations of
the SkipList; change WriteSSTable to take the same immutable-snapshot swap used
by Flush: acquire m.mu.Lock, swap m.active into a new local variable (e.g.,
dirty or snapshot) and replace m.active with an empty SkipList, release the
lock, then call collectAllEntry(snapshot) and m.sst.writeToSSTable; keep the
compactCh send logic unchanged so writers are not blocked.
---
Nitpick comments:
In `@storage/engine.go`:
- Around line 53-62: The applyWorker currently ignores return values from e.Put
and e.Delete; update Engine.applyWorker to capture and handle errors from those
calls (e.g., err := e.Put(...)/e.Delete(...), then if err != nil handle it).
Handle errors by logging using Engine's logger (or return them on a new
engine-level channel such as e.applyErrCh so the caller can observe/handle
failures), and consider whether the worker should retry, stop processing, or
propagate the error; ensure you reference the applyCh consumer in applyWorker
and the Put/Delete calls when adding the error handling.
In `@storage/zstorage/memtable.go`:
- Around line 334-338: The flush currently returns on error from
m.sst.writeToSSTable(allEntries) which leaves the staged entries neither in the
new SST nor in m.active; to avoid data loss, on write error restore the entries
into the in-memory store instead of dropping them: when writeToSSTable returns
an error, merge allEntries (or m.dirty) back into m.active (or leave m.dirty
intact) while holding the same mutex used by Get, so lookups still succeed;
alternatively implement a bounded retry loop around m.sst.writeToSSTable before
abandoning the merge; reference m.sst.writeToSSTable, m.dirty, m.active, and Get
to locate where to perform the merge/ retry and ensure proper locking.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro Plus
Run ID: 9c2cd3dd-9406-4296-9da7-5b5387df5ed2
📒 Files selected for processing (2)
storage/engine.gostorage/zstorage/memtable.go
Summary by CodeRabbit