From d1446ad17f42595b11ce927fe18d24ebeecfee79 Mon Sep 17 00:00:00 2001
From: xqqp
Date: Tue, 11 Nov 2025 17:05:49 +0100
Subject: [PATCH 1/3] fix(zero): make zero shutdown cleanly

---
 conn/pool.go            | 46 +++++++++++++++++++++++++++++------------
 dgraph/cmd/zero/raft.go |  2 +-
 dgraph/cmd/zero/run.go  |  4 ++++
 3 files changed, 38 insertions(+), 14 deletions(-)

diff --git a/conn/pool.go b/conn/pool.go
index 7ce688e7c96..e81a32b5212 100644
--- a/conn/pool.go
+++ b/conn/pool.go
@@ -91,6 +91,18 @@ func (p *Pools) GetAll() []*Pool {
 	return pool
 }
 
+// RemoveAll removes all pool entries.
+func (p *Pools) RemoveAll() {
+	p.Lock()
+	defer p.Unlock()
+
+	for k, pool := range p.all {
+		glog.Warningf("CONN: Disconnecting from %s\n", k)
+		delete(p.all, k)
+		pool.shutdown()
+	}
+}
+
 // RemoveInvalid removes invalid nodes from the list of pools.
 func (p *Pools) RemoveInvalid(state *pb.MembershipState) {
 	// Keeps track of valid IP addresses, assigned to active nodes. We do this
@@ -241,11 +253,10 @@ func (p *Pool) listenToHeartbeat() error {
 	}()
 
 	threshold := time.Now().Add(10 * time.Second)
-	ticker := time.NewTicker(time.Second)
-	defer ticker.Stop()
+	ticker := time.Tick(time.Second)
 	for {
 		select {
-		case <-ticker.C:
+		case <-ticker:
 			// Don't check before at least 10s since start.
 			if time.Now().Before(threshold) {
 				continue
@@ -277,11 +288,19 @@ func (p *Pool) MonitorHealth() {
 
 	// We might have lost connection to the destination. In that case, re-dial
 	// the connection.
-	reconnect := func() {
+	// Returns true if the reconnection was successful.
+	reconnect := func() bool {
+		reconnectionTicker := time.Tick(time.Second)
 		for {
-			time.Sleep(time.Second)
+			select {
+			case <-p.closer.HasBeenClosed():
+				glog.Infof("CONN: Returning from MonitorHealth for %s", p.Addr)
+				return false
+			case <-reconnectionTicker:
+			}
+
 			if err := p.closer.Ctx().Err(); err != nil {
-				return
+				return false
 			}
 			ctx, cancel := context.WithTimeout(p.closer.Ctx(), 10*time.Second)
 			conn, err := grpc.NewClient(p.Addr, p.dialOpts...)
@@ -298,7 +317,7 @@
 				}
 				p.conn = conn
 				p.Unlock()
-				return
+				return true
 			}
 			glog.Errorf("CONN: Unable to connect with %s : %s\n", p.Addr, err)
 			if conn != nil {
@@ -309,19 +328,20 @@
 		}
 	}
 
+	ticker := time.Tick(time.Second)
 	for {
 		select {
 		case <-p.closer.HasBeenClosed():
 			glog.Infof("CONN: Returning from MonitorHealth for %s", p.Addr)
 			return
-		default:
-			err := p.listenToHeartbeat()
-			if err != nil {
-				reconnect()
+		case <-ticker:
+		}
+
+		err := p.listenToHeartbeat()
+		if err != nil {
+			if reconnect() {
 				glog.Infof("CONN: Re-established connection with %s.\n", p.Addr)
 			}
-			// Sleep for a bit before retrying.
-			time.Sleep(echoDuration)
 		}
 	}
 }
diff --git a/dgraph/cmd/zero/raft.go b/dgraph/cmd/zero/raft.go
index 81f53ff8068..d3ddefae7c3 100644
--- a/dgraph/cmd/zero/raft.go
+++ b/dgraph/cmd/zero/raft.go
@@ -879,7 +879,7 @@ func (n *node) Run() {
 	// snapshot can cause select loop to block while deleting entries, so run
 	// it in goroutine
 	readStateCh := make(chan raft.ReadState, 100)
-	closer := z.NewCloser(5)
+	closer := z.NewCloser(4)
 	defer func() {
 		closer.SignalAndWait()
 		n.closer.Done()
diff --git a/dgraph/cmd/zero/run.go b/dgraph/cmd/zero/run.go
index 9975dce0c89..e5ddc3e36c4 100644
--- a/dgraph/cmd/zero/run.go
+++ b/dgraph/cmd/zero/run.go
@@ -346,6 +346,10 @@ func run() {
 		st.node.closer.SignalAndWait() // Stop all internal requests.
 		_ = grpcListener.Close()
+		// Stop all pools.
+		if pools := conn.GetPools(); pools != nil {
+			pools.RemoveAll()
+		}
 	}()
 
 	st.zero.closer.AddRunning(2)

From faafdae941513a97ee3fa10ddfaccbaad864e5f7 Mon Sep 17 00:00:00 2001
From: Matthew McNeely
Date: Tue, 30 Dec 2025 14:53:58 -0500
Subject: [PATCH 2/3] Increment running count "manually" to avoid future
 closer count mismatches

---
 dgraph/cmd/zero/raft.go | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/dgraph/cmd/zero/raft.go b/dgraph/cmd/zero/raft.go
index d3ddefae7c3..6990418e899 100644
--- a/dgraph/cmd/zero/raft.go
+++ b/dgraph/cmd/zero/raft.go
@@ -879,17 +879,25 @@ func (n *node) Run() {
 	// snapshot can cause select loop to block while deleting entries, so run
 	// it in goroutine
 	readStateCh := make(chan raft.ReadState, 100)
-	closer := z.NewCloser(4)
+	closer := z.NewCloser(0)
 	defer func() {
 		closer.SignalAndWait()
 		n.closer.Done()
 		glog.Infof("Zero Node.Run finished.")
 	}()
+	closer.AddRunning(1)
 	go n.snapshotPeriodically(closer)
+
+	closer.AddRunning(1)
 	go n.updateZeroMembershipPeriodically(closer)
+
+	closer.AddRunning(1)
 	go n.checkQuorum(closer)
+
+	closer.AddRunning(1)
 	go n.RunReadIndexLoop(closer, readStateCh)
+
 	if !x.WorkerConfig.HardSync {
 		closer.AddRunning(1)
 		go x.StoreSync(n.Store, closer)

From f8b5dad33600b1d4fc78310606db385ba3b9c072 Mon Sep 17 00:00:00 2001
From: Matthew McNeely
Date: Tue, 30 Dec 2025 14:54:14 -0500
Subject: [PATCH 3/3] Add an integration test

---
 dgraph/cmd/zero/zero_test.go | 36 ++++++++++++++++++++++++++++++++++++
 1 file changed, 36 insertions(+)

diff --git a/dgraph/cmd/zero/zero_test.go b/dgraph/cmd/zero/zero_test.go
index 81b7bf42ee1..eb0850f360b 100644
--- a/dgraph/cmd/zero/zero_test.go
+++ b/dgraph/cmd/zero/zero_test.go
@@ -17,6 +17,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/docker/docker/client"
 	"github.com/stretchr/testify/require"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials/insecure"
@@ -154,3 +155,38 @@ func TestZeroHealth(t *testing.T) {
 	require.NoError(t, err)
 	require.Equal(t, string(body), "OK")
 }
+
+func TestZeroGracefulShutdown(t *testing.T) {
+	// This test verifies that Zero shuts down cleanly without hanging.
+	// It catches issues like closer miscounts, where SignalAndWait() would block indefinitely.
+
+	instance := testutil.GetContainerInstance(testutil.DockerPrefix, "zero1")
+	c := instance.GetContainer()
+	require.NotNil(t, c, "zero1 container not found")
+
+	containerID := c.ID
+
+	startTime := time.Now()
+	err := testutil.DockerRun("zero1", testutil.Stop)
+	shutdownDuration := time.Since(startTime)
+
+	require.NoError(t, err, "Failed to stop zero1 container")
+	cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
+	require.NoError(t, err)
+
+	inspect, err := cli.ContainerInspect(context.Background(), containerID)
+	require.NoError(t, err)
+	require.False(t, inspect.State.Running, "Container should not be running after stop")
+
+	if inspect.State.ExitCode == 137 {
+		t.Errorf("Zero was killed (exit code 137) instead of shutting down gracefully. "+
+			"This may indicate a hanging goroutine or closer miscount. Shutdown took %v", shutdownDuration)
+	}
+
+	// Restart the container so other tests can continue.
+	err = testutil.DockerRun("zero1", testutil.Start)
+	require.NoError(t, err, "Failed to restart zero1 container")
+
+	err = instance.BestEffortWaitForHealthy(6080)
+	require.NoError(t, err, "Zero did not become healthy after restart")
+}
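
For reviewers: PATCH 2/3 hinges on one invariant — every goroutine registered
with the closer must eventually call Done(), and registration should happen
immediately before the `go` statement so the two can never drift apart. The
sketch below is a minimal, self-contained stand-in illustrating that contract;
it is NOT dgraph's ristretto/z Closer, and the type and method names merely
mirror its API for illustration.

// closersketch.go: a minimal stand-in for the closer used in the patches.
// Assumption: this is not the ristretto/z implementation; it only shows why
// pairing AddRunning(1) with each `go` avoids count mismatches.
package main

import (
	"fmt"
	"sync"
	"time"
)

// Closer tells goroutines to stop and waits until they all have.
type Closer struct {
	done chan struct{}  // closed to signal shutdown
	wg   sync.WaitGroup // tracks the registered running count
}

func NewCloser(initial int) *Closer {
	c := &Closer{done: make(chan struct{})}
	c.wg.Add(initial)
	return c
}

func (c *Closer) AddRunning(n int)               { c.wg.Add(n) }
func (c *Closer) Done()                          { c.wg.Done() }
func (c *Closer) HasBeenClosed() <-chan struct{} { return c.done }

// SignalAndWait blocks until every registered goroutine calls Done().
// With a hard-coded NewCloser(5) but only four workers calling Done()
// (the miscount fixed in PATCH 1/3), this would never return.
func (c *Closer) SignalAndWait() {
	close(c.done)
	c.wg.Wait()
}

func worker(id int, c *Closer) {
	defer c.Done() // balances the AddRunning(1) below
	for {
		select {
		case <-c.HasBeenClosed():
			fmt.Printf("worker %d exiting\n", id)
			return
		case <-time.After(100 * time.Millisecond):
			// periodic work would go here
		}
	}
}

func main() {
	// Start from zero and register immediately before each goroutine,
	// as PATCH 2/3 does in zero's Run().
	c := NewCloser(0)
	for i := 0; i < 4; i++ {
		c.AddRunning(1)
		go worker(i, c)
	}
	time.Sleep(300 * time.Millisecond)
	c.SignalAndWait() // returns promptly: count matches goroutines
	fmt.Println("clean shutdown")
}

The same invariant explains the NewCloser(4) -> NewCloser(0) change in
raft.go: with per-goroutine registration, adding or removing a background
loop no longer requires touching an initial count elsewhere in the function.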