Skip to content

Latest commit

 

History

History
972 lines (649 loc) · 48.6 KB

File metadata and controls

972 lines (649 loc) · 48.6 KB
title Apache Kafka trigger for Azure Functions
description Use Azure Functions to run your code based on events from an Apache Kafka stream.
ms.topic reference
ms.custom devx-track-extended-java, devx-track-js, devx-track-python
ms.date 12/26/2025
zone_pivot_groups programming-languages-set-functions

Apache Kafka trigger for Azure Functions

Use the Apache Kafka trigger in Azure Functions to run your function code in response to messages in Kafka topics. You can also use a Kafka output binding to write from your function to a topic. For information on setup and configuration details, see Apache Kafka bindings for Azure Functions overview.

[!INCLUDE functions-binding-kafka-plan-support-note]

Example

::: zone pivot="programming-language-csharp"

The usage of the trigger depends on the C# modality used in your function app, which can be one of the following modes:

A compiled C# function that uses an isolated worker process class library that runs in a process that's separate from the runtime.

[!INCLUDE functions-in-process-model-retirement-note]

A compiled C# function that uses an in-process class library that runs in the same process as the Functions runtime.


The attributes you use depend on the specific event provider.

The following example shows a C# function that reads and logs the Kafka message as a Kafka event:

:::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet/Confluent/KafkaTrigger.cs" range="10-21" :::

To receive events in a batch, use an input string or KafkaEventData as an array, as shown in the following example:

:::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet/Confluent/KafkaTriggerMany.cs" range="10-24" :::

The following function logs the message and headers for the Kafka Event:

:::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet/Confluent/KafkaTriggerWithHeaders.cs" range="10-27" :::

You can define a generic Avro schema for the event passed to the trigger. The following string value defines the generic Avro schema:

:::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet/KafkaFunctionSample/AvroGenericTriggers.cs" range="23-41" :::

In the following function, an instance of GenericRecord is available in the KafkaEvent.Value property:

:::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet/KafkaFunctionSample/AvroGenericTriggers.cs" range="43-60" :::

You can define a specific Avro schema for the event passed to the trigger. The following code defines the UserRecord class:

:::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet/KafkaFunctionSample/User.cs" range="9-32" :::

In the following function, an instance of UserRecord is available in the KafkaEvent.Value property:

:::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet/KafkaFunctionSample/AvroSpecificTriggers.cs" range="16-25" :::

For a complete set of working .NET examples, see the Kafka extension repository.

The following example shows a C# function that reads and logs the Kafka message as a Kafka event:

:::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet/EventHub/KafkaTrigger.cs" range="10-21" :::

To receive events in a batch, use a string array or KafkaEventData array as input, as shown in the following example:

:::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet/EventHub/KafkaTriggerMany.cs" range="10-24" :::

The following function logs the message and headers for the Kafka Event:

:::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet/EventHub/KafkaTriggerWithHeaders.cs" range="10-26" :::

You can define a generic Avro schema for the event passed to the trigger. The following string value defines the generic Avro schema:

:::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet/KafkaFunctionSample/AvroGenericTriggers.cs" range="23-41" :::

In the following function, an instance of GenericRecord is available in the KafkaEvent.Value property:

:::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet/KafkaFunctionSample/AvroGenericTriggers.cs" range="43-60" :::

You can define a specific Avro schema for the event passed to the trigger. The following code defines the UserRecord class:

:::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet/KafkaFunctionSample/User.cs" range="9-32" :::

In the following function, an instance of UserRecord is available in the KafkaEvent.Value property:

:::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet/KafkaFunctionSample/AvroSpecificTriggers.cs" range="16-25" :::

For a complete set of working .NET examples, see the Kafka extension repository.

The following example shows a C# function that reads and logs the Kafka message as a Kafka event:

:::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet-isolated/confluent/KafkaTrigger.cs" range="12-24" :::

To receive events in a batch, use a string array as input, as shown in the following example:

:::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet-isolated/confluent/KafkaTriggerMany.cs" range="12-27" :::

The following function logs the message and headers for the Kafka Event:

:::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet-isolated/Confluent/KafkaTriggerWithHeaders.cs" range="12-32" :::

For a complete set of working .NET examples, see the Kafka extension repository.

The following example shows a C# function that reads and logs the Kafka message as a Kafka event:

:::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet-isolated/eventhub/KafkaTrigger.cs" range="12-24" :::

To receive events in a batch, use a string array as input, as shown in the following example:

:::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet-isolated/eventhub/KafkaTriggerMany.cs" range="12-27" :::

The following function logs the message and headers for the Kafka Event:

:::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet-isolated/eventhub/KafkaTriggerWithHeaders.cs" range="12-32" :::

For a complete set of working .NET examples, see the Kafka extension repository.


::: zone-end
::: zone pivot="programming-language-javascript,programming-language-typescript" The usage of the trigger depends on your version of the Node.js programming model.

