diff --git a/.gitignore b/.gitignore index 8dace988b71..caa22f8e5a4 100644 --- a/.gitignore +++ b/.gitignore @@ -50,3 +50,4 @@ x/log_test/*.enc ## .claude/ CLAUDE.md +.osgrep diff --git a/dgraph/cmd/zero/oracle.go b/dgraph/cmd/zero/oracle.go index cee15fae72f..d89da7f073e 100644 --- a/dgraph/cmd/zero/oracle.go +++ b/dgraph/cmd/zero/oracle.go @@ -370,13 +370,20 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error { if strings.Contains(pred, hnsw.VecKeyword) { pred = pred[0:strings.Index(pred, hnsw.VecKeyword)] } - tablet := s.ServingTablet(pred) - if tablet == nil { + tablets := s.ServingTablets(pred) + if len(tablets) == 0 { return errors.Errorf("Tablet for %s is nil", pred) } - if tablet.GroupId != uint32(gid) { - return errors.Errorf("Mutation done in group: %d. Predicate %s assigned to %d", - gid, pred, tablet.GroupId) + found := false + for _, t := range tablets { + if t.GroupId == uint32(gid) { + found = true + break + } + } + if !found { + return errors.Errorf("Mutation done in group: %d. Predicate %s not assigned there", + gid, pred) } if s.isBlocked(pred) { return errors.Errorf("Commits on predicate %s are blocked due to predicate move", pred) diff --git a/dgraph/cmd/zero/raft.go b/dgraph/cmd/zero/raft.go index 1c82c60ea3a..79f4c5a0ca0 100644 --- a/dgraph/cmd/zero/raft.go +++ b/dgraph/cmd/zero/raft.go @@ -284,6 +284,13 @@ func (n *node) regenerateChecksum() { g.Checksum = farm.Fingerprint64([]byte(strings.Join(preds, ""))) } + // Rebuild tablet index from authoritative flat proto maps. + idx := pb.NewTabletIndex() + for _, g := range state.GetGroups() { + idx.BuildFromFlat(g.GetTablets()) + } + n.server.tabletIndex = idx + if n.AmLeader() { // It is important to push something to Oracle updates channel, so the subscribers would // get the latest checksum that we calculated above. Otherwise, if all the queries are @@ -315,11 +322,14 @@ func (n *node) handleTablet(tablet *pb.Tablet) error { if tablet.GroupId == 0 { return errors.Errorf("Tablet group id is zero: %+v", tablet) } + + key := pb.TabletKey(tablet.Predicate, tablet.Label) + group := state.Groups[tablet.GroupId] if tablet.Remove { - glog.Infof("Removing tablet for attr: [%v], gid: [%v]\n", tablet.Predicate, tablet.GroupId) + glog.Infof("Removing tablet for key: [%v], gid: [%v]\n", key, tablet.GroupId) if group != nil { - delete(group.Tablets, tablet.Predicate) + delete(group.Tablets, key) } return nil } @@ -328,29 +338,28 @@ func (n *node) handleTablet(tablet *pb.Tablet) error { state.Groups[tablet.GroupId] = group } - // There's a edge case that we're handling. - // Two servers ask to serve the same tablet, then we need to ensure that - // only the first one succeeds. - if prev := n.server.servingTablet(tablet.Predicate); prev != nil { + // Duplicate detection: check if this (predicate, label) pair is already served. + // Multiple groups CAN serve the same predicate as long as they have different labels. + if prev := n.server.servingTablet(tablet.Predicate, tablet.Label); prev != nil { if tablet.Force { originalGroup := state.Groups[prev.GroupId] - delete(originalGroup.Tablets, tablet.Predicate) - } else if tablet.Label != "" && prev.Label != tablet.Label { + delete(originalGroup.Tablets, key) + } else if tablet.IsLabeled() && prev.Label != tablet.Label { // Allow re-routing when labels differ. This happens when a schema with @label // is applied after the predicate was created without a label. - glog.Infof("Tablet for attr: [%s] re-routing from group %d to %d due to label change (%q -> %q)", - tablet.Predicate, prev.GroupId, tablet.GroupId, prev.Label, tablet.Label) + glog.Infof("Tablet for key: [%s] re-routing from group %d to %d due to label change (%q -> %q)", + key, prev.GroupId, tablet.GroupId, prev.Label, tablet.Label) originalGroup := state.Groups[prev.GroupId] - delete(originalGroup.Tablets, tablet.Predicate) + delete(originalGroup.Tablets, key) } else if prev.GroupId != tablet.GroupId { glog.Infof( - "Tablet for attr: [%s], gid: [%d] already served by group: [%d]\n", - prev.Predicate, tablet.GroupId, prev.GroupId) + "Tablet for key: [%s], gid: [%d] already served by group: [%d]\n", + key, tablet.GroupId, prev.GroupId) return errTabletAlreadyServed } } tablet.Force = false - group.Tablets[tablet.Predicate] = tablet + group.Tablets[key] = tablet return nil } diff --git a/dgraph/cmd/zero/tablet.go b/dgraph/cmd/zero/tablet.go index 5260d458b87..3cb9582c1d8 100644 --- a/dgraph/cmd/zero/tablet.go +++ b/dgraph/cmd/zero/tablet.go @@ -277,7 +277,7 @@ func (s *Server) chooseTablet() (predicate string, srcGroup uint32, dstGroup uin // Reserved predicates should always be in group 1 so do not re-balance them. continue } - if tab.Label != "" { + if tab.IsLabeled() { // labeled predicates are pinned and should not be re-balanced either continue } diff --git a/dgraph/cmd/zero/zero.go b/dgraph/cmd/zero/zero.go index c28b5f47846..28e971ab381 100644 --- a/dgraph/cmd/zero/zero.go +++ b/dgraph/cmd/zero/zero.go @@ -63,6 +63,10 @@ type Server struct { blockCommitsOn *sync.Map checkpointPerGroup map[uint32]uint64 + + // tabletIndex is a nested index rebuilt from flat proto maps for O(1) lookups. + tabletIndex *pb.TabletIndex + // embedding the pb.UnimplementedZeroServer struct to ensure forward compatibility of the server. pb.UnimplementedZeroServer } @@ -89,6 +93,7 @@ func (s *Server) Init() { s.blockCommitsOn = new(sync.Map) s.moveOngoing = make(chan struct{}, 1) s.checkpointPerGroup = make(map[uint32]uint64) + s.tabletIndex = pb.NewTabletIndex() if opts.limiterConfig.UidLeaseLimit > 0 { // rate limiting is not enabled when lease limit is set to zero. s.rateLimiter = x.NewRateLimiter(int64(opts.limiterConfig.UidLeaseLimit), @@ -253,6 +258,12 @@ func (s *Server) SetMembershipState(state *pb.MembershipState) { } s.nextGroup = uint32(len(state.Groups) + 1) + + // Rebuild the tablet index from flat proto maps. + s.tabletIndex = pb.NewTabletIndex() + for _, g := range state.Groups { + s.tabletIndex.BuildFromFlat(g.Tablets) + } } // MarshalMembershipState returns the marshaled membership state. @@ -309,13 +320,23 @@ func (s *Server) removeZero(nodeId uint64) { func (s *Server) ServingTablet(tablet string) *pb.Tablet { s.RLock() defer s.RUnlock() + pred, label := pb.ParseTabletKey(tablet) + return s.tabletIndex.Get(pred, label) +} - for _, group := range s.state.Groups { - if tab, ok := group.Tablets[tablet]; ok { - return tab - } +// ServingTablets returns all tablets for a given predicate across all groups. +func (s *Server) ServingTablets(predicate string) []*pb.Tablet { + s.RLock() + defer s.RUnlock() + labels := s.tabletIndex.AllForPredicate(predicate) + if labels == nil { + return nil } - return nil + tablets := make([]*pb.Tablet, 0, len(labels)) + for _, tab := range labels { + tablets = append(tablets, tab) + } + return tablets } func (s *Server) blockTablet(pred string) func() { @@ -330,15 +351,11 @@ func (s *Server) isBlocked(pred string) bool { return blocked } -func (s *Server) servingTablet(tablet string) *pb.Tablet { +// servingTablet returns the tablet for the given (predicate, label) pair. +// Caller must hold at least a read lock. +func (s *Server) servingTablet(predicate, label string) *pb.Tablet { s.AssertRLock() - - for _, group := range s.state.Groups { - if tab, ok := group.Tablets[tablet]; ok { - return tab - } - } - return nil + return s.tabletIndex.Get(predicate, label) } func (s *Server) createProposals(dst *pb.Group) ([]*pb.ZeroProposal, error) { @@ -418,7 +435,7 @@ func (s *Server) Inform(ctx context.Context, req *pb.TabletRequest) (*pb.TabletR tablets := make([]*pb.Tablet, 0) unknownTablets := make([]*pb.Tablet, 0) for _, t := range req.Tablets { - tab := s.ServingTablet(t.Predicate) + tab := s.ServingTablet(pb.TabletKey(t.Predicate, t.Label)) span.SetAttributes(attribute.String("tablet_predicate", t.Predicate)) switch { case tab != nil && !t.Force: @@ -456,7 +473,7 @@ func (s *Server) Inform(ctx context.Context, req *pb.TabletRequest) (*pb.TabletR // This will also make it easier to restore the reserved predicates after // a DropAll operation. t.GroupId = 1 - case t.Label != "": + case t.IsLabeled(): // Labeled predicate: route to matching labeled group gid, err := s.labelGroup(t.Label) if err != nil { @@ -485,7 +502,7 @@ func (s *Server) Inform(ctx context.Context, req *pb.TabletRequest) (*pb.TabletR } for _, t := range unknownTablets { - tab := s.ServingTablet(t.Predicate) + tab := s.ServingTablet(pb.TabletKey(t.Predicate, t.Label)) x.AssertTrue(tab != nil) span.AddEvent(fmt.Sprintf("Tablet served: %+v", tab)) tablets = append(tablets, tab) @@ -705,23 +722,37 @@ func (s *Server) ShouldServe( return resp, errors.Errorf("Group ID is Zero in %+v", tablet) } - // Check who is serving this tablet. - tab := s.ServingTablet(tablet.Predicate) + // Use the index to find the exact (predicate, label) match. + tab := s.ServingTablet(pb.TabletKey(tablet.Predicate, tablet.Label)) span.SetAttributes(attribute.String("tablet_predicate", tablet.Predicate)) span.SetAttributes(attribute.String("tablet_label", tablet.Label)) + if tab == nil && !tablet.IsLabeled() { + // Unlabeled request: check if any labeled tablet exists for this predicate. + s.RLock() + tab = s.tabletIndex.GetAny(tablet.Predicate) + s.RUnlock() + } if tab != nil && !tablet.Force { // If the existing tablet has a different label than requested, we need to re-route. - // This can happen when a schema is applied with @label after the predicate was - // created without a label (e.g., during DropAll). - if tablet.Label != "" && tab.Label != tablet.Label { + if tablet.IsLabeled() && tab.Label != tablet.Label { glog.Infof("ShouldServe: tablet %s has label %q but request has label %q, re-routing", tablet.Predicate, tab.Label, tablet.Label) - // Fall through to re-assign the tablet with the new label - // The handleTablet function will allow this because labels differ + // Fall through to re-assign the tablet with the new label. } else { - // Someone is serving this tablet. Could be the caller as well. - // The caller should compare the returned group against the group it holds to check who's - // serving. + // Someone is serving this tablet. If the found tablet belongs to a + // different group than the requester, check if the requesting group + // serves a tablet for this predicate under a different label. + if tablet.GroupId > 0 && tab.GroupId != tablet.GroupId { + s.RLock() + labels := s.tabletIndex.AllForPredicate(tablet.Predicate) + for _, labelTab := range labels { + if labelTab.GroupId == tablet.GroupId { + s.RUnlock() + return labelTab, nil + } + } + s.RUnlock() + } return tab, nil } } @@ -746,7 +777,7 @@ func (s *Server) ShouldServe( // This will also make it easier to restore the reserved predicates after // a DropAll operation. tablet.GroupId = 1 - case tablet.Label != "": + case tablet.IsLabeled(): // Labeled predicate: route to matching labeled group gid, err := s.labelGroup(tablet.Label) if err != nil { @@ -770,7 +801,7 @@ func (s *Server) ShouldServe( span.AddEvent(fmt.Sprintf("Error proposing tablet: %+v. Error: %v", &proposal, err)) return tablet, err } - tab = s.ServingTablet(tablet.Predicate) + tab = s.ServingTablet(pb.TabletKey(tablet.Predicate, tablet.Label)) x.AssertTrue(tab != nil) span.SetAttributes(attribute.String("tablet_predicate_served", tablet.Predicate)) return tab, nil @@ -931,7 +962,7 @@ func (s *Server) groupLabel(gid uint32) string { return "" } for _, member := range group.Members { - if member.Label != "" { + if member.IsLabeled() { return member.Label } } diff --git a/protos/pb/labeled.go b/protos/pb/labeled.go new file mode 100644 index 00000000000..d1911eca694 --- /dev/null +++ b/protos/pb/labeled.go @@ -0,0 +1,171 @@ +/* + * SPDX-FileCopyrightText: © Hypermode Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +package pb + +import "strings" + +const tabletKeySep = "@" + +// TabletKey returns the composite key for a tablet. Unlabeled tablets +// use the bare predicate name for backward compatibility. +func TabletKey(predicate, label string) string { + if label == "" { + return predicate + } + return predicate + tabletKeySep + label +} + +// ParseTabletKey splits a composite tablet key into its predicate and label +// components. Uses the rightmost '@' as the separator as a defensive choice, +// though '@' is not valid in Dgraph predicate names (allowed: a-zA-Z0-9_.~). +// For keys without a label (no '@' separator), the label is "". +func ParseTabletKey(key string) (predicate, label string) { + if idx := strings.LastIndex(key, tabletKeySep); idx >= 0 { + return key[:idx], key[idx+1:] + } + return key, "" +} + +// IsLabeled returns true if this tablet has a label assigned via the @label +// schema directive. Labeled tablets are pinned to specific alpha groups and +// receive special routing, rebalancing, and authorization treatment. +func (t *Tablet) IsLabeled() bool { + return t != nil && t.Label != "" +} + +// IsLabeled returns true if this member was started with a --label flag. +// Labeled members serve only predicates whose @label matches their label. +func (m *Member) IsLabeled() bool { + return m != nil && m.Label != "" +} + +// IsLabeled returns true if this schema update carries a @label directive. +// Labeled predicates are routed to the alpha group whose label matches. +func (s *SchemaUpdate) IsLabeled() bool { + return s != nil && s.Label != "" +} + +// TabletIndex provides O(1) nested lookups for tablets by (predicate, label). +// It is a read cache built from the flat proto map[string]*Tablet and avoids +// the O(n) ParseTabletKey scans required by composite-key iteration. +type TabletIndex struct { + pred map[string]map[string]*Tablet // pred -> label -> *Tablet +} + +// NewTabletIndex returns an empty TabletIndex ready for use. +func NewTabletIndex() *TabletIndex { + return &TabletIndex{pred: make(map[string]map[string]*Tablet)} +} + +// Get returns the tablet for the exact (predicate, label) pair, or nil. +func (ti *TabletIndex) Get(predicate, label string) *Tablet { + labels := ti.pred[predicate] + if labels == nil { + return nil + } + return labels[label] +} + +// Set inserts or updates the tablet for the given (predicate, label) pair. +func (ti *TabletIndex) Set(predicate, label string, tablet *Tablet) { + labels := ti.pred[predicate] + if labels == nil { + labels = make(map[string]*Tablet) + ti.pred[predicate] = labels + } + labels[label] = tablet +} + +// Delete removes the tablet for the given (predicate, label) pair. +func (ti *TabletIndex) Delete(predicate, label string) { + labels := ti.pred[predicate] + if labels == nil { + return + } + delete(labels, label) + if len(labels) == 0 { + delete(ti.pred, predicate) + } +} + +// GetAny returns any tablet for the predicate, preferring the unlabeled one. +// Used by BelongsToReadOnly where we need any group serving this predicate. +func (ti *TabletIndex) GetAny(predicate string) *Tablet { + labels := ti.pred[predicate] + if labels == nil { + return nil + } + // Prefer unlabeled tablet. + if t, ok := labels[""]; ok { + return t + } + // Return any labeled tablet. + for _, t := range labels { + return t + } + return nil +} + +// GetForGroup returns the tablet belonging to gid for the given predicate, +// falling back to any tablet if no own-group match exists. This replaces the +// two-pass aliasing in applyState where own-group tablets won bare-key collisions. +func (ti *TabletIndex) GetForGroup(predicate string, gid uint32) *Tablet { + labels := ti.pred[predicate] + if labels == nil { + return nil + } + // First pass: find a tablet belonging to gid. + for _, t := range labels { + if t.GroupId == gid { + return t + } + } + // Fallback: return any tablet for this predicate. + for _, t := range labels { + return t + } + return nil +} + +// AllForPredicate returns the inner label→tablet map for a predicate. +// Returns nil if the predicate is not in the index. O(1). +func (ti *TabletIndex) AllForPredicate(predicate string) map[string]*Tablet { + return ti.pred[predicate] +} + +// HasPredicate returns true if any tablet exists for the given predicate. +func (ti *TabletIndex) HasPredicate(predicate string) bool { + return len(ti.pred[predicate]) > 0 +} + +// Len returns the total number of tablets across all predicates. +func (ti *TabletIndex) Len() int { + n := 0 + for _, labels := range ti.pred { + n += len(labels) + } + return n +} + +// Range iterates over all tablets. Return false from fn to stop early. +func (ti *TabletIndex) Range(fn func(pred, label string, tablet *Tablet) bool) { + for pred, labels := range ti.pred { + for label, tablet := range labels { + if !fn(pred, label, tablet) { + return + } + } + } +} + +// BuildFromFlat parses composite keys in a flat proto tablet map and inserts +// them into the nested index. This is the bridge from the proto wire format. +func (ti *TabletIndex) BuildFromFlat(tablets map[string]*Tablet) { + for key, tablet := range tablets { + pred, label := ParseTabletKey(key) + ti.Set(pred, label, tablet) + } +} diff --git a/protos/pb/labeled_test.go b/protos/pb/labeled_test.go new file mode 100644 index 00000000000..4e26af7ea85 --- /dev/null +++ b/protos/pb/labeled_test.go @@ -0,0 +1,61 @@ +/* + * SPDX-FileCopyrightText: © Hypermode Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +package pb + +import "testing" + +func TestTabletKey_Unlabeled(t *testing.T) { + got := TabletKey("Document.name", "") + if got != "Document.name" { + t.Errorf("TabletKey('Document.name', '') = %q, want 'Document.name'", got) + } +} + +func TestTabletKey_Labeled(t *testing.T) { + got := TabletKey("Document.name", "secret") + if got != "Document.name@secret" { + t.Errorf("TabletKey('Document.name', 'secret') = %q, want 'Document.name@secret'", got) + } +} + +func TestParseTabletKey_Unlabeled(t *testing.T) { + pred, label := ParseTabletKey("Document.name") + if pred != "Document.name" || label != "" { + t.Errorf("ParseTabletKey('Document.name') = (%q, %q), want ('Document.name', '')", pred, label) + } +} + +func TestParseTabletKey_Labeled(t *testing.T) { + pred, label := ParseTabletKey("Document.name@secret") + if pred != "Document.name" || label != "secret" { + t.Errorf("ParseTabletKey('Document.name@secret') = (%q, %q), want ('Document.name', 'secret')", pred, label) + } +} + +func TestParseTabletKey_NamespacedLabeled(t *testing.T) { + // Dgraph namespaces predicates as "0-Document.name" — the '@' should still + // be the delimiter even with the namespace prefix. + pred, label := ParseTabletKey("0-Document.name@top_secret") + if pred != "0-Document.name" || label != "top_secret" { + t.Errorf("ParseTabletKey('0-Document.name@top_secret') = (%q, %q), want ('0-Document.name', 'top_secret')", pred, label) + } +} + +func TestTabletKeyRoundTrip(t *testing.T) { + cases := []struct{ pred, label string }{ + {"Document.name", ""}, + {"Document.name", "secret"}, + {"0-Document.name", "top_secret"}, + {"dgraph.type", ""}, + } + for _, c := range cases { + key := TabletKey(c.pred, c.label) + gotPred, gotLabel := ParseTabletKey(key) + if gotPred != c.pred || gotLabel != c.label { + t.Errorf("Round-trip(%q, %q): got (%q, %q)", c.pred, c.label, gotPred, gotLabel) + } + } +} diff --git a/protos/pb/tablet_index_test.go b/protos/pb/tablet_index_test.go new file mode 100644 index 00000000000..b934426308b --- /dev/null +++ b/protos/pb/tablet_index_test.go @@ -0,0 +1,199 @@ +/* + * SPDX-FileCopyrightText: © Hypermode Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +package pb + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestTabletIndex_SetGetDelete(t *testing.T) { + ti := NewTabletIndex() + require.Equal(t, 0, ti.Len()) + + tab1 := &Tablet{Predicate: "name", GroupId: 1} + ti.Set("name", "", tab1) + require.Equal(t, 1, ti.Len()) + + got := ti.Get("name", "") + require.Equal(t, tab1, got) + + // Different label is a different entry. + tab2 := &Tablet{Predicate: "name", Label: "secret", GroupId: 2} + ti.Set("name", "secret", tab2) + require.Equal(t, 2, ti.Len()) + + got = ti.Get("name", "secret") + require.Equal(t, tab2, got) + + // Original is still there. + got = ti.Get("name", "") + require.Equal(t, tab1, got) + + // Miss returns nil. + require.Nil(t, ti.Get("name", "other")) + require.Nil(t, ti.Get("missing", "")) + + // Delete labeled. + ti.Delete("name", "secret") + require.Equal(t, 1, ti.Len()) + require.Nil(t, ti.Get("name", "secret")) + + // Delete unlabeled removes the predicate entry entirely. + ti.Delete("name", "") + require.Equal(t, 0, ti.Len()) + require.False(t, ti.HasPredicate("name")) + + // Delete of non-existent is no-op. + ti.Delete("name", "") +} + +func TestTabletIndex_GetAny_PrefersUnlabeled(t *testing.T) { + ti := NewTabletIndex() + + tabUnlabeled := &Tablet{Predicate: "name", GroupId: 1} + tabSecret := &Tablet{Predicate: "name", Label: "secret", GroupId: 2} + tabTop := &Tablet{Predicate: "name", Label: "top_secret", GroupId: 3} + + ti.Set("name", "", tabUnlabeled) + ti.Set("name", "secret", tabSecret) + ti.Set("name", "top_secret", tabTop) + + // GetAny should prefer the unlabeled tablet. + got := ti.GetAny("name") + require.Equal(t, tabUnlabeled, got) + + // After deleting unlabeled, GetAny returns one of the labeled tablets. + ti.Delete("name", "") + got = ti.GetAny("name") + require.NotNil(t, got) + require.NotEmpty(t, got.Label) + + // Missing predicate returns nil. + require.Nil(t, ti.GetAny("missing")) +} + +func TestTabletIndex_GetForGroup(t *testing.T) { + ti := NewTabletIndex() + + tab1 := &Tablet{Predicate: "name", GroupId: 1} + tab2 := &Tablet{Predicate: "name", Label: "secret", GroupId: 2} + tab3 := &Tablet{Predicate: "name", Label: "top_secret", GroupId: 3} + + ti.Set("name", "", tab1) + ti.Set("name", "secret", tab2) + ti.Set("name", "top_secret", tab3) + + // Exact group match. + require.Equal(t, tab1, ti.GetForGroup("name", 1)) + require.Equal(t, tab2, ti.GetForGroup("name", 2)) + require.Equal(t, tab3, ti.GetForGroup("name", 3)) + + // Non-existent group falls back to any tablet. + got := ti.GetForGroup("name", 99) + require.NotNil(t, got) + + // Missing predicate returns nil. + require.Nil(t, ti.GetForGroup("missing", 1)) +} + +func TestTabletIndex_AllForPredicate(t *testing.T) { + ti := NewTabletIndex() + + tab1 := &Tablet{Predicate: "name", GroupId: 1} + tab2 := &Tablet{Predicate: "name", Label: "secret", GroupId: 2} + ti.Set("name", "", tab1) + ti.Set("name", "secret", tab2) + + labels := ti.AllForPredicate("name") + require.Len(t, labels, 2) + require.Equal(t, tab1, labels[""]) + require.Equal(t, tab2, labels["secret"]) + + // Missing predicate returns nil. + require.Nil(t, ti.AllForPredicate("missing")) +} + +func TestTabletIndex_BuildFromFlat(t *testing.T) { + flat := map[string]*Tablet{ + "name": {Predicate: "name", GroupId: 1}, + "name@secret": {Predicate: "name", Label: "secret", GroupId: 2}, + "name@top_secret": {Predicate: "name", Label: "top_secret", GroupId: 3}, + "age": {Predicate: "age", GroupId: 1}, + } + + ti := NewTabletIndex() + ti.BuildFromFlat(flat) + require.Equal(t, 4, ti.Len()) + + // Verify nested structure. + require.True(t, ti.HasPredicate("name")) + require.True(t, ti.HasPredicate("age")) + + nameLabels := ti.AllForPredicate("name") + require.Len(t, nameLabels, 3) + require.Equal(t, uint32(1), nameLabels[""].GroupId) + require.Equal(t, uint32(2), nameLabels["secret"].GroupId) + require.Equal(t, uint32(3), nameLabels["top_secret"].GroupId) + + ageLabels := ti.AllForPredicate("age") + require.Len(t, ageLabels, 1) + require.Equal(t, uint32(1), ageLabels[""].GroupId) +} + +func TestTabletIndex_Range(t *testing.T) { + ti := NewTabletIndex() + ti.Set("name", "", &Tablet{Predicate: "name", GroupId: 1}) + ti.Set("name", "secret", &Tablet{Predicate: "name", Label: "secret", GroupId: 2}) + ti.Set("age", "", &Tablet{Predicate: "age", GroupId: 1}) + + // Collect all entries. + type entry struct{ pred, label string } + var entries []entry + ti.Range(func(pred, label string, tablet *Tablet) bool { + entries = append(entries, entry{pred, label}) + return true + }) + require.Len(t, entries, 3) + + // Test early termination. + count := 0 + ti.Range(func(pred, label string, tablet *Tablet) bool { + count++ + return false // stop after first + }) + require.Equal(t, 1, count) +} + +func TestTabletIndex_BuildFromFlat_MultipleGroups(t *testing.T) { + // Simulate multiple groups each having tablets for the same predicate. + ti := NewTabletIndex() + + group1 := map[string]*Tablet{ + "name": {Predicate: "name", GroupId: 1}, + } + group2 := map[string]*Tablet{ + "name@secret": {Predicate: "name", Label: "secret", GroupId: 2}, + } + group3 := map[string]*Tablet{ + "name@top_secret": {Predicate: "name", Label: "top_secret", GroupId: 3}, + } + + ti.BuildFromFlat(group1) + ti.BuildFromFlat(group2) + ti.BuildFromFlat(group3) + + require.Equal(t, 3, ti.Len()) + + // GetForGroup should find each group's tablet. + require.Equal(t, uint32(1), ti.GetForGroup("name", 1).GroupId) + require.Equal(t, uint32(2), ti.GetForGroup("name", 2).GroupId) + require.Equal(t, uint32(3), ti.GetForGroup("name", 3).GroupId) + + // GetAny should prefer unlabeled. + require.Equal(t, uint32(1), ti.GetAny("name").GroupId) +} diff --git a/schema/schema.go b/schema/schema.go index f651946baf2..e144b5c8fa6 100644 --- a/schema/schema.go +++ b/schema/schema.go @@ -787,6 +787,12 @@ func initialSchemaInternal(namespace uint64, all bool) []*pb.SchemaUpdate { Tokenizer: []string{"exact"}, List: true, }, + { + Predicate: "dgraph.label", + ValueType: pb.Posting_STRING, + Directive: pb.SchemaUpdate_INDEX, + Tokenizer: []string{"exact"}, + }, { Predicate: "dgraph.drop.op", ValueType: pb.Posting_STRING, diff --git a/systest/label/label_test.go b/systest/label/label_test.go index 30d61df228b..bb3c72f7707 100644 --- a/systest/label/label_test.go +++ b/systest/label/label_test.go @@ -9,9 +9,11 @@ package main import ( "context" + "encoding/json" "fmt" "net/http" "net/url" + "sort" "testing" "time" @@ -158,20 +160,22 @@ func TestLabeledPredicateRouting(t *testing.T) { require.Equal(t, "1", predicateToGroup["0-name"], "'name' predicate should be in group 1 (unlabeled)") - // Verify 'codename' is in the 'secret' labeled group + // Verify 'codename' is in the 'secret' labeled group. + // With composite tablet keys, the tablet is stored as "0-codename@secret". secretGroup := labelToGroup["secret"] t.Logf(" 'secret' label maps to group: %s", secretGroup) require.NotEmpty(t, secretGroup, "should have a 'secret' labeled group") - t.Logf(" Checking 'codename' is in secret group... actual: %s", predicateToGroup["0-codename"]) - require.Equal(t, secretGroup, predicateToGroup["0-codename"], + t.Logf(" Checking 'codename@secret' is in secret group... actual: %s", predicateToGroup["0-codename@secret"]) + require.Equal(t, secretGroup, predicateToGroup["0-codename@secret"], "'codename' predicate should be in the 'secret' labeled group") - // Verify 'alias' is in the 'top_secret' labeled group + // Verify 'alias' is in the 'top_secret' labeled group. + // With composite tablet keys, the tablet is stored as "0-alias@top_secret". topSecretGroup := labelToGroup["top_secret"] t.Logf(" 'top_secret' label maps to group: %s", topSecretGroup) require.NotEmpty(t, topSecretGroup, "should have a 'top_secret' labeled group") - t.Logf(" Checking 'alias' is in top_secret group... actual: %s", predicateToGroup["0-alias"]) - require.Equal(t, topSecretGroup, predicateToGroup["0-alias"], + t.Logf(" Checking 'alias@top_secret' is in top_secret group... actual: %s", predicateToGroup["0-alias@top_secret"]) + require.Equal(t, topSecretGroup, predicateToGroup["0-alias@top_secret"], "'alias' predicate should be in the 'top_secret' labeled group") t.Log("All predicate routing verified successfully!") } @@ -263,10 +267,10 @@ func TestLabeledPredicateCannotBeMoved(t *testing.T) { state, err := testutil.GetState() require.NoError(t, err) - // Find the group with 'codename' predicate (stored with namespace prefix "0-") + // Find the group with 'codename' predicate (stored as composite key "0-codename@secret") var codenameGroup string for groupID, group := range state.Groups { - if _, ok := group.Tablets["0-codename"]; ok { + if _, ok := group.Tablets["0-codename@secret"]; ok { codenameGroup = groupID break } @@ -296,7 +300,7 @@ func TestLabeledPredicateCannotBeMoved(t *testing.T) { var newCodenameGroup string for groupID, group := range state2.Groups { - if _, ok := group.Tablets["0-codename"]; ok { + if _, ok := group.Tablets["0-codename@secret"]; ok { newCodenameGroup = groupID break } @@ -347,9 +351,10 @@ func TestUnlabeledPredicateNotOnLabeledGroup(t *testing.T) { } } - // Verify unlabeled predicates are not in labeled groups + // Verify unlabeled predicates are not in labeled groups. + // Tablet keys in the state use the namespace prefix "0-" (e.g., "0-name"). t.Log("Verifying unlabeled predicates are not in labeled groups...") - unlabeledPreds := []string{"name", "email", "phone"} + unlabeledPreds := []string{"0-name", "0-email", "0-phone"} for _, pred := range unlabeledPreds { for groupID, group := range state.Groups { if _, ok := group.Tablets[pred]; ok { @@ -390,3 +395,179 @@ func TestMissingLabelGroupError(t *testing.T) { "error should mention the missing label") t.Log("Verified: non-existent label produces correct error!") } + +// TestEntityLevelRouting verifies that setting dgraph.label on a UID pins all its predicates +// to the labeled group, creating composite tablet keys like "predicate@label" in Zero's state. +func TestEntityLevelRouting(t *testing.T) { + t.Log("=== TestEntityLevelRouting: Verifying entity-level tablet routing ===") + dg := waitForCluster(t) + ctx := context.Background() + + // Step 1: Drop all data and apply schema without @label directives. + // Entity-level routing uses dgraph.label on the UID, not schema-level @label. + t.Log("Dropping all data...") + require.NoError(t, dg.Alter(ctx, &api.Operation{DropAll: true})) + + t.Log("Applying schema (no @label directives — routing is entity-level via dgraph.label)...") + schema := ` + Document.name: string @index(term) . + Document.text: string @index(term) . + ` + require.NoError(t, dg.Alter(ctx, &api.Operation{Schema: schema})) + t.Log("Schema applied successfully") + + // Step 2: Create 3 entities with different dgraph.label values in a single mutation. + t.Log("Inserting 3 entities with different dgraph.label values...") + _, err := dg.NewTxn().Mutate(ctx, &api.Mutation{ + CommitNow: true, + SetNquads: []byte(` + _:doc1 "secret" . + _:doc1 "Secret.pdf" . + _:doc1 "Classified" . + + _:doc2 "top_secret" . + _:doc2 "TopSecret.pdf" . + _:doc2 "Highly classified" . + + _:doc3 "Boring.pdf" . + _:doc3 "Unclassified" . + `), + }) + require.NoError(t, err) + t.Log("Entities inserted successfully") + + // Step 3: Verify tablet assignments in Zero's state. + t.Log("Waiting 5s for tablet assignments to propagate...") + time.Sleep(5 * time.Second) + + t.Log("Fetching cluster state to verify tablet assignments...") + state, err := testutil.GetState() + require.NoError(t, err) + + // Build a map of label -> groupID from members + labelToGroup := make(map[string]string) + for groupID, group := range state.Groups { + for _, member := range group.Members { + if member.Label != "" { + labelToGroup[member.Label] = groupID + t.Logf(" Group %s has label: %s", groupID, member.Label) + } + } + } + secretGroup := labelToGroup["secret"] + topSecretGroup := labelToGroup["top_secret"] + require.NotEmpty(t, secretGroup, "should have a 'secret' labeled group") + require.NotEmpty(t, topSecretGroup, "should have a 'top_secret' labeled group") + + // Build a map of tablet key -> groupID from all groups + tabletToGroup := make(map[string]string) + for groupID, group := range state.Groups { + for tabletKey := range group.Tablets { + tabletToGroup[tabletKey] = groupID + t.Logf(" Tablet %q is in group %s", tabletKey, groupID) + } + } + + // Verify unlabeled tablets exist (for doc3 which has no dgraph.label) + t.Log("Verifying unlabeled tablets (for doc3)...") + _, hasDocName := tabletToGroup["0-Document.name"] + require.True(t, hasDocName, "unlabeled tablet '0-Document.name' should exist") + + _, hasDocText := tabletToGroup["0-Document.text"] + require.True(t, hasDocText, "unlabeled tablet '0-Document.text' should exist") + + // Verify 'secret' tablets (for doc1) + t.Log("Verifying 'secret' tablets (for doc1)...") + secretNameGroup, hasSecretName := tabletToGroup["0-Document.name@secret"] + require.True(t, hasSecretName, "tablet '0-Document.name@secret' should exist") + require.Equal(t, secretGroup, secretNameGroup, + "'0-Document.name@secret' should be in the 'secret' group") + + secretTextGroup, hasSecretText := tabletToGroup["0-Document.text@secret"] + require.True(t, hasSecretText, "tablet '0-Document.text@secret' should exist") + require.Equal(t, secretGroup, secretTextGroup, + "'0-Document.text@secret' should be in the 'secret' group") + + // Verify 'top_secret' tablets (for doc2) + t.Log("Verifying 'top_secret' tablets (for doc2)...") + topSecretNameGroup, hasTopSecretName := tabletToGroup["0-Document.name@top_secret"] + require.True(t, hasTopSecretName, "tablet '0-Document.name@top_secret' should exist") + require.Equal(t, topSecretGroup, topSecretNameGroup, + "'0-Document.name@top_secret' should be in the 'top_secret' group") + + topSecretTextGroup, hasTopSecretText := tabletToGroup["0-Document.text@top_secret"] + require.True(t, hasTopSecretText, "tablet '0-Document.text@top_secret' should exist") + require.Equal(t, topSecretGroup, topSecretTextGroup, + "'0-Document.text@top_secret' should be in the 'top_secret' group") + + t.Log("All tablet assignments verified!") + + // Step 4: Verify query fan-out — all 3 documents should be returned despite + // living on 3 different groups. + // NOTE: We avoid orderasc in the DQL query because the sort operation fans out + // to all tablet groups and concatenates sorted runs instead of merging them, + // causing triplication. Sorting in Go is the correct approach for now. + // + // We poll with retries because AllTablets (used for query fan-out) reads the + // alpha's local tablet cache, which is updated asynchronously via applyState from + // Zero. Until all tablets propagate, the query may only reach a subset of groups. + t.Log("Querying all documents via has(Document.name) to verify fan-out across groups...") + type docResult struct { + Name string `json:"Document.name"` + Text string `json:"Document.text"` + } + var result struct { + Docs []docResult `json:"docs"` + } + var lastResp string + deadline := time.Now().Add(30 * time.Second) + for attempt := 1; time.Now().Before(deadline); attempt++ { + resp, err := dg.NewTxn().Query(ctx, ` + { + docs(func: has(Document.name)) { + Document.name + Document.text + } + } + `) + require.NoError(t, err) + lastResp = string(resp.GetJson()) + + var r struct { + Docs []docResult `json:"docs"` + } + require.NoError(t, json.Unmarshal(resp.GetJson(), &r)) + if len(r.Docs) == 3 { + result = r + t.Logf("Query returned 3 docs on attempt %d", attempt) + break + } + t.Logf("Attempt %d: got %d docs (need 3), retrying in 2s... (response: %s)", + attempt, len(r.Docs), lastResp) + time.Sleep(2 * time.Second) + } + require.Len(t, result.Docs, 3, + "should return all 3 documents from 3 different groups (last response: %s)", lastResp) + + // Sort results in Go for deterministic verification + sort.Slice(result.Docs, func(i, j int) bool { + return result.Docs[i].Name < result.Docs[j].Name + }) + + // Verify each document is present + expectedDocs := []struct { + Name string + Text string + }{ + {"Boring.pdf", "Unclassified"}, + {"Secret.pdf", "Classified"}, + {"TopSecret.pdf", "Highly classified"}, + } + for i, expected := range expectedDocs { + require.Equal(t, expected.Name, result.Docs[i].Name, "document name mismatch at index %d", i) + require.Equal(t, expected.Text, result.Docs[i].Text, "document text mismatch at index %d", i) + t.Logf(" Found document: %s -> %s", result.Docs[i].Name, result.Docs[i].Text) + } + + t.Log("Entity-level routing test passed: all documents returned via fan-out!") +} diff --git a/worker/draft.go b/worker/draft.go index c2cb9947519..372b46d75ce 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -367,6 +367,9 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr // TODO: Revisit this when we work on posting cache. Don't clear entire cache. // We don't want to drop entire cache, just due to one namespace. posting.ResetCache() + if elCache != nil { + elCache.Clear() + } return nil } @@ -384,6 +387,9 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr // TODO: Revisit this when we work on posting cache. Don't clear entire cache. // We don't want to drop entire cache, just due to one namespace. posting.ResetCache() + if elCache != nil { + elCache.Clear() + } return nil } @@ -398,6 +404,9 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr // Clear entire cache. posting.ResetCache() + if elCache != nil { + elCache.Clear() + } // It should be okay to set the schema at timestamp 1 after drop all operation. if groups().groupId() == 1 { diff --git a/worker/embedded.go b/worker/embedded.go index 8394e6668d7..424a90938a6 100644 --- a/worker/embedded.go +++ b/worker/embedded.go @@ -26,7 +26,7 @@ func InitForLite(ps *badger.DB) { func InitTablet(pred string) { groups().Lock() defer groups().Unlock() - groups().tablets[pred] = &pb.Tablet{GroupId: 1, Predicate: pred} + groups().tabletIndex.Set(pred, "", &pb.Tablet{GroupId: 1, Predicate: pred}) } func ApplyMutations(ctx context.Context, p *pb.Proposal) error { diff --git a/worker/entity_label_cache.go b/worker/entity_label_cache.go new file mode 100644 index 00000000000..ce75f290f69 --- /dev/null +++ b/worker/entity_label_cache.go @@ -0,0 +1,60 @@ +/* + * SPDX-FileCopyrightText: © Hypermode Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +package worker + +import "sync" + +// entityLabelCache is a concurrency-safe UID -> label cache. +// Used by the mutation routing layer to resolve entity labels without +// querying group 1 on every mutation. +type entityLabelCache struct { + mu sync.RWMutex + entries map[uint64]string + maxSize int +} + +func newEntityLabelCache(maxSize int) *entityLabelCache { + return &entityLabelCache{ + entries: make(map[uint64]string), + maxSize: maxSize, + } +} + +// Get returns the cached label for a UID. Returns ("", false) on cache miss. +// An empty label with ok=true means the entity is explicitly unlabeled. +func (c *entityLabelCache) Get(uid uint64) (string, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + label, ok := c.entries[uid] + return label, ok +} + +// Set stores a UID -> label mapping. If the cache exceeds maxSize, it is +// cleared (simple eviction strategy — revisit with LRU if needed). +func (c *entityLabelCache) Set(uid uint64, label string) { + c.mu.Lock() + defer c.mu.Unlock() + if len(c.entries) >= c.maxSize { + // Simple eviction: clear everything. This is acceptable because + // cache misses just cause a read from group 1, not data loss. + c.entries = make(map[uint64]string) + } + c.entries[uid] = label +} + +// Invalidate removes a single UID from the cache. +func (c *entityLabelCache) Invalidate(uid uint64) { + c.mu.Lock() + defer c.mu.Unlock() + delete(c.entries, uid) +} + +// Clear removes all entries. Used on DropAll. +func (c *entityLabelCache) Clear() { + c.mu.Lock() + defer c.mu.Unlock() + c.entries = make(map[uint64]string) +} diff --git a/worker/entity_label_cache_test.go b/worker/entity_label_cache_test.go new file mode 100644 index 00000000000..d2c0b6a4019 --- /dev/null +++ b/worker/entity_label_cache_test.go @@ -0,0 +1,59 @@ +/* + * SPDX-FileCopyrightText: © Hypermode Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +package worker + +import "testing" + +func TestEntityLabelCache_GetSet(t *testing.T) { + c := newEntityLabelCache(100) + c.Set(42, "secret") + label, ok := c.Get(42) + if !ok || label != "secret" { + t.Errorf("Get(42) = (%q, %v), want ('secret', true)", label, ok) + } +} + +func TestEntityLabelCache_Miss(t *testing.T) { + c := newEntityLabelCache(100) + label, ok := c.Get(99) + if ok || label != "" { + t.Errorf("Get(99) = (%q, %v), want ('', false)", label, ok) + } +} + +func TestEntityLabelCache_Invalidate(t *testing.T) { + c := newEntityLabelCache(100) + c.Set(42, "secret") + c.Invalidate(42) + label, ok := c.Get(42) + if ok { + t.Errorf("Get(42) after Invalidate = (%q, %v), want ('', false)", label, ok) + } +} + +func TestEntityLabelCache_Clear(t *testing.T) { + c := newEntityLabelCache(100) + c.Set(1, "a") + c.Set(2, "b") + c.Clear() + if _, ok := c.Get(1); ok { + t.Error("Get(1) after Clear should miss") + } + if _, ok := c.Get(2); ok { + t.Error("Get(2) after Clear should miss") + } +} + +func TestEntityLabelCache_UnlabeledEntity(t *testing.T) { + // An entity with no label should be cached as "" (empty string) + // so we don't repeatedly look it up from group 1. + c := newEntityLabelCache(100) + c.Set(42, "") + label, ok := c.Get(42) + if !ok || label != "" { + t.Errorf("Get(42) = (%q, %v), want ('', true)", label, ok) + } +} diff --git a/worker/groups.go b/worker/groups.go index 5aa2ee1c827..0c0af18a579 100644 --- a/worker/groups.go +++ b/worker/groups.go @@ -33,7 +33,7 @@ type groupi struct { state *pb.MembershipState Node *node gid uint32 - tablets map[string]*pb.Tablet + tabletIndex *pb.TabletIndex triggerCh chan struct{} // Used to trigger membership sync blockDeletes *sync.Mutex // Ensure that deletion won't happen when move is going on. closer *z.Closer @@ -46,7 +46,7 @@ type groupi struct { var gr = &groupi{ blockDeletes: new(sync.Mutex), - tablets: make(map[string]*pb.Tablet), + tabletIndex: pb.NewTabletIndex(), closer: z.NewCloser(3), // Match CLOSER:1 in this file. } @@ -304,7 +304,6 @@ func (g *groupi) applyState(myId uint64, state *pb.MembershipState) { // Sometimes this can cause us to lose latest tablet info, but that shouldn't cause any issues. var foundSelf bool - g.tablets = make(map[string]*pb.Tablet) for gid, group := range g.state.Groups { for _, member := range group.Members { if myId == member.Id { @@ -315,14 +314,19 @@ func (g *groupi) applyState(myId uint64, state *pb.MembershipState) { conn.GetPools().Connect(member.Addr, x.WorkerConfig.TLSClientConfig) } } - for _, tablet := range group.Tablets { - g.tablets[tablet.Predicate] = tablet - } if gid == g.groupId() { glog.V(3).Infof("group %d checksum: %d", g.groupId(), group.Checksum) atomic.StoreUint64(&g.membershipChecksum, group.Checksum) } } + // Build tablet index from all groups' flat proto maps in a single pass. + // TabletIndex.GetForGroup handles what the previous two-pass ordering guaranteed: + // preferring the tablet belonging to this group for bare-predicate lookups. + idx := pb.NewTabletIndex() + for _, group := range g.state.Groups { + idx.BuildFromFlat(group.Tablets) + } + g.tabletIndex = idx for _, member := range g.state.Zeros { if x.WorkerConfig.MyAddr != member.Addr { conn.GetPools().Connect(member.Addr, x.WorkerConfig.TLSClientConfig) @@ -407,7 +411,7 @@ func (g *groupi) BelongsTo(key string, label string) (uint32, error) { // should reject that query. func (g *groupi) BelongsToReadOnly(key string, ts uint64) (uint32, error) { g.RLock() - tablet := g.tablets[key] + tablet := g.tabletIndex.GetForGroup(key, g.groupId()) g.RUnlock() if tablet != nil { if ts > 0 && ts < tablet.MoveTs { @@ -416,15 +420,16 @@ func (g *groupi) BelongsToReadOnly(key string, ts uint64) (uint32, error) { } return tablet.GetGroupId(), nil } - // We don't know about this tablet. Talk to dgraphzero to find out who is - // serving this tablet. + // serving this tablet. We pass our own GroupId so Zero can check if this + // group serves a tablet for the predicate (entity-level routing). pl := g.connToZeroLeader() zc := pb.NewZeroClient(pl.Get()) tablet = &pb.Tablet{ Predicate: key, ReadOnly: true, + GroupId: g.groupId(), } out, err := zc.ShouldServe(g.Ctx(), tablet) if err != nil { @@ -437,7 +442,7 @@ func (g *groupi) BelongsToReadOnly(key string, ts uint64) (uint32, error) { g.Lock() defer g.Unlock() - g.tablets[key] = out + g.tabletIndex.Set(out.GetPredicate(), out.GetLabel(), out) if out != nil && ts > 0 && ts < out.MoveTs { return 0, errors.Errorf("StartTs: %d is from before MoveTs: %d for pred: %q", ts, out.MoveTs, key) @@ -445,6 +450,29 @@ func (g *groupi) BelongsToReadOnly(key string, ts uint64) (uint32, error) { return out.GetGroupId(), nil } +// AllTablets returns all cached tablets for a predicate (across all labels). +// This is used for query fan-out when a predicate has multiple tablets. +// Returns nil if only a single tablet exists (fast path). +func (g *groupi) AllTablets(predicate string, ts uint64) ([]*pb.Tablet, error) { + g.RLock() + labels := g.tabletIndex.AllForPredicate(predicate) + if len(labels) <= 1 { + g.RUnlock() + return nil, nil + } + tablets := make([]*pb.Tablet, 0, len(labels)) + for _, tablet := range labels { + if ts > 0 && ts < tablet.MoveTs { + g.RUnlock() + return nil, errors.Errorf("StartTs: %d is before MoveTs: %d for pred: %q", + ts, tablet.MoveTs, predicate) + } + tablets = append(tablets, tablet) + } + g.RUnlock() + return tablets, nil +} + // ServesTablet checks if this group serves the given predicate. // Uses stored schema to get the label for existing predicates. func (g *groupi) ServesTablet(key string) (bool, error) { @@ -462,7 +490,7 @@ func (g *groupi) ServesTablet(key string) (bool, error) { // by other groups (labeled alphas). Returns empty string if tablet not cached. func (g *groupi) GetTabletLabel(key string) string { g.RLock() - tablet := g.tablets[key] + tablet := g.tabletIndex.GetAny(key) g.RUnlock() if tablet != nil { return tablet.Label @@ -481,10 +509,10 @@ func (g *groupi) sendTablet(tablet *pb.Tablet) (*pb.Tablet, error) { } // Do not store tablets with group ID 0, as they are just dummy tablets for - // predicates that do no exist. + // predicates that do not exist. if out.GroupId > 0 { g.Lock() - g.tablets[out.GetPredicate()] = out + g.tabletIndex.Set(out.GetPredicate(), out.GetLabel(), out) g.Unlock() } @@ -502,16 +530,16 @@ func (g *groupi) Inform(preds []string) ([]*pb.Tablet, error) { if len(p) == 0 { continue } - if tab, ok := g.tablets[p]; !ok { + if tab := g.tabletIndex.GetAny(p); tab != nil { + tablets = append(tablets, tab) + } else { tablet := &pb.Tablet{GroupId: g.groupId(), Predicate: p} // Get label from schema and set if exists if label, ok := schema.State().GetLabel(context.Background(), p); ok { tablet.Label = label - glog.Infof("Inform: predicate %s has label %q from schema", p, label) + glog.V(2).Infof("Inform: predicate %s has label %q from schema", p, label) } unknownPreds = append(unknownPreds, tablet) - } else { - tablets = append(tablets, tab) } } g.RUnlock() @@ -530,11 +558,11 @@ func (g *groupi) Inform(preds []string) ([]*pb.Tablet, error) { } // Do not store tablets with group ID 0, as they are just dummy tablets for - // predicates that do no exist. + // predicates that do not exist. g.Lock() for _, t := range out.Tablets { if t.GroupId > 0 { - g.tablets[t.GetPredicate()] = t + g.tabletIndex.Set(t.GetPredicate(), t.GetLabel(), t) tablets = append(tablets, t) } @@ -552,30 +580,14 @@ func (g *groupi) Inform(preds []string) ([]*pb.Tablet, error) { // For data mutations, get the label from schema.State().GetLabel(). // Do not modify the returned Tablet. func (g *groupi) Tablet(key string, label string) (*pb.Tablet, error) { - // TODO: Remove all this later, create a membership state and apply it g.RLock() - tablet, ok := g.tablets[key] + tablet := g.tabletIndex.Get(key, label) g.RUnlock() - if ok { - // If labels match (or both empty), return cached tablet - if tablet.Label == label { - glog.V(2).Infof("Tablet: predicate %s cached (groupId=%d, label=%q)", key, tablet.GroupId, tablet.Label) - return tablet, nil - } - // Labels don't match - clear our cache and re-request from Zero - // This can happen after DropAll when tablets are re-created with different labels - glog.Infof("Tablet: predicate %s cached with label %q but need %q, clearing cache and re-requesting", - key, tablet.Label, label) - g.Lock() - delete(g.tablets, key) - g.Unlock() + if tablet != nil { + return tablet, nil } - - // We don't know about this tablet (or labels didn't match). - // Check with dgraphzero if we can serve it. - tablet = &pb.Tablet{GroupId: g.groupId(), Predicate: key, Label: label} - glog.V(2).Infof("Tablet: predicate %s requesting with label %q", key, label) - return g.sendTablet(tablet) + // Cache miss — query Zero. + return g.sendTablet(&pb.Tablet{GroupId: g.groupId(), Predicate: key, Label: label}) } // ForceTablet forces this group to serve the given predicate, even if another diff --git a/worker/mutation.go b/worker/mutation.go index 5f61f2bff07..ed37ac0a8c2 100644 --- a/worker/mutation.go +++ b/worker/mutation.go @@ -224,7 +224,7 @@ func runSchemaMutation(ctx context.Context, updates []*pb.SchemaUpdate, startTs // For labeled predicates, the tablet is intentionally served by a different group. // We still need to record the schema metadata so queries know the predicate type, // but we skip all index operations since we don't store the data. - if su.Label != "" { + if su.IsLabeled() { glog.V(2).Infof("Recording schema metadata for labeled predicate %s (label: %s), served by group %d", su.Predicate, su.Label, tablet.GetGroupId()) if err := checkSchema(su); err != nil { @@ -700,13 +700,91 @@ func proposeOrSend(ctx context.Context, gid uint32, m *pb.Mutations, chr chan re chr <- res } +// Global entity label cache, initialized during group setup. +var elCache *entityLabelCache + +func initEntityLabelCache() { + elCache = newEntityLabelCache(1_000_000) // 1M entries ~= 16MB +} + +// readEntityLabelFromStore reads dgraph.label for a UID via ProcessTaskOverNetwork. +// This issues a query to group 1 (where dgraph.label is served) to look up +// the entity's label on a cache miss. +func readEntityLabelFromStore(uid uint64) string { + ctx := context.Background() + q := &pb.Query{ + Attr: x.NamespaceAttr(x.RootNamespace, "dgraph.label"), + UidList: &pb.List{Uids: []uint64{uid}}, + ReadTs: State.GetTimestamp(false), + } + result, err := ProcessTaskOverNetwork(ctx, q) + if err != nil { + glog.V(2).Infof("Failed to read dgraph.label for uid %d: %v", uid, err) + return "" + } + if len(result.ValueMatrix) > 0 && len(result.ValueMatrix[0].Values) > 0 { + val := result.ValueMatrix[0].Values[0] + if len(val.Val) > 0 { + return string(val.Val) + } + } + return "" +} + +// resolveEntityLabel returns the entity-level label for a UID. +// Priority: batch labels > cache > read from group 1. +func resolveEntityLabel(uid uint64, batchLabels map[uint64]string) string { + if label, ok := batchLabels[uid]; ok { + return label + } + if elCache != nil { + if label, ok := elCache.Get(uid); ok { + return label + } + } + // Cache miss — read dgraph.label from the store. + label := readEntityLabelFromStore(uid) + if elCache != nil { + elCache.Set(uid, label) + } + return label +} + +// resolveLabel determines the effective label for routing an edge. +// Only entity-level labels (dgraph.label) trigger label-aware routing. +// Predicate-level @label routing is handled by Zero's tablet assignments +// and does not require the alpha to resolve labels. +func resolveLabel(uid uint64, _ string, batchLabels map[uint64]string) string { + return resolveEntityLabel(uid, batchLabels) +} + // populateMutationMap populates a map from group id to the mutation that // should be sent to that group. func populateMutationMap(src *pb.Mutations) (map[uint32]*pb.Mutations, error) { mm := make(map[uint32]*pb.Mutations) + + // PHASE 1: Scan for dgraph.label edges to build entity -> label map. + // This handles new entities whose labels are set in the same mutation batch. + batchLabels := make(map[uint64]string) + for _, edge := range src.Edges { + pred := x.ParseAttr(edge.Attr) + if pred == "dgraph.label" { + batchLabels[edge.Entity] = string(edge.Value) + } + } + + // PHASE 2: Route each edge using the entity's resolved label. for _, edge := range src.Edges { - // For data mutations, get the label from stored schema - label, _ := schema.State().GetLabel(context.Background(), edge.Attr) + var label string + if x.IsReservedPredicate(edge.Attr) { + // Reserved predicates (dgraph.label, dgraph.type, ACL) always use + // predicate-level routing (typically group 1). + label, _ = schema.State().GetLabel(context.Background(), edge.Attr) + } else { + // Non-reserved predicates use entity-label-aware resolution. + label = resolveLabel(edge.Entity, edge.Attr, batchLabels) + } + gid, err := groups().BelongsTo(edge.Attr, label) if err != nil { return nil, err @@ -721,9 +799,8 @@ func populateMutationMap(src *pb.Mutations) (map[uint32]*pb.Mutations, error) { mu.Metadata = src.Metadata } + // Schema mutations — unchanged, use predicate-level label. for _, schemaUpdate := range src.Schema { - // For schema mutations, use the label from the SchemaUpdate itself - // This is critical for new predicates where the schema isn't stored yet gid, err := groups().BelongsTo(schemaUpdate.Predicate, schemaUpdate.Label) if err != nil { return nil, err diff --git a/worker/proposal.go b/worker/proposal.go index b3974be1cb3..1241d51c5aa 100644 --- a/worker/proposal.go +++ b/worker/proposal.go @@ -146,16 +146,20 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.Proposal) (perr var noTimeout bool // checkTablet verifies that this group serves the given predicate. - // For data mutations, we get the label from stored schema. + // Uses BelongsToReadOnly instead of Tablet to avoid label-resolution issues + // with entity-level routing. GetForGroup in the tablet index prefers the + // tablet belonging to this group, so a direct cache lookup correctly validates + // that this group serves some tablet for the predicate. This avoids depending + // on schema.State().GetLabel() which returns a single label and may not match + // the receiving group's tablet. checkTablet := func(pred string) error { - label, _ := schema.State().GetLabel(context.Background(), pred) - tablet, err := groups().Tablet(pred, label) + gid, err := groups().BelongsToReadOnly(pred, 0) switch { case err != nil: return err - case tablet == nil || tablet.GroupId == 0: + case gid == 0: return errNonExistentTablet - case tablet.GroupId != groups().groupId(): + case gid != groups().groupId(): return errUnservedTablet default: return nil diff --git a/worker/sort.go b/worker/sort.go index deabecdac26..93d484b226c 100644 --- a/worker/sort.go +++ b/worker/sort.go @@ -46,23 +46,35 @@ type sortresult struct { // SortOverNetwork sends sort query over the network. func SortOverNetwork(ctx context.Context, q *pb.SortMessage) (*pb.SortResult, error) { - gid, err := groups().BelongsToReadOnly(q.Order[0].Attr, q.ReadTs) + attr := q.Order[0].Attr + + // Check for multi-tablet fan-out. + tablets, err := groups().AllTablets(attr, q.ReadTs) + if err != nil { + return &emptySortResult, err + } + + if len(tablets) > 1 { + return processSortFanOut(ctx, q, tablets) + } + + // Fast path: single tablet. + gid, err := groups().BelongsToReadOnly(attr, q.ReadTs) if err != nil { return &emptySortResult, err } else if gid == 0 { return &emptySortResult, - errors.Errorf("Cannot sort by unknown attribute %s", x.ParseAttr(q.Order[0].Attr)) + errors.Errorf("Cannot sort by unknown attribute %s", x.ParseAttr(attr)) } if span := trace.SpanFromContext(ctx); span != nil { span.SetAttributes( - attribute.String("attribute", q.Order[0].Attr), + attribute.String("attribute", attr), attribute.Int("groupId", int(gid)), ) } if groups().ServesGroup(gid) { - // No need for a network call, as this should be run from within this instance. return processSort(ctx, q) } @@ -76,6 +88,71 @@ func SortOverNetwork(ctx context.Context, q *pb.SortMessage) (*pb.SortResult, er return result.(*pb.SortResult), nil } +func processSortFanOut(ctx context.Context, q *pb.SortMessage, tablets []*pb.Tablet) (*pb.SortResult, error) { + type fanOutResult struct { + result *pb.SortResult + err error + } + + ch := make(chan fanOutResult, len(tablets)) + for _, tab := range tablets { + gid := tab.GroupId + go func(gid uint32) { + if groups().ServesGroup(gid) { + r, err := processSort(ctx, q) + ch <- fanOutResult{r, err} + return + } + r, err := processWithBackupRequest(ctx, gid, + func(ctx context.Context, c pb.WorkerClient) (interface{}, error) { + return c.Sort(ctx, q) + }) + if err != nil { + ch <- fanOutResult{nil, err} + return + } + ch <- fanOutResult{r.(*pb.SortResult), nil} + }(gid) + } + + var results []*pb.SortResult + for range tablets { + r := <-ch + if r.err != nil { + return &emptySortResult, r.err + } + results = append(results, r.result) + } + + return mergeSortResults(results, q), nil +} + +func mergeSortResults(results []*pb.SortResult, q *pb.SortMessage) *pb.SortResult { + if len(results) == 0 { + return &emptySortResult + } + if len(results) == 1 { + return results[0] + } + + // Merge UID matrices from all tablets. + merged := &pb.SortResult{} + if len(results[0].UidMatrix) > 0 { + merged.UidMatrix = make([]*pb.List, len(results[0].UidMatrix)) + for i := range merged.UidMatrix { + merged.UidMatrix[i] = &pb.List{} + } + for _, r := range results { + for i, list := range r.UidMatrix { + if i < len(merged.UidMatrix) { + merged.UidMatrix[i].Uids = append(merged.UidMatrix[i].Uids, list.Uids...) + } + } + } + } + return merged +} + // Sort is used to sort given UID matrix. func (w *grpcWorker) Sort(ctx context.Context, s *pb.SortMessage) (*pb.SortResult, error) { if ctx.Err() != nil { diff --git a/worker/task.go b/worker/task.go index 16d2a2a72b8..2b462e78dd7 100644 --- a/worker/task.go +++ b/worker/task.go @@ -118,11 +118,91 @@ func processWithBackupRequest( } } +// mergeResults combines results from multiple tablet queries. +// Each tablet returns results only for UIDs it has postings for. +func mergeResults(results []*pb.Result) *pb.Result { + if len(results) == 0 { + return &pb.Result{} + } + if len(results) == 1 { + return results[0] + } + + merged := &pb.Result{} + // Merge UID matrices: each result has one UidMatrix entry per query UID. + // For fan-out, all results have the same number of UidMatrix entries. + // Merge by appending UIDs from each tablet's response. + if len(results[0].UidMatrix) > 0 { + merged.UidMatrix = make([]*pb.List, len(results[0].UidMatrix)) + for i := range merged.UidMatrix { + merged.UidMatrix[i] = &pb.List{} + } + for _, r := range results { + for i, list := range r.UidMatrix { + if i < len(merged.UidMatrix) { + merged.UidMatrix[i].Uids = append(merged.UidMatrix[i].Uids, list.Uids...) + } + } + } + // Sort merged UID lists. Downstream consumers (algo.IndexOf in + // outputnode.go, algo.MergeSorted in query.go) use binary search + // and assume sorted order. Fan-out goroutines complete in + // non-deterministic order, so the appended UIDs need sorting. + for _, list := range merged.UidMatrix { + sort.Slice(list.Uids, func(i, j int) bool { return list.Uids[i] < list.Uids[j] }) + } + } + + // Merge value matrices similarly. + if len(results[0].ValueMatrix) > 0 { + merged.ValueMatrix = make([]*pb.ValueList, len(results[0].ValueMatrix)) + for i := range merged.ValueMatrix { + merged.ValueMatrix[i] = &pb.ValueList{} + } + for _, r := range results { + for i, vl := range r.ValueMatrix { + if i < len(merged.ValueMatrix) { + merged.ValueMatrix[i].Values = append(merged.ValueMatrix[i].Values, vl.Values...) + } + } + } + } + + // Merge counts. + if len(results[0].Counts) > 0 { + merged.Counts = make([]uint32, len(results[0].Counts)) + for _, r := range results { + for i, c := range r.Counts { + if i < len(merged.Counts) { + merged.Counts[i] += c + } + } + } + } + + // IntersectDest is not relevant for fan-out queries. + // LinRead is not relevant for fan-out queries. + return merged +} + // ProcessTaskOverNetwork is used to process the query and get the result from // the instance which stores posting list corresponding to the predicate in the // query. func ProcessTaskOverNetwork(ctx context.Context, q *pb.Query) (*pb.Result, error) { attr := q.Attr + + // Check for multi-tablet fan-out. + tablets, err := groups().AllTablets(attr, q.ReadTs) + if err != nil { + return nil, err + } + + if len(tablets) > 1 { + // Fan-out path: send query to all tablet groups in parallel. + return processTaskFanOut(ctx, q, tablets) + } + + // Fast path: single tablet (or none), use existing routing. gid, err := groups().BelongsToReadOnly(attr, q.ReadTs) switch { case err != nil: @@ -139,7 +219,6 @@ func ProcessTaskOverNetwork(ctx context.Context, q *pb.Query) (*pb.Result, error attribute.String("node_id", fmt.Sprintf("%d", groups().Node.Id)))) if groups().ServesGroup(gid) { - // No need for a network call, as this should be run from within this instance. return processTask(ctx, q, gid) } @@ -159,6 +238,58 @@ func ProcessTaskOverNetwork(ctx context.Context, q *pb.Query) (*pb.Result, error return reply, nil } +// processTaskFanOut sends the query to all tablet groups in parallel +// and merges the results. +func processTaskFanOut(ctx context.Context, q *pb.Query, tablets []*pb.Tablet) (*pb.Result, error) { + span := trace.SpanFromContext(ctx) + span.AddEvent("ProcessTaskFanOut", trace.WithAttributes( + attribute.String("attr", q.Attr), + attribute.Int("tablets", len(tablets)), + attribute.String("readTs", fmt.Sprintf("%d", q.ReadTs)))) + + type fanOutResult struct { + result *pb.Result + err error + } + + ch := make(chan fanOutResult, len(tablets)) + for _, tab := range tablets { + gid := tab.GroupId + go func(gid uint32) { + if groups().ServesGroup(gid) { + r, err := processTask(ctx, q, gid) + ch <- fanOutResult{r, err} + return + } + r, err := processWithBackupRequest(ctx, gid, + func(ctx context.Context, c pb.WorkerClient) (interface{}, error) { + return c.ServeTask(ctx, q) + }) + if err != nil { + ch <- fanOutResult{nil, err} + return + } + ch <- fanOutResult{r.(*pb.Result), nil} + }(gid) + } + + var results []*pb.Result + for range tablets { + r := <-ch + if r.err != nil { + glog.Warningf("processTaskFanOut(%q): tablet returned error: %v", q.Attr, r.err) + return nil, r.err + } + results = append(results, r.result) + } + + merged := mergeResults(results) + span.AddEvent("FanOut merged", trace.WithAttributes( + attribute.Int("result_count", len(results)), + attribute.String("attr", q.Attr))) + return merged, nil +} + // convertValue converts the data to the schema.State() type of predicate. func convertValue(attr, data string) (types.Val, error) { // Parse given value and get token. There should be only one token. diff --git a/worker/worker_test.go b/worker/worker_test.go index b5808dbf8e3..927236e15fd 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -504,10 +504,10 @@ func TestMain(m *testing.M) { posting.Config.CommitFraction = 0.10 gr = new(groupi) gr.gid = 1 - gr.tablets = make(map[string]*pb.Tablet) + gr.tabletIndex = pb.NewTabletIndex() addTablets := func(attrs []string, gid uint32, namespace uint64) { for _, attr := range attrs { - gr.tablets[x.NamespaceAttr(namespace, attr)] = &pb.Tablet{GroupId: gid} + gr.tabletIndex.Set(x.NamespaceAttr(namespace, attr), "", &pb.Tablet{GroupId: gid}) } } diff --git a/x/keys.go b/x/keys.go index 8850cebd08f..ec36b4e5b15 100644 --- a/x/keys.go +++ b/x/keys.go @@ -632,7 +632,8 @@ func IsDropOpKey(key []byte) (bool, error) { // These predicates appear for queries that have * as predicate in them. var starAllPredicateMap = map[string]struct{}{ - "dgraph.type": {}, + "dgraph.type": {}, + "dgraph.label": {}, } var aclPredicateMap = map[string]struct{}{