Skip to content

Commit 94cefc7

Browse files
author
wangshaoyi
committed
fix proxy log print error && request time stat error
1 parent 1db2fbd commit 94cefc7

5 files changed

Lines changed: 43 additions & 27 deletions

File tree

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ src/build_version.cc
6565
build/
6666
buildtrees
6767
deps
68-
pkg
68+
#pkg
6969

7070
#develop container
7171
.devcontainer

codis/pkg/proxy/backend.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"strconv"
1111
"strings"
1212
"sync"
13+
"sync/atomic"
1314
"time"
1415

1516
"pika/codis/v2/pkg/proxy/redis"
@@ -84,14 +85,14 @@ func (bc *BackendConn) KeepAlive() bool {
8485
}
8586
switch bc.state.Int64() {
8687
default:
87-
m := &Request{}
88+
m := &Request{ReceiveTime: new(int64), SendToPikaTime: new(int64), ReceiveFromPikaTime: new(int64)}
8889
m.Multi = []*redis.Resp{
8990
redis.NewBulkBytes([]byte("PING")),
9091
}
9192
bc.PushBack(m)
9293

9394
case stateDataStale:
94-
m := &Request{}
95+
m := &Request{ReceiveTime: new(int64), SendToPikaTime: new(int64), ReceiveFromPikaTime: new(int64)}
9596
m.Multi = []*redis.Resp{
9697
redis.NewBulkBytes([]byte("INFO")),
9798
}
@@ -284,7 +285,7 @@ func (bc *BackendConn) loopReader(tasks <-chan *Request, c *redis.Conn, round in
284285
}()
285286
for r := range tasks {
286287
resp, err := c.Decode()
287-
r.ReceiveFromServerTime = time.Now().UnixNano()
288+
atomic.StoreInt64(r.ReceiveFromPikaTime, time.Now().UnixNano())
288289
if err != nil {
289290
return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err))
290291
}
@@ -364,7 +365,7 @@ func (bc *BackendConn) loopWriter(round int) (err error) {
364365
} else {
365366
tasks <- r
366367
}
367-
r.SendToServerTime = time.Now().UnixNano()
368+
atomic.CompareAndSwapInt64(r.SendToPikaTime, 0, time.Now().UnixNano())
368369
}
369370
return nil
370371
}

codis/pkg/proxy/request.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package proxy
55

