Skip to content

Commit ab92f9a

Browse files
author
Harshil Goel
authored
fix(core): delete all before set for scalar postings (#9378)
1 parent 6973b8e commit ab92f9a

2 files changed

Lines changed: 73 additions & 24 deletions

File tree

posting/list.go

Lines changed: 12 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -737,6 +737,14 @@ func NewPosting(t *pb.DirectedEdge) *pb.Posting {
737737
return p
738738
}
739739

740+
func createDeleteAllPosting() *pb.Posting {
741+
return &pb.Posting{
742+
Op: Del,
743+
Value: []byte(x.Star),
744+
Uid: math.MaxUint64,
745+
}
746+
}
747+
740748
func hasDeleteAll(mpost *pb.Posting) bool {
741749
return mpost.Op == Del && bytes.Equal(mpost.Value, []byte(x.Star)) && len(mpost.LangTag) == 0
742750
}
@@ -768,30 +776,11 @@ func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate, hasCountI
768776
// be done because the fingerprint for the value is not math.MaxUint64 as is
769777
// the case with the rest of the scalar predicates.
770778
newPlist := &pb.PostingList{}
771-
newPlist.Postings = append(newPlist.Postings, mpost)
772-
773-
// Add the deletions in the existing plist because those postings are not picked
774-
// up by iterating. Not doing so would result in delete operations that are not
775-
// applied when the transaction is committed.
776-
l.mutationMap.currentEntries = &pb.PostingList{}
777-
err := l.iterate(mpost.StartTs, 0, func(obj *pb.Posting) error {
778-
// Ignore values which have the same uid as they will get replaced
779-
// by the current value.
780-
if obj.Uid == mpost.Uid {
781-
return nil
782-
}
783-
784-
// Mark all other values as deleted. By the end of the iteration, the
785-
// list of postings will contain deleted operations and only one set
786-
// for the mutation stored in mpost.
787-
objCopy := proto.Clone(obj).(*pb.Posting)
788-
objCopy.Op = Del
789-
newPlist.Postings = append(newPlist.Postings, objCopy)
790-
return nil
791-
})
792-
if err != nil {
793-
return err
779+
if mpost.Op != Del {
780+
// If we are setting a new value then we can just delete all the older values.
781+
newPlist.Postings = append(newPlist.Postings, createDeleteAllPosting())
794782
}
783+
newPlist.Postings = append(newPlist.Postings, mpost)
795784
l.mutationMap.setCurrentEntries(mpost.StartTs, newPlist)
796785
return nil
797786
}

worker/sort_test.go

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,67 @@ func TestScalarPredicateCount(t *testing.T) {
394394
l.RUnlock()
395395
}
396396

397-
func TestSingleUid(t *testing.T) {
397+
func TestSingleUidReplacement(t *testing.T) {
398+
dir, err := os.MkdirTemp("", "storetest_")
399+
x.Check(err)
400+
defer os.RemoveAll(dir)
401+
402+
opt := badger.DefaultOptions(dir)
403+
ps, err := badger.OpenManaged(opt)
404+
x.Check(err)
405+
pstore = ps
406+
posting.Init(ps, 0, false)
407+
Init(ps)
408+
err = schema.ParseBytes([]byte("singleUidReplaceTest: uid ."), 1)
409+
require.NoError(t, err)
410+
411+
ctx := context.Background()
412+
txn := posting.Oracle().RegisterStartTs(5)
413+
attr := x.GalaxyAttr("singleUidReplaceTest")
414+
415+
// Txn 1. Set 1 -> 2
416+
x.Check(runMutation(ctx, &pb.DirectedEdge{
417+
ValueId: 2,
418+
Attr: attr,
419+
Entity: 1,
420+
Op: pb.DirectedEdge_SET,
421+
}, txn))
422+
423+
txn.Update()
424+
writer := posting.NewTxnWriter(pstore)
425+
require.NoError(t, txn.CommitToDisk(writer, 7))
426+
require.NoError(t, writer.Flush())
427+
txn.UpdateCachedKeys(7)
428+
429+
// Txn 2. Set 1 -> 3
430+
txn = posting.Oracle().RegisterStartTs(9)
431+
432+
x.Check(runMutation(ctx, &pb.DirectedEdge{
433+
ValueId: 3,
434+
Attr: attr,
435+
Entity: 1,
436+
Op: pb.DirectedEdge_SET,
437+
}, txn))
438+
439+
txn.Update()
440+
writer = posting.NewTxnWriter(pstore)
441+
require.NoError(t, txn.CommitToDisk(writer, 11))
442+
require.NoError(t, writer.Flush())
443+
txn.UpdateCachedKeys(11)
444+
445+
key := x.DataKey(attr, 1)
446+
447+
// Reading the david index, we should see 2 inserted, 1 deleted
448+
txn = posting.Oracle().RegisterStartTs(15)
449+
l, err := txn.Get(key)
450+
require.NoError(t, err)
451+
452+
uids, err := l.Uids(posting.ListOptions{ReadTs: 15})
453+
require.NoError(t, err)
454+
require.Equal(t, uids.Uids, []uint64{3})
455+
}
456+
457+
func TestSingleString(t *testing.T) {
398458
dir, err := os.MkdirTemp("", "storetest_")
399459
x.Check(err)
400460
defer os.RemoveAll(dir)

0 commit comments

Comments
 (0)