Skip to content

Commit 7929232

Browse files
fix(pipeline): InsertTokenizerIndexes deadlocks on uids >= 2^63
When loading via dgraph live (or any mutation source whose uids span the full uint64 range, including xidmap-assigned uids), the per-predicate pipeline hung indefinitely on the very first batch with zero forward progress. A goroutine dump showed the dispatcher goroutine wedged on `chan send (nil chan)` at the line `chMap[int(uid)%numGo] <- uid`. uid is uint64. Casting directly to int produces a negative value for uid >= 2^63, so int(uid)%10 can be in [-9, -1]. chMap[-3] returns the zero value for a chan uint64, which is a nil channel; sending on a nil channel blocks forever. The 10 worker goroutines (also created here) were idle on `for uid := range uids` since no uids ever reached them, so the parent `wg.Wait()` and the surrounding errgroup never returned. applyMutations therefore never released the txn, the alpha's old-txn abort loop kept retrying every minute, and live-load showed "Txns: 0 N-Quads: 0" indefinitely. Fix: hash unsigned, then cast: `chMap[int(uid%uint64(numGo))]`. Verified end-to-end with the live loader against the 1million.rdf.gz benchmark dataset (1,041,684 n-quads, schema mixes [uid] @reverse @count, [uid] @count, datetime @index(year), string @index(...) @lang, geo @index(geo), string @index(exact) @upsert): legacy : 13.85s / 14.74s (avg ~14.3s, ~77k n-quads/s) pipeline : 9.65s / 9.36s (avg ~9.5s, ~116k n-quads/s). That is ~1.50x faster on a realistic multi-predicate, multi-index workload — i.e. the case the per-predicate runner pipeline is built for. Also adds worker/pipeline_bench_test.go: in-process Go benchmarks comparing legacy runMutation vs newRunMutations across a matrix of (predicates, edges-per-predicate, indexed/non-indexed) shapes. They show the pipeline loses ~2x on tiny mutations (1-10 edges) and wins 1.2x-1.55x on bulk (10 preds x 100+ edges, indexed or not), which is why the feature flag stays default-off and the live-loader speedup above is the right place to evaluate this work.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
1 parent 43b826d commit 7929232

2 files changed

Lines changed: 215 additions & 1 deletion

File tree

posting/index.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,11 @@ func (mp *MutationPipeline) InsertTokenizerIndexes(ctx context.Context, pipeline
233233
}
234234

235235
for uid := range *postings {
236-
chMap[int(uid)%numGo] <- uid
236+
// uid is uint64; converting directly to int can produce a negative
237+
// value for uid >= 2^63, which would index outside chMap and resolve
238+
// to a nil channel (deadlocks the dispatcher). Hash unsigned, then
239+
// cast.
240+
chMap[int(uid%uint64(numGo))] <- uid
237241
}
238242

