Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ src/build_version.cc
build/
buildtrees
deps
pkg
#pkg

#develop container
.devcontainer
Expand Down
9 changes: 5 additions & 4 deletions codis/pkg/proxy/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"pika/codis/v2/pkg/proxy/redis"
Expand Down Expand Up @@ -84,14 +85,14 @@ func (bc *BackendConn) KeepAlive() bool {
}
switch bc.state.Int64() {
default:
m := &Request{}
m := &Request{ReceiveTime: new(int64), SendToPikaTime: new(int64), ReceiveFromPikaTime: new(int64)}
m.Multi = []*redis.Resp{
redis.NewBulkBytes([]byte("PING")),
}
bc.PushBack(m)

case stateDataStale:
m := &Request{}
m := &Request{ReceiveTime: new(int64), SendToPikaTime: new(int64), ReceiveFromPikaTime: new(int64)}
m.Multi = []*redis.Resp{
redis.NewBulkBytes([]byte("INFO")),
}
Expand Down Expand Up @@ -284,7 +285,7 @@ func (bc *BackendConn) loopReader(tasks <-chan *Request, c *redis.Conn, round in
}()
for r := range tasks {
resp, err := c.Decode()
r.ReceiveFromServerTime = time.Now().UnixNano()
atomic.StoreInt64(r.ReceiveFromPikaTime, time.Now().UnixNano())
if err != nil {
return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err))
}
Expand Down Expand Up @@ -364,7 +365,7 @@ func (bc *BackendConn) loopWriter(round int) (err error) {
} else {
tasks <- r
}
r.SendToServerTime = time.Now().UnixNano()
atomic.CompareAndSwapInt64(r.SendToPikaTime, 0, time.Now().UnixNano())
}
return nil
}
Expand Down
15 changes: 9 additions & 6 deletions codis/pkg/proxy/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package proxy

