@@ -21,7 +21,8 @@ import (
2121 "github.com/pkg/errors"
2222 "go.etcd.io/etcd/raft/v3"
2323 "go.etcd.io/etcd/raft/v3/raftpb"
24- otrace "go.opencensus.io/trace"
24+ "go.opentelemetry.io/otel"
25+ "go.opentelemetry.io/otel/trace"
2526 "google.golang.org/protobuf/proto"
2627
2728 "github.com/dgraph-io/badger/v4/y"
@@ -432,7 +433,7 @@ func (n *Node) doSendMessage(to uint64, msgCh chan []byte) error {
432433 }
433434
434435 c := pb .NewRaftClient (pool .Get ())
435- ctx , span := otrace . StartSpan (context .Background (),
436+ ctx , span := otel . Tracer ( "" ). Start (context .Background (),
436437 fmt .Sprintf ("RaftMessage-%d-to-%d" , n .Id , to ))
437438 defer span .End ()
438439
@@ -473,15 +474,15 @@ func (n *Node) doSendMessage(to uint64, msgCh chan []byte) error {
473474 Payload : & api.Payload {Data : data },
474475 }
475476 slurp (batch ) // Pick up more entries from msgCh, if present.
476- span .Annotatef ( nil , "[to: %x] [Packets: %d] Sending data of length: %d." ,
477- to , packets , len (batch .Payload .Data ))
477+ span .AddEvent ( fmt . Sprintf ( "[to: %x] [Packets: %d] Sending data of length: %d." ,
478+ to , packets , len (batch .Payload .Data )))
478479 if packets % 10000 == 0 {
479480 glog .V (2 ).Infof ("[to: %x] [Packets: %d] Sending data of length: %d." ,
480481 to , packets , len (batch .Payload .Data ))
481482 }
482483 packets ++
483484 if err := mc .Send (batch ); err != nil {
484- span .Annotatef ( nil , "Error while mc.Send: %v" , err )
485+ span .AddEvent ( fmt . Sprintf ( "Error while mc.Send: %v" , err ) )
485486 glog .Errorf ("[to: %x] Error while mc.Send: %v" , to , err )
486487 switch {
487488 case strings .Contains (err .Error (), "TransientFailure" ):
@@ -513,8 +514,7 @@ func (n *Node) doSendMessage(to uint64, msgCh chan []byte) error {
513514 }
514515 case <- ticker .C :
515516 if lastPackets == packets {
516- span .Annotatef (nil ,
517- "No activity for a while [Packets == %d]. Closing connection." , packets )
517+ span .AddEvent (fmt .Sprintf ("No activity for a while [Packets == %d]. Closing connection." , packets ))
518518 return mc .CloseSend ()
519519 }
520520 lastPackets = packets
@@ -640,34 +640,34 @@ var readIndexOk, readIndexTotal uint64
640640
641641// WaitLinearizableRead waits until a linearizable read can be performed.
642642func (n * Node ) WaitLinearizableRead (ctx context.Context ) error {
643- span := otrace . FromContext (ctx )
644- span .Annotate ( nil , "WaitLinearizableRead" )
643+ span := trace . SpanFromContext (ctx )
644+ span .AddEvent ( "WaitLinearizableRead" )
645645
646646 if num := atomic .AddUint64 (& readIndexTotal , 1 ); num % 1000 == 0 {
647647 glog .V (2 ).Infof ("ReadIndex Total: %d\n " , num )
648648 }
649649 indexCh := make (chan uint64 , 1 )
650650 select {
651651 case n .requestCh <- linReadReq {indexCh : indexCh }:
652- span .Annotate ( nil , "Pushed to requestCh" )
652+ span .AddEvent ( "Pushed to requestCh" )
653653 case <- ctx .Done ():
654- span .Annotate ( nil , "Context expired" )
654+ span .AddEvent ( "Context expired" )
655655 return ctx .Err ()
656656 }
657657
658658 select {
659659 case index := <- indexCh :
660- span .Annotatef ( nil , "Received index: %d" , index )
660+ span .AddEvent ( fmt . Sprintf ( "Received index %d" , index ) )
661661 if index == 0 {
662662 return errReadIndex
663663 } else if num := atomic .AddUint64 (& readIndexOk , 1 ); num % 1000 == 0 {
664664 glog .V (2 ).Infof ("ReadIndex OK: %d\n " , num )
665665 }
666666 err := n .Applied .WaitForMark (ctx , index )
667- span .Annotatef ( nil , "Error from Applied.WaitForMark: %v" , err )
667+ span .AddEvent ( fmt . Sprintf ( "Error from Applied.WaitForMark: %v" , err ) )
668668 return err
669669 case <- ctx .Done ():
670- span .Annotate ( nil , "Context expired" )
670+ span .AddEvent ( "Context expired" )
671671 return ctx .Err ()
672672 }
673673}
0 commit comments