In the Node.js v4 model, you define your trigger directly in your function code. For more information, see the Azure Functions Node.js developer guide.

In the Node.js v3 model, you define your trigger in a function.json file with your code. For more information, see the Azure Functions Node.js developer guide.


In these examples, the event providers are either Confluent or Azure Event Hubs. These examples show how to define a Kafka trigger for a function that reads a Kafka message.
::: zone-end ::: zone pivot="programming-language-javascript"

const { app } = require("@azure/functions");

async function kafkaTrigger(event, context) {
  context.log("Event Offset: " + event.Offset);
  context.log("Event Partition: " + event.Partition);
  context.log("Event Topic: " + event.Topic);
  context.log("Event Timestamp: " + event.Timestamp);
  context.log("Event Key: " + event.Key);
  context.log("Event Value (as string): " + event.Value);

  let event_obj = JSON.parse(event.Value);

  context.log("Event Value Object: ");
  context.log("   Value.registertime: ", event_obj.registertime.toString());
  context.log("   Value.userid: ", event_obj.userid);
  context.log("   Value.regionid: ", event_obj.regionid);
  context.log("   Value.gender: ", event_obj.gender);
}

app.generic("Kafkatrigger", {
  trigger: {
    type: "kafkaTrigger",
    direction: "in",
    name: "event",
    topic: "topic",
    brokerList: "%BrokerList%",
    username: "%ConfluentCloudUserName%",
    password: "%ConfluentCloudPassword%",
    consumerGroup: "$Default",
    protocol: "saslSsl",
    authenticationMode: "plain",
    dataType: "string"
  },
  handler: kafkaTrigger,
});

:::code language="javascript" source="~/azure-functions-kafka-extension/samples/javascript-v4/src/functions/kafkaTrigger.js" range="1-19,21-36":::

This function.json file defines the trigger for the Confluent provider:

:::code language="json" source="~/azure-functions-kafka-extension/samples/javascript/KafkaTrigger/function.confluent.json" :::

The following code runs when the function is triggered:

:::code language="javascript" source="~/azure-functions-kafka-extension/samples/javascript/KafkaTrigger/index.js" :::

This function.json file defines the trigger for the Event Hubs provider:

:::code language="json" source="~/azure-functions-kafka-extension/samples/javascript/KafkaTrigger/function.eventhub.json" :::

The following code runs when the function is triggered:

:::code language="javascript" source="~/azure-functions-kafka-extension/samples/javascript/KafkaTrigger/index.js" :::


To receive events in a batch, set the cardinality value to many, as shown in these examples:

const { app } = require("@azure/functions");

async function kafkaTriggerMany(events, context) {
  for (const event of events) {
    context.log("Event Offset: " + event.Offset);
    context.log("Event Partition: " + event.Partition);
    context.log("Event Topic: " + event.Topic);
    context.log("Event Key: " + event.Key);
    context.log("Event Timestamp: " + event.Timestamp);
    context.log("Event Value (as string): " + event.Value);

    let event_obj = JSON.parse(event.Value);

    context.log("Event Value Object: ");
    context.log("   Value.registertime: ", event_obj.registertime.toString());
    context.log("   Value.userid: ", event_obj.userid);
    context.log("   Value.regionid: ", event_obj.regionid);
    context.log("   Value.gender: ", event_obj.gender);
  }
}

app.generic("kafkaTriggerMany", {
  trigger: {
    type: "kafkaTrigger",
    direction: "in",
    name: "event",
    topic: "topic",
    brokerList: "%BrokerList%",
    username: "%ConfluentCloudUserName%",
    password: "%ConfluentCloudPassword%",
    consumerGroup: "$Default",
    protocol: "saslSsl",
    authenticationMode: "plain",
    dataType: "string",
    cardinality: "MANY"
  },
  handler: kafkaTriggerMany,
});

:::code language="javascript" source="~/azure-functions-kafka-extension/samples/javascript-v4/src/functions/kafkaTriggerMany.js" range="1-21,23-39":::

:::code language="json" source="~/azure-functions-kafka-extension/samples/javascript/KafkaTriggerMany/function.confluent.json" :::

This code parses the array of events and logs the event data:

:::code language="javascript" source="~/azure-functions-kafka-extension/samples/javascript/KafkaTriggerMany/index.js" :::

This code logs the header data:

:::code language="javascript" source="~/azure-functions-kafka-extension/samples/javascript/KafkaTriggerManyWithHeaders/index.js" :::

:::code language="json" source="~/azure-functions-kafka-extension/samples/javascript/KafkaTriggerMany/function.eventhub.json" :::

This code parses the array of events and logs the event data:

:::code language="javascript" source="~/azure-functions-kafka-extension/samples/javascript/KafkaTriggerMany/index.js" :::

This code logs the header data:

