From 86696d1b288060899448e52aac97b41eee5f8849 Mon Sep 17 00:00:00 2001 From: Harshil Goel <54325286+harshil-goel@users.noreply.github.com> Date: Tue, 18 Mar 2025 20:29:34 +0530 Subject: [PATCH] fix(core): fix read scalar list with rollups (#9350) Fixes #9338 --- posting/oracle.go | 13 ++++++++--- worker/sort_test.go | 55 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 3 deletions(-) diff --git a/posting/oracle.go b/posting/oracle.go index b058288dada..6ee68f843bc 100644 --- a/posting/oracle.go +++ b/posting/oracle.go @@ -169,6 +169,8 @@ func (txn *Txn) GetScalarList(key []byte) (*List, error) { if err != nil { return nil, err } + l.Lock() + defer l.Unlock() if l.mutationMap.len() == 0 && len(l.plist.Postings) == 0 { pl, err := txn.cache.readPostingListAt(key) if err == badger.ErrKeyNotFound { @@ -177,10 +179,15 @@ func (txn *Txn) GetScalarList(key []byte) (*List, error) { if err != nil { return nil, err } - if pl.CommitTs == 0 { - l.mutationMap.setCurrentEntries(txn.StartTs, pl) + + if pl.Pack != nil { + l.plist = pl } else { - l.mutationMap.insertCommittedPostings(pl) + if pl.CommitTs == 0 { + l.mutationMap.setCurrentEntries(txn.StartTs, pl) + } else { + l.mutationMap.insertCommittedPostings(pl) + } } } return l, nil diff --git a/worker/sort_test.go b/worker/sort_test.go index b2d6eb80170..e73f87dad1d 100644 --- a/worker/sort_test.go +++ b/worker/sort_test.go @@ -101,6 +101,61 @@ func TestEmptyTypeSchema(t *testing.T) { x.ParseNamespaceAttr(types[0].TypeName) } +func TestGetScalarList(t *testing.T) { + dir, err := os.MkdirTemp("", "storetest_") + x.Check(err) + defer os.RemoveAll(dir) + + opt := badger.DefaultOptions(dir) + ps, err := badger.OpenManaged(opt) + x.Check(err) + pstore = ps + posting.Init(ps, 0, false) + Init(ps) + err = schema.ParseBytes([]byte("scalarPredicateCount4: uid ."), 1) + require.NoError(t, err) + + runM := func(startTs, commitTs uint64, edges []*pb.DirectedEdge) { + txn := posting.Oracle().RegisterStartTs(startTs) + for _, edge := range edges { + x.Check(runMutation(context.Background(), edge, txn)) + } + txn.Update() + writer := posting.NewTxnWriter(pstore) + require.NoError(t, txn.CommitToDisk(writer, commitTs)) + require.NoError(t, writer.Flush()) + txn.UpdateCachedKeys(commitTs) + } + + attr := x.GalaxyAttr("scalarPredicateCount4") + + runM(5, 7, []*pb.DirectedEdge{{ + ValueId: 3, + ValueType: pb.Posting_UID, + Attr: attr, + Entity: 1, + Op: pb.DirectedEdge_SET, + }}) + + key := x.DataKey(attr, 1) + rollup(t, key, ps, 8) + + runM(9, 11, []*pb.DirectedEdge{{ + ValueId: 5, + ValueType: pb.Posting_UID, + Attr: attr, + Entity: 1, + Op: pb.DirectedEdge_SET, + }}) + + txn := posting.Oracle().RegisterStartTs(13) + l, err := txn.Get(key) + require.Nil(t, err) + uids, err := l.Uids(posting.ListOptions{ReadTs: 13}) + require.Nil(t, err) + require.Equal(t, 1, len(uids.Uids)) +} + func TestMultipleTxnListCount(t *testing.T) { dir, err := os.MkdirTemp("", "storetest_") x.Check(err)