68 changes: 64 additions & 4 deletions dgraph/cmd/bulk/loader.go
@@ -59,6 +59,8 @@ type BulkOptions struct {
ConnStr string
HttpAddr string
IgnoreErrors bool
LogErrors bool
ErrorLogPath string
CustomTokenizers string
NewUids bool
ClientDir string
@@ -79,18 +81,59 @@ type BulkOptions struct {
Badger badger.Options
}

// chunkWithMeta wraps a chunk buffer with metadata about the source file.
type chunkWithMeta struct {
buf *bytes.Buffer
filename string
}

type state struct {
opt *BulkOptions
prog *progress
xids *xidmap.XidMap
schema *schemaStore
shards *shardMap
readerChunkCh chan *bytes.Buffer
readerChunkCh chan *chunkWithMeta
mapFileId uint32 // Used atomically to name the output files of the mappers.
dbs []*badger.DB
tmpDbs []*badger.DB // Temporary DB to write the split lists to avoid ordering issues.
writeTs uint64 // All badger writes use this timestamp
namespaces *sync.Map // To store the encountered namespaces.
errorLog *errorLogger // Error logger for --log_errors
}

// errorLogger provides thread-safe logging of parsing errors to a file.
type errorLogger struct {
mu sync.Mutex
file *os.File
}

func newErrorLogger(path string) (*errorLogger, error) {
f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
if err != nil {
return nil, err
}
return &errorLogger{file: f}, nil
}

func (e *errorLogger) Log(filename string, err error, extra string) {
if e == nil || e.file == nil {
return
}
e.mu.Lock()
defer e.mu.Unlock()
// Truncate very long extra info in the log
if len(extra) > 500 {
extra = extra[:500] + "..."
}
fmt.Fprintf(e.file, "file: %s\nerror: %v\n%s\n", filename, err, extra)
}

func (e *errorLogger) Close() error {
if e == nil || e.file == nil {
return nil
}
return e.file.Close()
}

type loader struct {
@@ -130,14 +173,26 @@ func newLoader(opt *BulkOptions) *loader {
x.Checkf(err, "Unable to connect to alpha, Is it running at %s?", opt.ConnStr)
}

var errLog *errorLogger
if opt.LogErrors {
if !opt.IgnoreErrors {
fmt.Fprintln(os.Stderr, "Warning: --log_errors requires --ignore_errors to be set")
}
var err error
errLog, err = newErrorLogger(opt.ErrorLogPath)
x.Checkf(err, "Unable to create error log file at %s", opt.ErrorLogPath)
fmt.Printf("Error logging enabled, writing to: %s\n", opt.ErrorLogPath)
}

st := &state{
opt: opt,
prog: newProgress(),
shards: newShardMap(opt.MapShards),
// Lots of gz readers, so not much channel buffer needed.
readerChunkCh: make(chan *bytes.Buffer, opt.NumGoroutines),
readerChunkCh: make(chan *chunkWithMeta, opt.NumGoroutines),
writeTs: getWriteTimestamp(zero, dg),
namespaces: &sync.Map{},
errorLog: errLog,
}
st.schema = newSchemaStore(readSchema(opt), opt, st)
ld := &loader{
@@ -348,7 +403,7 @@ func (ld *loader) mapStage() {
for {
chunkBuf, err := chunk.Chunk(r)
if chunkBuf != nil && chunkBuf.Len() > 0 {
ld.readerChunkCh <- chunkBuf
ld.readerChunkCh <- &chunkWithMeta{buf: chunkBuf, filename: file}
}
if err == io.EOF {
break
@@ -453,7 +508,7 @@ func (ld *loader) processGqlSchema(loadType chunker.InputFormat) {
_, err := fmt.Fprintf(gqlBuf, jsonSchema, ns, schema)
x.Check(err)
}
ld.readerChunkCh <- gqlBuf
ld.readerChunkCh <- &chunkWithMeta{buf: gqlBuf, filename: "<gql_schema>"}
}

buf := readGqlSchema(ld.opt)
@@ -531,5 +586,10 @@ func (ld *loader) cleanup() {
x.Check(db.Close())
x.Check(os.RemoveAll(opts.Dir))
}
if ld.errorLog != nil {
if err := ld.errorLog.Close(); err != nil {
glog.Warningf("error closing error log: %v", err)
}
}
ld.prog.endSummary()
}
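The errorLogger introduced above serializes writes with a mutex so the concurrent mapper goroutines can append entries without interleaving. A minimal standalone sketch of that pattern follows; the log path, file names, and error values are illustrative, not taken from this PR.

package main

import (
	"errors"
	"fmt"
	"os"
	"sync"
)

// errorLogger mirrors the mutex-guarded logger added in loader.go.
type errorLogger struct {
	mu   sync.Mutex
	file *os.File
}

func (e *errorLogger) Log(filename string, err error, extra string) {
	if e == nil || e.file == nil {
		return
	}
	e.mu.Lock()
	defer e.mu.Unlock()
	fmt.Fprintf(e.file, "file: %s\nerror: %v\n%s\n", filename, err, extra)
}

func main() {
	// Hypothetical path; the real default comes from the --error_log flag.
	f, err := os.OpenFile("bulk_errors.log", os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
	if err != nil {
		panic(err)
	}
	el := &errorLogger{file: f}
	defer f.Close()

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ { // stand-ins for concurrent mapper goroutines
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			el.Log(fmt.Sprintf("data_%d.rdf", i), errors.New("while parsing chunk: invalid triple"), "")
		}(i)
	}
	wg.Wait()
}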
10 changes: 8 additions & 2 deletions dgraph/cmd/bulk/mapper.go
@@ -221,9 +221,12 @@ func (m *mapper) run(inputFormat chunker.InputFormat) {
chunk := chunker.NewChunker(inputFormat, 1000)
nquads := chunk.NQuads()
go func() {
for chunkBuf := range m.readerChunkCh {
if err := chunk.Parse(chunkBuf); err != nil {
for chunkMeta := range m.readerChunkCh {
if err := chunk.Parse(chunkMeta.buf); err != nil {
atomic.AddInt64(&m.prog.errCount, 1)
if m.errorLog != nil {
m.errorLog.Log(chunkMeta.filename, err, "")
}
if !m.opt.IgnoreErrors {
x.Check(err)
}
@@ -250,6 +253,9 @@
for _, nq := range nqs {
if err := facets.SortAndValidate(nq.Facets); err != nil {
atomic.AddInt64(&m.prog.errCount, 1)
if m.errorLog != nil {
m.errorLog.Log("<facet_validation>", err, fmt.Sprintf("subject=%s predicate=%s", nq.Subject, nq.Predicate))
}
if !m.opt.IgnoreErrors {
x.Check(err)
}
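Given the format string used by errorLogger.Log ("file: %s\nerror: %v\n%s\n"), each failed chunk or facet produces a short record in the error log. The entries below are illustrative only; the file names and error messages are invented.

file: data_1.rdf.gz
error: while parsing chunk: invalid UTF-8 on line 42

file: <facet_validation>
error: facet value is out of range
subject=_:node1 predicate=age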
4 changes: 4 additions & 0 deletions dgraph/cmd/bulk/run.go
@@ -91,6 +91,8 @@ func init() {
// TODO: Potentially move http server to main.
flag.String("http", "localhost:8080", "Address to serve http (pprof).")
flag.Bool("ignore_errors", false, "ignore line parsing errors in rdf files")
flag.Bool("log_errors", false, "log parsing errors to a file (requires --ignore_errors)")
flag.String("error_log", "bulk_errors.log", "path to error log file when --log_errors is set")
flag.Int("map_shards", 1,
"Number of map output shards. Must be greater than or equal to the number of reduce "+
"shards. Increasing allows more evenly sized reduce shards, at the expense of "+
@@ -155,6 +157,8 @@ func run() {
ZeroAddr: Bulk.Conf.GetString("zero"),
HttpAddr: Bulk.Conf.GetString("http"),
IgnoreErrors: Bulk.Conf.GetBool("ignore_errors"),
LogErrors: Bulk.Conf.GetBool("log_errors"),
ErrorLogPath: Bulk.Conf.GetString("error_log"),
MapShards: Bulk.Conf.GetInt("map_shards"),
ReduceShards: Bulk.Conf.GetInt("reduce_shards"),
CustomTokenizers: Bulk.Conf.GetString("custom_tokenizers"),
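With the flags registered above, error logging is opt-in and is meant to be combined with --ignore_errors so the load keeps going while failures are recorded. An illustrative invocation, assuming the usual bulk loader data, schema, and zero flags (paths and addresses are placeholders):

dgraph bulk -f data.rdf.gz -s data.schema --zero=localhost:5080 \
    --ignore_errors --log_errors --error_log=parse_errors.log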
4 changes: 0 additions & 4 deletions dgraph/main.go
@@ -17,10 +17,6 @@ import (
)

func main() {
// Setting a higher number here allows more disk I/O calls to be scheduled, hence considerably
// improving throughput. The extra CPU overhead is almost negligible in comparison. The
// benchmark notes are located in badger-bench/randread.
runtime.GOMAXPROCS(128)

absDiff := func(a, b uint64) uint64 {
if a > b {
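Dropping the hard-coded runtime.GOMAXPROCS(128) call leaves the process on Go's default (one OS thread per logical CPU), and the x/init.go change below surfaces the effective values in the version banner. A small sketch of the read-only queries used there:

package main

import (
	"fmt"
	"runtime"
)

func main() {
	// GOMAXPROCS(0) only reports the current setting; it does not change it.
	fmt.Printf("GOMAXPROCS : %v\n", runtime.GOMAXPROCS(0))
	// NumCPU reports the number of logical CPUs usable by the process.
	fmt.Printf("Num CPUs   : %v\n", runtime.NumCPU())
}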
4 changes: 3 additions & 1 deletion x/init.go
@@ -50,6 +50,8 @@ Commit timestamp : %v
Branch : %v
Go version : %v
jemalloc enabled : %v
GOMAXPROCS : %v
Num CPUs : %v

For Dgraph official documentation, visit https://dgraph.io/docs.
For discussions about Dgraph , visit https://discuss.dgraph.io.
@@ -59,7 +61,7 @@ For discussions about Dgraph , visit https://discuss.dgraph.io.

`,
dgraphVersion, dgraphCodename, ExecutableChecksum(), lastCommitSHA, lastCommitTime, gitBranch,
runtime.Version(), jem, licenseInfo)
runtime.Version(), jem, runtime.GOMAXPROCS(0), runtime.NumCPU(), licenseInfo)
}

// PrintVersion prints version and other helpful information if --version.