
rlo_06 Fragment parsing fails with unclear error when handling data from Kafka #330

@StrongestNumber9

Describe the bug

Wanted to receive RFC5424 data from Kafka, but the query failed with a fragmentState != FragmentState.WRITTEN error:

 WARN [2026-02-16 09:54:11,391] ({task-result-getter-3} Logging.scala[logWarning]:72) - Lost task 0.0 in stage 8.0 (TID 6) (hostname executor 11): java.lang.IllegalStateException: fragmentState != FragmentState.WRITTEN
        at com.teragrep.rlo_06.Fragment.toBytes(Fragment.java:116)
        at com.teragrep.pth_06.task.kafka.KafkaRecordConverter.convert(KafkaRecordConverter.java:157)
        at com.teragrep.pth_06.task.KafkaMicroBatchInputPartitionReader.convertToRow(KafkaMicroBatchInputPartitionReader.java:258)
        at com.teragrep.pth_06.task.KafkaMicroBatchInputPartitionReader.next(KafkaMicroBatchInputPartitionReader.java:213)
        at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:146)
        at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:184)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:71)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:71)
        at scala.Option.exists(Option.scala:376)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:71)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:102)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:71)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
        at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$5(WriteToDataSourceV2Exec.scala:446)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
        at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:491)
        at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:430)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:496)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:393)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
        at org.apache.spark.scheduler.Task.run(Task.scala:141)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:621)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:624)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

It seems that this part expects structured data to exist and to have specific values available:

rowWriter.write(2, UTF8String.fromBytes(rfc5424Frame.structuredData.getValue(teragrepDirectory).toBytes()));
rowWriter.write(3, UTF8String.fromBytes(rfc5424Frame.structuredData.getValue(teragrepStreamName).toBytes()));
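
A minimal sketch of how the conversion could fail with a clearer message when the element is missing. Only the rfc5424Frame.structuredData.getValue(...).toBytes() calls come from the snippet and stack trace above; the helper name, the SDVector parameter type, and the exception text are assumptions for illustration, not the actual pth_06 code.

private byte[] sdValueOrFail(RFC5424Frame frame, SDVector sdVector) {
    try {
        return frame.structuredData.getValue(sdVector).toBytes();
    } catch (IllegalStateException e) {
        // Fragment.toBytes() throws IllegalStateException when the fragment was never
        // written, i.e. the record carried "-" (NILVALUE) instead of the expected SD element.
        throw new IllegalStateException(
            "Kafka record is missing required structured data value for " + sdVector
            + "; records must carry it to be convertible to a row", e);
    }
}

// usage, mirroring the two writes above
rowWriter.write(2, UTF8String.fromBytes(sdValueOrFail(rfc5424Frame, teragrepDirectory)));
rowWriter.write(3, UTF8String.fromBytes(sdValueOrFail(rfc5424Frame, teragrepStreamName)));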

The example payload was as follows:

<46>1 2026-02-16T08:58:02.079776+02:00 hostname topic-rfc5424 - - - payload
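
For contrast, a message that would satisfy the lookup carries an SD element instead of the "-" NILVALUE in the structured data position, roughly like this (the SD-ID "example@12345" and the parameter names are purely illustrative; the real identifiers are whatever teragrepDirectory and teragrepStreamName resolve to):

<46>1 2026-02-16T08:58:02.079776+02:00 hostname topic-rfc5424 - - [example@12345 directory="example-dir" stream="example-stream"] payload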

Expected behavior

A clearer error message describing what is wrong if this behavior is expected, or a fix for the issue.

How to reproduce

Added data to Kafka using kafka-producer-perf-test.sh and then executed the following query:

%dpl
index=some-kafka-topic latest=-100y
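
For reference, the records were produced with kafka-producer-perf-test.sh along these lines; the exact flags, file name, and broker address below are assumptions, not the command that was actually run:

kafka-producer-perf-test.sh \
  --topic some-kafka-topic \
  --num-records 1000 \
  --throughput -1 \
  --payload-file rfc5424-messages.txt \
  --producer-props bootstrap.servers=localhost:9092

where rfc5424-messages.txt contains one RFC 5424 message per line, for example the payload shown above.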

Screenshots

(screenshot attached in the original issue)

Software version

com.teragrep-pth_10-12.0.0-1.noarch
Desktop (please complete the following information if relevant):

  • OS:
  • Browser:
  • Version:

Additional context

Labels

bug (Something isn't working)