Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (

"github.com/dgraph-io/badger/v4"
"github.com/dgraph-io/dgo/v250/protos/api"
apiv25 "github.com/dgraph-io/dgo/v250/protos/api.v25"
apiv2 "github.com/dgraph-io/dgo/v250/protos/api.v2"
_ "github.com/dgraph-io/gqlparser/v2/validator/rules" // make gql validator init() all rules
"github.com/dgraph-io/ristretto/v2/z"
"github.com/hypermodeinc/dgraph/v25/audit"
Expand Down Expand Up @@ -467,7 +467,7 @@ func serveGRPC(l net.Listener, tlsCfg *tls.Config, closer *z.Closer) {

s := grpc.NewServer(opt...)
api.RegisterDgraphServer(s, &edgraph.Server{})
apiv25.RegisterDgraphServer(s, &edgraph.ServerV25{})
apiv2.RegisterDgraphServer(s, &edgraph.ServerV25{})
hapi.RegisterHealthServer(s, health.NewServer())
worker.RegisterZeroProxyServer(s)

Expand Down
24 changes: 12 additions & 12 deletions dgraph/cmd/dgraphimport/import_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"path/filepath"

"github.com/dgraph-io/badger/v4"
apiv25 "github.com/dgraph-io/dgo/v250/protos/api.v25"
apiv2 "github.com/dgraph-io/dgo/v250/protos/api.v2"
"github.com/dgraph-io/ristretto/v2/z"

"github.com/golang/glog"
Expand All @@ -24,14 +24,14 @@ import (
)

// newClient creates a new import client with the specified endpoint and gRPC options.
func newClient(endpoint string, opts grpc.DialOption) (apiv25.DgraphClient, error) {
func newClient(endpoint string, opts grpc.DialOption) (apiv2.DgraphClient, error) {
conn, err := grpc.NewClient(endpoint, opts)
if err != nil {
return nil, fmt.Errorf("failed to connect to endpoint [%s]: %w", endpoint, err)
}

glog.Infof("Successfully connected to Dgraph endpoint: %s", endpoint)
return apiv25.NewDgraphClient(conn), nil
return apiv2.NewDgraphClient(conn), nil
}

func Import(ctx context.Context, endpoint string, opts grpc.DialOption, bulkOutDir string) error {
Expand All @@ -48,9 +48,9 @@ func Import(ctx context.Context, endpoint string, opts grpc.DialOption, bulkOutD
}

// startPDirStream initiates a snapshot stream session with the Dgraph server.
func startPDirStream(ctx context.Context, dc apiv25.DgraphClient) (*apiv25.InitiatePDirStreamResponse, error) {
func startPDirStream(ctx context.Context, dc apiv2.DgraphClient) (*apiv2.InitiatePDirStreamResponse, error) {
glog.Info("Initiating pdir stream")
req := &apiv25.InitiatePDirStreamRequest{}
req := &apiv2.InitiatePDirStreamRequest{}
resp, err := dc.InitiatePDirStream(ctx, req)
if err != nil {
glog.Errorf("failed to initiate pdir stream: %v", err)
Expand All @@ -63,7 +63,7 @@ func startPDirStream(ctx context.Context, dc apiv25.DgraphClient) (*apiv25.Initi
// sendPDir takes a p directory and a set of group IDs and streams the data from the
// p directory to the corresponding group IDs. It first scans the provided directory for
// subdirectories named with numeric group IDs.
func sendPDir(ctx context.Context, dg apiv25.DgraphClient, baseDir string, groups []uint32) error {
func sendPDir(ctx context.Context, dg apiv2.DgraphClient, baseDir string, groups []uint32) error {
glog.Infof("Starting to stream pdir from directory: %s", baseDir)

errG, ctx := errgroup.WithContext(ctx)
Expand Down Expand Up @@ -94,7 +94,7 @@ func sendPDir(ctx context.Context, dg apiv25.DgraphClient, baseDir string, group

// streamData handles the actual data streaming process for a single group.
// It opens the BadgerDB at the specified directory and streams all data to the server.
func streamData(ctx context.Context, dg apiv25.DgraphClient, pdir string, groupId uint32) error {
func streamData(ctx context.Context, dg apiv2.DgraphClient, pdir string, groupId uint32) error {
glog.Infof("Opening stream for group %d from directory %s", groupId, pdir)

// Initialize stream with the server
Expand All @@ -118,7 +118,7 @@ func streamData(ctx context.Context, dg apiv25.DgraphClient, pdir string, groupI

// Send group ID as the first message in the stream
glog.Infof("Sending group ID [%d] to server", groupId)
groupReq := &apiv25.StreamPDirRequest{GroupId: groupId}
groupReq := &apiv2.StreamPDirRequest{GroupId: groupId}
if err := out.Send(groupReq); err != nil {
return fmt.Errorf("failed to send group ID [%d]: %w", groupId, err)
}
Expand All @@ -129,8 +129,8 @@ func streamData(ctx context.Context, dg apiv25.DgraphClient, pdir string, groupI
stream.LogPrefix = fmt.Sprintf("Sending P dir for group [%d]", groupId)
stream.KeyToList = nil
stream.Send = func(buf *z.Buffer) error {
p := &apiv25.StreamPacket{Data: buf.Bytes()}
if err := out.Send(&apiv25.StreamPDirRequest{StreamPacket: p}); err != nil && !errors.Is(err, io.EOF) {
p := &apiv2.StreamPacket{Data: buf.Bytes()}
if err := out.Send(&apiv2.StreamPDirRequest{StreamPacket: p}); err != nil && !errors.Is(err, io.EOF) {
return fmt.Errorf("failed to send data chunk: %w", err)
}
return nil
Expand All @@ -143,9 +143,9 @@ func streamData(ctx context.Context, dg apiv25.DgraphClient, pdir string, groupI

// Send the final 'done' signal to mark completion
glog.Infof("Sending completion signal for group [%d]", groupId)
done := &apiv25.StreamPacket{Done: true}
done := &apiv2.StreamPacket{Done: true}

if err := out.Send(&apiv25.StreamPDirRequest{StreamPacket: done}); err != nil && !errors.Is(err, io.EOF) {
if err := out.Send(&apiv2.StreamPDirRequest{StreamPacket: done}); err != nil && !errors.Is(err, io.EOF) {
return fmt.Errorf("failed to send 'done' signal for group [%d]: %w", groupId, err)
}
// Wait for acknowledgment from the server
Expand Down
30 changes: 15 additions & 15 deletions edgraph/alter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"time"

"github.com/dgraph-io/dgo/v250/protos/api"
apiv25 "github.com/dgraph-io/dgo/v250/protos/api.v25"
apiv2 "github.com/dgraph-io/dgo/v250/protos/api.v2"
"github.com/hypermodeinc/dgraph/v25/dql"
"github.com/hypermodeinc/dgraph/v25/protos/pb"
"github.com/hypermodeinc/dgraph/v25/query"
Expand All @@ -28,7 +28,7 @@ import (
)

type ServerV25 struct {
apiv25.UnimplementedDgraphServer
apiv2.UnimplementedDgraphServer
}

func validateAlterReq(ctx context.Context) error {
Expand Down Expand Up @@ -74,7 +74,7 @@ func executeDropAll(ctx context.Context, startTs uint64) error {
return nil
}

func executeDropAllInNs(ctx context.Context, startTs uint64, req *apiv25.AlterRequest) error {
func executeDropAllInNs(ctx context.Context, startTs uint64, req *apiv2.AlterRequest) error {
ctx = x.AttachJWTNamespace(ctx)

nsID, err := getNamespaceIDFromName(ctx, req.NsName)
Expand Down Expand Up @@ -117,7 +117,7 @@ func executeDropAllInNs(ctx context.Context, startTs uint64, req *apiv25.AlterRe
return nil
}

func executeDropData(ctx context.Context, startTs uint64, req *apiv25.AlterRequest) error {
func executeDropData(ctx context.Context, startTs uint64, req *apiv2.AlterRequest) error {
nsID, err := getNamespaceIDFromName(x.AttachJWTNamespace(ctx), req.NsName)
if err != nil {
return err
Expand Down Expand Up @@ -153,7 +153,7 @@ func executeDropData(ctx context.Context, startTs uint64, req *apiv25.AlterReque
return nil
}

func executeDropPredicate(ctx context.Context, startTs uint64, req *apiv25.AlterRequest) error {
func executeDropPredicate(ctx context.Context, startTs uint64, req *apiv2.AlterRequest) error {
if len(req.PredicateToDrop) == 0 {
return errors.Errorf("PredicateToDrop cannot be empty")
}
Expand Down Expand Up @@ -195,7 +195,7 @@ func executeDropPredicate(ctx context.Context, startTs uint64, req *apiv25.Alter
return nil
}

func executeDropType(ctx context.Context, startTs uint64, req *apiv25.AlterRequest) error {
func executeDropType(ctx context.Context, startTs uint64, req *apiv2.AlterRequest) error {
if len(req.TypeToDrop) == 0 {
return errors.Errorf("TypeToDrop cannot be empty")
}
Expand All @@ -219,7 +219,7 @@ func executeDropType(ctx context.Context, startTs uint64, req *apiv25.AlterReque
return nil
}

func executeSetSchema(ctx context.Context, startTs uint64, req *apiv25.AlterRequest) error {
func executeSetSchema(ctx context.Context, startTs uint64, req *apiv2.AlterRequest) error {
nsID, err := getNamespaceIDFromName(x.AttachJWTNamespace(ctx), req.NsName)
if err != nil {
return err
Expand Down Expand Up @@ -250,7 +250,7 @@ func executeSetSchema(ctx context.Context, startTs uint64, req *apiv25.AlterRequ
}

// Alter handles requests to change the schema or remove parts or all of the data.
func (s *ServerV25) Alter(ctx context.Context, req *apiv25.AlterRequest) (*apiv25.AlterResponse, error) {
func (s *ServerV25) Alter(ctx context.Context, req *apiv2.AlterRequest) (*apiv2.AlterResponse, error) {
ctx, span := otel.Tracer("").Start(ctx, "ServerV25.Alter")
defer span.End()
span.AddEvent("Alter operation", trace.WithAttributes(attribute.String("request", req.String())))
Expand All @@ -274,19 +274,19 @@ func (s *ServerV25) Alter(ctx context.Context, req *apiv25.AlterRequest) (*apiv2
// but is required if it lies on some other machine. Let's get it for safety.
startTs := worker.State.GetTimestamp(false)

empty := &apiv25.AlterResponse{}
empty := &apiv2.AlterResponse{}
switch req.Op {
case apiv25.AlterOp_DROP_ALL:
case apiv2.AlterOp_DROP_ALL:
return empty, executeDropAll(ctx, startTs)
case apiv25.AlterOp_DROP_ALL_IN_NS:
case apiv2.AlterOp_DROP_ALL_IN_NS:
return empty, executeDropAllInNs(ctx, startTs, req)
case apiv25.AlterOp_DROP_DATA_IN_NS:
case apiv2.AlterOp_DROP_DATA_IN_NS:
return empty, executeDropData(ctx, startTs, req)
case apiv25.AlterOp_DROP_PREDICATE_IN_NS:
case apiv2.AlterOp_DROP_PREDICATE_IN_NS:
return empty, executeDropPredicate(ctx, startTs, req)
case apiv25.AlterOp_DROP_TYPE_IN_NS:
case apiv2.AlterOp_DROP_TYPE_IN_NS:
return empty, executeDropType(ctx, startTs, req)
case apiv25.AlterOp_SCHEMA_IN_NS:
case apiv2.AlterOp_SCHEMA_IN_NS:
return empty, executeSetSchema(ctx, startTs, req)
default:
return empty, errors.Errorf("invalid operation in Alter Request")
Expand Down
38 changes: 19 additions & 19 deletions edgraph/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"google.golang.org/protobuf/proto"

"github.com/dgraph-io/dgo/v250/protos/api"
apiv25 "github.com/dgraph-io/dgo/v250/protos/api.v25"
apiv2 "github.com/dgraph-io/dgo/v250/protos/api.v2"
"github.com/hypermodeinc/dgraph/v25/schema"
"github.com/hypermodeinc/dgraph/v25/x"
)
Expand Down Expand Up @@ -61,7 +61,7 @@ type resultNamespaces struct {
}

func (s *ServerV25) SignInUser(ctx context.Context,
request *apiv25.SignInUserRequest) (*apiv25.SignInUserResponse, error) {
request *apiv2.SignInUserRequest) (*apiv2.SignInUserResponse, error) {

req := &api.LoginRequest{Userid: request.UserId, Password: request.Password, Namespace: 0}
resp, err := (&Server{}).Login(ctx, req)
Expand All @@ -73,11 +73,11 @@ func (s *ServerV25) SignInUser(ctx context.Context,
return nil, err
}

return &apiv25.SignInUserResponse{AccessJwt: jwt.AccessJwt, RefreshJwt: jwt.RefreshJwt}, nil
return &apiv2.SignInUserResponse{AccessJwt: jwt.AccessJwt, RefreshJwt: jwt.RefreshJwt}, nil
}

func (s *ServerV25) CreateNamespace(ctx context.Context, in *apiv25.CreateNamespaceRequest) (
*apiv25.CreateNamespaceResponse, error) {
func (s *ServerV25) CreateNamespace(ctx context.Context, in *apiv2.CreateNamespaceRequest) (
*apiv2.CreateNamespaceResponse, error) {

if err := AuthSuperAdmin(ctx); err != nil {
s := status.Convert(err)
Expand Down Expand Up @@ -112,11 +112,11 @@ func (s *ServerV25) CreateNamespace(ctx context.Context, in *apiv25.CreateNamesp
}

glog.Infof("Created namespace [%v] with id [%d]", in.NsName, ns)
return &apiv25.CreateNamespaceResponse{}, nil
return &apiv2.CreateNamespaceResponse{}, nil
}

func (s *ServerV25) DropNamespace(ctx context.Context, in *apiv25.DropNamespaceRequest) (
*apiv25.DropNamespaceResponse, error) {
func (s *ServerV25) DropNamespace(ctx context.Context, in *apiv2.DropNamespaceRequest) (
*apiv2.DropNamespaceResponse, error) {

if err := AuthSuperAdmin(ctx); err != nil {
s := status.Convert(err)
Expand Down Expand Up @@ -152,7 +152,7 @@ func (s *ServerV25) DropNamespace(ctx context.Context, in *apiv25.DropNamespaceR

if nsID == 0 {
glog.Infof("Namespace [%v] not found", in.NsName)
return &apiv25.DropNamespaceResponse{}, nil
return &apiv2.DropNamespaceResponse{}, nil
}

// If we crash at this point, it is possible that namespace is deleted
Expand All @@ -167,11 +167,11 @@ func (s *ServerV25) DropNamespace(ctx context.Context, in *apiv25.DropNamespaceR
}

glog.Infof("Dropped namespace [%v] with id [%d]", in.NsName, nsID)
return &apiv25.DropNamespaceResponse{}, nil
return &apiv2.DropNamespaceResponse{}, nil
}

func (s *ServerV25) UpdateNamespace(ctx context.Context, in *apiv25.UpdateNamespaceRequest) (
*apiv25.UpdateNamespaceResponse, error) {
func (s *ServerV25) UpdateNamespace(ctx context.Context, in *apiv2.UpdateNamespaceRequest) (
*apiv2.UpdateNamespaceResponse, error) {

if err := AuthSuperAdmin(ctx); err != nil {
s := status.Convert(err)
Expand All @@ -188,19 +188,19 @@ func (s *ServerV25) UpdateNamespace(ctx context.Context, in *apiv25.UpdateNamesp

if isLgacyNamespace(in.NsName) {
err := renameLeagcyNamespace(ctx, in.NsName, in.RenameToNs)
return &apiv25.UpdateNamespaceResponse{}, err
return &apiv2.UpdateNamespaceResponse{}, err
}

if err := renameNamespace(x.AttachJWTNamespace(ctx), in.NsName, in.RenameToNs); err != nil {
return nil, err
}

glog.Infof("Renamed namespace [%v] to [%v]", in.NsName, in.RenameToNs)
return &apiv25.UpdateNamespaceResponse{}, nil
return &apiv2.UpdateNamespaceResponse{}, nil
}

func (s *ServerV25) ListNamespaces(ctx context.Context, in *apiv25.ListNamespacesRequest) (
*apiv25.ListNamespacesResponse, error) {
func (s *ServerV25) ListNamespaces(ctx context.Context, in *apiv2.ListNamespacesRequest) (
*apiv2.ListNamespacesResponse, error) {

if err := AuthSuperAdmin(ctx); err != nil {
s := status.Convert(err)
Expand All @@ -224,13 +224,13 @@ func (s *ServerV25) ListNamespaces(ctx context.Context, in *apiv25.ListNamespace
}

schNsList := schema.State().Namespaces()
result := &apiv25.ListNamespacesResponse{NsList: make(map[string]*apiv25.Namespace)}
result := &apiv2.ListNamespacesResponse{NsList: make(map[string]*apiv2.Namespace)}
for id := range schNsList {
if name, ok := dataNsList[id]; !ok {
name = fmt.Sprintf("dgraph-%d", id)
result.NsList[name] = &apiv25.Namespace{Name: name, Id: id}
result.NsList[name] = &apiv2.Namespace{Name: name, Id: id}
} else {
result.NsList[name] = &apiv25.Namespace{Name: name, Id: id}
result.NsList[name] = &apiv2.Namespace{Name: name, Id: id}
}
}

Expand Down
22 changes: 11 additions & 11 deletions edgraph/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,23 @@ import (
"fmt"

"github.com/dgraph-io/dgo/v250/protos/api"
apiv25 "github.com/dgraph-io/dgo/v250/protos/api.v25"
apiv2 "github.com/dgraph-io/dgo/v250/protos/api.v2"
"github.com/hypermodeinc/dgraph/v25/dql"
"github.com/hypermodeinc/dgraph/v25/x"

"google.golang.org/grpc/status"
)

func (s *ServerV25) Ping(ctx context.Context, req *apiv25.PingRequest) (*apiv25.PingResponse, error) {
func (s *ServerV25) Ping(ctx context.Context, req *apiv2.PingRequest) (*apiv2.PingResponse, error) {
if err := x.HealthCheck(); err != nil {
return nil, err
}

return &apiv25.PingResponse{Version: x.Version()}, nil
return &apiv2.PingResponse{Version: x.Version()}, nil
}

// Alter handles requests to change the schema or remove parts or all of the data.
func (s *ServerV25) RunDQL(ctx context.Context, req *apiv25.RunDQLRequest) (*apiv25.RunDQLResponse, error) {
func (s *ServerV25) RunDQL(ctx context.Context, req *apiv2.RunDQLRequest) (*apiv2.RunDQLResponse, error) {
// For now, we only allow guardian of galaxies to do this operation in v25
if err := AuthSuperAdmin(ctx); err != nil {
s := status.Convert(err)
Expand All @@ -48,7 +48,7 @@ func (s *ServerV25) RunDQL(ctx context.Context, req *apiv25.RunDQLRequest) (*api
apiReq.ReadOnly = req.ReadOnly
apiReq.BestEffort = req.BestEffort
apiReq.RespFormat = api.Request_JSON
if req.RespFormat == apiv25.RespFormat_RDF {
if req.RespFormat == apiv2.RespFormat_RDF {
apiReq.RespFormat = api.Request_RDF
}
if len(apiReq.Mutations) > 0 {
Expand All @@ -61,8 +61,8 @@ func (s *ServerV25) RunDQL(ctx context.Context, req *apiv25.RunDQLRequest) (*api
return nil, err
}

resp := &apiv25.RunDQLResponse{
Txn: &apiv25.TxnContext{
resp := &apiv2.RunDQLResponse{
Txn: &apiv2.TxnContext{
StartTs: apiResp.Txn.StartTs,
CommitTs: apiResp.Txn.CommitTs,
Aborted: apiResp.Txn.Aborted,
Expand All @@ -71,21 +71,21 @@ func (s *ServerV25) RunDQL(ctx context.Context, req *apiv25.RunDQLRequest) (*api
Hash: apiResp.Txn.Hash,
},
BlankUids: apiResp.Uids,
Latency: &apiv25.Latency{
Latency: &apiv2.Latency{
ParsingNs: apiResp.Latency.ParsingNs,
ProcessingNs: apiResp.Latency.ProcessingNs,
RespEncodingNs: apiResp.Latency.EncodingNs,
AssignTimestampNs: apiResp.Latency.AssignTimestampNs,
TotalNs: apiResp.Latency.TotalNs,
},
Metrics: &apiv25.Metrics{
Metrics: &apiv2.Metrics{
UidsTouched: apiResp.Metrics.NumUids,
},
}

if req.RespFormat == apiv25.RespFormat_JSON {
if req.RespFormat == apiv2.RespFormat_JSON {
resp.QueryResult = apiResp.Json
} else if req.RespFormat == apiv25.RespFormat_RDF {
} else if req.RespFormat == apiv2.RespFormat_RDF {
resp.QueryResult = apiResp.Rdf
}

Expand Down
Loading