diff --git a/dgraph/cmd/bulk/loader.go b/dgraph/cmd/bulk/loader.go
index 76b8ba86b1a..28c9cad8816 100644
--- a/dgraph/cmd/bulk/loader.go
+++ b/dgraph/cmd/bulk/loader.go
@@ -59,6 +59,8 @@ type BulkOptions struct {
 	ConnStr          string
 	HttpAddr         string
 	IgnoreErrors     bool
+	LogErrors        bool
+	ErrorLogPath     string
 	CustomTokenizers string
 	NewUids          bool
 	ClientDir        string
@@ -79,18 +81,59 @@ type BulkOptions struct {
 	Badger badger.Options
 }
 
+// chunkWithMeta wraps a chunk buffer with metadata about the source file.
+type chunkWithMeta struct {
+	buf      *bytes.Buffer
+	filename string
+}
+
 type state struct {
 	opt           *BulkOptions
 	prog          *progress
 	xids          *xidmap.XidMap
 	schema        *schemaStore
 	shards        *shardMap
-	readerChunkCh chan *bytes.Buffer
+	readerChunkCh chan *chunkWithMeta
 	mapFileId     uint32 // Used atomically to name the output files of the mappers.
 	dbs           []*badger.DB
 	tmpDbs        []*badger.DB // Temporary DB to write the split lists to avoid ordering issues.
 	writeTs       uint64       // All badger writes use this timestamp
 	namespaces    *sync.Map    // To store the encountered namespaces.
+	errorLog      *errorLogger // Error logger for --log_errors
+}
+
+// errorLogger provides thread-safe logging of parsing errors to a file.
+type errorLogger struct {
+	mu   sync.Mutex
+	file *os.File
+}
+
+func newErrorLogger(path string) (*errorLogger, error) {
+	f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
+	if err != nil {
+		return nil, err
+	}
+	return &errorLogger{file: f}, nil
+}
+
+func (e *errorLogger) Log(filename string, err error, extra string) {
+	if e == nil || e.file == nil {
+		return
+	}
+	e.mu.Lock()
+	defer e.mu.Unlock()
+	// Truncate very long extra info in the log
+	if len(extra) > 500 {
+		extra = extra[:500] + "..."
+	}
+	fmt.Fprintf(e.file, "file: %s\nerror: %v\n%s\n", filename, err, extra)
+}
+
+func (e *errorLogger) Close() error {
+	if e == nil || e.file == nil {
+		return nil
+	}
+	return e.file.Close()
 }
 
 type loader struct {
@@ -130,14 +173,26 @@ func newLoader(opt *BulkOptions) *loader {
 		x.Checkf(err, "Unable to connect to alpha, Is it running at %s?", opt.ConnStr)
 	}
 
+	var errLog *errorLogger
+	if opt.LogErrors {
+		if !opt.IgnoreErrors {
+			fmt.Fprintln(os.Stderr, "Warning: --log_errors requires --ignore_errors to be set")
+		}
+		var err error
+		errLog, err = newErrorLogger(opt.ErrorLogPath)
+		x.Checkf(err, "Unable to create error log file at %s", opt.ErrorLogPath)
+		fmt.Printf("Error logging enabled, writing to: %s\n", opt.ErrorLogPath)
+	}
+
 	st := &state{
 		opt:    opt,
 		prog:   newProgress(),
 		shards: newShardMap(opt.MapShards),
 		// Lots of gz readers, so not much channel buffer needed.
-		readerChunkCh: make(chan *bytes.Buffer, opt.NumGoroutines),
+		readerChunkCh: make(chan *chunkWithMeta, opt.NumGoroutines),
 		writeTs:       getWriteTimestamp(zero, dg),
 		namespaces:    &sync.Map{},
+		errorLog:      errLog,
 	}
 	st.schema = newSchemaStore(readSchema(opt), opt, st)
 	ld := &loader{
@@ -348,7 +403,7 @@ func (ld *loader) mapStage() {
 			for {
 				chunkBuf, err := chunk.Chunk(r)
 				if chunkBuf != nil && chunkBuf.Len() > 0 {
-					ld.readerChunkCh <- chunkBuf
+					ld.readerChunkCh <- &chunkWithMeta{buf: chunkBuf, filename: file}
 				}
 				if err == io.EOF {
 					break
@@ -453,7 +508,7 @@ func (ld *loader) processGqlSchema(loadType chunker.InputFormat) {
 			_, err := fmt.Fprintf(gqlBuf, jsonSchema, ns, schema)
 			x.Check(err)
 		}
-		ld.readerChunkCh <- gqlBuf
+		ld.readerChunkCh <- &chunkWithMeta{buf: gqlBuf, filename: ""}
 	}
 
 	buf := readGqlSchema(ld.opt)
@@ -531,5 +586,10 @@ func (ld *loader) cleanup() {
 		x.Check(db.Close())
 		x.Check(os.RemoveAll(opts.Dir))
 	}
+	if ld.errorLog != nil {
+		if err := ld.errorLog.Close(); err != nil {
+			glog.Warningf("error closing error log: %v", err)
+		}
+	}
 	ld.prog.endSummary()
 }
diff --git a/dgraph/cmd/bulk/mapper.go b/dgraph/cmd/bulk/mapper.go
index e03a4a0ddc8..1a7299ab2cf 100644
--- a/dgraph/cmd/bulk/mapper.go
+++ b/dgraph/cmd/bulk/mapper.go
@@ -221,9 +221,12 @@ func (m *mapper) run(inputFormat chunker.InputFormat) {
 	chunk := chunker.NewChunker(inputFormat, 1000)
 	nquads := chunk.NQuads()
 	go func() {
-		for chunkBuf := range m.readerChunkCh {
-			if err := chunk.Parse(chunkBuf); err != nil {
+		for chunkMeta := range m.readerChunkCh {
+			if err := chunk.Parse(chunkMeta.buf); err != nil {
 				atomic.AddInt64(&m.prog.errCount, 1)
+				if m.errorLog != nil {
+					m.errorLog.Log(chunkMeta.filename, err, "")
+				}
 				if !m.opt.IgnoreErrors {
 					x.Check(err)
 				}
@@ -250,6 +253,9 @@ func (m *mapper) run(inputFormat chunker.InputFormat) {
 		for _, nq := range nqs {
 			if err := facets.SortAndValidate(nq.Facets); err != nil {
 				atomic.AddInt64(&m.prog.errCount, 1)
+				if m.errorLog != nil {
+					m.errorLog.Log("", err, fmt.Sprintf("subject=%s predicate=%s", nq.Subject, nq.Predicate))
+				}
 				if !m.opt.IgnoreErrors {
 					x.Check(err)
 				}
diff --git a/dgraph/cmd/bulk/run.go b/dgraph/cmd/bulk/run.go
index 1f7a0692f9e..064f6f4569d 100644
--- a/dgraph/cmd/bulk/run.go
+++ b/dgraph/cmd/bulk/run.go
@@ -91,6 +91,8 @@ func init() {
 	// TODO: Potentially move http server to main.
 	flag.String("http", "localhost:8080", "Address to serve http (pprof).")
 	flag.Bool("ignore_errors", false, "ignore line parsing errors in rdf files")
+	flag.Bool("log_errors", false, "log parsing errors to a file (requires --ignore_errors)")
+	flag.String("error_log", "bulk_errors.log", "path to error log file when --log_errors is set")
 	flag.Int("map_shards", 1,
 		"Number of map output shards. Must be greater than or equal to the number of reduce "+
 			"shards. Increasing allows more evenly sized reduce shards, at the expense of "+
@@ -155,6 +157,8 @@ func run() {
 		ZeroAddr:         Bulk.Conf.GetString("zero"),
 		HttpAddr:         Bulk.Conf.GetString("http"),
 		IgnoreErrors:     Bulk.Conf.GetBool("ignore_errors"),
+		LogErrors:        Bulk.Conf.GetBool("log_errors"),
+		ErrorLogPath:     Bulk.Conf.GetString("error_log"),
 		MapShards:        Bulk.Conf.GetInt("map_shards"),
 		ReduceShards:     Bulk.Conf.GetInt("reduce_shards"),
 		CustomTokenizers: Bulk.Conf.GetString("custom_tokenizers"),
diff --git a/dgraph/main.go b/dgraph/main.go
index d6f2d21e77f..bb35cf633e8 100644
--- a/dgraph/main.go
+++ b/dgraph/main.go
@@ -17,10 +17,6 @@ import (
 )
 
 func main() {
-	// Setting a higher number here allows more disk I/O calls to be scheduled, hence considerably
-	// improving throughput. The extra CPU overhead is almost negligible in comparison. The
-	// benchmark notes are located in badger-bench/randread.
-	runtime.GOMAXPROCS(128)
 
 	absDiff := func(a, b uint64) uint64 {
 		if a > b {
diff --git a/x/init.go b/x/init.go
index f088499e205..8583573acd2 100644
--- a/x/init.go
+++ b/x/init.go
@@ -50,6 +50,8 @@ Commit timestamp : %v
 Branch           : %v
 Go version       : %v
 jemalloc enabled : %v
+GOMAXPROCS       : %v
+Num CPUs         : %v
 
 For Dgraph official documentation, visit https://dgraph.io/docs.
 For discussions about Dgraph     , visit https://discuss.dgraph.io.
@@ -59,7 +61,7 @@ For discussions about Dgraph     , visit https://discuss.dgraph.io.
 
 `, dgraphVersion, dgraphCodename, ExecutableChecksum(), lastCommitSHA,
 		lastCommitTime, gitBranch,
-		runtime.Version(), jem, licenseInfo)
+		runtime.Version(), jem, runtime.GOMAXPROCS(0), runtime.NumCPU(), licenseInfo)
 }
 
 // PrintVersion prints version and other helpful information if --version.