Skip to content

Commit a3d33ee

Browse files
xqqp and matthewmcneely authored
fix(zero): make zero shutdown cleanly (#9525)
Zero currently has several issues related to shutdown:

- #9367 removed a call to `updateEnterpriseState()` without decreasing the wait group counter. This makes zero wait indefinitely, or exit with a return code other than zero after it has been forced to terminate.
- Pools never get shut down, which keeps the pool's health check alive, resulting in logs with connection failures.
- The pool health check uses `time.Sleep`, so even if pools would get shut down, the shutdown would be delayed by the remaining sleep duration.

This PR sets the appropriate wait group counter, implements a function which removes pools on shutdown, and switches from `time.Sleep` to `time.Tick` in the health check.

---------

Co-authored-by: Matthew McNeely <[email protected]>
1 parent 2c84a01 commit a3d33ee

4 files changed

Lines changed: 82 additions & 14 deletions

File tree

conn/pool.go

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,18 @@ func (p *Pools) GetAll() []*Pool {
9191
return pool
9292
}
9393

94+
// RemoveAll removes all pool entries.
95+
func (p *Pools) RemoveAll() {
96+
p.Lock()
97+
defer p.Unlock()
98+
99+
for k, pool := range p.all {
100+
glog.Warningf("CONN: Disconnecting from %s\n", k)
101+
delete(p.all, k)
102+
pool.shutdown()
103+
}
104+
}
105+
94106
// RemoveInvalid removes invalid nodes from the list of pools.
95107
func (p *Pools) RemoveInvalid(state *pb.MembershipState) {
96108
// Keeps track of valid IP addresses, assigned to active nodes. We do this
@@ -241,11 +253,10 @@ func (p *Pool) listenToHeartbeat() error {
241253
}()
242254

243255
threshold := time.Now().Add(10 * time.Second)
244-
ticker := time.NewTicker(time.Second)
245-
defer ticker.Stop()
256+
ticker := time.Tick(time.Second)
246257
for {
247258
select {
248-
case <-ticker.C:
259+
case <-ticker:
249260
// Don't check before at least 10s since start.
250261
if time.Now().Before(threshold) {
251262
continue
@@ -277,11 +288,19 @@ func (p *Pool) MonitorHealth() {
277288

278289
// We might have lost connection to the destination. In that case, re-dial
279290
// the connection.
280-
reconnect := func() {
291+
// Returns true, if reconnection was successful
292+
reconnect := func() bool {
293+
reconnectionTicker := time.Tick(time.Second)
281294
for {
282-
time.Sleep(time.Second)
295+
select {
296+
case <-p.closer.HasBeenClosed():
297+
glog.Infof("CONN: Returning from MonitorHealth for %s", p.Addr)
298+
return false
299+
case <-reconnectionTicker:
300+
}
301+
283302
if err := p.closer.Ctx().Err(); err != nil {
284-
return
303+
return false
285304
}
286305
ctx, cancel := context.WithTimeout(p.closer.Ctx(), 10*time.Second)
287306
conn, err := grpc.NewClient(p.Addr, p.dialOpts...)
@@ -298,7 +317,7 @@ func (p *Pool) MonitorHealth() {
298317
}
299318
p.conn = conn
300319
p.Unlock()
301-
return
320+
return true
302321
}
303322
glog.Errorf("CONN: Unable to connect with %s : %s\n", p.Addr, err)
304323
if conn != nil {
@@ -309,19 +328,20 @@ func (p *Pool) MonitorHealth() {
309328
}
310329
}
311330

331+
ticker := time.Tick(time.Second)
312332
for {
313333
select {
314334
case <-p.closer.HasBeenClosed():
315335
glog.Infof("CONN: Returning from MonitorHealth for %s", p.Addr)
316336
return
317-
default:
318-
err := p.listenToHeartbeat()
319-
if err != nil {
320-
reconnect()
337+
case <-ticker:
338+
}
339+
340+
err := p.listenToHeartbeat()
341+
if err != nil {
342+
if reconnect() {
321343
glog.Infof("CONN: Re-established connection with %s.\n", p.Addr)
322344
}
323-
// Sleep for a bit before retrying.
324-
time.Sleep(echoDuration)
325345
}
326346
}
327347
}

dgraph/cmd/zero/raft.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -879,17 +879,25 @@ func (n *node) Run() {
879879
// snapshot can cause select loop to block while deleting entries, so run
880880
// it in goroutine
881881
readStateCh := make(chan raft.ReadState, 100)
882-
closer := z.NewCloser(5)
882+
closer := z.NewCloser(0)
883883
defer func() {
884884
closer.SignalAndWait()
885885
n.closer.Done()
886886
glog.Infof("Zero Node.Run finished.")
887887
}()
888888

889+
closer.AddRunning(1)
889890
go n.snapshotPeriodically(closer)
891+
892+
closer.AddRunning(1)
890893
go n.updateZeroMembershipPeriodically(closer)
894+
895+
closer.AddRunning(1)
891896
go n.checkQuorum(closer)
897+
898+
closer.AddRunning(1)
892899
go n.RunReadIndexLoop(closer, readStateCh)
900+
893901
if !x.WorkerConfig.HardSync {
894902
closer.AddRunning(1)
895903
go x.StoreSync(n.Store, closer)

dgraph/cmd/zero/run.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,10 @@ func run() {
346346
st.node.closer.SignalAndWait()
347347
// Stop all internal requests.
348348
_ = grpcListener.Close()
349+
// Stop all pools
350+
if pools := conn.GetPools(); pools != nil {
351+
pools.RemoveAll()
352+
}
349353
}()
350354

351355
st.zero.closer.AddRunning(2)

dgraph/cmd/zero/zero_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"testing"
1818
"time"
1919

20+
"github.com/docker/docker/client"
2021
"github.com/stretchr/testify/require"
2122
"google.golang.org/grpc"
2223
"google.golang.org/grpc/credentials/insecure"
@@ -154,3 +155,38 @@ func TestZeroHealth(t *testing.T) {
154155
require.NoError(t, err)
155156
require.Equal(t, string(body), "OK")
156157
}
158+
159+
func TestZeroGracefulShutdown(t *testing.T) {
160+
// This test verifies that Zero shuts down cleanly without hanging.
161+
// It catches issues like closer miscount bugs where SignalAndWait() would block indefinitely.
162+
163+
instance := testutil.GetContainerInstance(testutil.DockerPrefix, "zero1")
164+
c := instance.GetContainer()
165+
require.NotNil(t, c, "zero1 container not found")
166+
167+
containerID := c.ID
168+
169+
startTime := time.Now()
170+
err := testutil.DockerRun("zero1", testutil.Stop)
171+
shutdownDuration := time.Since(startTime)
172+
173+
require.NoError(t, err, "Failed to stop zero1 container")
174+
cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
175+
require.NoError(t, err)
176+
177+
inspect, err := cli.ContainerInspect(context.Background(), containerID)
178+
require.NoError(t, err)
179+
require.False(t, inspect.State.Running, "Container should not be running after stop")
180+
181+
if inspect.State.ExitCode == 137 {
182+
t.Errorf("Zero was killed (exit code 137) instead of shutting down gracefully. "+
183+
"This may indicate a hanging goroutine or closer miscount. Shutdown took %v", shutdownDuration)
184+
}
185+
186+
// Restart the container so other tests can continue
187+
err = testutil.DockerRun("zero1", testutil.Start)
188+
require.NoError(t, err, "Failed to restart zero1 container")
189+
190+
err = instance.BestEffortWaitForHealthy(6080)
191+
require.NoError(t, err, "Zero did not become healthy after restart")
192+
}

0 commit comments

Comments (0)