:::code language="javascript" source="~/azure-functions-kafka-extension/samples/javascript/KafkaTriggerManyWithHeaders/index.js" :::


You can define a generic Avro schema for the event passed to the trigger. This example defines the trigger for the specific provider with a generic Avro schema:

const { app } = require("@azure/functions");

async function kafkaAvroGenericTrigger(event, context) {
  context.log("Processed kafka event: ", event);
  if (context.triggerMetadata?.key !== undefined) {
    context.log("message key: ", context.triggerMetadata?.key);
  }
}

app.generic("kafkaAvroGenericTrigger", {
  trigger: {
    type: "kafkaTrigger",
    direction: "in",
    name: "event",
    protocol: "SASLSSL",
    password: "EventHubConnectionString",
    dataType: "string",
    topic: "topic",
    authenticationMode: "PLAIN",
    avroSchema:
      '{"type":"record","name":"Payment","namespace":"io.confluent.examples.clients.basicavro","fields":[{"name":"id","type":"string"},{"name":"amount","type":"double"},{"name":"type","type":"string"}]}',
    consumerGroup: "$Default",
    username: "$ConnectionString",
    brokerList: "%BrokerList%",
  },
  handler: kafkaAvroGenericTrigger,
});

:::code language="javascript" source="~/azure-functions-kafka-extension/samples/javascript-v4/src/functions/kafkaAvroGenericTrigger.js" range="1-9,11-28":::

:::code language="json" source="~/azure-functions-kafka-extension/samples/javascript/KafkaTriggerAvroGeneric/function.confluent.json" :::

The following code runs when the function is triggered:

:::code language="javascript" source="~/azure-functions-kafka-extension/samples/javascript/KafkaTriggerAvroGeneric/index.js" :::

:::code language="json" source="~/azure-functions-kafka-extension/samples/javascript/KafkaTriggerAvroGeneric/function.eventhub.json" :::

The following code runs when the function is triggered:

:::code language="javascript" source="~/azure-functions-kafka-extension/samples/javascript/KafkaTriggerAvroGeneric/index.js" :::


For a complete set of working JavaScript examples, see the Kafka extension repository.

For a complete set of working JavaScript examples, see the Kafka extension repository.


::: zone-end
::: zone pivot="programming-language-typescript"

import { app, InvocationContext } from "@azure/functions";

// This is a sample interface that describes the actual data in your event.
interface EventData {
  registertime: number;
  userid: string;
  regionid: string;
  gender: string;
}

export async function kafkaTrigger(
  event: any,
  context: InvocationContext
): Promise<void> {
  context.log("Event Offset: " + event.Offset);
  context.log("Event Partition: " + event.Partition);
  context.log("Event Topic: " + event.Topic);
  context.log("Event Timestamp: " + event.Timestamp);
  context.log("Event Value (as string): " + event.Value);

  let event_obj: EventData = JSON.parse(event.Value);

  context.log("Event Value Object: ");
  context.log("   Value.registertime: ", event_obj.registertime.toString());
  context.log("   Value.userid: ", event_obj.userid);
  context.log("   Value.regionid: ", event_obj.regionid);
  context.log("   Value.gender: ", event_obj.gender);
}

app.generic("Kafkatrigger", {
  trigger: {
    type: "kafkaTrigger",
    direction: "in",
    name: "event",
    topic: "topic",
    brokerList: "%BrokerList%",
    username: "%ConfluentCloudUserName%",
    password: "%ConfluentCloudPassword%",
    consumerGroup: "$Default",
    protocol: "saslSsl",
    authenticationMode: "plain",
    dataType: "string"
  },
  handler: kafkaTrigger,
});

:::code language="typescript" source="~/azure-functions-kafka-extension/samples/typescript-v4/src/functions/kafkaTrigger.ts" range="1-29,31-46":::

This function.json file defines the trigger for the Confluent provider:

:::code language="json" source="~/azure-functions-kafka-extension/samples/typescript/KafkaTrigger/function.confluent.json" :::

The following code runs when the function is triggered:

:::code language="typescript" source="~/azure-functions-kafka-extension/samples/typescript/KafkaTrigger/index.ts" :::

This function.json file defines the trigger for the Event Hubs provider:

:::code language="json" source="~/azure-functions-kafka-extension/samples/typescript/KafkaTrigger/function.eventhub.json" :::

The following code runs when the function is triggered:

:::code language="typescript" source="~/azure-functions-kafka-extension/samples/typescript/KafkaTrigger/index.ts" :::


To receive events in a batch, set the cardinality value to many, as shown in these examples:

import { app, InvocationContext } from "@azure/functions";

// This is a sample interface that describes the actual data in your event.
interface EventData {
    registertime: number;
    userid: string;
    regionid: string;
    gender: string;
}

interface KafkaEvent {
    Offset: number;
    Partition: number;
    Topic: string;
    Timestamp: number;
    Value: string;
}

