Skip to content

Commit df56a2f

Browse files
author
Harshil Goel
authored
fix(core): fix issues with count index for scalar predicates (#9292)
1 parent 5dbcd0f commit df56a2f

3 files changed

Lines changed: 301 additions & 17 deletions

File tree

posting/index.go

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -262,13 +262,17 @@ func (txn *Txn) addReverseMutationHelper(ctx context.Context, plist *List,
262262
return emptyCountParams, errors.Wrapf(ErrTsTooOld, "Adding reverse mutation helper count")
263263
}
264264
}
265+
265266
if !(hasCountIndex && !shouldAddCountEdge(found, edge)) {
266267
if err := plist.addMutationInternal(ctx, txn, edge); err != nil {
267268
return emptyCountParams, err
268269
}
269270
}
271+
270272
if hasCountIndex {
271-
countAfter = countAfterMutation(countBefore, found, edge.Op)
273+
pk, _ := x.Parse(plist.key)
274+
shouldCountOneUid := !schema.State().IsList(edge.Attr) && !pk.IsReverse()
275+
countAfter = countAfterMutation(countBefore, found, edge.Op, shouldCountOneUid)
272276
return countParams{
273277
attr: edge.Attr,
274278
countBefore: countBefore,
@@ -475,7 +479,21 @@ func (txn *Txn) updateCount(ctx context.Context, params countParams) error {
475479
return nil
476480
}
477481

478-
func countAfterMutation(countBefore int, found bool, op pb.DirectedEdge_Op) int {
482+
// Gives the count of the posting after the mutation has finished. Currently we use this to figure after the mutation
483+
// what is the count. For non scalar predicate, we need to use found and the operation that the user did to figure out
484+
// if the new node was inserted or not. However, for single uid predicates this information is not useful. For scalar
485+
// predicate, delete only works if the value was found. Set would just result in 1 alaways.
486+
func countAfterMutation(countBefore int, found bool, op pb.DirectedEdge_Op, shouldCountOneUid bool) int {
487+
if shouldCountOneUid {
488+
if op == pb.DirectedEdge_SET {
489+
return 1
490+
} else if op == pb.DirectedEdge_DEL && found {
491+
return 0
492+
} else {
493+
return countBefore
494+
}
495+
}
496+
479497
if !found && op != pb.DirectedEdge_DEL {
480498
return countBefore + 1
481499
} else if found && op == pb.DirectedEdge_DEL {
@@ -516,7 +534,8 @@ func (txn *Txn) addMutationHelper(ctx context.Context, l *List, doUpdateIndex bo
516534
var found bool
517535
var err error
518536

519-
delNonListPredicate := !schema.State().IsList(t.Attr) &&
537+
isScalarPredicate := !schema.State().IsList(t.Attr)
538+
delNonListPredicate := isScalarPredicate &&
520539
t.Op == pb.DirectedEdge_DEL && string(t.Value) != x.Star
521540

522541
switch {
@@ -560,7 +579,9 @@ func (txn *Txn) addMutationHelper(ctx context.Context, l *List, doUpdateIndex bo
560579
}
561580

562581
if hasCountIndex {
563-
countAfter = countAfterMutation(countBefore, found, t.Op)
582+
pk, _ := x.Parse(l.key)
583+
shouldCountOneUid := isScalarPredicate && !pk.IsReverse()
584+
countAfter = countAfterMutation(countBefore, found, t.Op, shouldCountOneUid)
564585
return val, found, countParams{
565586
attr: t.Attr,
566587
countBefore: countBefore,

posting/list.go

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ func (mm *MutableLayer) populateUidMap(pl *pb.PostingList) {
345345
}
346346

347347
// insertPosting inserts a new posting in the mutable layers. It updates the currentUids map.
348-
func (mm *MutableLayer) insertPosting(mpost *pb.Posting) {
348+
func (mm *MutableLayer) insertPosting(mpost *pb.Posting, hasCountIndex bool) {
349349
if mm.readTs != 0 {
350350
x.AssertTrue(mpost.StartTs == mm.readTs)
351351
}
@@ -359,8 +359,30 @@ func (mm *MutableLayer) insertPosting(mpost *pb.Posting) {
359359
}
360360

361361
if mpost.Uid != 0 {
362+
// If hasCountIndex, in that case while inserting uids, if there's a delete, we only delete from the
363+
// current entries, we dont' insert the delete posting. If we insert the delete posting, there won't be
364+
// any set posting in the list. This would mess up the count. We can do this for all types, however,
365+
// there might be a performance hit becasue of it.
362366
mm.populateUidMap(mm.currentEntries)
363367
if postIndex, ok := mm.currentUids[mpost.Uid]; ok {
368+
if hasCountIndex && mpost.Op == Del {
369+
// If the posting was there before, just remove it from the map, and then remove it
370+
// from the array.
371+
post := mm.currentEntries.Postings[postIndex]
372+
if post.Op == Del {
373+
// No need to do anything
374+
mm.currentEntries.Postings[postIndex] = mpost
375+
return
376+
}
377+
res := mm.currentEntries.Postings[:postIndex]
378+
if postIndex+1 <= len(mm.currentEntries.Postings) {
379+
mm.currentEntries.Postings = append(res,
380+
mm.currentEntries.Postings[(postIndex+1):]...)
381+
}
382+
mm.currentUids = nil
383+
mm.currentEntries.Postings = res
384+
return
385+
}
364386
mm.currentEntries.Postings[postIndex] = mpost
365387
} else {
366388
mm.currentEntries.Postings = append(mm.currentEntries.Postings, mpost)
@@ -724,7 +746,7 @@ func hasDeleteAll(mpost *pb.Posting) bool {
724746
}
725747

726748
// Ensure that you either abort the uncommitted postings or commit them before calling me.
727-
func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate bool) error {
749+
func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate, hasCountIndex bool) error {
728750
l.AssertLock()
729751
x.AssertTrue(mpost.Op == Set || mpost.Op == Del || mpost.Op == Ovr)
730752

@@ -755,12 +777,7 @@ func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate bool) erro
755777
// Add the deletions in the existing plist because those postings are not picked
756778
// up by iterating. Not doing so would result in delete operations that are not
757779
// applied when the transaction is committed.
758-
for _, post := range l.mutationMap.currentEntries.Postings {
759-
if post.Op == Del && post.Uid != mpost.Uid {
760-
newPlist.Postings = append(newPlist.Postings, post)
761-
}
762-
}
763-
780+
l.mutationMap.currentEntries = &pb.PostingList{}
764781
err := l.iterate(mpost.StartTs, 0, func(obj *pb.Posting) error {
765782
// Ignore values which have the same uid as they will get replaced
766783
// by the current value.
@@ -779,14 +796,11 @@ func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate bool) erro
779796
if err != nil {
780797
return err
781798
}
782-
783-
// Update the mutation map with the new plist. Return here since the code below
784-
// does not apply for predicates of type uid.
785799
l.mutationMap.setCurrentEntries(mpost.StartTs, newPlist)
786800
return nil
787801
}
788802

789-
l.mutationMap.insertPosting(mpost)
803+
l.mutationMap.insertPosting(mpost, hasCountIndex)
790804
return nil
791805
}
792806

@@ -910,7 +924,7 @@ func (l *List) addMutationInternal(ctx context.Context, txn *Txn, t *pb.Directed
910924
isSingleUidUpdate := ok && !pred.GetList() && pred.GetValueType() == pb.Posting_UID &&
911925
pk.IsData() && mpost.Op != Del && mpost.PostingType == pb.Posting_REF
912926

913-
if err != l.updateMutationLayer(mpost, isSingleUidUpdate) {
927+
if err != l.updateMutationLayer(mpost, isSingleUidUpdate, pred.GetCount() && (pk.IsData() || pk.IsReverse())) {
914928
return errors.Wrapf(err, "cannot update mutation layer of key %s with value %+v",
915929
hex.EncodeToString(l.key), mpost)
916930
}

0 commit comments

Comments
 (0)