Skip to content

Commit 27a8a81

Browse files
author
Harshil Goel
authored
Merge branch 'main' into aman/telemetry
2 parents b191d77 + 60ddeb7 commit 27a8a81

8 files changed

Lines changed: 108 additions & 11 deletions

File tree

dql/parser.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2691,7 +2691,7 @@ func validKeyAtRoot(k string) bool {
26912691
switch k {
26922692
case "func", "orderasc", "orderdesc", "first", "offset", "after":
26932693
return true
2694-
case "from", "to", "numpaths", "minweight", "maxweight":
2694+
case "from", "to", "numpaths", "minweight", "maxweight", "maxfrontiersize":
26952695
// Specific to shortest path
26962696
return true
26972697
case "depth":

dql/parser_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1302,7 +1302,7 @@ func TestParseQueryWithMultipleVar(t *testing.T) {
13021302
func TestParseShortestPath(t *testing.T) {
13031303
query := `
13041304
{
1305-
shortest(from:0x0a, to:0x0b, numpaths: 3, minweight: 3, maxweight: 6) {
1305+
shortest(from:0x0a, to:0x0b, numpaths: 3, minweight: 3, maxweight: 6, maxfrontiersize: 1) {
13061306
friends
13071307
name
13081308
}
@@ -1317,6 +1317,7 @@ func TestParseShortestPath(t *testing.T) {
13171317
require.Equal(t, "3", res.Query[0].Args["numpaths"])
13181318
require.Equal(t, "3", res.Query[0].Args["minweight"])
13191319
require.Equal(t, "6", res.Query[0].Args["maxweight"])
1320+
require.Equal(t, "1", res.Query[0].Args["maxfrontiersize"])
13201321
}
13211322

13221323
func TestParseShortestPathWithUidVars(t *testing.T) {

query/query.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,10 @@ type params struct {
165165
MaxWeight float64
166166
// MinWeight is the min weight allowed in a path returned by the shortest path algorithm.
167167
MinWeight float64
168+
// MaxFrontierSize limits the number of candidate paths stored in the priority queue
169+
// during shortest path computation. This prevents out-of-memory errors on large graphs
170+
// but may affect solution optimality if set too low.
171+
MaxFrontierSize int64
168172

169173
// ExploreDepth is used by recurse and shortest path queries to specify the maximum graph
170174
// depth to explore.
@@ -714,6 +718,16 @@ func (args *params) fill(gq *dql.GraphQuery) error {
714718
args.MinWeight = -math.MaxFloat64
715719
}
716720

721+
if v, ok := gq.Args["maxfrontiersize"]; ok {
722+
maxfrontiersize, err := strconv.ParseInt(v, 0, 64)
723+
if err != nil {
724+
return err
725+
}
726+
args.MaxFrontierSize = maxfrontiersize
727+
} else if !ok {
728+
args.MaxFrontierSize = math.MaxInt64
729+
}
730+
717731
if gq.ShortestPathArgs.From == nil || gq.ShortestPathArgs.To == nil {
718732
return errors.Errorf("from/to can't be nil for shortest path")
719733
}
@@ -2640,7 +2654,7 @@ func (sg *SubGraph) sortAndPaginateUsingVar(ctx context.Context) error {
26402654
func isValidArg(a string) bool {
26412655
switch a {
26422656
case "numpaths", "from", "to", "orderasc", "orderdesc", "first", "offset", "after", "depth",
2643-
"minweight", "maxweight":
2657+
"minweight", "maxweight", "maxfrontiersize":
26442658
return true
26452659
}
26462660
return false

query/shortest.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,9 @@ func runKShortestPaths(ctx context.Context, sg *SubGraph) ([]*SubGraph, error) {
405405
hop: item.hop + 1,
406406
path: route{route: curPath},
407407
}
408+
if int64(pq.Len()) > sg.Params.MaxFrontierSize {
409+
pq.Pop()
410+
}
408411
heap.Push(&pq, node)
409412
}
410413
// Return the popped nodes path to pool.
@@ -558,6 +561,9 @@ func shortestPath(ctx context.Context, sg *SubGraph) ([]*SubGraph, error) {
558561
cost: nodeCost,
559562
hop: item.hop + 1,
560563
}
564+
if int64(pq.Len()) > sg.Params.MaxFrontierSize {
565+
pq.Pop()
566+
}
561567
heap.Push(&pq, node)
562568
} else {
563569
// We've already seen this node. So, just update the cost

systest/shortest-path/graph.rdf.gz

3.7 MB
Binary file not shown.
81 Bytes
Binary file not shown.
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
//go:build integration2
2+
3+
/*
4+
* SPDX-FileCopyrightText: © Hypermode Inc. <[email protected]>
5+
* SPDX-License-Identifier: Apache-2.0
6+
*/
7+
8+
package main
9+
10+
import (
11+
"context"
12+
"testing"
13+
"time"
14+
15+
"github.com/hypermodeinc/dgraph/v25/dgraphapi"
16+
"github.com/hypermodeinc/dgraph/v25/dgraphtest"
17+
"github.com/hypermodeinc/dgraph/v25/x"
18+
19+
"github.com/stretchr/testify/require"
20+
)
21+
22+
func TestShortestPath(t *testing.T) {
23+
conf := dgraphtest.NewClusterConfig().WithNumAlphas(1).WithNumZeros(1).WithReplicas(1).WithACL(time.Hour)
24+
c, err := dgraphtest.NewLocalCluster(conf)
25+
require.NoError(t, err)
26+
defer func() { c.Cleanup(t.Failed()) }()
27+
require.NoError(t, c.Start())
28+
29+
err = c.LiveLoad(dgraphtest.LiveOpts{
30+
DataFiles: []string{"graph.rdf.gz"},
31+
SchemaFiles: []string{"graph.schema.gz"},
32+
GqlSchemaFiles: []string{},
33+
})
34+
require.NoError(t, err)
35+
36+
gc, cleanup, err := c.Client()
37+
require.NoError(t, err)
38+
defer cleanup()
39+
require.NoError(t, gc.LoginIntoNamespace(context.Background(),
40+
dgraphapi.DefaultUser, dgraphapi.DefaultPassword, x.GalaxyNamespace))
41+
42+
_, err = gc.Query(`
43+
{
44+
q(func: eq(guid, "85270d10-560e-4cc8-8703-4b4c563a2f4e")) {
45+
a as uid
46+
}
47+
q1(func: eq(guid, "4a520068-80b6-42f2-9019-4e6ef8a02bb3")) {
48+
b as uid
49+
}
50+
51+
path as shortest(from: uid(a), to: uid(b), numpaths: 5, maxfrontiersize: 10000) {
52+
connected_to @facets(weight)
53+
}
54+
55+
path(func: uid(path)) {
56+
uid
57+
}
58+
}
59+
`)
60+
require.NoError(t, err)
61+
}

worker/task.go

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1819,17 +1819,17 @@ func langForFunc(langs []string) string {
18191819
return langs[0]
18201820
}
18211821

1822-
func planForEqFilter(fc *functionContext, pred string, uidlist []uint64) {
1823-
checkUidEmpty := func(uids []uint64) bool {
1824-
for _, i := range uids {
1825-
if i == 0 {
1826-
return false
1827-
}
1822+
func checkUidZero(uids []uint64) bool {
1823+
for _, uid := range uids {
1824+
if uid == 0 {
1825+
return true
18281826
}
1829-
return true
18301827
}
1828+
return false
1829+
}
18311830

1832-
if !checkUidEmpty(uidlist) {
1831+
func planForEqFilter(fc *functionContext, pred string, uidlist []uint64) {
1832+
if checkUidZero(uidlist) {
18331833
// We have a uid which has 0 in it. Mostly it would happen when there is only 0. But any one item
18341834
// being 0 could cause the query planner to fail. In case of 0 being present, we need to query the
18351835
// index itself.
@@ -1912,6 +1912,14 @@ func parseSrcFn(ctx context.Context, q *pb.Query) (*functionContext, error) {
19121912
}
19131913
}
19141914

1915+
generateIneqTokens := true
1916+
if fc.fname != eq && q.UidList != nil && uint64(len(q.UidList.Uids)) < Config.TypeFilterUidLimit {
1917+
if !checkUidZero(q.UidList.Uids) {
1918+
fc.n = len(q.UidList.Uids)
1919+
generateIneqTokens = false
1920+
}
1921+
}
1922+
19151923
var tokens []string
19161924
var ineqValues []types.Val
19171925
// eq can have multiple args.
@@ -1947,6 +1955,9 @@ func parseSrcFn(ctx context.Context, q *pb.Query) (*functionContext, error) {
19471955
lang = q.Langs[0]
19481956
}
19491957

1958+
if !generateIneqTokens {
1959+
continue
1960+
}
19501961
// Get tokens ge/le ineqValueToken.
19511962
if tokens, fc.ineqValueToken, err = getInequalityTokens(ctx, q.ReadTs, attr, f, lang,
19521963
ineqValues); err != nil {
@@ -1958,6 +1969,10 @@ func parseSrcFn(ctx context.Context, q *pb.Query) (*functionContext, error) {
19581969
fc.tokens = append(fc.tokens, tokens...)
19591970
}
19601971

1972+
if !generateIneqTokens {
1973+
return fc, nil
1974+
}
1975+
19611976
// In case of non-indexed predicate, there won't be any tokens. We will fetch value
19621977
// from data keys.
19631978
// If number of index keys is more than no. of uids to filter, it's better to fetch values

0 commit comments

Comments
 (0)