Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion posting/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,9 @@ func (c *CachePL) Set(l *List, readTs uint64) {
func (ml *MemoryLayer) readFromCache(key []byte, readTs uint64) *List {
cacheItem, ok := ml.cache.get(key)

if ok && cacheItem.list != nil && cacheItem.list.minTs <= readTs {
// Issue #9597 fix: Cache is only valid if minTs <= readTs AND maxTs >= readTs.
// If maxTs < readTs, the cache is missing mutations committed after maxTs.
if ok && cacheItem.list != nil && cacheItem.list.minTs <= readTs && cacheItem.list.maxTs >= readTs {
cacheItem.list.RLock()
lCopy := copyList(cacheItem.list)
cacheItem.list.RUnlock()
Expand Down
91 changes: 91 additions & 0 deletions posting/mvcc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"math"
"math/rand"
"os"
"slices"
"testing"
"time"

Expand Down Expand Up @@ -182,6 +183,96 @@ func TestRollupTimestamp(t *testing.T) {
require.Equal(t, uint64(10), kvs[0].Version)
}

// TestCacheStaleWhenMaxTsLessThanReadTs is a regression test for issue #9597.
//
// It commits a posting at ts=10, reads the key so that the list is stored in
// the memory-layer cache, and then commits a second posting at ts=20 straight
// to disk without refreshing the cache. A subsequent read at ts=25 must still
// observe the second posting, i.e. the cached entry must not be served when
// its maxTs is lower than the read timestamp.
func TestCacheStaleWhenMaxTsLessThanReadTs(t *testing.T) {
	require.NoError(t, pstore.DropAll())

	// Re-initialize MemoryLayer with cache enabled (10MB) for this test.
	// The default test setup uses cache size 0 which disables caching.
	origMemLayer := MemLayerInstance
	MemLayerInstance = initMemoryLayer(10<<20, false)
	t.Cleanup(func() {
		MemLayerInstance = origMemLayer
	})

	attr := x.AttrInRootNamespace("issue9597")
	key := x.IndexKey(attr, "test")

	// Step 1: write UID 1 at commitTs=10.
	p1 := new(pb.PostingList)
	p1.Postings = []*pb.Posting{{
		Uid:      1,
		StartTs:  5,
		CommitTs: 10,
		Op:       1,
	}}
	delta1, err := proto.Marshal(p1)
	require.NoError(t, err)

	txn1 := Oracle().RegisterStartTs(5)
	txn1.cache.deltas[string(key)] = delta1

	writer1 := NewTxnWriter(pstore)
	require.NoError(t, txn1.CommitToDisk(writer1, 10))
	require.NoError(t, writer1.Flush())

	// Read at ts=10 to populate the cache (this triggers saveInCache in ReadData).
	l1, err := GetNoStore(key, 10)
	require.NoError(t, err)
	require.Equal(t, 1, l1.mutationMap.listLen(10))

	// Wait for ristretto to process the cache set.
	MemLayerInstance.wait()

	// Verify the cache is populated and remember its maxTs for later comparison.
	cacheItem, cacheOk := MemLayerInstance.cache.get(key)
	require.True(t, cacheOk, "Cache should have entry after first read")
	require.NotNil(t, cacheItem.list)
	cacheMaxTsBefore := cacheItem.list.maxTs

	// Step 2: write UID 2 at commitTs=20 to disk, but DON'T call UpdateCachedKeys.
	// This simulates the race where disk has newer data than cache.
	p2 := new(pb.PostingList)
	p2.Postings = []*pb.Posting{{
		Uid:      2,
		StartTs:  15,
		CommitTs: 20,
		Op:       1,
	}}
	delta2, err := proto.Marshal(p2)
	require.NoError(t, err)

	txn2 := Oracle().RegisterStartTs(15)
	txn2.cache.deltas[string(key)] = delta2

	writer2 := NewTxnWriter(pstore)
	require.NoError(t, txn2.CommitToDisk(writer2, 20))
	require.NoError(t, writer2.Flush())
	// NOTE: We intentionally skip UpdateCachedKeys to keep cache stale.

	// Verify the cache entry is still present and still stale (maxTs unchanged).
	// The lookup result is checked explicitly so that a missing entry fails the
	// test with a clear message instead of panicking on a nil dereference.
	cacheItem2, cacheOk2 := MemLayerInstance.cache.get(key)
	require.True(t, cacheOk2, "Cache entry should still exist after second commit")
	require.NotNil(t, cacheItem2.list)
	require.Equal(t, cacheMaxTsBefore, cacheItem2.list.maxTs, "Cache should still be stale")

	// Step 3: read at readTs=25 (greater than cache maxTs).
	// With the fix: cache miss (maxTs < readTs), reads from disk, gets both UIDs.
	// Without fix: cache hit (minTs <= readTs), returns stale data without UID 2.
	l2, err := GetNoStore(key, 25)
	require.NoError(t, err)

	uidList, err := l2.Uids(ListOptions{ReadTs: 25})
	require.NoError(t, err)

	// UID 2 only exists on disk, so seeing it proves the stale cache was bypassed.
	require.Truef(t, slices.Contains(uidList.Uids, 2),
		"UID 2 missing - cache returned stale data (maxTs < readTs); got UIDs: %v", uidList.Uids)
}

func TestPostingListRead(t *testing.T) {
attr := x.AttrInRootNamespace("emptypl")
key := x.DataKey(attr, 1)
Expand Down
Loading