Enhance error handling in DelegatingMultiSinkOutputCommitter#1606
Enhance error handling in DelegatingMultiSinkOutputCommitter#1606
Conversation
…g logging for task and job commit failures
| } catch (IOException e) { | ||
| LOG.warn("BigQuery multi-sink table '{}' failed during job commit. Reason: {}", | ||
| tableName, getFailureReason(e), e); | ||
| throw e; |
There was a problem hiding this comment.
logging and re-throwing an error is considered an anti-pattern.
Isn't this error propagated upwards and logged somewhere in pipeline run?
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces logging and enhanced error handling to the DelegatingMultiSinkOutputCommitter by adding a logger and a helper method to extract root cause failure messages. The feedback suggests significant improvements to the error handling strategy in commitTask and commitJob; specifically, instead of failing fast on the first error, the code should attempt to commit all sinks and collect exceptions using addSuppressed to avoid inconsistent states. Additionally, the reviewer recommended hardening the getFailureReason method to handle circular references and ensure a non-null return value by falling back to the exception's string representation.
| for (String tableName : committerMap.keySet()) { | ||
| configureContext(taskAttemptContext, tableName); | ||
|
|
||
| committerMap.get(tableName).commitTask(taskAttemptContext); | ||
| try { | ||
| configureContext(taskAttemptContext, tableName); | ||
| committerMap.get(tableName).commitTask(taskAttemptContext); | ||
| } catch (IOException e) { | ||
| LOG.warn("BigQuery multi-sink table '{}' failed during task commit. Reason: {}", | ||
| tableName, getFailureReason(e), e); | ||
| throw e; | ||
| } | ||
| } |
There was a problem hiding this comment.
In a multi-sink scenario, failing fast on the first error prevents other sinks from attempting to commit their data, which can lead to partial and inconsistent job states.
It is better to attempt the commit for all sinks and collect any exceptions using addSuppressed, similar to the implementation in abortTask and abortJob. Additionally, since a commit failure is a fatal event for the task, LOG.error is more appropriate than LOG.warn.
IOException ioe = null;
for (String tableName : committerMap.keySet()) {
try {
configureContext(taskAttemptContext, tableName);
committerMap.get(tableName).commitTask(taskAttemptContext);
} catch (IOException e) {
LOG.error("BigQuery multi-sink table '{}' failed during task commit. Reason: {}",
tableName, getFailureReason(e), e);
if (ioe == null) {
ioe = e;
} else {
ioe.addSuppressed(e);
}
}
}
if (ioe != null) {
throw ioe;
}| for (String tableName : committerMap.keySet()) { | ||
| configureContext(jobContext, tableName); | ||
|
|
||
| committerMap.get(tableName).commitJob(jobContext); | ||
| try { | ||
| configureContext(jobContext, tableName); | ||
| committerMap.get(tableName).commitJob(jobContext); | ||
| } catch (IOException e) { | ||
| LOG.warn("BigQuery multi-sink table '{}' failed during job commit. Reason: {}", | ||
| tableName, getFailureReason(e), e); | ||
| throw e; | ||
| } | ||
| } |
There was a problem hiding this comment.
Similar to commitTask, commitJob should attempt to commit all sinks rather than failing fast. This ensures that as much data as possible is finalized and provides a complete picture of which sinks failed. Using LOG.error is also recommended here as job commit failures are critical.
IOException ioe = null;
for (String tableName : committerMap.keySet()) {
try {
configureContext(jobContext, tableName);
committerMap.get(tableName).commitJob(jobContext);
} catch (IOException e) {
LOG.error("BigQuery multi-sink table '{}' failed during job commit. Reason: {}",
tableName, getFailureReason(e), e);
if (ioe == null) {
ioe = e;
} else {
ioe.addSuppressed(e);
}
}
}
if (ioe != null) {
throw ioe;
}| private String getFailureReason(IOException exception) { | ||
| Throwable rootCause = exception; | ||
| while (rootCause.getCause() != null) { | ||
| rootCause = rootCause.getCause(); | ||
| } | ||
| return rootCause.getMessage() == null ? exception.getMessage() : rootCause.getMessage(); | ||
| } |
There was a problem hiding this comment.
The current implementation of getFailureReason might return null if both the root cause and the wrapper exception have no message. It also lacks a check for circular references in the exception chain.
A more robust approach is to fall back to rootCause.toString() when the message is null, which will at least provide the exception class name.
| private String getFailureReason(IOException exception) { | |
| Throwable rootCause = exception; | |
| while (rootCause.getCause() != null) { | |
| rootCause = rootCause.getCause(); | |
| } | |
| return rootCause.getMessage() == null ? exception.getMessage() : rootCause.getMessage(); | |
| } | |
| private String getFailureReason(IOException exception) { | |
| Throwable rootCause = exception; | |
| while (rootCause.getCause() != null && rootCause.getCause() != rootCause) { | |
| rootCause = rootCause.getCause(); | |
| } | |
| return rootCause.getMessage() != null ? rootCause.getMessage() : rootCause.toString(); | |
| } |
This pull request improves error handling and logging in the
DelegatingMultiSinkOutputCommitterclass for BigQuery multi-sink operations. The main focus is to provide more informative log messages when task or job commits fail, making it easier to diagnose issues.Enhanced error handling and logging:
Loggerinstance (LOG) using SLF4J to theDelegatingMultiSinkOutputCommitterclass for logging warnings. [1] [2]commitTaskandcommitJobcalls in try-catch blocks to log warnings with detailed failure reasons when anIOExceptionoccurs, then re-throw the exception.getFailureReasonhelper method to extract the root cause message from anIOExceptionfor clearer log output.