|
| 1 | +(** |
| 2 | +
|
| 3 | +*) |
| 4 | +#r "nuget: FSharp.Control.AsyncSeq,4.11.0" |
| 5 | +(** |
| 6 | +[![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/fsprojects/FSharp.Control.AsyncSeq/gh-pages?filepath=AsyncSeqAdvanced.ipynb)
| 7 | +
|
| 8 | +# Advanced AsyncSeq Operations |
| 9 | +
|
| 10 | +This document covers advanced `AsyncSeq<'T>` operations: partitioning a sequence into keyed |
| 11 | +sub-streams with `groupBy`, deduplication with `distinctUntilChanged`, and accumulating |
| 12 | +elements into time-or-count-bounded batches with `bufferByCountAndTime`. |
| 13 | +
|
| 14 | +*) |
| 15 | +open System |
| 16 | +open FSharp.Control |
| 17 | +(** |
| 18 | +## Group By |
| 19 | +
|
| 20 | +`AsyncSeq.groupBy` partitions a sequence into sub-sequences based on a key, analogous to |
| 21 | +`Seq.groupBy`. Each key appears at most once in the output, paired with an `AsyncSeq` of the |
| 22 | +elements that share that key. |
| 23 | +
|
| 24 | +> **Important:** the sub-sequences **must** be consumed in parallel. Consuming them one at a
time deadlocks: the source cannot advance past elements destined for a sub-sequence that nobody
is reading, so no single sub-sequence can be drained to completion on its own.
| 26 | +> |
| 27 | +
|
| 28 | +``` |
| 29 | +-------------------------------------------------- |
| 30 | +| source | e1 | e2 | e3 | e4 | | |
| 31 | +| key | k1 | k2 | k1 | k2 | | |
| 32 | +| result | k1 → [e1, e3] | k2 → [e2, e4] | |
| 33 | +-------------------------------------------------- |
| 34 | +``` |
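These semantics can be seen on a small in-memory sequence (the integer input and parity key below are invented for this sketch): each sub-sequence is drained in parallel with `mapAsyncParallel`, which is exactly the consumption pattern the warning above requires.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

// Toy input: integers keyed by parity (0 = even, 1 = odd).
let grouped =
    AsyncSeq.ofSeq [ 1; 2; 3; 4; 5; 6 ]
    |> AsyncSeq.groupBy (fun n -> n % 2)
    // Drain every sub-sequence in parallel; draining them one by one would deadlock.
    |> AsyncSeq.mapAsyncParallel (fun (key, xs) -> async {
        let! items = AsyncSeq.toListAsync xs
        return key, items })
    |> AsyncSeq.toListAsync
    |> Async.RunSynchronously

// The (key, items) pairs may complete in any order; sort by key for stable display.
printfn "%A" (List.sortBy fst grouped)
// [(0, [2; 4; 6]); (1, [1; 3; 5])]
```

Within each sub-sequence the source order is preserved, which is what makes the per-entity ordering guarantee in the next example possible.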
| 35 | +
|
| 36 | +A common use case is processing a stream of domain events where events for the same entity must |
| 37 | +be handled in order, but events for different entities are independent and can be handled in |
| 38 | +parallel: |
| 39 | +
|
| 40 | +*) |
| 41 | +type Event = { |
| 42 | + entityId : int64 |
| 43 | + data : string |
| 44 | +} |
| 45 | + |
| 46 | +let stream : AsyncSeq<Event> = failwith "TODO: connect to message bus" |
| 47 | + |
| 48 | +let action (e: Event) : Async<unit> = failwith "TODO: process event" |
| 49 | + |
| 50 | +// Process each entity's events sequentially, but different entities in parallel. |
| 51 | +stream |
| 52 | +|> AsyncSeq.groupBy (fun e -> int (e.entityId % 4L)) // hash into 4 buckets
| 53 | +|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.iterAsync action) |
| 54 | +|> AsyncSeq.iter ignore |
| 55 | +(** |
| 56 | +We can combine this with batching for higher throughput. For example, when writing events to a |
| 57 | +full-text search index, batching writes improves performance while the `groupBy` ensures ordering |
| 58 | +within each entity: |
| 59 | +
|
| 60 | +*) |
| 61 | +let batchStream : AsyncSeq<Event[]> = failwith "TODO: connect to batched source" |
| 62 | + |
| 63 | +let batchAction (es: Event[]) : Async<unit> = failwith "TODO: bulk index" |
| 64 | + |
| 65 | +batchStream |
| 66 | +|> AsyncSeq.concatSeq // flatten batches to individual events |
| 67 | +|> AsyncSeq.groupBy (fun e -> int (e.entityId % 4L)) // partition into 4 groups
| 68 | +|> AsyncSeq.mapAsyncParallel (snd |
| 69 | + >> AsyncSeq.bufferByCountAndTime 500 1000 // re-batch per sub-sequence |
| 70 | + >> AsyncSeq.iterAsync batchAction) // bulk index each batch |
| 71 | +|> AsyncSeq.iter ignore |
| 72 | +(** |
| 73 | +The above workflow: (1) reads events in batches, (2) flattens them, (3) partitions by entity into |
| 74 | +mutually-exclusive sub-sequences, (4) re-batches each sub-sequence by size/time, and (5) processes |
| 75 | +the batches in parallel while preserving per-entity ordering. |
| 76 | +
|
| 77 | +## Distinct Until Changed |
| 78 | +
|
| 79 | +`AsyncSeq.distinctUntilChanged` passes through every element of the source sequence but drops |
| 80 | +consecutive duplicates, so downstream consumers only see values that are genuinely new. |
| 81 | +
|
| 82 | +``` |
| 83 | +----------------------------------- |
| 84 | +| source | a | a | b | b | b | a | |
| 85 | +| result | a | | b | | | a | |
| 86 | +----------------------------------- |
| 87 | +``` |
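The diagram can be reproduced directly (the character sequence below mirrors it and is invented for this sketch); note that only *consecutive* duplicates are dropped, so a value may reappear later:

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

// The same shape as the diagram above: a a b b b a.
let changes =
    AsyncSeq.ofSeq [ 'a'; 'a'; 'b'; 'b'; 'b'; 'a' ]
    |> AsyncSeq.distinctUntilChanged   // drop consecutive duplicates only
    |> AsyncSeq.toListAsync
    |> Async.RunSynchronously

printfn "%A" changes
// ['a'; 'b'; 'a']: the final 'a' survives because it is not a consecutive duplicate
```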
| 88 | +
|
| 89 | +A natural use case is polling a resource on a fixed schedule and reacting only when its state |
| 90 | +actually changes. Consider a background job whose progress is exposed via a `getStatus` call: |
| 91 | +
|
| 92 | +*) |
| 93 | +type Status = { |
| 94 | + completed : int |
| 95 | + finished : bool |
| 96 | + result : string |
| 97 | +} |
| 98 | + |
| 99 | +let getStatus : Async<Status> = failwith "TODO: call job API" |
| 100 | + |
| 101 | +/// Poll every second and emit each status reading. |
| 102 | +let statuses : AsyncSeq<Status> = asyncSeq { |
| 103 | + while true do |
| 104 | + let! s = getStatus |
| 105 | + yield s |
| 106 | + do! Async.Sleep 1000 |
| 107 | +} |
| 108 | + |
| 109 | +/// Only emit when the status has actually changed. |
| 110 | +let distinctStatuses : AsyncSeq<Status> = |
| 111 | + statuses |> AsyncSeq.distinctUntilChanged |
| 112 | +(** |
| 113 | +We can now build a workflow that logs every status change and stops as soon as the job finishes: |
| 114 | +
|
| 115 | +*) |
| 116 | +let jobResult : Async<string> = |
| 117 | + distinctStatuses |
| 118 | + |> AsyncSeq.pick (fun st -> |
| 119 | + printfn "status=%A" st |
| 120 | + if st.finished then Some st.result else None) |
| 121 | +(** |
| 122 | +## Buffer by Count and Time |
| 123 | +
|
| 124 | +`AsyncSeq.bufferByCountAndTime` accumulates incoming elements and emits a batch whenever |
| 125 | +**either** the buffer reaches a given size **or** a timeout elapses — whichever comes first. If the |
| 126 | +buffer is empty when the timeout fires, nothing is emitted. |
| 127 | +
|
| 128 | +``` |
| 129 | +------------------------------------------------------- |
| 130 | +| source | a1 | a2 | a3 | a4 | | |
| 131 | +| result | | | [a1,a2,a3] | | [a4] | |
| 132 | +------------------------------------------------------- |
| 133 | + ← batch size 3 reached → ← timeout fires → |
| 134 | +``` |
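A runnable sketch of this behavior (the source, its delays, and the batch parameters below are invented for illustration; exact batch boundaries depend on scheduler timing):

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

// Invented source: three items arrive immediately, a fourth after a pause.
let source = asyncSeq {
    yield 1
    yield 2
    yield 3
    do! Async.Sleep 300
    yield 4
}

let batches =
    source
    |> AsyncSeq.bufferByCountAndTime 3 100  // emit at 3 items or after 100 ms
    |> AsyncSeq.map Array.toList
    |> AsyncSeq.toListAsync
    |> Async.RunSynchronously

printfn "%A" batches
// typically [[1; 2; 3]; [4]]: the first batch closes on count, the last when the stream ends
```

Element order across batches always matches the source order, and no element is dropped, so flattening the batches recovers the original sequence.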
| 135 | +
|
| 136 | +This is useful for services that write events to a bulk API (e.g. a search index). Fixed-size |
| 137 | +batching with `AsyncSeq.bufferByCount` can stall when the source slows down and a partial buffer |
| 138 | +never fills. `bufferByCountAndTime` avoids that by guaranteeing forward progress: |
| 139 | +
|
| 140 | +*) |
| 141 | +let events : AsyncSeq<Event> = failwith "TODO: connect to event source" |
| 142 | + |
| 143 | +let bufferSize = 100 |
| 144 | +let bufferTimeout = 1000 // milliseconds |
| 145 | + |
| 146 | +let bufferedEvents : AsyncSeq<Event[]> = |
| 147 | + events |> AsyncSeq.bufferByCountAndTime bufferSize bufferTimeout |
| 148 | + |