Skip to content

Commit 79553c9

Browse files
haslersbertoverflowdreamorosi
authored
fix(kafka): handle tombstone events without value (#5017)
Co-authored-by: Bertram Vogel <[email protected]> Co-authored-by: Andrea Amorosi <[email protected]>
1 parent e1ea6fd commit 79553c9

3 files changed

Lines changed: 33 additions & 1 deletion

File tree

packages/kafka/src/consumer.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,11 @@ const deserializeRecord = async (
196196
},
197197
originalKey: key,
198198
get value() {
199+
if (value === undefined) {
200+
return undefined;
201+
}
199202
if (isNull(value)) return null;
203+
200204
const deserializedValue = deserialize({
201205
value: value,
202206
deserializer: deserializerValue,

packages/kafka/src/types/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ type Record = {
201201
/**
202202
* Base64-encoded value of the record
203203
*/
204-
value: string | null;
204+
value?: string | null;
205205
/**
206206
* Array of record headers
207207
*/

packages/kafka/tests/unit/consumer.test.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -609,4 +609,32 @@ describe('Kafka consumer', () => {
609609
// Assess
610610
expect(result).toBeNull();
611611
});
612+
613+
it('handles tombstone events without a message value', async () => {
614+
// Prepare
615+
const event = structuredClone(jsonTestEvent);
616+
delete event.records['mytopic-0'][0].value;
617+
618+
const handler = kafkaConsumer<string, unknown>(
619+
async (event) => {
620+
await setTimeout(1); // simulate some processing time
621+
const firstRecord = event.records[0];
622+
if (firstRecord) {
623+
return firstRecord.value;
624+
}
625+
return undefined;
626+
},
627+
{
628+
value: {
629+
type: SchemaType.JSON,
630+
},
631+
}
632+
);
633+
634+
// Act
635+
const result = await handler(event, context);
636+
637+
// Assess
638+
expect(result).toBeUndefined();
639+
});
612640
});

0 commit comments

Comments
 (0)