Skip to content

Commit 156ce72

Browse files
committed
Add implementation for AllocateIDs and use the API in live loader
This PR implements the AllocateIDs API for allocating UIDs, namespaces and timestamps for bulk and live loader. For now, this PR also updates the live loader to use this API and completely avoid talkingt to zero directly.
1 parent e648e77 commit 156ce72

21 files changed

Lines changed: 130 additions & 130 deletions

File tree

acl/acl_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2679,7 +2679,8 @@ func (asuite *AclTestSuite) TestAllowUIDAccess() {
26792679
require.Equal(t, devGroup, createdGroup)
26802680
require.NoError(t, hc.AddUserToGroup(userid, devGroup))
26812681

2682-
require.NoError(t, asuite.dc.AssignUids(gc.Dgraph, 101))
2682+
_, _, err = gc.AllocateUIDs(ctx, 101)
2683+
require.NoError(t, err)
26832684
mu := &api.Mutation{SetNquads: []byte(`<100> <name> "100th User" .`), CommitNow: true}
26842685
_, err = gc.Mutate(mu)
26852686
require.NoError(t, err)

dgraph/cmd/live/batch.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
"github.com/dgryski/go-farm"
2020
"github.com/dustin/go-humanize"
2121
"github.com/dustin/go-humanize/english"
22-
"google.golang.org/grpc"
2322
"google.golang.org/grpc/codes"
2423
"google.golang.org/grpc/status"
2524

@@ -78,7 +77,6 @@ type loader struct {
7877
uidsLock sync.RWMutex
7978

8079
reqs chan *request
81-
zeroconn *grpc.ClientConn
8280
schema *Schema
8381
namespaces map[uint64]struct{}
8482

dgraph/cmd/live/load-json/load_test.go

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
)
2424

2525
var alphaService = testutil.SockAddr
26-
var zeroService = testutil.SockAddrZero
2726

2827
var (
2928
testDataDir string
@@ -91,8 +90,7 @@ func TestLiveLoadJSONFileEmpty(t *testing.T) {
9190
{"echo", "[]"},
9291
{testutil.DgraphBinaryPath(), "live",
9392
"--schema", testDataDir + "/family.schema", "--files", "/dev/stdin",
94-
"--alpha", alphaService, "--zero", zeroService,
95-
"--creds", "user=groot;password=password;"},
93+
"--alpha", alphaService, "--creds", "user=groot;password=password;"},
9694
}
9795
_, err := testutil.Pipeline(pipeline)
9896
require.NoError(t, err, "live loading JSON file ran successfully")
@@ -104,8 +102,7 @@ func TestLiveLoadJSONFile(t *testing.T) {
104102
pipeline := [][]string{
105103
{testutil.DgraphBinaryPath(), "live",
106104
"--schema", testDataDir + "/family.schema", "--files", testDataDir + "/family.json",
107-
"--alpha", alphaService, "--zero", zeroService,
108-
"--creds", "user=groot;password=password;"},
105+
"--alpha", alphaService, "--creds", "user=groot;password=password;"},
109106
}
110107
_, err := testutil.Pipeline(pipeline)
111108
require.NoError(t, err, "live loading JSON file exited with error")
@@ -119,8 +116,7 @@ func TestLiveLoadCanUseAlphaForAssigningUids(t *testing.T) {
119116
pipeline := [][]string{
120117
{testutil.DgraphBinaryPath(), "live",
121118
"--schema", testDataDir + "/family.schema", "--files", testDataDir + "/family.json",
122-
"--alpha", alphaService, "--zero", alphaService,
123-
"--creds", "user=groot;password=password;"},
119+
"--alpha", alphaService, "--creds", "user=groot;password=password;"},
124120
}
125121
_, err := testutil.Pipeline(pipeline)
126122
require.NoError(t, err, "live loading JSON file exited with error")
@@ -135,8 +131,7 @@ func TestLiveLoadJSONCompressedStream(t *testing.T) {
135131
{"gzip", "-c", testDataDir + "/family.json"},
136132
{testutil.DgraphBinaryPath(), "live",
137133
"--schema", testDataDir + "/family.schema", "--files", "/dev/stdin",
138-
"--alpha", alphaService, "--zero", zeroService,
139-
"--creds", "user=groot;password=password;"},
134+
"--alpha", alphaService, "--creds", "user=groot;password=password;"},
140135
}
141136
_, err := testutil.Pipeline(pipeline)
142137
require.NoError(t, err, "live loading JSON stream exited with error")
@@ -157,8 +152,7 @@ func TestLiveLoadJSONMultipleFiles(t *testing.T) {
157152
pipeline := [][]string{
158153
{testutil.DgraphBinaryPath(), "live",
159154
"--schema", testDataDir + "/family.schema", "--files", fileList,
160-
"--alpha", alphaService, "--zero", zeroService,
161-
"--creds", "user=groot;password=password;"},
155+
"--alpha", alphaService, "--creds", "user=groot;password=password;"},
162156
}
163157
_, err := testutil.Pipeline(pipeline)
164158
require.NoError(t, err, "live loading multiple JSON files exited with error")