export async function kafkaTriggerMany(
    events: any,
    context: InvocationContext
): Promise<void> {
    for (const event of events) {
        context.log("Event Offset: " + event.Offset);
        context.log("Event Partition: " + event.Partition);
        context.log("Event Topic: " + event.Topic);
        context.log("Event Timestamp: " + event.Timestamp);
        context.log("Event Value (as string): " + event.Value);

        let event_obj: EventData = JSON.parse(event.Value);

        context.log("Event Value Object: ");
        context.log("   Value.registertime: ", event_obj.registertime.toString());
        context.log("   Value.userid: ", event_obj.userid);
        context.log("   Value.regionid: ", event_obj.regionid);
        context.log("   Value.gender: ", event_obj.gender);
    }
}

app.generic("kafkaTriggerMany", {
  trigger: {
    type: "kafkaTrigger",
    direction: "in",
    name: "event",
    topic: "topic",
    brokerList: "%BrokerList%",
    username: "%ConfluentCloudUserName%",
    password: "%ConfluentCloudPassword%",
    consumerGroup: "$Default",
    protocol: "saslSsl",
    authenticationMode: "plain",
    dataType: "string",
    cardinality: "MANY"
  },
  handler: kafkaTriggerMany,
});

:::code language="typescript" source="~/azure-functions-kafka-extension/samples/typescript-v4/src/functions/kafkaTriggerMany.ts" range="1-39,41-57":::

:::code language="json" source="~/azure-functions-kafka-extension/samples/typescript/KafkaTriggerMany/function.confluent.json" :::

This code parses the array of events and logs the event data:

:::code language="typescript" source="~/azure-functions-kafka-extension/samples/typescript/KafkaTriggerMany/index.ts" :::

This code logs the header data:

:::code language="typescript" source="~/azure-functions-kafka-extension/samples/typescript/KafkaTriggerManyWithHeaders/index.ts" :::

:::code language="json" source="~/azure-functions-kafka-extension/samples/typescript/KafkaTriggerMany/function.eventhub.json" :::

This code parses the array of events and logs the event data:

:::code language="typescript" source="~/azure-functions-kafka-extension/samples/typescript/KafkaTriggerMany/index.ts" :::

This code logs the header data:

:::code language="typescript" source="~/azure-functions-kafka-extension/samples/typescript/KafkaTriggerManyWithHeaders/index.ts" :::


You can define a generic Avro schema for the event passed to the trigger. This example defines the trigger for the specific provider with a generic Avro schema:

:::code language="typescript" source="~/azure-functions-kafka-extension/samples/typescript-v4/src/functions/kafkaAvroGenericTrigger.ts" range="1-15,17-34":::

import { app, InvocationContext } from "@azure/functions";

export async function kafkaAvroGenericTrigger(
  event: any,
  context: InvocationContext
): Promise<void> {
  context.log("Processed kafka event: ", event);
  context.log(
    `Message ID: ${event.id}, amount: ${event.amount}, type: ${event.type}`
  );
  if (context.triggerMetadata?.key !== undefined) {
    context.log(`Message Key : ${context.triggerMetadata?.key}`);
  }
}

app.generic("kafkaAvroGenericTrigger", {
  trigger: {
    type: "kafkaTrigger",
    direction: "in",
    name: "event",
    protocol: "SASLSSL",
    password: "EventHubConnectionString",
    dataType: "string",
    topic: "topic",
    authenticationMode: "PLAIN",
    avroSchema:
      '{"type":"record","name":"Payment","namespace":"io.confluent.examples.clients.basicavro","fields":[{"name":"id","type":"string"},{"name":"amount","type":"double"},{"name":"type","type":"string"}]}',
    consumerGroup: "$Default",
    username: "$ConnectionString",
    brokerList: "%BrokerList%",
  },
  handler: kafkaAvroGenericTrigger,
});

:::code language="json" source="~/azure-functions-kafka-extension/samples/typescript/KafkaTriggerAvroGeneric/function.confluent.json" :::

The following code runs when the function is triggered:

:::code language="typescript" source="~/azure-functions-kafka-extension/samples/typescript/KafkaTriggerAvroGeneric/index.ts" :::

:::code language="json" source="~/azure-functions-kafka-extension/samples/typescript/KafkaTriggerAvroGeneric/function.eventhub.json" :::

The following code runs when the function is triggered:

:::code language="typescript" source="~/azure-functions-kafka-extension/samples/typescript/KafkaTriggerAvroGeneric/index.ts" :::


For a complete set of working TypeScript examples, see the Kafka extension repository.

For a complete set of working TypeScript examples, see the Kafka extension repository.


::: zone-end
::: zone pivot="programming-language-powershell"

The specific properties of the function.json file depend on your event provider. In these examples, the event providers are either Confluent or Azure Event Hubs. The following examples show a Kafka trigger for a function that reads and logs a Kafka message.

