From 82c2d9fc1305f7f034dcd2723db48e41f45828b3 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Mon, 17 Feb 2025 14:28:48 +0530 Subject: [PATCH 1/2] fix(live): fix how xidmap stores value along with upsertPredicate --- dgraph/cmd/live/batch.go | 75 +++++++++++++++++++++++++++++++--------- xidmap/xidmap.go | 10 ++++++ 2 files changed, 68 insertions(+), 17 deletions(-) diff --git a/dgraph/cmd/live/batch.go b/dgraph/cmd/live/batch.go index ca99a25d22b..a5ea0dc63a8 100644 --- a/dgraph/cmd/live/batch.go +++ b/dgraph/cmd/live/batch.go @@ -17,6 +17,7 @@ import ( "time" "github.com/dgryski/go-farm" + "github.com/dustin/go-humanize" "github.com/dustin/go-humanize/english" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -70,6 +71,9 @@ type loader struct { // To get time elapsed start time.Time + inflight int32 + conc int32 + conflicts map[uint64]struct{} uidsLock sync.RWMutex @@ -156,6 +160,7 @@ func (l *loader) infinitelyRetry(req *request) { } func (l *loader) mutate(req *request) error { + atomic.AddInt32(&l.inflight, 1) txn := l.dc.NewTxn() req.CommitNow = true request := &api.Request{ @@ -163,6 +168,7 @@ func (l *loader) mutate(req *request) error { Mutations: []*api.Mutation{req.Mutation}, } _, err := txn.Do(l.opts.Ctx, request) + atomic.AddInt32(&l.inflight, -1) return err } @@ -390,23 +396,52 @@ func (l *loader) deregister(req *request) { // caller functions. func (l *loader) makeRequests() { defer l.requestsWg.Done() + atomic.AddInt32(&l.conc, 1) + defer atomic.AddInt32(&l.conc, -1) buffer := make([]*request, 0, l.opts.bufferSize) - drain := func(maxSize int) { - for len(buffer) > maxSize { - i := 0 - for _, req := range buffer { - // If there is no conflict in req, we will use it - // and then it would shift all the other reqs in buffer - if !l.addConflictKeys(req) { - buffer[i] = req - i++ - continue - } - // Req will no longer be part of a buffer + var loops int + drain := func() { + i := 0 + for _, req := range buffer { + loops++ + // If there is no conflict in req, we will use it + // and then it would shift all the other reqs in buffer + if !l.addConflictKeys(req) { + buffer[i] = req + i++ + continue + } + // Req will no longer be part of a buffer + l.request(req) + } + buffer = buffer[:i] + } + + t := time.NewTicker(5 * time.Second) + defer t.Stop() + +outer: + for { + select { + case req, ok := <-l.reqs: + if !ok { + break outer + } + req.conflicts = l.conflictKeysForReq(req) + if l.addConflictKeys(req) { l.request(req) + } else { + buffer = append(buffer, req) + } + + case <-t.C: + for { + drain() + if len(buffer) < l.opts.bufferSize { + break + } } - buffer = buffer[:i] } } @@ -417,10 +452,13 @@ func (l *loader) makeRequests() { } else { buffer = append(buffer, req) } - drain(l.opts.bufferSize - 1) + + drain() + time.Sleep(100 * time.Millisecond) } + fmt.Printf("Looped %d times over buffered requests.\n", loops) - drain(0) + drain() } func (l *loader) printCounters() { @@ -434,8 +472,11 @@ func (l *loader) printCounters() { rate := float64(counter.Nquads-last.Nquads) / period.Seconds() elapsed := time.Since(start).Round(time.Second) timestamp := time.Now().Format("15:04:05Z0700") - fmt.Printf("[%s] Elapsed: %s Txns: %d N-Quads: %d N-Quads/s [last 5s]: %5.0f Aborts: %d\n", - timestamp, x.FixedDuration(elapsed), counter.TxnsDone, counter.Nquads, rate, counter.Aborts) + fmt.Printf("[%s] Elapsed: %s Txns: %d N-Quads: %s N-Quads/s: %s"+ + " Inflight: %2d/%2d Aborts: %d\n", + timestamp, x.FixedDuration(elapsed), counter.TxnsDone, + humanize.Comma(int64(counter.Nquads)), humanize.Comma(int64(rate)), + atomic.LoadInt32(&l.inflight), atomic.LoadInt32(&l.conc), counter.Aborts) last = counter } } diff --git a/xidmap/xidmap.go b/xidmap/xidmap.go index 94a67e02da5..0d96414d640 100644 --- a/xidmap/xidmap.go +++ b/xidmap/xidmap.go @@ -215,6 +215,16 @@ func (m *XidMap) SetUid(xid string, uid uint64) { sh.Lock() defer sh.Unlock() sh.tree.Set(farm.Fingerprint64([]byte(xid)), uid) + if m.writer != nil { + var uidBuf [8]byte + binary.BigEndian.PutUint64(uidBuf[:], uid) + m.kvBuf = append(m.kvBuf, kv{key: []byte(xid), value: uidBuf[:]}) + + if len(m.kvBuf) == 64 { + m.kvChan <- m.kvBuf + m.kvBuf = make([]kv, 0, 64) + } + } } func (m *XidMap) dbWriter() { From b368593290077b8570e7eafad149a224cc52a429 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Thu, 17 Apr 2025 18:42:46 +0530 Subject: [PATCH 2/2] address reviews --- dgraph/cmd/live/batch.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dgraph/cmd/live/batch.go b/dgraph/cmd/live/batch.go index a5ea0dc63a8..d054071b94c 100644 --- a/dgraph/cmd/live/batch.go +++ b/dgraph/cmd/live/batch.go @@ -168,12 +168,12 @@ func (l *loader) mutate(req *request) error { Mutations: []*api.Mutation{req.Mutation}, } _, err := txn.Do(l.opts.Ctx, request) - atomic.AddInt32(&l.inflight, -1) return err } func (l *loader) request(req *request) { atomic.AddUint64(&l.reqNum, 1) + defer atomic.AddInt32(&l.inflight, -1) err := l.mutate(req) if err == nil { atomic.AddUint64(&l.nquads, uint64(len(req.Set))) @@ -421,12 +421,12 @@ func (l *loader) makeRequests() { t := time.NewTicker(5 * time.Second) defer t.Stop() -outer: +loop: for { select { case req, ok := <-l.reqs: if !ok { - break outer + break loop } req.conflicts = l.conflictKeysForReq(req) if l.addConflictKeys(req) {