Skip to content

Commit 43b826d

Browse files
fix(pipeline): scalar Del-of-old-value must not wipe new Set in ProcessCount
Bug: scalar @count writes were nondeterministically losing data under concurrent transactions. Roughly half the deltas committed by the new mutation pipeline contained only a [DeleteAll] posting and no Set, so reads at maxTs returned an empty value list. Root cause: in ProcessSingle, handleOldDeleteForSingle appends a synthetic Del-of-old-value to postings[uid] alongside the user's Set, so InsertTokenizerIndexes / ProcessReverse / count diffing can see the prior value. ProcessCount then iterates the postings and calls list.updateMutationLayer(post, singleUidUpdate=true, ...) on each. For non-Lang scalar predicates fingerprintEdge returns math.MaxUint64, so the synthetic Del and the user Set both have Uid == math.MaxUint64. The first iteration (Set new) leaves mutationMap.currentEntries = [DeleteAll, Set new]; the second iteration (Del old) finds the Set we just inserted via findPosting and applies updateMutationLayer in singleUidUpdate mode, which unconditionally rewrites currentEntries to [DeleteAll] (the Del branch never appends mpost) — wiping the new value. Fix: in ProcessCount, when iterating a !isListEdge predicate's postings, if the list contains a Set/Ovr posting, treat any Del as synthetic and skip it for the data-list update. Standalone user Dels (no accompanying Set) are still applied. Index/reverse/count diffing already happen before ProcessCount runs and aren't affected. Repro: TestPipelineCountIndexConcurrent in worker/sort_test.go is a new conflict-aware in-process harness that mirrors the systest TestCountIndexConcurrentSetDelScalarPredicate. It runs 200 contending transactions setting <0x1> <name> "name<rand>" against a "string @index(exact) @count" schema with a fakeOracle that implements the same hasConflict algorithm as dgraph/cmd/zero/oracle.go. Pre-fix the test fails roughly 50% of runs with an empty data list and the wrong count buckets; post-fix it is stable across 20+ -count iterations and under -race. Existing tests (TestScalarPredicateIntCount, *RevCount, *Count, TestSingleUidReplacement, TestDeleteSetWithVarEdgeCorruptsData, TestStringIndexWithLang, TestMultipleTxnListCount, TestGetScalarList, TestDatetime) all pass. Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
1 parent 2d2afb1 commit 43b826d

2 files changed

Lines changed: 200 additions & 0 deletions

File tree

