Commit 9b063a3

deploy: 517fece
0 parents  commit 9b063a3

32 files changed

Lines changed: 4316 additions & 0 deletions

.nojekyll

Whitespace-only changes.

AsyncSeq.fsx

Lines changed: 151 additions & 0 deletions
@@ -0,0 +1,151 @@
#r "nuget: FSharp.Control.AsyncSeq,{{package-version}}"
(**
[![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/fsprojects/FSharp.Control.AsyncSeq/gh-pages?filepath=AsyncSeq.ipynb)

# F# Async: FSharp.Control.AsyncSeq

> NOTE: There is also the option to use [FSharp.Control.TaskSeq](https://github.com/fsprojects/FSharp.Control.TaskSeq) which has a very similar usage model.

An `AsyncSeq` is a sequence in which individual elements are retrieved using an `Async` computation.
It is similar to `seq<'a>` in that subsequent elements are pulled on-demand.
`AsyncSeq` also bears similarity to `IObservable<'a>` with the former being based on an "asynchronous pull" and the
latter based on a "synchronous push". Analogs for most operations defined for `Seq`, `List` and `IObservable` are also defined for
`AsyncSeq`. The power of `AsyncSeq` lies in that many of these operations also have analogs based on `Async`
allowing composition of complex asynchronous workflows.

> **v4.0 and later:** `AsyncSeq<'T>` is a type alias for `System.Collections.Generic.IAsyncEnumerable<'T>`.
> Any `IAsyncEnumerable<'T>` value (e.g. from EF Core, ASP.NET Core channels, or `taskSeq { }`) can be used
> directly as an `AsyncSeq<'T>` without conversion, and vice-versa.
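
As a minimal sketch of what the alias means in practice, the snippet below passes an `asyncSeq { }` value to a function whose parameter is typed as `IAsyncEnumerable<int>`. It assumes the unpinned NuGet reference resolves to v4.0 or later; `numbers` and `sumAll` are hypothetical names used only for illustration.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open System.Collections.Generic
open FSharp.Control

// An asynchronous sequence built with the computation expression.
let numbers : AsyncSeq<int> = asyncSeq {
    yield 1
    yield 2
    yield 3
}

// Hypothetical helper typed against the BCL interface rather than AsyncSeq.
// Assumes v4.0+, where the two types are the same.
let sumAll (source: IAsyncEnumerable<int>) : Async<int> =
    source |> AsyncSeq.fold (+) 0

let total = sumAll numbers |> Async.RunSynchronously
printfn "total = %d" total
```
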

The `AsyncSeq` type is located in the `FSharp.Control.AsyncSeq.dll` assembly which can be loaded in F# Interactive as follows:

*)
#r "../../../bin/FSharp.Control.AsyncSeq.dll"
open FSharp.Control
(**
### Generating asynchronous sequences

An `AsyncSeq<'T>` can be generated using computation expression syntax much like `seq<'T>`:

*)
let async12 = asyncSeq {
    yield 1
    yield 2
}
(**
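A sequence defined this way does nothing until it is enumerated. The sketch below is not part of the original sample: it collects the elements into a list with `AsyncSeq.toListAsync` and runs the resulting `Async`. The names `numbers` and `collected` are illustrative, and the unpinned NuGet reference is an assumption about how the package is loaded.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

let numbers = asyncSeq {
    yield 1
    yield 2
}

// toListAsync returns an Async<int list>; running it enumerates the sequence.
let collected : int list =
    numbers
    |> AsyncSeq.toListAsync
    |> Async.RunSynchronously

printfn "%A" collected   // prints [1; 2]
```
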
Another way to generate an asynchronous sequence is using the `AsyncSeq.unfoldAsync` function. This
function accepts as an argument a function which generates individual elements based on a state and
signals completion of the sequence.

For example, suppose that you're writing a program which consumes the Twitter API and stores tweets
which satisfy some criteria into a database. There are several asynchronous request-reply interactions at play:
one to retrieve a batch of tweets from the Twitter API, another to determine whether a tweet satisfies some
criteria and finally an operation to write the desired tweet to a database.

Given the type `Tweet` to represent an individual tweet, the operation to retrieve a batch of tweets can
be modeled with type `int -> Async<(Tweet[] * int) option>` where the incoming `int` represents the
offset into the tweet stream. The asynchronous result is an `Option` which when `None` indicates the
end of the stream, and otherwise contains the batch of retrieved tweets as well as the next offset.

The above function to retrieve a batch of tweets can be used to generate an asynchronous sequence
of tweet batches as follows:

*)
type Tweet = {
    user : string
    message : string
}

let getTweetBatch (offset: int) : Async<(Tweet[] * int) option> =
    failwith "TODO: call Twitter API"

let tweetBatches : AsyncSeq<Tweet[]> =
    AsyncSeq.unfoldAsync getTweetBatch 0
(**
When iterated, the asynchronous sequence `tweetBatches` incrementally consumes the entire tweet stream.
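
Because `getTweetBatch` is left as a stub, the following sketch shows the same `unfoldAsync` shape against a hypothetical in-memory source so that it can be run as-is. The names `allItems`, `pageSize`, `getPage` and `pages` are illustrative assumptions, not part of the library or of the original sample.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

// Hypothetical in-memory data standing in for a remote, paged API.
let allItems = [| 1 .. 7 |]
let pageSize = 3

// Same shape as getTweetBatch: offset -> Async<(batch * nextOffset) option>.
let getPage (offset: int) : Async<(int[] * int) option> = async {
    if offset >= allItems.Length then
        return None                          // signals the end of the stream
    else
        let count = min pageSize (allItems.Length - offset)
        let page = Array.sub allItems offset count
        return Some (page, offset + page.Length)
}

// The state (the offset) starts at 0 and is threaded through each call.
let pages : AsyncSeq<int[]> = AsyncSeq.unfoldAsync getPage 0

let collectedPages =
    pages
    |> AsyncSeq.toListAsync
    |> Async.RunSynchronously

printfn "%A" collectedPages   // three pages: 1-3, 4-6 and 7
```

The generator returns `None` once the offset reaches the end of the data, which is what terminates the sequence.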

Next, suppose that the tweet filtering function makes a call to a web service which determines
whether a particular tweet is of interest and should be stored in the database. This function can be modeled with
type `Tweet -> Async<bool>`. We can flatten the `tweetBatches` sequence and then filter it as follows:

*)
let filterTweet (t: Tweet) : Async<bool> =
    failwith "TODO: call web service"

let filteredTweets : AsyncSeq<Tweet> =
    tweetBatches
    |> AsyncSeq.concatSeq // flatten
    |> AsyncSeq.filterAsync filterTweet // filter
(**
When the resulting sequence `filteredTweets` is consumed, it will lazily consume the underlying
sequence `tweetBatches`, select individual tweets and filter them using the function `filterTweet`.
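
The flatten-then-filter step can be tried in isolation with stand-in data. In this sketch, `batches` and `isEven` are hypothetical substitutes for `tweetBatches` and `filterTweet`, and the short `Async.Sleep` only simulates a web service call.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

// Hypothetical batches standing in for tweetBatches.
let batches : AsyncSeq<int[]> = asyncSeq {
    yield [| 1; 2; 3 |]
    yield [| 4; 5 |]
}

// Hypothetical asynchronous predicate standing in for filterTweet.
let isEven (n: int) : Async<bool> = async {
    do! Async.Sleep 10          // simulate a call to a web service
    return n % 2 = 0
}

let evens =
    batches
    |> AsyncSeq.concatSeq            // AsyncSeq<int[]> becomes AsyncSeq<int>
    |> AsyncSeq.filterAsync isEven   // the predicate itself is asynchronous
    |> AsyncSeq.toListAsync
    |> Async.RunSynchronously

printfn "%A" evens   // prints [2; 4]
```
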

Finally, the function which stores a tweet in the database can be modeled by type `Tweet -> Async<unit>`.
We can store all filtered tweets as follows:

*)
let storeTweet (t: Tweet) : Async<unit> =
    failwith "TODO: call database"

let storeFilteredTweets : Async<unit> =
    filteredTweets
    |> AsyncSeq.iterAsync storeTweet
(**
Note that the value `storeFilteredTweets` is an asynchronous computation of type `Async<unit>`. At this point,
it is a *representation* of the workflow which consists of reading batches of tweets, filtering them and storing them
in the database. When executed, the workflow will consume the entire tweet stream. The entire workflow can be
succinctly declared and executed as follows:

*)
AsyncSeq.unfoldAsync getTweetBatch 0
|> AsyncSeq.concatSeq
|> AsyncSeq.filterAsync filterTweet
|> AsyncSeq.iterAsync storeTweet
|> Async.RunSynchronously
(**
The above snippet effectively orchestrates several asynchronous request-reply interactions into a cohesive unit
composed using familiar operations on sequences. Furthermore, it will be executed efficiently in a non-blocking manner.
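
Since the tweet functions in this walkthrough are stubs that throw, here is a runnable sketch of the same pipeline over hypothetical in-memory stand-ins (`getBatch`, `keep`, `store` and `stored` are illustrative names only). It also shows that building the `Async<unit>` value performs no work until it is run.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

// In-memory stand-in for the database (hypothetical).
let stored = System.Collections.Generic.List<int>()

// Same shape as getTweetBatch: three batches of two numbers, then None.
let getBatch (offset: int) : Async<(int[] * int) option> = async {
    if offset >= 6 then return None
    else return Some ([| offset; offset + 1 |], offset + 2)
}

// Stand-ins for filterTweet and storeTweet.
let keep (n: int) : Async<bool> = async { return n % 3 = 0 }
let store (n: int) : Async<unit> = async { stored.Add n }

// Building the pipeline only describes the work; nothing runs yet.
let workflow : Async<unit> =
    AsyncSeq.unfoldAsync getBatch 0
    |> AsyncSeq.concatSeq
    |> AsyncSeq.filterAsync keep
    |> AsyncSeq.iterAsync store

let countBefore = stored.Count     // still 0 at this point

Async.RunSynchronously workflow    // executes the whole pipeline

let storedItems = List.ofSeq stored
printfn "%d -> %A" countBefore storedItems   // prints 0 -> [0; 3]
```
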

### Comparison with seq<'T>

The central difference between `seq<'T>` and `AsyncSeq<'T>` can be illustrated by introducing the notion of time.
Suppose that generating subsequent elements of a sequence requires an IO-bound operation. Invoking long
running IO-bound operations from within a `seq<'T>` will *block* the thread which calls `MoveNext` on the
corresponding `IEnumerator`. An `AsyncSeq`, on the other hand, can use facilities provided by the F# `Async` type to make
more efficient use of system resources.

*)
open System.Threading

let withTime = seq {
    Thread.Sleep(1000) // calling thread will block
    yield 1
    Thread.Sleep(1000) // calling thread will block
    yield 2
}

let withTime' = asyncSeq {
    do! Async.Sleep 1000 // non-blocking sleep
    yield 1
    do! Async.Sleep 1000 // non-blocking sleep
    yield 2
}
(**
When the asynchronous sequence `withTime'` is iterated, the calls to `Async.Sleep` won't block threads. Instead,
the *continuation* of the sequence will be scheduled by `Async` while the calling thread will be free to perform other work.
Overall, a `seq<'a>` can be viewed as a special case of an `AsyncSeq<'a>` where subsequent elements are retrieved
in a blocking manner.
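
One way to see the "special case" relationship is to lift an ordinary `seq` into an `AsyncSeq` with `AsyncSeq.ofSeq` and then attach an asynchronous stage to it. This is a sketch under assumptions: `squares`, `describe` and `described` are illustrative names, and the short `Async.Sleep` merely stands in for real asynchronous work.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

// An ordinary synchronous sequence.
let squares : seq<int> = seq { for i in 1 .. 3 -> i * i }

// Hypothetical asynchronous stage applied to each element.
let describe (n: int) : Async<string> = async {
    do! Async.Sleep 10            // non-blocking wait
    return sprintf "square=%d" n
}

// ofSeq lifts the seq; its elements are still computed synchronously,
// but asynchronous stages can now follow in the same pipeline.
let described =
    squares
    |> AsyncSeq.ofSeq
    |> AsyncSeq.mapAsync describe
    |> AsyncSeq.toListAsync
    |> Async.RunSynchronously

printfn "%A" described   // prints ["square=1"; "square=4"; "square=9"]
```
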

### Performance Considerations

While an asynchronous computation obviates the need to block an OS thread for the duration of an operation, it isn't always the case
that this will improve the overall performance of an application. Note, however, that an async computation does not *require* a
non-blocking operation; it simply allows for one. Also of note is that, unlike calling `IEnumerator.MoveNext()`, consuming
an item from an asynchronous sequence requires several allocations. Although this is usually greatly outweighed by the
benefits, it can make a difference in some scenarios.

## Related Articles

* [Programming with F# asynchronous sequences](http://tomasp.net/blog/async-sequences.aspx/)


*)