import (
"sync"
"sync/atomic"
"unsafe"

"pika/codis/v2/pkg/proxy/redis"
Expand All @@ -21,11 +22,11 @@ type Request struct {
OpStr string
OpFlag

Database int32
ReceiveTime int64
SendToServerTime int64
ReceiveFromServerTime int64
TasksLen int64
Database int32
ReceiveTime *int64
SendToPikaTime *int64
ReceiveFromPikaTime *int64
TasksLen int64

*redis.Resp
Err error
Expand All @@ -47,14 +48,16 @@ func (r *Request) MakeSubRequest(n int) []Request {
x.Broken = r.Broken
x.Database = r.Database
x.ReceiveTime = r.ReceiveTime
x.SendToPikaTime = r.SendToPikaTime
x.ReceiveFromPikaTime = r.ReceiveFromPikaTime
}
return sub
}

const GOLDEN_RATIO_PRIME_32 = 0x9e370001

func (r *Request) Seed16() uint {
h32 := uint32(r.ReceiveTime) + uint32(uintptr(unsafe.Pointer(r)))
h32 := uint32(atomic.LoadInt64(r.ReceiveTime)) + uint32(uintptr(unsafe.Pointer(r)))
h32 *= GOLDEN_RATIO_PRIME_32
return uint(h32 >> 16)
}
Expand Down
37 changes: 23 additions & 14 deletions codis/pkg/proxy/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"pika/codis/v2/pkg/models"
Expand Down Expand Up @@ -184,7 +185,10 @@ func (s *Session) loopReader(tasks *RequestChan, d *Router) (err error) {
r.Multi = multi
r.Batch = &sync.WaitGroup{}
r.Database = s.database
r.ReceiveTime = start.UnixNano()
r.ReceiveTime = new(int64)
r.SendToPikaTime = new(int64)
r.ReceiveFromPikaTime = new(int64)
*r.ReceiveTime = time.Now().UnixNano()
r.TasksLen = int64(tasksLen)

if err := s.handleRequest(r, d); err != nil {
Expand Down Expand Up @@ -230,14 +234,16 @@ func (s *Session) loopWriter(tasks *RequestChan) (err error) {
if err := p.Encode(resp); err != nil {
return s.incrOpFails(r, err)
}
nowTime := time.Now().UnixNano()
receiveTime := atomic.LoadInt64(r.ReceiveTime)
duration := int64((nowTime - receiveTime) / 1e3)

fflush := tasks.IsEmpty()
if err := p.Flush(fflush); err != nil {
return s.incrOpFails(r, err)
} else {
s.incrOpStats(r, resp.Type)
s.incrOpStats(r, resp.Type, duration)
}
nowTime := time.Now().UnixNano()
duration := int64((nowTime - r.ReceiveTime) / 1e3)
s.updateMaxDelay(duration, r)
if fflush {
s.flushOpStats(false)
Expand All @@ -248,19 +254,22 @@ func (s *Session) loopWriter(tasks *RequestChan) (err error) {
//Record the waiting time from receiving the request from the client to sending it to the backend server
//the waiting time from sending the request to the backend server to receiving the response from the server
//the waiting time from receiving the server response to sending it to the client
sendToPikaTime := atomic.LoadInt64(r.SendToPikaTime)
receiveFromPikaTime := atomic.LoadInt64(r.ReceiveFromPikaTime)

var d0, d1, d2 int64 = -1, -1, -1
if r.SendToServerTime > 0 {
d0 = int64((r.SendToServerTime - r.ReceiveTime) / 1e3)
if sendToPikaTime > 0 {
d0 = int64((sendToPikaTime - receiveTime) / 1e3)
}
if r.SendToServerTime > 0 && r.ReceiveFromServerTime > 0 {
d1 = int64((r.ReceiveFromServerTime - r.SendToServerTime) / 1e3)
if sendToPikaTime > 0 && receiveFromPikaTime > 0 {
d1 = int64((receiveFromPikaTime - sendToPikaTime) / 1e3)
}
if r.ReceiveFromServerTime > 0 {
d2 = int64((nowTime - r.ReceiveFromServerTime) / 1e3)
if receiveFromPikaTime > 0 {
d2 = int64((nowTime - receiveFromPikaTime) / 1e3)
}
index := getWholeCmd(r.Multi, cmd)
log.Errorf("%s remote:%s, start_time(us):%d, duration(us): [%d, %d, %d], %d, tasksLen:%d, command:[%s].",
time.Unix(r.ReceiveTime/1e9, 0).Format("2006-01-02 15:04:05"), s.Conn.RemoteAddr(), r.ReceiveTime/1e3, d0, d1, d2, duration, r.TasksLen, string(cmd[:index]))
log.Warnf("%s remote:%s, start_time(us):%d, duration(us): [%d, %d, %d], %d, tasksLen:%d, command:[%s].",
time.Unix(receiveTime/1e9, 0).Format("2006-01-02 15:04:05"), s.Conn.RemoteAddr(), receiveTime/1e3, d0, d1, d2, duration, r.TasksLen, string(cmd[:index]))
}
return nil
})
Expand Down Expand Up @@ -675,10 +684,10 @@ func (s *Session) getOpStats(opstr string) *opStats {
return e
}

func (s *Session) incrOpStats(r *Request, t redis.RespType) {
func (s *Session) incrOpStats(r *Request, t redis.RespType, duration int64) {
e := s.getOpStats(r.OpStr)
e.calls.Incr()
e.nsecs.Add(time.Now().UnixNano() - r.ReceiveTime)
e.nsecs.Add(duration)
switch t {
case redis.TypeError:
e.redis.errors.Incr()
Expand Down
2 changes: 1 addition & 1 deletion codis/pkg/utils/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func New(writer io.Writer, prefix string) *Logger {
}
return &Logger{
out: out,
log: log.New(out, prefix, LstdFlags|Lshortfile),
log: log.New(out, prefix, LstdFlags|Lshortfile|Lmicroseconds),
level: LevelAll,
trace: LevelError,
}
Expand Down
Loading