- Promote H3 headings to H2 for better right-side navigation
- Rename headings to be more direct (e.g., 'Subscribe to multiple topics' instead of 'Configure data sources (MQTT or Kafka topics)')
- Update UI references from 'Message broker' to 'Data flow endpoint' to match current portal
- Add 'Use the source topic in the destination path' section clarifying that ${inputTopic} works for both data flows and data flow graphs
- Update internal anchor links
articles/iot-operations/connect-to-cloud/howto-configure-dataflow-source.md (26 additions, 20 deletions)
@@ -6,7 +6,7 @@ ms.author: sethm
 ms.service: azure-iot-operations
 ms.subservice: azure-data-flows
 ms.topic: how-to
-ms.date: 03/25/2026
+ms.date: 03/26/2026
 ai-usage: ai-assisted

 #CustomerIntent: As an operator, I want to configure the source for a data flow or data flow graph.
@@ -19,22 +19,20 @@ ai-usage: ai-assisted
 The source is where data enters a data flow or data flow graph. You configure the source by specifying an endpoint reference and a list of data sources (topics) for that endpoint.

 > [!TIP]
-> A single data flow source can subscribe to **multiple MQTT or Kafka topics** at once. You don't need to create separate data flows for each topic. Use the `dataSources` field (or **Topic(s)** > **Add row** in the operations experience) to add multiple topic filters, including wildcards. For more information, see [Configure MQTT or Kafka topics](#configure-data-sources-mqtt-or-kafka-topics).
+> A single data flow source can subscribe to **multiple MQTT or Kafka topics** at once. You don't need to create separate data flows for each topic. Use the `dataSources` field (or **Topic(s)** > **Add row** in the operations experience) to add multiple topic filters, including wildcards. For more information, see [Subscribe to multiple topics](#subscribe-to-multiple-topics).

 This page applies to both [data flows](overview-dataflow.md) and [data flow graphs](concept-dataflow-graphs.md). For data flows, the source is an operation in the `Dataflow` resource. For data flow graphs, the source is a `Source` node in the `DataflowGraph` resource.

 > [!IMPORTANT]
 > Data flows support MQTT and Kafka source endpoints. Data flow graphs support MQTT, Kafka, and OpenTelemetry source endpoints. Each data flow must have the Azure IoT Operations local MQTT broker default endpoint as either the source or destination. For more information, see [Data flows must use local MQTT broker endpoint](./howto-configure-dataflow-endpoint.md#data-flows-must-use-local-mqtt-broker-endpoint).

-## Choose a source endpoint
-
 You can use one of the following options as the source.

-### Option 1: Use the default message broker endpoint
+## Use the default endpoint

 # [Operations experience](#tab/portal)

-1. Under **Source details**, select **Message broker**.
+1. Under **Source details**, select **Data flow endpoint**.

 :::image type="content" source="media/howto-create-dataflow/dataflow-source-mqtt.png" alt-text="Screenshot of the operations experience interface showing the selection of the message broker as the source endpoint for a data flow.":::

@@ -43,7 +41,7 @@ You can use one of the following options as the source.
 | Data flow endpoint | Select *default* to use the default MQTT message broker endpoint. |
-| Topic | The topic filter to subscribe to for incoming messages. Use **Topic(s)** > **Add row** to add multiple topics. For more information on topics, see [Configure MQTT or Kafka topics](#configure-data-sources-mqtt-or-kafka-topics). |
+| Topic | The topic filter to subscribe to for incoming messages. Use **Topic(s)** > **Add row** to add multiple topics. For more information on topics, see [Subscribe to multiple topics](#subscribe-to-multiple-topics). |
 | Message schema | The schema to use to deserialize the incoming messages. See [Specify schema to deserialize data](#specify-source-schema). |

 1. Select **Apply**.
@@ -87,9 +85,9 @@ sourceSettings:

 ---

-Because `dataSources` accepts MQTT or Kafka topics without modifying the endpoint configuration, you can reuse the endpoint for multiple data flows even if the topics are different. For more information, see [Configure data sources](#configure-data-sources-mqtt-or-kafka-topics).
+Because `dataSources` accepts MQTT or Kafka topics without modifying the endpoint configuration, you can reuse the endpoint for multiple data flows even if the topics are different. For more information, see [Subscribe to multiple topics](#subscribe-to-multiple-topics).

-### Option 2: Use an asset as a source
+## Use an asset as a source

 # [Operations experience](#tab/portal)

@@ -123,13 +121,13 @@ When you use an asset as the source, the asset definition provides the schema fo

 After you configure the source, the data from the asset reaches the data flow through the local MQTT broker. So, when you use an asset as the source, the data flow uses the local MQTT broker default endpoint as the source.

-### Option 3: Use a custom MQTT or Kafka endpoint
+## Use a custom MQTT or Kafka endpoint

 If you created a custom MQTT or Kafka data flow endpoint (for example, to use with Event Grid or Event Hubs), you can use it as the source for the data flow. Remember that storage type endpoints, like Data Lake or Fabric OneLake, can't be used as a source.

 # [Operations experience](#tab/portal)

-1. Under **Source details**, select **Message broker**.
+1. Under **Source details**, select **Data flow endpoint**.

 :::image type="content" source="media/howto-create-dataflow/dataflow-source-custom.png" alt-text="Screenshot using operations experience to select a custom message broker as the source endpoint.":::

@@ -138,7 +136,7 @@ If you created a custom MQTT or Kafka data flow endpoint (for example, to use wi
 | Data flow endpoint | Use the **Reselect** button to select a custom MQTT or Kafka data flow endpoint. For more information, see [Configure MQTT data flow endpoints](howto-configure-mqtt-endpoint.md) or [Configure Azure Event Hubs and Kafka data flow endpoints](howto-configure-kafka-endpoint.md).|
-| Topic | The topic filter to subscribe to for incoming messages. Use **Topic(s)** > **Add row** to add multiple topics. For more information on topics, see [Configure MQTT or Kafka topics](#configure-data-sources-mqtt-or-kafka-topics). |
+| Topic | The topic filter to subscribe to for incoming messages. Use **Topic(s)** > **Add row** to add multiple topics. For more information on topics, see [Subscribe to multiple topics](#subscribe-to-multiple-topics). |
 | Message schema | The schema to use to deserialize the incoming messages. See [Specify schema to deserialize data](#specify-source-schema). |

 1. Select **Apply**.
@@ -188,17 +186,17 @@ sourceSettings:

 ---

-## Configure data sources (MQTT or Kafka topics)
+## Subscribe to multiple topics

 You can specify multiple MQTT or Kafka topics in a source without needing to modify the data flow endpoint configuration. This flexibility means you can reuse the same endpoint across multiple data flows, even if the topics vary. For more information, see [Reuse data flow endpoints](./howto-configure-dataflow-endpoint.md#reuse-endpoints).

-### MQTT topics
+## MQTT topic wildcards

 When the source is an MQTT (Event Grid included) endpoint, use the MQTT topic filter to subscribe to incoming messages. The topic filter can include wildcards to subscribe to multiple topics. For example, `thermostats/+/sensor/temperature/#` subscribes to all temperature sensor messages from thermostats. To configure the MQTT topic filters:

 # [Operations experience](#tab/portal)

-In the operations experience data flow **Source details**, select **Message broker**, then use the **Topic(s)** field to specify the MQTT topic filters to subscribe to for incoming messages. To add multiple MQTT topics, select **Add row** and enter a new topic.
+In the operations experience data flow **Source details**, select **Data flow endpoint**, then use the **Topic(s)** field to specify the MQTT topic filters to subscribe to for incoming messages. To add multiple MQTT topics, select **Add row** and enter a new topic.

 :::image type="content" source="media/howto-configure-dataflow-source/dataflow-source-multiple-topics.png" alt-text="Screenshot of the operations experience interface showing multiple MQTT topic filters configured in the source details for a data flow.":::

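The wildcard semantics this hunk relies on (for example, `thermostats/+/sensor/temperature/#`) are the standard MQTT rules: `+` matches exactly one topic level and `#` matches all remaining levels. A minimal sketch of that matching logic, for illustration only — `mqtt_topic_matches` is a hypothetical helper, not Azure IoT Operations code:

```python
def mqtt_topic_matches(topic_filter: str, topic: str) -> bool:
    """Illustrative MQTT matching: '+' matches one level, '#' the rest."""
    filter_parts = topic_filter.split("/")
    topic_parts = topic.split("/")
    for i, part in enumerate(filter_parts):
        if part == "#":
            # '#' is only valid as the last level and matches everything below
            return i == len(filter_parts) - 1
        if i >= len(topic_parts):
            return False
        if part != "+" and part != topic_parts[i]:
            return False
    return len(filter_parts) == len(topic_parts)

# Example from the article: all temperature sensor messages from thermostats
print(mqtt_topic_matches("thermostats/+/sensor/temperature/#",
                         "thermostats/device1/sensor/temperature/indoor"))  # True
```

Note that per the MQTT specification, `#` also matches the parent level itself, so `sport/#` matches `sport` as well as `sport/tennis`.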
@@ -284,13 +282,13 @@ Here, the wildcard `+` selects all devices under the `thermostats` and `humidifi

 ---

-### Shared subscriptions
+## Shared subscriptions

 To use shared subscriptions with message broker sources, specify the shared subscription topic in the form of `$shared/<GROUP_NAME>/<TOPIC_FILTER>`.

 # [Operations experience](#tab/portal)

-In operations experience data flow **Source details**, select **Message broker** and use the **Topic** field to specify the shared subscription group and topic.
+In operations experience data flow **Source details**, select **Data flow endpoint** and use the **Topic** field to specify the shared subscription group and topic.

 # [Azure CLI](#tab/cli)
 ```json
@@ -331,7 +329,7 @@ You can explicitly create a topic named `$shared/mygroup/topic` in your configur
 > [!IMPORTANT]
 > Shared subscriptions are important for data flows when the instance count is greater than one and you're using Event Grid MQTT broker as a source, since it [doesn't support shared subscriptions](../../event-grid/mqtt-support.md#mqtt-v5-current-limitations). To avoid missing messages, set the data flow profile instance count to one when using Event Grid MQTT broker as the source. That is when the data flow is the subscriber and receiving messages from the cloud.

-### Kafka topics
+## Kafka topics

 When the source is a Kafka (Event Hubs included) endpoint, specify the individual Kafka topics to subscribe to for incoming messages. Wildcards aren't supported, so you must specify each topic statically.

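The `$shared/<GROUP_NAME>/<TOPIC_FILTER>` form used above splits into a group name and an ordinary topic filter, following standard MQTT v5 shared-subscription syntax. A minimal sketch of that split — `parse_shared_subscription` is a hypothetical helper for illustration, not part of any Azure IoT Operations API:

```python
def parse_shared_subscription(topic: str):
    """Split a shared subscription into (group, topic filter).

    Returns (None, topic) when the topic isn't a $shared subscription.
    """
    if topic.startswith("$shared/"):
        # '$shared/<GROUP_NAME>/<TOPIC_FILTER>'; the filter itself may contain '/'
        _, group, topic_filter = topic.split("/", 2)
        return group, topic_filter
    return None, topic

print(parse_shared_subscription("$shared/mygroup/topic"))  # ('mygroup', 'topic')
```

Because the split uses `maxsplit=2`, multi-level filters such as `$shared/mygroup/thermostats/+/telemetry` keep their full filter portion intact.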
@@ -342,7 +340,7 @@ To configure the Kafka topics:

 # [Operations experience](#tab/portal)

-In the operations experience data flow **Source details**, select **Message broker**, then use the **Topic** field to specify the Kafka topic filter to subscribe to for incoming messages.
+In the operations experience data flow **Source details**, select **Data flow endpoint**, then use the **Topic** field to specify the Kafka topic filter to subscribe to for incoming messages.

 > [!NOTE]
 > You can specify only one topic filter in the operations experience. To use multiple topic filters, use Bicep or Kubernetes.
@@ -386,6 +384,14 @@ sourceSettings:

 ---

+## Use the source topic in the destination path
+
+When you subscribe to multiple topics with wildcards, you can use the source topic as a variable in the destination path. This feature works with both data flows and data flow graphs.
+
+Use `${inputTopic}` for the full source topic, or `${inputTopic.N}` to extract a specific segment (1-indexed). For example, if you subscribe to `factory/+/telemetry/#`, a message arriving on `factory/line1/telemetry/temp` can be routed to a destination topic like `processed/${inputTopic.2}/data`, which resolves to `processed/line1/data`.
+
+For full details and examples, see [Dynamic destination topics](howto-configure-dataflow-destination.md#dynamic-destination-topics).
+
 ## Specify source schema

 When you use MQTT or Kafka as the source, you can specify a [schema](concept-schema-registry.md) to display the list of data points in the operations experience web UI. Using a schema to deserialize and validate incoming messages [isn't currently supported](../troubleshoot/known-issues.md#data-flows-issues).
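The `${inputTopic}` substitution added in this change can be sketched as simple template resolution over the topic segments. A minimal illustration, assuming the 1-indexed segment semantics described above — `resolve_destination` is a hypothetical name, not the product's implementation:

```python
import re

def resolve_destination(template: str, input_topic: str) -> str:
    """Substitute ${inputTopic} and ${inputTopic.N} (1-indexed) in a destination path."""
    segments = input_topic.split("/")

    def repl(match: re.Match) -> str:
        index = match.group(1)
        if index is None:
            return input_topic           # ${inputTopic} -> the full source topic
        return segments[int(index) - 1]  # ${inputTopic.N} -> the Nth segment

    return re.sub(r"\$\{inputTopic(?:\.(\d+))?\}", repl, template)

# Example from the change: route by the second topic segment
print(resolve_destination("processed/${inputTopic.2}/data",
                          "factory/line1/telemetry/temp"))  # processed/line1/data
```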
@@ -399,7 +405,7 @@ To configure the schema used to deserialize the incoming messages from a source:

 # [Operations experience](#tab/portal)

-In the Operations experience data flow **Source details**, select **Message broker** and use the **Message schema** field to specify the schema. Select **Upload** to upload a schema file. For more information, see [Understand message schemas](concept-schema-registry.md).
+In the Operations experience data flow **Source details**, select **Data flow endpoint** and use the **Message schema** field to specify the schema. Select **Upload** to upload a schema file. For more information, see [Understand message schemas](concept-schema-registry.md).