Commit dde3b6a (deploy: e24654a)

42 files changed, 29901 additions & 0 deletions
.nojekyll

Whitespace-only changes.

AsyncSeqAdvanced.fsx

Lines changed: 148 additions & 0 deletions
@@ -0,0 +1,148 @@
(**
*)
#r "nuget: FSharp.Control.AsyncSeq,4.11.0"
(**
[![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/fsprojects/FSharp.Control.AsyncSeq/gh-pages?filepath=AsyncSeqAdvanced.ipynb)

# Advanced AsyncSeq Operations

This document covers advanced `AsyncSeq<'T>` operations: partitioning a sequence into keyed
sub-streams with `groupBy`, deduplication with `distinctUntilChanged`, and accumulating
elements into time-or-count-bounded batches with `bufferByCountAndTime`.
*)
open System
open FSharp.Control
(**
## Group By

`AsyncSeq.groupBy` partitions a sequence into sub-sequences based on a key, analogous to
`Seq.groupBy`. Each key appears at most once in the output, paired with an `AsyncSeq` of the
elements that share that key.

> **Important:** the sub-sequences **must** be consumed in parallel. Consuming them sequentially
> deadlocks, because no sub-sequence can complete until all of the others are also being consumed.

```
----------------------------------------------
| source | e1 | e2 | e3 | e4 |               |
| key    | k1 | k2 | k1 | k2 |               |
| result | k1 → [e1, e3] | k2 → [e2, e4]     |
----------------------------------------------
```

A common use case is processing a stream of domain events where events for the same entity must
be handled in order, but events for different entities are independent and can be handled in
parallel:
*)
type Event = {
    entityId : int64
    data : string
}

let stream : AsyncSeq<Event> = failwith "TODO: connect to message bus"

let action (e: Event) : Async<unit> = failwith "TODO: process event"

// Process each entity's events sequentially, but different entities in parallel.
stream
|> AsyncSeq.groupBy (fun e -> int e.entityId % 4)               // hash into 4 buckets
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.iterAsync action)
|> AsyncSeq.iter ignore
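To make the parallel-consumption requirement concrete, here is a minimal toy sketch (the input values and the parity key function are invented for illustration, not part of the pipeline above) that groups the numbers 1 through 8 by parity and folds each sub-sequence concurrently:

```fsharp
#r "nuget: FSharp.Control.AsyncSeq,4.11.0"
open FSharp.Control

// Toy example: two sub-sequences (even/odd keys), each folded to a sum.
// mapAsyncParallel consumes both sub-sequences concurrently, which groupBy requires.
let parityTotals : (int * int) list =
    AsyncSeq.ofSeq [ 1 .. 8 ]
    |> AsyncSeq.groupBy (fun x -> x % 2)
    |> AsyncSeq.mapAsyncParallel (fun (key, xs) -> async {
        let! total = xs |> AsyncSeq.fold (+) 0
        return key, total })
    |> AsyncSeq.toListAsync
    |> Async.RunSynchronously
    |> List.sortBy fst

printfn "%A" parityTotals  // [(0, 20); (1, 16)]
```

Replacing `mapAsyncParallel` with `mapAsync` here would hang: the fold over the first group could never finish while the second group's elements sit unconsumed.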
(**
We can combine this with batching for higher throughput. For example, when writing events to a
full-text search index, batching writes improves performance while `groupBy` preserves ordering
within each entity:
*)
let batchStream : AsyncSeq<Event[]> = failwith "TODO: connect to batched source"

let batchAction (es: Event[]) : Async<unit> = failwith "TODO: bulk index"

batchStream
|> AsyncSeq.concatSeq                                  // flatten batches to individual events
|> AsyncSeq.groupBy (fun e -> int e.entityId % 4)      // partition into 4 groups
|> AsyncSeq.mapAsyncParallel
    (snd
     >> AsyncSeq.bufferByCountAndTime 500 1000         // re-batch per sub-sequence (500 events / 1000 ms)
     >> AsyncSeq.iterAsync batchAction)                // bulk index each batch
|> AsyncSeq.iter ignore
(**
The above workflow: (1) reads events in batches, (2) flattens them, (3) partitions by entity into
mutually exclusive sub-sequences, (4) re-batches each sub-sequence by size and time, and (5) processes
the batches in parallel while preserving per-entity ordering.

## Distinct Until Changed

`AsyncSeq.distinctUntilChanged` passes through every element of the source sequence but drops
consecutive duplicates, so downstream consumers only see values that are genuinely new.

```
-----------------------------------
| source | a | a | b | b | b | a |
| result | a |   | b |   |   | a |
-----------------------------------
```

A natural use case is polling a resource on a fixed schedule and reacting only when its state
actually changes. Consider a background job whose progress is exposed via a `getStatus` call:
*)
type Status = {
    completed : int
    finished : bool
    result : string
}

let getStatus : Async<Status> = failwith "TODO: call job API"

/// Poll every second and emit each status reading.
let statuses : AsyncSeq<Status> = asyncSeq {
    while true do
        let! s = getStatus
        yield s
        do! Async.Sleep 1000
}

/// Only emit when the status has actually changed.
let distinctStatuses : AsyncSeq<Status> =
    statuses |> AsyncSeq.distinctUntilChanged
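The marble diagram above can be checked with a throwaway literal sequence (invented for illustration, independent of the polling workflow):

```fsharp
#r "nuget: FSharp.Control.AsyncSeq,4.11.0"
open FSharp.Control

// Consecutive duplicates are dropped, but a value may reappear
// after a different value has intervened (as with the final 'a').
let deduped : char list =
    AsyncSeq.ofSeq [ 'a'; 'a'; 'b'; 'b'; 'b'; 'a' ]
    |> AsyncSeq.distinctUntilChanged
    |> AsyncSeq.toListAsync
    |> Async.RunSynchronously

printfn "%A" deduped  // ['a'; 'b'; 'a']
```

Note that unlike `Seq.distinct`, this only needs to remember the single most recent element, so it works on unbounded streams such as `statuses`.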
(**
We can now build a workflow that logs every status change and stops as soon as the job finishes:
*)
let jobResult : Async<string> =
    distinctStatuses
    |> AsyncSeq.pick (fun st ->
        printfn "status=%A" st
        if st.finished then Some st.result else None)
(**
## Buffer by Count and Time

`AsyncSeq.bufferByCountAndTime` accumulates incoming elements and emits a batch whenever
**either** the buffer reaches a given size **or** a timeout elapses, whichever comes first. If the
buffer is empty when the timeout fires, nothing is emitted.

```
-------------------------------------------------------
| source | a1 | a2 | a3         | a4 |                |
| result |    |    | [a1,a2,a3] |    | [a4]          |
-------------------------------------------------------
            batch size 3 reached ↑     timeout fires ↑
```

This is useful for services that write events to a bulk API (e.g. a search index). Fixed-size
batching with `AsyncSeq.bufferByCount` can stall when the source slows down and a partial buffer
never fills. `bufferByCountAndTime` avoids that by guaranteeing forward progress:
*)
let events : AsyncSeq<Event> = failwith "TODO: connect to event source"

let bufferSize = 100
let bufferTimeout = 1000 // milliseconds

let bufferedEvents : AsyncSeq<Event[]> =
    events |> AsyncSeq.bufferByCountAndTime bufferSize bufferTimeout
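The count trigger can be observed with a small sketch (the element range, buffer size of 4, and deliberately generous 5-second timeout are all invented for illustration). With a fast synchronous source, the size bound drives the batching and the trailing partial buffer is flushed when the source completes:

```fsharp
#r "nuget: FSharp.Control.AsyncSeq,4.11.0"
open FSharp.Control

// 10 immediately-available elements, buffer size 4, 5000 ms timeout:
// the timeout never fires, so batches are cut by the count bound,
// with a final partial batch at end of stream.
let batches : int[] list =
    AsyncSeq.ofSeq [ 1 .. 10 ]
    |> AsyncSeq.bufferByCountAndTime 4 5000
    |> AsyncSeq.toListAsync
    |> Async.RunSynchronously

printfn "%A" (batches |> List.map Array.length)
```

Whatever the exact batch boundaries, every element appears exactly once, in order, and no batch exceeds the size bound; only the grouping into batches varies with timing.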