posting/index.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -540,7 +540,31 @@ func (mp *MutationPipeline) ProcessCount(ctx context.Context, pipeline *Predicat
540540
list.Lock()
541541
prevCount := list.GetLength(mp.txn.StartTs)
542542

543+
// For scalar (non-list) predicates, handleOldDeleteForSingle may have
544+
// appended a synthetic Del-of-old-value alongside the user's Set, so
545+
// that InsertTokenizerIndexes / ProcessReverse / count diffing can see
546+
// the prior value. The synthetic Del must NOT be applied to the data
547+
// list: scalar value postings all share Uid == math.MaxUint64
548+
// (fingerprintEdge returns MaxUint64 for non-Lang scalars), and
549+
// updateMutationLayer in singleUidUpdate mode would overwrite the
550+
// just-inserted Set with [DeleteAll] and drop the new value entirely.
551+
// A user-initiated Del (no accompanying Set) must still be applied.
552+
skipSyntheticDel := false
553+
if !isListEdge {
554+
hasSet := false
555+
for _, post := range postingList.Postings {
556+
if post.Op == Set || post.Op == Ovr {
557+
hasSet = true
558+
break
559+
}
560+
}
561+
skipSyntheticDel = hasSet
562+
}
563+
543564
for _, post := range postingList.Postings {
565+
if skipSyntheticDel && post.Op == Del {
566+
continue
567+
}
544568
found, _, _ := list.findPosting(post.StartTs, post.Uid)
545569
if found {
546570
if post.Op == Set && isListEdge {

worker/sort_test.go

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,14 @@ import (
1111
"math"
1212
"math/rand"
1313
"os"
14+
"strconv"
1415
"sync"
16+
"sync/atomic"
1517
"testing"
1618

1719
"github.com/dgraph-io/badger/v4"
1820
bpb "github.com/dgraph-io/badger/v4/pb"
21+
"github.com/dgraph-io/dgo/v250/protos/api"
1922
"github.com/dgraph-io/dgraph/v25/posting"
2023
"github.com/dgraph-io/dgraph/v25/protos/pb"
2124
"github.com/dgraph-io/dgraph/v25/schema"
@@ -409,6 +412,179 @@ func TestCount(t *testing.T) {
409412
require.Equal(t, subjects, len(uids.Uids))
410413
}
411414

415+
// fakeOracle is an in-memory stand-in for the zero Oracle. It hands out
416+
// monotonically increasing timestamps and rejects commits whose conflict
417+
// keys overlap a higher commitTs — same algorithm as
418+
// dgraph/cmd/zero/oracle.go's hasConflict.
419+
type fakeOracle struct {
420+
mu sync.Mutex
421+
nextTs uint64
422+
keyCommit map[uint64]uint64 // conflict-key fingerprint -> commitTs
423+
committed atomic.Int64
424+
aborted atomic.Int64
425+
}
426+
427+
func newFakeOracle(initialTs uint64) *fakeOracle {
428+
return &fakeOracle{nextTs: initialTs, keyCommit: map[uint64]uint64{}}
429+
}
430+
431+
func (o *fakeOracle) reserveStartTs() uint64 {
432+
o.mu.Lock()
433+
defer o.mu.Unlock()
434+
o.nextTs++
435+
return o.nextTs
436+
}
437+
438+
// tryCommit mirrors zero/oracle.go: for each conflict key, if a later
439+
// commitTs already exists, abort. Else stamp all keys with a fresh
440+
// commitTs and return it.
441+
func (o *fakeOracle) tryCommit(startTs uint64, conflictKeys []uint64) (uint64, bool) {
442+
o.mu.Lock()
443+
defer o.mu.Unlock()
444+
for _, k := range conflictKeys {
445+
if last, ok := o.keyCommit[k]; ok && last > startTs {
446+
o.aborted.Add(1)
447+
return 0, false
448+
}
449+
}
450+
o.nextTs++
451+
commitTs := o.nextTs
452+
for _, k := range conflictKeys {
453+
o.keyCommit[k] = commitTs
454+
}
455+
o.committed.Add(1)
456+
return commitTs, true
457+
}
458+
459+
// runPipelineTxn drives a single mutation through the new pipeline with
460+
// real conflict-aware commit semantics. Returns (committed, error).
461+
func runPipelineTxn(t *testing.T, ps *badger.DB, oracle *fakeOracle,
462+
edges []*pb.DirectedEdge) bool {
463+
t.Helper()
464+
startTs := oracle.reserveStartTs()
465+
txn := posting.Oracle().RegisterStartTs(startTs)
466+
467+
if err := newRunMutations(context.Background(), edges, txn); err != nil {
468+
t.Fatalf("pipeline failed at startTs=%d: %v", startTs, err)
469+
}
470+
471+
// FillContext bridges plists -> deltas (via Update) and emits the
472+
// txn's conflict keys as base-36 strings on ctx.Keys.
473+
ctxApi := &api.TxnContext{}
474+
txn.FillContext(ctxApi, 1, false)
475+
476+
keys := make([]uint64, 0, len(ctxApi.Keys))
477+
for _, k := range ctxApi.Keys {
478+
ki, err := strconv.ParseUint(k, 36, 64)
479+
require.NoError(t, err)
480+
keys = append(keys, ki)
481+
}
482+
483+
commitTs, ok := oracle.tryCommit(startTs, keys)
484+
if !ok {
485+
return false
486+
}
487+
writer := posting.NewTxnWriter(ps)
488+
require.NoError(t, txn.CommitToDisk(writer, commitTs))
489+
require.NoError(t, writer.Flush())
490+
txn.UpdateCachedKeys(commitTs)
491+
return true
492+
}
493+
494+
// TestPipelineCountIndexConcurrent mirrors the systest's
495+
// TestCountIndexConcurrentSetDelScalarPredicate at unit-test scope: many
496+
// concurrent transactions setting <0x1> <name> "name<rand>" against a
497+
// scalar string predicate with @index(exact) @count, with real
498+
// conflict-checking commit semantics. After everything settles, the data
499+
// list for 0x1 should hold exactly one value, the count(1) bucket should
500+
// reference exactly 0x1, and no other count bucket should reference 0x1.
501+
func TestPipelineCountIndexConcurrent(t *testing.T) {
502+
dir, err := os.MkdirTemp("", "storetest_")
503+
require.NoError(t, err)
504+
defer os.RemoveAll(dir)
505+
506+
ps, err := badger.OpenManaged(badger.DefaultOptions(dir))
507+
require.NoError(t, err)
508+
defer ps.Close()
509+
posting.Init(ps, 0, false)
510+
Init(ps)
511+
posting.Oracle().ResetTxns()
512+
513+
require.NoError(t, schema.ParseBytes(
514+
[]byte(`name: string @index(exact) @count .`), 1))
515+
516+
pred := x.AttrInRootNamespace("name")
517+
const target uint64 = 1
518+
519+
oracle := newFakeOracle(10)
520+
521+
const (
522+
numRoutines = 10
523+
txnsPerRoute = 20
524+
)
525+
526+
var wg sync.WaitGroup
527+
wg.Add(numRoutines)
528+
for r := 0; r < numRoutines; r++ {
529+
go func(seed int) {
530+
defer wg.Done()
531+
rnd := rand.New(rand.NewSource(int64(seed)))
532+
for i := 0; i < txnsPerRoute; i++ {
533+
value := []byte(fmt.Sprintf("name%d", rnd.Intn(10000)))
534+
// Retry on conflict — same as a client doing dg.NewTxn().Mutate().
535+
// Each attempt uses a fresh edge: makePostingFromEdge mutates
536+
// edge.ValueId during processing, and reusing the object across
537+
// attempts would make ValidateAndConvert see it as a uid edge.
538+
// Real production gets a freshly-deserialized edge per Raft apply.
539+
for attempt := 0; attempt < 100; attempt++ {
540+
edge := &pb.DirectedEdge{
541+
Entity: target,
542+
Attr: pred,
543+
Value: value,
544+
ValueType: pb.Posting_STRING,
545+
Op: pb.DirectedEdge_SET,
546+
}
547+
if runPipelineTxn(t, ps, oracle, []*pb.DirectedEdge{edge}) {
548+
break
549+
}
550+
}
551+
}
552+
}(r)
553+
}
554+
wg.Wait()
555+
556+
t.Logf("committed=%d aborted=%d", oracle.committed.Load(), oracle.aborted.Load())
557+
558+
// Verify final state: exactly one value on 0x1, that uid in count(1) only.
559+
readTxn := posting.Oracle().RegisterStartTs(math.MaxUint64)
560+
561+
dataKey := x.DataKey(pred, target)
562+
dpl, err := readTxn.Get(dataKey)
563+
require.NoError(t, err)
564+
// Scalar string predicate: AllValues returns the live posting list values
565+
// (one entry for a non-list scalar with a current value).
566+
vals, err := dpl.AllValues(math.MaxUint64)
567+
require.NoError(t, err)
568+
require.Equal(t, 1, len(vals),
569+
"scalar predicate should retain exactly one value, got %v", vals)
570+
571+
for c := 0; c <= 5; c++ {
572+
ck := x.CountKey(pred, uint32(c), false)
573+
cpl, err := readTxn.Get(ck)
574+
require.NoError(t, err)
575+
cuids, err := cpl.Uids(posting.ListOptions{ReadTs: math.MaxUint64})
576+
require.NoError(t, err)
577+
switch c {
578+
case 1:
579+
require.Equal(t, []uint64{target}, cuids.Uids,
580+
"count(1) bucket must contain exactly the target uid")
581+
default:
582+
require.NotContains(t, cuids.Uids, target,
583+
"count(%d) bucket must not contain the target uid", c)
584+
}
585+
}
586+
}
587+
412588
func TestDeleteSetWithVarEdgeCorruptsData(t *testing.T) {
413589
// Setup temporary directory for Badger DB
414590
dir, err := os.MkdirTemp("", "storetest_")

0 commit comments

Comments
 (0)