diff --git a/conn/node.go b/conn/node.go index 48a5aa75f1e..ff52be912fe 100644 --- a/conn/node.go +++ b/conn/node.go @@ -21,7 +21,8 @@ import ( "github.com/pkg/errors" "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3/raftpb" - otrace "go.opencensus.io/trace" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" "google.golang.org/protobuf/proto" "github.com/dgraph-io/badger/v4/y" @@ -432,7 +433,7 @@ func (n *Node) doSendMessage(to uint64, msgCh chan []byte) error { } c := pb.NewRaftClient(pool.Get()) - ctx, span := otrace.StartSpan(context.Background(), + ctx, span := otel.Tracer("").Start(context.Background(), fmt.Sprintf("RaftMessage-%d-to-%d", n.Id, to)) defer span.End() @@ -473,15 +474,15 @@ func (n *Node) doSendMessage(to uint64, msgCh chan []byte) error { Payload: &api.Payload{Data: data}, } slurp(batch) // Pick up more entries from msgCh, if present. - span.Annotatef(nil, "[to: %x] [Packets: %d] Sending data of length: %d.", - to, packets, len(batch.Payload.Data)) + span.AddEvent(fmt.Sprintf("[to: %x] [Packets: %d] Sending data of length: %d.", + to, packets, len(batch.Payload.Data))) if packets%10000 == 0 { glog.V(2).Infof("[to: %x] [Packets: %d] Sending data of length: %d.", to, packets, len(batch.Payload.Data)) } packets++ if err := mc.Send(batch); err != nil { - span.Annotatef(nil, "Error while mc.Send: %v", err) + span.AddEvent(fmt.Sprintf("Error while mc.Send: %v", err)) glog.Errorf("[to: %x] Error while mc.Send: %v", to, err) switch { case strings.Contains(err.Error(), "TransientFailure"): @@ -513,8 +514,7 @@ func (n *Node) doSendMessage(to uint64, msgCh chan []byte) error { } case <-ticker.C: if lastPackets == packets { - span.Annotatef(nil, - "No activity for a while [Packets == %d]. Closing connection.", packets) + span.AddEvent(fmt.Sprintf("No activity for a while [Packets == %d]. 
Closing connection.", packets)) return mc.CloseSend() } lastPackets = packets @@ -640,8 +640,8 @@ var readIndexOk, readIndexTotal uint64 // WaitLinearizableRead waits until a linearizable read can be performed. func (n *Node) WaitLinearizableRead(ctx context.Context) error { - span := otrace.FromContext(ctx) - span.Annotate(nil, "WaitLinearizableRead") + span := trace.SpanFromContext(ctx) + span.AddEvent("WaitLinearizableRead") if num := atomic.AddUint64(&readIndexTotal, 1); num%1000 == 0 { glog.V(2).Infof("ReadIndex Total: %d\n", num) @@ -649,25 +649,25 @@ func (n *Node) WaitLinearizableRead(ctx context.Context) error { indexCh := make(chan uint64, 1) select { case n.requestCh <- linReadReq{indexCh: indexCh}: - span.Annotate(nil, "Pushed to requestCh") + span.AddEvent("Pushed to requestCh") case <-ctx.Done(): - span.Annotate(nil, "Context expired") + span.AddEvent("Context expired") return ctx.Err() } select { case index := <-indexCh: - span.Annotatef(nil, "Received index: %d", index) + span.AddEvent(fmt.Sprintf("Received index %d", index)) if index == 0 { return errReadIndex } else if num := atomic.AddUint64(&readIndexOk, 1); num%1000 == 0 { glog.V(2).Infof("ReadIndex OK: %d\n", num) } err := n.Applied.WaitForMark(ctx, index) - span.Annotatef(nil, "Error from Applied.WaitForMark: %v", err) + span.AddEvent(fmt.Sprintf("Error from Applied.WaitForMark: %v", err)) return err case <-ctx.Done(): - span.Annotate(nil, "Context expired") + span.AddEvent("Context expired") return ctx.Err() } } diff --git a/conn/pool.go b/conn/pool.go index 25f9abf8cbc..30a0ca0ca37 100644 --- a/conn/pool.go +++ b/conn/pool.go @@ -14,7 +14,7 @@ import ( "github.com/golang/glog" "github.com/pkg/errors" - "go.opencensus.io/plugin/ocgrpc" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" @@ -159,7 +159,7 @@ func (p *Pools) Connect(addr string, tlsClientConf 
*tls.Config) *Pool { // newPool creates a new "pool" with one gRPC connection, refcount 0. func newPool(addr string, tlsClientConf *tls.Config) (*Pool, error) { conOpts := []grpc.DialOption{ - grpc.WithStatsHandler(&ocgrpc.ClientHandler{}), + grpc.WithStatsHandler(otelgrpc.NewClientHandler()), grpc.WithDefaultCallOptions( grpc.MaxCallRecvMsgSize(x.GrpcMaxSize), grpc.MaxCallSendMsgSize(x.GrpcMaxSize), diff --git a/conn/raft_server.go b/conn/raft_server.go index 6dc527d97a1..2c96e2f0e00 100644 --- a/conn/raft_server.go +++ b/conn/raft_server.go @@ -8,6 +8,7 @@ package conn import ( "context" "encoding/binary" + "fmt" "math/rand" "strconv" "sync" @@ -17,7 +18,7 @@ import ( "github.com/golang/glog" "github.com/pkg/errors" "go.etcd.io/etcd/raft/v3/raftpb" - otrace "go.opencensus.io/trace" + "go.opentelemetry.io/otel/trace" "github.com/dgraph-io/dgo/v240/protos/api" "github.com/hypermodeinc/dgraph/v24/protos/pb" @@ -183,13 +184,13 @@ func (w *RaftServer) RaftMessage(server pb.Raft_RaftMessageServer) error { if ctx.Err() != nil { return ctx.Err() } - span := otrace.FromContext(ctx) + span := trace.SpanFromContext(ctx) node := w.GetNode() if node == nil || node.Raft() == nil { return ErrNoNode } - span.Annotatef(nil, "Stream server is node %#x", node.Id) + span.AddEvent(fmt.Sprintf("Stream server is node %d", node.Id)) var rc *pb.RaftContext raft := node.Raft() @@ -247,7 +248,7 @@ func (w *RaftServer) RaftMessage(server pb.Raft_RaftMessageServer) error { } if loop == 1 { rc = batch.GetContext() - span.Annotatef(nil, "Stream from %#x", rc.GetId()) + span.AddEvent(fmt.Sprintf("Stream from %#x", rc.GetId())) if rc != nil { node.Connect(rc.Id, rc.Addr) } diff --git a/dgraph/cmd/alpha/run.go b/dgraph/cmd/alpha/run.go index 16d6f05bf67..b2f78263522 100644 --- a/dgraph/cmd/alpha/run.go +++ b/dgraph/cmd/alpha/run.go @@ -28,8 +28,7 @@ import ( "github.com/pkg/errors" "github.com/spf13/cobra" "go.opencensus.io/plugin/ocgrpc" - otrace "go.opencensus.io/trace" - 
"go.opencensus.io/zpages" + "go.opentelemetry.io/contrib/zpages" "golang.org/x/net/trace" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -509,7 +508,7 @@ func setupServer(closer *z.Closer) { baseMux.HandleFunc("/health", healthCheck) baseMux.HandleFunc("/state", stateHandler) baseMux.HandleFunc("/debug/jemalloc", x.JemallocHandler) - zpages.Handle(baseMux, "/debug/z") + http.DefaultServeMux.Handle("/debug/z", zpages.NewTracezHandler(zpages.NewSpanProcessor())) // TODO: Figure out what this is for? http.HandleFunc("/debug/store", storeStatsHandler) @@ -769,10 +768,6 @@ func run() { return true, true } } - otrace.ApplyConfig(otrace.Config{ - DefaultSampler: otrace.ProbabilitySampler(x.WorkerConfig.Trace.GetFloat64("ratio")), - MaxAnnotationEventsPerSpan: 256, - }) // Posting will initialize index which requires schema. Hence, initialize // schema before calling posting.Init(). diff --git a/dgraph/cmd/zero/assign.go b/dgraph/cmd/zero/assign.go index 0bac008870d..a0b6d10d66e 100644 --- a/dgraph/cmd/zero/assign.go +++ b/dgraph/cmd/zero/assign.go @@ -7,12 +7,14 @@ package zero import ( "context" + "fmt" "math/rand" "time" "github.com/golang/glog" "github.com/pkg/errors" - otrace "go.opencensus.io/trace" + "go.opentelemetry.io/otel" + attribute "go.opentelemetry.io/otel/attribute" "google.golang.org/grpc/metadata" "github.com/hypermodeinc/dgraph/v24/protos/pb" @@ -175,7 +177,7 @@ func (s *Server) AssignIds(ctx context.Context, num *pb.Num) (*pb.AssignedIds, e if ctx.Err() != nil { return &emptyAssignedIds, ctx.Err() } - ctx, span := otrace.StartSpan(ctx, "Zero.AssignIds") + ctx, span := otel.Tracer("").Start(ctx, "Zero.AssignIds") defer span.End() rateLimit := func() error { @@ -215,11 +217,13 @@ func (s *Server) AssignIds(ctx context.Context, num *pb.Num) (*pb.AssignedIds, e if err := rateLimit(); err != nil { return err } - span.Annotatef(nil, "Zero leader leasing %d ids", num.GetVal()) + span.SetAttributes(attribute.String("tablet", "predicate")) + 
span.AddEvent(fmt.Sprintf("Zero leader leasing %d ids", num.GetVal())) reply, err = s.lease(ctx, num) return err } - span.Annotate(nil, "Not Zero leader") + span.SetAttributes(attribute.String("Not Zero leader", "true")) + span.AddEvent("Not Zero leader") // I'm not the leader and this request was forwarded to me by a peer, who thought I'm the // leader. if num.Forwarded { @@ -230,7 +234,7 @@ func (s *Server) AssignIds(ctx context.Context, num *pb.Num) (*pb.AssignedIds, e if pl == nil { return errors.Errorf("No healthy connection found to Leader of group zero") } - span.Annotatef(nil, "Sending request to %v", pl.Addr) + span.AddEvent(fmt.Sprintf("Sending request to %v", pl.Addr)) zc := pb.NewZeroClient(pl.Get()) num.Forwarded = true // pass on the incoming metadata to the zero leader. @@ -269,7 +273,7 @@ func (s *Server) AssignIds(ctx context.Context, num *pb.Num) (*pb.AssignedIds, e case <-ctx.Done(): return &emptyAssignedIds, ctx.Err() case err := <-c: - span.Annotatef(nil, "Error while leasing %+v: %v", num, err) + span.AddEvent(fmt.Sprintf("Error while leasing %+v: %v", num, err)) return reply, err } } diff --git a/dgraph/cmd/zero/oracle.go b/dgraph/cmd/zero/oracle.go index 200d9d1fcb5..c72180c0d24 100644 --- a/dgraph/cmd/zero/oracle.go +++ b/dgraph/cmd/zero/oracle.go @@ -7,6 +7,7 @@ package zero import ( "context" + "fmt" "math/rand" "strconv" "strings" @@ -14,7 +15,9 @@ import ( "github.com/golang/glog" "github.com/pkg/errors" - otrace "go.opencensus.io/trace" + "go.opentelemetry.io/otel" + attribute "go.opentelemetry.io/otel/attribute" + trace "go.opentelemetry.io/otel/trace" "github.com/dgraph-io/badger/v4/y" "github.com/dgraph-io/dgo/v240/protos/api" @@ -337,8 +340,8 @@ func (s *Server) proposeTxn(ctx context.Context, src *api.TxnContext) error { } func (s *Server) commit(ctx context.Context, src *api.TxnContext) error { - span := otrace.FromContext(ctx) - span.Annotate([]otrace.Attribute{otrace.Int64Attribute("startTs", int64(src.StartTs))}, "") + span := 
trace.SpanFromContext(ctx) + span.SetAttributes(attribute.Int64("startTs", int64(src.StartTs))) if src.Aborted { return s.proposeTxn(ctx, src) } @@ -348,8 +351,7 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error { conflict := s.orc.hasConflict(src) s.orc.RUnlock() if conflict { - span.Annotate([]otrace.Attribute{otrace.BoolAttribute("abort", true)}, - "Oracle found conflict") + span.SetAttributes(attribute.Bool("abort", true)) src.Aborted = true return s.proposeTxn(ctx, src) } @@ -384,7 +386,7 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error { return nil } if err := checkPreds(); err != nil { - span.Annotate([]otrace.Attribute{otrace.BoolAttribute("abort", true)}, err.Error()) + span.SetAttributes(attribute.Bool("abort", true)) src.Aborted = true return s.proposeTxn(ctx, src) } @@ -397,15 +399,16 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error { src.CommitTs = assigned.StartId // Mark the transaction as done, irrespective of whether the proposal succeeded or not. defer s.orc.doneUntil.Done(src.CommitTs) - span.Annotatef([]otrace.Attribute{otrace.Int64Attribute("commitTs", int64(src.CommitTs))}, - "Node Id: %d. Proposing TxnContext: %+v", s.Node.Id, src) + span.SetAttributes(attribute.Int64("commitTs", int64(src.CommitTs))) + span.SetAttributes(attribute.Int64("nodeId", int64(s.Node.Id))) + span.AddEvent(fmt.Sprintf("TXN Context: %+v", src)) if err := s.orc.commit(src); err != nil { - span.Annotatef(nil, "Found a conflict. Aborting.") + span.SetAttributes(attribute.Bool("abort", true)) src.Aborted = true } if err := ctx.Err(); err != nil { - span.Annotatef(nil, "Aborting txn due to context timing out.") + span.SetAttributes(attribute.Bool("abort", true)) src.Aborted = true } // Propose txn should be used to set watermark as done. 
@@ -420,7 +423,7 @@ func (s *Server) CommitOrAbort(ctx context.Context, src *api.TxnContext) (*api.T if ctx.Err() != nil { return nil, ctx.Err() } - ctx, span := otrace.StartSpan(ctx, "Zero.CommitOrAbort") + ctx, span := otel.Tracer("").Start(ctx, "Zero.CommitOrAbort") defer span.End() if !s.Node.AmLeader() { @@ -428,7 +431,7 @@ func (s *Server) CommitOrAbort(ctx context.Context, src *api.TxnContext) (*api.T } err := s.commit(ctx, src) if err != nil { - span.Annotate([]otrace.Attribute{otrace.BoolAttribute("error", true)}, err.Error()) + span.SetAttributes(attribute.Bool("error", true)) } return src, err } @@ -491,17 +494,18 @@ func (s *Server) TryAbort(ctx context.Context, // Timestamps is used to assign startTs for a new transaction func (s *Server) Timestamps(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error) { - ctx, span := otrace.StartSpan(ctx, "Zero.Timestamps") + ctx, span := otel.Tracer("").Start(ctx, "Zero.Timestamps") defer span.End() - span.Annotatef(nil, "Zero id: %d. Timestamp request: %+v", s.Node.Id, num) + span.SetAttributes(attribute.Int64("zeroId", int64(s.Node.Id))) + span.SetAttributes(attribute.String("timestampRequest", fmt.Sprintf("%+v", num))) if ctx.Err() != nil { return &emptyAssignedIds, ctx.Err() } num.Type = pb.Num_TXN_TS reply, err := s.lease(ctx, num) - span.Annotatef(nil, "Response: %+v. 
Error: %v", reply, err) + span.AddEvent(fmt.Sprintf("Response: %+v, Error: %v", reply, err)) switch err { case nil: diff --git a/dgraph/cmd/zero/raft.go b/dgraph/cmd/zero/raft.go index b7b3d76ecc3..e3e6d5d75e6 100644 --- a/dgraph/cmd/zero/raft.go +++ b/dgraph/cmd/zero/raft.go @@ -25,7 +25,9 @@ import ( "go.etcd.io/etcd/raft/v3/raftpb" ostats "go.opencensus.io/stats" "go.opencensus.io/tag" - otrace "go.opencensus.io/trace" + "go.opentelemetry.io/otel" + attribute "go.opentelemetry.io/otel/attribute" + trace "go.opentelemetry.io/otel/trace" "google.golang.org/protobuf/proto" "github.com/dgraph-io/ristretto/v2/z" @@ -106,9 +108,9 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.ZeroProposal) er // We could consider adding a wrapper around the user proposal, so we can access any key-values. // Something like this: // https://github.com/golang/go/commit/5d39260079b5170e6b4263adb4022cc4b54153c4 - span := otrace.FromContext(ctx) + span := trace.SpanFromContext(ctx) // Overwrite ctx, so we no longer enforce the timeouts or cancels from ctx. - ctx = otrace.NewContext(context.Background(), span) + ctx = trace.ContextWithSpan(context.Background(), span) stop := x.SpanTimer(span, "n.proposeAndWait") defer stop() @@ -136,7 +138,9 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.ZeroProposal) er key = n.uniqueKey() } defer n.Proposals.Delete(key) - span.Annotatef(nil, "Proposing with key: %d. Timeout: %v", key, timeout) + span.AddEvent("Proposing with key: %d. Timeout: %v", trace.WithAttributes( + attribute.Int64("key", int64(key)), + attribute.Int64("timeout", int64(timeout)))) sz := proto.Size(proposal) data := make([]byte, 8+sz) @@ -148,7 +152,8 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.ZeroProposal) er data = data[:8+sz] // Propose the change. 
if err := n.Raft().Propose(cctx, data); err != nil { - span.Annotatef(nil, "Error while proposing via Raft: %v", err) + span.AddEvent("Error while proposing via Raft: %v", trace.WithAttributes( + attribute.String("error", err.Error()))) return errors.Wrapf(err, "While proposing") } @@ -158,7 +163,7 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.ZeroProposal) er // We arrived here by a call to n.props.Done(). return err case <-cctx.Done(): - span.Annotatef(nil, "Internal context timeout %s. Will retry...", timeout) + span.AddEvent(fmt.Sprintf("Internal context timeout %s. Will retry...", timeout)) return errInternalRetry } } @@ -399,7 +404,7 @@ func (n *node) applyProposal(e raftpb.Entry) (uint64, error) { if err := proto.Unmarshal(e.Data[8:], &p); err != nil { return key, err } - span := otrace.FromContext(n.Proposals.Ctx(key)) + span := trace.SpanFromContext(n.Proposals.Ctx(key)) n.server.Lock() defer n.server.Unlock() @@ -428,14 +433,14 @@ func (n *node) applyProposal(e raftpb.Entry) (uint64, error) { } if p.Member != nil { if err := n.handleMemberProposal(p.Member); err != nil { - span.Annotatef(nil, "While applying membership proposal: %+v", err) + span.AddEvent(fmt.Sprintf("While applying membership proposal: %+v", err)) glog.Errorf("While applying membership proposal: %+v", err) return key, err } } if p.Tablet != nil { if err := n.handleTabletProposal(p.Tablet); err != nil { - span.Annotatef(nil, "While applying tablet proposal: %v", err) + span.AddEvent(fmt.Sprintf("While applying tablet proposal: %v", err)) glog.Errorf("While applying tablet proposal: %v", err) return key, err } @@ -443,7 +448,7 @@ func (n *node) applyProposal(e raftpb.Entry) (uint64, error) { if len(p.Tablets) > 0 { if err := n.handleBulkTabletProposal(p.Tablets); err != nil { - span.Annotatef(nil, "While applying bulk tablet proposal: %v", err) + span.AddEvent(fmt.Sprintf("While applying bulk tablet proposal: %v", err)) glog.Errorf("While applying bulk tablet proposal: 
%v", err) return key, err } @@ -698,8 +703,6 @@ func (n *node) updateZeroMembershipPeriodically(closer *z.Closer) { } } -var startOption = otrace.WithSampler(otrace.ProbabilitySampler(0.01)) - func (n *node) checkQuorum(closer *z.Closer) { defer closer.Done() ticker := time.NewTicker(time.Second) @@ -710,9 +713,9 @@ func (n *node) checkQuorum(closer *z.Closer) { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - ctx, span := otrace.StartSpan(ctx, "Zero.checkQuorum", startOption) + ctx, span := otel.Tracer("").Start(ctx, "Zero.checkQuorum") defer span.End() - span.Annotatef(nil, "Node id: %d", n.Id) + span.SetAttributes(attribute.String("Node id", fmt.Sprintf("%d", n.Id))) if state, err := n.server.latestMembershipState(ctx); err == nil { n.mu.Lock() @@ -720,10 +723,10 @@ func (n *node) checkQuorum(closer *z.Closer) { n.mu.Unlock() // Also do some connection cleanup. conn.GetPools().RemoveInvalid(state) - span.Annotate(nil, "Updated lastQuorum") + span.AddEvent("Updated lastQuorum") } else if glog.V(1) { - span.Annotatef(nil, "Got error: %v", err) + span.AddEvent(fmt.Sprintf("Got error: %v", err)) glog.Warningf("Zero node: %#x unable to reach quorum. Error: %v", n.Id, err) } } @@ -771,8 +774,7 @@ func (n *node) calculateAndProposeSnapshot() error { return nil } - _, span := otrace.StartSpan(n.ctx, "Calculate.Snapshot", - otrace.WithSampler(otrace.AlwaysSample())) + _, span := otel.Tracer("").Start(n.ctx, "Calculate.Snapshot") defer span.End() // We calculate the minimum timestamp from all the group's maxAssigned. 
@@ -785,12 +787,12 @@ func (n *node) calculateAndProposeSnapshot() error { " Num groups: %d, Num checkpoints: %d\n", len(s.state.Groups), len(s.checkpointPerGroup)) s.RUnlock() - span.Annotatef(nil, log) + span.AddEvent(log) glog.Infof(log) return nil } for gid, ts := range s.checkpointPerGroup { - span.Annotatef(nil, "Group: %d Checkpoint Ts: %d", gid, ts) + span.AddEvent(fmt.Sprintf("Group: %d Checkpoint Ts: %d", gid, ts)) discardBelow = x.Min(discardBelow, ts) } s.RUnlock() @@ -798,23 +800,23 @@ func (n *node) calculateAndProposeSnapshot() error { first, err := n.Store.FirstIndex() if err != nil { - span.Annotatef(nil, "FirstIndex error: %v", err) + span.AddEvent(fmt.Sprintf("FirstIndex error: %v", err)) return err } last, err := n.Store.LastIndex() if err != nil { - span.Annotatef(nil, "LastIndex error: %v", err) + span.AddEvent(fmt.Sprintf("LastIndex error: %v", err)) return err } - span.Annotatef(nil, "First index: %d. Last index: %d. Discard Below Ts: %d", - first, last, discardBelow) + span.AddEvent(fmt.Sprintf("First index: %d. Last index: %d. Discard Below Ts: %d", + first, last, discardBelow)) var snapshotIndex uint64 for batchFirst := first; batchFirst <= last; { entries, err := n.Store.Entries(batchFirst, last+1, 256<<20) if err != nil { - span.Annotatef(nil, "Error: %v", err) + span.AddEvent(fmt.Sprintf("Error: %v", err)) return err } // Exit early from the loop if no entries were found. 
@@ -827,7 +829,7 @@ func (n *node) calculateAndProposeSnapshot() error { } var p pb.ZeroProposal if err := proto.Unmarshal(entry.Data[8:], &p); err != nil { - span.Annotatef(nil, "Error: %v", err) + span.AddEvent(fmt.Sprintf("Error: %v", err)) return err } if txn := p.Txn; txn != nil { @@ -841,7 +843,7 @@ func (n *node) calculateAndProposeSnapshot() error { if snapshotIndex == 0 { return nil } - span.Annotatef(nil, "Taking snapshot at index: %d", snapshotIndex) + span.AddEvent(fmt.Sprintf("Taking snapshot at index: %d", snapshotIndex)) state := n.server.membershipState() zs := &pb.ZeroSnapshot{ @@ -854,10 +856,10 @@ func (n *node) calculateAndProposeSnapshot() error { zp := &pb.ZeroProposal{Snapshot: zs} if err = n.proposeAndWait(n.ctx, zp); err != nil { glog.Errorf("Error while proposing snapshot: %v\n", err) - span.Annotatef(nil, "Error while proposing snapshot: %v", err) + span.AddEvent(fmt.Sprintf("Error while proposing snapshot: %v", err)) return err } - span.Annotatef(nil, "Snapshot proposed: Done") + span.AddEvent("Snapshot proposed: Done") return nil } @@ -905,14 +907,13 @@ func (n *node) Run() { n.Raft().Tick() case rd := <-n.Raft().Ready(): timer.Start() - _, span := otrace.StartSpan(n.ctx, "Zero.RunLoop", - otrace.WithSampler(otrace.ProbabilitySampler(0.001))) + _, span := otel.Tracer("").Start(n.ctx, "Zero.RunLoop") for _, rs := range rd.ReadStates { // No need to use select-case-default on pushing to readStateCh. It is typically // empty. 
readStateCh <- rs } - span.Annotatef(nil, "Pushed %d readstates", len(rd.ReadStates)) + span.AddEvent(fmt.Sprintf("Pushed %d readstates", len(rd.ReadStates))) if rd.SoftState != nil { if rd.RaftState == raft.StateLeader && !leader { @@ -948,7 +949,7 @@ func (n *node) Run() { } n.SaveToStorage(&rd.HardState, rd.Entries, &rd.Snapshot) timer.Record("disk") - span.Annotatef(nil, "Saved to storage") + span.AddEvent("Saved to storage") for x.WorkerConfig.HardSync && rd.MustSync { if err := n.Store.Sync(); err != nil { glog.Errorf("Error while calling Store.Sync: %v", err) @@ -997,7 +998,7 @@ func (n *node) Run() { } n.Applied.Done(entry.Index) } - span.Annotatef(nil, "Applied %d CommittedEntries", len(rd.CommittedEntries)) + span.AddEvent(fmt.Sprintf("Applied %d CommittedEntries", len(rd.CommittedEntries))) if !leader { // Followers should send messages later. @@ -1005,11 +1006,11 @@ func (n *node) Run() { n.Send(&rd.Messages[i]) } } - span.Annotate(nil, "Sent messages") + span.AddEvent("Sent messages") timer.Record("proposals") n.Raft().Advance() - span.Annotate(nil, "Advanced Raft") + span.AddEvent("Advanced Raft") timer.Record("advance") span.End() diff --git a/dgraph/cmd/zero/run.go b/dgraph/cmd/zero/run.go index 3548e96980a..d28e309ee59 100644 --- a/dgraph/cmd/zero/run.go +++ b/dgraph/cmd/zero/run.go @@ -20,10 +20,8 @@ import ( "github.com/golang/glog" "github.com/spf13/cobra" - "go.opencensus.io/plugin/ocgrpc" - otrace "go.opencensus.io/trace" - "go.opencensus.io/zpages" - "golang.org/x/net/trace" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/contrib/zpages" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -144,7 +142,7 @@ func (st *state) serveGRPC(l net.Listener, store *raftwal.DiskStorage) { grpc.MaxRecvMsgSize(x.GrpcMaxSize), grpc.MaxSendMsgSize(x.GrpcMaxSize), grpc.MaxConcurrentStreams(1000), - grpc.StatsHandler(&ocgrpc.ServerHandler{}), + grpc.StatsHandler(otelgrpc.NewServerHandler()), 
grpc.UnaryInterceptor(audit.AuditRequestGRPC), } @@ -240,13 +238,6 @@ func run() { opts.numReplicas) } - if Zero.Conf.GetBool("expose_trace") { - // TODO: Remove this once we get rid of event logs. - trace.AuthRequest = func(req *http.Request) (any, sensitive bool) { - return true, true - } - } - if opts.audit != nil { wd, err := filepath.Abs(opts.w) x.Check(err) @@ -261,10 +252,6 @@ func run() { opts.rebalanceInterval) } - grpc.EnableTracing = false - otrace.ApplyConfig(otrace.Config{ - DefaultSampler: otrace.ProbabilitySampler(Zero.Conf.GetFloat64("trace"))}) - addr := "localhost" if opts.bindall { addr = "0.0.0.0" @@ -315,7 +302,7 @@ func run() { baseMux.HandleFunc("/assign", st.assign) } baseMux.HandleFunc("/debug/jemalloc", x.JemallocHandler) - zpages.Handle(baseMux, "/debug/z") + http.DefaultServeMux.Handle("/debug/z", zpages.NewTracezHandler(zpages.NewSpanProcessor())) // This must be here. It does not work if placed before Grpc init. x.Check(st.node.initAndStartNode()) diff --git a/dgraph/cmd/zero/tablet.go b/dgraph/cmd/zero/tablet.go index 85adb9863db..1d05708c5cc 100644 --- a/dgraph/cmd/zero/tablet.go +++ b/dgraph/cmd/zero/tablet.go @@ -14,7 +14,8 @@ import ( humanize "github.com/dustin/go-humanize" "github.com/golang/glog" "github.com/pkg/errors" - otrace "go.opencensus.io/trace" + "go.opentelemetry.io/otel" + attribute "go.opentelemetry.io/otel/attribute" "github.com/hypermodeinc/dgraph/v24/protos/pb" "github.com/hypermodeinc/dgraph/v24/x" @@ -120,7 +121,7 @@ func (s *Server) movePredicate(predicate string, srcGroup, dstGroup uint32) erro ctx, cancel := context.WithTimeout(context.Background(), predicateMoveTimeout) defer cancel() - ctx, span := otrace.StartSpan(ctx, "Zero.MovePredicate") + ctx, span := otel.Tracer("").Start(ctx, "Zero.MovePredicate") defer span.End() // Ensure that reserved predicates cannot be moved.
@@ -143,7 +144,8 @@ func (s *Server) movePredicate(predicate string, srcGroup, dstGroup uint32) erro " from group %d to %d\n", predicate, humanize.IBytes(uint64(tab.OnDiskBytes)), humanize.IBytes(uint64(tab.UncompressedBytes)), srcGroup, dstGroup) glog.Info(msg) - span.Annotate([]otrace.Attribute{otrace.StringAttribute("tablet", predicate)}, msg) + span.SetAttributes(attribute.String("tablet", predicate)) + span.AddEvent(msg) // Block all commits on this predicate. Keep them blocked until we return from this function. unblock := s.blockTablet(predicate) @@ -168,7 +170,7 @@ func (s *Server) movePredicate(predicate string, srcGroup, dstGroup uint32) erro DestGid: dstGroup, TxnTs: ids.StartId, } - span.Annotatef(nil, "Starting move: %+v", in) + span.AddEvent(fmt.Sprintf("Move Predicate payload: %+v", in)) glog.Infof("Starting move: %+v", in) if _, err := wc.MovePredicate(ctx, in); err != nil { return errors.Wrapf(err, "while calling MovePredicate") @@ -184,15 +186,15 @@ func (s *Server) movePredicate(predicate string, srcGroup, dstGroup uint32) erro MoveTs: in.TxnTs, } msg = fmt.Sprintf("Move at Alpha done. Now proposing: %+v", p) - span.Annotate(nil, msg) + span.AddEvent(fmt.Sprintf("Zero proposal: %+v", p)) glog.Info(msg) if err := s.Node.proposeAndWait(ctx, p); err != nil { return errors.Wrapf(err, "while proposing tablet reassignment. Proposal: %+v", p) } msg = fmt.Sprintf("Predicate move done for: [%v] from group %d to %d\n", predicate, srcGroup, dstGroup) + span.AddEvent(msg) glog.Info(msg) - span.Annotate(nil, msg) // Now that the move has happened, we can delete the predicate from the source group. But before // doing that, we should ensure the source group understands that the predicate is now being @@ -205,11 +207,11 @@ func (s *Server) movePredicate(predicate string, srcGroup, dstGroup uint32) erro if _, err := wc.MovePredicate(ctx, in); err != nil { msg = fmt.Sprintf("While deleting predicate [%v] in group %d. 
Error: %v", in.Predicate, in.SourceGid, err) - span.Annotate(nil, msg) + span.AddEvent(msg) glog.Warningf(msg) } else { msg = fmt.Sprintf("Deleted predicate %v in group %d", in.Predicate, in.SourceGid) - span.Annotate(nil, msg) + span.AddEvent(msg) glog.V(1).Infof(msg) } return nil diff --git a/dgraph/cmd/zero/zero.go b/dgraph/cmd/zero/zero.go index 647bb6f5cdc..ad28edd3b68 100644 --- a/dgraph/cmd/zero/zero.go +++ b/dgraph/cmd/zero/zero.go @@ -8,13 +8,15 @@ package zero import ( "context" "crypto/tls" + "fmt" "math" "sync" "time" "github.com/golang/glog" "github.com/pkg/errors" - otrace "go.opencensus.io/trace" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "google.golang.org/protobuf/proto" "github.com/dgraph-io/dgo/v240/protos/api" @@ -393,7 +395,7 @@ func (s *Server) createProposals(dst *pb.Group) ([]*pb.ZeroProposal, error) { } func (s *Server) Inform(ctx context.Context, req *pb.TabletRequest) (*pb.TabletResponse, error) { - ctx, span := otrace.StartSpan(ctx, "Zero.Inform") + ctx, span := otel.Tracer("").Start(ctx, "Zero.Inform") defer span.End() if req == nil || len(req.Tablets) == 0 { return nil, errors.Errorf("Tablets are empty in %+v", req) @@ -407,7 +409,7 @@ func (s *Server) Inform(ctx context.Context, req *pb.TabletRequest) (*pb.TabletR unknownTablets := make([]*pb.Tablet, 0) for _, t := range req.Tablets { tab := s.ServingTablet(t.Predicate) - span.Annotatef(nil, "Tablet for %s: %+v", t.Predicate, tab) + span.SetAttributes(attribute.String("tablet_predicate", t.Predicate)) switch { case tab != nil && !t.Force: tablets = append(tablets, t) @@ -441,14 +443,14 @@ func (s *Server) Inform(ctx context.Context, req *pb.TabletRequest) (*pb.TabletR } if err := s.Node.proposeAndWait(ctx, &proposal); err != nil && err != errTabletAlreadyServed { - span.Annotatef(nil, "While proposing tablet: %v", err) + span.AddEvent(fmt.Sprintf("Error proposing tablet: %+v. 
Error: %v", &proposal, err)) return nil, err } for _, t := range unknownTablets { tab := s.ServingTablet(t.Predicate) x.AssertTrue(tab != nil) - span.Annotatef(nil, "Now serving tablet for %s: %+v", t.Predicate, tab) + span.AddEvent(fmt.Sprintf("Tablet served: %+v", tab)) tablets = append(tablets, tab) } @@ -656,7 +658,7 @@ func (s *Server) DeleteNamespace(ctx context.Context, in *pb.DeleteNsRequest) (* // ShouldServe returns the tablet serving the predicate passed in the request. func (s *Server) ShouldServe( ctx context.Context, tablet *pb.Tablet) (resp *pb.Tablet, err error) { - ctx, span := otrace.StartSpan(ctx, "Zero.ShouldServe") + ctx, span := otel.Tracer("").Start(ctx, "Zero.ShouldServe") defer span.End() if tablet.Predicate == "" { @@ -668,7 +670,7 @@ func (s *Server) ShouldServe( // Check who is serving this tablet. tab := s.ServingTablet(tablet.Predicate) - span.Annotatef(nil, "Tablet for %s: %+v", tablet.Predicate, tab) + span.SetAttributes(attribute.String("tablet_predicate", tablet.Predicate)) if tab != nil && !tablet.Force { // Someone is serving this tablet. Could be the caller as well. // The caller should compare the returned group against the group it holds to check who's @@ -696,12 +698,12 @@ func (s *Server) ShouldServe( } proposal.Tablet = tablet if err := s.Node.proposeAndWait(ctx, &proposal); err != nil && err != errTabletAlreadyServed { - span.Annotatef(nil, "While proposing tablet: %v", err) + span.AddEvent(fmt.Sprintf("Error proposing tablet: %+v. 
Error: %v", &proposal, err)) return tablet, err } tab = s.ServingTablet(tablet.Predicate) x.AssertTrue(tab != nil) - span.Annotatef(nil, "Now serving tablet for %s: %+v", tablet.Predicate, tab) + span.SetAttributes(attribute.String("tablet_predicate_served", tablet.Predicate)) return tab, nil } diff --git a/edgraph/alter.go b/edgraph/alter.go index 2dfd89ea556..04c72ca9af9 100644 --- a/edgraph/alter.go +++ b/edgraph/alter.go @@ -18,10 +18,12 @@ import ( "github.com/hypermodeinc/dgraph/v24/schema" "github.com/hypermodeinc/dgraph/v24/worker" "github.com/hypermodeinc/dgraph/v24/x" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "github.com/golang/glog" "github.com/pkg/errors" - otrace "go.opencensus.io/trace" "google.golang.org/grpc/status" ) @@ -249,9 +251,9 @@ func executeSetSchema(ctx context.Context, startTs uint64, req *apiv25.AlterRequ // Alter handles requests to change the schema or remove parts or all of the data. func (s *ServerV25) Alter(ctx context.Context, req *apiv25.AlterRequest) (*apiv25.AlterResponse, error) { - ctx, span := otrace.StartSpan(ctx, "ServerV25.Alter") + ctx, span := otel.Tracer("").Start(ctx, "ServerV25.Alter") defer span.End() - span.Annotatef(nil, "Alter operation: %+v", req) + span.AddEvent("Alter operation", trace.WithAttributes(attribute.String("request", req.String()))) // Always print out Alter operations because they are important and rare. 
glog.Infof("Received ALTER op: %+v", req) diff --git a/edgraph/server.go b/edgraph/server.go index 003688e5957..fbb31bfba2a 100644 --- a/edgraph/server.go +++ b/edgraph/server.go @@ -24,7 +24,9 @@ import ( "github.com/pkg/errors" ostats "go.opencensus.io/stats" "go.opencensus.io/tag" - otrace "go.opencensus.io/trace" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" @@ -365,11 +367,11 @@ func InsertDropRecord(ctx context.Context, dropOp string) error { // Alter handles requests to change the schema or remove parts or all of the data. func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, error) { - ctx, span := otrace.StartSpan(ctx, "Server.Alter") + ctx, span := otel.Tracer("").Start(ctx, "Server.Alter") defer span.End() ctx = x.AttachJWTNamespace(ctx) - span.Annotatef(nil, "Alter operation: %+v", op) + span.AddEvent("Alter operation", trace.WithAttributes(attribute.String("op", op.String()))) // Always print out Alter operations because they are important and rare. 
glog.Infof("Received ALTER op: %+v", op) @@ -539,12 +541,12 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er return empty, nil } -func annotateNamespace(span *otrace.Span, ns uint64) { - span.AddAttributes(otrace.Int64Attribute("ns", int64(ns))) +func annotateNamespace(span trace.Span, ns uint64) { + span.SetAttributes(attribute.String("ns", fmt.Sprintf("%d", ns))) } -func annotateStartTs(span *otrace.Span, ts uint64) { - span.AddAttributes(otrace.Int64Attribute("startTs", int64(ts))) +func annotateStartTs(span trace.Span, ts uint64) { + span.SetAttributes(attribute.String("startTs", fmt.Sprintf("%d", ts))) } func (s *Server) doMutate(ctx context.Context, qc *queryContext, resp *api.Response) error { @@ -620,9 +622,15 @@ func (s *Server) doMutate(ctx context.Context, qc *queryContext, resp *api.Respo return err } - qc.span.Annotatef(nil, "Applying mutations: %+v", m) + qc.span.AddEvent("Applying mutations", + trace.WithAttributes(attribute.String("m", fmt.Sprintf("%+v", m)))) resp.Txn, err = query.ApplyMutations(ctx, m) - qc.span.Annotatef(nil, "Txn Context: %+v. Err=%v", resp.Txn, err) + qc.span.AddEvent("Txn Context", + trace.WithAttributes(attribute.String("txn", fmt.Sprintf("%+v", resp.Txn)))) + if err != nil { + qc.span.AddEvent("Error", + trace.WithAttributes(attribute.String("err", err.Error()))) + } // calculateMutationMetrics calculate cost for the mutation. calculateMutationMetrics := func() { @@ -659,12 +667,13 @@ func (s *Server) doMutate(ctx context.Context, qc *queryContext, resp *api.Respo return err } - qc.span.Annotatef(nil, "Prewrites err: %v. 
Attempting to commit/abort immediately.", err) ctxn := resp.Txn // zero would assign the CommitTs cts, err := worker.CommitOverNetwork(ctx, ctxn) - qc.span.Annotatef(nil, "Status of commit at ts: %d: %v", ctxn.StartTs, err) if err != nil { + qc.span.AddEvent("Status of commit at ts", + trace.WithAttributes(attribute.String("err", err.Error()))) + if err == dgo.ErrAborted { err = status.Errorf(codes.Aborted, err.Error()) resp.Txn.Aborted = true @@ -1052,7 +1061,7 @@ type queryContext struct { // l stores latency numbers latency *query.Latency // span stores a opencensus span used throughout the query processing - span *otrace.Span + span trace.Span // graphql indicates whether the given request is from graphql admin or not. graphql bool // gqlField stores the GraphQL field for which the query is being processed. @@ -1282,7 +1291,7 @@ func (s *Server) doQuery(ctx context.Context, req *Request) (resp *api.Response, } var measurements []ostats.Measurement - ctx, span := otrace.StartSpan(ctx, methodRequest) + ctx, span := otel.Tracer("").Start(ctx, methodRequest) if ns, err := x.ExtractNamespace(ctx); err == nil { annotateNamespace(span, ns) } @@ -1307,12 +1316,12 @@ func (s *Server) doQuery(ctx context.Context, req *Request) (resp *api.Response, req.req.Query = strings.TrimSpace(req.req.Query) isQuery := len(req.req.Query) != 0 if !isQuery && !isMutation { - span.Annotate(nil, "empty request") + span.AddEvent("empty request") return nil, errors.Errorf("empty request") } - span.AddAttributes(otrace.StringAttribute("Query", req.req.Query)) - span.Annotatef(nil, "Request received: %v", req.req) + span.AddEvent("Request received", + trace.WithAttributes(attribute.String("Query", req.req.Query))) if isQuery { ostats.Record(ctx, x.PendingQueries.M(1), x.NumQueries.M(1)) defer func() { @@ -1429,11 +1438,11 @@ func processQuery(ctx context.Context, qc *queryContext) (*api.Response, error) // If we haven't processed any updates yet then fall back to getting TS from Zero. 
switch { case qc.req.BestEffort: - qc.span.Annotate([]otrace.Attribute{otrace.BoolAttribute("be", true)}, "") + qc.span.AddEvent("", trace.WithAttributes(attribute.Bool("be", true))) case qc.req.ReadOnly: - qc.span.Annotate([]otrace.Attribute{otrace.BoolAttribute("ro", true)}, "") + qc.span.AddEvent("", trace.WithAttributes(attribute.Bool("ro", true))) default: - qc.span.Annotate([]otrace.Attribute{otrace.BoolAttribute("no", true)}, "") + qc.span.AddEvent("", trace.WithAttributes(attribute.Bool("no", true))) } if qc.req.BestEffort { @@ -1500,7 +1509,8 @@ func processQuery(ctx context.Context, qc *queryContext) (*api.Response, error) if err != nil && (qc.gqlField == nil || !x.IsGqlErrorList(err)) { return resp, err } - qc.span.Annotatef(nil, "Response = %s", resp.Json) + qc.span.AddEvent("Response", + trace.WithAttributes(attribute.String("response", string(resp.Json)))) // varToUID contains a map of variable name to the uids corresponding to it. // It is used later for constructing set and delete mutations by replacing @@ -1833,7 +1843,7 @@ func validateNamespace(ctx context.Context, tc *api.TxnContext) error { // CommitOrAbort commits or aborts a transaction. 
func (s *Server) CommitOrAbort(ctx context.Context, tc *api.TxnContext) (*api.TxnContext, error) { - ctx, span := otrace.StartSpan(ctx, "Server.CommitOrAbort") + ctx, span := otel.Tracer("").Start(ctx, "Server.CommitOrAbort") defer span.End() if err := x.HealthCheck(); err != nil { @@ -1854,7 +1864,7 @@ func (s *Server) CommitOrAbort(ctx context.Context, tc *api.TxnContext) (*api.Tx return &api.TxnContext{}, err } - span.Annotatef(nil, "Txn Context received: %+v", tc) + span.AddEvent("Txn Context received", trace.WithAttributes(attribute.Stringer("txn", tc))) commitTs, err := worker.CommitOverNetwork(ctx, tc) if err == dgo.ErrAborted { // If err returned is dgo.ErrAborted and tc.Aborted was set, that means the client has diff --git a/go.mod b/go.mod index 56e7554c1c1..7c9ad5aad01 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,7 @@ module github.com/hypermodeinc/dgraph/v24 go 1.23.6 require ( - contrib.go.opencensus.io/exporter/jaeger v0.2.1 contrib.go.opencensus.io/exporter/prometheus v0.4.2 - github.com/DataDog/opencensus-go-exporter-datadog v0.0.0-20220622145613-731d59e8b567 github.com/HdrHistogram/hdrhistogram-go v1.1.2 github.com/IBM/sarama v1.45.1 github.com/Masterminds/semver/v3 v3.3.1 @@ -51,6 +49,13 @@ require ( github.com/xdg/scram v1.0.5 go.etcd.io/etcd/raft/v3 v3.5.21 go.opencensus.io v0.24.0 + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 + go.opentelemetry.io/contrib/zpages v0.59.0 + go.opentelemetry.io/otel v1.34.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.34.0 + go.opentelemetry.io/otel/sdk v1.34.0 + go.opentelemetry.io/otel/trace v1.34.0 go.uber.org/zap v1.27.0 golang.org/x/crypto v0.36.0 golang.org/x/exp v0.0.0-20250305212735-054e65f0b394 @@ -68,7 +73,6 @@ require ( require ( filippo.io/edwards25519 v1.1.0 // indirect - github.com/DataDog/datadog-go v4.8.3+incompatible // indirect github.com/Microsoft/go-winio v0.6.2 // indirect 
github.com/agnivade/levenshtein v1.2.1 // indirect github.com/beorn7/perks v1.0.1 // indirect @@ -100,6 +104,7 @@ require ( github.com/golang/protobuf v1.5.4 // indirect github.com/google/flatbuffers v25.2.10+incompatible // indirect github.com/google/pprof v0.0.0-20250128161936-077ca0a936bf // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect @@ -131,7 +136,6 @@ require ( github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0 // indirect github.com/pelletier/go-toml/v2 v2.2.3 // indirect - github.com/philhofer/fwd v1.1.3-0.20240916144458-20a13a1f6b7c // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_model v0.6.1 // indirect @@ -144,21 +148,16 @@ require ( github.com/sourcegraph/conc v0.3.0 // indirect github.com/spf13/afero v1.12.0 // indirect github.com/subosito/gotenv v1.6.0 // indirect - github.com/tinylib/msgp v1.2.5 // indirect - github.com/uber/jaeger-client-go v2.28.0+incompatible // indirect github.com/viterin/partial v1.1.0 // indirect github.com/xdg/stringprep v1.0.3 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0 // indirect - go.opentelemetry.io/otel v1.34.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.34.0 // indirect go.opentelemetry.io/otel/metric v1.34.0 // indirect - go.opentelemetry.io/otel/trace v1.34.0 // indirect + go.opentelemetry.io/proto/otlp v1.5.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/time v0.9.0 // indirect - google.golang.org/api v0.219.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f // indirect 
google.golang.org/genproto/googleapis/rpc v0.0.0-20250127172529-29210b9bc287 // indirect - gopkg.in/DataDog/dd-trace-go.v1 v1.71.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gotest.tools/v3 v3.5.1 // indirect diff --git a/go.sum b/go.sum index dd620eb2636..df3c717a2a2 100644 --- a/go.sum +++ b/go.sum @@ -30,8 +30,6 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk= cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= -contrib.go.opencensus.io/exporter/jaeger v0.2.1 h1:yGBYzYMewVL0yO9qqJv3Z5+IRhPdU7e9o/2oKpX4YvI= -contrib.go.opencensus.io/exporter/jaeger v0.2.1/go.mod h1:Y8IsLgdxqh1QxYxPC5IgXVmBaeLUeQFfBeBi9PbeZd0= contrib.go.opencensus.io/exporter/prometheus v0.4.2 h1:sqfsYl5GIY/L570iT+l93ehxaWJs2/OwXtiWwew3oAg= contrib.go.opencensus.io/exporter/prometheus v0.4.2/go.mod h1:dvEHbiKmgvbr5pjaF9fpw1KeYcjrnC1J8B+JKjsZyRQ= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= @@ -44,11 +42,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= -github.com/DataDog/datadog-go v3.5.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= -github.com/DataDog/datadog-go v4.8.3+incompatible h1:fNGaYSuObuQb5nzeTQqowRAd9bpDIRRV4/gUtIBjh8Q= -github.com/DataDog/datadog-go v4.8.3+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= -github.com/DataDog/opencensus-go-exporter-datadog 
v0.0.0-20220622145613-731d59e8b567 h1:Z7zdcyzme2egv0lC43X1Q/+DxHjZflQCnJXX0mDp7+I= -github.com/DataDog/opencensus-go-exporter-datadog v0.0.0-20220622145613-731d59e8b567/go.mod h1:/VV3EFO/hTNQZHAqaj+CPGy2+ioFrP4EX3iRwozubhQ= github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob0t8PQPMybUNFM= github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo= github.com/IBM/sarama v1.45.1 h1:nY30XqYpqyXOXSNoe2XCgjj9jklGM1Ye94ierUb1jQ0= @@ -455,9 +448,6 @@ github.com/paulmach/go.geojson v1.5.0 h1:7mhpMK89SQdHFcEGomT7/LuJhwhEgfmpWYVlVmL github.com/paulmach/go.geojson v1.5.0/go.mod h1:DgdUy2rRVDDVgKqrjMe2vZAHMfhDTrjVKt3LmHIXGbU= github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M= github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc= -github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= -github.com/philhofer/fwd v1.1.3-0.20240916144458-20a13a1f6b7c h1:dAMKvw0MlJT1GshSTtih8C2gDs04w8dReiOGXrGLNoY= -github.com/philhofer/fwd v1.1.3-0.20240916144458-20a13a1f6b7c/go.mod h1:RqIHx9QI14HlwKwm98g9Re5prTQ6LdeRQn+gXJFxsJM= github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -546,8 +536,6 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= -github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= -github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= 
github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= @@ -563,14 +551,8 @@ github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf github.com/stvp/go-udp-testing v0.0.0-20201019212854-469649b16807/go.mod h1:7jxmlfBCDBXRzr0eAQJ48XC1hBu1np4CS5+cHEYfwpc= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= -github.com/tinylib/msgp v1.1.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= -github.com/tinylib/msgp v1.2.5 h1:WeQg1whrXRFiZusidTQqzETkRpGjFjcIhW6uqWH09po= -github.com/tinylib/msgp v1.2.5/go.mod h1:ykjzy2wzgrlvpDCRc4LA8UXy6D8bzMSuAF3WD57Gok0= github.com/twpayne/go-geom v1.6.0 h1:WPOJLCdd8OdcnHvKQepLKwOZrn5BzVlNxtQB59IDHRE= github.com/twpayne/go-geom v1.6.0/go.mod h1:Kr+Nly6BswFsKM5sd31YaoWS5PeDDH2NftJTK7Gd028= -github.com/uber/jaeger-client-go v2.25.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= -github.com/uber/jaeger-client-go v2.28.0+incompatible h1:G4QSBfvPKvg5ZM2j9MrJFdfI5iSljY/WnJqOGFao6HI= -github.com/uber/jaeger-client-go v2.28.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= github.com/urfave/cli/v2 v2.1.1/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ= github.com/vektah/dataloaden v0.2.1-0.20190515034641-a19b9a6e7c9e/go.mod h1:/HUdMve7rvxZma+2ZELQeNh88+003LL7Pf/CZ089j8U= github.com/vektah/gqlparser/v2 v2.1.0/go.mod h1:SyUiHgLATUR8BiYURfTirrTcGpcE+4XkV2se04Px1Ms= @@ -601,8 +583,12 @@ go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= go.opentelemetry.io/auto/sdk 
v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 h1:r6I7RJCN86bpD/FQwedZ0vSixDpwuWREjW9oRMsmqDc= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0/go.mod h1:B9yO6b04uB80CzjedvewuqDhxJxi11s7/GtiGa8bAjI= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0 h1:CV7UdSGJt/Ao6Gp4CXckLxVRRsRgDHoI8XjbL3PDl8s= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0/go.mod h1:FRmFuRJfag1IZ2dPkHnEoSFVgTVPUd2qf5Vi69hLb8I= +go.opentelemetry.io/contrib/zpages v0.59.0 h1:t0H5zUy8fifIhRuVwm2FrA/D70Kk10SSpAEvvbaNscw= +go.opentelemetry.io/contrib/zpages v0.59.0/go.mod h1:9wo+yUPvHnBQEzoHJ8R3nA/Q5rkef7HjtLlSFI0Tgrc= go.opentelemetry.io/otel v1.6.3/go.mod h1:7BgNga5fNlF/iZjG06hM3yofffp0ofKCDwSXx1GC4dI= go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= @@ -887,8 +873,6 @@ google.golang.org/api v0.24.0/go.mod h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0M google.golang.org/api v0.28.0/go.mod h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0MncE= google.golang.org/api v0.29.0/go.mod h1:Lcubydp8VUV7KeIHD9z2Bys/sm/vGKnG1UHuDBSrHWM= google.golang.org/api v0.30.0/go.mod h1:QGmEvQ87FHZNiUVJkT14jQNYJ4ZJjdRF23ZXz5138Fc= -google.golang.org/api v0.219.0 h1:nnKIvxKs/06jWawp2liznTBnMRQBEPpGo7I+oEypTX0= -google.golang.org/api v0.219.0/go.mod h1:K6OmjGm+NtLrIkHxv1U3a0qIf/0JOvAHd5O/6AoyKYE= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= @@ -924,7 +908,6 @@ google.golang.org/genproto v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7Fc google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod 
h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= -google.golang.org/genproto v0.0.0-20241118233622-e639e219e697 h1:ToEetK57OidYuqD4Q5w+vfEnPvPpuTwedCNVohYJfNk= google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f h1:gap6+3Gk41EItBuyi4XX/bp4oqJ3UwuIMl25yGinuAA= google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:Ic02D47M+zbarjYYUlK57y316f2MoN0gjAwI3f2S95o= google.golang.org/genproto/googleapis/rpc v0.0.0-20250127172529-29210b9bc287 h1:J1H9f+LEdWAfHcez/4cvaVBox7cOYT+IU6rgqj5x++8= @@ -960,9 +943,6 @@ google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= -gopkg.in/DataDog/dd-trace-go.v1 v1.22.0/go.mod h1:DVp8HmDh8PuTu2Z0fVVlBsyWaC++fzwVCaGWylTe3tg= -gopkg.in/DataDog/dd-trace-go.v1 v1.71.0 h1:+Lr4YwJQGZuIOoIFNjMY5l7bGZblbKrwMtmbIiWFmjI= -gopkg.in/DataDog/dd-trace-go.v1 v1.71.0/go.mod h1:0M7D+g0aTIlQgxqTSWrmTjssl+POsL5TVDaX2QFKk4U= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/graphql/resolve/mutation.go b/graphql/resolve/mutation.go index 72e78bc5b3b..04e43fd22ec 100644 --- a/graphql/resolve/mutation.go +++ b/graphql/resolve/mutation.go @@ -15,7 +15,8 @@ import ( "github.com/golang/glog" "github.com/pkg/errors" - otrace 
"go.opencensus.io/trace" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" dgoapi "github.com/dgraph-io/dgo/v240/protos/api" "github.com/hypermodeinc/dgraph/v24/dql" @@ -171,11 +172,14 @@ type dgraphResolver struct { } func (mr *dgraphResolver) Resolve(ctx context.Context, m schema.Mutation) (*Resolved, bool) { - span := otrace.FromContext(ctx) + span := trace.SpanFromContext(ctx) stop := x.SpanTimer(span, "resolveMutation") defer stop() if span != nil { - span.Annotatef(nil, "mutation alias: [%s] type: [%s]", m.Alias(), m.MutationType()) + span.AddEvent("Mutation Started", trace.WithAttributes( + attribute.String("Mutation alias", m.Alias()), + attribute.String("Type", m.MutatedType().DgraphName()), + )) } resolverTrace := &schema.ResolverTrace{ diff --git a/graphql/resolve/query.go b/graphql/resolve/query.go index d6551223de3..b02b6e8bdd7 100644 --- a/graphql/resolve/query.go +++ b/graphql/resolve/query.go @@ -12,7 +12,7 @@ import ( "strconv" "github.com/golang/glog" - otrace "go.opencensus.io/trace" + "go.opentelemetry.io/otel/trace" dgoapi "github.com/dgraph-io/dgo/v240/protos/api" "github.com/hypermodeinc/dgraph/v24/dql" @@ -64,7 +64,7 @@ type queryResolver struct { } func (qr *queryResolver) Resolve(ctx context.Context, query schema.Query) *Resolved { - span := otrace.FromContext(ctx) + span := trace.SpanFromContext(ctx) stop := x.SpanTimer(span, "resolveQuery") defer stop() @@ -150,7 +150,7 @@ type customDQLQueryResolver struct { } func (qr *customDQLQueryResolver) Resolve(ctx context.Context, query schema.Query) *Resolved { - span := otrace.FromContext(ctx) + span := trace.SpanFromContext(ctx) stop := x.SpanTimer(span, "resolveCustomDQLQuery") defer stop() diff --git a/graphql/resolve/resolver.go b/graphql/resolve/resolver.go index ffcaedecd11..1cdbaa1ba96 100644 --- a/graphql/resolve/resolver.go +++ b/graphql/resolve/resolver.go @@ -16,7 +16,7 @@ import ( "github.com/golang/glog" "github.com/pkg/errors" - otrace 
"go.opencensus.io/trace" + "go.opentelemetry.io/otel/trace" dgoapi "github.com/dgraph-io/dgo/v240/protos/api" "github.com/hypermodeinc/dgraph/v24/edgraph" @@ -447,7 +447,7 @@ func New(s schema.Schema, resolverFactory ResolverFactory) *RequestResolver { // and a schema and backend Dgraph should have been added. // Resolve records any errors in the response's error field. func (r *RequestResolver) Resolve(ctx context.Context, gqlReq *schema.Request) (resp *schema.Response) { - span := otrace.FromContext(ctx) + span := trace.SpanFromContext(ctx) stop := x.SpanTimer(span, methodResolve) defer stop() @@ -679,7 +679,7 @@ func NewHTTPMutationResolver(hc *http.Client) MutationResolver { } func (hr *httpResolver) Resolve(ctx context.Context, field schema.Field) *Resolved { - span := otrace.FromContext(ctx) + span := trace.SpanFromContext(ctx) stop := x.SpanTimer(span, "resolveHTTP") defer stop() diff --git a/posting/index.go b/posting/index.go index bc7347b10ef..6b082226139 100644 --- a/posting/index.go +++ b/posting/index.go @@ -22,7 +22,7 @@ import ( "github.com/golang/glog" "github.com/pkg/errors" ostats "go.opencensus.io/stats" - otrace "go.opencensus.io/trace" + "go.opentelemetry.io/otel/trace" "google.golang.org/protobuf/proto" "github.com/dgraph-io/badger/v4" @@ -502,9 +502,8 @@ func (txn *Txn) addMutationHelper(ctx context.Context, l *List, doUpdateIndex bo defer l.Unlock() if dur := time.Since(t1); dur > time.Millisecond { - span := otrace.FromContext(ctx) - span.Annotatef([]otrace.Attribute{otrace.BoolAttribute("slow-lock", true)}, - "Acquired lock %v %v %v", dur, t.Attr, t.Entity) + span := trace.SpanFromContext(ctx) + span.AddEvent(fmt.Sprintf("Acquired lock %v %v %v", dur, t.Attr, t.Entity)) } getUID := func(t *pb.DirectedEdge) uint64 { diff --git a/query/mutation.go b/query/mutation.go index b9992030836..3e9a168568b 100644 --- a/query/mutation.go +++ b/query/mutation.go @@ -12,7 +12,8 @@ import ( "github.com/golang/glog" "github.com/pkg/errors" - otrace 
"go.opencensus.io/trace" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "github.com/dgraph-io/dgo/v240/protos/api" "github.com/hypermodeinc/dgraph/v24/dql" @@ -39,9 +40,10 @@ func ApplyMutations(ctx context.Context, m *pb.Mutations) (*api.TxnContext, erro } tctx, err := worker.MutateOverNetwork(ctx, m) if err != nil { - if span := otrace.FromContext(ctx); span != nil { - span.Annotatef(nil, "MutateOverNetwork Error: %v. Mutation: %v.", err, m) - } + span := trace.SpanFromContext(ctx) + span.AddEvent("MutateOverNetwork Error", trace.WithAttributes( + attribute.String("error", err.Error()), + attribute.String("mutation", m.String()))) } return tctx, err } diff --git a/query/query.go b/query/query.go index 8e7927bad86..5689a3e49b2 100644 --- a/query/query.go +++ b/query/query.go @@ -16,7 +16,8 @@ import ( "github.com/golang/glog" "github.com/pkg/errors" - otrace "go.opencensus.io/trace" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "google.golang.org/grpc/metadata" "github.com/hypermodeinc/dgraph/v24/algo" @@ -1934,7 +1935,7 @@ func recursiveCopy(dst *SubGraph, src *SubGraph) { } func expandSubgraph(ctx context.Context, sg *SubGraph) ([]*SubGraph, error) { - span := otrace.FromContext(ctx) + span := trace.SpanFromContext(ctx) stop := x.SpanTimer(span, "expandSubgraph: "+sg.Attr) defer stop() @@ -1960,7 +1961,7 @@ func expandSubgraph(ctx context.Context, sg *SubGraph) ([]*SubGraph, error) { switch child.Params.Expand { // It could be expand(_all_) or expand(val(x)). case "_all_": - span.Annotate(nil, "expand(_all_)") + span.AddEvent("expand(_all_)") if len(typeNames) == 0 { break } @@ -1984,7 +1985,7 @@ func expandSubgraph(ctx context.Context, sg *SubGraph) ([]*SubGraph, error) { default: if len(child.ExpandPreds) > 0 { - span.Annotate(nil, "expand default") + span.AddEvent("expand default") // We already have the predicates populated from the var. 
temp := getPredsFromVals(child.ExpandPreds) for _, pred := range temp { @@ -2059,7 +2060,7 @@ func ProcessGraph(ctx context.Context, sg, parent *SubGraph, rch chan error) { if len(sg.Attr) > 0 { suffix += "." + sg.Attr } - span := otrace.FromContext(ctx) + span := trace.SpanFromContext(ctx) stop := x.SpanTimer(span, "query.ProcessGraph"+suffix) defer stop() @@ -2370,9 +2371,8 @@ func ProcessGraph(ctx context.Context, sg, parent *SubGraph, rch chan error) { if (sg.DestUIDs == nil || len(sg.DestUIDs.Uids) == 0) && childErr == nil { // Looks like we're done here. Be careful with nil srcUIDs! - if span != nil { - span.Annotatef(nil, "Zero uids for %q", sg.Attr) - } + span.AddEvent("Zero uids", trace.WithAttributes( + attribute.String("attr", sg.Attr))) out := sg.Children[:0] for _, child := range sg.Children { if child.IsInternal() && child.Attr == "expand" { @@ -2759,7 +2759,7 @@ type Request struct { // Fills Subgraphs and Vars. // It can process multiple query blocks that are part of the query.. 
func (req *Request) ProcessQuery(ctx context.Context) (err error) { - span := otrace.FromContext(ctx) + span := trace.SpanFromContext(ctx) stop := x.SpanTimer(span, "query.ProcessQuery") defer stop() @@ -2784,7 +2784,7 @@ func (req *Request) ProcessQuery(ctx context.Context) (err error) { sg.ReadTs = req.ReadTs sg.Cache = req.Cache }) - span.Annotate(nil, "Query parsed") + span.AddEvent("Query parsed") req.Subgraphs = append(req.Subgraphs, sg) } req.Latency.Parsing += time.Since(loopStart) diff --git a/worker/draft.go b/worker/draft.go index 0e6e767e676..ab8f45c0fa5 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -25,8 +25,9 @@ import ( "go.etcd.io/etcd/raft/v3/raftpb" ostats "go.opencensus.io/stats" "go.opencensus.io/tag" - otrace "go.opencensus.io/trace" - "golang.org/x/net/trace" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "google.golang.org/protobuf/proto" "github.com/dgraph-io/badger/v4" @@ -67,7 +68,6 @@ type node struct { opsLock sync.Mutex cdcTracker *CDC canCampaign bool - elog trace.EventLog } type op int @@ -274,7 +274,6 @@ func newNode(store *raftwal.DiskStorage, gid uint32, id uint64, myAddr string) * // 10ms. If we restrict the size here, then Raft goes into a loop trying // to maintain quorum health. applyCh: make(chan []raftpb.Entry, 1000), - elog: trace.NewEventLog("Dgraph", "ApplyCh"), closer: z.NewCloser(4), // Matches CLOSER:1 ops: make(map[op]operation), cdcTracker: newCDC(), @@ -333,7 +332,7 @@ func detectPendingTxns(attr string) error { // Wait for all transactions to either abort or complete and all write transactions // involving the predicate are aborted until schema mutations are done. 
func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr error) { - span := otrace.FromContext(ctx) + span := trace.SpanFromContext(ctx) if proposal.Mutations.DropOp == pb.Mutations_ALL_IN_NS { ns, err := strconv.ParseUint(proposal.Mutations.DropValue, 0, 64) @@ -435,7 +434,7 @@ // by detectPendingTxns below. startTs := posting.Oracle().MaxAssigned() - span.Annotatef(nil, "Applying schema and types") + span.AddEvent("Applying schema and types") for _, supdate := range proposal.Mutations.Schema { // We should not need to check for predicate move here. if err := detectPendingTxns(supdate.Predicate); err != nil { @@ -475,10 +474,10 @@ // We should only drop the predicate if there is no pending // transaction. if err := detectPendingTxns(edge.Attr); err != nil { - span.Annotatef(nil, "Found pending transactions. Retry later.") + span.AddEvent("Found pending transactions. Retry later.") return err } - span.Annotatef(nil, "Deleting predicate: %s", edge.Attr) + span.AddEvent("Deleting predicate", trace.WithAttributes(attribute.String("predicate", edge.Attr))) return posting.DeletePredicate(ctx, edge.Attr, proposal.StartTs) } // Don't derive schema when doing deletion. @@ -531,7 +530,9 @@ txn := posting.Oracle().RegisterStartTs(m.StartTs) if txn.ShouldAbort() { - span.Annotatef(nil, "Txn %d should abort.", m.StartTs) + span.AddEvent("Txn should abort.", trace.WithAttributes( + attribute.Int64("start_ts", int64(m.StartTs)), + )) return x.ErrConflict } // Discard the posting lists from cache to release memory at the end. 
@@ -552,12 +553,16 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr
 			}
 		}
 		if retries > 0 {
-			span.Annotatef(nil, "retries=true num=%d", retries)
+			span.AddEvent("Retried mutations", trace.WithAttributes(
+				attribute.Int("retries", retries)))
 		}
 		return nil
 	}
 	numGo, width := x.DivideAndRule(len(m.Edges))
-	span.Annotatef(nil, "To apply: %d edges. NumGo: %d. Width: %d", len(m.Edges), numGo, width)
+	span.AddEvent("Applying edges", trace.WithAttributes(
+		attribute.Int("num_edges", len(m.Edges)),
+		attribute.Int("num_go", numGo),
+		attribute.Int("width", width)))
 
 	if numGo == 1 {
 		return process(m.Edges)
@@ -619,19 +624,22 @@ func (n *node) applyCommitted(proposal *pb.Proposal, key uint64) error {
 	}()
 
 	ctx := n.Ctx(key)
-	span := otrace.FromContext(ctx)
-	span.Annotatef(nil, "node.applyCommitted Node id: %d. Group id: %d. Got proposal key: %d",
-		n.Id, n.gid, key)
+	span := trace.SpanFromContext(ctx)
+	span.AddEvent("node.applyCommitted", trace.WithAttributes(
+		attribute.Int64("node_id", int64(n.Id)),
+		attribute.Int64("group_id", int64(n.gid)),
+		attribute.Int64("proposal_key", int64(key))))
 
 	if proposal.Mutations != nil {
 		// syncmarks for this shouldn't be marked done until it's committed.
-		span.Annotate(nil, "Applying mutations")
+		span.AddEvent("Applying mutations")
 		if err := n.applyMutations(ctx, proposal); err != nil {
-			span.Annotatef(nil, "While applying mutations: %v", err)
+			span.AddEvent("While applying mutations", trace.WithAttributes(
+				attribute.String("error", err.Error())))
 			return err
 		}
-		span.Annotate(nil, "Done")
+		span.AddEvent("Done")
 		return nil
 	}
 
@@ -640,14 +648,16 @@ func (n *node) applyCommitted(proposal *pb.Proposal, key uint64) error {
 		return populateKeyValues(ctx, proposal.Kv)
 
 	case proposal.State != nil:
-		n.elog.Printf("Applying state for key: %s", key)
+		span.AddEvent("Applying state for key", trace.WithAttributes(
+			attribute.Int64("key", int64(key))))
 		// This state needn't be snapshotted in this group, on restart we would fetch
 		// a state which is latest or equal to this.
 		groups().applyState(groups().Node.Id, proposal.State)
 		return nil
 
 	case len(proposal.CleanPredicate) > 0:
-		n.elog.Printf("Cleaning predicate: %s", proposal.CleanPredicate)
+		span.AddEvent("Cleaning predicate", trace.WithAttributes(
+			attribute.String("predicate", proposal.CleanPredicate)))
 		end := time.Now().Add(10 * time.Second)
 		for proposal.ExpectedChecksum > 0 && time.Now().Before(end) {
 			cur := atomic.LoadUint64(&groups().membershipChecksum)
@@ -675,7 +685,8 @@ func (n *node) applyCommitted(proposal *pb.Proposal, key uint64) error {
 		return err
 
 	case proposal.Delta != nil:
-		n.elog.Printf("Applying Oracle Delta for key: %d", key)
+		span.AddEvent("Applying Oracle Delta for key", trace.WithAttributes(
+			attribute.Int64("key", int64(key))))
 		return n.commitOrAbort(key, proposal.Delta)
 
 	case proposal.Snapshot != nil:
@@ -685,13 +696,18 @@ func (n *node) applyCommitted(proposal *pb.Proposal, key uint64) error {
 		}
 		snap := proposal.Snapshot
 		if existing.Metadata.Index >= snap.Index {
+			span.AddEvent("Skipping snapshot, because found a newer one",
+				trace.WithAttributes(
+					attribute.Int64("snapshot index", int64(snap.Index)),
+					attribute.Int64("existing index", int64(existing.Metadata.Index))))
 			log := fmt.Sprintf("Skipping snapshot at %d, because found one at %d",
 				snap.Index, existing.Metadata.Index)
-			n.elog.Printf(log)
 			glog.Info(log)
 			return nil
 		}
-		n.elog.Printf("Creating snapshot: %+v", snap)
+		span.AddEvent("Creating snapshot", trace.WithAttributes(
+			attribute.Int64("snapshot index", int64(snap.Index)),
+			attribute.Int64("read ts", int64(snap.ReadTs))))
 		glog.Infof("Creating snapshot at Index: %d, ReadTs: %d\n", snap.Index, snap.ReadTs)
 
 		data, err := proto.Marshal(snap)
@@ -738,7 +754,8 @@ func (n *node) applyCommitted(proposal *pb.Proposal, key uint64) error {
 
 	case proposal.DeleteNs != nil:
 		x.AssertTrue(proposal.DeleteNs.Namespace != x.GalaxyNamespace)
-		n.elog.Printf("Deleting namespace: %d", proposal.DeleteNs.Namespace)
+		span.AddEvent("Deleting namespace", trace.WithAttributes(
+			attribute.Int64("namespace", int64(proposal.DeleteNs.Namespace))))
 		return posting.DeleteNamespace(proposal.DeleteNs.Namespace)
 
 	case proposal.CdcState != nil:
@@ -818,7 +835,6 @@ func (n *node) processApplyCh() {
 				msg := fmt.Sprintf("Proposal with key: %d already applied. Skipping index: %d."+
 					" Delta: %+v Snapshot: %+v.\n", key, proposal.Index, proposal.Delta, proposal.Snapshot)
-				n.elog.Printf(msg)
 				glog.Infof(msg)
 				previous[key].seen = time.Now() // Update the ts.
 				// Don't break here. We still need to call the Done below.
@@ -831,11 +847,16 @@ func (n *node) processApplyCh() {
 				p := &P{err: perr, size: psz, seen: time.Now()}
 				previous[key] = p
 			}
+			span := trace.SpanFromContext(n.ctx)
 			if perr != nil {
-				glog.Errorf("Applying proposal. Error: %v. Proposal: %q.", perr, proposal)
+				glog.Errorf("Applying proposal. Error: %v. Proposal: %q.", perr, &proposal)
+				span.AddEvent(fmt.Sprintf("Applying proposal failed. Error: %v Proposal: %q", perr, &proposal))
 			}
-			n.elog.Printf("Applied proposal with key: %d, index: %d. 
Err: %v",
-				key, proposal.Index, perr)
+			span.AddEvent("Applied proposal",
+				trace.WithAttributes(
+					attribute.Int64("key", int64(key)),
+					attribute.Int64("index", int64(proposal.Index)),
+					attribute.String("err", fmt.Sprintf("%v", perr))))
 
 			var tags []tag.Mutator
 			switch {
@@ -876,7 +897,7 @@ func (n *node) processApplyCh() {
 					delete(previous, key)
 				}
 			}
-			n.elog.Printf("Size of previous map: %d", len(previous))
+			glog.V(3).Infof("Size of previous map: %d", len(previous))
 		}
 	}
 }
@@ -1127,7 +1148,7 @@ func (n *node) checkpointAndClose(done chan struct{}) {
 			// Do these operations asynchronously away from the main Run loop to allow heartbeats to
 			// be sent on time. Otherwise, followers would just keep running elections.
-			n.elog.Printf("Size of applyCh: %d", len(n.applyCh))
+			glog.V(3).Infof("Size of applyCh: %d", len(n.applyCh))
 			if err := n.updateRaftProgress(); err != nil {
 				glog.Errorf("While updating Raft progress: %v", err)
 			}
@@ -1280,8 +1301,7 @@ func (n *node) Run() {
 			// n.SaveToStorage should be called first before doing anything else.
 			timer.Start()
 
-			_, span := otrace.StartSpan(n.ctx, "Alpha.RunLoop",
-				otrace.WithSampler(otrace.ProbabilitySampler(x.WorkerConfig.Trace.GetFloat64("ratio"))))
+			_, span := otel.Tracer("").Start(n.ctx, "Alpha.RunLoop")
 
 			if rd.SoftState != nil {
 				groups().triggerMembershipSync()
@@ -1321,7 +1341,7 @@ func (n *node) Run() {
 				timer.Record("leader sending message")
 			}
 			if span != nil {
-				span.Annotate(nil, "Handled ReadStates and SoftState.")
+				span.AddEvent("Handled ReadStates and SoftState.")
 			}
 
 			// We move the retrieval of snapshot before we store the rd.Snapshot, so that in case
@@ -1389,7 +1409,7 @@ func (n *node) Run() {
 					snap, n.gid, rc.Id)
 			}
 			if span != nil {
-				span.Annotate(nil, "Applied or retrieved snapshot.")
+				span.AddEvent("Applied or retrieved snapshot.")
 			}
 			timer.Record("got snapshot")
 		}
@@ -1397,12 +1417,10 @@ func (n *node) Run() {
 		// Store the hardstate and entries. Note that these are not CommittedEntries.
n.SaveToStorage(&rd.HardState, rd.Entries, &rd.Snapshot) timer.Record("disk") - if span != nil { - span.Annotatef(nil, "Saved %d entries. Snapshot, HardState empty? (%v, %v)", - len(rd.Entries), - raft.IsEmptySnap(rd.Snapshot), - raft.IsEmptyHardState(rd.HardState)) - } + span.AddEvent(fmt.Sprintf("Saved %d entries. Snapshot, HardState empty? (%v, %v)", + len(rd.Entries), + raft.IsEmptySnap(rd.Snapshot), + raft.IsEmptyHardState(rd.HardState))) for x.WorkerConfig.HardSync && rd.MustSync { if err := n.Store.Sync(); err != nil { @@ -1432,17 +1450,17 @@ func (n *node) Run() { n.Applied.Done(entry.Index) groups().triggerMembershipSync() case len(entry.Data) == 0: - n.elog.Printf("Found empty data at index: %d", entry.Index) + glog.V(3).Infof("Found empty data at index: %d", entry.Index) n.Applied.Done(entry.Index) case entry.Index < applied: - n.elog.Printf("Skipping over already applied entry: %d", entry.Index) + glog.V(3).Infof("Skipping over already applied entry: %d", entry.Index) n.Applied.Done(entry.Index) default: key := binary.BigEndian.Uint64(entry.Data[:8]) if pctx := n.Proposals.Get(key); pctx != nil { atomic.AddUint32(&pctx.Found, 1) - if span := otrace.FromContext(pctx.Ctx); span != nil { - span.Annotate(nil, "Proposal found in CommittedEntries") + if span := trace.SpanFromContext(pctx.Ctx); span != nil { + span.AddEvent("Proposal found in CommittedEntries") } } entries = append(entries, entry) @@ -1465,7 +1483,8 @@ func (n *node) Run() { } if span != nil { - span.Annotatef(nil, "Handled %d committed entries.", len(rd.CommittedEntries)) + span.AddEvent("Handled %d committed entries.", + trace.WithAttributes(attribute.Int("count", len(rd.CommittedEntries)))) } if !leader { @@ -1482,7 +1501,7 @@ func (n *node) Run() { } } if span != nil { - span.Annotate(nil, "Followed queued messages.") + span.AddEvent("Followed queued messages.") } timer.Record("proposals") @@ -1498,7 +1517,7 @@ func (n *node) Run() { firstRun = false } if span != nil { - span.Annotate(nil, 
"Advanced Raft. Done.") + span.AddEvent("Advanced Raft. Done.") span.End() if err := ostats.RecordWithTags(context.Background(), []tag.Mutator{tag.Upsert(x.KeyMethod, "alpha.RunLoop")}, @@ -1677,8 +1696,7 @@ func (n *node) abortOldTransactions() { // This is useful when we already have a previous snapshot checkpoint (all txns have concluded up // until that last checkpoint) that we can use as a new start point for the snapshot calculation. func (n *node) calculateSnapshot(startIdx, lastIdx, minPendingStart uint64) (*pb.Snapshot, error) { - _, span := otrace.StartSpan(n.ctx, "Calculate.Snapshot", - otrace.WithSampler(otrace.AlwaysSample())) + _, span := otel.Tracer("").Start(n.ctx, "Calculate.Snapshot") defer span.End() discardN := 1 @@ -1691,14 +1709,17 @@ func (n *node) calculateSnapshot(startIdx, lastIdx, minPendingStart uint64) (*pb first, err := n.Store.FirstIndex() if err != nil { - span.Annotatef(nil, "Error: %v", err) + span.AddEvent("Error", trace.WithAttributes( + attribute.String("error", err.Error()))) return nil, err } - span.Annotatef(nil, "First index: %d", first) + span.AddEvent("First index", trace.WithAttributes( + attribute.Int64("first", int64(first)))) if startIdx > first { // If we're starting from a higher index, set first to that. 
first = startIdx - span.Annotatef(nil, "Setting first to: %d", startIdx) + span.AddEvent("Setting first to", trace.WithAttributes( + attribute.Int64("first", int64(first)))) } rsnap, err := n.Store.Snapshot() @@ -1711,13 +1732,15 @@ func (n *node) calculateSnapshot(startIdx, lastIdx, minPendingStart uint64) (*pb return nil, err } } - span.Annotatef(nil, "Last snapshot: %+v", snap) + span.AddEvent("Last snapshot", trace.WithAttributes( + attribute.String("snapshot", snap.String()))) if int(lastIdx-first) < discardN { - span.Annotate(nil, "Skipping due to insufficient entries") + span.AddEvent("Skipping due to insufficient entries") return nil, nil } - span.Annotatef(nil, "Found Raft entries: %d", lastIdx-first) + span.AddEvent("Found Raft entries", trace.WithAttributes( + attribute.Int64("count", int64(lastIdx-first)))) if num := posting.Oracle().NumPendingTxns(); num > 0 { // TODO (Damon): this is associated with stuck alphas. Is there anything else we should log here @@ -1735,7 +1758,8 @@ func (n *node) calculateSnapshot(startIdx, lastIdx, minPendingStart uint64) (*pb for batchFirst := first; batchFirst <= lastIdx; { entries, err := n.Store.Entries(batchFirst, lastIdx+1, 256<<20) if err != nil { - span.Annotatef(nil, "Error: %v", err) + span.AddEvent("Error", trace.WithAttributes( + attribute.String("error", err.Error()))) return nil, err } // Exit early from the loop if no entries were found. 
@@ -1755,7 +1779,8 @@ func (n *node) calculateSnapshot(startIdx, lastIdx, minPendingStart uint64) (*pb } var proposal pb.Proposal if err := proto.Unmarshal(entry.Data[8:], &proposal); err != nil { - span.Annotatef(nil, "Error: %v", err) + span.AddEvent("Error", trace.WithAttributes( + attribute.String("error", err.Error()))) return nil, err } @@ -1775,23 +1800,26 @@ func (n *node) calculateSnapshot(startIdx, lastIdx, minPendingStart uint64) (*pb } if maxCommitTs == 0 { - span.Annotate(nil, "maxCommitTs is zero") + span.AddEvent("maxCommitTs is zero") return nil, nil } if snapshotIdx == 0 { // It is possible that there are no pending transactions. In that case, // snapshotIdx would be zero. snapshotIdx = lastEntry.Index - span.Annotatef(nil, "snapshotIdx is zero. Using last entry's index: %d", snapshotIdx) + span.AddEvent("snapshotIdx is zero. Using last entry's index", trace.WithAttributes( + attribute.Int64("index", int64(snapshotIdx)))) } numDiscarding := snapshotIdx - first + 1 - span.Annotatef(nil, - "Got snapshotIdx: %d. MaxCommitTs: %d. Discarding: %d. MinPendingStartTs: %d", - snapshotIdx, maxCommitTs, numDiscarding, minPendingStart) + span.AddEvent("Got snapshotIdx", trace.WithAttributes( + attribute.Int64("snapshotIdx", int64(snapshotIdx)), + attribute.Int64("maxCommitTs", int64(maxCommitTs)), + attribute.Int64("numDiscarding", int64(numDiscarding)), + attribute.Int64("minPendingStart", int64(minPendingStart)))) if int(numDiscarding) < discardN { - span.Annotate(nil, "Skipping snapshot because insufficient discard entries") + span.AddEvent("Skipping snapshot because insufficient discard entries") glog.Infof("Skipping snapshot at index: %d. 
Insufficient discard entries: %d."+ " MinPendingStartTs: %d\n", snapshotIdx, numDiscarding, minPendingStart) return nil, nil @@ -1802,7 +1830,8 @@ func (n *node) calculateSnapshot(startIdx, lastIdx, minPendingStart uint64) (*pb Index: snapshotIdx, ReadTs: maxCommitTs, } - span.Annotatef(nil, "Got snapshot: %+v", result) + span.AddEvent("Got snapshot", trace.WithAttributes( + attribute.Stringer("snapshot", result))) return result, nil } diff --git a/worker/mutation.go b/worker/mutation.go index e7fecbd9c65..eddba8e6b6b 100644 --- a/worker/mutation.go +++ b/worker/mutation.go @@ -16,7 +16,9 @@ import ( "github.com/golang/glog" "github.com/pkg/errors" ostats "go.opencensus.io/stats" - otrace "go.opencensus.io/trace" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "google.golang.org/grpc/metadata" "google.golang.org/protobuf/proto" @@ -104,10 +106,10 @@ func runMutation(ctx context.Context, edge *pb.DirectedEdge, txn *posting.Txn) e t := time.Now() plist, err := getFn(key) if dur := time.Since(t); dur > time.Millisecond { - if span := otrace.FromContext(ctx); span != nil { - span.Annotatef([]otrace.Attribute{otrace.BoolAttribute("slow-get", true)}, - "GetLru took %s", dur) - } + span := trace.SpanFromContext(ctx) + span.AddEvent("Slow GetLru", trace.WithAttributes( + attribute.Bool("slow-get", true), + attribute.String("duration", dur.String()))) } if err != nil { return err @@ -745,7 +747,7 @@ type res struct { // MutateOverNetwork checks which group should be running the mutations // according to the group config and sends it to that instance. 
func MutateOverNetwork(ctx context.Context, m *pb.Mutations) (*api.TxnContext, error) { - ctx, span := otrace.StartSpan(ctx, "worker.MutateOverNetwork") + ctx, span := otel.Tracer("").Start(ctx, "worker.MutateOverNetwork") defer span.End() tctx := &api.TxnContext{StartTs: m.StartTs} @@ -760,8 +762,10 @@ func MutateOverNetwork(ctx context.Context, m *pb.Mutations) (*api.TxnContext, e resCh := make(chan res, len(mutationMap)) for gid, mu := range mutationMap { if gid == 0 { - span.Annotatef(nil, "state: %+v", groups().state) - span.Annotatef(nil, "Group id zero for mutation: %+v", mu) + span.AddEvent("State information", trace.WithAttributes( + attribute.String("state", groups().state.String()))) + span.AddEvent("Group id zero for mutation", trace.WithAttributes( + attribute.String("mutation", mu.String()))) return tctx, errNonExistentTablet } mu.StartTs = m.StartTs @@ -875,7 +879,7 @@ func typeSanityCheck(t *pb.TypeUpdate) error { // CommitOverNetwork makes a proxy call to Zero to commit or abort a transaction. 
func CommitOverNetwork(ctx context.Context, tc *api.TxnContext) (uint64, error) { - ctx, span := otrace.StartSpan(ctx, "worker.CommitOverNetwork") + ctx, span := otel.Tracer("").Start(ctx, "worker.CommitOverNetwork") defer span.End() clientDiscard := false @@ -898,13 +902,13 @@ func CommitOverNetwork(ctx context.Context, tc *api.TxnContext) (uint64, error) tctx, err := zc.CommitOrAbort(ctx, tc) if err != nil { - span.Annotatef(nil, "Error=%v", err) + span.AddEvent("Error in CommitOrAbort", trace.WithAttributes( + attribute.String("error", err.Error()))) return 0, err } - var attributes []otrace.Attribute - attributes = append(attributes, otrace.Int64Attribute("commitTs", int64(tctx.CommitTs)), - otrace.BoolAttribute("committed", tctx.CommitTs > 0)) - span.Annotate(attributes, "") + span.AddEvent("Commit status", trace.WithAttributes( + attribute.Int64("commitTs", int64(tctx.CommitTs)), + attribute.Bool("committed", tctx.CommitTs > 0))) if tctx.Aborted || tctx.CommitTs == 0 { if !clientDiscard { @@ -945,7 +949,7 @@ func (w *grpcWorker) proposeAndWait(ctx context.Context, txnCtx *api.TxnContext, // Mutate is used to apply mutations over the network on other instances. 
func (w *grpcWorker) Mutate(ctx context.Context, m *pb.Mutations) (*api.TxnContext, error) { - ctx, span := otrace.StartSpan(ctx, "worker.Mutate") + ctx, span := otel.Tracer("").Start(ctx, "worker.Mutate") defer span.End() txnCtx := &api.TxnContext{} diff --git a/worker/predicate_move.go b/worker/predicate_move.go index 1171af21580..9ad87489e3b 100644 --- a/worker/predicate_move.go +++ b/worker/predicate_move.go @@ -16,7 +16,8 @@ import ( "github.com/dustin/go-humanize" "github.com/golang/glog" "github.com/pkg/errors" - otrace "go.opencensus.io/trace" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "google.golang.org/protobuf/proto" "github.com/dgraph-io/badger/v4" @@ -187,7 +188,7 @@ func (w *grpcWorker) ReceivePredicate(stream pb.Worker_ReceivePredicateServer) e func (w *grpcWorker) MovePredicate(ctx context.Context, in *pb.MovePredicatePayload) (*api.Payload, error) { - ctx, span := otrace.StartSpan(ctx, "worker.MovePredicate") + ctx, span := trace.SpanFromContext(ctx).TracerProvider().Tracer("grpcWorker").Start(ctx, "MovePredicate") defer span.End() n := groups().Node @@ -244,11 +245,11 @@ func (w *grpcWorker) MovePredicate(ctx context.Context, msg := fmt.Sprintf("Move predicate request: %+v", in) glog.Info(msg) - span.Annotate(nil, msg) + span.SetAttributes(attribute.String("predicate", in.Predicate)) err = movePredicateHelper(ctx, in) if err != nil { - span.Annotatef(nil, "Error while movePredicateHelper: %v", err) + span.SetStatus(1, err.Error()) } return &emptyPayload, err } @@ -263,7 +264,7 @@ func movePredicateHelper(ctx context.Context, in *pb.MovePredicatePayload) error } defer closer.Done() - span := otrace.FromContext(ctx) + span := trace.SpanFromContext(ctx) pl := groups().Leader(in.DestGid) if pl == nil { @@ -341,7 +342,8 @@ func movePredicateHelper(ctx context.Context, in *pb.MovePredicatePayload) error } return out.Send(kvs) } - span.Annotatef(nil, "Starting stream list orchestrate") + span.AddEvent("Starting stream list 
orchestrate", trace.WithAttributes( + attribute.String("predicate", in.Predicate))) if err := stream.Orchestrate(out.Context()); err != nil { return err } @@ -356,7 +358,8 @@ func movePredicateHelper(ctx context.Context, in *pb.MovePredicatePayload) error } msg := fmt.Sprintf("Receiver %s says it got %d keys.\n", pl.Addr, recvCount) - span.Annotate(nil, msg) + span.AddEvent("Moving predicate", trace.WithAttributes( + attribute.String("predicate", in.Predicate))) glog.Infof(msg) return nil } diff --git a/worker/proposal.go b/worker/proposal.go index 27e1961717c..a555f0ea0dd 100644 --- a/worker/proposal.go +++ b/worker/proposal.go @@ -16,7 +16,8 @@ import ( "github.com/pkg/errors" ostats "go.opencensus.io/stats" tag "go.opencensus.io/tag" - otrace "go.opencensus.io/trace" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "google.golang.org/protobuf/proto" "github.com/hypermodeinc/dgraph/v24/conn" @@ -210,7 +211,7 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.Proposal) (perr // Trim data to the new size after Marshal. data = data[:8+sz] - span := otrace.FromContext(ctx) + span := trace.SpanFromContext(ctx) stop := x.SpanTimer(span, "n.proposeAndWait") defer stop() @@ -228,7 +229,9 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.Proposal) (perr x.AssertTruef(n.Proposals.Store(key, pctx), "Found existing proposal with key: [%x]", key) defer n.Proposals.Delete(key) // Ensure that it gets deleted on return. - span.Annotatef(nil, "Proposing with key: %d. Timeout: %v", key, timeout) + span.AddEvent("Proposing", trace.WithAttributes( + attribute.Int64("key", int64(key)), + attribute.String("timeout", timeout.String()))) if err = n.Raft().Propose(cctx, data); err != nil { return errors.Wrapf(err, "While proposing") @@ -249,7 +252,8 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.Proposal) (perr if atomic.LoadUint32(&pctx.Found) > 0 { // We found the proposal in CommittedEntries. 
No need to retry. } else { - span.Annotatef(nil, "Timeout %s reached. Cancelling...", timeout) + span.AddEvent("Timeout reached", trace.WithAttributes( + attribute.String("timeout", timeout.String()))) cancel() } case <-cctx.Done(): @@ -287,7 +291,8 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.Proposal) (perr // below. We should always propose it irrespective of how many pending proposals there // might be. default: - span.Annotatef(nil, "incr with %d", i) + span.AddEvent("Incrementing limiter", trace.WithAttributes( + attribute.Int64("retry", int64(i)))) if err := limiter.incr(ctx, i); err != nil { return err } diff --git a/worker/schema.go b/worker/schema.go index b5c66c08178..6c6c1c0adc7 100644 --- a/worker/schema.go +++ b/worker/schema.go @@ -9,7 +9,7 @@ import ( "context" "github.com/pkg/errors" - otrace "go.opencensus.io/trace" + "go.opentelemetry.io/otel" "google.golang.org/protobuf/proto" "github.com/hypermodeinc/dgraph/v24/conn" @@ -31,7 +31,7 @@ type resultErr struct { // predicates is not specified, then all the predicates belonging to the group // are returned func getSchema(ctx context.Context, s *pb.SchemaRequest) (*pb.SchemaResult, error) { - _, span := otrace.StartSpan(ctx, "worker.getSchema") + _, span := otel.Tracer("").Start(ctx, "worker.getSchema") defer span.End() var result pb.SchemaResult @@ -182,7 +182,7 @@ func getSchemaOverNetwork(ctx context.Context, gid uint32, s *pb.SchemaRequest, func GetSchemaOverNetwork(ctx context.Context, schema *pb.SchemaRequest) ( []*pb.SchemaNode, error) { - ctx, span := otrace.StartSpan(ctx, "worker.GetSchemaOverNetwork") + ctx, span := otel.Tracer("").Start(ctx, "worker.GetSchemaOverNetwork") defer span.End() // There was a health check here which is not needed. 
The health check should be done by the diff --git a/worker/sort.go b/worker/sort.go index a9eb04184ab..f627611905d 100644 --- a/worker/sort.go +++ b/worker/sort.go @@ -8,13 +8,15 @@ package worker import ( "context" "encoding/hex" + "fmt" "sort" "strings" "time" "github.com/golang/glog" "github.com/pkg/errors" - otrace "go.opencensus.io/trace" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" "github.com/dgraph-io/badger/v4" "github.com/hypermodeinc/dgraph/v24/algo" @@ -24,6 +26,7 @@ import ( "github.com/hypermodeinc/dgraph/v24/tok" "github.com/hypermodeinc/dgraph/v24/types" "github.com/hypermodeinc/dgraph/v24/x" + "go.opentelemetry.io/otel/attribute" ) var emptySortResult pb.SortResult @@ -51,9 +54,11 @@ func SortOverNetwork(ctx context.Context, q *pb.SortMessage) (*pb.SortResult, er errors.Errorf("Cannot sort by unknown attribute %s", x.ParseAttr(q.Order[0].Attr)) } - if span := otrace.FromContext(ctx); span != nil { - span.Annotatef(nil, "worker.SortOverNetwork. Attr: %s. 
Group: %d", - x.ParseAttr(q.Order[0].Attr), gid) + if span := trace.SpanFromContext(ctx); span != nil { + span.SetAttributes( + attribute.String("attribute", q.Order[0].Attr), + attribute.Int("groupId", int(gid)), + ) } if groups().ServesGroup(gid) { @@ -76,7 +81,7 @@ func (w *grpcWorker) Sort(ctx context.Context, s *pb.SortMessage) (*pb.SortResul if ctx.Err() != nil { return &emptySortResult, ctx.Err() } - ctx, span := otrace.StartSpan(ctx, "worker.Sort") + ctx, span := otel.Tracer("").Start(ctx, "worker.Sort") defer span.End() gid, err := groups().BelongsToReadOnly(s.Order[0].Attr, s.ReadTs) @@ -84,7 +89,10 @@ func (w *grpcWorker) Sort(ctx context.Context, s *pb.SortMessage) (*pb.SortResul return &emptySortResult, err } - span.Annotatef(nil, "Sorting: Attribute: %q groupId: %v Sort", s.Order[0].Attr, gid) + span.AddEvent("Sorting", trace.WithAttributes( + attribute.String("attribute", s.Order[0].Attr), + attribute.Int("groupId", int(gid)))) + if gid != groups().groupId() { return nil, errors.Errorf("attr: %q groupId: %v Request sent to wrong server.", s.Order[0].Attr, gid) @@ -116,8 +124,8 @@ func resultWithError(err error) *sortresult { } func sortWithoutIndex(ctx context.Context, ts *pb.SortMessage) *sortresult { - span := otrace.FromContext(ctx) - span.Annotate(nil, "sortWithoutIndex") + span := trace.SpanFromContext(ctx) + span.SetAttributes(attribute.String("sortWithoutIndex", "true")) n := len(ts.UidMatrix) r := new(pb.SortResult) @@ -170,8 +178,8 @@ func sortWithIndex(ctx context.Context, ts *pb.SortMessage) *sortresult { return resultWithError(ctx.Err()) } - span := otrace.FromContext(ctx) - span.Annotate(nil, "sortWithIndex") + span := trace.SpanFromContext(ctx) + span.SetAttributes(attribute.String("sortWithIndex", "true")) n := len(ts.UidMatrix) out := make([]intersectedList, n) @@ -350,8 +358,8 @@ type orderResult struct { } func multiSort(ctx context.Context, r *sortresult, ts *pb.SortMessage) error { - span := otrace.FromContext(ctx) - 
span.Annotate(nil, "multiSort")
+	span := trace.SpanFromContext(ctx)
+	span.SetAttributes(attribute.String("multiSort", "true"))
 
 	// SrcUids for other queries are all the uids present in the response of the first sort.
 	dest := destUids(r.reply.UidMatrix)
@@ -471,19 +479,22 @@
 // enough for our pagination params. When all the UID lists are done, we stop
 // iterating over the index.
 func processSort(ctx context.Context, ts *pb.SortMessage) (*pb.SortResult, error) {
-	span := otrace.FromContext(ctx)
+	span := trace.SpanFromContext(ctx)
 	stop := x.SpanTimer(span, "processSort")
 	defer stop()
 
-	span.Annotatef(nil, "Waiting for startTs: %d", ts.ReadTs)
+	span.SetAttributes(
+		attribute.Int("startTs", int(ts.ReadTs)),
+	)
+
 	if err := posting.Oracle().WaitForTs(ctx, ts.ReadTs); err != nil {
 		return nil, err
 	}
-	span.Annotatef(nil, "Waiting for checksum match")
-	if err := groups().ChecksumsMatch(ctx); err != nil {
-		return nil, err
-	}
-	span.Annotate(nil, "Done waiting")
+	span.AddEvent("Waiting for checksum match")
+	if err := groups().ChecksumsMatch(ctx); err != nil {
+		return nil, err
+	}
+	span.AddEvent("Done waiting")
 
 	if ts.Count < 0 {
 		return nil, errors.Errorf(
@@ -525,7 +536,7 @@ func processSort(ctx context.Context, ts *pb.SortMessage) (*pb.SortResult, error
 		// wait for other goroutine to get cancelled
 		<-resCh
 	} else {
-		span.Annotatef(nil, "processSort error: %v", r.err)
+		span.AddEvent(fmt.Sprintf("Error processing sort: %+v", r.err))
 		r = <-resCh
 	}
 
diff --git a/worker/task.go b/worker/task.go
index 0b5c067d4ff..cb34531e570 100644
--- a/worker/task.go
+++ b/worker/task.go
@@ -19,7 +19,9 @@ import (
 	cindex "github.com/google/codesearch/index"
 	cregexp "github.com/google/codesearch/regexp"
 	"github.com/pkg/errors"
-	otrace "go.opencensus.io/trace"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
 	"golang.org/x/sync/errgroup"
 	"google.golang.org/protobuf/proto"
 
@@ -46,9 +48,9 @@ func invokeNetworkRequest(ctx context.Context,
addr string, return nil, errors.Wrapf(err, "dispatchTaskOverNetwork: while retrieving connection.") } - if span := otrace.FromContext(ctx); span != nil { - span.Annotatef(nil, "invokeNetworkRequest: Sending request to %v", addr) - } + span := trace.SpanFromContext(ctx) + span.AddEvent("invokeNetworkRequest", trace.WithAttributes( + attribute.String("destination", addr))) c := pb.NewWorkerClient(pl.Get()) return f(ctx, c) } @@ -131,11 +133,12 @@ func ProcessTaskOverNetwork(ctx context.Context, q *pb.Query) (*pb.Result, error return nil, errNonExistentTablet } - span := otrace.FromContext(ctx) - if span != nil { - span.Annotatef(nil, "ProcessTaskOverNetwork. attr: %v gid: %v, readTs: %d, node id: %d", - attr, gid, q.ReadTs, groups().Node.Id) - } + span := trace.SpanFromContext(ctx) + span.AddEvent("ProcessTaskOverNetwork", trace.WithAttributes( + attribute.String("attr", attr), + attribute.String("gid", fmt.Sprintf("%d", gid)), + attribute.String("readTs", fmt.Sprintf("%d", q.ReadTs)), + attribute.String("node_id", fmt.Sprintf("%d", groups().Node.Id)))) if groups().ServesGroup(gid) { // No need for a network call, as this should be run from within this instance. @@ -151,10 +154,10 @@ func ProcessTaskOverNetwork(ctx context.Context, q *pb.Query) (*pb.Result, error } reply := result.(*pb.Result) - if span != nil { - span.Annotatef(nil, "Reply from server. len: %v gid: %v Attr: %v", - len(reply.UidMatrix), gid, attr) - } + span.AddEvent("Reply from server", trace.WithAttributes( + attribute.Int("len", len(reply.UidMatrix)), + attribute.Int64("gid", int64(gid)), + attribute.String("attr", attr))) return reply, nil } @@ -331,12 +334,12 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er return err } - span := otrace.FromContext(ctx) + span := trace.SpanFromContext(ctx) stop := x.SpanTimer(span, "handleValuePostings") defer stop() - if span != nil { - span.Annotatef(nil, "Number of uids: %d. 
args.srcFn: %+v", srcFn.n, args.srcFn) - } + span.AddEvent("Number of uids and args.srcFn", trace.WithAttributes( + attribute.Int64("uids_count", int64(srcFn.n)), + attribute.String("srcFn", fmt.Sprintf("%+v", args.srcFn)))) switch srcFn.fnType { case notAFunction, aggregatorFn, passwordFn, compareAttrFn, similarToFn: @@ -405,7 +408,9 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er // logic constitutes most of the code volume here. numGo, width := x.DivideAndRule(srcFn.n) x.AssertTrue(width > 0) - span.Annotatef(nil, "Width: %d. NumGo: %d", width, numGo) + span.AddEvent("Processing distribution", trace.WithAttributes( + attribute.Int("width", width), + attribute.Int("numGo", numGo))) outputs := make([]*pb.Result, numGo) listType := schema.State().IsList(q.Attr) @@ -758,12 +763,12 @@ func (qs *queryState) handleUidPostings( return err } - span := otrace.FromContext(ctx) + span := trace.SpanFromContext(ctx) stop := x.SpanTimer(span, "handleUidPostings") defer stop() - if span != nil { - span.Annotatef(nil, "Number of uids: %d. args.srcFn: %+v", srcFn.n, args.srcFn) - } + span.AddEvent("Number of uids and args.srcFn", trace.WithAttributes( + attribute.Int64("uids_count", int64(srcFn.n)), + attribute.String("srcFn", fmt.Sprintf("%+v", args.srcFn)))) if srcFn.n == 0 { return nil } @@ -783,7 +788,9 @@ func (qs *queryState) handleUidPostings( // Divide the task into many goroutines. numGo, width := x.DivideAndRule(srcFn.n) x.AssertTrue(width > 0) - span.Annotatef(nil, "Width: %d. 
NumGo: %d", width, numGo) + span.AddEvent("Processing distribution", trace.WithAttributes( + attribute.Int("width", width), + attribute.Int("numGo", numGo))) lang := langForFunc(q.Langs) needFiltering := needsStringFiltering(srcFn, q.Langs, q.Attr) @@ -834,7 +841,7 @@ func (qs *queryState) handleUidPostings( switch { case q.DoCount: if i == 0 { - span.Annotate(nil, "DoCount") + span.AddEvent("DoCount") } count, err := countForUidPostings(args, pl, facetsTree, opts) if err != nil { @@ -845,7 +852,7 @@ func (qs *queryState) handleUidPostings( out.UidMatrix = append(out.UidMatrix, &pb.List{}) case srcFn.fnType == compareScalarFn: if i == 0 { - span.Annotate(nil, "CompareScalarFn") + span.AddEvent("CompareScalarFn") } len := pl.Length(args.q.ReadTs, 0) if len == -1 { @@ -858,7 +865,7 @@ func (qs *queryState) handleUidPostings( } case srcFn.fnType == hasFn: if i == 0 { - span.Annotate(nil, "HasFn") + span.AddEvent("HasFn") } // We figure out if need to filter on bases of lang attribute or not. 
// If we don't need to do so, we can just check if the posting list @@ -895,7 +902,7 @@ func (qs *queryState) handleUidPostings( } case srcFn.fnType == uidInFn: if i == 0 { - span.Annotate(nil, "UidInFn") + span.AddEvent("UidInFn") } reqList := &pb.List{Uids: srcFn.uidsPresent} topts := posting.ListOptions{ @@ -914,7 +921,7 @@ func (qs *queryState) handleUidPostings( } case q.FacetParam != nil || facetsTree != nil: if i == 0 { - span.Annotate(nil, "default with facets") + span.AddEvent("default with facets") } uidList, fcsList, err := retrieveUidsAndFacets(args, pl, facetsTree, opts) if err != nil { @@ -926,7 +933,7 @@ func (qs *queryState) handleUidPostings( } default: if i == 0 { - span.Annotate(nil, "default no facets") + span.AddEvent("default no facets") } uidList, err := pl.Uids(opts) if err != nil { @@ -964,7 +971,8 @@ func (qs *queryState) handleUidPostings( for _, list := range out.UidMatrix { total += len(list.Uids) } - span.Annotatef(nil, "Total number of elements in matrix: %d", total) + span.AddEvent("Matrix elements count", trace.WithAttributes( + attribute.Int("total", total))) return nil } @@ -977,26 +985,28 @@ const ( // processTask processes the query, accumulates and returns the result. 
func processTask(ctx context.Context, q *pb.Query, gid uint32) (*pb.Result, error) { - ctx, span := otrace.StartSpan(ctx, "processTask."+q.Attr) + ctx, span := otel.Tracer("").Start(ctx, "processTask."+q.Attr) defer span.End() stop := x.SpanTimer(span, "processTask"+q.Attr) defer stop() - span.Annotatef(nil, "Waiting for startTs: %d at node: %d, gid: %d", - q.ReadTs, groups().Node.Id, gid) + span.SetAttributes( + attribute.String("startTs", fmt.Sprintf("%d", q.ReadTs)), + attribute.String("node", fmt.Sprintf("%d", groups().Node.Id)), + attribute.String("gid", fmt.Sprintf("%d", gid))) if err := posting.Oracle().WaitForTs(ctx, q.ReadTs); err != nil { return nil, err } - if span != nil { - maxAssigned := posting.Oracle().MaxAssigned() - span.Annotatef(nil, "Done waiting for maxAssigned. Attr: %q ReadTs: %d Max: %d", - q.Attr, q.ReadTs, maxAssigned) - } + maxAssigned := posting.Oracle().MaxAssigned() + span.AddEvent("Done waiting for maxAssigned", trace.WithAttributes( + attribute.String("attr", q.Attr), + attribute.String("readTs", fmt.Sprintf("%d", q.ReadTs)), + attribute.String("max", fmt.Sprintf("%d", maxAssigned)))) if err := groups().ChecksumsMatch(ctx); err != nil { return nil, err } - span.Annotatef(nil, "Done waiting for checksum match") + span.AddEvent("Done waiting for checksum match") // If a group stops serving tablet and it gets partitioned away from group // zero, then it wouldn't know that this group is no longer serving this @@ -1037,7 +1047,7 @@ type queryState struct { func (qs *queryState) helpProcessTask(ctx context.Context, q *pb.Query, gid uint32) ( *pb.Result, error) { - span := otrace.FromContext(ctx) + span := trace.SpanFromContext(ctx) out := new(pb.Result) attr := q.Attr @@ -1097,40 +1107,40 @@ func (qs *queryState) helpProcessTask(ctx context.Context, q *pb.Query, gid uint return nil, err } if needsValPostings { - span.Annotate(nil, "handleValuePostings") + span.AddEvent("handleValuePostings") if err := qs.handleValuePostings(ctx, args); 
err != nil { return nil, err } } else { - span.Annotate(nil, "handleUidPostings") + span.AddEvent("handleUidPostings") if err = qs.handleUidPostings(ctx, args, opts); err != nil { return nil, err } } if srcFn.fnType == hasFn && srcFn.isFuncAtRoot { - span.Annotate(nil, "handleHasFunction") + span.AddEvent("handleHasFunction") if err := qs.handleHasFunction(ctx, q, out, srcFn); err != nil { return nil, err } } if srcFn.fnType == compareScalarFn && srcFn.isFuncAtRoot { - span.Annotate(nil, "handleCompareScalarFunction") + span.AddEvent("handleCompareScalarFunction") if err := qs.handleCompareScalarFunction(ctx, args); err != nil { return nil, err } } if srcFn.fnType == regexFn { - span.Annotate(nil, "handleRegexFunction") + span.AddEvent("handleRegexFunction") if err := qs.handleRegexFunction(ctx, args); err != nil { return nil, err } } if srcFn.fnType == matchFn { - span.Annotate(nil, "handleMatchFunction") + span.AddEvent("handleMatchFunction") if err := qs.handleMatchFunction(ctx, args); err != nil { return nil, err } @@ -1139,7 +1149,7 @@ func (qs *queryState) helpProcessTask(ctx context.Context, q *pb.Query, gid uint // We fetch the actual value for the uids, compare them to the value in the // request and filter the uids only if the tokenizer IsLossy. if srcFn.fnType == compareAttrFn && len(srcFn.tokens) > 0 { - span.Annotate(nil, "handleCompareFunction") + span.AddEvent("handleCompareFunction") if err := qs.handleCompareFunction(ctx, args); err != nil { return nil, err } @@ -1147,7 +1157,7 @@ func (qs *queryState) helpProcessTask(ctx context.Context, q *pb.Query, gid uint // If geo filter, do value check for correctness. if srcFn.geoQuery != nil { - span.Annotate(nil, "handleGeoFunction") + span.AddEvent("handleGeoFunction") if err := qs.filterGeoFunction(ctx, args); err != nil { return nil, err } @@ -1156,7 +1166,7 @@ func (qs *queryState) helpProcessTask(ctx context.Context, q *pb.Query, gid uint // For string matching functions, check the language. 
We are not checking here // for hasFn as filtering for it has already been done in handleHasFunction. if srcFn.fnType != hasFn && needsStringFiltering(srcFn, q.Langs, attr) { - span.Annotate(nil, "filterStringFunction") + span.AddEvent("filterStringFunction") if err := qs.filterStringFunction(args); err != nil { return nil, err } @@ -1202,16 +1212,20 @@ func (qs *queryState) handleCompareScalarFunction(ctx context.Context, arg funcA } func (qs *queryState) handleRegexFunction(ctx context.Context, arg funcArgs) error { - span := otrace.FromContext(ctx) + span := trace.SpanFromContext(ctx) stop := x.SpanTimer(span, "handleRegexFunction") defer stop() if span != nil { - span.Annotatef(nil, "Number of uids: %d. args.srcFn: %+v", arg.srcFn.n, arg.srcFn) + span.AddEvent("Processing UIDs", trace.WithAttributes( + attribute.Int64("uid_count", int64(arg.srcFn.n)), + attribute.String("srcFn", fmt.Sprintf("%+v", arg.srcFn)))) } attr := arg.q.Attr typ, err := schema.State().TypeOf(attr) - span.Annotatef(nil, "Attr: %s. Type: %s", attr, typ.Name()) + span.AddEvent("Attribute information", trace.WithAttributes( + attribute.String("attr", attr), + attribute.String("type", typ.Name()))) if err != nil || !typ.IsScalar() { return errors.Errorf("Attribute not scalar: %s %v", x.ParseAttr(attr), typ) } @@ -1219,8 +1233,9 @@ func (qs *queryState) handleRegexFunction(ctx context.Context, arg funcArgs) err return errors.Errorf("Got non-string type. 
Regex match is allowed only on string type.") } useIndex := schema.State().HasTokenizer(ctx, tok.IdentTrigram, attr) - span.Annotatef(nil, "Trigram index found: %t, func at root: %t", - useIndex, arg.srcFn.isFuncAtRoot) + span.AddEvent("Trigram index information", trace.WithAttributes( + attribute.Bool("trigram_index_found", useIndex), + attribute.Bool("func_at_root", arg.srcFn.isFuncAtRoot))) query := cindex.RegexpQuery(arg.srcFn.regex.Syntax) empty := pb.List{} @@ -1261,7 +1276,10 @@ func (qs *queryState) handleRegexFunction(ctx context.Context, arg funcArgs) err isList := schema.State().IsList(attr) lang := langForFunc(arg.q.Langs) - span.Annotatef(nil, "Total uids: %d, list: %t lang: %v", len(uids.Uids), isList, lang) + span.AddEvent("Result UID information", trace.WithAttributes( + attribute.Int("uid_count", len(uids.Uids)), + attribute.Bool("is_list", isList), + attribute.String("lang", lang))) filtered := &pb.List{} for _, uid := range uids.Uids { @@ -1312,15 +1330,17 @@ func (qs *queryState) handleRegexFunction(ctx context.Context, arg funcArgs) err } func (qs *queryState) handleCompareFunction(ctx context.Context, arg funcArgs) error { - span := otrace.FromContext(ctx) + span := trace.SpanFromContext(ctx) stop := x.SpanTimer(span, "handleCompareFunction") defer stop() - if span != nil { - span.Annotatef(nil, "Number of uids: %d. args.srcFn: %+v", arg.srcFn.n, arg.srcFn) - } + span.AddEvent("Processing UIDs", trace.WithAttributes( + attribute.Int64("uid_count", int64(arg.srcFn.n)), + attribute.String("srcFn", fmt.Sprintf("%+v", arg.srcFn)))) attr := arg.q.Attr - span.Annotatef(nil, "Attr: %s. 
Fname: %s", attr, arg.srcFn.fname) + span.AddEvent("Function information", trace.WithAttributes( + attribute.String("attr", attr), + attribute.String("fname", arg.srcFn.fname))) tokenizer, err := pickTokenizer(ctx, attr, arg.srcFn.fname) if err != nil { return err @@ -1328,7 +1348,9 @@ func (qs *queryState) handleCompareFunction(ctx context.Context, arg funcArgs) e // Only if the tokenizer that we used IsLossy // then we need to fetch and compare the actual values. - span.Annotatef(nil, "Tokenizer: %s, Lossy: %t", tokenizer.Name(), tokenizer.IsLossy()) + span.AddEvent("Tokenizer details", trace.WithAttributes( + attribute.String("tokenizer", tokenizer.Name()), + attribute.Bool("is_lossy", tokenizer.IsLossy()))) if !tokenizer.IsLossy() { return nil @@ -1467,16 +1489,19 @@ func (qs *queryState) handleCompareFunction(ctx context.Context, arg funcArgs) e } func (qs *queryState) handleMatchFunction(ctx context.Context, arg funcArgs) error { - span := otrace.FromContext(ctx) + span := trace.SpanFromContext(ctx) stop := x.SpanTimer(span, "handleMatchFunction") defer stop() - if span != nil { - span.Annotatef(nil, "Number of uids: %d. args.srcFn: %+v", arg.srcFn.n, arg.srcFn) - } + span.AddEvent("Processing UIDs", trace.WithAttributes( + attribute.Int64("uid_count", int64(arg.srcFn.n)), + attribute.String("srcFn", fmt.Sprintf("%+v", arg.srcFn)))) attr := arg.q.Attr typ := arg.srcFn.atype - span.Annotatef(nil, "Attr: %s. 
Type: %s", attr, typ.Name()) + + span.AddEvent("Attribute information", trace.WithAttributes( + attribute.String("attr", attr), + attribute.String("type", typ.Name()))) var uids *pb.List switch { case !typ.IsScalar(): @@ -1504,7 +1529,10 @@ func (qs *queryState) handleMatchFunction(ctx context.Context, arg funcArgs) err isList := schema.State().IsList(attr) lang := langForFunc(arg.q.Langs) - span.Annotatef(nil, "Total uids: %d, list: %t lang: %v", len(uids.Uids), isList, lang) + span.AddEvent("Result UID information", trace.WithAttributes( + attribute.Int("uid_count", len(uids.Uids)), + attribute.Bool("is_list", isList), + attribute.String("lang", lang))) arg.out.UidMatrix = append(arg.out.UidMatrix, uids) matchQuery := strings.Join(arg.srcFn.tokens, "") @@ -1558,17 +1586,17 @@ func (qs *queryState) handleMatchFunction(ctx context.Context, arg funcArgs) err } func (qs *queryState) filterGeoFunction(ctx context.Context, arg funcArgs) error { - span := otrace.FromContext(ctx) + span := trace.SpanFromContext(ctx) stop := x.SpanTimer(span, "filterGeoFunction") defer stop() attr := arg.q.Attr uids := algo.MergeSorted(arg.out.UidMatrix) numGo, width := x.DivideAndRule(len(uids.Uids)) - if span != nil && numGo > 1 { - span.Annotatef(nil, "Number of uids: %d. NumGo: %d. Width: %d\n", - len(uids.Uids), numGo, width) - } + span.AddEvent("Parallel processing details", trace.WithAttributes( + attribute.Int("uid_count", len(uids.Uids)), + attribute.Int("num_go", numGo), + attribute.Int("width", width))) filtered := make([]*pb.List, numGo) filter := func(idx, start, end int) error { @@ -1616,9 +1644,8 @@ func (qs *queryState) filterGeoFunction(ctx context.Context, arg funcArgs) error for _, out := range filtered { final.Uids = append(final.Uids, out.Uids...) 
} - if span != nil && numGo > 1 { - span.Annotatef(nil, "Total uids after filtering geo: %d", len(final.Uids)) - } + span.AddEvent("Geo filtering result", trace.WithAttributes( + attribute.Int("uid_count", len(final.Uids)))) for i := range arg.out.UidMatrix { algo.IntersectWith(arg.out.UidMatrix[i], final, arg.out.UidMatrix[i]) } @@ -2116,7 +2143,7 @@ func interpretVFloatOrUid(val string) ([]float32, uint64, error) { // ServeTask is used to respond to a query. func (w *grpcWorker) ServeTask(ctx context.Context, q *pb.Query) (*pb.Result, error) { - ctx, span := otrace.StartSpan(ctx, "worker.ServeTask") + ctx, span := otel.Tracer("").Start(ctx, "worker.ServeTask") defer span.End() if ctx.Err() != nil { @@ -2143,7 +2170,10 @@ func (w *grpcWorker) ServeTask(ctx context.Context, q *pb.Query) (*pb.Result, er if q.UidList != nil { numUids = len(q.UidList.Uids) } - span.Annotatef(nil, "Attribute: %q NumUids: %v groupId: %v ServeTask", q.Attr, numUids, gid) + span.AddEvent("ServeTask details", trace.WithAttributes( + attribute.String("attr", q.Attr), + attribute.Int("num_uids", numUids), + attribute.Int64("group_id", int64(gid)))) if !groups().ServesGroup(gid) { return nil, errors.Errorf( @@ -2480,7 +2510,7 @@ func (qs *queryState) evaluate(cp countParams, out *pb.Result) error { func (qs *queryState) handleHasFunction(ctx context.Context, q *pb.Query, out *pb.Result, srcFn *functionContext) error { - span := otrace.FromContext(ctx) + span := trace.SpanFromContext(ctx) stop := x.SpanTimer(span, "handleHasFunction") defer stop() if glog.V(3) { @@ -2620,9 +2650,8 @@ loop: } } } - if span != nil { - span.Annotatef(nil, "handleHasFunction found %d uids", len(result.Uids)) - } + span.AddEvent("handleHasFunction result", trace.WithAttributes( + attribute.Int("uid_count", len(result.Uids)))) out.UidMatrix = append(out.UidMatrix, result) return nil } diff --git a/worker/worker.go b/worker/worker.go index 7c5f65948c1..85335e18c38 100644 --- a/worker/worker.go +++ b/worker/worker.go 
@@ -17,7 +17,7 @@ import ( "github.com/golang/glog" "github.com/pkg/errors" - "go.opencensus.io/plugin/ocgrpc" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -55,7 +55,7 @@ func Init(ps *badger.DB) { grpc.MaxRecvMsgSize(x.GrpcMaxSize), grpc.MaxSendMsgSize(x.GrpcMaxSize), grpc.MaxConcurrentStreams(math.MaxInt32), - grpc.StatsHandler(&ocgrpc.ServerHandler{}), + grpc.StatsHandler(otelgrpc.NewServerHandler()), } if x.WorkerConfig.TLSServerConfig != nil { diff --git a/x/metrics.go b/x/metrics.go index 2214a0a3142..e5aed5b95ec 100644 --- a/x/metrics.go +++ b/x/metrics.go @@ -19,9 +19,7 @@ import ( "strings", "time" - "contrib.go.opencensus.io/exporter/jaeger" oc_prom "contrib.go.opencensus.io/exporter/prometheus" - datadog "github.com/DataDog/opencensus-go-exporter-datadog" "github.com/dustin/go-humanize" "github.com/golang/glog" "github.com/prometheus/client_golang/prometheus" @@ -30,7 +28,12 @@ import ( ostats "go.opencensus.io/stats" "go.opencensus.io/stats/view" "go.opencensus.io/tag" - "go.opencensus.io/trace" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + "go.opentelemetry.io/otel/sdk/resource" + traceTel "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" "github.com/dgraph-io/badger/v4" "github.com/dgraph-io/ristretto/v2/z" @@ -606,34 +609,78 @@ func SinceMs(startTime time.Time) float64 { func RegisterExporters(conf *viper.Viper, service string) { if traceFlag := conf.GetString("trace"); len(traceFlag) > 0 { t := z.NewSuperFlag(traceFlag).MergeAndCheckDefault(TraceDefaults) + // Create resource with service information + res := resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceNameKey.String(service), + ) + + // Configure the batch span processor options + batchOpts := []traceTel.BatchSpanProcessorOption{ + 
traceTel.WithMaxExportBatchSize(traceTel.DefaultMaxExportBatchSize), + traceTel.WithBatchTimeout(traceTel.DefaultScheduleDelay * time.Millisecond), + } + + traceSampler := traceTel.AlwaysSample() + if ratio := t.GetString("ratio"); len(ratio) > 0 { + f, err := strconv.ParseFloat(ratio, 64) + if err != nil { + log.Fatalf("Trace sampler ratio not a float: %s", ratio) + } + traceSampler = traceTel.TraceIDRatioBased(f) + } + + // Set up Jaeger exporter if configured if collector := t.GetString("jaeger"); len(collector) > 0 { - // Port details: https://www.jaegertracing.io/docs/getting-started/ - // Default collectorEndpointURI := "http://localhost:14268" - je, err := jaeger.NewExporter(jaeger.Options{ - Endpoint: collector, - ServiceName: service, - }) + // Create Jaeger exporter using OpenTelemetry OTLP + jaegerExp, err := otlptrace.New( + context.Background(), + otlptracehttp.NewClient( + otlptracehttp.WithEndpoint(collector), + otlptracehttp.WithInsecure(), + ), + ) if err != nil { log.Fatalf("Failed to create the Jaeger exporter: %v", err) } - // And now finally register it as a Trace Exporter - trace.RegisterExporter(je) + + // Create trace provider with Jaeger exporter + tp := traceTel.NewTracerProvider( + traceTel.WithBatcher(jaegerExp, batchOpts...), + traceTel.WithResource(res), + traceTel.WithSampler(traceSampler), + ) + + // Set the trace provider + otel.SetTracerProvider(tp) + glog.Infof("Registered Jaeger exporter for tracing") } + + // Set up OTLP exporter for Datadog if configured + // Note: In OpenTelemetry, typically Datadog integration uses the OTLP exporter if collector := t.GetString("datadog"); len(collector) > 0 { - exporter, err := datadog.NewExporter(datadog.Options{ - Service: service, - TraceAddr: collector, - }) + // Create OTLP exporter for Datadog + ddExporter, err := otlptrace.New( + context.Background(), + otlptracehttp.NewClient( + otlptracehttp.WithEndpoint(collector), + otlptracehttp.WithInsecure(), + ), + ) if err != nil { - 
log.Fatal(err) + log.Fatalf("Failed to create OTLP exporter for Datadog: %v", err) } - trace.RegisterExporter(exporter) + // Create trace provider with Datadog exporter + tp := traceTel.NewTracerProvider( + traceTel.WithBatcher(ddExporter, batchOpts...), + traceTel.WithResource(res), + traceTel.WithSampler(traceTel.AlwaysSample()), + ) - // For demoing purposes, always sample. - trace.ApplyConfig(trace.Config{ - DefaultSampler: trace.AlwaysSample(), - }) + // Set the trace provider + otel.SetTracerProvider(tp) + glog.Infof("Registered Datadog exporter for tracing") } } diff --git a/x/x.go b/x/x.go index 645c490306a..f221e057288 100644 --- a/x/x.go +++ b/x/x.go @@ -34,7 +34,8 @@ import ( "github.com/pkg/errors" "github.com/spf13/viper" "go.opencensus.io/plugin/ocgrpc" - "go.opencensus.io/trace" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "golang.org/x/term" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -965,22 +966,50 @@ func Diff(dst map[string]struct{}, src map[string]struct{}) ([]string, []string) } // SpanTimer returns a function used to record the duration of the given span. 
-func SpanTimer(span *trace.Span, name string) func() { - if span == nil { - return func() {} - } - uniq := int64(rand.Int31()) //nolint:gosec // unique id for tracing does not require cryptographic precision - attrs := []trace.Attribute{ - trace.Int64Attribute("funcId", uniq), - trace.StringAttribute("funcName", name), +// It accepts the span as either an OpenTelemetry trace.Span value or a *trace.Span pointer. +func SpanTimer(spanInterface interface{}, name string) func() { + // Pointer-to-span case (legacy call sites passing *trace.Span) + if span, ok := spanInterface.(*trace.Span); ok && span != nil { + uniq := int64(rand.Int31()) //nolint:gosec // unique id for tracing does not require cryptographic precision + attrs := trace.WithAttributes( + attribute.Int64("funcId", uniq), + attribute.String("funcName", name), + ) + (*span).AddEvent("Start", attrs) + start := time.Now() + + return func() { + (*span).AddEvent("End", trace.WithAttributes( + attribute.Int64("funcId", uniq), + attribute.String("funcName", name), + attribute.String("duration", time.Since(start).String()), + )) + // TODO: We can look into doing a latency record here. + } } - span.Annotate(attrs, "Start.") - start := time.Now() - return func() { - span.Annotatef(attrs, "End. Took %s", time.Since(start)) - // TODO: We can look into doing a latency record here. + // Span-value case (standard trace.SpanFromContext result) + if span, ok := spanInterface.(trace.Span); ok { + uniq := int64(rand.Int31()) //nolint:gosec // unique id for tracing does not require cryptographic precision + attrs := trace.WithAttributes( + attribute.Int64("funcId", uniq), + attribute.String("funcName", name), + ) + span.AddEvent("Start", attrs) + start := time.Now() + + return func() { + span.AddEvent("End", trace.WithAttributes( + attribute.Int64("funcId", uniq), + attribute.String("funcName", name), + attribute.String("duration", time.Since(start).String()), + )) + // TODO: We can look into doing a latency record here. 
+ } } + + // Fall back for nil or unknown span type + return func() {} } // CloseFunc needs to be called to close all the client connections.