From 1e7cc85ae9648dedaed2632db2260bb685e51df8 Mon Sep 17 00:00:00 2001 From: mattthew Date: Thu, 26 Feb 2026 12:48:02 -0500 Subject: [PATCH] Check also for max timestamp greater than read timestamp for cache selection; add unit test --- posting/mvcc.go | 4 +- posting/mvcc_test.go | 91 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+), 1 deletion(-) diff --git a/posting/mvcc.go b/posting/mvcc.go index 8856cc20415..81c5e375553 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -764,7 +764,9 @@ func (c *CachePL) Set(l *List, readTs uint64) { func (ml *MemoryLayer) readFromCache(key []byte, readTs uint64) *List { cacheItem, ok := ml.cache.get(key) - if ok && cacheItem.list != nil && cacheItem.list.minTs <= readTs { + // Issue #9597 fix: Cache is only valid if minTs <= readTs AND maxTs >= readTs. + // If maxTs < readTs, the cache is missing mutations committed after maxTs. + if ok && cacheItem.list != nil && cacheItem.list.minTs <= readTs && cacheItem.list.maxTs >= readTs { cacheItem.list.RLock() lCopy := copyList(cacheItem.list) cacheItem.list.RUnlock() diff --git a/posting/mvcc_test.go b/posting/mvcc_test.go index c7874a41994..e519e359d6f 100644 --- a/posting/mvcc_test.go +++ b/posting/mvcc_test.go @@ -10,6 +10,7 @@ import ( "math" "math/rand" "os" + "slices" "testing" "time" @@ -182,6 +183,96 @@ func TestRollupTimestamp(t *testing.T) { require.Equal(t, uint64(10), kvs[0].Version) } +// TestCacheStaleWhenMaxTsLessThanReadTs tests that readFromCache +// returns nil (cache miss) when cacheMaxTs < readTs, forcing a disk read. +// Issue #9597: Without the maxTs >= readTs check, stale cache data is returned. +func TestCacheStaleWhenMaxTsLessThanReadTs(t *testing.T) { + require.NoError(t, pstore.DropAll()) + + // Re-initialize MemoryLayer with cache enabled (10MB) for this test + // The default test setup uses cache size 0 which disables caching + origMemLayer := MemLayerInstance + MemLayerInstance = initMemoryLayer(10<<20, false) + t.Cleanup(func() { + MemLayerInstance = origMemLayer + }) + + attr := x.AttrInRootNamespace("issue9597") + key := x.IndexKey(attr, "test") + + // Step 1: Write UID 1 at commitTs=10 and populate cache via UpdateCachedKeys + p1 := new(pb.PostingList) + p1.Postings = []*pb.Posting{{ + Uid: 1, + StartTs: 5, + CommitTs: 10, + Op: 1, + }} + delta1, err := proto.Marshal(p1) + require.NoError(t, err) + + txn1 := Oracle().RegisterStartTs(5) + txn1.cache.deltas[string(key)] = delta1 + + writer1 := NewTxnWriter(pstore) + require.NoError(t, txn1.CommitToDisk(writer1, 10)) + require.NoError(t, writer1.Flush()) + + // Read at ts=10 to populate the cache (this triggers saveInCache in ReadData) + l1, err := GetNoStore(key, 10) + require.NoError(t, err) + require.Equal(t, 1, l1.mutationMap.listLen(10)) + + // Wait for ristretto to process the cache set + MemLayerInstance.wait() + + // Verify cache is populated + cacheItem, cacheOk := MemLayerInstance.cache.get(key) + require.True(t, cacheOk, "Cache should have entry after first read") + require.NotNil(t, cacheItem.list) + cacheMaxTsBefore := cacheItem.list.maxTs + + // Step 2: Write UID 2 at commitTs=20 to disk, but DON'T call UpdateCachedKeys + // This simulates the race where disk has newer data than cache + p2 := new(pb.PostingList) + p2.Postings = []*pb.Posting{{ + Uid: 2, + StartTs: 15, + CommitTs: 20, + Op: 1, + }} + delta2, err := proto.Marshal(p2) + require.NoError(t, err) + + txn2 := Oracle().RegisterStartTs(15) + txn2.cache.deltas[string(key)] = delta2 + + writer2 := NewTxnWriter(pstore) + require.NoError(t, txn2.CommitToDisk(writer2, 20)) + require.NoError(t, writer2.Flush()) + // NOTE: We intentionally skip UpdateCachedKeys to keep cache stale + + // Verify cache is still stale (maxTs unchanged) + cacheItem2, _ := MemLayerInstance.cache.get(key) + require.Equal(t, cacheMaxTsBefore, cacheItem2.list.maxTs, "Cache should still be stale") + + // Step 3: Read at readTs=25 (greater than cache maxTs) + // With the fix: cache miss (maxTs < readTs), reads from disk, gets both UIDs + // Without fix: cache hit (minTs <= readTs), returns stale data without UID 2 + l2, err := GetNoStore(key, 25) + require.NoError(t, err) + + uidList, err := l2.Uids(ListOptions{ReadTs: 25}) + require.NoError(t, err) + + // Should see UID 2 (from disk read) + hasUid2 := slices.Contains(uidList.Uids, 2) + if !hasUid2 { + t.Fatalf("Expected UID 2 in result, got UIDs: %v", uidList.Uids) + } + require.True(t, hasUid2, "UID 2 missing - cache returned stale data (maxTs < readTs)") +} + func TestPostingListRead(t *testing.T) { attr := x.AttrInRootNamespace("emptypl") key := x.DataKey(attr, 1)