The following function.json file defines the trigger for the specific provider:

:::code language="json" source="~/azure-functions-kafka-extension/samples/powershell/KafkaTrigger/function.confluent.json" :::

:::code language="json" source="~/azure-functions-kafka-extension/samples/powershell/KafkaTrigger/function.eventhub.json" :::


The following code runs when the function is triggered:

:::code language="powershell" source="~/azure-functions-kafka-extension/samples/powershell/KafkaTrigger/run.ps1" :::

To receive events in a batch, set the cardinality value to many in the function.json file, as shown in the following examples:

:::code language="json" source="~/azure-functions-kafka-extension/samples/powershell/KafkaTriggerMany/function.confluent.json" :::

:::code language="json" source="~/azure-functions-kafka-extension/samples/powershell/KafkaTriggerMany/function.eventhub.json" :::


The following code parses the array of events and logs the event data:

:::code language="powershell" source="~/azure-functions-kafka-extension/samples/powershell/KafkaTriggerMany/run.ps1" :::

The following code logs the header data:

:::code language="powershell" source="~/azure-functions-kafka-extension/samples/powershell/KafkaTriggerManyWithHeaders/run.ps1" :::

You can define a generic Avro schema for the event passed to the trigger. The following function.json defines the trigger for the specific provider with a generic Avro schema:

:::code language="json" source="~/azure-functions-kafka-extension/samples/powershell/KafkaTriggerAvroGeneric/function.confluent.json" :::

:::code language="json" source="~/azure-functions-kafka-extension/samples/powershell/KafkaTriggerAvroGeneric/function.eventhub.json" :::


The following code runs when the function is triggered:

:::code language="powershell" source="~/azure-functions-kafka-extension/samples/powershell/KafkaTriggerAvroGeneric/run.ps1" :::

For a complete set of working PowerShell examples, see the Kafka extension repository.

::: zone-end
::: zone pivot="programming-language-python"
The usage of the trigger depends on your version of the Python programming model.

In the Python v2 model, you define your trigger directly in your function code using decorators. For more information, see the Azure Functions Python developer guide.

In the Python v1 model, you define your trigger in the function.json with your function code. For more information, see the Azure Functions Python developer guide.


These examples show how to define a Kafka trigger for a function that reads a Kafka message.

:::code language="python" source="~/azure-functions-kafka-extension/samples/python-v2/kafka_trigger.py" range="10-22" :::

This function.json file defines the trigger:

:::code language="json" source="~/azure-functions-kafka-extension/samples/python/KafkaTrigger/function.confluent.json" :::

This code runs when the function is triggered:

:::code language="python" source="~/azure-functions-kafka-extension/samples/python/KafkaTrigger/main.py" :::


This example receives events in a batch by setting the cardinality value to many.

:::code language="python" source="~/azure-functions-kafka-extension/samples/python-v2/kafka_trigger.py" range="24-38" :::

:::code language="json" source="~/azure-functions-kafka-extension/samples/python/KafkaTriggerMany/function.confluent.json" :::

This code parses the array of events and logs the event data:

:::code language="python" source="~/azure-functions-kafka-extension/samples/python/KafkaTriggerMany/main.py" :::

This code logs the header data:

:::code language="python" source="~/azure-functions-kafka-extension/samples/python/KafkaTriggerManyWithHeaders/init.py" :::


You can define a generic Avro schema for the event passed to the trigger.

:::code language="python" source="~/azure-functions-kafka-extension/samples/python-v2/kafka_trigger_avro.py" range="24-37" :::

This function.json defines the trigger with a generic Avro schema:

:::code language="json" source="~/azure-functions-kafka-extension/samples/python/KafkaTriggerAvroGeneric/function.confluent.json" :::

This code runs when the function is triggered:

:::code language="python" source="~/azure-functions-kafka-extension/samples/python/KafkaTriggerAvroGeneric/main.py" :::


For a complete set of working Python examples, see the Kafka extension repository.

For a complete set of working Python examples, see the Kafka extension repository.


::: zone-end
::: zone pivot="programming-language-java"

The annotations you use to configure your trigger depend on the specific event provider.

The following example shows a Java function that reads and logs the content of the Kafka event:

:::code language="java" source="~/azure-functions-kafka-extension/samples/java/confluent/src/main/java/com/contoso/kafka/SampleKafkaTrigger.java" range="19-35" :::

To receive events in a batch, use an input string as an array, as shown in the following example:

:::code language="java" source="~/azure-functions-kafka-extension/samples/java/confluent/src/main/java/com/contoso/kafka/KafkaTriggerMany.java" range="8-27" :::

The following function logs the message and headers for the Kafka Event:

:::code language="java" source="~/azure-functions-kafka-extension/samples/java/confluent/src/main/java/com/contoso/kafka/KafkaTriggerManyWithHeaders.java" range="12-38" :::