239243
for i := 0; i < numGo; i++ {

worker/pipeline_bench_test.go

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
/*
2+
* SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package worker
7+
8+
import (
9+
"context"
10+
"fmt"
11+
"os"
12+
"testing"
13+
14+
"github.com/dgraph-io/badger/v4"
15+
"github.com/dgraph-io/dgraph/v25/posting"
16+
"github.com/dgraph-io/dgraph/v25/protos/pb"
17+
"github.com/dgraph-io/dgraph/v25/schema"
18+
"github.com/dgraph-io/dgraph/v25/x"
19+
)
20+
21+
// Benchmarks comparing the legacy serial mutation path (runMutation per edge)
22+
// with the new per-predicate mutation pipeline (newRunMutations).
23+
//
24+
// What the pipeline ought to win on:
25+
// - many predicates per transaction → one goroutine per predicate
26+
// - many indexed edges per predicate → 10-way intra-predicate
27+
// parallelism on tokenization
28+
//
29+
// What it shouldn't help (and may even regress):
30+
// - tiny mutations (1-2 edges, 1 predicate) where goroutine spin-up cost
31+
// dominates the mutation work
32+
//
33+
// Each iteration is a single transaction: build a fresh batch of edges,
// run mutations, txn.Update(), CommitToDisk. Edge construction is
// deliberately inside the timed region (b.ResetTimer fires before the
// loop, not before each batch) because that per-transaction cost is
// exactly what the pipeline is supposed to amortize.
37+
38+
func benchSetup(b *testing.B, schemaTxt string) *badger.DB {
39+
b.Helper()
40+
dir, err := os.MkdirTemp("", "pipeline_bench_")
41+
if err != nil {
42+
b.Fatal(err)
43+
}
44+
b.Cleanup(func() { _ = os.RemoveAll(dir) })
45+
46+
ps, err := badger.OpenManaged(badger.DefaultOptions(dir).WithLoggingLevel(badger.ERROR))
47+
if err != nil {
48+
b.Fatal(err)
49+
}
50+
b.Cleanup(func() { _ = ps.Close() })
51+
52+
posting.Init(ps, 0, false)
53+
Init(ps)
54+
posting.Oracle().ResetTxns()
55+
if err := schema.ParseBytes([]byte(schemaTxt), 1); err != nil {
56+
b.Fatal(err)
57+
}
58+
return ps
59+
}
60+
61+
// buildEdges constructs numPreds*edgesPerPred edges across distinct predicates,
62+
// indexed-string-valued. The same generator drives both legacy and pipeline
63+
// runs so the input is identical.
64+
func buildEdges(numPreds, edgesPerPred int, baseUid uint64) []*pb.DirectedEdge {
65+
edges := make([]*pb.DirectedEdge, 0, numPreds*edgesPerPred)
66+
for p := 0; p < numPreds; p++ {
67+
attr := x.AttrInRootNamespace(fmt.Sprintf("p%d", p))
68+
for e := 0; e < edgesPerPred; e++ {
69+
edges = append(edges, &pb.DirectedEdge{
70+
Entity: baseUid + uint64(e),
71+
Attr: attr,
72+
Value: []byte(fmt.Sprintf("v%d_%d", p, e)),
73+
ValueType: pb.Posting_STRING,
74+
Op: pb.DirectedEdge_SET,
75+
})
76+
}
77+
}
78+
return edges
79+
}
80+
81+
// schemaForPreds emits "p0: string @index(exact) ., p1: ..., ..." (or no
82+
// index, depending on indexed). Each predicate is a distinct list-or-scalar.
83+
// schemaForPreds renders one schema line per predicate, e.g.
// "p0: string @index(exact) .\n". The value type is either scalar
// string or [string] (list), optionally carrying an exact-string index.
func schemaForPreds(numPreds int, indexed bool, list bool) string {
	// Type and index directive do not depend on the predicate number,
	// so compute them once up front.
	typeName := "string"
	if list {
		typeName = "[string]"
	}
	indexDirective := ""
	if indexed {
		indexDirective = " @index(exact)"
	}
	var out []byte
	for p := 0; p < numPreds; p++ {
		out = fmt.Appendf(out, "p%d: %s%s .\n", p, typeName, indexDirective)
	}
	return string(out)
}
98+
99+
// runOne executes one transaction's mutations through the chosen path.
100+
// startTs/commitTs must be unique per call.
101+
func runOnePipeline(b *testing.B, ps *badger.DB, edges []*pb.DirectedEdge, startTs, commitTs uint64) {
102+
b.Helper()
103+
txn := posting.Oracle().RegisterStartTs(startTs)
104+
if err := newRunMutations(context.Background(), edges, txn); err != nil {
105+
b.Fatal(err)
106+
}
107+
txn.Update()
108+
w := posting.NewTxnWriter(ps)
109+
if err := txn.CommitToDisk(w, commitTs); err != nil {
110+
b.Fatal(err)
111+
}
112+
if err := w.Flush(); err != nil {
113+
b.Fatal(err)
114+
}
115+
txn.UpdateCachedKeys(commitTs)
116+
}
117+
118+
func runOneLegacy(b *testing.B, ps *badger.DB, edges []*pb.DirectedEdge, startTs, commitTs uint64) {
119+
b.Helper()
120+
txn := posting.Oracle().RegisterStartTs(startTs)
121+
for _, e := range edges {
122+
if err := runMutation(context.Background(), e, txn); err != nil {
123+
b.Fatal(err)
124+
}
125+
}
126+
txn.Update()
127+
w := posting.NewTxnWriter(ps)
128+
if err := txn.CommitToDisk(w, commitTs); err != nil {
129+
b.Fatal(err)
130+
}
131+
if err := w.Flush(); err != nil {
132+
b.Fatal(err)
133+
}
134+
txn.UpdateCachedKeys(commitTs)
135+
}
136+
137+
// runBench runs sub-benchmarks (legacy vs pipeline) for a single
138+
// (numPreds, edgesPerPred, indexed, list) configuration.
139+
func runBench(b *testing.B, numPreds, edgesPerPred int, indexed, list bool) {
140+
for _, mode := range []struct {
141+
name string
142+
fn func(*testing.B, *badger.DB, []*pb.DirectedEdge, uint64, uint64)
143+
}{
144+
{"legacy", runOneLegacy},
145+
{"pipeline", runOnePipeline},
146+
} {
147+
b.Run(mode.name, func(b *testing.B) {
148+
ps := benchSetup(b, schemaForPreds(numPreds, indexed, list))
149+
b.ReportAllocs()
150+
b.ResetTimer()
151+
ts := uint64(10)
152+
for i := 0; i < b.N; i++ {
153+
edges := buildEdges(numPreds, edgesPerPred, uint64(i)*1_000_000+1)
154+
mode.fn(b, ps, edges, ts, ts+1)
155+
ts += 2
156+
}
157+
})
158+
}
159+
}
160+
161+
// 1 predicate, 1 edge — smallest possible mutation. Pipeline overhead
162+
// is most visible here.
163+
func BenchmarkMutate_1pred_1edge_indexed(b *testing.B) {
164+
runBench(b, 1, 1, true, false)
165+
}
166+
167+
// 1 predicate, 100 indexed edges — exercises intra-predicate
168+
// tokenization parallelism.
169+
func BenchmarkMutate_1pred_100edges_indexed(b *testing.B) {
170+
runBench(b, 1, 100, true, false)
171+
}
172+
173+
// 10 predicates, 1 edge each — per-predicate parallelism with light work
174+
// per predicate.
175+
func BenchmarkMutate_10preds_1edge_indexed(b *testing.B) {
176+
runBench(b, 10, 1, true, false)
177+
}
178+
179+
// 10 predicates, 100 edges each — full benefit case: per-predicate AND
180+
// intra-predicate parallelism on indexed work.
181+
func BenchmarkMutate_10preds_100edges_indexed(b *testing.B) {
182+
runBench(b, 10, 100, true, false)
183+
}
184+
185+
// 1 predicate, 1000 indexed edges — heavy intra-predicate.
186+
func BenchmarkMutate_1pred_1000edges_indexed(b *testing.B) {
187+
runBench(b, 1, 1000, true, false)
188+
}
189+
190+
// 10 predicates, 1000 edges each — large mutation, indexed.
191+
func BenchmarkMutate_10preds_1000edges_indexed(b *testing.B) {
192+
runBench(b, 10, 1000, true, false)
193+
}
194+
195+
// Non-indexed counterparts isolate per-predicate parallelism from the
196+
// tokenization parallelism.
197+
func BenchmarkMutate_10preds_1000edges_noindex(b *testing.B) {
198+
runBench(b, 10, 1000, false, false)
199+
}
200+
201+
// Very large indexed mutation: 50 predicates × 1000 edges each = 50k edges.
202+
// Where the pipeline should shine most.
203+
func BenchmarkMutate_50preds_1000edges_indexed(b *testing.B) {
204+
runBench(b, 50, 1000, true, false)
205+
}
206+
207+
// 50 predicates, 100 edges each (5k edges) — typical-ish bulk write shape.
208+
func BenchmarkMutate_50preds_100edges_indexed(b *testing.B) {
209+
runBench(b, 50, 100, true, false)
210+
}

0 commit comments

Comments
 (0)