dgraph/cmd/live/load-uids/load_test.go

Lines changed: 11 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ var (
3535

3636
var (
3737
alphaService string
38-
zeroService string
3938
alphaName string
4039
alphaExportPath string
4140
localExportPath = "./export_copy"
@@ -87,8 +86,7 @@ func TestLiveLoadUpsertAtOnce(t *testing.T) {
8786
pipeline := [][]string{
8887
{testutil.DgraphBinaryPath(), "live",
8988
"--schema", testDataDir + "/xid.schema", "--files", file, "--alpha",
90-
alphaService, "--zero", zeroService,
91-
"--creds", "user=groot;password=password;", "-U", "xid"},
89+
alphaService, "--creds", "user=groot;password=password;", "-U", "xid"},
9290
}
9391
_, err := testutil.Pipeline(pipeline)
9492
require.NoError(t, err, "live loading JSON file exited with error")
@@ -102,17 +100,15 @@ func TestLiveLoadUpsert(t *testing.T) {
102100
pipeline := [][]string{
103101
{testutil.DgraphBinaryPath(), "live",
104102
"--schema", testDataDir + "/xid.schema", "--files", testDataDir + "/xid_a.rdf",
105-
"--alpha", alphaService, "--zero", zeroService,
106-
"--creds", "user=groot;password=password;", "-U", "xid"},
103+
"--alpha", alphaService, "--creds", "user=groot;password=password;", "-U", "xid"},
107104
}
108105
_, err := testutil.Pipeline(pipeline)
109106
require.NoError(t, err, "live loading JSON file exited with error")
110107

111108
pipeline = [][]string{
112109
{testutil.DgraphBinaryPath(), "live",
113110
"--schema", testDataDir + "/xid.schema", "--files", testDataDir + "/xid_b.rdf",
114-
"--alpha", alphaService, "--zero", zeroService,
115-
"--creds", "user=groot;password=password;", "-U", "xid"},
111+
"--alpha", alphaService, "--creds", "user=groot;password=password;", "-U", "xid"},
116112
}
117113
_, err = testutil.Pipeline(pipeline)
118114
require.NoError(t, err, "live loading JSON file exited with error")
@@ -190,8 +186,7 @@ func TestLiveLoadJsonUidKeep(t *testing.T) {
190186
pipeline := [][]string{
191187
{testutil.DgraphBinaryPath(), "live",
192188
"--schema", testDataDir + "/family.schema", "--files", testDataDir + "/family.json",
193-
"--alpha", alphaService, "--zero", zeroService,
194-
"--creds", "user=groot;password=password;"},
189+
"--alpha", alphaService, "--creds", "user=groot;password=password;"},
195190
}
196191
_, err := testutil.Pipeline(pipeline)
197192
require.NoError(t, err, "live loading JSON file exited with error")
@@ -205,8 +200,7 @@ func TestLiveLoadJsonUidDiscard(t *testing.T) {
205200
pipeline := [][]string{
206201
{testutil.DgraphBinaryPath(), "live", "--new_uids",
207202
"--schema", testDataDir + "/family.schema", "--files", testDataDir + "/family.json",
208-
"--alpha", alphaService, "--zero", zeroService,
209-
"--creds", "user=groot;password=password;"},
203+
"--alpha", alphaService, "--creds", "user=groot;password=password;"},
210204
}
211205
_, err := testutil.Pipeline(pipeline)
212206
require.NoError(t, err, "live loading JSON file exited with error")
@@ -220,8 +214,7 @@ func TestLiveLoadRdfUidKeep(t *testing.T) {
220214
pipeline := [][]string{
221215
{testutil.DgraphBinaryPath(), "live",
222216
"--schema", testDataDir + "/family.schema", "--files", testDataDir + "/family.rdf",
223-
"--alpha", alphaService, "--zero", zeroService,
224-
"--creds", "user=groot;password=password;"},
217+
"--alpha", alphaService, "--creds", "user=groot;password=password;"},
225218
}
226219
_, err := testutil.Pipeline(pipeline)
227220
require.NoError(t, err, "live loading JSON file exited with error")
@@ -235,8 +228,7 @@ func TestLiveLoadRdfUidDiscard(t *testing.T) {
235228
pipeline := [][]string{
236229
{testutil.DgraphBinaryPath(), "live", "--new_uids",
237230
"--schema", testDataDir + "/family.schema", "--files", testDataDir + "/family.rdf",
238-
"--alpha", alphaService, "--zero", zeroService,
239-
"--creds", "user=groot;password=password;"},
231+
"--alpha", alphaService, "--creds", "user=groot;password=password;"},
240232
}
241233
_, err := testutil.Pipeline(pipeline)
242234
require.NoError(t, err, "live loading JSON file exited with error")
@@ -276,8 +268,7 @@ func TestLiveLoadExportedSchema(t *testing.T) {
276268
"--files", localExportPath + "/" + exportId + "/" + groupId + ".rdf.gz",
277269
"--encryption",
278270
x.BuildEncFlag(testDataDir + "/../../../../enc/test-fixtures/enc-key"),
279-
"--alpha", alphaService, "--zero", zeroService,
280-
"--creds", "user=groot;password=password;"},
271+
"--alpha", alphaService, "--creds", "user=groot;password=password;"},
281272
}
282273
_, err := testutil.Pipeline(pipeline)
283274
require.NoError(t, err, "live loading exported schema exited with error")
@@ -316,8 +307,7 @@ func TestLiveLoadFileName(t *testing.T) {
316307
pipeline := [][]string{
317308
{testutil.DgraphBinaryPath(), "live",
318309
"--files", testDataDir + "/correct1.rdf," + testDataDir + "/errored1.rdf",
319-
"--alpha", alphaService, "--zero", zeroService,
320-
"--creds", "user=groot;password=password;"},
310+
"--alpha", alphaService, "--creds", "user=groot;password=password;"},
321311
}
322312