You can define a generic Avro schema for the event passed to the trigger. The following function defines a trigger for the specific provider with a generic Avro schema:

:::code language="java" source="~/azure-functions-kafka-extension/samples/java/confluent/src/main/java/com/contoso/kafka/avro/generic/KafkaTriggerAvroGeneric.java" range="15-31" :::

For a complete set of working Java examples for Confluent, see the Kafka extension repository.

The following example shows a Java function that reads and logs the content of the Kafka event:

:::code language="java" source="~/azure-functions-kafka-extension/samples/java/eventhub/src/main/java/com/contoso/kafka/SampleKafkaTrigger.java" range="19-35" :::

To receive events in a batch, use an input string as an array, as shown in the following example:

:::code language="java" source="~/azure-functions-kafka-extension/samples/java/eventhub/src/main/java/com/contoso/kafka/KafkaTriggerMany.java" range="8-27" :::

The following function logs the message and headers for the Kafka Event:

:::code language="java" source="~/azure-functions-kafka-extension/samples/java/eventhub/src/main/java/com/contoso/kafka/KafkaTriggerManyWithHeaders.java" range="12-38" :::

You can define a generic Avro schema for the event passed to the trigger. The following function defines a trigger for the specific provider with a generic Avro schema:

:::code language="java" source="~/azure-functions-kafka-extension/samples/java/eventhub/src/main/java/com/contoso/kafka/avro/generic/KafkaTriggerAvroGeneric.java" range="15-31" :::

For a complete set of working Java examples for Event Hubs, see the Kafka extension repository.


::: zone-end ::: zone pivot="programming-language-csharp"

Attributes

Both in-process and isolated worker process C# libraries use the KafkaTriggerAttribute to define the function trigger.

The following table explains the properties you can set by using this trigger attribute:

Parameter Description
BrokerList (Required) The list of Kafka brokers monitored by the trigger. See Connections for more information.
Topic (Required) The topic monitored by the trigger.
ConsumerGroup (Optional) Kafka consumer group used by the trigger.
AvroSchema (Optional) Schema of a generic record of message value when using the Avro protocol.
KeyAvroSchema (Optional) Schema of a generic record of message key when using the Avro protocol.
KeyDataType (Optional) Data type to receive the message key as from Kafka Topic. If KeyAvroSchema is set, this value is generic record. Accepted values are Int, Long, String, and Binary.
AuthenticationMode (Optional) The authentication mode when using Simple Authentication and Security Layer (SASL) authentication. The supported values are NotSet (default), Gssapi, Plain, ScramSha256, ScramSha512, and OAuthBearer.
Username (Optional) The username for SASL authentication. Not supported when AuthenticationMode is Gssapi. See Connections for more information.
Password (Optional) The password for SASL authentication. Not supported when AuthenticationMode is Gssapi. See Connections for more information.
Protocol (Optional) The security protocol used when communicating with brokers. The supported values are NotSet (default), plaintext, ssl, sasl_plaintext, sasl_ssl.
SslCaLocation (Optional) Path to CA certificate file for verifying the broker's certificate.
SslCertificateLocation (Optional) Path to the client's certificate.
SslKeyLocation (Optional) Path to client's private key (PEM) used for authentication.
SslKeyPassword (Optional) Password for client's certificate.
SslCertificatePEM (Optional) Client certificate in PEM format as a string. See Connections for more information.
SslKeyPEM (Optional) Client private key in PEM format as a string. See Connections for more information.
SslCaPEM (Optional) CA certificate in PEM format as a string. See Connections for more information.
SslCertificateandKeyPEM (Optional) Client certificate and key in PEM format as a string. See Connections for more information.
SchemaRegistryUrl (Optional) URL for the Avro Schema Registry. See Connections for more information.
SchemaRegistryUsername (Optional) Username for the Avro Schema Registry. See Connections for more information.
SchemaRegistryPassword (Optional) Password for the Avro Schema Registry. See Connections for more information.
OAuthBearerMethod (Optional) OAuth Bearer method. Accepted values are oidc and default.
OAuthBearerClientId (Optional) When OAuthBearerMethod is set to oidc, this specifies the OAuth bearer client ID. See Connections for more information.
OAuthBearerClientSecret (Optional) When OAuthBearerMethod is set to oidc, this specifies the OAuth bearer client secret. See Connections for more information.
OAuthBearerScope (Optional) Specifies the scope of the access request to the broker.
OAuthBearerTokenEndpointUrl (Optional) OAuth/OIDC issuer token endpoint HTTP(S) URI used to retrieve token when oidc method is used. See Connections for more information.
OAuthBearerExtensions (Optional) Comma-separated list of key=value pairs to be provided as additional information to broker when oidc method is used. For example: supportFeatureX=true,organizationId=sales-emea.

::: zone-end
::: zone pivot="programming-language-java"

Annotations

