Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions conn/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import (
"github.com/pkg/errors"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
otrace "go.opencensus.io/trace"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"google.golang.org/protobuf/proto"

"github.com/dgraph-io/badger/v4/y"
Expand Down Expand Up @@ -432,7 +433,7 @@ func (n *Node) doSendMessage(to uint64, msgCh chan []byte) error {
}

c := pb.NewRaftClient(pool.Get())
ctx, span := otrace.StartSpan(context.Background(),
ctx, span := otel.Tracer("").Start(context.Background(),
fmt.Sprintf("RaftMessage-%d-to-%d", n.Id, to))
defer span.End()

Expand Down Expand Up @@ -473,15 +474,15 @@ func (n *Node) doSendMessage(to uint64, msgCh chan []byte) error {
Payload: &api.Payload{Data: data},
}
slurp(batch) // Pick up more entries from msgCh, if present.
span.Annotatef(nil, "[to: %x] [Packets: %d] Sending data of length: %d.",
to, packets, len(batch.Payload.Data))
span.AddEvent(fmt.Sprintf("[to: %x] [Packets: %d] Sending data of length: %d.",
to, packets, len(batch.Payload.Data)))
if packets%10000 == 0 {
glog.V(2).Infof("[to: %x] [Packets: %d] Sending data of length: %d.",
to, packets, len(batch.Payload.Data))
}
packets++
if err := mc.Send(batch); err != nil {
span.Annotatef(nil, "Error while mc.Send: %v", err)
span.AddEvent(fmt.Sprintf("Error while mc.Send: %v", err))
glog.Errorf("[to: %x] Error while mc.Send: %v", to, err)
switch {
case strings.Contains(err.Error(), "TransientFailure"):
Expand Down Expand Up @@ -513,8 +514,7 @@ func (n *Node) doSendMessage(to uint64, msgCh chan []byte) error {
}
case <-ticker.C:
if lastPackets == packets {
span.Annotatef(nil,
"No activity for a while [Packets == %d]. Closing connection.", packets)
span.AddEvent(fmt.Sprintf("No activity for a while [Packets == %d]. Closing connection.", packets))
return mc.CloseSend()
}
lastPackets = packets
Expand Down Expand Up @@ -640,34 +640,34 @@ var readIndexOk, readIndexTotal uint64

// WaitLinearizableRead waits until a linearizable read can be performed.
func (n *Node) WaitLinearizableRead(ctx context.Context) error {
span := otrace.FromContext(ctx)
span.Annotate(nil, "WaitLinearizableRead")
span := trace.SpanFromContext(ctx)
span.AddEvent("WaitLinearizableRead")

if num := atomic.AddUint64(&readIndexTotal, 1); num%1000 == 0 {
glog.V(2).Infof("ReadIndex Total: %d\n", num)
}
indexCh := make(chan uint64, 1)
select {
case n.requestCh <- linReadReq{indexCh: indexCh}:
span.Annotate(nil, "Pushed to requestCh")
span.AddEvent("Pushed to requestCh")
case <-ctx.Done():
span.Annotate(nil, "Context expired")
span.AddEvent("Context expired")
return ctx.Err()
}

select {
case index := <-indexCh:
span.Annotatef(nil, "Received index: %d", index)
span.AddEvent(fmt.Sprintf("Received index %d", index))
Comment thread
mangalaman93 marked this conversation as resolved.
if index == 0 {
return errReadIndex
} else if num := atomic.AddUint64(&readIndexOk, 1); num%1000 == 0 {
glog.V(2).Infof("ReadIndex OK: %d\n", num)
}
err := n.Applied.WaitForMark(ctx, index)
span.Annotatef(nil, "Error from Applied.WaitForMark: %v", err)
span.AddEvent(fmt.Sprintf("Error from Applied.WaitForMark: %v", err))
return err
case <-ctx.Done():
span.Annotate(nil, "Context expired")
span.AddEvent("Context expired")
return ctx.Err()
}
}
Expand Down
4 changes: 2 additions & 2 deletions conn/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

"github.com/golang/glog"
"github.com/pkg/errors"
"go.opencensus.io/plugin/ocgrpc"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
Expand Down Expand Up @@ -159,7 +159,7 @@ func (p *Pools) Connect(addr string, tlsClientConf *tls.Config) *Pool {
// newPool creates a new "pool" with one gRPC connection, refcount 0.
func newPool(addr string, tlsClientConf *tls.Config) (*Pool, error) {
conOpts := []grpc.DialOption{
grpc.WithStatsHandler(&ocgrpc.ClientHandler{}),
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(x.GrpcMaxSize),
grpc.MaxCallSendMsgSize(x.GrpcMaxSize),
Expand Down
9 changes: 5 additions & 4 deletions conn/raft_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package conn
import (
"context"
"encoding/binary"
"fmt"
"math/rand"
"strconv"
"sync"
Expand All @@ -17,7 +18,7 @@ import (
"github.com/golang/glog"
"github.com/pkg/errors"
"go.etcd.io/etcd/raft/v3/raftpb"
otrace "go.opencensus.io/trace"
"go.opentelemetry.io/otel/trace"

"github.com/dgraph-io/dgo/v240/protos/api"
"github.com/hypermodeinc/dgraph/v24/protos/pb"
Expand Down Expand Up @@ -183,13 +184,13 @@ func (w *RaftServer) RaftMessage(server pb.Raft_RaftMessageServer) error {
if ctx.Err() != nil {
return ctx.Err()
}
span := otrace.FromContext(ctx)
span := trace.SpanFromContext(ctx)

node := w.GetNode()
if node == nil || node.Raft() == nil {
return ErrNoNode
}
span.Annotatef(nil, "Stream server is node %#x", node.Id)
span.AddEvent(fmt.Sprintf("Stream server is node %d", node.Id))

var rc *pb.RaftContext
raft := node.Raft()
Expand Down Expand Up @@ -247,7 +248,7 @@ func (w *RaftServer) RaftMessage(server pb.Raft_RaftMessageServer) error {
}
if loop == 1 {
rc = batch.GetContext()
span.Annotatef(nil, "Stream from %#x", rc.GetId())
span.AddEvent(fmt.Sprintf("Stream from %#x", rc.GetId()))
if rc != nil {
node.Connect(rc.Id, rc.Addr)
}
Expand Down
9 changes: 2 additions & 7 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ import (
"github.com/pkg/errors"
"github.com/spf13/cobra"
"go.opencensus.io/plugin/ocgrpc"
otrace "go.opencensus.io/trace"
"go.opencensus.io/zpages"
"go.opentelemetry.io/contrib/zpages"
"golang.org/x/net/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -509,7 +508,7 @@ func setupServer(closer *z.Closer) {
baseMux.HandleFunc("/health", healthCheck)
baseMux.HandleFunc("/state", stateHandler)
baseMux.HandleFunc("/debug/jemalloc", x.JemallocHandler)
zpages.Handle(baseMux, "/debug/z")
http.DefaultServeMux.Handle("/debug/z", zpages.NewTracezHandler(zpages.NewSpanProcessor()))

// TODO: Figure out what this is for?
http.HandleFunc("/debug/store", storeStatsHandler)
Expand Down Expand Up @@ -769,10 +768,6 @@ func run() {
return true, true
}
}
otrace.ApplyConfig(otrace.Config{
DefaultSampler: otrace.ProbabilitySampler(x.WorkerConfig.Trace.GetFloat64("ratio")),
MaxAnnotationEventsPerSpan: 256,
})

// Posting will initialize index which requires schema. Hence, initialize
// schema before calling posting.Init().
Expand Down
16 changes: 10 additions & 6 deletions dgraph/cmd/zero/assign.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ package zero

import (
"context"
"fmt"
"math/rand"
"time"

"github.com/golang/glog"
"github.com/pkg/errors"
otrace "go.opencensus.io/trace"
"go.opentelemetry.io/otel"
attribute "go.opentelemetry.io/otel/attribute"
"google.golang.org/grpc/metadata"

"github.com/hypermodeinc/dgraph/v24/protos/pb"
Expand Down Expand Up @@ -175,7 +177,7 @@ func (s *Server) AssignIds(ctx context.Context, num *pb.Num) (*pb.AssignedIds, e
if ctx.Err() != nil {
return &emptyAssignedIds, ctx.Err()
}
ctx, span := otrace.StartSpan(ctx, "Zero.AssignIds")
ctx, span := otel.Tracer("").Start(ctx, "Zero.AssignIds")
defer span.End()

rateLimit := func() error {
Expand Down Expand Up @@ -215,11 +217,13 @@ func (s *Server) AssignIds(ctx context.Context, num *pb.Num) (*pb.AssignedIds, e
if err := rateLimit(); err != nil {
return err
}
span.Annotatef(nil, "Zero leader leasing %d ids", num.GetVal())
span.SetAttributes(attribute.String("tablet", "predicate"))
span.AddEvent(fmt.Sprintf("Zero leader leasing %d ids", num.GetVal()))
reply, err = s.lease(ctx, num)
return err
}
span.Annotate(nil, "Not Zero leader")
span.SetAttributes(attribute.String("Not Zero leader", "true"))
span.AddEvent("Not Zero leader")
// I'm not the leader and this request was forwarded to me by a peer, who thought I'm the
// leader.
if num.Forwarded {
Expand All @@ -230,7 +234,7 @@ func (s *Server) AssignIds(ctx context.Context, num *pb.Num) (*pb.AssignedIds, e
if pl == nil {
return errors.Errorf("No healthy connection found to Leader of group zero")
}
span.Annotatef(nil, "Sending request to %v", pl.Addr)
span.AddEvent(fmt.Sprintf("Sending request to %v", pl.Addr))
zc := pb.NewZeroClient(pl.Get())
num.Forwarded = true
// pass on the incoming metadata to the zero leader.
Expand Down Expand Up @@ -269,7 +273,7 @@ func (s *Server) AssignIds(ctx context.Context, num *pb.Num) (*pb.AssignedIds, e
case <-ctx.Done():
return &emptyAssignedIds, ctx.Err()
case err := <-c:
span.Annotatef(nil, "Error while leasing %+v: %v", num, err)
span.AddEvent(fmt.Sprintf("Error while leasing %+v: %v", num, err))
return reply, err
}
}
34 changes: 19 additions & 15 deletions dgraph/cmd/zero/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@ package zero

import (
"context"
"fmt"
"math/rand"
"strconv"
"strings"
"time"

"github.com/golang/glog"
"github.com/pkg/errors"
otrace "go.opencensus.io/trace"
"go.opentelemetry.io/otel"
attribute "go.opentelemetry.io/otel/attribute"
trace "go.opentelemetry.io/otel/trace"

"github.com/dgraph-io/badger/v4/y"
"github.com/dgraph-io/dgo/v240/protos/api"
Expand Down Expand Up @@ -337,8 +340,8 @@ func (s *Server) proposeTxn(ctx context.Context, src *api.TxnContext) error {
}

func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
span := otrace.FromContext(ctx)
span.Annotate([]otrace.Attribute{otrace.Int64Attribute("startTs", int64(src.StartTs))}, "")
span := trace.SpanFromContext(ctx)
span.SetAttributes(attribute.Int64("startTs", int64(src.StartTs)))
if src.Aborted {
return s.proposeTxn(ctx, src)
}
Expand All @@ -348,8 +351,7 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
conflict := s.orc.hasConflict(src)
s.orc.RUnlock()
if conflict {
span.Annotate([]otrace.Attribute{otrace.BoolAttribute("abort", true)},
"Oracle found conflict")
span.SetAttributes(attribute.Bool("abort", true))
src.Aborted = true
return s.proposeTxn(ctx, src)
}
Expand Down Expand Up @@ -384,7 +386,7 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
return nil
}
if err := checkPreds(); err != nil {
span.Annotate([]otrace.Attribute{otrace.BoolAttribute("abort", true)}, err.Error())
span.SetAttributes(attribute.Bool("abort", true))
src.Aborted = true
return s.proposeTxn(ctx, src)
}
Expand All @@ -397,15 +399,16 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
src.CommitTs = assigned.StartId
// Mark the transaction as done, irrespective of whether the proposal succeeded or not.
defer s.orc.doneUntil.Done(src.CommitTs)
span.Annotatef([]otrace.Attribute{otrace.Int64Attribute("commitTs", int64(src.CommitTs))},
"Node Id: %d. Proposing TxnContext: %+v", s.Node.Id, src)
span.SetAttributes(attribute.Int64("commitTs", int64(src.CommitTs)))
span.SetAttributes(attribute.Int64("nodeId", int64(s.Node.Id)))
span.AddEvent(fmt.Sprintf("TXN Context: %+v", src))

if err := s.orc.commit(src); err != nil {
span.Annotatef(nil, "Found a conflict. Aborting.")
span.SetAttributes(attribute.Bool("abort", true))
src.Aborted = true
}
if err := ctx.Err(); err != nil {
span.Annotatef(nil, "Aborting txn due to context timing out.")
span.SetAttributes(attribute.Bool("abort", true))
src.Aborted = true
}
// Propose txn should be used to set watermark as done.
Expand All @@ -420,15 +423,15 @@ func (s *Server) CommitOrAbort(ctx context.Context, src *api.TxnContext) (*api.T
if ctx.Err() != nil {
return nil, ctx.Err()
}
ctx, span := otrace.StartSpan(ctx, "Zero.CommitOrAbort")
ctx, span := otel.Tracer("").Start(ctx, "Zero.CommitOrAbort")
defer span.End()

if !s.Node.AmLeader() {
return nil, errors.Errorf("Only leader can decide to commit or abort")
}
err := s.commit(ctx, src)
if err != nil {
span.Annotate([]otrace.Attribute{otrace.BoolAttribute("error", true)}, err.Error())
span.SetAttributes(attribute.Bool("error", true))
}
return src, err
}
Expand Down Expand Up @@ -491,17 +494,18 @@ func (s *Server) TryAbort(ctx context.Context,

// Timestamps is used to assign startTs for a new transaction
func (s *Server) Timestamps(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error) {
ctx, span := otrace.StartSpan(ctx, "Zero.Timestamps")
ctx, span := otel.Tracer("").Start(ctx, "Zero.Timestamps")
defer span.End()

span.Annotatef(nil, "Zero id: %d. Timestamp request: %+v", s.Node.Id, num)
span.SetAttributes(attribute.Int64("zeroId", int64(s.Node.Id)))
span.SetAttributes(attribute.String("timestampRequest", fmt.Sprintf("%+v", num)))
if ctx.Err() != nil {
return &emptyAssignedIds, ctx.Err()
}

num.Type = pb.Num_TXN_TS
reply, err := s.lease(ctx, num)
span.Annotatef(nil, "Response: %+v. Error: %v", reply, err)
span.AddEvent(fmt.Sprintf("Response: %+v, Error: %v", reply, err))

switch err {
case nil:
Expand Down
Loading