Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions query/query0_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@ package query

import (
"context"
"encoding/json"
"fmt"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/dgraph-io/dgo/v250/protos/api"
"github.com/dgraph-io/dgraph/v25/dgraphapi"
"github.com/dgraph-io/dgraph/v25/dgraphtest"
"github.com/dgraph-io/dgraph/v25/dql"
Expand Down Expand Up @@ -2886,6 +2889,70 @@ func TestDateTimeQuery(t *testing.T) {
processQueryNoErr(t, query))
}

// TestLossyIndexInUncommittedTxn tests that queries within an uncommitted
// transaction can find data that was mutated in that same transaction when using
// a lossy index like @index(hour).
//
// Issue #9556: The bug occurs because lossy indexes require a two-step query:
// 1. Index lookup - finds candidate UIDs
// 2. Value verification - re-checks actual values since hour granularity is imprecise
// TestLossyIndexInUncommittedTxn verifies that a query issued inside a
// still-open transaction can see data mutated earlier in that same
// transaction when the predicate carries a lossy index such as @index(hour).
//
// Issue #9556: lossy indexes answer eq() in two steps — an index lookup that
// yields candidate UIDs, followed by a value re-check because hour-level
// granularity is imprecise — and the second step must consult the txn's
// uncommitted state, not just committed data.
func TestLossyIndexInUncommittedTxn(t *testing.T) {
	ctx := context.Background()

	// Second-granularity timestamp keeps this run distinct from any data
	// left behind by earlier tests.
	now := time.Now().UTC().Truncate(time.Second)
	createdAt := now.Format(time.RFC3339)

	// Open a transaction and deliberately leave it uncommitted for the whole
	// test; discard it on exit so nothing leaks into the committed state.
	txn := client.NewTxn()
	defer func() {
		if derr := txn.Discard(ctx); derr != nil {
			t.Logf("error discarding txn: %v", derr)
		}
	}()

	payload := fmt.Sprintf(`{
		"uid": "_:newnode",
		"dgraph.type": "TestNode",
		"created_at": "%s"
	}`, createdAt)

	resp, err := txn.Mutate(ctx, &api.Mutation{SetJson: []byte(payload)})
	require.NoError(t, err, "mutation should succeed")
	require.NotEmpty(t, resp.Uids["newnode"], "should get a UID for the new node")

	uid := resp.Uids["newnode"]
	t.Logf("Created node with UID %s and created_at=%s", uid, createdAt)

	// Query through the SAME uncommitted transaction; eq() here goes through
	// the lossy @index(hour) on created_at.
	q := fmt.Sprintf(`{
		q(func: eq(created_at, "%s")) {
			uid
			created_at
		}
	}`, createdAt)

	queryResp, err := txn.Query(ctx, q)
	require.NoError(t, err, "query should succeed")

	var parsed struct {
		Q []struct {
			UID       string `json:"uid"`
			CreatedAt string `json:"created_at"`
		} `json:"q"`
	}
	require.NoError(t, json.Unmarshal(queryResp.Json, &parsed), "should be able to parse response")

	t.Logf("Query response: %s", string(queryResp.Json))

	require.Len(t, parsed.Q, 1, "should find exactly 1 node with the matching created_at")
	require.Equal(t, uid, parsed.Q[0].UID, "should find the node we just created")
}

func TestCountUidWithAlias(t *testing.T) {
query := `
{
Expand Down
21 changes: 15 additions & 6 deletions worker/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1396,7 +1396,7 @@ func (qs *queryState) handleCompareFunction(ctx context.Context, arg funcArgs) e
switch lang {
case "":
if isList {
pl, err := posting.GetNoStore(x.DataKey(attr, uid), arg.q.ReadTs)
pl, err := qs.cache.Get(x.DataKey(attr, uid))
if err != nil {
filterErr = err
return false
Expand All @@ -1418,7 +1418,7 @@ func (qs *queryState) handleCompareFunction(ctx context.Context, arg funcArgs) e
return false
}

pl, err := posting.GetNoStore(x.DataKey(attr, uid), arg.q.ReadTs)
pl, err := qs.cache.Get(x.DataKey(attr, uid))
if err != nil {
filterErr = err
return false
Expand All @@ -1433,7 +1433,7 @@ func (qs *queryState) handleCompareFunction(ctx context.Context, arg funcArgs) e
dst, err := types.Convert(sv, typ)
return err == nil && compareFunc(dst)
case ".":
pl, err := posting.GetNoStore(x.DataKey(attr, uid), arg.q.ReadTs)
pl, err := qs.cache.Get(x.DataKey(attr, uid))
if err != nil {
filterErr = err
return false
Expand All @@ -1451,17 +1451,26 @@ func (qs *queryState) handleCompareFunction(ctx context.Context, arg funcArgs) e
}
return false
default:
sv, err := fetchValue(uid, attr, arg.q.Langs, typ, arg.q.ReadTs)
pl, err := qs.cache.Get(x.DataKey(attr, uid))
if err != nil {
filterErr = err
return false
}
src, err := pl.ValueFor(arg.q.ReadTs, arg.q.Langs)
if err != nil {
if err != posting.ErrNoValue {
filterErr = err
}
return false
}
if sv.Value == nil {
dst, err := types.Convert(src, typ)
if err != nil {
return false
}
if dst.Value == nil {
return false
}
return compareFunc(sv)
return compareFunc(dst)
}
})
if filterErr != nil {
Expand Down
Loading