(**

*)
#r "nuget: FSharp.Control.AsyncSeq, 4.14.0"
(**
[![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/fsprojects/FSharp.Control.AsyncSeq/gh-pages?filepath=AsyncSeqAdvanced.ipynb)

# Advanced AsyncSeq Operations

This document covers advanced `AsyncSeq<'T>` operations: partitioning a sequence into keyed
sub-streams with `groupBy`, deduplication with `distinctUntilChanged`, and accumulating
elements into time-or-count-bounded batches with `bufferByCountAndTime`.

*)
open System
open FSharp.Control
(**
## Group By

`AsyncSeq.groupBy` partitions a sequence into sub-sequences based on a key, analogous to
`Seq.groupBy`. Each key appears at most once in the output, paired with an `AsyncSeq` of the
elements that share that key.

> **Important:** the sub-sequences *must* be consumed in parallel. Sequential consumption
> deadlocks, because a sub-sequence cannot complete until all of the others are also being
> consumed.

```
--------------------------------------------------
| source | e1 | e2 | e3 | e4 |                   |
| key    | k1 | k2 | k1 | k2 |                   |
| result | k1 → [e1, e3]     | k2 → [e2, e4]     |
--------------------------------------------------
```
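
To make the shape concrete, here is a minimal sketch on a literal sequence (the data and the
`parityGroups` name are illustrative only, and the package loaded at the top of this script is
assumed): group by parity and materialize each group, consuming the groups in parallel with
`mapAsyncParallel` as required above:

```fsharp
// A minimal sketch: group a small literal sequence by parity.
// The sub-sequences are consumed in parallel via mapAsyncParallel, as required.
let parityGroups =
    AsyncSeq.ofSeq [1; 2; 3; 4; 5]
    |> AsyncSeq.groupBy (fun n -> n % 2)
    |> AsyncSeq.mapAsyncParallel (fun (key, group) -> async {
        let! items = AsyncSeq.toListAsync group
        return key, items })
    |> AsyncSeq.toListAsync
    |> Async.RunSynchronously
// one entry per key: (1, [1; 3; 5]) and (0, [2; 4])
```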

A common use case is processing a stream of domain events where events for the same entity must
be handled in order, but events for different entities are independent and can be handled in
parallel:

*)
type Event = {
    entityId : int64
    data : string
}

let stream : AsyncSeq<Event> = failwith "TODO: connect to message bus"

let action (e : Event) : Async<unit> = failwith "TODO: process event"

// Process each entity's events sequentially, but different entities in parallel.
let processEvents : Async<unit> =
    stream
    |> AsyncSeq.groupBy (fun e -> int e.entityId % 4)                // hash into 4 buckets
    |> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.iterAsync action)
    |> AsyncSeq.iter ignore
(**
We can combine this with batching for higher throughput. For example, when writing events to a
full-text search index, batching writes improves performance while the `groupBy` ensures ordering
within each entity:

*)
let batchStream : AsyncSeq<Event[]> = failwith "TODO: connect to batched source"

let batchAction (es : Event[]) : Async<unit> = failwith "TODO: bulk index"

let indexEvents : Async<unit> =
    batchStream
    |> AsyncSeq.concatSeq                                // flatten batches to individual events
    |> AsyncSeq.groupBy (fun e -> int e.entityId % 4)    // partition into 4 groups
    |> AsyncSeq.mapAsyncParallel (snd
        >> AsyncSeq.bufferByCountAndTime 500 1000        // re-batch per sub-sequence
        >> AsyncSeq.iterAsync batchAction)               // bulk index each batch
    |> AsyncSeq.iter ignore
(**
The above workflow: (1) reads events in batches, (2) flattens them, (3) partitions by entity into
mutually exclusive sub-sequences, (4) re-batches each sub-sequence by size/time, and (5) processes
the batches in parallel while preserving per-entity ordering.

## Distinct Until Changed

`AsyncSeq.distinctUntilChanged` passes through every element of the source sequence but drops
consecutive duplicates, so downstream consumers only see values that are genuinely new.

```
-----------------------------------
| source | a | a | b | b | b | a |
| result | a |   | b |   |   | a |
-----------------------------------
```
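
As a quick sanity check, the diagram above can be reproduced on a literal sequence (a minimal
sketch; the `deduped` name is illustrative, and the package loaded at the top of this script is
assumed):

```fsharp
// Consecutive duplicates are dropped; a value may still reappear later.
let deduped =
    AsyncSeq.ofSeq ['a'; 'a'; 'b'; 'b'; 'b'; 'a']
    |> AsyncSeq.distinctUntilChanged
    |> AsyncSeq.toListAsync
    |> Async.RunSynchronously
// ['a'; 'b'; 'a']
```

Note that `distinctUntilChanged` uses structural equality, so it works on records such as the
`Status` type below without extra plumbing.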

A natural use case is polling a resource on a fixed schedule and reacting only when its state
actually changes. Consider a background job whose progress is exposed via a `getStatus` call:

*)
type Status = {
    completed : int
    finished : bool
    result : string
}

let getStatus : Async<Status> = failwith "TODO: call job API"

/// Poll every second and emit each status reading.
let statuses : AsyncSeq<Status> = asyncSeq {
    while true do
        let! s = getStatus
        yield s
        do! Async.Sleep 1000
}

/// Only emit when the status has actually changed.
let distinctStatuses : AsyncSeq<Status> =
    statuses |> AsyncSeq.distinctUntilChanged
(**
We can now build a workflow that logs every status change and stops as soon as the job finishes:

*)
let jobResult : Async<string> =
    distinctStatuses
    |> AsyncSeq.pick (fun st ->
        printfn "status=%A" st
        if st.finished then Some st.result else None)
(**
## Buffer by Count and Time

`AsyncSeq.bufferByCountAndTime` accumulates incoming elements and emits a batch whenever
*either* the buffer reaches a given size *or* a timeout elapses, whichever comes first. If the
buffer is empty when the timeout fires, nothing is emitted.

```
-------------------------------------------------------
| source | a1 | a2 | a3         | a4 |      |
| result |    |    | [a1,a2,a3] |    | [a4] |
-------------------------------------------------------
         ← batch size 3 reached →   ← timeout fires →
```
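
With a fast, already-complete source the count bound dominates; the behavior can be observed on
a literal list (a minimal sketch with a deliberately long timeout so only the count bound and
end-of-stream fire; the `smallBatches` name is illustrative):

```fsharp
// With seven elements and a batch size of 3, the count bound fires twice and
// the remaining partial batch is emitted when the source completes.
let smallBatches =
    AsyncSeq.ofSeq [1 .. 7]
    |> AsyncSeq.bufferByCountAndTime 3 5000
    |> AsyncSeq.toListAsync
    |> Async.RunSynchronously
// [[|1; 2; 3|]; [|4; 5; 6|]; [|7|]]
```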

This is useful for services that write events to a bulk API (e.g. a search index). Fixed-size
batching with `AsyncSeq.bufferByCount` can stall when the source slows down and a partial buffer
never fills. `bufferByCountAndTime` avoids that by guaranteeing forward progress:

*)
let events : AsyncSeq<Event> = failwith "TODO: connect to event source"

let bufferSize = 100
let bufferTimeout = 1000 // milliseconds

let bufferedEvents : AsyncSeq<Event[]> =
    events |> AsyncSeq.bufferByCountAndTime bufferSize bufferTimeout