Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 58 additions & 17 deletions dgraph/cmd/live/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"time"

"github.com/dgryski/go-farm"
"github.com/dustin/go-humanize"
"github.com/dustin/go-humanize/english"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -70,6 +71,9 @@ type loader struct {
// To get time elapsed
start time.Time

inflight int32
conc int32

conflicts map[uint64]struct{}
uidsLock sync.RWMutex

Expand Down Expand Up @@ -156,13 +160,15 @@ func (l *loader) infinitelyRetry(req *request) {
}

func (l *loader) mutate(req *request) error {
atomic.AddInt32(&l.inflight, 1)
txn := l.dc.NewTxn()
req.CommitNow = true
request := &api.Request{
CommitNow: true,
Mutations: []*api.Mutation{req.Mutation},
}
_, err := txn.Do(l.opts.Ctx, request)
atomic.AddInt32(&l.inflight, -1)
Comment thread
This conversation was marked as resolved.
Outdated
return err
}

Expand Down Expand Up @@ -390,23 +396,52 @@ func (l *loader) deregister(req *request) {
// caller functions.
func (l *loader) makeRequests() {
defer l.requestsWg.Done()
atomic.AddInt32(&l.conc, 1)
defer atomic.AddInt32(&l.conc, -1)

buffer := make([]*request, 0, l.opts.bufferSize)
drain := func(maxSize int) {
for len(buffer) > maxSize {
i := 0
for _, req := range buffer {
// If there is no conflict in req, we will use it
// and then it would shift all the other reqs in buffer
if !l.addConflictKeys(req) {
buffer[i] = req
i++
continue
}
// Req will no longer be part of a buffer
var loops int
drain := func() {
i := 0
for _, req := range buffer {
loops++
// If there is no conflict in req, we will use it
// and then it would shift all the other reqs in buffer
if !l.addConflictKeys(req) {
buffer[i] = req
i++
continue
}
// Req will no longer be part of a buffer
l.request(req)
}
buffer = buffer[:i]
}

t := time.NewTicker(5 * time.Second)
defer t.Stop()

outer:
Comment thread
This conversation was marked as resolved.
Outdated
for {
select {
case req, ok := <-l.reqs:
if !ok {
break outer
}
req.conflicts = l.conflictKeysForReq(req)
if l.addConflictKeys(req) {
l.request(req)
} else {
buffer = append(buffer, req)
}

case <-t.C:
for {
drain()
if len(buffer) < l.opts.bufferSize {
break
}
}
buffer = buffer[:i]
}
}

Expand All @@ -417,10 +452,13 @@ func (l *loader) makeRequests() {
} else {
buffer = append(buffer, req)
}
drain(l.opts.bufferSize - 1)

drain()
time.Sleep(100 * time.Millisecond)
}
fmt.Printf("Looped %d times over buffered requests.\n", loops)

drain(0)
drain()
}

func (l *loader) printCounters() {
Expand All @@ -434,8 +472,11 @@ func (l *loader) printCounters() {
rate := float64(counter.Nquads-last.Nquads) / period.Seconds()
elapsed := time.Since(start).Round(time.Second)
timestamp := time.Now().Format("15:04:05Z0700")
fmt.Printf("[%s] Elapsed: %s Txns: %d N-Quads: %d N-Quads/s [last 5s]: %5.0f Aborts: %d\n",
timestamp, x.FixedDuration(elapsed), counter.TxnsDone, counter.Nquads, rate, counter.Aborts)
fmt.Printf("[%s] Elapsed: %s Txns: %d N-Quads: %s N-Quads/s: %s"+
" Inflight: %2d/%2d Aborts: %d\n",
timestamp, x.FixedDuration(elapsed), counter.TxnsDone,
humanize.Comma(int64(counter.Nquads)), humanize.Comma(int64(rate)),
atomic.LoadInt32(&l.inflight), atomic.LoadInt32(&l.conc), counter.Aborts)
last = counter
}
}
Expand Down
10 changes: 10 additions & 0 deletions xidmap/xidmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,16 @@ func (m *XidMap) SetUid(xid string, uid uint64) {
sh.Lock()
defer sh.Unlock()
sh.tree.Set(farm.Fingerprint64([]byte(xid)), uid)
if m.writer != nil {
var uidBuf [8]byte
binary.BigEndian.PutUint64(uidBuf[:], uid)
m.kvBuf = append(m.kvBuf, kv{key: []byte(xid), value: uidBuf[:]})

if len(m.kvBuf) == 64 {
m.kvChan <- m.kvBuf
m.kvBuf = make([]kv, 0, 64)
}
}
}

func (m *XidMap) dbWriter() {
Expand Down