323313
out, err := testutil.Pipeline(pipeline)
@@ -333,8 +323,7 @@ func TestLiveLoadFileNameMultipleErrored(t *testing.T) {
333323
pipeline := [][]string{
334324
{testutil.DgraphBinaryPath(), "live",
335325
"--files", testDataDir + "/correct1.rdf," + testDataDir + "/errored1.rdf," +
336-
testDataDir + "/errored2.rdf", "--alpha", alphaService, "--zero", zeroService,
337-
"--creds", "user=groot;password=password;"},
326+
testDataDir + "/errored2.rdf", "--alpha", alphaService, "--creds", "user=groot;password=password;"},
338327
}
339328

340329
out, err := testutil.Pipeline(pipeline)
@@ -351,8 +340,7 @@ func TestLiveLoadFileNameMultipleCorrect(t *testing.T) {
351340
pipeline := [][]string{
352341
{testutil.DgraphBinaryPath(), "live",
353342
"--files", testDataDir + "/correct1.rdf," + testDataDir + "/correct2.rdf," +
354-
testDataDir + "/errored1.rdf", "--alpha", alphaService, "--zero", zeroService,
355-
"--creds", "user=groot;password=password;"},
343+
testDataDir + "/errored1.rdf", "--alpha", alphaService, "--creds", "user=groot;password=password;"},
356344
}
357345

