@@ -17,6 +17,7 @@ import (
1717 "time"
1818
1919 "github.com/dgryski/go-farm"
20+ "github.com/dustin/go-humanize"
2021 "github.com/dustin/go-humanize/english"
2122 "google.golang.org/grpc"
2223 "google.golang.org/grpc/codes"
@@ -70,6 +71,9 @@ type loader struct {
7071 // To get time elapsed
7172 start time.Time
7273
74+ inflight int32
75+ conc int32
76+
7377 conflicts map [uint64 ]struct {}
7478 uidsLock sync.RWMutex
7579
@@ -156,13 +160,15 @@ func (l *loader) infinitelyRetry(req *request) {
156160}
157161
158162func (l * loader ) mutate (req * request ) error {
163+ atomic .AddInt32 (& l .inflight , 1 )
159164 txn := l .dc .NewTxn ()
160165 req .CommitNow = true
161166 request := & api.Request {
162167 CommitNow : true ,
163168 Mutations : []* api.Mutation {req .Mutation },
164169 }
165170 _ , err := txn .Do (l .opts .Ctx , request )
171+ atomic .AddInt32 (& l .inflight , - 1 )
166172 return err
167173}
168174
@@ -390,23 +396,52 @@ func (l *loader) deregister(req *request) {
390396// caller functions.
391397func (l * loader ) makeRequests () {
392398 defer l .requestsWg .Done ()
399+ atomic .AddInt32 (& l .conc , 1 )
400+ defer atomic .AddInt32 (& l .conc , - 1 )
393401
394402 buffer := make ([]* request , 0 , l .opts .bufferSize )
395- drain := func (maxSize int ) {
396- for len (buffer ) > maxSize {
397- i := 0
398- for _ , req := range buffer {
399- // If there is no conflict in req, we will use it
400- // and then it would shift all the other reqs in buffer
401- if ! l .addConflictKeys (req ) {
402- buffer [i ] = req
403- i ++
404- continue
405- }
406- // Req will no longer be part of a buffer
403+ var loops int
404+ drain := func () {
405+ i := 0
406+ for _ , req := range buffer {
407+ loops ++
408+ // If there is no conflict in req, we will use it
409+ // and then it would shift all the other reqs in buffer
410+ if ! l .addConflictKeys (req ) {
411+ buffer [i ] = req
412+ i ++
413+ continue
414+ }
415+ // Req will no longer be part of a buffer
416+ l .request (req )
417+ }
418+ buffer = buffer [:i ]
419+ }
420+
421+ t := time .NewTicker (5 * time .Second )
422+ defer t .Stop ()
423+
424+ outer:
425+ for {
426+ select {
427+ case req , ok := <- l .reqs :
428+ if ! ok {
429+ break outer
430+ }
431+ req .conflicts = l .conflictKeysForReq (req )
432+ if l .addConflictKeys (req ) {
407433 l .request (req )
434+ } else {
435+ buffer = append (buffer , req )
436+ }
437+
438+ case <- t .C :
439+ for {
440+ drain ()
441+ if len (buffer ) < l .opts .bufferSize {
442+ break
443+ }
408444 }
409- buffer = buffer [:i ]
410445 }
411446 }
412447
@@ -417,10 +452,13 @@ func (l *loader) makeRequests() {
417452 } else {
418453 buffer = append (buffer , req )
419454 }
420- drain (l .opts .bufferSize - 1 )
455+
456+ drain ()
457+ time .Sleep (100 * time .Millisecond )
421458 }
459+ fmt .Printf ("Looped %d times over buffered requests.\n " , loops )
422460
423- drain (0 )
461+ drain ()
424462}
425463
426464func (l * loader ) printCounters () {
@@ -434,8 +472,11 @@ func (l *loader) printCounters() {
434472 rate := float64 (counter .Nquads - last .Nquads ) / period .Seconds ()
435473 elapsed := time .Since (start ).Round (time .Second )
436474 timestamp := time .Now ().Format ("15:04:05Z0700" )
437- fmt .Printf ("[%s] Elapsed: %s Txns: %d N-Quads: %d N-Quads/s [last 5s]: %5.0f Aborts: %d\n " ,
438- timestamp , x .FixedDuration (elapsed ), counter .TxnsDone , counter .Nquads , rate , counter .Aborts )
475+ fmt .Printf ("[%s] Elapsed: %s Txns: %d N-Quads: %s N-Quads/s: %s" +
476+ " Inflight: %2d/%2d Aborts: %d\n " ,
477+ timestamp , x .FixedDuration (elapsed ), counter .TxnsDone ,
478+ humanize .Comma (int64 (counter .Nquads )), humanize .Comma (int64 (rate )),
479+ atomic .LoadInt32 (& l .inflight ), atomic .LoadInt32 (& l .conc ), counter .Aborts )
439480 last = counter
440481 }
441482}
0 commit comments