Skip to content

Commit 0571fb7

Browse files
Add error logging to bulk loader
1 parent 223a3fb commit 0571fb7

3 files changed

Lines changed: 76 additions & 6 deletions

File tree

dgraph/cmd/bulk/loader.go

Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ type BulkOptions struct {
5959
ConnStr string
6060
HttpAddr string
6161
IgnoreErrors bool
62+
LogErrors bool
63+
ErrorLogPath string
6264
CustomTokenizers string
6365
NewUids bool
6466
ClientDir string
@@ -79,18 +81,59 @@ type BulkOptions struct {
7981
Badger badger.Options
8082
}
8183

84+
// chunkWithMeta wraps a chunk buffer with metadata about the source file.
// The filename travels with the chunk through readerChunkCh so the mapper
// can attribute a parse error to the input file it came from; synthetic
// inputs use a pseudo-name (e.g. "<gql_schema>").
type chunkWithMeta struct {
	buf      *bytes.Buffer // raw chunk bytes produced by chunker.Chunk
	filename string        // originating input file, or a pseudo-name for synthetic chunks
}
89+
8290
// state holds the shared state threaded through the bulk loader's map and
// reduce stages.
type state struct {
	opt    *BulkOptions
	prog   *progress
	xids   *xidmap.XidMap
	schema *schemaStore
	shards *shardMap
	// Chunks read from the input files; consumed by the mapper goroutines.
	readerChunkCh chan *chunkWithMeta
	mapFileId     uint32 // Used atomically to name the output files of the mappers.
	dbs           []*badger.DB
	tmpDbs        []*badger.DB // Temporary DB to write the split lists to avoid ordering issues.
	writeTs       uint64       // All badger writes use this timestamp
	namespaces    *sync.Map    // To store the encountered namespaces.
	errorLog      *errorLogger // Error logger for --log_errors; nil when the flag is unset
}
104+
105+
// errorLogger provides thread-safe logging of parsing errors to a file.
106+
type errorLogger struct {
107+
mu sync.Mutex
108+
file *os.File
109+
}
110+
111+
func newErrorLogger(path string) (*errorLogger, error) {
112+
f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
113+
if err != nil {
114+
return nil, err
115+
}
116+
return &errorLogger{file: f}, nil
117+
}
118+
119+
func (e *errorLogger) Log(filename string, err error, extra string) {
120+
if e == nil || e.file == nil {
121+
return
122+
}
123+
e.mu.Lock()
124+
defer e.mu.Unlock()
125+
// Truncate very long extra info in the log
126+
if len(extra) > 500 {
127+
extra = extra[:500] + "..."
128+
}
129+
fmt.Fprintf(e.file, "file: %s\nerror: %v\n%s\n", filename, err, extra)
130+
}
131+
132+
func (e *errorLogger) Close() error {
133+
if e == nil || e.file == nil {
134+
return nil
135+
}
136+
return e.file.Close()
94137
}
95138

96139
type loader struct {
@@ -130,14 +173,26 @@ func newLoader(opt *BulkOptions) *loader {
130173
x.Checkf(err, "Unable to connect to alpha, Is it running at %s?", opt.ConnStr)
131174
}
132175

176+
var errLog *errorLogger
177+
if opt.LogErrors {
178+
if !opt.IgnoreErrors {
179+
fmt.Fprintln(os.Stderr, "Warning: --log_errors requires --ignore_errors to be set")
180+
}
181+
var err error
182+
errLog, err = newErrorLogger(opt.ErrorLogPath)
183+
x.Checkf(err, "Unable to create error log file at %s", opt.ErrorLogPath)
184+
fmt.Printf("Error logging enabled, writing to: %s\n", opt.ErrorLogPath)
185+
}
186+
133187
st := &state{
134188
opt: opt,
135189
prog: newProgress(),
136190
shards: newShardMap(opt.MapShards),
137191
// Lots of gz readers, so not much channel buffer needed.
138-
readerChunkCh: make(chan *bytes.Buffer, opt.NumGoroutines),
192+
readerChunkCh: make(chan *chunkWithMeta, opt.NumGoroutines),
139193
writeTs: getWriteTimestamp(zero, dg),
140194
namespaces: &sync.Map{},
195+
errorLog: errLog,
141196
}
142197
st.schema = newSchemaStore(readSchema(opt), opt, st)
143198
ld := &loader{
@@ -348,7 +403,7 @@ func (ld *loader) mapStage() {
348403
for {
349404
chunkBuf, err := chunk.Chunk(r)
350405
if chunkBuf != nil && chunkBuf.Len() > 0 {
351-
ld.readerChunkCh <- chunkBuf
406+
ld.readerChunkCh <- &chunkWithMeta{buf: chunkBuf, filename: file}
352407
}
353408
if err == io.EOF {
354409
break
@@ -453,7 +508,7 @@ func (ld *loader) processGqlSchema(loadType chunker.InputFormat) {
453508
_, err := fmt.Fprintf(gqlBuf, jsonSchema, ns, schema)
454509
x.Check(err)
455510
}
456-
ld.readerChunkCh <- gqlBuf
511+
ld.readerChunkCh <- &chunkWithMeta{buf: gqlBuf, filename: "<gql_schema>"}
457512
}
458513

459514
buf := readGqlSchema(ld.opt)
@@ -531,5 +586,10 @@ func (ld *loader) cleanup() {
531586
x.Check(db.Close())
532587
x.Check(os.RemoveAll(opts.Dir))
533588
}
589+
if ld.errorLog != nil {
590+
if err := ld.errorLog.Close(); err != nil {
591+
glog.Warningf("error closing error log: %v", err)
592+
}
593+
}
534594
ld.prog.endSummary()
535595
}

dgraph/cmd/bulk/mapper.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,9 +221,12 @@ func (m *mapper) run(inputFormat chunker.InputFormat) {
221221
chunk := chunker.NewChunker(inputFormat, 1000)
222222
nquads := chunk.NQuads()
223223
go func() {
224-
for chunkBuf := range m.readerChunkCh {
225-
if err := chunk.Parse(chunkBuf); err != nil {
224+
for chunkMeta := range m.readerChunkCh {
225+
if err := chunk.Parse(chunkMeta.buf); err != nil {
226226
atomic.AddInt64(&m.prog.errCount, 1)
227+
if m.errorLog != nil {
228+
m.errorLog.Log(chunkMeta.filename, err, "")
229+
}
227230
if !m.opt.IgnoreErrors {
228231
x.Check(err)
229232
}
@@ -250,6 +253,9 @@ func (m *mapper) run(inputFormat chunker.InputFormat) {
250253
for _, nq := range nqs {
251254
if err := facets.SortAndValidate(nq.Facets); err != nil {
252255
atomic.AddInt64(&m.prog.errCount, 1)
256+
if m.errorLog != nil {
257+
m.errorLog.Log("<facet_validation>", err, fmt.Sprintf("subject=%s predicate=%s", nq.Subject, nq.Predicate))
258+
}
253259
if !m.opt.IgnoreErrors {
254260
x.Check(err)
255261
}

dgraph/cmd/bulk/run.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ func init() {
9191
// TODO: Potentially move http server to main.
9292
flag.String("http", "localhost:8080", "Address to serve http (pprof).")
9393
flag.Bool("ignore_errors", false, "ignore line parsing errors in rdf files")
94+
flag.Bool("log_errors", false, "log parsing errors to a file (requires --ignore_errors)")
95+
flag.String("error_log", "bulk_errors.log", "path to error log file when --log_errors is set")
9496
flag.Int("map_shards", 1,
9597
"Number of map output shards. Must be greater than or equal to the number of reduce "+
9698
"shards. Increasing allows more evenly sized reduce shards, at the expense of "+
@@ -155,6 +157,8 @@ func run() {
155157
ZeroAddr: Bulk.Conf.GetString("zero"),
156158
HttpAddr: Bulk.Conf.GetString("http"),
157159
IgnoreErrors: Bulk.Conf.GetBool("ignore_errors"),
160+
LogErrors: Bulk.Conf.GetBool("log_errors"),
161+
ErrorLogPath: Bulk.Conf.GetString("error_log"),
158162
MapShards: Bulk.Conf.GetInt("map_shards"),
159163
ReduceShards: Bulk.Conf.GetInt("reduce_shards"),
160164
CustomTokenizers: Bulk.Conf.GetString("custom_tokenizers"),

0 commit comments

Comments
 (0)