From 4206c8232f19402c38da6a966a886b9544d85953 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Tue, 15 Apr 2025 09:00:49 +0530 Subject: [PATCH] fix(core): fix old deletes for scalar postings --- posting/list.go | 35 +++++++++---------------- worker/sort_test.go | 62 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 73 insertions(+), 24 deletions(-) diff --git a/posting/list.go b/posting/list.go index 07bbd91c03f..2d7fea12a9a 100644 --- a/posting/list.go +++ b/posting/list.go @@ -737,6 +737,14 @@ func NewPosting(t *pb.DirectedEdge) *pb.Posting { return p } +func createDeleteAllPosting() *pb.Posting { + return &pb.Posting{ + Op: Del, + Value: []byte(x.Star), + Uid: math.MaxUint64, + } +} + func hasDeleteAll(mpost *pb.Posting) bool { return mpost.Op == Del && bytes.Equal(mpost.Value, []byte(x.Star)) && len(mpost.LangTag) == 0 } @@ -768,30 +776,11 @@ func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate, hasCountI // be done because the fingerprint for the value is not math.MaxUint64 as is // the case with the rest of the scalar predicates. newPlist := &pb.PostingList{} - newPlist.Postings = append(newPlist.Postings, mpost) - - // Add the deletions in the existing plist because those postings are not picked - // up by iterating. Not doing so would result in delete operations that are not - // applied when the transaction is committed. - l.mutationMap.currentEntries = &pb.PostingList{} - err := l.iterate(mpost.StartTs, 0, func(obj *pb.Posting) error { - // Ignore values which have the same uid as they will get replaced - // by the current value. - if obj.Uid == mpost.Uid { - return nil - } - - // Mark all other values as deleted. By the end of the iteration, the - // list of postings will contain deleted operations and only one set - // for the mutation stored in mpost. 
- objCopy := proto.Clone(obj).(*pb.Posting) - objCopy.Op = Del - newPlist.Postings = append(newPlist.Postings, objCopy) - return nil - }) - if err != nil { - return err + if mpost.Op != Del { + // If we are setting a new value then we can just delete all the older values. + newPlist.Postings = append(newPlist.Postings, createDeleteAllPosting()) } + newPlist.Postings = append(newPlist.Postings, mpost) l.mutationMap.setCurrentEntries(mpost.StartTs, newPlist) return nil } diff --git a/worker/sort_test.go b/worker/sort_test.go index 57d8f1f2173..95b6dee3408 100644 --- a/worker/sort_test.go +++ b/worker/sort_test.go @@ -394,7 +394,67 @@ func TestScalarPredicateCount(t *testing.T) { l.RUnlock() } -func TestSingleUid(t *testing.T) { +func TestSingleUidReplacement(t *testing.T) { + dir, err := os.MkdirTemp("", "storetest_") + x.Check(err) + defer os.RemoveAll(dir) + + opt := badger.DefaultOptions(dir) + ps, err := badger.OpenManaged(opt) + x.Check(err) + pstore = ps + posting.Init(ps, 0, false) + Init(ps) + err = schema.ParseBytes([]byte("singleUidReplaceTest: uid ."), 1) + require.NoError(t, err) + + ctx := context.Background() + txn := posting.Oracle().RegisterStartTs(5) + attr := x.GalaxyAttr("singleUidReplaceTest") + + // Txn 1. Set 1 -> 2 + x.Check(runMutation(ctx, &pb.DirectedEdge{ + ValueId: 2, + Attr: attr, + Entity: 1, + Op: pb.DirectedEdge_SET, + }, txn)) + + txn.Update() + writer := posting.NewTxnWriter(pstore) + require.NoError(t, txn.CommitToDisk(writer, 7)) + require.NoError(t, writer.Flush()) + txn.UpdateCachedKeys(7) + + // Txn 2. 
Set 1 -> 3 + txn = posting.Oracle().RegisterStartTs(9) + + x.Check(runMutation(ctx, &pb.DirectedEdge{ + ValueId: 3, + Attr: attr, + Entity: 1, + Op: pb.DirectedEdge_SET, + }, txn)) + + txn.Update() + writer = posting.NewTxnWriter(pstore) + require.NoError(t, txn.CommitToDisk(writer, 11)) + require.NoError(t, writer.Flush()) + txn.UpdateCachedKeys(11) + + key := x.DataKey(attr, 1) + + // Reading the data key for uid 1, we should see only 3: the earlier value 2 must have been replaced + txn = posting.Oracle().RegisterStartTs(15) + l, err := txn.Get(key) + require.NoError(t, err) + + uids, err := l.Uids(posting.ListOptions{ReadTs: 15}) + require.NoError(t, err) + require.Equal(t, uids.Uids, []uint64{3}) +} + +func TestSingleString(t *testing.T) { dir, err := os.MkdirTemp("", "storetest_") x.Check(err) defer os.RemoveAll(dir)