
LatestKafkaTimestamp metric causes NPE when the first event to be parsed fails #316

@StrongestNumber9

Description


Describe the bug

The Kafka metrics cause an NPE when the first event to be parsed fails.

The culprit seems to be that this value is not set in all cases:

new TaskMetric("LatestKafkaTimestamp", latestKafkaTimestamp.getValue())
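A minimal sketch of the failure mode, with hypothetical names (not the actual pth_06 fields): the metric value is only assigned after a successful parse, and the task-completion callback unboxes it unconditionally, so a null `Long` throws the NPE seen in the trace.

```java
// Illustrative reproduction of the failure mode; names are hypothetical.
// The metric value stays null if the very first event fails to parse.
class MetricHolder {
    private Long latestKafkaTimestamp; // null until an event parses successfully

    void onEventParsed(long ts) {
        latestKafkaTimestamp = ts;
    }

    // Mirrors the suspected pattern in currentMetricsValues():
    // auto-unboxing a null Long throws NullPointerException.
    long currentValue() {
        return latestKafkaTimestamp; // NPE when no event has been parsed yet
    }
}
```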

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 4.0 failed 4 times, most recent failure: Lost task 1.3 in stage 4.0 (TID 9) (iris-workernode02.qa.xnet.fi executor 6): org.apache.spark.util.TaskCompletionListenerException: null
        at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:254)
        at org.apache.spark.TaskContextImpl.invokeTaskCompletionListeners(TaskContextImpl.scala:144)
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:137)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:185)
        at org.apache.spark.scheduler.Task.run(Task.scala:141)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:621)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:624)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
        Suppressed: java.lang.NullPointerException
                at com.teragrep.pth_06.task.KafkaMicroBatchInputPartitionReader.currentMetricsValues(KafkaMicroBatchInputPartitionReader.java:283)
                at org.apache.spark.sql.execution.datasources.v2.PartitionMetricCallback.execute(DataSourceRDD.scala:129)
                at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$new$1(DataSourceRDD.scala:68)
                at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$new$1$adapted(DataSourceRDD.scala:67)
                at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:137)
                at org.apache.spark.TaskContextImpl.$anonfun$invokeTaskCompletionListeners$1(TaskContextImpl.scala:144)
                at org.apache.spark.TaskContextImpl.$anonfun$invokeTaskCompletionListeners$1$adapted(TaskContextImpl.scala:144)
                at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:199)
                ... 12 more

Expected behavior

The metric should fall back to a default value when it has not been set yet, instead of throwing an NPE.
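One possible shape for that fallback, sketched with hypothetical names rather than the actual pth_06 implementation: return a sentinel value when no event has been parsed yet.

```java
// Sketch of a null-safe metric holder; names and the sentinel are illustrative.
class SafeMetricHolder {
    static final long NO_TIMESTAMP = -1L; // sentinel: no event parsed yet

    private Long latestKafkaTimestamp; // null until an event parses successfully

    void onEventParsed(long ts) {
        latestKafkaTimestamp = ts;
    }

    // Never dereferences null: reports the sentinel instead of throwing
    // when the first event failed to parse.
    long currentValueOrFallback() {
        return latestKafkaTimestamp == null ? NO_TIMESTAMP : latestKafkaTimestamp;
    }
}
```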

How to reproduce

Query data from Kafka that fails with com.teragrep.rlo_06.PriorityParseException: PRIORITY < missing or a similar error. This appeared when I tried to load data from a Kafka save cluster where the data is not in RFC5424 format.

Screenshots

The error message in the UI is just "null".

Software version
com.teragrep-pth_10-12.0.0-1.noarch

