Skip to content

Commit 492f6a8

Browse files
fix(core): upgrade opencensus to opentelemetry
1 parent b307fad commit 492f6a8

31 files changed

Lines changed: 2132 additions & 427 deletions

conn/node.go

Lines changed: 14 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -21,7 +21,8 @@ import (
2121
"github.com/pkg/errors"
2222
"go.etcd.io/etcd/raft/v3"
2323
"go.etcd.io/etcd/raft/v3/raftpb"
24-
otrace "go.opencensus.io/trace"
24+
"go.opentelemetry.io/otel"
25+
"go.opentelemetry.io/otel/trace"
2526
"google.golang.org/protobuf/proto"
2627

2728
"github.com/dgraph-io/badger/v4/y"
@@ -432,7 +433,7 @@ func (n *Node) doSendMessage(to uint64, msgCh chan []byte) error {
432433
}
433434

434435
c := pb.NewRaftClient(pool.Get())
435-
ctx, span := otrace.StartSpan(context.Background(),
436+
ctx, span := otel.Tracer("").Start(context.Background(),
436437
fmt.Sprintf("RaftMessage-%d-to-%d", n.Id, to))
437438
defer span.End()
438439

@@ -473,15 +474,15 @@ func (n *Node) doSendMessage(to uint64, msgCh chan []byte) error {
473474
Payload: &api.Payload{Data: data},
474475
}
475476
slurp(batch) // Pick up more entries from msgCh, if present.
476-
span.Annotatef(nil, "[to: %x] [Packets: %d] Sending data of length: %d.",
477-
to, packets, len(batch.Payload.Data))
477+
span.AddEvent(fmt.Sprintf("[to: %x] [Packets: %d] Sending data of length: %d.",
478+
to, packets, len(batch.Payload.Data)))
478479
if packets%10000 == 0 {
479480
glog.V(2).Infof("[to: %x] [Packets: %d] Sending data of length: %d.",
480481
to, packets, len(batch.Payload.Data))
481482
}
482483
packets++
483484
if err := mc.Send(batch); err != nil {
484-
span.Annotatef(nil, "Error while mc.Send: %v", err)
485+
span.AddEvent(fmt.Sprintf("Error while mc.Send: %v", err))
485486
glog.Errorf("[to: %x] Error while mc.Send: %v", to, err)
486487
switch {
487488
case strings.Contains(err.Error(), "TransientFailure"):
@@ -513,8 +514,7 @@ func (n *Node) doSendMessage(to uint64, msgCh chan []byte) error {
513514
}
514515
case <-ticker.C:
515516
if lastPackets == packets {
516-
span.Annotatef(nil,
517-
"No activity for a while [Packets == %d]. Closing connection.", packets)
517+
span.AddEvent(fmt.Sprintf("No activity for a while [Packets == %d]. Closing connection.", packets))
518518
return mc.CloseSend()
519519
}
520520
lastPackets = packets
@@ -640,34 +640,34 @@ var readIndexOk, readIndexTotal uint64
640640

641641
// WaitLinearizableRead waits until a linearizable read can be performed.
642642
func (n *Node) WaitLinearizableRead(ctx context.Context) error {
643-
span := otrace.FromContext(ctx)
644-
span.Annotate(nil, "WaitLinearizableRead")
643+
span := trace.SpanFromContext(ctx)
644+
span.AddEvent("WaitLinearizableRead")
645645

646646
if num := atomic.AddUint64(&readIndexTotal, 1); num%1000 == 0 {
647647
glog.V(2).Infof("ReadIndex Total: %d\n", num)
648648
}
649649
indexCh := make(chan uint64, 1)
650650
select {
651651
case n.requestCh <- linReadReq{indexCh: indexCh}:
652-
span.Annotate(nil, "Pushed to requestCh")
652+
span.AddEvent("Pushed to requestCh")
653653
case <-ctx.Done():
654-
span.Annotate(nil, "Context expired")
654+
span.AddEvent("Context expired")
655655
return ctx.Err()
656656
}
657657

658658
select {
659659
case index := <-indexCh:
660-
span.Annotatef(nil, "Received index: %d", index)
660+
span.AddEvent(fmt.Sprintf("Received index %d", index))
661661
if index == 0 {
662662
return errReadIndex
663663
} else if num := atomic.AddUint64(&readIndexOk, 1); num%1000 == 0 {
664664
glog.V(2).Infof("ReadIndex OK: %d\n", num)
665665
}
666666
err := n.Applied.WaitForMark(ctx, index)
667-
span.Annotatef(nil, "Error from Applied.WaitForMark: %v", err)
667+
span.AddEvent(fmt.Sprintf("Error from Applied.WaitForMark: %v", err))
668668
return err
669669
case <-ctx.Done():
670-
span.Annotate(nil, "Context expired")
670+
span.AddEvent("Context expired")
671671
return ctx.Err()
672672
}
673673
}

conn/pool.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -14,7 +14,7 @@ import (
1414

1515
"github.com/golang/glog"
1616
"github.com/pkg/errors"
17-
"go.opencensus.io/plugin/ocgrpc"
17+
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
1818
"google.golang.org/grpc"
1919
"google.golang.org/grpc/credentials"
2020
"google.golang.org/grpc/credentials/insecure"
@@ -159,7 +159,7 @@ func (p *Pools) Connect(addr string, tlsClientConf *tls.Config) *Pool {
159159
// newPool creates a new "pool" with one gRPC connection, refcount 0.
160160
func newPool(addr string, tlsClientConf *tls.Config) (*Pool, error) {
161161
conOpts := []grpc.DialOption{
162-
grpc.WithStatsHandler(&ocgrpc.ClientHandler{}),
162+
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
163163
grpc.WithDefaultCallOptions(
164164
grpc.MaxCallRecvMsgSize(x.GrpcMaxSize),
165165
grpc.MaxCallSendMsgSize(x.GrpcMaxSize),

conn/raft_server.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package conn
88
import (
99
"context"
1010
"encoding/binary"
11+
"fmt"
1112
"math/rand"
1213
"strconv"
1314
"sync"
@@ -17,7 +18,7 @@ import (
1718
"github.com/golang/glog"
1819
"github.com/pkg/errors"
1920
"go.etcd.io/etcd/raft/v3/raftpb"
20-
otrace "go.opencensus.io/trace"
21+
"go.opentelemetry.io/otel/trace"
2122

2223
"github.com/dgraph-io/dgo/v240/protos/api"
2324
"github.com/hypermodeinc/dgraph/v24/protos/pb"
@@ -183,13 +184,13 @@ func (w *RaftServer) RaftMessage(server pb.Raft_RaftMessageServer) error {
183184
if ctx.Err() != nil {
184185
return ctx.Err()
185186
}
186-
span := otrace.FromContext(ctx)
187+
span := trace.SpanFromContext(ctx)
187188

188189
node := w.GetNode()
189190
if node == nil || node.Raft() == nil {
190191
return ErrNoNode
191192
}
192-
span.Annotatef(nil, "Stream server is node %#x", node.Id)
193+
span.AddEvent(fmt.Sprintf("Stream server is node %d", node.Id))
193194

194195
var rc *pb.RaftContext
195196
raft := node.Raft()
@@ -247,7 +248,7 @@ func (w *RaftServer) RaftMessage(server pb.Raft_RaftMessageServer) error {
247248
}
248249
if loop == 1 {
249250
rc = batch.GetContext()
250-
span.Annotatef(nil, "Stream from %#x", rc.GetId())
251+
span.AddEvent(fmt.Sprintf("Stream from %#x", rc.GetId()))
251252
if rc != nil {
252253
node.Connect(rc.Id, rc.Addr)
253254
}

dgraph/cmd/alpha/run.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@ import (
2828
"github.com/pkg/errors"
2929
"github.com/spf13/cobra"
3030
"go.opencensus.io/plugin/ocgrpc"
31-
otrace "go.opencensus.io/trace"
32-
"go.opencensus.io/zpages"
31+
"go.opentelemetry.io/contrib/zpages"
3332
"golang.org/x/net/trace"
3433
"google.golang.org/grpc"
3534
"google.golang.org/grpc/credentials"
@@ -510,7 +509,7 @@ func setupServer(closer *z.Closer) {
510509
baseMux.HandleFunc("/health", healthCheck)
511510
baseMux.HandleFunc("/state", stateHandler)
512511
baseMux.HandleFunc("/debug/jemalloc", x.JemallocHandler)
513-
zpages.Handle(baseMux, "/debug/z")
512+
http.DefaultServeMux.Handle("/debug/z", zpages.NewTracezHandler(zpages.NewSpanProcessor()))
514513

515514
// TODO: Figure out what this is for?
516515
http.HandleFunc("/debug/store", storeStatsHandler)
@@ -771,10 +770,6 @@ func run() {
771770
return true, true
772771
}
773772
}
774-
otrace.ApplyConfig(otrace.Config{
775-
DefaultSampler: otrace.ProbabilitySampler(x.WorkerConfig.Trace.GetFloat64("ratio")),
776-
MaxAnnotationEventsPerSpan: 256,
777-
})
778773

779774
// Posting will initialize index which requires schema. Hence, initialize
780775
// schema before calling posting.Init().

dgraph/cmd/zero/assign.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,20 @@
11
/*
2-
* SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
2+
* SPDX-FileCopyrightText: Hypermode Inc. <hello@hypermode.com>
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

66
package zero
77

88
import (
99
"context"
10+
"fmt"
1011
"math/rand"
1112
"time"
1213

1314
"github.com/golang/glog"
1415
"github.com/pkg/errors"
15-
otrace "go.opencensus.io/trace"
16+
"go.opentelemetry.io/otel"
17+
attribute "go.opentelemetry.io/otel/attribute"
1618
"google.golang.org/grpc/metadata"
1719

1820
"github.com/hypermodeinc/dgraph/v24/protos/pb"
@@ -175,7 +177,7 @@ func (s *Server) AssignIds(ctx context.Context, num *pb.Num) (*pb.AssignedIds, e
175177
if ctx.Err() != nil {
176178
return &emptyAssignedIds, ctx.Err()
177179
}
178-
ctx, span := otrace.StartSpan(ctx, "Zero.AssignIds")
180+
ctx, span := otel.Tracer("").Start(ctx, "Zero.AssignIds")
179181
defer span.End()
180182

181183
rateLimit := func() error {
@@ -215,11 +217,13 @@ func (s *Server) AssignIds(ctx context.Context, num *pb.Num) (*pb.AssignedIds, e
215217
if err := rateLimit(); err != nil {
216218
return err
217219
}
218-
span.Annotatef(nil, "Zero leader leasing %d ids", num.GetVal())
220+
span.SetAttributes(attribute.String("tablet", "predicate"))
221+
span.AddEvent(fmt.Sprintf("Zero leader leasing %d ids", num.GetVal()))
219222
reply, err = s.lease(ctx, num)
220223
return err
221224
}
222-
span.Annotate(nil, "Not Zero leader")
225+
span.SetAttributes(attribute.String("Not Zero leader", "true"))
226+
span.AddEvent("Not Zero leader")
223227
// I'm not the leader and this request was forwarded to me by a peer, who thought I'm the
224228
// leader.
225229
if num.Forwarded {
@@ -230,7 +234,7 @@ func (s *Server) AssignIds(ctx context.Context, num *pb.Num) (*pb.AssignedIds, e
230234
if pl == nil {
231235
return errors.Errorf("No healthy connection found to Leader of group zero")
232236
}
233-
span.Annotatef(nil, "Sending request to %v", pl.Addr)
237+
span.AddEvent(fmt.Sprintf("Sending request to %v", pl.Addr))
234238
zc := pb.NewZeroClient(pl.Get())
235239
num.Forwarded = true
236240
// pass on the incoming metadata to the zero leader.
@@ -269,7 +273,7 @@ func (s *Server) AssignIds(ctx context.Context, num *pb.Num) (*pb.AssignedIds, e
269273
case <-ctx.Done():
270274
return &emptyAssignedIds, ctx.Err()
271275
case err := <-c:
272-
span.Annotatef(nil, "Error while leasing %+v: %v", num, err)
276+
span.AddEvent(fmt.Sprintf("Error while leasing %+v: %v", num, err))
273277
return reply, err
274278
}
275279
}

dgraph/cmd/zero/oracle.go

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,23 @@
11
/*
2-
* SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
2+
* SPDX-FileCopyrightText: Hypermode Inc. <hello@hypermode.com>
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

66
package zero
77

88
import (
99
"context"
10+
"fmt"
1011
"math/rand"
1112
"strconv"
1213
"strings"
1314
"time"
1415

1516
"github.com/golang/glog"
1617
"github.com/pkg/errors"
17-
otrace "go.opencensus.io/trace"
18+
"go.opentelemetry.io/otel"
19+
attribute "go.opentelemetry.io/otel/attribute"
20+
trace "go.opentelemetry.io/otel/trace"
1821

1922
"github.com/dgraph-io/badger/v4/y"
2023
"github.com/dgraph-io/dgo/v240/protos/api"
@@ -337,8 +340,8 @@ func (s *Server) proposeTxn(ctx context.Context, src *api.TxnContext) error {
337340
}
338341

339342
func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
340-
span := otrace.FromContext(ctx)
341-
span.Annotate([]otrace.Attribute{otrace.Int64Attribute("startTs", int64(src.StartTs))}, "")
343+
span := trace.SpanFromContext(ctx)
344+
span.SetAttributes(attribute.Int64("startTs", int64(src.StartTs)))
342345
if src.Aborted {
343346
return s.proposeTxn(ctx, src)
344347
}
@@ -348,8 +351,7 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
348351
conflict := s.orc.hasConflict(src)
349352
s.orc.RUnlock()
350353
if conflict {
351-
span.Annotate([]otrace.Attribute{otrace.BoolAttribute("abort", true)},
352-
"Oracle found conflict")
354+
span.SetAttributes(attribute.Bool("abort", true))
353355
src.Aborted = true
354356
return s.proposeTxn(ctx, src)
355357
}
@@ -384,7 +386,7 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
384386
return nil
385387
}
386388
if err := checkPreds(); err != nil {
387-
span.Annotate([]otrace.Attribute{otrace.BoolAttribute("abort", true)}, err.Error())
389+
span.SetAttributes(attribute.Bool("abort", true))
388390
src.Aborted = true
389391
return s.proposeTxn(ctx, src)
390392
}
@@ -397,15 +399,16 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
397399
src.CommitTs = assigned.StartId
398400
// Mark the transaction as done, irrespective of whether the proposal succeeded or not.
399401
defer s.orc.doneUntil.Done(src.CommitTs)
400-
span.Annotatef([]otrace.Attribute{otrace.Int64Attribute("commitTs", int64(src.CommitTs))},
401-
"Node Id: %d. Proposing TxnContext: %+v", s.Node.Id, src)
402+
span.SetAttributes(attribute.Int64("commitTs", int64(src.CommitTs)))
403+
span.SetAttributes(attribute.Int64("nodeId", int64(s.Node.Id)))
404+
span.SetAttributes(attribute.String("txnContext", fmt.Sprintf("%+v", src)))
402405

403406
if err := s.orc.commit(src); err != nil {
404-
span.Annotatef(nil, "Found a conflict. Aborting.")
407+
span.SetAttributes(attribute.Bool("abort", true))
405408
src.Aborted = true
406409
}
407410
if err := ctx.Err(); err != nil {
408-
span.Annotatef(nil, "Aborting txn due to context timing out.")
411+
span.SetAttributes(attribute.Bool("abort", true))
409412
src.Aborted = true
410413
}
411414
// Propose txn should be used to set watermark as done.
@@ -420,15 +423,15 @@ func (s *Server) CommitOrAbort(ctx context.Context, src *api.TxnContext) (*api.T
420423
if ctx.Err() != nil {
421424
return nil, ctx.Err()
422425
}
423-
ctx, span := otrace.StartSpan(ctx, "Zero.CommitOrAbort")
426+
ctx, span := otel.Tracer("").Start(ctx, "Zero.CommitOrAbort")
424427
defer span.End()
425428

426429
if !s.Node.AmLeader() {
427430
return nil, errors.Errorf("Only leader can decide to commit or abort")
428431
}
429432
err := s.commit(ctx, src)
430433
if err != nil {
431-
span.Annotate([]otrace.Attribute{otrace.BoolAttribute("error", true)}, err.Error())
434+
span.SetAttributes(attribute.Bool("error", true))
432435
}
433436
return src, err
434437
}
@@ -491,17 +494,19 @@ func (s *Server) TryAbort(ctx context.Context,
491494

492495
// Timestamps is used to assign startTs for a new transaction
493496
func (s *Server) Timestamps(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error) {
494-
ctx, span := otrace.StartSpan(ctx, "Zero.Timestamps")
497+
ctx, span := otel.Tracer("").Start(ctx, "Zero.Timestamps")
495498
defer span.End()
496499

497-
span.Annotatef(nil, "Zero id: %d. Timestamp request: %+v", s.Node.Id, num)
500+
span.SetAttributes(attribute.Int64("zeroId", int64(s.Node.Id)))
501+
span.SetAttributes(attribute.String("timestampRequest", fmt.Sprintf("%+v", num)))
498502
if ctx.Err() != nil {
499503
return &emptyAssignedIds, ctx.Err()
500504
}
501505

502506
num.Type = pb.Num_TXN_TS
503507
reply, err := s.lease(ctx, num)
504-
span.Annotatef(nil, "Response: %+v. Error: %v", reply, err)
508+
span.SetAttributes(attribute.String("response", fmt.Sprintf("%+v", reply)))
509+
span.SetAttributes(attribute.String("error", fmt.Sprintf("%v", err)))
505510

506511
switch err {
507512
case nil:

0 commit comments

Comments
 (0)