Skip to content

Commit 934a203

Browse files
fix(posting): prevent stale cache hits when maxTs < readTs (#9597) (#9614)
**Description** When reading posting lists from the cache, `readFromCache` only verified that `minTs <= readTs`, but did not check that `maxTs >= readTs`. This allowed stale cache entries to be returned when newer data had been committed to disk but the cache hadn't been updated yet. Closes #9597 **Checklist** - [x] The PR title follows the [Conventional Commits](https://www.conventionalcommits.org/en/v1.0.0/#summary) syntax, leading with `fix:`, `feat:`, `chore:`, `ci:`, etc. - [x] Code compiles correctly and linting (via trunk) passes locally - [x] Tests added for new functionality, or regression tests for bug fixes added as applicable
1 parent 3a022ed commit 934a203

2 files changed

Lines changed: 94 additions & 1 deletion

File tree

posting/mvcc.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -764,7 +764,9 @@ func (c *CachePL) Set(l *List, readTs uint64) {
764764
func (ml *MemoryLayer) readFromCache(key []byte, readTs uint64) *List {
765765
cacheItem, ok := ml.cache.get(key)
766766

767-
if ok && cacheItem.list != nil && cacheItem.list.minTs <= readTs {
767+
// Issue #9597 fix: Cache is only valid if minTs <= readTs AND maxTs >= readTs.
768+
// If maxTs < readTs, the cache is missing mutations committed after maxTs.
769+
if ok && cacheItem.list != nil && cacheItem.list.minTs <= readTs && cacheItem.list.maxTs >= readTs {
768770
cacheItem.list.RLock()
769771
lCopy := copyList(cacheItem.list)
770772
cacheItem.list.RUnlock()

posting/mvcc_test.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"math"
1111
"math/rand"
1212
"os"
13+
"slices"
1314
"testing"
1415
"time"
1516

@@ -182,6 +183,96 @@ func TestRollupTimestamp(t *testing.T) {
182183
require.Equal(t, uint64(10), kvs[0].Version)
183184
}
184185

186+
// TestCacheStaleWhenMaxTsLessThanReadTs tests that readFromCache
187+
// returns nil (cache miss) when cacheMaxTs < readTs, forcing a disk read.
188+
// Issue #9597: Without the maxTs >= readTs check, stale cache data is returned.
189+
func TestCacheStaleWhenMaxTsLessThanReadTs(t *testing.T) {
190+
require.NoError(t, pstore.DropAll())
191+
192+
// Re-initialize MemoryLayer with cache enabled (10MB) for this test
193+
// The default test setup uses cache size 0 which disables caching
194+
origMemLayer := MemLayerInstance
195+
MemLayerInstance = initMemoryLayer(10<<20, false)
196+
t.Cleanup(func() {
197+
MemLayerInstance = origMemLayer
198+
})
199+
200+
attr := x.AttrInRootNamespace("issue9597")
201+
key := x.IndexKey(attr, "test")
202+
203+
// Step 1: Write UID 1 at commitTs=10 and populate cache via UpdateCachedKeys
204+
p1 := new(pb.PostingList)
205+
p1.Postings = []*pb.Posting{{
206+
Uid: 1,
207+
StartTs: 5,
208+
CommitTs: 10,
209+
Op: 1,
210+
}}
211+
delta1, err := proto.Marshal(p1)
212+
require.NoError(t, err)
213+
214+
txn1 := Oracle().RegisterStartTs(5)
215+
txn1.cache.deltas[string(key)] = delta1
216+
217+
writer1 := NewTxnWriter(pstore)
218+
require.NoError(t, txn1.CommitToDisk(writer1, 10))
219+
require.NoError(t, writer1.Flush())
220+
221+
// Read at ts=10 to populate the cache (this triggers saveInCache in ReadData)
222+
l1, err := GetNoStore(key, 10)
223+
require.NoError(t, err)
224+
require.Equal(t, 1, l1.mutationMap.listLen(10))
225+
226+
// Wait for ristretto to process the cache set
227+
MemLayerInstance.wait()
228+
229+
// Verify cache is populated
230+
cacheItem, cacheOk := MemLayerInstance.cache.get(key)
231+
require.True(t, cacheOk, "Cache should have entry after first read")
232+
require.NotNil(t, cacheItem.list)
233+
cacheMaxTsBefore := cacheItem.list.maxTs
234+
235+
// Step 2: Write UID 2 at commitTs=20 to disk, but DON'T call UpdateCachedKeys
236+
// This simulates the race where disk has newer data than cache
237+
p2 := new(pb.PostingList)
238+
p2.Postings = []*pb.Posting{{
239+
Uid: 2,
240+
StartTs: 15,
241+
CommitTs: 20,
242+
Op: 1,
243+
}}
244+
delta2, err := proto.Marshal(p2)
245+
require.NoError(t, err)
246+
247+
txn2 := Oracle().RegisterStartTs(15)
248+
txn2.cache.deltas[string(key)] = delta2
249+
250+
writer2 := NewTxnWriter(pstore)
251+
require.NoError(t, txn2.CommitToDisk(writer2, 20))
252+
require.NoError(t, writer2.Flush())
253+
// NOTE: We intentionally skip UpdateCachedKeys to keep cache stale
254+
255+
// Verify cache is still stale (maxTs unchanged)
256+
cacheItem2, _ := MemLayerInstance.cache.get(key)
257+
require.Equal(t, cacheMaxTsBefore, cacheItem2.list.maxTs, "Cache should still be stale")
258+
259+
// Step 3: Read at readTs=25 (greater than cache maxTs)
260+
// With the fix: cache miss (maxTs < readTs), reads from disk, gets both UIDs
261+
// Without fix: cache hit (minTs <= readTs), returns stale data without UID 2
262+
l2, err := GetNoStore(key, 25)
263+
require.NoError(t, err)
264+
265+
uidList, err := l2.Uids(ListOptions{ReadTs: 25})
266+
require.NoError(t, err)
267+
268+
// Should see UID 2 (from disk read)
269+
hasUid2 := slices.Contains(uidList.Uids, 2)
270+
if !hasUid2 {
271+
t.Fatalf("Expected UID 2 in result, got UIDs: %v", uidList.Uids)
272+
}
273+
require.True(t, hasUid2, "UID 2 missing - cache returned stale data (maxTs < readTs)")
274+
}
275+
185276
func TestPostingListRead(t *testing.T) {
186277
attr := x.AttrInRootNamespace("emptypl")
187278
key := x.DataKey(attr, 1)

0 commit comments

Comments (0)