The KafkaTrigger annotation enables you to create a function that runs when it receives a topic. Supported options include the following elements:

Element Description
name (Required) The name of the variable that represents the queue or topic message in function code.
brokerList (Required) The list of Kafka brokers monitored by the trigger. See Connections for more information.
topic (Required) The topic monitored by the trigger.
cardinality (Optional) Indicates the cardinality of the trigger input. The supported values are ONE (default) and MANY. Use ONE when the input is a single message and MANY when the input is an array of messages. When you use MANY, you must also set a dataType.
dataType Defines how Functions handles the parameter value. By default, the value is obtained as a string and Functions tries to deserialize the string to actual plain-old Java object (POJO). When string, the input is treated as just a string. When binary, the message is received as binary data, and Functions tries to deserialize it to an actual parameter type byte[].
consumerGroup (Optional) Kafka consumer group used by the trigger.
avroSchema (Optional) Schema of a generic record when using the Avro protocol.
authenticationMode (Optional) The authentication mode when using Simple Authentication and Security Layer (SASL) authentication. The supported values are NotSet (default), Gssapi, Plain, ScramSha256, ScramSha512.
username (Optional) The username for SASL authentication. Not supported when AuthenticationMode is Gssapi. See Connections for more information.
password (Optional) The password for SASL authentication. Not supported when AuthenticationMode is Gssapi. See Connections for more information.
protocol (Optional) The security protocol used when communicating with brokers. The supported values are NotSet (default), plaintext, ssl, sasl_plaintext, sasl_ssl.
sslCaLocation (Optional) Path to CA certificate file for verifying the broker's certificate.
sslCertificateLocation (Optional) Path to the client's certificate.
sslKeyLocation (Optional) Path to client's private key (PEM) used for authentication.
sslKeyPassword (Optional) Password for client's certificate.
lagThreshold (Optional) Lag threshold for the trigger.
schemaRegistryUrl (Optional) URL for the Avro Schema Registry. See Connections for more information.
schemaRegistryUsername (Optional) Username for the Avro Schema Registry. See Connections for more information.
schemaRegistryPassword (Optional) Password for the Avro Schema Registry. See Connections for more information.

::: zone-end
::: zone pivot="programming-language-javascript,programming-language-powershell"

Configuration

The following table explains the binding configuration properties that you set in the function.json file.

function.json property Description
type (Required) Set to kafkaTrigger.
direction (Required) Set to in.
name (Required) The name of the variable that represents the brokered data in function code.
brokerList (Required) The list of Kafka brokers monitored by the trigger. See Connections for more information.
topic (Required) The topic monitored by the trigger.
cardinality (Optional) Indicates the cardinality of the trigger input. The supported values are ONE (default) and MANY. Use ONE when the input is a single message and MANY when the input is an array of messages. When you use MANY, you must also set a dataType.
dataType Defines how Functions handles the parameter value. By default, the value is obtained as a string and Functions tries to deserialize the string to actual plain-old Java object (POJO). When string, the input is treated as just a string. When binary, the message is received as binary data, and Functions tries to deserialize it to an actual byte array parameter type.
consumerGroup (Optional) Kafka consumer group used by the trigger.
avroSchema (Optional) Schema of a generic record when using the Avro protocol.
keyAvroSchema (Optional) Schema of a generic record of message key when using the Avro protocol.
keyDataType (Optional) Data type to receive the message key as from Kafka Topic. If keyAvroSchema is set, this value is generic record. Accepted values are Int, Long, String, and Binary.
authenticationMode (Optional) The authentication mode when using Simple Authentication and Security Layer (SASL) authentication. The supported values are NotSet (default), Gssapi, Plain, ScramSha256, ScramSha512.
username (Optional) The username for SASL authentication. Not supported when AuthenticationMode is Gssapi. See Connections for more information.
password (Optional) The password for SASL authentication. Not supported when AuthenticationMode is Gssapi. See Connections for more information.
protocol (Optional) The security protocol used when communicating with brokers. The supported values are NotSet (default), plaintext, ssl, sasl_plaintext, sasl_ssl.
sslCaLocation (Optional) Path to CA certificate file for verifying the broker's certificate.
sslCertificateLocation (Optional) Path to the client's certificate.
sslKeyLocation (Optional) Path to client's private key (PEM) used for authentication.
sslKeyPassword (Optional) Password for client's certificate.
sslCertificatePEM (Optional) Client certificate in PEM format as a string. See Connections for more information.
sslKeyPEM (Optional) Client private key in PEM format as a string. See Connections for more information.
sslCaPEM (Optional) CA certificate in PEM format as a string. See Connections for more information.
sslCertificateandKeyPEM (Optional) Client certificate and key in PEM format as a string. See Connections for more information.
lagThreshold (Optional) Lag threshold for the trigger.
schemaRegistryUrl (Optional) URL for the Avro Schema Registry. See Connections for more information.
schemaRegistryUsername (Optional) Username for the Avro Schema Registry. See Connections for more information.
schemaRegistryPassword (Optional) Password for the Avro Schema Registry. See Connections for more information.
oAuthBearerMethod (Optional) OAuth Bearer method. Accepted values are oidc and default.
oAuthBearerClientId (Optional) When oAuthBearerMethod is set to oidc, this specifies the OAuth bearer client ID. See Connections for more information.
oAuthBearerClientSecret (Optional) When oAuthBearerMethod is set to oidc, this specifies the OAuth bearer client secret. See Connections for more information.
oAuthBearerScope (Optional) Specifies the scope of the access request to the broker.
oAuthBearerTokenEndpointUrl (Optional) OAuth/OIDC issuer token endpoint HTTP(S) URI used to retrieve token when oidc method is used. See Connections for more information.

