Skip to content

Commit c4e4ccf

Browse files
committed
refactor(sharding): improve lock safety and code organization
- Use closure with defer for RLock/RUnlock in Inform() to prevent lock leaks on error paths
- Add getGroupLabel() helper that handles its own locking, keeping groupLabel() for callers that already hold the lock
- Move checkSchemaTablet function closer to its usage in proposeAndWait
- Add tablet_label span attribute for better observability
1 parent 60349e2 commit c4e4ccf

3 files changed

Lines changed: 62 additions & 50 deletions

File tree

dgraph/cmd/zero/tablet.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ func (s *Server) movePredicate(predicate string, srcGroup, dstGroup uint32) erro
140140
if tab == nil {
141141
return errors.Errorf("Tablet to be moved: [%v] is not being served", predicate)
142142
}
143-
dstGroupLabel := s.groupLabel(dstGroup)
143+
dstGroupLabel := s.getGroupLabel(dstGroup)
144144
if dstGroupLabel != tab.Label {
145145
// Don't allow a predicate to be moved to a group that doesn't share its label.
146146
// (label will be empty string on either if unassigned)

dgraph/cmd/zero/zero.go

Lines changed: 37 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -440,42 +440,45 @@ func (s *Server) Inform(ctx context.Context, req *pb.TabletRequest) (*pb.TabletR
440440
var proposal pb.ZeroProposal
441441
proposal.Tablets = make([]*pb.Tablet, 0)
442442

443-
// Acquire read lock for label-related lookups
444-
s.RLock()
445443
for _, t := range unknownTablets {
446444
glog.Infof("Zero.Inform: routing tablet %s (label=%q, groupId=%d)", t.Predicate, t.Label, t.GroupId)
447-
switch {
448-
case x.IsReservedPredicate(t.Predicate):
449-
// Force all the reserved predicates to be allocated to group 1.
450-
// This is to make it easier to stream ACL updates to all alpha servers
451-
// since they only need to open one pipeline to receive updates for all
452-
// ACL predicates.
453-
// This will also make it easier to restore the reserved predicates after
454-
// a DropAll operation.
455-
t.GroupId = 1
456-
case t.Label != "":
457-
{
445+
// Use closure to ensure lock is always released via defer, even on error paths.
446+
// This pattern prevents lock leaks if new error conditions are added later.
447+
if err := func() error {
448+
s.RLock()
449+
defer s.RUnlock()
450+
switch {
451+
case x.IsReservedPredicate(t.Predicate):
452+
// Force all the reserved predicates to be allocated to group 1.
453+
// This is to make it easier to stream ACL updates to all alpha servers
454+
// since they only need to open one pipeline to receive updates for all
455+
// ACL predicates.
456+
// This will also make it easier to restore the reserved predicates after
457+
// a DropAll operation.
458+
t.GroupId = 1
459+
case t.Label != "":
458460
// Labeled predicate: route to matching labeled group
459461
gid, err := s.labelGroupId(t.Label)
460462
if err != nil {
461-
s.RUnlock()
462-
return nil, err
463+
return err
463464
}
464465
glog.Infof("Zero.Inform: labeled predicate %s (label=%q) routed to group %d", t.Predicate, t.Label, gid)
465466
t.GroupId = gid
467+
case s.isLabeledGroupId(t.GroupId):
468+
// make sure unlabeled predicates don't go to a labeled group
469+
gid, err := s.firstUnlabeledGroupId()
470+
if err != nil {
471+
return err
472+
}
473+
t.GroupId = gid
466474
}
467-
case s.isLabeledGroupId(t.GroupId):
468-
// make sure unlabeled predicates don't go to a labeled group
469-
gid, err := s.firstUnlabeledGroupId()
470-
if err != nil {
471-
s.RUnlock()
472-
return nil, err
473-
}
474-
t.GroupId = gid
475+
return nil
476+
}(); err != nil {
477+
return nil, err
475478
}
476479
proposal.Tablets = append(proposal.Tablets, t)
477480
}
478-
s.RUnlock()
481+
479482
if err := s.Node.proposeAndWait(ctx, &proposal); err != nil && err != errTabletAlreadyServed {
480483
span.AddEvent(fmt.Sprintf("Error proposing tablet: %+v. Error: %v", &proposal, err))
481484
return nil, err
@@ -705,6 +708,7 @@ func (s *Server) ShouldServe(
705708
// Check who is serving this tablet.
706709
tab := s.ServingTablet(tablet.Predicate)
707710
span.SetAttributes(attribute.String("tablet_predicate", tablet.Predicate))
711+
span.SetAttributes(attribute.String("tablet_label", tablet.Label))
708712
if tab != nil && !tablet.Force {
709713
// If the existing tablet has a different label than requested, we need to re-route.
710714
// This can happen when a schema is applied with @label after the predicate was
@@ -919,6 +923,7 @@ func (s *Server) latestMembershipState(ctx context.Context) (*pb.MembershipState
919923
}
920924

921925
// groupLabel returns the label for a group (from first labeled member found)
926+
// Caller must hold the read lock.
922927
func (s *Server) groupLabel(gid uint32) string {
923928
s.AssertRLock()
924929
group := s.state.Groups[gid]
@@ -933,6 +938,14 @@ func (s *Server) groupLabel(gid uint32) string {
933938
return ""
934939
}
935940

941+
// getGroupLabel is like groupLabel but handles its own locking.
942+
// Use this when calling from code that doesn't already hold the lock.
943+
func (s *Server) getGroupLabel(gid uint32) string {
944+
s.RLock()
945+
defer s.RUnlock()
946+
return s.groupLabel(gid)
947+
}
948+
936949
// labelGroupId returns the group ID that has the given label, or 0 if none
937950
func (s *Server) labelGroupId(label string) (uint32, error) {
938951
s.AssertRLock()

worker/proposal.go

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -162,29 +162,6 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.Proposal) (perr
162162
}
163163
}
164164

165-
// validateSchemaTablet validates tablet assignment for schema mutations.
166-
// For labeled predicates, we only verify the tablet was assigned to some group -
167-
// we don't require it to be served by this instance since labeled predicates
168-
// are intentionally routed to a different group (the one with matching label).
169-
validateSchemaTablet := func(pred string, label string) error {
170-
tablet, err := groups().Tablet(pred, label)
171-
switch {
172-
case err != nil:
173-
return err
174-
case tablet == nil || tablet.GroupId == 0:
175-
return errNonExistentTablet
176-
case label != "":
177-
// Labeled predicates are served by the labeled group, not this instance.
178-
// Just verify the tablet was assigned successfully.
179-
return nil
180-
case tablet.GroupId != groups().groupId():
181-
// Unlabeled schema predicates should be served by this instance
182-
return errUnservedTablet
183-
default:
184-
return nil
185-
}
186-
}
187-
188165
// Do a type check here if schema is present
189166
// In very rare cases invalid entries might pass through raft, which would
190167
// be persisted, we do best effort schema check while writing
@@ -210,11 +187,33 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.Proposal) (perr
210187
}
211188
}
212189

190+
// checkSchemaTablet validates tablet assignment for schema mutations.
191+
// For labeled predicates, we only verify the tablet was assigned to some group -
192+
// we don't require it to be served by this instance since labeled predicates
193+
// are intentionally routed to a different group (the one with matching label).
194+
checkSchemaTablet := func(pred string, label string) error {
195+
tablet, err := groups().Tablet(pred, label)
196+
switch {
197+
case err != nil:
198+
return err
199+
case tablet == nil || tablet.GroupId == 0:
200+
return errNonExistentTablet
201+
case label != "":
202+
// Labeled predicates are served by the labeled group, not this instance.
203+
// Just verify the tablet was assigned successfully.
204+
return nil
205+
case tablet.GroupId != groups().groupId():
206+
// Unlabeled schema predicates should be served by this instance
207+
return errUnservedTablet
208+
default:
209+
return nil
210+
}
211+
}
213212
for _, schema := range proposal.Mutations.Schema {
214-
// Use validateSchemaTablet to pass the label from the schema update
213+
// Use checkSchemaTablet to pass the label from the schema update
215214
// since the schema isn't stored yet when we're processing it.
216215
// For labeled predicates, we don't require this instance to serve the tablet.
217-
if err := validateSchemaTablet(schema.Predicate, schema.Label); err != nil {
216+
if err := checkSchemaTablet(schema.Predicate, schema.Label); err != nil {
218217
return err
219218
}
220219
if err := checkSchema(schema); err != nil {

0 commit comments

Comments
 (0)