Skip to content

Commit cf586de

Browse files
author
Harshil Goel
authored
fix(core): fix read scalar list with rollups (#9350)
Fixes #9338
1 parent 1206e2a commit cf586de

2 files changed

Lines changed: 65 additions & 3 deletions

File tree

posting/oracle.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,8 @@ func (txn *Txn) GetScalarList(key []byte) (*List, error) {
160160
if err != nil {
161161
return nil, err
162162
}
163+
l.Lock()
164+
defer l.Unlock()
163165
if l.mutationMap.len() == 0 && len(l.plist.Postings) == 0 {
164166
pl, err := txn.cache.readPostingListAt(key)
165167
if err == badger.ErrKeyNotFound {
@@ -168,10 +170,15 @@ func (txn *Txn) GetScalarList(key []byte) (*List, error) {
168170
if err != nil {
169171
return nil, err
170172
}
171-
if pl.CommitTs == 0 {
172-
l.mutationMap.setCurrentEntries(txn.StartTs, pl)
173+
174+
if pl.Pack != nil {
175+
l.plist = pl
173176
} else {
174-
l.mutationMap.insertCommittedPostings(pl)
177+
if pl.CommitTs == 0 {
178+
l.mutationMap.setCurrentEntries(txn.StartTs, pl)
179+
} else {
180+
l.mutationMap.insertCommittedPostings(pl)
181+
}
175182
}
176183
}
177184
return l, nil

worker/sort_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,61 @@ func TestEmptyTypeSchema(t *testing.T) {
9090
x.ParseNamespaceAttr(types[0].TypeName)
9191
}
9292

93+
func TestGetScalarList(t *testing.T) {
94+
dir, err := os.MkdirTemp("", "storetest_")
95+
x.Check(err)
96+
defer os.RemoveAll(dir)
97+
98+
opt := badger.DefaultOptions(dir)
99+
ps, err := badger.OpenManaged(opt)
100+
x.Check(err)
101+
pstore = ps
102+
posting.Init(ps, 0, false)
103+
Init(ps)
104+
err = schema.ParseBytes([]byte("scalarPredicateCount4: uid ."), 1)
105+
require.NoError(t, err)
106+
107+
runM := func(startTs, commitTs uint64, edges []*pb.DirectedEdge) {
108+
txn := posting.Oracle().RegisterStartTs(startTs)
109+
for _, edge := range edges {
110+
x.Check(runMutation(context.Background(), edge, txn))
111+
}
112+
txn.Update()
113+
writer := posting.NewTxnWriter(pstore)
114+
require.NoError(t, txn.CommitToDisk(writer, commitTs))
115+
require.NoError(t, writer.Flush())
116+
txn.UpdateCachedKeys(commitTs)
117+
}
118+
119+
attr := x.GalaxyAttr("scalarPredicateCount4")
120+
121+
runM(5, 7, []*pb.DirectedEdge{{
122+
ValueId: 3,
123+
ValueType: pb.Posting_UID,
124+
Attr: attr,
125+
Entity: 1,
126+
Op: pb.DirectedEdge_SET,
127+
}})
128+
129+
key := x.DataKey(attr, 1)
130+
rollup(t, key, ps, 8)
131+
132+
runM(9, 11, []*pb.DirectedEdge{{
133+
ValueId: 5,
134+
ValueType: pb.Posting_UID,
135+
Attr: attr,
136+
Entity: 1,
137+
Op: pb.DirectedEdge_SET,
138+
}})
139+
140+
txn := posting.Oracle().RegisterStartTs(13)
141+
l, err := txn.Get(key)
142+
require.Nil(t, err)
143+
uids, err := l.Uids(posting.ListOptions{ReadTs: 13})
144+
require.Nil(t, err)
145+
require.Equal(t, 1, len(uids.Uids))
146+
}
147+
93148
func TestMultipleTxnListCount(t *testing.T) {
94149
dir, err := os.MkdirTemp("", "storetest_")
95150
x.Check(err)

0 commit comments

Comments
 (0)