Commit 4ea2c7a

Merge pull request #313560 from sethmanheim/regrp5
5: more dataflow articles
2 parents 61f498c + 501e72d commit 4ea2c7a

3 files changed

Lines changed: 995 additions & 0 deletions

Lines changed: 382 additions & 0 deletions
@@ -0,0 +1,382 @@
---
title: Enrich data with external datasets in data flow graphs
description: Learn how to augment incoming messages with data from an external state store by configuring datasets in Azure IoT Operations data flow graphs.
author: sethmanheim
ms.author: sethm
ms.service: azure-iot-operations
ms.subservice: azure-data-flows
ms.topic: how-to
ms.date: 03/19/2026
ai-usage: ai-assisted

---

# Enrich data with external datasets in data flow graphs

[!INCLUDE [kubernetes-management-preview-note](../includes/kubernetes-management-preview-note.md)]

Sometimes the incoming message doesn't contain everything you need. A temperature reading might arrive with a device ID, but the display name, location, and calibration offset live in a separate lookup table. Enrichment lets you pull that external data into your transform rules.

For an overview of data flow graphs, see [Data flow graphs overview](concept-dataflow-graphs.md).

## Prerequisites

- An Azure IoT Operations instance deployed on an Arc-enabled Kubernetes cluster. For more information, see [Deploy Azure IoT Operations](../deploy-iot-ops/howto-deploy-iot-operations.md).
- A registry endpoint named `default` that points to `mcr.microsoft.com`. This endpoint is created automatically during deployment.

## What is enrichment

Enrichment augments incoming messages with data from a *contextualization dataset*, a set of records held in an external state store. During processing, the runtime looks up records in the dataset and matches them against the incoming message using a condition you define. The fields of the matched record then become available to your rules.

Enrichment works with **map**, **filter**, and **branch** transforms. It isn't supported in window transforms.

## Configure a dataset

Define datasets in the `datasets` array at the top level of your rules configuration, alongside `map`, `filter`, or `branch`.

# [Operations experience](#tab/portal)

In the transform configuration, add a dataset and configure these settings:

| Setting | Description |
|---------|-------------|
| **State store key** | The key where dataset records are stored. Use `as` to assign an alias (for example, `device-metadata as device`). |
| **Match inputs** | Fields to compare: one from the source message (`$source.<field>`) and one from the dataset (`$context.<field>`). |
| **Match expression** | A boolean expression (for example, `$1 == $2`). |

# [Bicep](#tab/bicep)

The dataset configuration is part of the rules JSON:

```bicep
configuration: [
  {
    key: 'rules'
    value: '{"datasets":[{"key":"device-metadata as device","inputs":["$source.deviceId","$context.deviceId"],"expression":"$1 == $2"}],"map":[{"inputs":["$context(device).displayName"],"output":"deviceName"}]}'
  }
]
```

# [Kubernetes (preview)](#tab/kubernetes)

```json
{
  "datasets": [
    {
      "key": "device-metadata as device",
      "inputs": ["$source.deviceId", "$context.deviceId"],
      "expression": "$1 == $2"
    }
  ],
  "map": [
    {
      "inputs": ["$context(device).displayName"],
      "output": "deviceName"
    }
  ]
}
```

---

Each dataset entry has these properties:

| Property | Required | Description |
|----------|----------|-------------|
| `key` | Yes | The state store key where the dataset records are stored. Supports an optional alias with the `as` keyword. |
| `inputs` | Yes | List of field references used in the match expression. Each entry uses a `$source.` or `$context.` prefix. |
| `expression` | Yes | A boolean expression that determines which dataset record matches the incoming message. |

### Key and alias

The `key` value is the state store key that the runtime reads. To reference the dataset by a shorter name, assign an alias with the `as` keyword. For example, `datasets.parag10.rule42 as position` lets you reference fields as `$context(position).WorkingHours`.
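
As a rough illustration of how a `key` value splits into its two parts, the following Python sketch separates the state store key from the alias. The function name `parse_dataset_key` is hypothetical, and falling back to the key itself when no alias is given is an assumption of this sketch, not documented runtime behavior.

```python
def parse_dataset_key(key_spec: str) -> tuple[str, str]:
    """Split '<state store key> as <alias>' into (key, alias)."""
    key, separator, alias = key_spec.partition(" as ")
    if not separator:
        # Assumption: without an alias, the dataset is referenced by its key.
        return key.strip(), key.strip()
    return key.strip(), alias.strip()


key, alias = parse_dataset_key("datasets.parag10.rule42 as position")
print(key)    # datasets.parag10.rule42
print(alias)  # position
print(f"$context({alias}).WorkingHours")  # $context(position).WorkingHours
```

The alias is the name that appears inside the parentheses of every `$context(<alias>)` reference, so a short alias keeps rule inputs readable.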

### Dataset inputs

Each entry in the `inputs` array uses a prefix to indicate where the value comes from:

- `$source.<field>`: reads from the incoming message.
- `$context.<field>`: reads from the dataset record being evaluated.

Inputs can appear in any order, and you can mix `$source` and `$context` references freely. Wildcard inputs aren't supported in dataset definitions.
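
To make the two prefixes concrete, here's a minimal Python sketch that resolves dataset inputs against a sample message and a sample dataset record. The helper `resolve_input` and the sample data are hypothetical. Treating dots in the field path as steps into nested objects is an assumption of the sketch, and the code isn't the runtime's implementation.

```python
from typing import Any


def resolve_input(reference: str, source: dict[str, Any], record: dict[str, Any]) -> Any:
    """Resolve a '$source.<field>' or '$context.<field>' reference to a value."""
    prefix, _, path = reference.partition(".")
    if prefix == "$source":
        current: Any = source      # the incoming message
    elif prefix == "$context":
        current = record           # the dataset record being evaluated
    else:
        raise ValueError(f"unsupported prefix in {reference!r}")
    if not path or "*" in path:
        raise ValueError("dataset inputs must name a specific field")
    # Assumption: a dotted path walks into nested objects.
    for part in path.split("."):
        current = current[part]
    return current


message = {"deviceId": "dev-01", "temperature": 21.5}
record = {"deviceId": "dev-01", "displayName": "Boiler A"}
inputs = ["$source.deviceId", "$context.deviceId"]

# Position in the list decides the variable: the first entry is $1, the second is $2.
values = [resolve_input(ref, message, record) for ref in inputs]
print(values)  # ['dev-01', 'dev-01']
```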

### Match expression

The `expression` evaluates to a boolean. Positional variables refer to the entries in `inputs` by order: `$1` is the first entry, `$2` is the second, and so on. The runtime loads the dataset from the state store as NDJSON (one JSON object per line), iterates through the records, and returns the first record where the expression evaluates to `true`.

If no record matches, the enrichment fields aren't available and any rule that depends on them is skipped for that message.
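
The lookup described above can be sketched in a few lines of Python. This is an illustration of first-match-wins over NDJSON, not the runtime's code. The names `first_match` and `same_device` and the sample records are hypothetical, and the predicate stands in for the expression `$1 == $2` with inputs `$source.deviceId` and `$context.deviceId`.

```python
import json
from typing import Any, Callable, Optional

Record = dict[str, Any]

# Hypothetical dataset contents: one JSON object per line (NDJSON).
DEVICE_METADATA = "\n".join([
    '{"deviceId": "dev-01", "displayName": "Boiler A"}',
    '{"deviceId": "dev-02", "displayName": "Chiller B"}',
    '{"deviceId": "dev-02", "displayName": "Duplicate entry"}',
])


def first_match(ndjson_text: str, message: Record,
                predicate: Callable[[Record, Record], bool]) -> Optional[Record]:
    """Return the first record for which the predicate is true, or None."""
    for line in ndjson_text.splitlines():
        if not line.strip():
            continue  # ignore blank lines between records
        record = json.loads(line)
        if predicate(message, record):
            return record  # later matching records are never examined
    return None


def same_device(message: Record, record: Record) -> bool:
    # Stands in for "$1 == $2" on [$source.deviceId, $context.deviceId].
    return "deviceId" in message and message["deviceId"] == record.get("deviceId")


print(first_match(DEVICE_METADATA, {"deviceId": "dev-02"}, same_device))
# {'deviceId': 'dev-02', 'displayName': 'Chiller B'}
print(first_match(DEVICE_METADATA, {"deviceId": "dev-99"}, same_device))
# None
```

Because the first matching record wins, the order of lines in the dataset matters whenever more than one record can satisfy the expression.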

## Use enriched data in rules

Reference matched record fields in any rule's `inputs` array using `$context(<alias>).<fieldPath>`.

### Map example

# [Operations experience](#tab/portal)

Add map rules that reference enriched fields:

| Input | Output |
|-------|--------|
| `$context(position).WorkingHours` | `WorkingHours` |
| `rawValue` and `$context(product).multiplier` | `adjustedValue` (expression: `$1 * $2`) |

# [Bicep](#tab/bicep)

The enriched field references are part of the map rules JSON:

```bicep
'{"datasets":[...],"map":[{"inputs":["$context(position).WorkingHours"],"output":"WorkingHours"},{"inputs":["rawValue","$context(product).multiplier"],"output":"adjustedValue","expression":"$1 * $2"}]}'
```

# [Kubernetes (preview)](#tab/kubernetes)

```yaml
- inputs:
    - "$context(position).WorkingHours"
  output: WorkingHours

- inputs:
    - rawValue # $1
    - "$context(product).multiplier" # $2
  output: adjustedValue
  expression: "$1 * $2"
```

---
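
To show what the two map rules above compute, the following Python sketch resolves `$context(<alias>).<fieldPath>` references against already-matched records and evaluates the `$1 * $2` rule by hand. The `lookup` helper, the regular expression, and the sample values are hypothetical, and the sketch assumes that each alias has already been matched to a record.

```python
import re
from typing import Any

# Matches references of the form $context(<alias>).<fieldPath>
CONTEXT_REF = re.compile(r"^\$context\((?P<alias>[^)]+)\)\.(?P<path>.+)$")


def lookup(reference: str, message: dict[str, Any], matched: dict[str, dict]) -> Any:
    """Read a plain field from the message, or an enriched field by alias."""
    found = CONTEXT_REF.match(reference)
    if found is None:
        return message[reference]          # plain input such as rawValue
    value: Any = matched[found.group("alias")]
    for part in found.group("path").split("."):
        value = value[part]                # walk the field path
    return value


message = {"rawValue": 4.0}
matched = {  # hypothetical matched records, keyed by alias
    "position": {"WorkingHours": 8},
    "product": {"multiplier": 2.5},
}

working_hours = lookup("$context(position).WorkingHours", message, matched)
adjusted_value = (lookup("rawValue", message, matched)                           # $1
                  * lookup("$context(product).multiplier", message, matched))    # $2
print(working_hours)   # 8
print(adjusted_value)  # 10.0
```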

### Filter example

# [Operations experience](#tab/portal)

Add a filter rule with inputs `rawValue`, `$context(limits).multiplier`, and `$context(limits).baseLimit`, and expression `$1 * $2 > $3`.

# [Bicep](#tab/bicep)

```bicep
'{"datasets":[{"key":"device_limits as limits","inputs":["$source.deviceId","$context.deviceId"],"expression":"$1 == $2"}],"filter":[{"inputs":["rawValue","$context(limits).multiplier","$context(limits).baseLimit"],"expression":"$1 * $2 > $3"}]}'
```

# [Kubernetes (preview)](#tab/kubernetes)

```json
{
  "datasets": [
    {
      "key": "device_limits as limits",
      "inputs": ["$source.deviceId", "$context.deviceId"],
      "expression": "$1 == $2"
    }
  ],
  "filter": [
    {
      "inputs": ["rawValue", "$context(limits).multiplier", "$context(limits).baseLimit"],
      "expression": "$1 * $2 > $3"
    }
  ]
}
```

---

### Branch example

# [Operations experience](#tab/portal)

Configure a branch rule with inputs `quantity`, `$context(mult).factor`, and `$context(mult).threshold`, and expression `$1 * $2 > $3`.

# [Bicep](#tab/bicep)

```bicep
'{"datasets":[{"key":"multipliers as mult","inputs":["$source.productCode","$context.productCode"],"expression":"$1 == $2"}],"branch":{"inputs":["quantity","$context(mult).factor","$context(mult).threshold"],"expression":"$1 * $2 > $3"}}'
```

# [Kubernetes (preview)](#tab/kubernetes)

```json
{
  "datasets": [
    {
      "key": "multipliers as mult",
      "inputs": ["$source.productCode", "$context.productCode"],
      "expression": "$1 == $2"
    }
  ],
  "branch": {
    "inputs": ["quantity", "$context(mult).factor", "$context(mult).threshold"],
    "expression": "$1 * $2 > $3"
  }
}
```

---

## Wildcards with datasets

In map rules, use `$context(<alias>).*` to copy all top-level fields from the matched dataset record:

# [Operations experience](#tab/portal)

Add a map rule with input `$context(device).*` and output `*`.

# [Bicep](#tab/bicep)

```bicep
{
  inputs: [ '$context(device).*' ]
  output: '*'
}
```

# [Kubernetes (preview)](#tab/kubernetes)

```yaml
- inputs:
    - "$context(device).*"
  output: "*"
```

---

You can also target a nested object within the dataset record. For example, `$context(device).configuration.*` copies only the fields under `configuration`.

Wildcard enrichment inputs are supported only in map rules. Filter and branch rules don't support wildcard inputs.
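
The difference between the two wildcard forms can be sketched in Python. The helper `expand_wildcard` and the sample record are hypothetical. The sketch only shows which fields each form selects and doesn't model how the runtime writes them to the output.

```python
from typing import Any


def expand_wildcard(record: dict[str, Any], nested_path: str = "") -> dict[str, Any]:
    """Return the fields selected by '<path>.*' inside a matched record.

    An empty nested_path corresponds to '$context(<alias>).*'.
    """
    target = record
    for part in filter(None, nested_path.split(".")):
        target = target[part]   # descend to the nested object
    return dict(target)         # shallow copy of that object's fields


device = {  # hypothetical matched record for the alias "device"
    "displayName": "Boiler A",
    "location": "Plant 1",
    "configuration": {"unit": "C", "offset": 0.5},
}

# $context(device).* selects every top-level field of the record.
print(sorted(expand_wildcard(device)))
# ['configuration', 'displayName', 'location']

# $context(device).configuration.* selects only the nested fields.
print(expand_wildcard(device, "configuration"))
# {'unit': 'C', 'offset': 0.5}
```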

## Set up the state store

The runtime reads dataset records from the Azure IoT Operations distributed state store. Each dataset key maps to one or more records in NDJSON format (one JSON object per line). The runtime caches records and receives change notifications, so state store updates are reflected in processing.

For information on configuring the distributed state store, see [State store overview](../develop-edge-apps/overview-state-store.md).
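
If you generate dataset values programmatically, the NDJSON layout is straightforward to produce. The following Python sketch serializes a list of hypothetical device records into one JSON object per line. It only builds the value. How you write that value to the state store under the dataset key depends on the client or tool you use and isn't shown here.

```python
import json

# Hypothetical records for a dataset stored under the key "device-metadata".
records = [
    {"deviceId": "dev-01", "displayName": "Boiler A", "location": "Plant 1"},
    {"deviceId": "dev-02", "displayName": "Chiller B", "location": "Plant 2"},
]


def to_ndjson(rows: list[dict]) -> str:
    """Serialize each record as compact JSON on its own line."""
    # json.dumps escapes newlines inside strings, so each record stays on one line.
    return "\n".join(json.dumps(row, separators=(",", ":")) for row in rows)


payload = to_ndjson(records)
print(payload)
# {"deviceId":"dev-01","displayName":"Boiler A","location":"Plant 1"}
# {"deviceId":"dev-02","displayName":"Chiller B","location":"Plant 2"}
```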

## Deploy a data flow graph with enrichment

# [Operations experience](#tab/portal)

In the Operations experience, create a data flow graph with enrichment:

1. Add a **source** that reads from your MQTT topic.
1. Add a **map** transform. In the dataset configuration, add a dataset with the state store key and match condition.
1. In the map rules, reference enriched fields using `$context(<alias>).<field>` syntax.
1. Add a **destination** that sends to your output topic.

# [Bicep](#tab/bicep)

```bicep
resource dataflowGraph 'Microsoft.IoTOperations/instances/dataflowProfiles/dataflowGraphs@2025-10-01' = {
  name: 'enrich-example'
  parent: dataflowProfile
  properties: {
    profileRef: dataflowProfileName
    mode: 'Enabled'
    nodes: [
      {
        nodeType: 'Source'
        name: 'sensors'
        sourceSettings: {
          endpointRef: 'default'
          dataSources: [ 'telemetry/sensors' ]
        }
      }
      {
        nodeType: 'Graph'
        name: 'enrich-and-map'
        graphSettings: {
          registryEndpointRef: 'default'
          artifact: 'azureiotoperations/graph-dataflow-map:1.0.0'
          configuration: [
            {
              key: 'rules'
              value: '{"datasets":[{"key":"device-metadata as device","inputs":["$source.deviceId","$context.deviceId"],"expression":"$1 == $2"}],"map":[{"inputs":["*"],"output":"*"},{"inputs":["$context(device).displayName"],"output":"deviceName"},{"inputs":["$context(device).location"],"output":"location"}]}'
            }
          ]
        }
      }
      {
        nodeType: 'Destination'
        name: 'output'
        destinationSettings: {
          endpointRef: 'default'
          dataDestination: 'telemetry/enriched'
        }
      }
    ]
    nodeConnections: [
      { from: { name: 'sensors' }, to: { name: 'enrich-and-map' } }
      { from: { name: 'enrich-and-map' }, to: { name: 'output' } }
    ]
  }
}
```

# [Kubernetes (preview)](#tab/kubernetes)

```yaml
apiVersion: connectivity.iotoperations.azure.com/v1
kind: DataflowGraph
metadata:
  name: enrich-example
  namespace: azure-iot-operations
spec:
  profileRef: default
  nodes:
    - nodeType: Source
      name: sensors
      sourceSettings:
        endpointRef: default
        dataSources:
          - telemetry/sensors

    - nodeType: Graph
      name: enrich-and-map
      graphSettings:
        registryEndpointRef: default
        artifact: azureiotoperations/graph-dataflow-map:1.0.0
        configuration:
          - key: rules
            value: |
              {
                "datasets": [
                  {
                    "key": "device-metadata as device",
                    "inputs": ["$source.deviceId", "$context.deviceId"],
                    "expression": "$1 == $2"
                  }
                ],
                "map": [
                  { "inputs": ["*"], "output": "*" },
                  { "inputs": ["$context(device).displayName"], "output": "deviceName" },
                  { "inputs": ["$context(device).location"], "output": "location" }
                ]
              }

    - nodeType: Destination
      name: output
      destinationSettings:
        endpointRef: default
        dataDestination: telemetry/enriched

  nodeConnections:
    - from: { name: sensors }
      to: { name: enrich-and-map }
    - from: { name: enrich-and-map }
      to: { name: output }
```

---

## Limitations

- **Not supported in window transforms.** Enrichment datasets aren't available in window (accumulate) transforms.
- **First match wins.** The runtime uses the first record where the expression evaluates to `true`.
- **Missing matches skip enriched rules.** If no dataset record matches, rules that reference `$context(<alias>)` fields are skipped. The transformation doesn't fail.
- **State store errors propagate.** If the state store is unreachable, the transformation fails for that message.
- **No wildcard inputs in dataset definitions.** Each input must be a specific `$source.<field>` or `$context.<field>` reference.

## Next steps

- [Transform data with map](howto-dataflow-graphs-map.md)
- [Filter and route data](howto-dataflow-graphs-filter-route.md)
- [Aggregate data over time](howto-dataflow-graphs-window.md)
- [Expressions reference](concept-dataflow-graphs-expressions.md)
- [Configure a source](howto-configure-dataflow-source.md)
- [Configure a destination](howto-configure-dataflow-destination.md)
