Commit c8f5fbc

feat(mutations): replace pipeline on/off flag with edge-count threshold
The benchmark matrix in worker/pipeline_bench_test.go showed the pipeline loses ~2x on small mutations (≤10 edges total) and wins ~1.5x on bulk (live-loader sized: 1000 edges per txn across many predicates). A binary on/off flag forces an all-or-nothing choice, penalising whichever side of that crossover the workload spends most time on.

Replace MutationsUsePipeline (bool) with MutationsPipelineThreshold (int):

  threshold = 0 -> never use the pipeline (default; legacy behavior)
  threshold = 1 -> always use the pipeline (any txn with ≥1 edge)
  threshold = N -> use the pipeline only when len(m.Edges) >= N

The threshold compares against total edges in the proposal. From the benches the crossover is around 100; the live-loader 1M dataset uses ~1000 edges per txn, so anything from 100-1000 will engage the pipeline only on bulk-shaped mutations and leave small interactive mutations on the legacy serial path.

Wiring:
- x.WorkerConfig.MutationsPipelineThreshold (int) replaces the bool field.
- feature-flags superflag: "mutations-pipeline-threshold=0".
- alpha/run.go reads it via featureFlagsConf.GetInt64.
- worker/draft.go applyMutations branches on `t > 0 && len(m.Edges) >= t`.

Verified end-to-end against the live-loader benchmark (1million.rdf.gz, official 1M schema):

  threshold=0 : 13.56s, 80,129 N-Quads/s (legacy, matches baseline)
  threshold=1 : 9.92s, 115,742 N-Quads/s (always-on, matches prior)

CLI usage:

  dgraph alpha --feature-flags="mutations-pipeline-threshold=200"

Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
1 parent 7929232 commit c8f5fbc

4 files changed

Lines changed: 15 additions & 7 deletions

dgraph/cmd/alpha/run.go

Lines changed: 1 addition & 1 deletion
@@ -796,7 +796,7 @@ func run() {
 	x.Config.NormalizeCompatibilityMode = featureFlagsConf.GetString("normalize-compatibility-mode")
 	enableDetailedMetrics := featureFlagsConf.GetBool("enable-detailed-metrics")
 	x.WorkerConfig.SlowQueryLogThreshold = featureFlagsConf.GetDuration("log-slow-query-threshold")
-	x.WorkerConfig.MutationsUsePipeline = featureFlagsConf.GetBool("mutations-use-pipeline")
+	x.WorkerConfig.MutationsPipelineThreshold = int(featureFlagsConf.GetInt64("mutations-pipeline-threshold"))
 
 	x.PrintVersion()
 	glog.Infof("x.Config: %+v", x.Config)
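
For readers unfamiliar with the flag plumbing, here is a minimal sketch of how a `key=value; key=value` string such as the feature-flags value could be turned into an integer threshold. It is not Dgraph's superflag implementation: `parseFlags` and `thresholdFrom` are hypothetical helpers, and falling back to 0 for a missing or malformed value is an assumption made only for this example.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseFlags splits a "k1=v1; k2=v2" string into a map.
// Hypothetical helper for illustration; not Dgraph's superflag parser.
func parseFlags(s string) map[string]string {
	out := make(map[string]string)
	for _, kv := range strings.Split(s, ";") {
		kv = strings.TrimSpace(kv)
		if kv == "" {
			continue
		}
		k, v, _ := strings.Cut(kv, "=")
		out[strings.TrimSpace(k)] = strings.TrimSpace(v)
	}
	return out
}

// thresholdFrom returns the integer value of "mutations-pipeline-threshold".
// Assumption: a missing or non-integer value yields 0 (pipeline disabled).
func thresholdFrom(flags map[string]string) int {
	n, err := strconv.ParseInt(flags["mutations-pipeline-threshold"], 10, 64)
	if err != nil {
		return 0
	}
	return int(n)
}

func main() {
	flags := parseFlags("log-slow-query-threshold=0; mutations-pipeline-threshold=200")
	fmt.Println(thresholdFrom(flags))
}
```

The int64-to-int conversion mirrors the `int(featureFlagsConf.GetInt64(...))` cast in the hunk above.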

worker/draft.go

Lines changed: 1 addition & 1 deletion
@@ -526,7 +526,7 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr
 	// Discard the posting lists from cache to release memory at the end.
 	defer txn.Update()
 
-	if x.WorkerConfig.MutationsUsePipeline {
+	if t := x.WorkerConfig.MutationsPipelineThreshold; t > 0 && len(m.Edges) >= t {
 		mp := posting.NewMutationPipeline(txn)
 		return mp.Process(ctx, m.Edges)
 	}
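
The branch condition can be checked in isolation. The sketch below extracts it into a standalone function; `usePipeline` is a hypothetical name, and its two parameters stand in for `x.WorkerConfig.MutationsPipelineThreshold` and `len(m.Edges)`.

```go
package main

import "fmt"

// usePipeline mirrors the condition `t > 0 && len(m.Edges) >= t` from the diff.
// Hypothetical helper: threshold stands in for the configured value and
// edgeCount for len(m.Edges).
func usePipeline(threshold, edgeCount int) bool {
	return threshold > 0 && edgeCount >= threshold
}

func main() {
	cases := []struct{ threshold, edges int }{
		{0, 1000},  // threshold 0: pipeline disabled
		{1, 1},     // threshold 1: any non-empty mutation uses the pipeline
		{200, 10},  // small mutation stays on the legacy path
		{200, 200}, // the boundary is inclusive
		{-5, 1000}, // negative values behave like 0
	}
	for _, c := range cases {
		fmt.Printf("threshold=%d edges=%d pipeline=%v\n",
			c.threshold, c.edges, usePipeline(c.threshold, c.edges))
	}
}
```

Two edge cases follow directly from the condition: a negative threshold disables the pipeline just as 0 does, and with threshold 1 a mutation with zero edges still takes the legacy path.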

worker/server_state.go

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@ const (
 		`lambda-url=;`
 	CacheDefaults = `size-mb=4096; percentage=40,40,20; remove-on-update=false`
 	FeatureFlagsDefaults = `normalize-compatibility-mode=; enable-detailed-metrics=false; ` +
-		`log-slow-query-threshold=0; mutations-use-pipeline=false`
+		`log-slow-query-threshold=0; mutations-pipeline-threshold=0`
 )
 
 // ServerState holds the state of the Dgraph server.

x/config.go

Lines changed: 12 additions & 4 deletions
@@ -138,10 +138,18 @@ type WorkerOptions struct {
 	HardSync bool
 	// Audit contains the audit flags that enables the audit.
 	Audit bool
-	// MutationsUsePipeline enables the per-predicate mutation pipeline in
-	// applyMutations. When false (default), mutations follow the legacy
-	// serial path. The flag is plumbed via the "feature-flags" superflag.
-	MutationsUsePipeline bool
+	// MutationsPipelineThreshold gates the per-predicate mutation pipeline
+	// in applyMutations. A mutation runs through the pipeline only when
+	// MutationsPipelineThreshold > 0 and len(m.Edges) >= the threshold;
+	// otherwise it falls back to the legacy serial path. Set to 0 (default)
+	// to disable the pipeline entirely. Set to 1 to always use the pipeline.
+	// The pipeline pays goroutine spin-up cost per predicate, so small
+	// mutations are slower on it; bulk multi-predicate mutations are
+	// faster — pick a value above the per-mutation edge count where the
+	// crossover happens for your workload (~100 in benchmarks here).
+	// Plumbed via the "feature-flags" superflag as
+	// "mutations-pipeline-threshold".
+	MutationsPipelineThreshold int
 }
 
 // WorkerConfig stores the global instance of the worker package's options.