::: zone-end ::: zone pivot="programming-language-python"

Configuration

The following table explains the binding configuration properties that you set in the function.json file. Python uses snake_case naming conventions for configuration properties.

function.json property Description
type (Required) Set to kafkaTrigger.
direction (Required) Set to in.
name (Required) The name of the variable that represents the brokered data in function code.
broker_list (Required) The list of Kafka brokers monitored by the trigger. See Connections for more information.
topic (Required) The topic monitored by the trigger.
cardinality (Optional) Indicates the cardinality of the trigger input. The supported values are ONE (default) and MANY. Use ONE when the input is a single message and MANY when the input is an array of messages. When you use MANY, you must also set a data_type.
data_type Defines how Functions handles the parameter value. By default, the value is obtained as a string and Functions tries to deserialize the string to actual plain-old Java object (POJO). When string, the input is treated as just a string. When binary, the message is received as binary data, and Functions tries to deserialize it to an actual parameter type byte[].
consumerGroup (Optional) Kafka consumer group used by the trigger.
avroSchema (Optional) Schema of a generic record when using the Avro protocol.
authentication_mode (Optional) The authentication mode when using Simple Authentication and Security Layer (SASL) authentication. The supported values are NOTSET (default), Gssapi, Plain, ScramSha256, ScramSha512.
username (Optional) The username for SASL authentication. Not supported when authentication_mode is Gssapi. See Connections for more information.
password (Optional) The password for SASL authentication. Not supported when authentication_mode is Gssapi. See Connections for more information.
protocol (Optional) The security protocol used when communicating with brokers. The supported values are NOTSET (default), plaintext, ssl, sasl_plaintext, sasl_ssl.
sslCaLocation (Optional) Path to CA certificate file for verifying the broker's certificate.
sslCertificateLocation (Optional) Path to the client's certificate.
sslKeyLocation (Optional) Path to client's private key (PEM) used for authentication.
sslKeyPassword (Optional) Password for client's certificate.
lag_threshold (Optional) Lag threshold for the trigger.
schema_registry_url (Optional) URL for the Avro Schema Registry. See Connections for more information.
schema_registry_username (Optional) Username for the Avro Schema Registry. See Connections for more information.
schema_registry_password (Optional) Password for the Avro Schema Registry. See Connections for more information.
o_auth_bearer_method (Optional) OAuth Bearer method. Accepted values are oidc and default.
o_auth_bearer_client_id (Optional) When o_auth_bearer_method is set to oidc, this specifies the OAuth bearer client ID. See Connections for more information.
o_auth_bearer_client_secret (Optional) When o_auth_bearer_method is set to oidc, this specifies the OAuth bearer client secret. See Connections for more information.
o_auth_bearer_scope (Optional) Specifies the scope of the access request to the broker.
o_auth_bearer_token_endpoint_url (Optional) OAuth/OIDC issuer token endpoint HTTP(S) URI used to retrieve token when oidc method is used. See Connections for more information.

Note

Certificate PEM-related properties and Avro key-related properties aren't yet available in the Python library.

::: zone-end

Usage

::: zone pivot="programming-language-csharp"

The Kafka trigger currently supports Kafka events as strings and string arrays that are JSON payloads.

The Kafka trigger passes Kafka events to the function as KafkaEventData<string> objects or arrays. The trigger also supports strings and string arrays that are JSON payloads.


::: zone-end ::: zone pivot="programming-language-javascript,programming-language-python,programming-language-powershell"

The Kafka trigger passes Kafka messages to the function as strings. The trigger also supports string arrays that are JSON payloads.

::: zone-end

In a Premium plan, you must enable runtime scale monitoring for the Kafka output to scale out to multiple instances. To learn more, see Enable runtime scaling.

You can't use the Test/Run feature of the Code + Test page in the Azure portal to work with Kafka triggers. You must instead send test events directly to the topic being monitored by the trigger.

For a complete set of supported host.json settings for the Kafka trigger, see host.json settings.

[!INCLUDE functions-bindings-kafka-connections]

Next steps