Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 12 additions & 23 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,14 @@ func NewPosting(t *pb.DirectedEdge) *pb.Posting {
return p
}

func createDeleteAllPosting() *pb.Posting {
return &pb.Posting{
Op: Del,
Value: []byte(x.Star),
Uid: math.MaxUint64,
}
}

func hasDeleteAll(mpost *pb.Posting) bool {
return mpost.Op == Del && bytes.Equal(mpost.Value, []byte(x.Star)) && len(mpost.LangTag) == 0
}
Expand Down Expand Up @@ -768,30 +776,11 @@ func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate, hasCountI
// be done because the fingerprint for the value is not math.MaxUint64 as is
// the case with the rest of the scalar predicates.
newPlist := &pb.PostingList{}
newPlist.Postings = append(newPlist.Postings, mpost)

// Add the deletions in the existing plist because those postings are not picked
// up by iterating. Not doing so would result in delete operations that are not
// applied when the transaction is committed.
l.mutationMap.currentEntries = &pb.PostingList{}
err := l.iterate(mpost.StartTs, 0, func(obj *pb.Posting) error {
// Ignore values which have the same uid as they will get replaced
// by the current value.
if obj.Uid == mpost.Uid {
return nil
}

// Mark all other values as deleted. By the end of the iteration, the
// list of postings will contain deleted operations and only one set
// for the mutation stored in mpost.
objCopy := proto.Clone(obj).(*pb.Posting)
objCopy.Op = Del
newPlist.Postings = append(newPlist.Postings, objCopy)
return nil
})
if err != nil {
return err
if mpost.Op != Del {
// If we are setting a new value then we can just delete all the older values.
newPlist.Postings = append(newPlist.Postings, createDeleteAllPosting())
}
newPlist.Postings = append(newPlist.Postings, mpost)
l.mutationMap.setCurrentEntries(mpost.StartTs, newPlist)
return nil
}
Expand Down
62 changes: 61 additions & 1 deletion worker/sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,67 @@ func TestScalarPredicateCount(t *testing.T) {
l.RUnlock()
}

func TestSingleUid(t *testing.T) {
func TestSingleUidReplacement(t *testing.T) {
dir, err := os.MkdirTemp("", "storetest_")
x.Check(err)
defer os.RemoveAll(dir)

opt := badger.DefaultOptions(dir)
ps, err := badger.OpenManaged(opt)
x.Check(err)
pstore = ps
posting.Init(ps, 0, false)
Init(ps)
err = schema.ParseBytes([]byte("singleUidReplaceTest: uid ."), 1)
require.NoError(t, err)

ctx := context.Background()
txn := posting.Oracle().RegisterStartTs(5)
attr := x.GalaxyAttr("singleUidReplaceTest")

// Txn 1. Set 1 -> 2
x.Check(runMutation(ctx, &pb.DirectedEdge{
ValueId: 2,
Attr: attr,
Entity: 1,
Op: pb.DirectedEdge_SET,
}, txn))

txn.Update()
writer := posting.NewTxnWriter(pstore)
require.NoError(t, txn.CommitToDisk(writer, 7))
require.NoError(t, writer.Flush())
txn.UpdateCachedKeys(7)

// Txn 2. Set 1 -> 3
txn = posting.Oracle().RegisterStartTs(9)

x.Check(runMutation(ctx, &pb.DirectedEdge{
ValueId: 3,
Attr: attr,
Entity: 1,
Op: pb.DirectedEdge_SET,
}, txn))

txn.Update()
writer = posting.NewTxnWriter(pstore)
require.NoError(t, txn.CommitToDisk(writer, 11))
require.NoError(t, writer.Flush())
txn.UpdateCachedKeys(11)

key := x.DataKey(attr, 1)

// Reading the david index, we should see 2 inserted, 1 deleted
txn = posting.Oracle().RegisterStartTs(15)
l, err := txn.Get(key)
require.NoError(t, err)

uids, err := l.Uids(posting.ListOptions{ReadTs: 15})
require.NoError(t, err)
require.Equal(t, uids.Uids, []uint64{3})
}

func TestSingleString(t *testing.T) {
dir, err := os.MkdirTemp("", "storetest_")
x.Check(err)
defer os.RemoveAll(dir)
Expand Down