66
import (
77
"sync"
8+
"sync/atomic"
89
"unsafe"
910

1011
"pika/codis/v2/pkg/proxy/redis"
@@ -21,11 +22,11 @@ type Request struct {
2122
OpStr string
2223
OpFlag
2324

24-
Database int32
25-
ReceiveTime int64
26-
SendToServerTime int64
27-
ReceiveFromServerTime int64
28-
TasksLen int64
25+
Database int32
26+
ReceiveTime *int64
27+
SendToPikaTime *int64
28+
ReceiveFromPikaTime *int64
29+
TasksLen int64
2930

3031
*redis.Resp
3132
Err error
@@ -47,14 +48,16 @@ func (r *Request) MakeSubRequest(n int) []Request {
4748
x.Broken = r.Broken
4849
x.Database = r.Database
4950
x.ReceiveTime = r.ReceiveTime
51+
x.SendToPikaTime = r.SendToPikaTime
52+
x.ReceiveFromPikaTime = r.ReceiveFromPikaTime
5053
}
5154
return sub
5255
}
5356

5457
const GOLDEN_RATIO_PRIME_32 = 0x9e370001
5558

5659
func (r *Request) Seed16() uint {
57-
h32 := uint32(r.ReceiveTime) + uint32(uintptr(unsafe.Pointer(r)))
60+
h32 := uint32(atomic.LoadInt64(r.ReceiveTime)) + uint32(uintptr(unsafe.Pointer(r)))
5861
h32 *= GOLDEN_RATIO_PRIME_32
5962
return uint(h32 >> 16)
6063
}

codis/pkg/proxy/session.go

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,16 @@ import (
1010
"strconv"
1111
"strings"
1212
"sync"
13+
"sync/atomic"
1314
"time"
1415

1516
"pika/codis/v2/pkg/models"
1617
"pika/codis/v2/pkg/proxy/redis"
1718
"pika/codis/v2/pkg/utils/errors"
1819
"pika/codis/v2/pkg/utils/log"
1920
"pika/codis/v2/pkg/utils/sync2/atomic2"
21+
22+
"github.com/HdrHistogram/hdrhistogram-go"
2023
)
2124

2225
type Session struct {
@@ -184,7 +187,10 @@ func (s *Session) loopReader(tasks *RequestChan, d *Router) (err error) {
184187
r.Multi = multi
185188
r.Batch = &sync.WaitGroup{}
186189
r.Database = s.database
187-
r.ReceiveTime = start.UnixNano()
190+
r.ReceiveTime = new(int64)
191+
r.SendToPikaTime = new(int64)
192+
r.ReceiveFromPikaTime = new(int64)
193+
*r.ReceiveTime = time.Now().UnixNano()
188194
r.TasksLen = int64(tasksLen)
189195

190196
if err := s.handleRequest(r, d); err != nil {
@@ -230,14 +236,16 @@ func (s *Session) loopWriter(tasks *RequestChan) (err error) {
230236
if err := p.Encode(resp); err != nil {
231237
return s.incrOpFails(r, err)
232238
}
239+
nowTime := time.Now().UnixNano()
240+
receiveTime := atomic.LoadInt64(r.ReceiveTime)
241+
duration := int64((nowTime - receiveTime) / 1e3)
242+
233243
fflush := tasks.IsEmpty()
234244
if err := p.Flush(fflush); err != nil {
235245
return s.incrOpFails(r, err)
236246
} else {
237-
s.incrOpStats(r, resp.Type)
247+
s.incrOpStats(r, resp.Type, duration)
238248
}
239-
nowTime := time.Now().UnixNano()
240-
duration := int64((nowTime - r.ReceiveTime) / 1e3)
241249
s.updateMaxDelay(duration, r)
242250
if fflush {
243251
s.flushOpStats(false)
@@ -248,19 +256,22 @@ func (s *Session) loopWriter(tasks *RequestChan) (err error) {
248256
//Record the waiting time from receiving the request from the client to sending it to the backend server
249257
//the waiting time from sending the request to the backend server to receiving the response from the server
250258
//the waiting time from receiving the server response to sending it to the client
259+
sendToPikaTime := atomic.LoadInt64(r.SendToPikaTime)
260+
receiveFromPikaTime := atomic.LoadInt64(r.ReceiveFromPikaTime)
261+
251262
var d0, d1, d2 int64 = -1, -1, -1
252-
if r.SendToServerTime > 0 {
253-
d0 = int64((r.SendToServerTime - r.ReceiveTime) / 1e3)
263+
if sendToPikaTime > 0 {
264+
d0 = int64((sendToPikaTime - receiveTime) / 1e3)
254265
}
255-
if r.SendToServerTime > 0 && r.ReceiveFromServerTime > 0 {
256-
d1 = int64((r.ReceiveFromServerTime - r.SendToServerTime) / 1e3)
266+
if sendToPikaTime > 0 && receiveFromPikaTime > 0 {
267+
d1 = int64((receiveFromPikaTime - sendToPikaTime) / 1e3)
257268
}
258-
if r.ReceiveFromServerTime > 0 {
259-
d2 = int64((nowTime - r.ReceiveFromServerTime) / 1e3)
269+
if receiveFromPikaTime > 0 {
270+
d2 = int64((nowTime - receiveFromPikaTime) / 1e3)
260271
}
261272
index := getWholeCmd(r.Multi, cmd)
262-
log.Errorf("%s remote:%s, start_time(us):%d, duration(us): [%d, %d, %d], %d, tasksLen:%d, command:[%s].",
263-
time.Unix(r.ReceiveTime/1e9, 0).Format("2006-01-02 15:04:05"), s.Conn.RemoteAddr(), r.ReceiveTime/1e3, d0, d1, d2, duration, r.TasksLen, string(cmd[:index]))
273+
log.Warnf("remote:%s, duration(us): [%d, %d, %d], %d, tasksLen:%d, command:[%s].",
274+
s.Conn.RemoteAddr(), d0, d1, d2, duration, r.TasksLen, string(cmd[:index]))
264275
}
265276
return nil
266277
})
@@ -669,16 +680,17 @@ func (s *Session) incrOpTotal() {
669680
func (s *Session) getOpStats(opstr string) *opStats {
670681
e := s.stats.opmap[opstr]
671682
if e == nil {
672-
e = &opStats{opstr: opstr}
683+
e = &opStats{opstr: opstr, hist: hdrhistogram.New(1, 10000, 3)}
673684
s.stats.opmap[opstr] = e
674685
}
675686
return e
676687
}
677688

678-
func (s *Session) incrOpStats(r *Request, t redis.RespType) {
689+
func (s *Session) incrOpStats(r *Request, t redis.RespType, duration int64) {
679690
e := s.getOpStats(r.OpStr)
680691
e.calls.Incr()
681-
e.nsecs.Add(time.Now().UnixNano() - r.ReceiveTime)
692+
e.nsecs.Add(duration)
693+
e.hist.RecordValue(duration / 1e3)
682694
switch t {
683695
case redis.TypeError:
684696
e.redis.errors.Incr()

codis/pkg/utils/log/log.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ func New(writer io.Writer, prefix string) *Logger {
138138
}
139139
return &Logger{
140140
out: out,
141-
log: log.New(out, prefix, LstdFlags|Lshortfile),
141+
log: log.New(out, prefix, LstdFlags|Lshortfile|Lmicroseconds),
142142
level: LevelAll,
143143
trace: LevelError,
144144
}

0 commit comments

Comments
 (0)