Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 57 additions & 10 deletions codis/pkg/proxy/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,19 @@ func (d *forwardSync) process(s *Slot, r *Request, hkey []byte) (*BackendConn, e
}
if s.migrate.bc != nil && len(hkey) != 0 {
if err := d.slotsmgrt(s, hkey, r.Database, r.Seed16()); err != nil {
log.Debugf("slot-%04d migrate from = %s to %s failed: hash key = '%s', database = %d, error = %s",
log.Warnf("slot-%04d migrate from = %s to %s failed: hash key = '%s', database = %d, error = %s",
s.id, s.migrate.bc.Addr(), s.backend.bc.Addr(), hkey, r.Database, err)
return nil, err
// Migration failed, but continue to forward directly to backend instead of returning error
log.Warnf("slot-%04d migration failed, forwarding directly to backend", s.id)
}
}
r.Group = &s.refs
r.Group.Add(1)
return d.forward2(s, r), nil
bc := d.forward2(s, r)
if bc == nil {
return nil, fmt.Errorf("slot-%04d backend connection unavailable", s.id)
}
return bc, nil
}

type forwardSemiAsync struct {
Expand Down Expand Up @@ -114,9 +119,11 @@ func (d *forwardSemiAsync) process(s *Slot, r *Request, hkey []byte) (_ *Backend
resp, moved, err := d.slotsmgrtExecWrapper(s, hkey, r.Database, r.Seed16(), r.Multi)
switch {
case err != nil:
log.Debugf("slot-%04d migrate from = %s to %s failed: hash key = '%s', error = %s",
log.Warnf("slot-%04d migrate from = %s to %s failed: hash key = '%s', error = %s",
s.id, s.migrate.bc.Addr(), s.backend.bc.Addr(), hkey, err)
return nil, false, err
// Migration failed, but continue to forward directly to backend instead of returning error
log.Warnf("slot-%04d migration failed in semi-async mode, forwarding directly to backend", s.id)
// Fall through to use backend connection
case !moved:
switch {
case resp != nil:
Expand All @@ -128,13 +135,30 @@ func (d *forwardSemiAsync) process(s *Slot, r *Request, hkey []byte) (_ *Backend
}
r.Group = &s.refs
r.Group.Add(1)
return d.forward2(s, r), false, nil

bc := d.forward2(s, r)
if bc == nil {
return nil, false, fmt.Errorf("slot-%04d backend connection unavailable", s.id)
}
return bc, false, nil
}

type forwardHelper struct {
}

func (d *forwardHelper) slotsmgrt(s *Slot, hkey []byte, database int32, seed uint) error {
// Add nil pointer check for migrate.bc
if s.migrate.bc == nil {
log.Errorf("slot-%04d migrate connection is nil", s.id)
return fmt.Errorf("migrate connection is nil for slot-%04d", s.id)
}

// Add nil pointer check for backend.bc
if s.backend.bc == nil {
log.Errorf("slot-%04d backend connection is nil", s.id)
return fmt.Errorf("backend connection is nil for slot-%04d", s.id)
}

m := &Request{}
m.Multi = []*redis.Resp{
redis.NewBulkBytes([]byte("SLOTSMGRTTAGONE")),
Expand All @@ -145,8 +169,13 @@ func (d *forwardHelper) slotsmgrt(s *Slot, hkey []byte, database int32, seed uin
}
m.Batch = &sync.WaitGroup{}

s.migrate.bc.BackendConn(database, seed, true, m.OpFlag.IsQuick()).PushBack(m)

bc := s.migrate.bc.BackendConn(database, seed, true, m.OpFlag.IsQuick())
if bc == nil {
log.Errorf("slot-%04d failed to get backend connection for migration", s.id)
return fmt.Errorf("failed to get backend connection for slot-%04d", s.id)
}

bc.PushBack(m)
m.Batch.Wait()

if err := m.Err; err != nil {
Expand All @@ -167,6 +196,12 @@ func (d *forwardHelper) slotsmgrt(s *Slot, hkey []byte, database int32, seed uin
}

func (d *forwardHelper) slotsmgrtExecWrapper(s *Slot, hkey []byte, database int32, seed uint, multi []*redis.Resp) (_ *redis.Resp, moved bool, _ error) {
// Add nil pointer check for migrate.bc
if s.migrate.bc == nil {
log.Errorf("slot-%04d migrate connection is nil", s.id)
return nil, false, fmt.Errorf("migrate connection is nil for slot-%04d", s.id)
}

m := &Request{}
m.Multi = make([]*redis.Resp, 0, 2+len(multi))
m.Multi = append(m.Multi,
Expand All @@ -176,8 +211,13 @@ func (d *forwardHelper) slotsmgrtExecWrapper(s *Slot, hkey []byte, database int3
m.Multi = append(m.Multi, multi...)
m.Batch = &sync.WaitGroup{}

s.migrate.bc.BackendConn(database, seed, true, m.OpFlag.IsQuick()).PushBack(m)

bc := s.migrate.bc.BackendConn(database, seed, true, m.OpFlag.IsQuick())
if bc == nil {
log.Errorf("slot-%04d failed to get backend connection for migration", s.id)
return nil, false, fmt.Errorf("failed to get backend connection for slot-%04d", s.id)
}

bc.PushBack(m)
m.Batch.Wait()

if err := m.Err; err != nil {
Expand Down Expand Up @@ -227,6 +267,13 @@ func (d *forwardHelper) forward2(s *Slot, r *Request) *BackendConn {
}
}
}

// Add nil pointer check for backend.bc to prevent crash
if s.backend.bc == nil {
log.Errorf("slot-%04d backend connection is nil", s.id)
return nil
}

// fix:https://github.com/OpenAtomFoundation/pika/issues/2174
return s.backend.bc.BackendConn(database, uint(s.id), true, r.OpFlag.IsQuick())
}
Loading