358346
out, err := testutil.Pipeline(pipeline)
@@ -365,7 +353,6 @@ func TestLiveLoadFileNameMultipleCorrect(t *testing.T) {
365353
func TestMain(m *testing.M) {
366354
alphaName = testutil.Instance
367355
alphaService = testutil.SockAddr
368-
zeroService = testutil.SockAddrZero
369356

370357
x.AssertTrue(strings.Count(alphaName, "_") == 2)
371358
left := strings.Index(alphaName, "_")

dgraph/cmd/live/run.go

Lines changed: 2 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"bufio"
1010
"compress/gzip"
1111
"context"
12-
"crypto/tls"
1312
"encoding/json"
1413
"fmt"
1514
"io"
@@ -48,7 +47,6 @@ type options struct {
4847
dataFiles string
4948
dataFormat string
5049
schemaFile string
51-
zero string
5250
concurrent int
5351
batchSize int
5452
clientDir string
@@ -137,7 +135,7 @@ func init() {
137135
"from filename")
138136
flag.StringP("alpha", "a", "127.0.0.1:9080",
139137
"Comma-separated list of Dgraph alpha gRPC server addresses")
140-
flag.StringP("zero", "z", "127.0.0.1:5080", "Dgraph zero gRPC server address")
138+
flag.StringP("zero", "z", "", "(deprecated) Dgraph zero gRPC server address")
141139
flag.IntP("conc", "c", 10,
142140
"Number of concurrent requests to make to Dgraph")
143141
flag.IntP("batch", "b", 1000,
@@ -599,34 +597,14 @@ func setup(opts batchMutationOptions, dc *dgo.Dgraph, conf *viper.Viper) *loader
599597
WithIndexCacheSize(100 * (1 << 20)).
600598
WithZSTDCompressionLevel(3))
601599
x.Checkf(err, "Error while creating badger KV posting store")
602-
603600
}
604601

605602
dialOpts := []grpc.DialOption{}
606603
if conf.GetString("slash_grpc_endpoint") != "" && conf.IsSet("auth_token") {
607604
dialOpts = append(dialOpts, x.WithAuthorizationCredentials(conf.GetString("auth_token")))
608605
}
609606

610-
var tlsConfig *tls.Config
611-
if conf.GetString("slash_grpc_endpoint") != "" {
612-
var tlsErr error
613-
tlsConfig, tlsErr = x.SlashTLSConfig(conf.GetString("slash_grpc_endpoint"))
614-
x.Checkf(tlsErr, "Unable to generate TLS Cert Pool")
615-
} else {
616-
var tlsErr error
617-
tlsConfig, tlsErr = x.LoadClientTLSConfigForInternalPort(conf)
618-
x.Check(tlsErr)
619-
}
620-
621-
// compression with zero server actually makes things worse
622-
connzero, err := x.SetupConnection(opt.zero, tlsConfig, false, dialOpts...)
623-
x.Checkf(err, "Unable to connect to zero, Is it running at %s?", opt.zero)
624-
625-
xopts := xidmap.XidMapOptions{UidAssigner: connzero, DB: db}
626-
// Slash uses alpha to assign UIDs in live loader. Dgraph client is needed by xidmap to do
627-
// authorization.
628-
xopts.DgClient = dc
629-
607+
xopts := xidmap.XidMapOptions{DB: db, DgClient: dc}
630608
alloc := xidmap.New(xopts)
631609
l := &loader{
632610
opts: opts,
@@ -636,7 +614,6 @@ func setup(opts batchMutationOptions, dc *dgo.Dgraph, conf *viper.Viper) *loader
636614
conflicts: make(map[uint64]struct{}),
637615
alloc: alloc,
638616
db: db,
639-
zeroconn: connzero,
640617
namespaces: make(map[uint64]struct{}),
641618
}
642619

@@ -684,13 +661,6 @@ func (l *loader) populateNamespaces(ctx context.Context, dc *dgo.Dgraph, singleN
684661
}
685662

686663
func run() error {
687-
var zero string
688-
if Live.Conf.GetString("slash_grpc_endpoint") != "" {
689-
zero = Live.Conf.GetString("slash_grpc_endpoint")
690-
} else {
691-
zero = Live.Conf.GetString("zero")
692-
}
693-
694664
creds := z.NewSuperFlag(Live.Conf.GetString("creds")).MergeAndCheckDefault(x.DefaultCreds)
695665
keys, err := x.GetEncAclKeys(Live.Conf)
696666
if err != nil {
@@ -702,7 +672,6 @@ func run() error {
702672
dataFiles: Live.Conf.GetString("files"),
703673
dataFormat: Live.Conf.GetString("format"),
704674
schemaFile: Live.Conf.GetString("schema"),
705-
zero: zero,
706675
concurrent: Live.Conf.GetInt("conc"),
707676
batchSize: Live.Conf.GetInt("batch"),
708677
clientDir: Live.Conf.GetString("xidmap"),
@@ -778,8 +747,6 @@ func run() error {
778747
defer closeFunc()
779748

780749
l := setup(bmOpts, dg, Live.Conf)
781-
defer l.zeroconn.Close()
782-
783750
if err := l.populateNamespaces(ctx, dg, singleNsOp); err != nil {
784751
fmt.Printf("Error while populating namespaces %s\n", err)
785752
return err

dgraph/cmd/zero/zero_test.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,20 +40,26 @@ func TestRemoveNode(t *testing.T) {
4040
}
4141

4242
func TestIdLeaseOverflow(t *testing.T) {
43-
require.NoError(t, testutil.AssignUids(100))
44-
err := testutil.AssignUids(math.MaxUint64 - 10)
43+
dialOpts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
44+
con, err := grpc.NewClient(testutil.SockAddrZero, dialOpts...)
45+
require.NoError(t, err)
46+
zc := pb.NewZeroClient(con)
47+
48+
_, err = zc.AssignIds(context.Background(), &pb.Num{Val: 100, Type: pb.Num_UID})
49+
require.NoError(t, err)
50+
51+
_, err = zc.AssignIds(context.Background(), &pb.Num{Val: math.MaxUint64 - 10, Type: pb.Num_UID})
4552
require.Error(t, err)
4653
require.Contains(t, err.Error(), "limit has reached")
4754
}
4855

4956
func TestIdBump(t *testing.T) {
5057
dialOpts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
51-
ctx := context.Background()
5258
con, err := grpc.NewClient(testutil.SockAddrZero, dialOpts...)
5359
require.NoError(t, err)
54-
5560
zc := pb.NewZeroClient(con)
5661

62+
ctx := context.Background()
5763
res, err := zc.AssignIds(ctx, &pb.Num{Val: 10, Type: pb.Num_UID})
5864
require.NoError(t, err)
5965
require.Equal(t, uint64(10), res.GetEndId()-res.GetStartId()+1)

dgraphtest/compose_cluster.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,6 @@ func (c *ComposeCluster) AlphasLogs() ([]string, error) {
4747
return nil, errNotImplemented
4848
}
4949

50-
func (c *ComposeCluster) AssignUids(client *dgo.Dgraph, num uint64) error {
51-
return testutil.AssignUids(num)
52-
}
53-
5450
func (c *ComposeCluster) GetVersion() string {
5551
return localVersion
5652
}

0 commit comments

Comments
 (0)