From f1ff651e1f0202fc84825267592812be7f3a1ca7 Mon Sep 17 00:00:00 2001 From: wangshaoyi Date: Fri, 13 Jun 2025 11:54:57 +0800 Subject: [PATCH] fix proxy log print error && request time stat error --- .gitignore | 2 +- codis/pkg/proxy/backend.go | 9 +++++---- codis/pkg/proxy/request.go | 15 +++++++++------ codis/pkg/proxy/session.go | 37 +++++++++++++++++++++++-------------- codis/pkg/utils/log/log.go | 2 +- 5 files changed, 39 insertions(+), 26 deletions(-) diff --git a/.gitignore b/.gitignore index ab567194a1..38ff7c5645 100644 --- a/.gitignore +++ b/.gitignore @@ -65,7 +65,7 @@ src/build_version.cc build/ buildtrees deps -pkg +#pkg #develop container .devcontainer diff --git a/codis/pkg/proxy/backend.go b/codis/pkg/proxy/backend.go index 7c76a82176..aa9d4dda13 100644 --- a/codis/pkg/proxy/backend.go +++ b/codis/pkg/proxy/backend.go @@ -10,6 +10,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "pika/codis/v2/pkg/proxy/redis" @@ -84,14 +85,14 @@ func (bc *BackendConn) KeepAlive() bool { } switch bc.state.Int64() { default: - m := &Request{} + m := &Request{ReceiveTime: new(int64), SendToPikaTime: new(int64), ReceiveFromPikaTime: new(int64)} m.Multi = []*redis.Resp{ redis.NewBulkBytes([]byte("PING")), } bc.PushBack(m) case stateDataStale: - m := &Request{} + m := &Request{ReceiveTime: new(int64), SendToPikaTime: new(int64), ReceiveFromPikaTime: new(int64)} m.Multi = []*redis.Resp{ redis.NewBulkBytes([]byte("INFO")), } @@ -284,7 +285,7 @@ func (bc *BackendConn) loopReader(tasks <-chan *Request, c *redis.Conn, round in }() for r := range tasks { resp, err := c.Decode() - r.ReceiveFromServerTime = time.Now().UnixNano() + atomic.StoreInt64(r.ReceiveFromPikaTime, time.Now().UnixNano()) if err != nil { return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err)) } @@ -364,7 +365,7 @@ func (bc *BackendConn) loopWriter(round int) (err error) { } else { tasks <- r } - r.SendToServerTime = time.Now().UnixNano() + atomic.CompareAndSwapInt64(r.SendToPikaTime, 0, time.Now().UnixNano()) } return nil } diff --git a/codis/pkg/proxy/request.go b/codis/pkg/proxy/request.go index 26158557c2..644e83861a 100644 --- a/codis/pkg/proxy/request.go +++ b/codis/pkg/proxy/request.go @@ -5,6 +5,7 @@ package proxy import ( "sync" + "sync/atomic" "unsafe" "pika/codis/v2/pkg/proxy/redis" @@ -21,11 +22,11 @@ type Request struct { OpStr string OpFlag - Database int32 - ReceiveTime int64 - SendToServerTime int64 - ReceiveFromServerTime int64 - TasksLen int64 + Database int32 + ReceiveTime *int64 + SendToPikaTime *int64 + ReceiveFromPikaTime *int64 + TasksLen int64 *redis.Resp Err error @@ -47,6 +48,8 @@ func (r *Request) MakeSubRequest(n int) []Request { x.Broken = r.Broken x.Database = r.Database x.ReceiveTime = r.ReceiveTime + x.SendToPikaTime = r.SendToPikaTime + x.ReceiveFromPikaTime = r.ReceiveFromPikaTime } return sub } @@ -54,7 +57,7 @@ func (r *Request) MakeSubRequest(n int) []Request { const GOLDEN_RATIO_PRIME_32 = 0x9e370001 func (r *Request) Seed16() uint { - h32 := uint32(r.ReceiveTime) + uint32(uintptr(unsafe.Pointer(r))) + h32 := uint32(atomic.LoadInt64(r.ReceiveTime)) + uint32(uintptr(unsafe.Pointer(r))) h32 *= GOLDEN_RATIO_PRIME_32 return uint(h32 >> 16) } diff --git a/codis/pkg/proxy/session.go b/codis/pkg/proxy/session.go index 26137d2182..3a9ac2b4ab 100644 --- a/codis/pkg/proxy/session.go +++ b/codis/pkg/proxy/session.go @@ -10,6 +10,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "pika/codis/v2/pkg/models" @@ -184,7 +185,10 @@ func (s *Session) loopReader(tasks *RequestChan, d *Router) (err error) { r.Multi = multi r.Batch = &sync.WaitGroup{} r.Database = s.database - r.ReceiveTime = start.UnixNano() + r.ReceiveTime = new(int64) + r.SendToPikaTime = new(int64) + r.ReceiveFromPikaTime = new(int64) + *r.ReceiveTime = time.Now().UnixNano() r.TasksLen = int64(tasksLen) if err := s.handleRequest(r, d); err != nil { @@ -230,14 +234,16 @@ func (s *Session) loopWriter(tasks *RequestChan) (err error) { if err := p.Encode(resp); err != nil { return s.incrOpFails(r, err) } + nowTime := time.Now().UnixNano() + receiveTime := atomic.LoadInt64(r.ReceiveTime) + duration := int64((nowTime - receiveTime) / 1e3) + fflush := tasks.IsEmpty() if err := p.Flush(fflush); err != nil { return s.incrOpFails(r, err) } else { - s.incrOpStats(r, resp.Type) + s.incrOpStats(r, resp.Type, duration) } - nowTime := time.Now().UnixNano() - duration := int64((nowTime - r.ReceiveTime) / 1e3) s.updateMaxDelay(duration, r) if fflush { s.flushOpStats(false) @@ -248,19 +254,22 @@ func (s *Session) loopWriter(tasks *RequestChan) (err error) { //Record the waiting time from receiving the request from the client to sending it to the backend server //the waiting time from sending the request to the backend server to receiving the response from the server //the waiting time from receiving the server response to sending it to the client + sendToPikaTime := atomic.LoadInt64(r.SendToPikaTime) + receiveFromPikaTime := atomic.LoadInt64(r.ReceiveFromPikaTime) + var d0, d1, d2 int64 = -1, -1, -1 - if r.SendToServerTime > 0 { - d0 = int64((r.SendToServerTime - r.ReceiveTime) / 1e3) + if sendToPikaTime > 0 { + d0 = int64((sendToPikaTime - receiveTime) / 1e3) } - if r.SendToServerTime > 0 && r.ReceiveFromServerTime > 0 { - d1 = int64((r.ReceiveFromServerTime - r.SendToServerTime) / 1e3) + if sendToPikaTime > 0 && receiveFromPikaTime > 0 { + d1 = int64((receiveFromPikaTime - sendToPikaTime) / 1e3) } - if r.ReceiveFromServerTime > 0 { - d2 = int64((nowTime - r.ReceiveFromServerTime) / 1e3) + if receiveFromPikaTime > 0 { + d2 = int64((nowTime - receiveFromPikaTime) / 1e3) } index := getWholeCmd(r.Multi, cmd) - log.Errorf("%s remote:%s, start_time(us):%d, duration(us): [%d, %d, %d], %d, tasksLen:%d, command:[%s].", - time.Unix(r.ReceiveTime/1e9, 0).Format("2006-01-02 15:04:05"), s.Conn.RemoteAddr(), r.ReceiveTime/1e3, d0, d1, d2, duration, r.TasksLen, string(cmd[:index])) + log.Warnf("%s remote:%s, start_time(us):%d, duration(us): [%d, %d, %d], %d, tasksLen:%d, command:[%s].", + time.Unix(receiveTime/1e9, 0).Format("2006-01-02 15:04:05"), s.Conn.RemoteAddr(), receiveTime/1e3, d0, d1, d2, duration, r.TasksLen, string(cmd[:index])) } return nil }) @@ -675,10 +684,10 @@ func (s *Session) getOpStats(opstr string) *opStats { return e } -func (s *Session) incrOpStats(r *Request, t redis.RespType) { +func (s *Session) incrOpStats(r *Request, t redis.RespType, duration int64) { e := s.getOpStats(r.OpStr) e.calls.Incr() - e.nsecs.Add(time.Now().UnixNano() - r.ReceiveTime) + e.nsecs.Add(duration) switch t { case redis.TypeError: e.redis.errors.Incr() diff --git a/codis/pkg/utils/log/log.go b/codis/pkg/utils/log/log.go index 5c5b03617f..13f4d6b5af 100644 --- a/codis/pkg/utils/log/log.go +++ b/codis/pkg/utils/log/log.go @@ -138,7 +138,7 @@ func New(writer io.Writer, prefix string) *Logger { } return &Logger{ out: out, - log: log.New(out, prefix, LstdFlags|Lshortfile), + log: log.New(out, prefix, LstdFlags|Lshortfile|Lmicroseconds), level: LevelAll, trace: LevelError, }