Commit ea20a27
committed
deploy: 5e71b51
0 parents  commit ea20a27

45 files changed

Lines changed: 28175 additions & 0 deletions


.nojekyll

Whitespace-only changes.

AsyncSeq.fsx

Lines changed: 56 additions & 0 deletions
(**

*)
#r "nuget: FSharp.Control.AsyncSeq,4.7.0"
(**
[![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/fsprojects/FSharp.Control.AsyncSeq/gh-pages?filepath=AsyncSeq.ipynb)

# Tutorial

## Generating asynchronous sequences

To use the library, reference the NuGet package `FSharp.Control.AsyncSeq` in your project and open the `FSharp.Control` namespace:

*)
open FSharp.Control
(**
An asynchronous sequence can be generated using a computation expression:

*)
let async12 = asyncSeq {
  yield 1
  yield 2
}
(**
or more succinctly:

*)
let async12b = asyncSeq { 1; 2 }
open System.Threading

let withTime = seq {
  Thread.Sleep(1000) // calling thread will block
  yield 1
  Thread.Sleep(1000) // calling thread will block
  yield 2
}

let withTime2 = asyncSeq {
  do! Async.Sleep 1000 // non-blocking sleep
  yield 1
  do! Async.Sleep 1000 // non-blocking sleep
  yield 2
}
(**
When the asynchronous sequence `withTime2` is iterated, the calls to `Async.Sleep` won't block threads. Instead, the **continuation** of the sequence is scheduled by `Async` while the calling thread is free to perform other work. Overall, a `seq<'a>` can be viewed as a special case of an `AsyncSeq<'a>` in which subsequent elements are retrieved in a blocking manner.

## Performance Considerations

While an asynchronous computation obviates the need to block an OS thread for the duration of an operation, this does not always improve the overall performance of an application. Note also that an async computation does not **require** a non-blocking operation; it simply allows for one. Finally, unlike calling `IEnumerable.MoveNext()`, consuming an item from an asynchronous sequence requires several allocations. While this cost is usually greatly outweighed by the benefits, it can make a difference in some scenarios.

## Related Articles

* [Programming with F# asynchronous sequences](http://tomasp.net/blog/async-sequences.aspx/)

*)
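As a minimal sketch of how such a sequence is consumed (assuming the same package reference as above, run with `dotnet fsi`; the element values and the 100 ms delay are illustrative only), the whole sequence can be materialized with `AsyncSeq.toListAsync`, or iterated element by element with `AsyncSeq.iter`:

```fsharp
#r "nuget: FSharp.Control.AsyncSeq,4.7.0"
open FSharp.Control

let numbers = asyncSeq {
  yield 1
  do! Async.Sleep 100 // non-blocking pause between elements
  yield 2
}

// Materialize the whole sequence into a list...
let all = numbers |> AsyncSeq.toListAsync |> Async.RunSynchronously
printfn "%A" all // [1; 2]

// ...or consume it element by element as values arrive.
numbers
|> AsyncSeq.iter (printfn "got %d")
|> Async.RunSynchronously
```

Note that, like `seq`, an `asyncSeq` is lazy: `numbers` is re-evaluated each time it is consumed.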

AsyncSeq.html

Lines changed: 281 additions & 0 deletions
Large diffs are not rendered by default.

AsyncSeq.ipynb

Lines changed: 192 additions & 0 deletions
{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": []
    },
    {
      "cell_type": "code",
      "metadata": {
        "dotnet_interactive": { "language": "fsharp" },
        "polyglot_notebook": { "kernelName": "fsharp" }
      },
      "execution_count": null,
      "outputs": [],
      "source": [
        "#r \"nuget: FSharp.Control.AsyncSeq,4.7.0\"\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "[![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/fsprojects/FSharp.Control.AsyncSeq/gh-pages?filepath=AsyncSeq.ipynb)\n",
        "\n",
        "# Tutorial\n",
        "\n",
        "## Generating asynchronous sequences\n",
        "\n",
        "To use the library, reference the NuGet package `FSharp.Control.AsyncSeq` in your project and open the `FSharp.Control` namespace:\n",
        "\n"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "dotnet_interactive": { "language": "fsharp" },
        "polyglot_notebook": { "kernelName": "fsharp" }
      },
      "execution_count": null,
      "outputs": [],
      "source": [
        "open FSharp.Control\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "An asynchronous sequence can be generated using a computation expression:\n",
        "\n"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "dotnet_interactive": { "language": "fsharp" },
        "polyglot_notebook": { "kernelName": "fsharp" }
      },
      "execution_count": null,
      "outputs": [],
      "source": [
        "let async12 = asyncSeq {\n",
        "  yield 1\n",
        "  yield 2\n",
        "}\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "or more succinctly:\n",
        "\n"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "dotnet_interactive": { "language": "fsharp" },
        "polyglot_notebook": { "kernelName": "fsharp" }
      },
      "execution_count": null,
      "outputs": [],
      "source": [
        "let async12b = asyncSeq { 1; 2 }\n"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "dotnet_interactive": { "language": "fsharp" },
        "polyglot_notebook": { "kernelName": "fsharp" }
      },
      "execution_count": null,
      "outputs": [],
      "source": [
        "open System.Threading\n",
        "\n",
        "let withTime = seq {\n",
        "  Thread.Sleep(1000) // calling thread will block\n",
        "  yield 1\n",
        "  Thread.Sleep(1000) // calling thread will block\n",
        "  yield 2\n",
        "}\n",
        "\n",
        "let withTime2 = asyncSeq {\n",
        "  do! Async.Sleep 1000 // non-blocking sleep\n",
        "  yield 1\n",
        "  do! Async.Sleep 1000 // non-blocking sleep\n",
        "  yield 2\n",
        "}\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "When the asynchronous sequence `withTime2` is iterated, the calls to `Async.Sleep` won't block threads. Instead, the **continuation** of the sequence is scheduled by `Async` while the calling thread is free to perform other work. Overall, a `seq<'a>` can be viewed as a special case of an `AsyncSeq<'a>` in which subsequent elements are retrieved in a blocking manner.\n",
        "\n",
        "## Performance Considerations\n",
        "\n",
        "While an asynchronous computation obviates the need to block an OS thread for the duration of an operation, this does not always improve the overall performance of an application. Note also that an async computation does not **require** a non-blocking operation; it simply allows for one. Finally, unlike calling `IEnumerable.MoveNext()`, consuming an item from an asynchronous sequence requires several allocations. While this cost is usually greatly outweighed by the benefits, it can make a difference in some scenarios.\n",
        "\n",
        "## Related Articles\n",
        "\n",
        "* [Programming with F# asynchronous sequences](http://tomasp.net/blog/async-sequences.aspx/)\n",
        "\n"
      ]
    }
  ],
  "metadata": {
    "kernelspec": {
      "display_name": ".NET (F#)",
      "language": "F#",
      "name": ".net-fsharp"
    },
    "language_info": {
      "file_extension": ".fs",
      "mimetype": "text/x-fsharp",
      "name": "polyglot-notebook",
      "pygments_lexer": "fsharp"
    },
    "polyglot_notebook": {
      "kernelInfo": {
        "defaultKernelName": "fsharp",
        "items": [
          {
            "aliases": [],
            "languageName": "fsharp",
            "name": "fsharp"
          }
        ]
      }
    }
  },
  "nbformat": 4,
  "nbformat_minor": 2
}

AsyncSeqAdvanced.fsx

Lines changed: 148 additions & 0 deletions
(**

*)
#r "nuget: FSharp.Control.AsyncSeq,4.7.0"
(**
[![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/fsprojects/FSharp.Control.AsyncSeq/gh-pages?filepath=AsyncSeqAdvanced.ipynb)

# Advanced AsyncSeq Operations

This document covers advanced `AsyncSeq<'T>` operations: partitioning a sequence into keyed
sub-streams with `groupBy`, deduplication with `distinctUntilChanged`, and accumulating
elements into time-or-count-bounded batches with `bufferByCountAndTime`.

*)
open System
open FSharp.Control
(**
## Group By

`AsyncSeq.groupBy` partitions a sequence into sub-sequences based on a key, analogous to
`Seq.groupBy`. Each key appears at most once in the output, paired with an `AsyncSeq` of the
elements that share that key.

> **Important:** the sub-sequences **must** be consumed in parallel. Sequential consumption will
> deadlock because no sub-sequence can complete until all others are also being consumed.

```
--------------------------------------------------
| source | e1 | e2 | e3 | e4 |                   |
| key    | k1 | k2 | k1 | k2 |                   |
| result | k1 → [e1, e3]     | k2 → [e2, e4]     |
--------------------------------------------------
```

A common use case is processing a stream of domain events where events for the same entity must
be handled in order, but events for different entities are independent and can be handled in
parallel:

*)
type Event = {
  entityId : int64
  data : string
}

let stream : AsyncSeq<Event> = failwith "TODO: connect to message bus"

let action (e: Event) : Async<unit> = failwith "TODO: process event"

// Process each entity's events sequentially, but different entities in parallel.
stream
|> AsyncSeq.groupBy (fun e -> int e.entityId % 4) // hash into 4 buckets
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.iterAsync action)
|> AsyncSeq.iter ignore
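To make the behaviour concrete, here is a small self-contained sketch of `groupBy` over an in-memory sequence. The parity key and the integer values are illustrative and not part of the original example; each sub-sequence is consumed in parallel via `mapAsyncParallel`, as the warning above requires:

```fsharp
#r "nuget: FSharp.Control.AsyncSeq,4.7.0"
open FSharp.Control

// A small in-memory source standing in for the message-bus stream.
let source = asyncSeq { 1; 2; 3; 4; 5; 6 }

// Partition by parity and sum each group; the groups are consumed in parallel.
let sums =
  source
  |> AsyncSeq.groupBy (fun n -> n % 2)
  |> AsyncSeq.mapAsyncParallel (fun (key, group) ->
    async {
      let! items = AsyncSeq.toListAsync group
      return key, List.sum items
    })
  |> AsyncSeq.toListAsync
  |> Async.RunSynchronously

// The order of the outer groups is not guaranteed, so sort before printing.
printfn "%A" (List.sortBy fst sums) // [(0, 12); (1, 9)]
```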
(**
We can combine this with batching for higher throughput. For example, when writing events to a
full-text search index, batching writes improves performance while the `groupBy` ensures ordering
within each entity:

*)
let batchStream : AsyncSeq<Event[]> = failwith "TODO: connect to batched source"

let batchAction (es: Event[]) : Async<unit> = failwith "TODO: bulk index"

batchStream
|> AsyncSeq.concatSeq                             // flatten batches to individual events
|> AsyncSeq.groupBy (fun e -> int e.entityId % 4) // partition into 4 groups
|> AsyncSeq.mapAsyncParallel (snd
  >> AsyncSeq.bufferByCountAndTime 500 1000       // re-batch per sub-sequence
  >> AsyncSeq.iterAsync batchAction)              // bulk index each batch
|> AsyncSeq.iter ignore
(**
The above workflow: (1) reads events in batches, (2) flattens them, (3) partitions by entity into
mutually exclusive sub-sequences, (4) re-batches each sub-sequence by size/time, and (5) processes
the batches in parallel while preserving per-entity ordering.

## Distinct Until Changed

`AsyncSeq.distinctUntilChanged` passes through every element of the source sequence but drops
consecutive duplicates, so downstream consumers only see values that are genuinely new.

```
-----------------------------------
| source | a | a | b | b | b | a |
| result | a |   | b |   |   | a |
-----------------------------------
```

A natural use case is polling a resource on a fixed schedule and reacting only when its state
actually changes. Consider a background job whose progress is exposed via a `getStatus` call:

*)
type Status = {
  completed : int
  finished : bool
  result : string
}

let getStatus : Async<Status> = failwith "TODO: call job API"

/// Poll every second and emit each status reading.
let statuses : AsyncSeq<Status> = asyncSeq {
  while true do
    let! s = getStatus
    yield s
    do! Async.Sleep 1000
}

/// Only emit when the status has actually changed.
let distinctStatuses : AsyncSeq<Status> =
  statuses |> AsyncSeq.distinctUntilChanged
(**
We can now build a workflow that logs every status change and stops as soon as the job finishes:

*)
let jobResult : Async<string> =
  distinctStatuses
  |> AsyncSeq.pick (fun st ->
    printfn "status=%A" st
    if st.finished then Some st.result else None)
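A runnable sketch of `distinctUntilChanged`, mirroring the diagram above with an in-memory source (the string values are illustrative, standing in for the polled status readings):

```fsharp
#r "nuget: FSharp.Control.AsyncSeq,4.7.0"
open FSharp.Control

// The source from the diagram: a, a, b, b, b, a.
let readings = asyncSeq { "a"; "a"; "b"; "b"; "b"; "a" }

// Consecutive duplicates are dropped; a value repeated later still comes through.
let changes =
  readings
  |> AsyncSeq.distinctUntilChanged
  |> AsyncSeq.toListAsync
  |> Async.RunSynchronously

printfn "%A" changes // ["a"; "b"; "a"]
```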
(**
## Buffer by Count and Time

`AsyncSeq.bufferByCountAndTime` accumulates incoming elements and emits a batch whenever
**either** the buffer reaches a given size **or** a timeout elapses, whichever comes first. If the
buffer is empty when the timeout fires, nothing is emitted.

```
-------------------------------------------------------
| source | a1 | a2 | a3         | a4 |                |
| result |    |    | [a1,a2,a3] |    | [a4]           |
-------------------------------------------------------
         ← batch size 3 reached →    ← timeout fires →
```

This is useful for services that write events to a bulk API (e.g. a search index). Fixed-size
batching with `AsyncSeq.bufferByCount` can stall when the source slows down and a partial buffer
never fills. `bufferByCountAndTime` avoids that by guaranteeing forward progress:

*)
let events : AsyncSeq<Event> = failwith "TODO: connect to event source"

let bufferSize = 100
let bufferTimeout = 1000 // milliseconds

let bufferedEvents : AsyncSeq<Event[]> =
  events |> AsyncSeq.bufferByCountAndTime bufferSize bufferTimeout
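A self-contained sketch of the same combinator over an in-memory source (the values, batch size, and timeout are illustrative). Exact batch boundaries depend on timing, so the example asserts only what is guaranteed: every element comes through exactly once, in order:

```fsharp
#r "nuget: FSharp.Control.AsyncSeq,4.7.0"
open FSharp.Control

// Five immediately-available elements, batched into arrays of at most 2,
// or whatever has arrived when the 500 ms timeout fires.
let batches =
  asyncSeq { 1; 2; 3; 4; 5 }
  |> AsyncSeq.bufferByCountAndTime 2 500
  |> AsyncSeq.toListAsync
  |> Async.RunSynchronously

// Batch boundaries may vary with timing, but no element is lost or duplicated.
printfn "%A" (batches |> List.collect Array.toList) // [1; 2; 3; 4; 5]
```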
