From 45eefc7b4a346fd50c18b93797111552e8ba13b9 Mon Sep 17 00:00:00 2001 From: mattthew Date: Thu, 5 Feb 2026 16:29:40 -0500 Subject: [PATCH 01/15] Fix endpoint following opentelemetry upgrade --- compose/compose.go | 54 ++++++++++++++++++++++++++++----------- contrib/jepsen/main.go | 2 +- worker/docker-compose.yml | 25 ++++++++---------- x/metrics.go | 8 ++++-- 4 files changed, 57 insertions(+), 32 deletions(-) diff --git a/compose/compose.go b/compose/compose.go index 796fa484d96..044e02cd30d 100644 --- a/compose/compose.go +++ b/compose/compose.go @@ -79,7 +79,7 @@ type options struct { DataVol bool TmpFS bool UserOwnership bool - Jaeger bool + Jaeger int Metrics bool PortOffset int Verbosity int @@ -202,8 +202,8 @@ func initService(basename string, idx, grpcPort int) service { svc.Command += fmt.Sprintf(" --cwd=/data/%s", svc.name) } svc.Command += " " + basename - if opts.Jaeger { - svc.Command += ` --trace "jaeger=http://jaeger:14268;"` + if opts.Jaeger > 0 { + svc.Command += ` --trace "jaeger=http://jaeger:4318;"` } return svc } @@ -375,23 +375,44 @@ func getVolume(vol string) volume { } -func getJaeger() service { - svc := service{ - Image: "jaegertracing/all-in-one:1.18", +func getJaeger(version int) service { + if version == 2 { + // Jaeger v2.x uses OpenTelemetry Collector architecture with YAML config. + // OTLP receivers bind to localhost by default, so we need JAEGER_LISTEN_HOST=0.0.0.0 + // for container environments. + return service{ + Image: "jaegertracing/jaeger:latest", + ContainerName: containerName("jaeger"), + Ports: []string{ + toPort(4318), + toPort(16686), + }, + Environment: []string{ + "JAEGER_LISTEN_HOST=0.0.0.0", + }, + } + } + // Jaeger v1.60 with badger storage + return service{ + Image: "jaegertracing/all-in-one:1.60", ContainerName: containerName("jaeger"), - WorkingDir: "/working/jaeger", + User: "0", Ports: []string{ - toPort(14268), + toPort(4318), toPort(16686), }, + Volumes: []volume{{ + Type: "volume", + Source: "jaeger-volume", + Target: "/jaeger", + }}, Environment: []string{ "SPAN_STORAGE_TYPE=badger", }, Command: "--badger.ephemeral=false" + - " --badger.directory-key /working/jaeger" + - " --badger.directory-value /working/jaeger", + " --badger.directory-key /jaeger" + + " --badger.directory-value /jaeger", } - return svc } func getMinio(minioDataDir string) service { @@ -543,8 +564,8 @@ func main() { "run as the current user rather than root") cmd.PersistentFlags().BoolVar(&opts.TmpFS, "tmpfs", false, "store w and zw directories on a tmpfs filesystem") - cmd.PersistentFlags().BoolVarP(&opts.Jaeger, "jaeger", "j", false, - "include jaeger service") + cmd.PersistentFlags().IntVarP(&opts.Jaeger, "jaeger", "j", 0, + "include jaeger service (1 for v1.60, 2 for v2.x)") cmd.PersistentFlags().BoolVarP(&opts.Metrics, "metrics", "m", false, "include metrics (prometheus, grafana) services") cmd.PersistentFlags().IntVarP(&opts.PortOffset, "port_offset", "o", 100, @@ -689,8 +710,11 @@ func main() { cfg.Volumes["data"] = stringMap{} } - if opts.Jaeger { - services["jaeger"] = getJaeger() + if opts.Jaeger > 0 { + services["jaeger"] = getJaeger(opts.Jaeger) + if opts.Jaeger == 1 { + cfg.Volumes["jaeger-volume"] = stringMap{} + } } if opts.Ratel { diff --git a/contrib/jepsen/main.go b/contrib/jepsen/main.go index 83288b227b6..b76c394792c 100644 --- a/contrib/jepsen/main.go +++ b/contrib/jepsen/main.go @@ -116,7 +116,7 @@ var ( testCount = pflag.IntP("test-count", "c", 1, "Test count per Jepsen test.") jaeger = pflag.StringP("jaeger", "j", "", "Run with Jaeger collector. Set to empty string to disable collection to Jaeger."+ - " Otherwise set to http://jaeger:14268.") + " Otherwise set to http://jaeger:4318.") jaegerSaveTraces = pflag.Bool("jaeger-save-traces", true, "Save Jaeger traces on test error.") deferDbTeardown = pflag.Bool("defer-db-teardown", false, "Wait until user input to tear down DB nodes") diff --git a/worker/docker-compose.yml b/worker/docker-compose.yml index ef4fe6587c3..079e63af40f 100644 --- a/worker/docker-compose.yml +++ b/worker/docker-compose.yml @@ -16,7 +16,7 @@ services: target: /gobin read_only: true command: - /gobin/dgraph ${COVERAGE_OUTPUT} alpha --trace "jaeger=http://jaeger:14268;" --my=alpha1:7080 + /gobin/dgraph ${COVERAGE_OUTPUT} alpha --trace "jaeger=http://jaeger:4318;" --my=alpha1:7080 --zero=zero1:5080,zero2:5080,zero3:5080 --logtostderr -v=2 --raft "idx=1; group=1; snapshot-after-entries=100; snapshot-after-duration=1m" --security "whitelist=0.0.0.0/0;" alpha2: @@ -33,7 +33,7 @@ services: target: /gobin read_only: true command: - /gobin/dgraph ${COVERAGE_OUTPUT} alpha --trace "jaeger=http://jaeger:14268;" --my=alpha2:7080 + /gobin/dgraph ${COVERAGE_OUTPUT} alpha --trace "jaeger=http://jaeger:4318;" --my=alpha2:7080 --zero=zero1:5080,zero2:5080,zero3:5080 --logtostderr -v=2 --raft "idx=2; group=1; snapshot-after-entries=100; snapshot-after-duration=1m" --security "whitelist=0.0.0.0/0;" alpha3: @@ -50,7 +50,7 @@ services: target: /gobin read_only: true command: - /gobin/dgraph ${COVERAGE_OUTPUT} alpha --trace "jaeger=http://jaeger:14268;" --my=alpha3:7080 + /gobin/dgraph ${COVERAGE_OUTPUT} alpha --trace "jaeger=http://jaeger:4318;" --my=alpha3:7080 --zero=zero1:5080,zero2:5080,zero3:5080 --logtostderr -v=2 --raft "idx=3; group=1; snapshot-after-entries=100; snapshot-after-duration=1m" --security "whitelist=0.0.0.0/0;" alpha4: @@ -67,7 +67,7 @@ services: target: /gobin read_only: true command: - /gobin/dgraph ${COVERAGE_OUTPUT} alpha --trace "jaeger=http://jaeger:14268;" --my=alpha4:7080 + /gobin/dgraph ${COVERAGE_OUTPUT} alpha --trace "jaeger=http://jaeger:4318;" --my=alpha4:7080 --zero=zero1:5080,zero2:5080,zero3:5080 --logtostderr -v=2 --raft "idx=4; group=2; snapshot-after-entries=100; snapshot-after-duration=1m" --security "whitelist=0.0.0.0/0;" alpha5: @@ -84,7 +84,7 @@ services: target: /gobin read_only: true command: - /gobin/dgraph ${COVERAGE_OUTPUT} alpha --trace "jaeger=http://jaeger:14268;" --my=alpha5:7080 + /gobin/dgraph ${COVERAGE_OUTPUT} alpha --trace "jaeger=http://jaeger:4318;" --my=alpha5:7080 --zero=zero1:5080,zero2:5080,zero3:5080 --logtostderr -v=2 --raft "idx=5; group=2; snapshot-after-entries=100; snapshot-after-duration=1m" --security "whitelist=0.0.0.0/0;" alpha6: @@ -101,20 +101,17 @@ services: target: /gobin read_only: true command: - /gobin/dgraph ${COVERAGE_OUTPUT} alpha --trace "jaeger=http://jaeger:14268;" --my=alpha6:7080 + /gobin/dgraph ${COVERAGE_OUTPUT} alpha --trace "jaeger=http://jaeger:4318;" --my=alpha6:7080 --zero=zero1:5080,zero2:5080,zero3:5080 --logtostderr -v=2 --raft "idx=6; group=2; snapshot-after-entries=100; snapshot-after-duration=1m" --security "whitelist=0.0.0.0/0;" jaeger: image: jaegertracing/all-in-one:1.60 - working_dir: /working/jaeger environment: - SPAN_STORAGE_TYPE=badger ports: - - "14268" + - "4318" - "16686" - command: - --badger.ephemeral=false --badger.directory-key /working/jaeger --badger.directory-value - /working/jaeger + command: --badger.ephemeral=true zero1: image: dgraph/dgraph:local working_dir: /data/zero1 @@ -130,7 +127,7 @@ services: read_only: true command: /gobin/dgraph ${COVERAGE_OUTPUT} zero --telemetry "reports=false;" --trace - "jaeger=http://jaeger:14268;" --raft='idx=1' --my=zero1:5080 --replicas=3 --logtostderr -v=2 + "jaeger=http://jaeger:4318;" --raft='idx=1' --my=zero1:5080 --replicas=3 --logtostderr -v=2 --bindall zero2: image: dgraph/dgraph:local @@ -149,7 +146,7 @@ services: read_only: true command: /gobin/dgraph ${COVERAGE_OUTPUT} zero --telemetry "reports=false;" --trace - "jaeger=http://jaeger:14268;" --raft='idx=2' --my=zero2:5080 --replicas=3 --logtostderr -v=2 + "jaeger=http://jaeger:4318;" --raft='idx=2' --my=zero2:5080 --replicas=3 --logtostderr -v=2 --peer=zero1:5080 zero3: image: dgraph/dgraph:local @@ -168,6 +165,6 @@ services: read_only: true command: /gobin/dgraph ${COVERAGE_OUTPUT} zero --telemetry "reports=false;" --trace - "jaeger=http://jaeger:14268;" --raft='idx=3' --my=zero3:5080 --replicas=3 --logtostderr -v=2 + "jaeger=http://jaeger:4318;" --raft='idx=3' --my=zero3:5080 --replicas=3 --logtostderr -v=2 --peer=zero1:5080 volumes: {} diff --git a/x/metrics.go b/x/metrics.go index 5f79ba21ceb..112ede61111 100644 --- a/x/metrics.go +++ b/x/metrics.go @@ -632,11 +632,13 @@ func RegisterExporters(conf *viper.Viper, service string) { // Set up Jaeger exporter if configured if collector := t.GetString("jaeger"); len(collector) > 0 { + // WithEndpoint expects host:port, not a full URL. Strip any scheme prefix. + endpoint := strings.TrimPrefix(strings.TrimPrefix(collector, "http://"), "https://") // Create Jaeger exporter using OpenTelemetry OTLP jaegerExp, err := otlptrace.New( context.Background(), otlptracehttp.NewClient( - otlptracehttp.WithEndpoint(collector), + otlptracehttp.WithEndpoint(endpoint), otlptracehttp.WithInsecure(), ), ) @@ -659,11 +661,13 @@ func RegisterExporters(conf *viper.Viper, service string) { // Set up OTLP exporter for Datadog if configured // Note: In OpenTelemetry, typically Datadog integration uses the OTLP exporter if collector := t.GetString("datadog"); len(collector) > 0 { + // WithEndpoint expects host:port, not a full URL. Strip any scheme prefix. + endpoint := strings.TrimPrefix(strings.TrimPrefix(collector, "http://"), "https://") // Create OTLP exporter for Datadog ddExporter, err := otlptrace.New( context.Background(), otlptracehttp.NewClient( - otlptracehttp.WithEndpoint(collector), + otlptracehttp.WithEndpoint(endpoint), otlptracehttp.WithInsecure(), ), ) From 0291a90aafc20df6304880ea2cb72852dd73f8a9 Mon Sep 17 00:00:00 2001 From: mattthew Date: Mon, 9 Feb 2026 20:10:11 -0500 Subject: [PATCH 02/15] Add a UTF conversion func --- x/x.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/x/x.go b/x/x.go index 4204486885e..0444900735b 100644 --- a/x/x.go +++ b/x/x.go @@ -148,6 +148,11 @@ func init() { GrootUid.Store(RootNamespace, 0) } +// SafeUTF8 sanitizes a string for OTLP export by replacing invalid UTF-8 sequences. +func SafeUTF8(s string) string { + return strings.ToValidUTF8(s, "?") +} + // ShouldCrash returns true if the error should cause the process to crash. func ShouldCrash(err error) bool { if err == nil { From 89b1f0e662ae1c3eed5bddd1bdac072f1ce78065 Mon Sep 17 00:00:00 2001 From: mattthew Date: Mon, 9 Feb 2026 20:16:42 -0500 Subject: [PATCH 03/15] Add per-service naming support for tracing Previously, all zeros and alphas shared the same service name --- x/flags.go | 4 +++- x/metrics.go | 5 +++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/x/flags.go b/x/flags.go index a6ac0063463..8a2758f2b53 100644 --- a/x/flags.go +++ b/x/flags.go @@ -12,7 +12,7 @@ import ( ) const ( - TraceDefaults = `ratio=0.01; jaeger=; datadog=;` + TraceDefaults = `ratio=0.01; jaeger=; datadog=; service=;` TelemetryDefaults = `reports=true;sentry=false;` ) @@ -34,6 +34,8 @@ func FillCommonFlags(flag *pflag.FlagSet) { Flag("datadog", "URL of Datadog to send OpenCensus traces. As of now, the trace exporter does not "+ "support annotation logs and discards them."). + Flag("service", + "Custom service name for tracing. If set, overrides the default (dgraph.alpha/dgraph.zero)."). String()) flag.String("survive", "process", diff --git a/x/metrics.go b/x/metrics.go index 112ede61111..d81621a6b89 100644 --- a/x/metrics.go +++ b/x/metrics.go @@ -609,6 +609,11 @@ func SinceMs(startTime time.Time) float64 { func RegisterExporters(conf *viper.Viper, service string) { if traceFlag := conf.GetString("trace"); len(traceFlag) > 0 { t := z.NewSuperFlag(traceFlag).MergeAndCheckDefault(TraceDefaults) + // Use custom service name if specified, otherwise use the default + if customService := t.GetString("service"); len(customService) > 0 { + service = customService + } + // Create resource with service information res := resource.NewWithAttributes( semconv.SchemaURL, From 3a8788cf3ee5d7b602427e46f3e927ea261dc72d Mon Sep 17 00:00:00 2001 From: mattthew Date: Mon, 9 Feb 2026 20:18:39 -0500 Subject: [PATCH 04/15] Add support for proper otel cross service context propagation --- dgraph/cmd/alpha/run.go | 5 +++-- dgraph/cmd/zero/run.go | 2 +- worker/sort.go | 29 ++++++++++++++++++++++++- worker/task.go | 47 ++++++++++++++++++++++++++++++++++------- worker/worker.go | 3 +-- x/metrics.go | 8 +++++++ 6 files changed, 80 insertions(+), 14 deletions(-) diff --git a/dgraph/cmd/alpha/run.go b/dgraph/cmd/alpha/run.go index 1d696c516d6..d68860a23e1 100644 --- a/dgraph/cmd/alpha/run.go +++ b/dgraph/cmd/alpha/run.go @@ -28,7 +28,7 @@ import ( "github.com/mark3labs/mcp-go/server" "github.com/pkg/errors" "github.com/spf13/cobra" - "go.opencensus.io/plugin/ocgrpc" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.opentelemetry.io/contrib/zpages" "golang.org/x/net/trace" "google.golang.org/grpc" @@ -456,7 +456,7 @@ func serveGRPC(l net.Listener, tlsCfg *tls.Config, closer *z.Closer) { grpc.MaxRecvMsgSize(x.GrpcMaxSize), grpc.MaxSendMsgSize(x.GrpcMaxSize), grpc.MaxConcurrentStreams(1000), - grpc.StatsHandler(&ocgrpc.ServerHandler{}), + grpc.StatsHandler(otelgrpc.NewServerHandler()), grpc.UnaryInterceptor(audit.AuditRequestGRPC), } if tlsCfg != nil { @@ -812,6 +812,7 @@ func run() { posting.Init(worker.State.Pstore, postingListCacheSize, removeOnUpdate) posting.SetEnabledDetailedMetrics(enableDetailedMetrics) defer posting.Cleanup() + worker.Init(worker.State.Pstore) // setup shutdown os signal handler diff --git a/dgraph/cmd/zero/run.go b/dgraph/cmd/zero/run.go index e5ddc3e36c4..5fe8a88e987 100644 --- a/dgraph/cmd/zero/run.go +++ b/dgraph/cmd/zero/run.go @@ -142,7 +142,7 @@ func (st *state) serveGRPC(l net.Listener, store *raftwal.DiskStorage) { grpc.MaxRecvMsgSize(x.GrpcMaxSize), grpc.MaxSendMsgSize(x.GrpcMaxSize), grpc.MaxConcurrentStreams(1000), - grpc.StatsHandler(otelgrpc.NewClientHandler()), + grpc.StatsHandler(otelgrpc.NewServerHandler()), grpc.UnaryInterceptor(audit.AuditRequestGRPC), } diff --git a/worker/sort.go b/worker/sort.go index deabecdac26..ef3f06b219a 100644 --- a/worker/sort.go +++ b/worker/sort.go @@ -16,7 +16,9 @@ import ( "github.com/golang/glog" "github.com/pkg/errors" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc/metadata" "github.com/dgraph-io/badger/v4" "github.com/dgraph-io/dgraph/v25/algo" @@ -66,10 +68,19 @@ func SortOverNetwork(ctx context.Context, q *pb.SortMessage) (*pb.SortResult, er return processSort(ctx, q) } + // Add span for cross-alpha network call + ctx, networkSpan := otel.Tracer("worker").Start(ctx, "SortOverNetwork.RemoteCall") + networkSpan.SetAttributes( + attribute.String("target_group", fmt.Sprintf("%d", gid)), + attribute.String("attr", x.SafeUTF8(q.Order[0].Attr)), + attribute.Bool("is_remote", true), + ) + result, err := processWithBackupRequest( ctx, gid, func(ctx context.Context, c pb.WorkerClient) (interface{}, error) { return c.Sort(ctx, q) }) + networkSpan.End() if err != nil { return &emptySortResult, err } @@ -81,7 +92,23 @@ func (w *grpcWorker) Sort(ctx context.Context, s *pb.SortMessage) (*pb.SortResul if ctx.Err() != nil { return &emptySortResult, ctx.Err() } - ctx, span := otel.Tracer("").Start(ctx, "worker.Sort") + + // Manually extract trace context from gRPC metadata for cross-alpha tracing + if md, ok := metadata.FromIncomingContext(ctx); ok { + propagator := propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + ) + carrier := propagation.HeaderCarrier{} + for k, vals := range md { + for _, v := range vals { + carrier.Set(k, v) + } + } + ctx = propagator.Extract(ctx, carrier) + } + + ctx, span := otel.Tracer("worker").Start(ctx, "worker.Sort") defer span.End() gid, err := groups().BelongsToReadOnly(s.Order[0].Attr, s.ReadTs) diff --git a/worker/task.go b/worker/task.go index e5c5dd20835..cde73a42b5a 100644 --- a/worker/task.go +++ b/worker/task.go @@ -20,8 +20,10 @@ import ( "github.com/pkg/errors" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" "golang.org/x/sync/errgroup" + "google.golang.org/grpc/metadata" "google.golang.org/protobuf/proto" "github.com/dgraph-io/badger/v4" @@ -142,10 +144,19 @@ func ProcessTaskOverNetwork(ctx context.Context, q *pb.Query) (*pb.Result, error return processTask(ctx, q, gid) } + // Add span for cross-alpha network call + ctx, networkSpan := otel.Tracer("worker").Start(ctx, "ProcessTaskOverNetwork.RemoteCall") + networkSpan.SetAttributes( + attribute.String("target_group", fmt.Sprintf("%d", gid)), + attribute.String("predicate", x.SafeUTF8(attr)), + attribute.Bool("is_remote", true), + ) + result, err := processWithBackupRequest(ctx, gid, func(ctx context.Context, c pb.WorkerClient) (interface{}, error) { return c.ServeTask(ctx, q) }) + networkSpan.End() if err != nil { return nil, err } @@ -339,7 +350,7 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er defer stop() span.AddEvent("Number of uids and args.srcFn", trace.WithAttributes( attribute.Int64("uids_count", int64(srcFn.n)), - attribute.String("srcFn", fmt.Sprintf("%+v", args.srcFn)))) + attribute.String("srcFn", x.SafeUTF8(fmt.Sprintf("%+v", args.srcFn))))) switch srcFn.fnType { case notAFunction, aggregatorFn, passwordFn, compareAttrFn, similarToFn: @@ -786,7 +797,7 @@ func (qs *queryState) handleUidPostings( defer stop() span.AddEvent("Number of uids and args.srcFn", trace.WithAttributes( attribute.Int64("uids_count", int64(srcFn.n)), - attribute.String("srcFn", fmt.Sprintf("%+v", args.srcFn)))) + attribute.String("srcFn", x.SafeUTF8(fmt.Sprintf("%+v", args.srcFn))))) if srcFn.n == 0 { return nil } @@ -1003,10 +1014,11 @@ const ( // processTask processes the query, accumulates and returns the result. func processTask(ctx context.Context, q *pb.Query, gid uint32) (*pb.Result, error) { - ctx, span := otel.Tracer("").Start(ctx, "processTask."+q.Attr) + safeAttr := x.SafeUTF8(q.Attr) + ctx, span := otel.Tracer("").Start(ctx, "processTask."+safeAttr) defer span.End() - stop := x.SpanTimer(span, "processTask"+q.Attr) + stop := x.SpanTimer(span, "processTask"+safeAttr) defer stop() span.SetAttributes( @@ -1236,7 +1248,7 @@ func (qs *queryState) handleRegexFunction(ctx context.Context, arg funcArgs) err if span != nil { span.AddEvent("Processing UIDs", trace.WithAttributes( attribute.Int64("uid_count", int64(arg.srcFn.n)), - attribute.String("srcFn", fmt.Sprintf("%+v", arg.srcFn)))) + attribute.String("srcFn", x.SafeUTF8(fmt.Sprintf("%+v", arg.srcFn))))) } attr := arg.q.Attr @@ -1353,7 +1365,7 @@ func (qs *queryState) handleCompareFunction(ctx context.Context, arg funcArgs) e defer stop() span.AddEvent("Processing UIDs", trace.WithAttributes( attribute.Int64("uid_count", int64(arg.srcFn.n)), - attribute.String("srcFn", fmt.Sprintf("%+v", arg.srcFn)))) + attribute.String("srcFn", x.SafeUTF8(fmt.Sprintf("%+v", arg.srcFn))))) attr := arg.q.Attr span.AddEvent("Function information", trace.WithAttributes( @@ -1521,7 +1533,7 @@ func (qs *queryState) handleMatchFunction(ctx context.Context, arg funcArgs) err defer stop() span.AddEvent("Processing UIDs", trace.WithAttributes( attribute.Int64("uid_count", int64(arg.srcFn.n)), - attribute.String("srcFn", fmt.Sprintf("%+v", arg.srcFn)))) + attribute.String("srcFn", x.SafeUTF8(fmt.Sprintf("%+v", arg.srcFn))))) attr := arg.q.Attr typ := arg.srcFn.atype @@ -2207,7 +2219,26 @@ func interpretVFloatOrUid(val string) ([]float32, uint64, error) { // ServeTask is used to respond to a query. func (w *grpcWorker) ServeTask(ctx context.Context, q *pb.Query) (*pb.Result, error) { - ctx, span := otel.Tracer("").Start(ctx, "worker.ServeTask") + // Manually extract trace context from gRPC metadata using the propagator + // This ensures the trace context is properly extracted for cross-alpha tracing + if md, ok := metadata.FromIncomingContext(ctx); ok { + propagator := propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + ) + carrier := propagation.HeaderCarrier{} + for k, vals := range md { + for _, v := range vals { + carrier.Set(k, v) + } + } + ctx = propagator.Extract(ctx, carrier) + } + + // Sanitize attr for span name to ensure valid UTF-8 for OTLP export + safeAttr := x.SafeUTF8(q.Attr) + ctx, span := otel.Tracer("worker").Start(ctx, "worker.ServeTask", + trace.WithAttributes(attribute.String("predicate", safeAttr))) defer span.End() if ctx.Err() != nil { diff --git a/worker/worker.go b/worker/worker.go index f50800264ad..8fd9ec3f31c 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -55,9 +55,8 @@ func Init(ps *badger.DB) { grpc.MaxRecvMsgSize(x.GrpcMaxSize), grpc.MaxSendMsgSize(x.GrpcMaxSize), grpc.MaxConcurrentStreams(math.MaxInt32), - grpc.StatsHandler(otelgrpc.NewClientHandler()), + grpc.StatsHandler(otelgrpc.NewServerHandler()), } - if x.WorkerConfig.TLSServerConfig != nil { grpcOpts = append(grpcOpts, grpc.Creds(credentials.NewTLS(x.WorkerConfig.TLSServerConfig))) } diff --git a/x/metrics.go b/x/metrics.go index d81621a6b89..d4b307a4823 100644 --- a/x/metrics.go +++ b/x/metrics.go @@ -31,6 +31,7 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/sdk/resource" traceTel "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.4.0" @@ -609,6 +610,13 @@ func SinceMs(startTime time.Time) float64 { func RegisterExporters(conf *viper.Viper, service string) { if traceFlag := conf.GetString("trace"); len(traceFlag) > 0 { t := z.NewSuperFlag(traceFlag).MergeAndCheckDefault(TraceDefaults) + + // Set up propagator for trace context propagation across services + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + )) + // Use custom service name if specified, otherwise use the default if customService := t.GetString("service"); len(customService) > 0 { service = customService From 487b936332e86ae5c5b2e86d3b96f3994fc930f4 Mon Sep 17 00:00:00 2001 From: mattthew Date: Mon, 9 Feb 2026 20:19:37 -0500 Subject: [PATCH 05/15] Add flags for handling of tracing superflag config --- compose/compose.go | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/compose/compose.go b/compose/compose.go index 044e02cd30d..a7e9ee2987a 100644 --- a/compose/compose.go +++ b/compose/compose.go @@ -80,6 +80,8 @@ type options struct { TmpFS bool UserOwnership bool Jaeger int + TraceRatio string + TraceService bool Metrics bool PortOffset int Verbosity int @@ -203,7 +205,14 @@ func initService(basename string, idx, grpcPort int) service { } svc.Command += " " + basename if opts.Jaeger > 0 { - svc.Command += ` --trace "jaeger=http://jaeger:4318;"` + traceFlag := "jaeger=http://jaeger:4318;" + if opts.TraceRatio != "" { + traceFlag += fmt.Sprintf(" ratio=%s;", opts.TraceRatio) + } + if opts.TraceService { + traceFlag += fmt.Sprintf(" service=%s;", svc.name) + } + svc.Command += fmt.Sprintf(` --trace "%s"`, traceFlag) } return svc } @@ -442,12 +451,12 @@ func getRatel() service { portFlag = fmt.Sprintf(" -port=%d", opts.RatelPort) } svc := service{ - Image: opts.Image + ":" + opts.Tag, + Image: "dgraph/ratel:latest", ContainerName: containerName("ratel"), Ports: []string{ toPort(opts.RatelPort), }, - Command: "dgraph-ratel" + portFlag, + Command: portFlag, } return svc } @@ -566,6 +575,11 @@ func main() { "store w and zw directories on a tmpfs filesystem") cmd.PersistentFlags().IntVarP(&opts.Jaeger, "jaeger", "j", 0, "include jaeger service (1 for v1.60, 2 for v2.x)") + cmd.PersistentFlags().Lookup("jaeger").NoOptDefVal = "1" + cmd.PersistentFlags().StringVar(&opts.TraceRatio, "trace_ratio", "", + "ratio of queries to trace (e.g., 0.01 for 1%)") + cmd.PersistentFlags().BoolVar(&opts.TraceService, "trace_service", false, + "use compose service name as trace service name (e.g., alpha1, zero1)") cmd.PersistentFlags().BoolVarP(&opts.Metrics, "metrics", "m", false, "include metrics (prometheus, grafana) services") cmd.PersistentFlags().IntVarP(&opts.PortOffset, "port_offset", "o", 100, From 42366d636b4034bca2098a1c9ca21622c517f4d6 Mon Sep 17 00:00:00 2001 From: mattthew Date: Mon, 9 Feb 2026 20:20:29 -0500 Subject: [PATCH 06/15] Add script for cluster visualization (text-based) --- compose/cluster-viz.sh | 253 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 253 insertions(+) create mode 100755 compose/cluster-viz.sh diff --git a/compose/cluster-viz.sh b/compose/cluster-viz.sh new file mode 100755 index 00000000000..84b5b0911e7 --- /dev/null +++ b/compose/cluster-viz.sh @@ -0,0 +1,253 @@ +#!/bin/bash +# Visualize Dgraph cluster topology from docker-compose.yml + +FILE="${1:-docker-compose.yml}" + +if [[ ! -f $FILE ]]; then + echo "Usage: $0 [docker-compose.yml]" + exit 1 +fi + +# Flatten the file (join continuation lines) for easier parsing +FLAT=$(tr '\n' ' ' <"$FILE" | sed 's/ */ /g') + +# Extract zero nodes (find --my=zeroN:port patterns near "zero") +zeros=$(echo "$FLAT" | grep -oE '\-\-my=zero[a-zA-Z0-9_-]*:[0-9]+' | sed -E 's/--my=([^:]+):.*/\1/' | sort -u) + +# Extract alpha nodes with their group assignments +TMPFILE=$(mktemp) + +# Extract all alphas by their --my flag. Group assignment is optional in flags. +# If no group= is found near the alpha's command, put it under UNASSIGNED. +alphas=$(echo "$FLAT" | grep -oE '\-\-my=alpha[a-zA-Z0-9_-]*:[0-9]+' | sed -E 's/--my=([^:]+):.*/\1/' | sort -u) +for alpha in $alphas; do + # Look for group= near this alpha occurrence in the flattened compose. + # Many compose files won't specify it; in that case group is assigned dynamically by Zero. + # Use awk (portable on macOS) instead of BSD sed regex intervals. + group=$(awk -v s="$FLAT" -v a="--my=${alpha}:" ' + BEGIN { pos = index(s, a); if (pos == 0) exit } + END { + win = substr(s, pos, 600) + if (match(win, /group=[0-9]+/)) { + g = substr(win, RSTART, RLENGTH) + split(g, parts, "=") + print parts[2] + } + } + ' "$TMPFILE" + +# Get unique groups (numeric groups first, UNASSIGNED last, never duplicated) +numeric_groups=$(grep -E '^[0-9]+ ' "$TMPFILE" | cut -d' ' -f1 | sort -n | uniq) +unique_groups="$numeric_groups" +if grep -q '^UNASSIGNED ' "$TMPFILE"; then + unique_groups="$unique_groups UNASSIGNED" +fi + +# Extract ratel if present +has_ratel=$(grep -q 'ratel' "$FILE" && echo "yes" || echo "no") + +# Get replica count +replicas=$(grep -oE '\-\-replicas=[0-9]+' "$FILE" | head -1 | cut -d= -f2) +replicas=${replicas:-1} + +# Calculate max alphas per group for width calculation +max_per_group=0 +for group in $unique_groups; do + cnt=$(grep -c "^$group " "$TMPFILE" 2>/dev/null || echo 0) + if [[ $cnt -gt $max_per_group ]]; then + max_per_group=$cnt + fi +done + +# Count zeros +zero_count=$(echo "$zeros" | wc -w | tr -d ' ') + +# Box width: 14 chars per node (┌──────────┐ + 2 spaces) + margins +# Use the larger of zeros or max alphas per group +max_nodes=$zero_count +if [[ $max_per_group -gt $max_nodes ]]; then + max_nodes=$max_per_group +fi +[[ $max_nodes -lt 3 ]] && max_nodes=3 + +inner_width=$((max_nodes * 14 + 4)) +[[ $inner_width -lt 60 ]] && inner_width=60 +outer_width=$((inner_width + 4)) + +# Total line width (display columns between outer left and right border) +W=$((outer_width)) + +# Print N space characters +sp() { + local i + for ((i = 0; i < $1; i++)); do printf " "; done +} + +# Print N copies of a character +rp() { + local i + for ((i = 0; i < $1; i++)); do printf "%s" "$2"; done +} + +# Finish outer row: pad from current column to W, then print | +fin() { + sp "$((W - $1))" + printf "|\n" +} + +# Finish inner row (inside group box): pad then print | | +# Total should be W+1 cols: content($1) + spaces + 3(| |) = W+1 +fini() { + sp "$((W - $1 - 2))" + printf "| |\n" +} + +# Separator using full width +sep() { + printf "%s" "$1" + rp "$W" "$2" + printf "%s\n" "$3" +} + +# Print a row that starts with '|' and does NOT yet include the final right border. +# Pads with spaces so the final '|' is always aligned. +outln() { + local s="$1" + local l=${#s} + # Total line length must be W + 2 (left border + interior + right border). + # If s already includes the left border '|', it must be padded to length W+1, + # then we print the final right border. + printf "%s" "$s" + local pad=$((W + 1 - l)) + if [[ $pad -lt 0 ]]; then + pad=0 + fi + sp "$pad" + printf "|\n" +} + +# Render a row inside the group box. +# The group box top uses: "| +" + GW*"-" + "+". +# For interior rows we use: "| |" + + + "|". +out_group() { + local payload="$1" + local plen=${#payload} + local pad=$((GW - plen)) + if [[ $pad -lt 0 ]]; then + pad=0 + fi + local s="| |${payload}" + # pad inside the group box up to width GW + for ((i = 0; i < pad; i++)); do + s+=" " + done + s+="|" + outln "$s" +} + +echo "" + +# === HEADER === +sep "+" "=" "+" +outln "| DGRAPH CLUSTER TOPOLOGY" +sep "+" "=" "+" +echo "" + +# === ZERO LAYER === +sep "+" "-" "+" +outln "| ZERO LAYER (Cluster Coordination)" +sep "+" "-" "+" + +# Zero boxes: each is 14 cols (box 12 + 2 spaces) +line="| " +for z in $zeros; do line+="+----------+ "; done +outln "$line" +line="| " +for z in $zeros; do line+=$(printf "| %-8s | " "$z"); done +outln "$line" +line="| " +for z in $zeros; do line+="| (zero) | "; done +outln "$line" +line="| " +for z in $zeros; do line+="+----------+ "; done +outln "$line" + +sep "+" "-" "+" +echo "" + +# === ALPHA LAYER === +sep "+" "-" "+" +outln "| ALPHA LAYER (Data Storage) - Replicas: $replicas" +sep "+" "-" "+" + +# Group box inner width +GW=$((inner_width - 2)) + +for group in $unique_groups; do + alphas=$(grep "^$group " "$TMPFILE" | cut -d' ' -f2) + count=$(echo "$alphas" | wc -l | tr -d ' ') + alpha_count=$(echo "$alphas" | wc -w | tr -d ' ') + + # Empty row + outln "|" + + # Group box top + line="| +" + for ((i = 0; i < GW; i++)); do line+="-"; done + line+="+" + outln "$line" + + # Group title row + if [[ $group == "UNASSIGNED" ]]; then + gtitle=" GROUP UNASSIGNED (Raft Replicas: $count)" + else + gtitle=" GROUP $group (Raft Replicas: $count)" + fi + out_group "$gtitle" + + # Alpha boxes + line=" " + for a in $alphas; do line+="+----------+ "; done + out_group "$line" + line=" " + for a in $alphas; do line+=$(printf "| %-8s | " "$a"); done + out_group "$line" + line=" " + for a in $alphas; do line+="| (alpha) | "; done + out_group "$line" + line=" " + for a in $alphas; do line+="+----------+ "; done + out_group "$line" + + # Group box bottom + line="| +" + for ((i = 0; i < GW; i++)); do line+="-"; done + line+="+" + outln "$line" +done + +outln "|" +sep "+" "-" "+" + +# === RATEL === +if [[ $has_ratel == "yes" ]]; then + echo "" + sep "+" "-" "+" + outln "| UI LAYER" + sep "+" "-" "+" + outln "| +----------+" + outln "| | ratel | (Web UI on :8000)" + outln "| +----------+" + sep "+" "-" "+" +fi + +rm -f "$TMPFILE" + +echo "" +echo "Legend: Alphas in same GROUP replicate data via Raft consensus" +echo "" From ce634dbd4f9500dbd460fa827f24842b92a27798 Mon Sep 17 00:00:00 2001 From: mattthew Date: Mon, 9 Feb 2026 20:20:59 -0500 Subject: [PATCH 07/15] Add integration tests for jeager --- systest/tracing/jaeger1/docker-compose.yml | 51 ++++++++ systest/tracing/jaeger1/jaeger1_test.go | 141 +++++++++++++++++++++ systest/tracing/jaeger2/docker-compose.yml | 52 ++++++++ systest/tracing/jaeger2/jaeger2_test.go | 141 +++++++++++++++++++++ 4 files changed, 385 insertions(+) create mode 100644 systest/tracing/jaeger1/docker-compose.yml create mode 100644 systest/tracing/jaeger1/jaeger1_test.go create mode 100644 systest/tracing/jaeger2/docker-compose.yml create mode 100644 systest/tracing/jaeger2/jaeger2_test.go diff --git a/systest/tracing/jaeger1/docker-compose.yml b/systest/tracing/jaeger1/docker-compose.yml new file mode 100644 index 00000000000..97eb55f0dd9 --- /dev/null +++ b/systest/tracing/jaeger1/docker-compose.yml @@ -0,0 +1,51 @@ +# Jaeger 1.x tracing integration test +# Tests OTLP HTTP export to Jaeger all-in-one:1.60 +version: "3.5" +services: + alpha1: + image: dgraph/dgraph:local + working_dir: /data/alpha1 + labels: + cluster: test + ports: + - "8080" + - "9080" + volumes: + - type: bind + source: $GOPATH/bin + target: /gobin + read_only: true + depends_on: + - zero1 + - jaeger + command: + /gobin/dgraph ${COVERAGE_OUTPUT} alpha --trace "jaeger=http://jaeger:4318; ratio=1.0; + service=alpha1;" --my=alpha1:7080 --zero=zero1:5080 --logtostderr -v=2 --security + "whitelist=0.0.0.0/0;" + jaeger: + image: jaegertracing/all-in-one:1.60 + environment: + - SPAN_STORAGE_TYPE=memory + ports: + - "4318" + - "16686" + zero1: + image: dgraph/dgraph:local + working_dir: /data/zero1 + labels: + cluster: test + ports: + - "5080" + - "6080" + volumes: + - type: bind + source: $GOPATH/bin + target: /gobin + read_only: true + depends_on: + - jaeger + command: + /gobin/dgraph ${COVERAGE_OUTPUT} zero --telemetry "reports=false;" --trace + "jaeger=http://jaeger:4318; ratio=1.0; service=zero1;" --raft="idx=1" --my=zero1:5080 + --logtostderr -v=2 --bindall +volumes: {} diff --git a/systest/tracing/jaeger1/jaeger1_test.go b/systest/tracing/jaeger1/jaeger1_test.go new file mode 100644 index 00000000000..73e523409bc --- /dev/null +++ b/systest/tracing/jaeger1/jaeger1_test.go @@ -0,0 +1,141 @@ +//go:build integration + +/* + * SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +package main + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/dgraph-io/dgo/v250/protos/api" + "github.com/dgraph-io/dgraph/v25/testutil" +) + +// JaegerServicesResponse represents the response from Jaeger's /api/services endpoint +type JaegerServicesResponse struct { + Data []string `json:"data"` + Total int `json:"total"` + Errors []any `json:"errors"` +} + +// JaegerTracesResponse represents the response from Jaeger's /api/traces endpoint +type JaegerTracesResponse struct { + Data []any `json:"data"` + Errors []any `json:"errors"` +} + +func TestMain(m *testing.M) { + os.Exit(m.Run()) +} + +// TestJaegerUIAccessible verifies that the Jaeger UI is accessible +func TestJaegerUIAccessible(t *testing.T) { + jaegerAddr := testutil.ContainerAddr("jaeger", 16686) + url := fmt.Sprintf("http://%s/", jaegerAddr) + + resp, err := http.Get(url) + require.NoError(t, err) + defer resp.Body.Close() + + require.Equal(t, http.StatusOK, resp.StatusCode, "Jaeger UI should be accessible") +} + +// TestJaegerReceivesTraces verifies that Jaeger receives traces from Dgraph +func TestJaegerReceivesTraces(t *testing.T) { + jaegerAddr := testutil.ContainerAddr("jaeger", 16686) + alphaAddr := testutil.ContainerAddr("alpha1", 9080) + + // Make a Dgraph query to generate traces + dg, err := testutil.DgraphClient(alphaAddr) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // Run a simple query to generate a trace + _, err = dg.NewTxn().Query(ctx, `{ q(func: has(name)) { uid } }`) + require.NoError(t, err) + + // Give Jaeger time to process the trace + time.Sleep(3 * time.Second) + + // Check that Jaeger has received services + servicesURL := fmt.Sprintf("http://%s/api/services", jaegerAddr) + resp, err := http.Get(servicesURL) + require.NoError(t, err) + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + var services JaegerServicesResponse + err = json.Unmarshal(body, &services) + require.NoError(t, err) + + t.Logf("Jaeger services: %v", services.Data) + require.NotEmpty(t, services.Data, "Jaeger should have registered services") + + // Verify our custom service name appears + found := false + for _, svc := range services.Data { + if svc == "alpha1" { + found = true + break + } + } + require.True(t, found, "Service 'alpha1' should be registered in Jaeger, got: %v", services.Data) +} + +// TestJaegerTracesHaveSpans verifies that traces contain actual spans +func TestJaegerTracesHaveSpans(t *testing.T) { + jaegerAddr := testutil.ContainerAddr("jaeger", 16686) + alphaAddr := testutil.ContainerAddr("alpha1", 9080) + + // Make a Dgraph query to generate traces + dg, err := testutil.DgraphClient(alphaAddr) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // Run a mutation and query to generate traces + _, err = dg.NewTxn().Mutate(ctx, &api.Mutation{ + SetNquads: []byte(`_:test "trace-test" .`), + CommitNow: true, + }) + require.NoError(t, err) + + _, err = dg.NewTxn().Query(ctx, `{ q(func: has(name)) { uid name } }`) + require.NoError(t, err) + + // Give Jaeger time to process traces + time.Sleep(3 * time.Second) + + // Query for traces from our service + tracesURL := fmt.Sprintf("http://%s/api/traces?service=alpha1&limit=10", jaegerAddr) + resp, err := http.Get(tracesURL) + require.NoError(t, err) + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + var traces JaegerTracesResponse + err = json.Unmarshal(body, &traces) + require.NoError(t, err) + + t.Logf("Found %d traces for service alpha1", len(traces.Data)) + require.NotEmpty(t, traces.Data, "Should have traces for alpha1 service") +} diff --git a/systest/tracing/jaeger2/docker-compose.yml b/systest/tracing/jaeger2/docker-compose.yml new file mode 100644 index 00000000000..bd04f3fb905 --- /dev/null +++ b/systest/tracing/jaeger2/docker-compose.yml @@ -0,0 +1,52 @@ +# Jaeger 2.x tracing integration test +# Tests OTLP HTTP export to Jaeger 2.x (jaeger:latest) +# Note: Jaeger 2.x requires JAEGER_LISTEN_HOST=0.0.0.0 to expose OTLP externally +version: "3.5" +services: + alpha1: + image: dgraph/dgraph:local + working_dir: /data/alpha1 + labels: + cluster: test + ports: + - "8080" + - "9080" + volumes: + - type: bind + source: $GOPATH/bin + target: /gobin + read_only: true + depends_on: + - zero1 + - jaeger + command: + /gobin/dgraph ${COVERAGE_OUTPUT} alpha --trace "jaeger=http://jaeger:4318; ratio=1.0; + service=alpha1;" --my=alpha1:7080 --zero=zero1:5080 --logtostderr -v=2 --security + "whitelist=0.0.0.0/0;" + jaeger: + image: jaegertracing/jaeger:latest + environment: + - JAEGER_LISTEN_HOST=0.0.0.0 + ports: + - "4318" + - "16686" + zero1: + image: dgraph/dgraph:local + working_dir: /data/zero1 + labels: + cluster: test + ports: + - "5080" + - "6080" + volumes: + - type: bind + source: $GOPATH/bin + target: /gobin + read_only: true + depends_on: + - jaeger + command: + /gobin/dgraph ${COVERAGE_OUTPUT} zero --telemetry "reports=false;" --trace + "jaeger=http://jaeger:4318; ratio=1.0; service=zero1;" --raft="idx=1" --my=zero1:5080 + --logtostderr -v=2 --bindall +volumes: {} diff --git a/systest/tracing/jaeger2/jaeger2_test.go b/systest/tracing/jaeger2/jaeger2_test.go new file mode 100644 index 00000000000..019e346db4f --- /dev/null +++ b/systest/tracing/jaeger2/jaeger2_test.go @@ -0,0 +1,141 @@ +//go:build integration + +/* + * SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +package main + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/dgraph-io/dgo/v250/protos/api" + "github.com/dgraph-io/dgraph/v25/testutil" +) + +// JaegerServicesResponse represents the response from Jaeger's /api/services endpoint +type JaegerServicesResponse struct { + Data []string `json:"data"` + Total int `json:"total"` + Errors []any `json:"errors"` +} + +// JaegerTracesResponse represents the response from Jaeger's /api/traces endpoint +type JaegerTracesResponse struct { + Data []any `json:"data"` + Errors []any `json:"errors"` +} + +func TestMain(m *testing.M) { + os.Exit(m.Run()) +} + +// TestJaeger2UIAccessible verifies that the Jaeger 2.x UI is accessible +func TestJaeger2UIAccessible(t *testing.T) { + jaegerAddr := testutil.ContainerAddr("jaeger", 16686) + url := fmt.Sprintf("http://%s/", jaegerAddr) + + resp, err := http.Get(url) + require.NoError(t, err) + defer resp.Body.Close() + + require.Equal(t, http.StatusOK, resp.StatusCode, "Jaeger 2.x UI should be accessible") +} + +// TestJaeger2ReceivesTraces verifies that Jaeger 2.x receives traces from Dgraph +func TestJaeger2ReceivesTraces(t *testing.T) { + jaegerAddr := testutil.ContainerAddr("jaeger", 16686) + alphaAddr := testutil.ContainerAddr("alpha1", 9080) + + // Make a Dgraph query to generate traces + dg, err := testutil.DgraphClient(alphaAddr) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // Run a simple query to generate a trace + _, err = dg.NewTxn().Query(ctx, `{ q(func: has(name)) { uid } }`) + require.NoError(t, err) + + // Give Jaeger time to process the trace + time.Sleep(3 * time.Second) + + // Check that Jaeger has received services + servicesURL := fmt.Sprintf("http://%s/api/services", jaegerAddr) + resp, err := http.Get(servicesURL) + require.NoError(t, err) + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + var services JaegerServicesResponse + err = json.Unmarshal(body, &services) + require.NoError(t, err) + + t.Logf("Jaeger 2.x services: %v", services.Data) + require.NotEmpty(t, services.Data, "Jaeger 2.x should have registered services") + + // Verify our custom service name appears + found := false + for _, svc := range services.Data { + if svc == "alpha1" { + found = true + break + } + } + require.True(t, found, "Service 'alpha1' should be registered in Jaeger 2.x, got: %v", services.Data) +} + +// TestJaeger2TracesHaveSpans verifies that traces contain actual spans in Jaeger 2.x +func TestJaeger2TracesHaveSpans(t *testing.T) { + jaegerAddr := testutil.ContainerAddr("jaeger", 16686) + alphaAddr := testutil.ContainerAddr("alpha1", 9080) + + // Make a Dgraph query to generate traces + dg, err := testutil.DgraphClient(alphaAddr) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // Run a mutation and query to generate traces + _, err = dg.NewTxn().Mutate(ctx, &api.Mutation{ + SetNquads: []byte(`_:test "trace-test-v2" .`), + CommitNow: true, + }) + require.NoError(t, err) + + _, err = dg.NewTxn().Query(ctx, `{ q(func: has(name)) { uid name } }`) + require.NoError(t, err) + + // Give Jaeger time to process traces + time.Sleep(3 * time.Second) + + // Query for traces from our service + tracesURL := fmt.Sprintf("http://%s/api/traces?service=alpha1&limit=10", jaegerAddr) + resp, err := http.Get(tracesURL) + require.NoError(t, err) + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + var traces JaegerTracesResponse + err = json.Unmarshal(body, &traces) + require.NoError(t, err) + + t.Logf("Found %d traces for service alpha1 in Jaeger 2.x", len(traces.Data)) + require.NotEmpty(t, traces.Data, "Should have traces for alpha1 service in Jaeger 2.x") +} From 5ff830c085559a1e71eeac53c18041c118d13bda Mon Sep 17 00:00:00 2001 From: mattthew Date: Mon, 9 Feb 2026 20:25:50 -0500 Subject: [PATCH 08/15] Satisfy linter --- compose/cluster-viz.sh | 121 ++++++++++++++++++++--------------------- 1 file changed, 60 insertions(+), 61 deletions(-) diff --git a/compose/cluster-viz.sh b/compose/cluster-viz.sh index 84b5b0911e7..72e956bcb28 100755 --- a/compose/cluster-viz.sh +++ b/compose/cluster-viz.sh @@ -3,28 +3,28 @@ FILE="${1:-docker-compose.yml}" -if [[ ! -f $FILE ]]; then +if [[ ! -f ${FILE} ]]; then echo "Usage: $0 [docker-compose.yml]" exit 1 fi # Flatten the file (join continuation lines) for easier parsing -FLAT=$(tr '\n' ' ' <"$FILE" | sed 's/ */ /g') +FLAT=$(tr '\n' ' ' <"${FILE}" | sed 's/ */ /g' || true) # Extract zero nodes (find --my=zeroN:port patterns near "zero") -zeros=$(echo "$FLAT" | grep -oE '\-\-my=zero[a-zA-Z0-9_-]*:[0-9]+' | sed -E 's/--my=([^:]+):.*/\1/' | sort -u) +zeros=$(echo "${FLAT}" | grep -oE '\-\-my=zero[a-zA-Z0-9_-]*:[0-9]+' | sed -E 's/--my=([^:]+):.*/\1/' | sort -u || true) # Extract alpha nodes with their group assignments TMPFILE=$(mktemp) # Extract all alphas by their --my flag. Group assignment is optional in flags. # If no group= is found near the alpha's command, put it under UNASSIGNED. -alphas=$(echo "$FLAT" | grep -oE '\-\-my=alpha[a-zA-Z0-9_-]*:[0-9]+' | sed -E 's/--my=([^:]+):.*/\1/' | sort -u) -for alpha in $alphas; do +alphas=$(echo "${FLAT}" | grep -oE '\-\-my=alpha[a-zA-Z0-9_-]*:[0-9]+' | sed -E 's/--my=([^:]+):.*/\1/' | sort -u || true) +for alpha in ${alphas}; do # Look for group= near this alpha occurrence in the flattened compose. # Many compose files won't specify it; in that case group is assigned dynamically by Zero. # Use awk (portable on macOS) instead of BSD sed regex intervals. - group=$(awk -v s="$FLAT" -v a="--my=${alpha}:" ' + group=$(awk -v s="${FLAT}" -v a="--my=${alpha}:" ' BEGIN { pos = index(s, a); if (pos == 0) exit } END { win = substr(s, pos, 600) @@ -34,49 +34,49 @@ for alpha in $alphas; do print parts[2] } } - ' "$TMPFILE" + echo "${group} ${alpha}" +done >"${TMPFILE}" # Get unique groups (numeric groups first, UNASSIGNED last, never duplicated) -numeric_groups=$(grep -E '^[0-9]+ ' "$TMPFILE" | cut -d' ' -f1 | sort -n | uniq) -unique_groups="$numeric_groups" -if grep -q '^UNASSIGNED ' "$TMPFILE"; then - unique_groups="$unique_groups UNASSIGNED" +numeric_groups=$(grep -E '^[0-9]+ ' "${TMPFILE}" | cut -d' ' -f1 | sort -n | uniq || true) +unique_groups="${numeric_groups}" +if grep -q '^UNASSIGNED ' "${TMPFILE}"; then + unique_groups="${unique_groups} UNASSIGNED" fi # Extract ratel if present -has_ratel=$(grep -q 'ratel' "$FILE" && echo "yes" || echo "no") +has_ratel=$(grep -q 'ratel' "${FILE}" && echo "yes" || echo "no") # Get replica count -replicas=$(grep -oE '\-\-replicas=[0-9]+' "$FILE" | head -1 | cut -d= -f2) +replicas=$(grep -oE '\-\-replicas=[0-9]+' "${FILE}" | head -1 | cut -d= -f2 || true) replicas=${replicas:-1} # Calculate max alphas per group for width calculation max_per_group=0 -for group in $unique_groups; do - cnt=$(grep -c "^$group " "$TMPFILE" 2>/dev/null || echo 0) - if [[ $cnt -gt $max_per_group ]]; then - max_per_group=$cnt +for group in ${unique_groups}; do + cnt=$(grep -c "^${group} " "${TMPFILE}" 2>/dev/null || echo 0) + if [[ ${cnt} -gt ${max_per_group} ]]; then + max_per_group=${cnt} fi done # Count zeros -zero_count=$(echo "$zeros" | wc -w | tr -d ' ') +zero_count=$(echo "${zeros}" | wc -w | tr -d ' ' || true) # Box width: 14 chars per node (┌──────────┐ + 2 spaces) + margins # Use the larger of zeros or max alphas per group -max_nodes=$zero_count -if [[ $max_per_group -gt $max_nodes ]]; then - max_nodes=$max_per_group +max_nodes=${zero_count} +if [[ ${max_per_group} -gt ${max_nodes} ]]; then + max_nodes=${max_per_group} fi -[[ $max_nodes -lt 3 ]] && max_nodes=3 +[[ ${max_nodes} -lt 3 ]] && max_nodes=3 inner_width=$((max_nodes * 14 + 4)) -[[ $inner_width -lt 60 ]] && inner_width=60 +[[ ${inner_width} -lt 60 ]] && inner_width=60 outer_width=$((inner_width + 4)) # Total line width (display columns between outer left and right border) @@ -110,7 +110,7 @@ fini() { # Separator using full width sep() { printf "%s" "$1" - rp "$W" "$2" + rp "${W}" "$2" printf "%s\n" "$3" } @@ -122,12 +122,12 @@ outln() { # Total line length must be W + 2 (left border + interior + right border). # If s already includes the left border '|', it must be padded to length W+1, # then we print the final right border. - printf "%s" "$s" + printf "%s" "${s}" local pad=$((W + 1 - l)) - if [[ $pad -lt 0 ]]; then + if [[ ${pad} -lt 0 ]]; then pad=0 fi - sp "$pad" + sp "${pad}" printf "|\n" } @@ -138,7 +138,7 @@ out_group() { local payload="$1" local plen=${#payload} local pad=$((GW - plen)) - if [[ $pad -lt 0 ]]; then + if [[ ${pad} -lt 0 ]]; then pad=0 fi local s="| |${payload}" @@ -147,7 +147,7 @@ out_group() { s+=" " done s+="|" - outln "$s" + outln "${s}" } echo "" @@ -165,33 +165,32 @@ sep "+" "-" "+" # Zero boxes: each is 14 cols (box 12 + 2 spaces) line="| " -for z in $zeros; do line+="+----------+ "; done -outln "$line" +for z in ${zeros}; do line+="+----------+ "; done +outln "${line}" line="| " -for z in $zeros; do line+=$(printf "| %-8s | " "$z"); done -outln "$line" +for z in ${zeros}; do line+=$(printf "| %-8s | " "${z}"); done +outln "${line}" line="| " -for z in $zeros; do line+="| (zero) | "; done -outln "$line" +for z in ${zeros}; do line+="| (zero) | "; done +outln "${line}" line="| " -for z in $zeros; do line+="+----------+ "; done -outln "$line" +for z in ${zeros}; do line+="+----------+ "; done +outln "${line}" sep "+" "-" "+" echo "" # === ALPHA LAYER === sep "+" "-" "+" -outln "| ALPHA LAYER (Data Storage) - Replicas: $replicas" +outln "| ALPHA LAYER (Data Storage) - Replicas: ${replicas}" sep "+" "-" "+" # Group box inner width GW=$((inner_width - 2)) -for group in $unique_groups; do - alphas=$(grep "^$group " "$TMPFILE" | cut -d' ' -f2) - count=$(echo "$alphas" | wc -l | tr -d ' ') - alpha_count=$(echo "$alphas" | wc -w | tr -d ' ') +for group in ${unique_groups}; do + alphas=$(grep "^${group} " "${TMPFILE}" | cut -d' ' -f2 || true) + count=$(echo "${alphas}" | wc -l | tr -d ' ' || true) # Empty row outln "|" @@ -200,42 +199,42 @@ for group in $unique_groups; do line="| +" for ((i = 0; i < GW; i++)); do line+="-"; done line+="+" - outln "$line" + outln "${line}" # Group title row - if [[ $group == "UNASSIGNED" ]]; then - gtitle=" GROUP UNASSIGNED (Raft Replicas: $count)" + if [[ ${group} == "UNASSIGNED" ]]; then + gtitle=" GROUP UNASSIGNED (Raft Replicas: ${count})" else - gtitle=" GROUP $group (Raft Replicas: $count)" + gtitle=" GROUP ${group} (Raft Replicas: ${count})" fi - out_group "$gtitle" + out_group "${gtitle}" # Alpha boxes line=" " - for a in $alphas; do line+="+----------+ "; done - out_group "$line" + for a in ${alphas}; do line+="+----------+ "; done + out_group "${line}" line=" " - for a in $alphas; do line+=$(printf "| %-8s | " "$a"); done - out_group "$line" + for a in ${alphas}; do line+=$(printf "| %-8s | " "${a}" || true); done + out_group "${line}" line=" " - for a in $alphas; do line+="| (alpha) | "; done - out_group "$line" + for a in ${alphas}; do line+="| (alpha) | "; done + out_group "${line}" line=" " - for a in $alphas; do line+="+----------+ "; done - out_group "$line" + for a in ${alphas}; do line+="+----------+ "; done + out_group "${line}" # Group box bottom line="| +" for ((i = 0; i < GW; i++)); do line+="-"; done line+="+" - outln "$line" + outln "${line}" done outln "|" sep "+" "-" "+" # === RATEL === -if [[ $has_ratel == "yes" ]]; then +if [[ ${has_ratel} == "yes" ]]; then echo "" sep "+" "-" "+" outln "| UI LAYER" @@ -246,7 +245,7 @@ if [[ $has_ratel == "yes" ]]; then sep "+" "-" "+" fi -rm -f "$TMPFILE" +rm -f "${TMPFILE}" echo "" echo "Legend: Alphas in same GROUP replicate data via Raft consensus" From a0c6662ba1cc9bcbdeaf0fc2f09aed25393ca199 Mon Sep 17 00:00:00 2001 From: mattthew Date: Mon, 9 Feb 2026 20:53:12 -0500 Subject: [PATCH 09/15] Add default namespaces for zero and alpha --- x/metrics.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/x/metrics.go b/x/metrics.go index d4b307a4823..5f59235a42f 100644 --- a/x/metrics.go +++ b/x/metrics.go @@ -617,6 +617,10 @@ func RegisterExporters(conf *viper.Viper, service string) { propagation.Baggage{}, )) + // Determine namespace from the default service type (dgraph.alpha or dgraph.zero) + // This allows filtering all alphas or zeros in Jaeger even with custom service names + namespace := service // e.g., "dgraph.alpha" or "dgraph.zero" + // Use custom service name if specified, otherwise use the default if customService := t.GetString("service"); len(customService) > 0 { service = customService @@ -626,6 +630,7 @@ func RegisterExporters(conf *viper.Viper, service string) { res := resource.NewWithAttributes( semconv.SchemaURL, semconv.ServiceNameKey.String(service), + semconv.ServiceNamespaceKey.String(namespace), ) // Configure the batch span processor options From a1629bb7889803f4043c22c7a5986b473fc7159b Mon Sep 17 00:00:00 2001 From: mattthew Date: Thu, 12 Feb 2026 12:19:31 -0500 Subject: [PATCH 10/15] Add trace ids to existing query log reports (if avail) --- dgraph/cmd/alpha/run.go | 5 +++ edgraph/server.go | 25 +++++++++--- graphql/resolve/resolver.go | 17 +++++++- worker/server_state.go | 4 +- x/config.go | 7 +++- x/logger.go | 80 +++++++++++++++++++++++++++++++++++++ 6 files changed, 127 insertions(+), 11 deletions(-) diff --git a/dgraph/cmd/alpha/run.go b/dgraph/cmd/alpha/run.go index d68860a23e1..dcd9b32d542 100644 --- a/dgraph/cmd/alpha/run.go +++ b/dgraph/cmd/alpha/run.go @@ -270,6 +270,10 @@ they form a Raft group and provide synchronous replication. " 'v20': returns values with repeated key for fields with same alias (same as v20.11)."+ " For more details, see https://github.com/dgraph-io/dgraph/pull/7639"). Flag("enable-detailed-metrics", "Enable metrics about disk reads and cache per predicate"). + Flag("log-slow-query-threshold", "Queries that take longer than this threshold will be logged "+ + "with structured fields including trace ID for correlation with distributed traces. "+ + "Disabled by default (0). Note: enabling this logs query text which may contain "+ + "sensitive data; do not enable in deployments with strict data privacy requirements."). String()) } @@ -790,6 +794,7 @@ func run() { worker.FeatureFlagsDefaults) x.Config.NormalizeCompatibilityMode = featureFlagsConf.GetString("normalize-compatibility-mode") enableDetailedMetrics := featureFlagsConf.GetBool("enable-detailed-metrics") + x.WorkerConfig.SlowQueryLogThreshold = featureFlagsConf.GetDuration("log-slow-query-threshold") x.PrintVersion() glog.Infof("x.Config: %+v", x.Config) diff --git a/edgraph/server.go b/edgraph/server.go index 64c3a3de762..a76437cebd4 100644 --- a/edgraph/server.go +++ b/edgraph/server.go @@ -1247,11 +1247,6 @@ func (s *Server) doQuery(ctx context.Context, req *Request) (resp *api.Response, l := &query.Latency{} l.Start = time.Now() - if bool(glog.V(3)) || worker.LogDQLRequestEnabled() { - glog.Infof("Got a query, DQL form: %+v %+v at %+v", - req.req.Query, req.req.Mutations, l.Start.Format(time.RFC3339)) - } - isMutation := len(req.req.Mutations) > 0 methodRequest := methodQuery if isMutation { @@ -1264,6 +1259,15 @@ func (s *Server) doQuery(ctx context.Context, req *Request) (resp *api.Response, annotateNamespace(span, ns) } + if bool(glog.V(3)) || worker.LogDQLRequestEnabled() { + traceID := "" + if span.SpanContext().IsValid() { + traceID = fmt.Sprintf(" [trace_id=%s]", span.SpanContext().TraceID().String()) + } + glog.Infof("Got a query, DQL form: %+v %+v at %+v%s", + req.req.Query, req.req.Mutations, l.Start.Format(time.RFC3339), traceID) + } + ctx = x.WithMethod(ctx, methodRequest) defer func() { span.End() @@ -1275,6 +1279,16 @@ func (s *Server) doQuery(ctx context.Context, req *Request) (resp *api.Response, timeSpentMs := x.SinceMs(l.Start) measurements = append(measurements, x.LatencyMs.M(timeSpentMs)) ostats.Record(ctx, measurements...) + + // Log slow queries with structured fields for observability + if x.WorkerConfig.SlowQueryLogThreshold > 0 { + x.LogSlowOperation(ctx, "query", "dql", req.req.Query, &x.SlowOperationLatency{ + Start: l.Start, + Parsing: l.Parsing, + Processing: l.Processing, + Encoding: l.Json, + }) + } }() if rerr = x.HealthCheck(); rerr != nil { @@ -1381,6 +1395,7 @@ func (s *Server) doQuery(ctx context.Context, req *Request) (resp *api.Response, EncodingNs: uint64(l.Json.Nanoseconds()), TotalNs: uint64((time.Since(l.Start)).Nanoseconds()), } + return resp, gqlErrs } diff --git a/graphql/resolve/resolver.go b/graphql/resolve/resolver.go index 8352270004d..5686e3fb77d 100644 --- a/graphql/resolve/resolver.go +++ b/graphql/resolve/resolver.go @@ -8,6 +8,7 @@ package resolve import ( "context" "encoding/json" + "fmt" "net/http" "sort" "strings" @@ -482,6 +483,14 @@ func (r *RequestResolver) Resolve(ctx context.Context, gqlReq *schema.Request) ( endTime := time.Now() resp.Extensions.Tracing.EndTime = endTime.Format(time.RFC3339Nano) resp.Extensions.Tracing.Duration = endTime.Sub(startTime).Nanoseconds() + + // Log slow queries with structured fields for observability + if x.WorkerConfig.SlowQueryLogThreshold > 0 { + x.LogSlowOperation(ctx, "query", "graphql", gqlReq.Query, &x.SlowOperationLatency{ + Start: startTime, + Processing: endTime.Sub(startTime), + }) + } }() ctx = context.WithValue(ctx, resolveStartTime, startTime) @@ -508,8 +517,12 @@ func (r *RequestResolver) Resolve(ctx context.Context, gqlReq *schema.Request) ( if err != nil { glog.Infof("Failed to marshal variables for logging : %s", err) } - glog.Infof("Resolving GQL request: \n%s\nWith Variables: \n%s\n", - gqlReq.Query, string(b)) + traceID := "" + if span.SpanContext().IsValid() { + traceID = fmt.Sprintf(" [trace_id=%s]", span.SpanContext().TraceID().String()) + } + glog.Infof("Resolving GQL request: \n%s\nWith Variables: \n%s%s", + gqlReq.Query, string(b), traceID) } } diff --git a/worker/server_state.go b/worker/server_state.go index 54c4ac2b4e8..0591ccb4b5b 100644 --- a/worker/server_state.go +++ b/worker/server_state.go @@ -37,12 +37,12 @@ const ( `client_key=; sasl-mechanism=PLAIN; tls=false;` LimitDefaults = `mutations=allow; query-edge=1000000; normalize-node=10000; ` + `mutations-nquad=1000000; disallow-drop=false; query-timeout=0ms; txn-abort-after=5m; ` + - ` max-retries=10;max-pending-queries=10000;shared-instance=false;type-filter-uid-limit=10` + `max-retries=10; max-pending-queries=10000; shared-instance=false; type-filter-uid-limit=10` ZeroLimitsDefaults = `uid-lease=0; refill-interval=30s; disable-admin-http=false;` GraphQLDefaults = `introspection=true; debug=false; extensions=true; poll-interval=1s; ` + `lambda-url=;` CacheDefaults = `size-mb=4096; percentage=40,40,20; remove-on-update=false` - FeatureFlagsDefaults = `normalize-compatibility-mode=; enable-detailed-metrics=false` + FeatureFlagsDefaults = `normalize-compatibility-mode=; enable-detailed-metrics=false; log-slow-query-threshold=0` ) // ServerState holds the state of the Dgraph server. diff --git a/x/config.go b/x/config.go index ca0ac9a3326..37081a3df5a 100644 --- a/x/config.go +++ b/x/config.go @@ -131,6 +131,9 @@ type WorkerOptions struct { // queries hence it has been kept as int32. LogDQLRequest value 1 enables logging of requests // coming to alphas and 0 disables it. LogDQLRequest int32 + // SlowQueryLogThreshold is the duration after which a query is considered slow and logged + // with structured fields including trace ID. Zero disables slow query logging. + SlowQueryLogThreshold time.Duration // If true, we should call msync or fsync after every write to survive hard reboots. HardSync bool // Audit contains the audit flags that enables the audit. @@ -155,9 +158,9 @@ func (w WorkerOptions) String() string { return fmt.Sprintf("{TmpDir:%s ExportPath:%s MyAddr:%s ZeroAddr:%v Raft:%v "+ "WhiteListedIPRanges:%v StrictMutations:%v AclEnabled:%v AclJwtAlg:%v "+ "AclPublicKey:**** AbortOlderThan:%v ProposedGroupId:%d StartTime:%v "+ - "Security:**** EncryptionKey:**** LogDQLRequest:%d HardSync:%v Audit:%v}", + "Security:**** EncryptionKey:**** LogDQLRequest:%d SlowQueryThreshold:%v HardSync:%v Audit:%v}", w.TmpDir, w.ExportPath, w.MyAddr, w.ZeroAddr, w.Raft, w.WhiteListedIPRanges, w.StrictMutations, w.AclEnabled, w.AclJwtAlg, w.AbortOlderThan, w.ProposedGroupId, w.StartTime, - w.LogDQLRequest, w.HardSync, w.Audit) + w.LogDQLRequest, w.SlowQueryLogThreshold, w.HardSync, w.Audit) } diff --git a/x/logger.go b/x/logger.go index ae4ecd88ca9..73c8f71a8fe 100644 --- a/x/logger.go +++ b/x/logger.go @@ -6,9 +6,12 @@ package x import ( + "context" "os" "path/filepath" + "time" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) @@ -100,3 +103,80 @@ func (l *Logger) Sync() { _ = l.logger.Sync() _ = l.writer.Close() } + +var slowOperationLogger *zap.Logger + +func init() { + initSlowOperationLogger() +} + +func initSlowOperationLogger() { + config := zap.NewProductionEncoderConfig() + config.EncodeTime = zapcore.ISO8601TimeEncoder + config.TimeKey = "timestamp" + config.MessageKey = "message" + + core := zapcore.NewCore( + zapcore.NewJSONEncoder(config), + zapcore.AddSync(os.Stderr), + zapcore.WarnLevel, + ) + slowOperationLogger = zap.New(core) +} + +// SlowOperationLatency holds timing information for slow operation logging. +type SlowOperationLatency struct { + Start time.Time + Parsing time.Duration + Processing time.Duration + Encoding time.Duration +} + +// Total returns the total duration since Start. +func (l *SlowOperationLatency) Total() time.Duration { + return time.Since(l.Start) +} + +// LogSlowOperation logs a slow operation with structured fields including trace ID. +// It only logs if the operation duration exceeds the configured threshold. +// Parameters: +// - operation: the type of operation (e.g., "query", "mutation", "backup") +// - opType: specific type within the operation (e.g., "dql", "graphql") +// - payload: the operation payload (e.g., query text) - will be truncated if too long +func LogSlowOperation(ctx context.Context, operation, opType, payload string, latency *SlowOperationLatency) { + threshold := WorkerConfig.SlowQueryLogThreshold + if threshold <= 0 { + return + } + + total := latency.Total() + if total < threshold { + return + } + + fields := []zap.Field{ + zap.String("operation", operation), + zap.String("type", opType), + zap.Duration("total", total), + zap.Duration("parsing", latency.Parsing), + zap.Duration("processing", latency.Processing), + zap.Duration("encoding", latency.Encoding), + } + + // Extract trace ID from context if available + span := trace.SpanFromContext(ctx) + if span.SpanContext().IsValid() { + fields = append(fields, + zap.String("trace_id", span.SpanContext().TraceID().String()), + zap.String("span_id", span.SpanContext().SpanID().String()), + ) + } + + // Truncate payload for logging (avoid huge log entries) + if len(payload) > 1000 { + payload = payload[:1000] + "...[truncated]" + } + fields = append(fields, zap.String("payload", payload)) + + slowOperationLogger.Warn("slow operation", fields...) +} From e5984ec970871df9dee41e6c1cd5052d35d46f29 Mon Sep 17 00:00:00 2001 From: mattthew Date: Wed, 18 Feb 2026 17:13:38 -0500 Subject: [PATCH 11/15] Update superflag option description --- x/flags.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x/flags.go b/x/flags.go index 8a2758f2b53..0570db25665 100644 --- a/x/flags.go +++ b/x/flags.go @@ -30,7 +30,7 @@ func FillCommonFlags(flag *pflag.FlagSet) { Flag("ratio", "The ratio of queries to trace."). Flag("jaeger", - "URL of Jaeger to send OpenCensus traces."). + "URL of Jaeger/opentelemetry-collector to send OpenTelemetry traces."). Flag("datadog", "URL of Datadog to send OpenCensus traces. As of now, the trace exporter does not "+ "support annotation logs and discards them."). From ae3e271521860767d0500822599ea266b579677d Mon Sep 17 00:00:00 2001 From: mattthew Date: Wed, 18 Feb 2026 17:14:01 -0500 Subject: [PATCH 12/15] Add a utility trace extractor --- x/x.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/x/x.go b/x/x.go index 0444900735b..3500753c7c5 100644 --- a/x/x.go +++ b/x/x.go @@ -34,7 +34,9 @@ import ( "github.com/pkg/errors" "github.com/spf13/viper" "go.opencensus.io/plugin/ocgrpc" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" "golang.org/x/term" "google.golang.org/grpc" @@ -308,6 +310,19 @@ func ExtractJwt(ctx context.Context) (string, error) { return accessJwt[0], nil } +func ExtractTraceContext(ctx context.Context) context.Context { + if md, ok := metadata.FromIncomingContext(ctx); ok { + carrier := propagation.HeaderCarrier{} + for k, vals := range md { + for _, v := range vals { + carrier.Set(k, v) + } + } + return otel.GetTextMapPropagator().Extract(ctx, carrier) + } + return ctx +} + // WithLocations adds a list of locations to a GqlError and returns the same // GqlError (fluent style). func (gqlErr *GqlError) WithLocations(locs ...Location) *GqlError { From b1951ddd038449a4a1aecbbe2a3d6ec455fcb834 Mon Sep 17 00:00:00 2001 From: mattthew Date: Wed, 18 Feb 2026 17:15:27 -0500 Subject: [PATCH 13/15] Normalize tracer naming --- worker/sort.go | 20 +++----------------- worker/task.go | 20 +++----------------- 2 files changed, 6 insertions(+), 34 deletions(-) diff --git a/worker/sort.go b/worker/sort.go index ef3f06b219a..fca4b088e4b 100644 --- a/worker/sort.go +++ b/worker/sort.go @@ -16,9 +16,7 @@ import ( "github.com/golang/glog" "github.com/pkg/errors" "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" - "google.golang.org/grpc/metadata" "github.com/dgraph-io/badger/v4" "github.com/dgraph-io/dgraph/v25/algo" @@ -69,7 +67,7 @@ func SortOverNetwork(ctx context.Context, q *pb.SortMessage) (*pb.SortResult, er } // Add span for cross-alpha network call - ctx, networkSpan := otel.Tracer("worker").Start(ctx, "SortOverNetwork.RemoteCall") + ctx, networkSpan := otel.Tracer("").Start(ctx, "SortOverNetwork.RemoteCall") networkSpan.SetAttributes( attribute.String("target_group", fmt.Sprintf("%d", gid)), attribute.String("attr", x.SafeUTF8(q.Order[0].Attr)), @@ -94,21 +92,9 @@ func (w *grpcWorker) Sort(ctx context.Context, s *pb.SortMessage) (*pb.SortResul } // Manually extract trace context from gRPC metadata for cross-alpha tracing - if md, ok := metadata.FromIncomingContext(ctx); ok { - propagator := propagation.NewCompositeTextMapPropagator( - propagation.TraceContext{}, - propagation.Baggage{}, - ) - carrier := propagation.HeaderCarrier{} - for k, vals := range md { - for _, v := range vals { - carrier.Set(k, v) - } - } - ctx = propagator.Extract(ctx, carrier) - } + ctx = x.ExtractTraceContext(ctx) - ctx, span := otel.Tracer("worker").Start(ctx, "worker.Sort") + ctx, span := otel.Tracer("").Start(ctx, "worker.Sort") defer span.End() gid, err := groups().BelongsToReadOnly(s.Order[0].Attr, s.ReadTs) diff --git a/worker/task.go b/worker/task.go index cde73a42b5a..76649bcead4 100644 --- a/worker/task.go +++ b/worker/task.go @@ -20,10 +20,8 @@ import ( "github.com/pkg/errors" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" "golang.org/x/sync/errgroup" - "google.golang.org/grpc/metadata" "google.golang.org/protobuf/proto" "github.com/dgraph-io/badger/v4" @@ -145,7 +143,7 @@ func ProcessTaskOverNetwork(ctx context.Context, q *pb.Query) (*pb.Result, error } // Add span for cross-alpha network call - ctx, networkSpan := otel.Tracer("worker").Start(ctx, "ProcessTaskOverNetwork.RemoteCall") + ctx, networkSpan := otel.Tracer("").Start(ctx, "ProcessTaskOverNetwork.RemoteCall") networkSpan.SetAttributes( attribute.String("target_group", fmt.Sprintf("%d", gid)), attribute.String("predicate", x.SafeUTF8(attr)), @@ -2221,23 +2219,11 @@ func interpretVFloatOrUid(val string) ([]float32, uint64, error) { func (w *grpcWorker) ServeTask(ctx context.Context, q *pb.Query) (*pb.Result, error) { // Manually extract trace context from gRPC metadata using the propagator // This ensures the trace context is properly extracted for cross-alpha tracing - if md, ok := metadata.FromIncomingContext(ctx); ok { - propagator := propagation.NewCompositeTextMapPropagator( - propagation.TraceContext{}, - propagation.Baggage{}, - ) - carrier := propagation.HeaderCarrier{} - for k, vals := range md { - for _, v := range vals { - carrier.Set(k, v) - } - } - ctx = propagator.Extract(ctx, carrier) - } + ctx = x.ExtractTraceContext(ctx) // Sanitize attr for span name to ensure valid UTF-8 for OTLP export safeAttr := x.SafeUTF8(q.Attr) - ctx, span := otel.Tracer("worker").Start(ctx, "worker.ServeTask", + ctx, span := otel.Tracer("").Start(ctx, "worker.ServeTask", trace.WithAttributes(attribute.String("predicate", safeAttr))) defer span.End() From d098d76b6940bb2964522035c451fc287895c775 Mon Sep 17 00:00:00 2001 From: mattthew Date: Wed, 18 Feb 2026 17:15:59 -0500 Subject: [PATCH 14/15] Conform to new gobin mechanism --- systest/tracing/jaeger1/docker-compose.yml | 4 ++-- systest/tracing/jaeger2/docker-compose.yml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/systest/tracing/jaeger1/docker-compose.yml b/systest/tracing/jaeger1/docker-compose.yml index 97eb55f0dd9..7d3d19cd6fc 100644 --- a/systest/tracing/jaeger1/docker-compose.yml +++ b/systest/tracing/jaeger1/docker-compose.yml @@ -12,7 +12,7 @@ services: - "9080" volumes: - type: bind - source: $GOPATH/bin + source: ${LINUX_GOBIN:-$GOPATH/bin} target: /gobin read_only: true depends_on: @@ -39,7 +39,7 @@ services: - "6080" volumes: - type: bind - source: $GOPATH/bin + source: ${LINUX_GOBIN:-$GOPATH/bin} target: /gobin read_only: true depends_on: diff --git a/systest/tracing/jaeger2/docker-compose.yml b/systest/tracing/jaeger2/docker-compose.yml index bd04f3fb905..f007fe92c6d 100644 --- a/systest/tracing/jaeger2/docker-compose.yml +++ b/systest/tracing/jaeger2/docker-compose.yml @@ -13,7 +13,7 @@ services: - "9080" volumes: - type: bind - source: $GOPATH/bin + source: ${LINUX_GOBIN:-$GOPATH/bin} target: /gobin read_only: true depends_on: @@ -40,7 +40,7 @@ services: - "6080" volumes: - type: bind - source: $GOPATH/bin + source: ${LINUX_GOBIN:-$GOPATH/bin} target: /gobin read_only: true depends_on: From cfa8e79eeba674f70377ca1db5e1eea76e9e55ec Mon Sep 17 00:00:00 2001 From: mattthew Date: Wed, 18 Feb 2026 17:16:45 -0500 Subject: [PATCH 15/15] Add tests to ensure cross service tracing --- systest/tracing/jaeger1/jaeger1_test.go | 122 +++++++++++++++++++++++- systest/tracing/jaeger2/jaeger2_test.go | 122 +++++++++++++++++++++++- 2 files changed, 240 insertions(+), 4 deletions(-) diff --git a/systest/tracing/jaeger1/jaeger1_test.go b/systest/tracing/jaeger1/jaeger1_test.go index 73e523409bc..732185f6617 100644 --- a/systest/tracing/jaeger1/jaeger1_test.go +++ b/systest/tracing/jaeger1/jaeger1_test.go @@ -30,10 +30,30 @@ type JaegerServicesResponse struct { Errors []any `json:"errors"` } +// JaegerTrace represents a single trace from Jaeger +type JaegerTrace struct { + TraceID string `json:"traceID"` + Spans []JaegerSpan `json:"spans"` + Processes map[string]JaegerProcess `json:"processes"` +} + +// JaegerSpan represents a span within a trace +type JaegerSpan struct { + TraceID string `json:"traceID"` + SpanID string `json:"spanID"` + OperationName string `json:"operationName"` + ProcessID string `json:"processID"` +} + +// JaegerProcess represents a process (service) in the trace +type JaegerProcess struct { + ServiceName string `json:"serviceName"` +} + // JaegerTracesResponse represents the response from Jaeger's /api/traces endpoint type JaegerTracesResponse struct { - Data []any `json:"data"` - Errors []any `json:"errors"` + Data []JaegerTrace `json:"data"` + Errors []any `json:"errors"` } func TestMain(m *testing.M) { @@ -139,3 +159,101 @@ func TestJaegerTracesHaveSpans(t *testing.T) { t.Logf("Found %d traces for service alpha1", len(traces.Data)) require.NotEmpty(t, traces.Data, "Should have traces for alpha1 service") } + +// TestCrossServiceTraceContext verifies that traces propagate correctly between alpha and zero. +// A mutation triggers alpha->zero communication (for timestamps/commit), and both services +// should appear in the same trace with the same trace ID. +func TestCrossServiceTraceContext(t *testing.T) { + jaegerAddr := testutil.ContainerAddr("jaeger", 16686) + alphaAddr := testutil.ContainerAddr("alpha1", 9080) + + dg, err := testutil.DgraphClient(alphaAddr) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + // Run a mutation - this triggers alpha->zero communication for timestamps and commit + _, err = dg.NewTxn().Mutate(ctx, &api.Mutation{ + SetNquads: []byte(`_:cross "cross-service-trace-test" .`), + CommitNow: true, + }) + require.NoError(t, err) + + // Give Jaeger time to process traces + time.Sleep(5 * time.Second) + + // Verify both services are registered + servicesURL := fmt.Sprintf("http://%s/api/services", jaegerAddr) + resp, err := http.Get(servicesURL) + require.NoError(t, err) + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + var services JaegerServicesResponse + err = json.Unmarshal(body, &services) + require.NoError(t, err) + + t.Logf("Registered services: %v", services.Data) + + // Check both alpha1 and zero1 are registered + foundAlpha := false + foundZero := false + for _, svc := range services.Data { + if svc == "alpha1" { + foundAlpha = true + } + if svc == "zero1" { + foundZero = true + } + } + require.True(t, foundAlpha, "Service 'alpha1' should be registered") + require.True(t, foundZero, "Service 'zero1' should be registered") + + // Query for traces from alpha1 + tracesURL := fmt.Sprintf("http://%s/api/traces?service=alpha1&limit=50", jaegerAddr) + resp, err = http.Get(tracesURL) + require.NoError(t, err) + defer resp.Body.Close() + + body, err = io.ReadAll(resp.Body) + require.NoError(t, err) + + var traces JaegerTracesResponse + err = json.Unmarshal(body, &traces) + require.NoError(t, err) + + require.NotEmpty(t, traces.Data, "Should have traces for alpha1") + + // Look for traces that contain both alpha1 and zero1 spans + // This proves cross-service trace context propagation is working + multiServiceTraceFound := false + for _, trace := range traces.Data { + servicesInTrace := make(map[string]bool) + for _, span := range trace.Spans { + if proc, ok := trace.Processes[span.ProcessID]; ok { + servicesInTrace[proc.ServiceName] = true + } + } + + hasAlpha := servicesInTrace["alpha1"] + hasZero := servicesInTrace["zero1"] + + if hasAlpha && hasZero { + multiServiceTraceFound = true + t.Logf("Found cross-service trace %s with services: %v", trace.TraceID, servicesInTrace) + + // Verify all spans share the same trace ID - this proves context propagation + for _, span := range trace.Spans { + require.Equal(t, trace.TraceID, span.TraceID, + "All spans in cross-service trace must share the same trace ID") + } + break + } + } + + require.True(t, multiServiceTraceFound, + "Should find at least one trace containing both alpha1 and zero1 spans (proves context propagation)") +} diff --git a/systest/tracing/jaeger2/jaeger2_test.go b/systest/tracing/jaeger2/jaeger2_test.go index 019e346db4f..6c5d5a541f1 100644 --- a/systest/tracing/jaeger2/jaeger2_test.go +++ b/systest/tracing/jaeger2/jaeger2_test.go @@ -30,10 +30,30 @@ type JaegerServicesResponse struct { Errors []any `json:"errors"` } +// JaegerTrace represents a single trace from Jaeger +type JaegerTrace struct { + TraceID string `json:"traceID"` + Spans []JaegerSpan `json:"spans"` + Processes map[string]JaegerProcess `json:"processes"` +} + +// JaegerSpan represents a span within a trace +type JaegerSpan struct { + TraceID string `json:"traceID"` + SpanID string `json:"spanID"` + OperationName string `json:"operationName"` + ProcessID string `json:"processID"` +} + +// JaegerProcess represents a process (service) in the trace +type JaegerProcess struct { + ServiceName string `json:"serviceName"` +} + // JaegerTracesResponse represents the response from Jaeger's /api/traces endpoint type JaegerTracesResponse struct { - Data []any `json:"data"` - Errors []any `json:"errors"` + Data []JaegerTrace `json:"data"` + Errors []any `json:"errors"` } func TestMain(m *testing.M) { @@ -139,3 +159,101 @@ func TestJaeger2TracesHaveSpans(t *testing.T) { t.Logf("Found %d traces for service alpha1 in Jaeger 2.x", len(traces.Data)) require.NotEmpty(t, traces.Data, "Should have traces for alpha1 service in Jaeger 2.x") } + +// TestCrossServiceTraceContext verifies that traces propagate correctly between alpha and zero. +// A mutation triggers alpha->zero communication (for timestamps/commit), and both services +// should appear in the same trace with the same trace ID. +func TestCrossServiceTraceContext(t *testing.T) { + jaegerAddr := testutil.ContainerAddr("jaeger", 16686) + alphaAddr := testutil.ContainerAddr("alpha1", 9080) + + dg, err := testutil.DgraphClient(alphaAddr) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + // Run a mutation - this triggers alpha->zero communication for timestamps and commit + _, err = dg.NewTxn().Mutate(ctx, &api.Mutation{ + SetNquads: []byte(`_:cross "cross-service-trace-test-v2" .`), + CommitNow: true, + }) + require.NoError(t, err) + + // Give Jaeger time to process traces + time.Sleep(5 * time.Second) + + // Verify both services are registered + servicesURL := fmt.Sprintf("http://%s/api/services", jaegerAddr) + resp, err := http.Get(servicesURL) + require.NoError(t, err) + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + var services JaegerServicesResponse + err = json.Unmarshal(body, &services) + require.NoError(t, err) + + t.Logf("Registered services: %v", services.Data) + + // Check both alpha1 and zero1 are registered + foundAlpha := false + foundZero := false + for _, svc := range services.Data { + if svc == "alpha1" { + foundAlpha = true + } + if svc == "zero1" { + foundZero = true + } + } + require.True(t, foundAlpha, "Service 'alpha1' should be registered") + require.True(t, foundZero, "Service 'zero1' should be registered") + + // Query for traces from alpha1 + tracesURL := fmt.Sprintf("http://%s/api/traces?service=alpha1&limit=50", jaegerAddr) + resp, err = http.Get(tracesURL) + require.NoError(t, err) + defer resp.Body.Close() + + body, err = io.ReadAll(resp.Body) + require.NoError(t, err) + + var traces JaegerTracesResponse + err = json.Unmarshal(body, &traces) + require.NoError(t, err) + + require.NotEmpty(t, traces.Data, "Should have traces for alpha1") + + // Look for traces that contain both alpha1 and zero1 spans + // This proves cross-service trace context propagation is working + multiServiceTraceFound := false + for _, trace := range traces.Data { + servicesInTrace := make(map[string]bool) + for _, span := range trace.Spans { + if proc, ok := trace.Processes[span.ProcessID]; ok { + servicesInTrace[proc.ServiceName] = true + } + } + + hasAlpha := servicesInTrace["alpha1"] + hasZero := servicesInTrace["zero1"] + + if hasAlpha && hasZero { + multiServiceTraceFound = true + t.Logf("Found cross-service trace %s with services: %v", trace.TraceID, servicesInTrace) + + // Verify all spans share the same trace ID - this proves context propagation + for _, span := range trace.Spans { + require.Equal(t, trace.TraceID, span.TraceID, + "All spans in cross-service trace must share the same trace ID") + } + break + } + } + + require.True(t, multiServiceTraceFound, + "Should find at least one trace containing both alpha1 and zero1 spans (proves context propagation)") +}