Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions conn/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"go.etcd.io/etcd/raft/v3/raftpb"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"

"google.golang.org/grpc/connectivity"
"google.golang.org/protobuf/proto"

"github.com/dgraph-io/badger/v4/y"
Expand Down Expand Up @@ -760,10 +762,13 @@ func (n *Node) joinCluster(ctx context.Context, rc *pb.RaftContext) (*api.Payloa
return nil, errors.Errorf("REUSE_RAFTID: Raft ID duplicates mine: %+v", rc)
}

// Check that the new node is not already part of the group.
// Reject if a peer with the same Raft ID is already registered at a
// different address AND that peer is still genuinely connected. Using the
// gRPC connection state is more accurate than IsHealthy(), which relies on
// a heartbeat timestamp and stays "healthy" for ~2s after the peer drops.
if addr, ok := n.Peer(rc.Id); ok && rc.Addr != addr {
// There exists a healthy connection to server with same id.
if _, err := GetPools().Get(addr); err == nil {
if pool, err := GetPools().Get(addr); err == nil &&
pool.Get().GetState() == connectivity.Ready {
return &api.Payload{}, errors.Errorf(
"REUSE_ADDR: IP Address same as existing peer: %s", addr)
}
Expand Down
7 changes: 4 additions & 3 deletions dgraphtest/local_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sync"
"time"

cerrdefs "github.com/containerd/errdefs"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/network"
Expand Down Expand Up @@ -335,7 +336,7 @@ func (c *LocalCluster) destroyContainers() error {
wg.Add(1)
go func(z *zero) {
defer wg.Done()
if err := c.dcli.ContainerRemove(ctx, z.cid(), ro); err != nil {
if err := c.dcli.ContainerRemove(ctx, z.cid(), ro); err != nil && !cerrdefs.IsNotFound(err) {
errChan <- errors.Wrapf(err, "error removing zero [%v]", z.cname())
}
}(zo)
Expand All @@ -345,7 +346,7 @@ func (c *LocalCluster) destroyContainers() error {
wg.Add(1)
go func(a *alpha) {
defer wg.Done()
if err := c.dcli.ContainerRemove(ctx, a.cid(), ro); err != nil {
if err := c.dcli.ContainerRemove(ctx, a.cid(), ro); err != nil && !cerrdefs.IsNotFound(err) {
errChan <- errors.Wrapf(err, "error removing alpha [%v]", a.cname())
}
}(aa)
Expand Down Expand Up @@ -645,7 +646,7 @@ func (c *LocalCluster) RecreateZero(id int) error {
}

ro := container.RemoveOptions{RemoveVolumes: true, Force: true}
if err := c.dcli.ContainerRemove(ctx, z.cid(), ro); err != nil {
if err := c.dcli.ContainerRemove(ctx, z.cid(), ro); err != nil && !cerrdefs.IsNotFound(err) {
return errors.Wrapf(err, "error removing zero container [%v]", z.cname())
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/IBM/sarama v1.47.0
github.com/Masterminds/semver/v3 v3.4.0
github.com/blevesearch/bleve/v2 v2.5.7
github.com/containerd/errdefs v1.0.0
github.com/dgraph-io/badger/v4 v4.9.1
github.com/dgraph-io/dgo/v250 v250.0.0
github.com/dgraph-io/gqlgen v0.13.2
Expand Down Expand Up @@ -88,7 +89,6 @@ require (
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/chewxy/math32 v1.11.1 // indirect
github.com/containerd/errdefs v1.0.0 // indirect
github.com/containerd/errdefs/pkg v0.3.0 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
Expand Down
Loading