Commit 41a9741 ("fix docs")
1 parent 4f60589

6 files changed
Lines changed: 988 additions & 657 deletions

File tree

docs/AsyncSeq.fsx

Lines changed: 5 additions & 84 deletions
@@ -29,19 +29,19 @@ keywords: F#, asynchronous sequences, AsyncSeq, IAsyncEnumerable, async workflow
 An asynchronous sequence is a sequence in which individual elements are _awaited_, so the next element of the sequence is not necessarily available immediately. This allows for efficient composition of asynchronous workflows which involve sequences of data.

- The `FSharp.Control.AsyncSeq` library is an implementation of functional asynchronous sequences for F#. The central type of the library is `AsyncSeq<'T>` and is a type alias for `System.Collections.Generic.IAsyncEnumerable<'T>`.
+ FSharp.Control.AsyncSeq is an implementation of functional-first programming over asynchronous sequences for F#. The central type of the library, `AsyncSeq<'T>`, is a type alias for the standard type `System.Collections.Generic.IAsyncEnumerable<'T>`.

- This library was also [one of the world's first implementations of asynchronous sequences](http://tomasp.net/blog/async-sequences.aspx) with integrated language support through computation expressions and has been used in production for many years. It is a mature library with a rich set of operations and is widely used in the F# community.
+ This library was also [one of the world's first implementations of language-integrated asynchronous sequences](http://tomasp.net/blog/async-sequences.aspx) - that is, asynchronous sequences with integrated language support through computation expressions. It is a mature library that has been used in production for many years and is widely used in the F# community.

+ ### Generating asynchronous sequences
 To use the library, reference the NuGet package `FSharp.Control.AsyncSeq` in your project and open the `FSharp.Control` namespace:
 *)

 open FSharp.Control

 (**
- ### Generating asynchronous sequences
-
- An asynchronous sequence can be generated using a computation expression, much like `seq<'T>`:
+ An asynchronous sequence can be generated using a computation expression:
 *)

 let async12 = asyncSeq {
@@ -55,85 +55,6 @@ or more succinctly:

 let async12b = asyncSeq { 1; 2 }

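For illustration (this sketch is not part of the original page), a sequence built this way can be materialized with the library's `AsyncSeq.toListAsync`:

```fsharp
open FSharp.Control

// Build a small asynchronous sequence and materialize it as a list.
let nums = asyncSeq {
    yield 1
    yield 2
}

let collected : int list =
    nums
    |> AsyncSeq.toListAsync
    |> Async.RunSynchronously
// collected = [1; 2]
```

Materializing is fine for small or finite sequences; infinite sequences should instead be consumed incrementally, e.g. with `AsyncSeq.iterAsync`.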
- (**
- Another way to generate an asynchronous sequence is using the `AsyncSeq.unfoldAsync` function. This
- function accepts as an argument a function which can generate individual elements based on a state and
- signal completion of the sequence.
-
- For example, suppose that you're writing a program which consumes the Twitter API and stores tweets
- which satisfy some criteria into a database. There are several asynchronous request-reply interactions at play -
- one to retrieve a batch of tweets from the Twitter API, another to determine whether a tweet satisfies some
- criteria and finally an operation to write the desired tweet to a database.
-
- Given the type `Tweet` to represent an individual tweet, the operation to retrieve a batch of tweets can
- be modeled with type `int -> Async<(Tweet[] * int) option>` where the incoming `int` represents the
- offset into the tweet stream. The asynchronous result is an `Option` which when `None` indicates the
- end of the stream, and otherwise contains the batch of retrieved tweets as well as the next offset.
-
- The above function to retrieve a batch of tweets can be used to generate an asynchronous sequence
- of tweet batches as follows:
- *)
-
- type Tweet = {
-     user : string
-     message : string
- }
-
- let getTweetBatch (offset: int) : Async<(Tweet[] * int) option> =
-     async { return failwith "TODO: call Twitter API" }
-
- let tweetBatches : AsyncSeq<Tweet[]> =
-     AsyncSeq.unfoldAsync getTweetBatch 0
-
- (**
- The asynchronous sequence `tweetBatches` will, when iterated, incrementally consume the entire tweet stream.
-
- Next, suppose that the tweet filtering function makes a call to a web service which determines
- whether a particular tweet is of interest and should be stored in the database. This function can be modeled with
- type `Tweet -> Async<bool>`. We can flatten the `tweetBatches` sequence and then filter it as follows:
- *)
-
- let filterTweet (t: Tweet) : Async<bool> =
-     failwith "TODO: call web service"
-
- let filteredTweets : AsyncSeq<Tweet> =
-     tweetBatches
-     |> AsyncSeq.concatSeq // flatten
-     |> AsyncSeq.filterAsync filterTweet // filter
-
- (**
- When the resulting sequence `filteredTweets` is consumed, it will lazily consume the underlying
- sequence `tweetBatches`, select individual tweets and filter them using the function `filterTweet`.
-
- Finally, the function which stores a tweet in the database can be modeled by type `Tweet -> Async<unit>`.
- We can store all filtered tweets as follows:
- *)
-
- let storeTweet (t: Tweet) : Async<unit> =
-     failwith "TODO: call database"
-
- let storeFilteredTweets : Async<unit> =
-     filteredTweets
-     |> AsyncSeq.iterAsync storeTweet
-
- (**
- Note that the value `storeFilteredTweets` is an asynchronous computation of type `Async<unit>`. At this point,
- it is a *representation* of the workflow which consists of reading batches of tweets, filtering them and storing them
- in the database. When executed, the workflow will consume the entire tweet stream. The entire workflow can be
- succinctly declared and executed as follows:
- *)
-
- AsyncSeq.unfoldAsync getTweetBatch 0
- |> AsyncSeq.concatSeq
- |> AsyncSeq.filterAsync filterTweet
- |> AsyncSeq.iterAsync storeTweet
- |> Async.RunSynchronously
-
- (**
- The above snippet effectively orchestrates several asynchronous request-reply interactions into a cohesive unit
- composed using familiar operations on sequences. Furthermore, it will be executed efficiently in a non-blocking manner.
- *)
 (**
 ### Comparison with `FSharp.Control.TaskSeq`
docs/AsyncSeqAdvanced.fsx

Lines changed: 181 additions & 0 deletions
@@ -0,0 +1,181 @@
(**
---
title: Advanced AsyncSeq Operations
category: Documentation
categoryindex: 2
index: 5
description: Advanced F# asynchronous sequence operations including groupBy, distinct-until-changed and time-or-count buffering.
keywords: F#, asynchronous sequences, AsyncSeq, groupBy, distinctUntilChanged, bufferByCountAndTime
---
*)
(*** condition: prepare ***)
#nowarn "211"
#I "../src/FSharp.Control.AsyncSeq/bin/Release/netstandard2.1"
#r "FSharp.Control.AsyncSeq.dll"
(*** condition: fsx ***)
#if FSX
#r "nuget: FSharp.Control.AsyncSeq,{{fsdocs-package-version}}"
#endif // FSX
(*** condition: ipynb ***)
#if IPYNB
#r "nuget: FSharp.Control.AsyncSeq,{{fsdocs-package-version}}"
#endif // IPYNB
(**
[![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/fsprojects/FSharp.Control.AsyncSeq/gh-pages?filepath=AsyncSeqAdvanced.ipynb)

# Advanced AsyncSeq Operations

This document covers advanced `AsyncSeq<'T>` operations: partitioning a sequence into keyed
sub-streams with `groupBy`, deduplication with `distinctUntilChanged`, and accumulating
elements into time-or-count-bounded batches with `bufferByCountAndTime`.
*)

open System
open FSharp.Control

(**
## Group By

`AsyncSeq.groupBy` partitions a sequence into sub-sequences based on a key, analogous to
`Seq.groupBy`. Each key appears at most once in the output, paired with an `AsyncSeq` of the
elements that share that key.

> **Important:** the sub-sequences *must* be consumed in parallel. Sequential consumption will
> deadlock because no sub-sequence can complete until all others are also being consumed.

```
--------------------------------------------------
| source | e1 | e2 | e3 | e4 |            |
| key    | k1 | k2 | k1 | k2 |            |
| result | k1 → [e1, e3] | k2 → [e2, e4]  |
--------------------------------------------------
```

A common use case is processing a stream of domain events where events for the same entity must
be handled in order, but events for different entities are independent and can be handled in
parallel:
*)

type Event = {
    entityId : int64
    data : string
}

let stream : AsyncSeq<Event> = failwith "TODO: connect to message bus"

let action (e: Event) : Async<unit> = failwith "TODO: process event"

// Process each entity's events sequentially, but different entities in parallel.
stream
|> AsyncSeq.groupBy (fun e -> int e.entityId % 4) // hash into 4 buckets
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.iterAsync action)
|> AsyncSeq.iter ignore

(**
We can combine this with batching for higher throughput. For example, when writing events to a
full-text search index, batching writes improves performance while the `groupBy` ensures ordering
within each entity:
*)

let batchStream : AsyncSeq<Event[]> = failwith "TODO: connect to batched source"

let batchAction (es: Event[]) : Async<unit> = failwith "TODO: bulk index"

batchStream
|> AsyncSeq.concatSeq // flatten batches to individual events
|> AsyncSeq.groupBy (fun e -> int e.entityId % 4) // partition into 4 groups
|> AsyncSeq.mapAsyncParallel (snd
                              >> AsyncSeq.bufferByCountAndTime 500 1000 // re-batch per sub-sequence
                              >> AsyncSeq.iterAsync batchAction) // bulk index each batch
|> AsyncSeq.iter ignore
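As an illustrative aside (the values here are hypothetical, not from the original docs), the group-then-consume-in-parallel pattern can be seen on a small in-memory sequence, grouping integers by parity:

```fsharp
open FSharp.Control

// Group a small sequence by parity and collect each group in parallel.
// Parallel consumption of the sub-sequences is required to avoid deadlock.
let groups =
    AsyncSeq.ofSeq [1; 2; 3; 4; 5; 6]
    |> AsyncSeq.groupBy (fun i -> i % 2)
    |> AsyncSeq.mapAsyncParallel (fun (key, xs) -> async {
        let! items = AsyncSeq.toListAsync xs
        return key, items })
    |> AsyncSeq.toListAsync
    |> Async.RunSynchronously

// Each key appears once, paired with its elements in source order,
// e.g. (1, [1; 3; 5]) and (0, [2; 4; 6]).
```

Within each group the source order is preserved; only work across groups runs concurrently.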
(**
The above workflow: (1) reads events in batches, (2) flattens them, (3) partitions by entity into
mutually-exclusive sub-sequences, (4) re-batches each sub-sequence by size/time, and (5) processes
the batches in parallel while preserving per-entity ordering.

## Distinct Until Changed

`AsyncSeq.distinctUntilChanged` passes through every element of the source sequence but drops
consecutive duplicates, so downstream consumers only see values that are genuinely new.

```
-----------------------------------
| source | a | a | b | b | b | a |
| result | a |   | b |   |   | a |
-----------------------------------
```

A natural use case is polling a resource on a fixed schedule and reacting only when its state
actually changes. Consider a background job whose progress is exposed via a `getStatus` call:
*)

type Status = {
    completed : int
    finished : bool
    result : string
}

let getStatus : Async<Status> = failwith "TODO: call job API"

/// Poll every second and emit each status reading.
let statuses : AsyncSeq<Status> = asyncSeq {
    while true do
        let! s = getStatus
        yield s
        do! Async.Sleep 1000
}

/// Only emit when the status has actually changed.
let distinctStatuses : AsyncSeq<Status> =
    statuses |> AsyncSeq.distinctUntilChanged

(**
We can now build a workflow that logs every status change and stops as soon as the job finishes:
*)

let jobResult : Async<string> =
    distinctStatuses
    |> AsyncSeq.pick (fun st ->
        printfn "status=%A" st
        if st.finished then Some st.result else None)
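A minimal in-memory sketch of the deduplication behaviour (the values are illustrative):

```fsharp
open FSharp.Control

// Consecutive duplicates are dropped; non-adjacent repeats are kept.
let deduped =
    AsyncSeq.ofSeq ["a"; "a"; "b"; "b"; "b"; "a"]
    |> AsyncSeq.distinctUntilChanged
    |> AsyncSeq.toListAsync
    |> Async.RunSynchronously
// deduped = ["a"; "b"; "a"]
```

Note the trailing "a" survives: this is distinct-until-*changed*, not a global distinct.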
(**
## Buffer by Count and Time

`AsyncSeq.bufferByCountAndTime` accumulates incoming elements and emits a batch whenever
*either* the buffer reaches a given size *or* a timeout elapses, whichever comes first. If the
buffer is empty when the timeout fires, nothing is emitted.

```
-------------------------------------------------------
| source | a1 | a2 | a3         | a4 |                |
| result |    |    | [a1,a2,a3] |    | [a4]           |
-------------------------------------------------------
         ← batch size 3 reached →    ← timeout fires →
```

This is useful for services that write events to a bulk API (e.g. a search index). Fixed-size
batching with `AsyncSeq.bufferByCount` can stall when the source slows down and a partial buffer
never fills. `bufferByCountAndTime` avoids that by guaranteeing forward progress:
*)

let events : AsyncSeq<Event> = failwith "TODO: connect to event source"

let bufferSize = 100
let bufferTimeout = 1000 // milliseconds

let bufferedEvents : AsyncSeq<Event[]> =
    events |> AsyncSeq.bufferByCountAndTime bufferSize bufferTimeout
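A small sketch of the count-bound path (the sizes and values are illustrative, not from the original docs). With all elements immediately available, batches fill by count and the final partial buffer is flushed when the source completes:

```fsharp
open FSharp.Control

let batches =
    AsyncSeq.ofSeq [1; 2; 3; 4; 5]
    |> AsyncSeq.bufferByCountAndTime 2 5000 // batch size 2, generous 5 s timeout
    |> AsyncSeq.toListAsync
    |> Async.RunSynchronously

// With a timeout this generous the count bound dominates, so no batch
// exceeds 2 elements and every element appears in exactly one batch.
```

With a slow producer and a short timeout, the same pipeline would instead emit smaller, time-triggered batches.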
