46 changes: 33 additions & 13 deletions conn/pool.go
@@ -91,6 +91,18 @@ func (p *Pools) GetAll() []*Pool {
return pool
}

+// RemoveAll removes all pool entries.
+func (p *Pools) RemoveAll() {
+	p.Lock()
+	defer p.Unlock()
+
+	for k, pool := range p.all {
+		glog.Warningf("CONN: Disconnecting from %s\n", k)
+		delete(p.all, k)
+		pool.shutdown()
+	}
+}

// RemoveInvalid removes invalid nodes from the list of pools.
func (p *Pools) RemoveInvalid(state *pb.MembershipState) {
// Keeps track of valid IP addresses, assigned to active nodes. We do this
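The new RemoveAll relies on a Go guarantee worth noting: the language spec explicitly permits deleting map entries while ranging over the map (an entry deleted before it is reached is simply never produced), so no separate snapshot of the keys is needed. A minimal sketch of that pattern, with illustrative names not taken from this PR:

package main

import "fmt"

func main() {
	pools := map[string]string{"alpha:5080": "pool-a", "beta:5080": "pool-b"}
	for addr := range pools {
		fmt.Println("disconnecting", addr)
		delete(pools, addr) // safe: the spec allows deletes during range
	}
	fmt.Println("remaining:", len(pools)) // 0
}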
@@ -241,11 +253,10 @@ func (p *Pool) listenToHeartbeat() error {
}()

threshold := time.Now().Add(10 * time.Second)
-	ticker := time.NewTicker(time.Second)
-	defer ticker.Stop()
+	ticker := time.Tick(time.Second)
for {
select {
-		case <-ticker.C:
+		case <-ticker:
// Don't check before at least 10s since start.
if time.Now().Before(threshold) {
continue
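On the NewTicker-to-Tick swap: time.Tick returns a bare <-chan time.Time with no Stop handle. Before Go 1.23 the underlying ticker was never garbage collected, but since Go 1.23 an unreferenced ticker is collectible, and for a loop that lives as long as the pool it was never a practical leak anyway. A hedged, self-contained sketch of the pattern, where the done channel stands in for a closer:

package main

import (
	"fmt"
	"time"
)

func main() {
	done := time.After(3 * time.Second) // stand-in for a shutdown signal
	tick := time.Tick(time.Second)      // no Stop() bookkeeping needed
	for {
		select {
		case <-done:
			fmt.Println("stopping")
			return
		case <-tick:
			fmt.Println("tick") // periodic work, e.g. a heartbeat check
		}
	}
}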
@@ -277,11 +288,19 @@ func (p *Pool) MonitorHealth() {

// We might have lost connection to the destination. In that case, re-dial
// the connection.
-	reconnect := func() {
+	// Returns true if the reconnection was successful.
+	reconnect := func() bool {
+		reconnectionTicker := time.Tick(time.Second)
		for {
-			time.Sleep(time.Second)
+			select {
+			case <-p.closer.HasBeenClosed():
+				glog.Infof("CONN: Returning from MonitorHealth for %s", p.Addr)
+				return false
+			case <-reconnectionTicker:
+			}

if err := p.closer.Ctx().Err(); err != nil {
-			return
+			return false
}
ctx, cancel := context.WithTimeout(p.closer.Ctx(), 10*time.Second)
conn, err := grpc.NewClient(p.Addr, p.dialOpts...)
@@ -298,7 +317,7 @@
}
p.conn = conn
p.Unlock()
-			return
+			return true
}
glog.Errorf("CONN: Unable to connect with %s : %s\n", p.Addr, err)
if conn != nil {
@@ -309,19 +328,20 @@
}
}

+	ticker := time.Tick(time.Second)
for {
select {
case <-p.closer.HasBeenClosed():
glog.Infof("CONN: Returning from MonitorHealth for %s", p.Addr)
return
-		default:
-			err := p.listenToHeartbeat()
-			if err != nil {
-				reconnect()
+		case <-ticker:
		}

+		err := p.listenToHeartbeat()
+		if err != nil {
+			if reconnect() {
				glog.Infof("CONN: Re-established connection with %s.\n", p.Addr)
			}
			// Sleep for a bit before retrying.
			time.Sleep(echoDuration)
}
}
}
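The reshaped MonitorHealth loop makes two behavioral changes: reconnect now reports success, so the caller only logs a re-established connection when one actually happened, and both loops wait on the closer inside a select, so shutdown can interrupt a retry instead of racing an unconditional time.Sleep. A minimal sketch of such a cancellable retry loop, under assumed stub names (dial, closed) that are not from the PR:

package main

import (
	"errors"
	"fmt"
	"time"
)

// dial is an illustrative stub that fails twice, then succeeds.
var attempts int

func dial() error {
	attempts++
	if attempts < 3 {
		return errors.New("connection refused")
	}
	return nil
}

func main() {
	closed := make(chan struct{}) // stand-in for closer.HasBeenClosed()
	tick := time.Tick(100 * time.Millisecond)
	for {
		select {
		case <-closed:
			fmt.Println("shutdown requested; abandoning reconnect")
			return
		case <-tick: // pace the retries instead of sleeping
		}
		if err := dial(); err != nil {
			fmt.Println("retrying:", err)
			continue
		}
		fmt.Println("reconnected")
		return
	}
}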
10 changes: 9 additions & 1 deletion dgraph/cmd/zero/raft.go
@@ -879,17 +879,25 @@ func (n *node) Run() {
// snapshot can cause select loop to block while deleting entries, so run
// it in goroutine
readStateCh := make(chan raft.ReadState, 100)
-	closer := z.NewCloser(5)
+	closer := z.NewCloser(0)
defer func() {
closer.SignalAndWait()
n.closer.Done()
glog.Infof("Zero Node.Run finished.")
}()

+	closer.AddRunning(1)
	go n.snapshotPeriodically(closer)

+	closer.AddRunning(1)
	go n.updateZeroMembershipPeriodically(closer)

+	closer.AddRunning(1)
	go n.checkQuorum(closer)

+	closer.AddRunning(1)
	go n.RunReadIndexLoop(closer, readStateCh)

if !x.WorkerConfig.HardSync {
closer.AddRunning(1)
go x.StoreSync(n.Store, closer)
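This raft.go change is the heart of the fix: the old code preset the closer's count to 5, and any mismatch between that constant and the goroutines actually started leaves SignalAndWait() waiting for Done() calls that never come, hanging shutdown. Starting at zero and pairing every goroutine with its own AddRunning(1) keeps the count correct by construction. A hedged sketch of the same discipline, using sync.WaitGroup as a stand-in for ristretto's z.Closer:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup // analogous to closer := z.NewCloser(0)
	workers := []string{"snapshot", "membership", "quorum", "readIndex"}
	for _, name := range workers {
		wg.Add(1) // analogous to closer.AddRunning(1), right before each goroutine
		go func(n string) {
			defer wg.Done() // analogous to closer.Done()
			fmt.Println(n, "finished")
		}(name)
	}
	wg.Wait() // analogous to closer.SignalAndWait(): the count matches exactly
	fmt.Println("clean shutdown")
}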
4 changes: 4 additions & 0 deletions dgraph/cmd/zero/run.go
@@ -346,6 +346,10 @@ func run() {
st.node.closer.SignalAndWait()
// Stop all internal requests.
_ = grpcListener.Close()
+		// Stop all pools.
+		if pools := conn.GetPools(); pools != nil {
+			pools.RemoveAll()
+		}
}()

st.zero.closer.AddRunning(2)
36 changes: 36 additions & 0 deletions dgraph/cmd/zero/zero_test.go
@@ -17,6 +17,7 @@ import (
"testing"
"time"

"github.com/docker/docker/client"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
@@ -154,3 +155,38 @@ func TestZeroHealth(t *testing.T) {
require.NoError(t, err)
require.Equal(t, string(body), "OK")
}

+func TestZeroGracefulShutdown(t *testing.T) {
+	// This test verifies that Zero shuts down cleanly without hanging.
+	// It catches issues like closer miscount bugs where SignalAndWait() would block indefinitely.
+
+	instance := testutil.GetContainerInstance(testutil.DockerPrefix, "zero1")
+	c := instance.GetContainer()
+	require.NotNil(t, c, "zero1 container not found")
+
+	containerID := c.ID
+
+	startTime := time.Now()
+	err := testutil.DockerRun("zero1", testutil.Stop)
+	shutdownDuration := time.Since(startTime)
+
+	require.NoError(t, err, "Failed to stop zero1 container")
+	cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
+	require.NoError(t, err)
+
+	inspect, err := cli.ContainerInspect(context.Background(), containerID)
+	require.NoError(t, err)
+	require.False(t, inspect.State.Running, "Container should not be running after stop")
+
+	if inspect.State.ExitCode == 137 {
+		t.Errorf("Zero was killed (exit code 137) instead of shutting down gracefully. "+
+			"This may indicate a hanging goroutine or closer miscount. Shutdown took %v", shutdownDuration)
+	}
+
+	// Restart the container so other tests can continue.
+	err = testutil.DockerRun("zero1", testutil.Start)
+	require.NoError(t, err, "Failed to restart zero1 container")
+
+	err = instance.BestEffortWaitForHealthy(6080)
+	require.NoError(t, err, "Zero did not become healthy after restart")
+}
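A note on the magic number the test checks: Docker's stop sequence sends SIGTERM, waits out the grace period, then sends SIGKILL, and a killed process reports exit code 128 + signal number, so 137 = 128 + 9 (SIGKILL) is the signature of a shutdown that did not finish in time. A small sketch making that arithmetic explicit (the helper name is illustrative, not from the PR):

package main

import "fmt"

const sigkill = 9 // signal number for SIGKILL

// wasKilled reports whether an exit code has the 128+signal form for SIGKILL.
func wasKilled(exitCode int) bool { return exitCode == 128+sigkill }

func main() {
	fmt.Println(wasKilled(137)) // true: the process was force-killed
	fmt.Println(wasKilled(0))   // false: clean, graceful exit
}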