From de96e33698ae70bf791b69a4deadcce34730e807 Mon Sep 17 00:00:00 2001 From: chejinge Date: Thu, 18 Dec 2025 18:44:54 +0800 Subject: [PATCH] =?UTF-8?q?fix:proxy=E5=B4=A9=E6=BA=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- codis/pkg/proxy/forward.go | 67 ++++++++++++++++++++++++++++++++------ 1 file changed, 57 insertions(+), 10 deletions(-) diff --git a/codis/pkg/proxy/forward.go b/codis/pkg/proxy/forward.go index 4ccb929974..52756cc9e3 100644 --- a/codis/pkg/proxy/forward.go +++ b/codis/pkg/proxy/forward.go @@ -51,14 +51,19 @@ func (d *forwardSync) process(s *Slot, r *Request, hkey []byte) (*BackendConn, e } if s.migrate.bc != nil && len(hkey) != 0 { if err := d.slotsmgrt(s, hkey, r.Database, r.Seed16()); err != nil { - log.Debugf("slot-%04d migrate from = %s to %s failed: hash key = '%s', database = %d, error = %s", + log.Warnf("slot-%04d migrate from = %s to %s failed: hash key = '%s', database = %d, error = %s", s.id, s.migrate.bc.Addr(), s.backend.bc.Addr(), hkey, r.Database, err) - return nil, err + // Migration failed, but continue to forward directly to backend instead of returning error + log.Warnf("slot-%04d migration failed, forwarding directly to backend", s.id) } } r.Group = &s.refs r.Group.Add(1) - return d.forward2(s, r), nil + bc := d.forward2(s, r) + if bc == nil { + return nil, fmt.Errorf("slot-%04d backend connection unavailable", s.id) + } + return bc, nil } type forwardSemiAsync struct { @@ -114,9 +119,11 @@ func (d *forwardSemiAsync) process(s *Slot, r *Request, hkey []byte) (_ *Backend resp, moved, err := d.slotsmgrtExecWrapper(s, hkey, r.Database, r.Seed16(), r.Multi) switch { case err != nil: - log.Debugf("slot-%04d migrate from = %s to %s failed: hash key = '%s', error = %s", + log.Warnf("slot-%04d migrate from = %s to %s failed: hash key = '%s', error = %s", s.id, s.migrate.bc.Addr(), s.backend.bc.Addr(), hkey, err) - return nil, false, err + // Migration failed, but continue to forward directly to backend instead of returning error + log.Warnf("slot-%04d migration failed in semi-async mode, forwarding directly to backend", s.id) + // Fall through to use backend connection case !moved: switch { case resp != nil: @@ -128,13 +135,30 @@ func (d *forwardSemiAsync) process(s *Slot, r *Request, hkey []byte) (_ *Backend } r.Group = &s.refs r.Group.Add(1) - return d.forward2(s, r), false, nil + + bc := d.forward2(s, r) + if bc == nil { + return nil, false, fmt.Errorf("slot-%04d backend connection unavailable", s.id) + } + return bc, false, nil } type forwardHelper struct { } func (d *forwardHelper) slotsmgrt(s *Slot, hkey []byte, database int32, seed uint) error { + // Add nil pointer check for migrate.bc + if s.migrate.bc == nil { + log.Errorf("slot-%04d migrate connection is nil", s.id) + return fmt.Errorf("migrate connection is nil for slot-%04d", s.id) + } + + // Add nil pointer check for backend.bc + if s.backend.bc == nil { + log.Errorf("slot-%04d backend connection is nil", s.id) + return fmt.Errorf("backend connection is nil for slot-%04d", s.id) + } + m := &Request{} m.Multi = []*redis.Resp{ redis.NewBulkBytes([]byte("SLOTSMGRTTAGONE")), @@ -145,8 +169,13 @@ func (d *forwardHelper) slotsmgrt(s *Slot, hkey []byte, database int32, seed uin } m.Batch = &sync.WaitGroup{} - s.migrate.bc.BackendConn(database, seed, true, m.OpFlag.IsQuick()).PushBack(m) - + bc := s.migrate.bc.BackendConn(database, seed, true, m.OpFlag.IsQuick()) + if bc == nil { + log.Errorf("slot-%04d failed to get backend connection for migration", s.id) + return fmt.Errorf("failed to get backend connection for slot-%04d", s.id) + } + + bc.PushBack(m) m.Batch.Wait() if err := m.Err; err != nil { @@ -167,6 +196,12 @@ func (d *forwardHelper) slotsmgrt(s *Slot, hkey []byte, database int32, seed uin } func (d *forwardHelper) slotsmgrtExecWrapper(s *Slot, hkey []byte, database int32, seed uint, multi []*redis.Resp) (_ *redis.Resp, moved bool, _ error) { + // Add nil pointer check for migrate.bc + if s.migrate.bc == nil { + log.Errorf("slot-%04d migrate connection is nil", s.id) + return nil, false, fmt.Errorf("migrate connection is nil for slot-%04d", s.id) + } + m := &Request{} m.Multi = make([]*redis.Resp, 0, 2+len(multi)) m.Multi = append(m.Multi, @@ -176,8 +211,13 @@ func (d *forwardHelper) slotsmgrtExecWrapper(s *Slot, hkey []byte, database int3 m.Multi = append(m.Multi, multi...) m.Batch = &sync.WaitGroup{} - s.migrate.bc.BackendConn(database, seed, true, m.OpFlag.IsQuick()).PushBack(m) - + bc := s.migrate.bc.BackendConn(database, seed, true, m.OpFlag.IsQuick()) + if bc == nil { + log.Errorf("slot-%04d failed to get backend connection for migration", s.id) + return nil, false, fmt.Errorf("failed to get backend connection for slot-%04d", s.id) + } + + bc.PushBack(m) m.Batch.Wait() if err := m.Err; err != nil { @@ -227,6 +267,13 @@ func (d *forwardHelper) forward2(s *Slot, r *Request) *BackendConn { } } } + + // Add nil pointer check for backend.bc to prevent crash + if s.backend.bc == nil { + log.Errorf("slot-%04d backend connection is nil", s.id) + return nil + } + // fix:https://github.com/OpenAtomFoundation/pika/issues/2174 return s.backend.bc.BackendConn(database, uint(s.id), true, r.OpFlag.IsQuick()) }