Commit bf9aa03

deploy: 869a24d
0 parents  commit bf9aa03

39 files changed

Lines changed: 25758 additions & 0 deletions

.nojekyll

Whitespace-only changes.

AsyncSeq.fsx

Lines changed: 152 additions & 0 deletions
@@ -0,0 +1,152 @@
1+
(**
2+
3+
*)
4+
#r "nuget: FSharp.Control.AsyncSeq,4.7.0"
5+
(**
6+
[![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/fsprojects/FSharp.Control.AsyncSeq/gh-pages?filepath=AsyncSeq.ipynb)
7+
8+
# F# Asynchronous Sequences
9+
10+
An asynchronous sequence is a sequence in which individual elements are **awaited**, so the next element of the sequence is not necessarily available immediately. This allows for efficient composition of asynchronous workflows which involve sequences of data.
11+
12+
The `FSharp.Control.AsyncSeq` library is an implementation of functional asynchronous sequences for F#. The central type of the library, `AsyncSeq<'T>`, is a type alias for `System.Collections.Generic.IAsyncEnumerable<'T>`.
13+
14+
This library was also [one of the world's first implementations of asynchronous sequences](http://tomasp.net/blog/async-sequences.aspx) and has been used in production for many years. It is a mature library with a rich set of operations defined on `AsyncSeq` and is widely used in the F# community.
15+
16+
To use the library, reference the NuGet package `FSharp.Control.AsyncSeq` in your project and open the `FSharp.Control` namespace:
17+
18+
*)
19+
open FSharp.Control
20+
(**
21+
### Generating asynchronous sequences
22+
23+
An asynchronous sequence can be generated using a computation expression, much like `seq<'T>`:
24+
25+
*)
26+
let async12 = asyncSeq {
27+
  yield 1
28+
  yield 2
29+
}
30+
(**
31+
or more succinctly:
32+
33+
*)
34+
let async12b = asyncSeq { 1; 2 }
35+
(**
36+
Another way to generate an asynchronous sequence is using the `AsyncSeq.unfoldAsync` function. This
37+
function accepts as an argument a function which can generate individual elements based on a state and
38+
signal completion of the sequence.
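
As a minimal sketch of that contract (the name `countTo` is hypothetical and used only for illustration), the generator below returns `Some (element, nextState)` to emit a value and `None` to end the sequence:

```fsharp
#r "nuget: FSharp.Control.AsyncSeq,4.7.0"
open FSharp.Control

// Hypothetical generator: emits the current state, then advances it by one.
// Returning None signals that the sequence is complete.
let countTo (limit: int) : AsyncSeq<int> =
    AsyncSeq.unfoldAsync
        (fun state ->
            async {
                if state > limit then return None
                else return Some (state, state + 1)
            })
        1

// Collect the whole sequence into a list to inspect it.
let firstFive = countTo 5 |> AsyncSeq.toListAsync |> Async.RunSynchronously
// firstFive = [1; 2; 3; 4; 5]
```

The state here is just the next number to emit; in practice it is often a cursor, page token or offset.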
39+
40+
For example, suppose that you're writing a program which consumes the Twitter API and stores tweets
41+
which satisfy some criteria into a database. There are several asynchronous request-reply interactions at play -
42+
one to retrieve a batch of tweets from the Twitter API, another to determine whether a tweet satisfies some
43+
criteria and finally an operation to write the desired tweet to a database.
44+
45+
Given the type `Tweet` to represent an individual tweet, the operation to retrieve a batch of tweets can
46+
be modeled with type `int -> Async<(Tweet[] * int) option>` where the incoming `int` represents the
47+
offset into the tweet stream. The asynchronous result is an `Option` which when `None` indicates the
48+
end of the stream, and otherwise contains the batch of retrieved tweets as well as the next offset.
49+
50+
The above function to retrieve a batch of tweets can be used to generate an asynchronous sequence
51+
of tweet batches as follows:
52+
53+
*)
54+
type Tweet = {
55+
  user : string
56+
  message : string
57+
}
58+
59+
let getTweetBatch (offset: int) : Async<(Tweet[] * int) option> =
60+
  async { return failwith "TODO: call Twitter API" }
61+
62+
let tweetBatches : AsyncSeq<Tweet[]> =
63+
  AsyncSeq.unfoldAsync getTweetBatch 0
64+
(**
65+
The asynchronous sequence `tweetBatches` will, when iterated, incrementally consume the entire tweet stream.
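
To experiment with the batch-and-offset contract without a real API, one option is an in-memory stand-in. The sketch below is only an assumption for illustration: `allTweets`, `batchSize` and `getTweetBatchStub` are hypothetical names, and a real client would perform a network call instead.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq,4.7.0"
open FSharp.Control

type Tweet = { user : string; message : string }

// Hypothetical in-memory data standing in for the Twitter API.
let allTweets : Tweet[] =
    [| for i in 1 .. 5 -> { user = sprintf "user%d" i; message = sprintf "message %d" i } |]

let batchSize = 2

// Returns the batch starting at the offset together with the next offset,
// or None once the offset has moved past the end of the data.
let getTweetBatchStub (offset: int) : Async<(Tweet[] * int) option> =
    async {
        if offset >= allTweets.Length then
            return None
        else
            let batch = allTweets |> Array.skip offset |> Array.truncate batchSize
            return Some (batch, offset + batch.Length)
    }

// Iterate the stream and record how many tweets each batch contained.
let batchSizes =
    AsyncSeq.unfoldAsync getTweetBatchStub 0
    |> AsyncSeq.map Array.length
    |> AsyncSeq.toListAsync
    |> Async.RunSynchronously
// batchSizes = [2; 2; 1]
```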
66+
67+
Next, suppose that the tweet filtering function makes a call to a web service which determines
68+
whether a particular tweet is of interest and should be stored in the database. This function can be modeled with
69+
type `Tweet -> Async<bool>`. We can flatten the `tweetBatches` sequence and then filter it as follows:
70+
71+
*)
72+
let filterTweet (t: Tweet) : Async<bool> =
73+
  failwith "TODO: call web service"
74+
75+
let filteredTweets : AsyncSeq<Tweet> =
76+
  tweetBatches
77+
  |> AsyncSeq.concatSeq // flatten
78+
  |> AsyncSeq.filterAsync filterTweet // filter
79+
(**
80+
When the resulting sequence `filteredTweets` is consumed, it will lazily consume the underlying
81+
sequence `tweetBatches`, select individual tweets and filter them using the function `filterTweet`.
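
One way to observe this laziness is to run the same flatten-and-filter pipeline over a source that never ends. In the sketch below, `endlessBatches` is a hypothetical stand-in for the tweet stream (plain integers replace tweets); because elements are only pulled on demand, taking three results terminates even though the source does not.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq,4.7.0"
open FSharp.Control

// Hypothetical endless source: every step yields a batch of two numbers
// and never returns None, so the stream has no end.
let endlessBatches : AsyncSeq<int[]> =
    AsyncSeq.unfoldAsync
        (fun offset -> async { return Some ([| offset; offset + 1 |], offset + 2) })
        0

// Flatten the batches, keep the even numbers, and stop after three results.
let firstThreeEven =
    endlessBatches
    |> AsyncSeq.concatSeq
    |> AsyncSeq.filterAsync (fun n -> async { return n % 2 = 0 })
    |> AsyncSeq.take 3
    |> AsyncSeq.toListAsync
    |> Async.RunSynchronously
// firstThreeEven = [0; 2; 4]
```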
82+
83+
Finally, the function which stores a tweet in the database can be modeled by type `Tweet -> Async<unit>`.
84+
We can store all filtered tweets as follows:
85+
86+
*)
87+
let storeTweet (t: Tweet) : Async<unit> =
88+
  failwith "TODO: call database"
89+
90+
let storeFilteredTweets : Async<unit> =
91+
  filteredTweets
92+
  |> AsyncSeq.iterAsync storeTweet
93+
(**
94+
Note that the value `storeFilteredTweets` is an asynchronous computation of type `Async<unit>`. At this point,
95+
it is a **representation** of the workflow which consists of reading batches of tweets, filtering them and storing them
96+
in the database. When executed, the workflow will consume the entire tweet stream. The entire workflow can be
97+
succinctly declared and executed as follows:
98+
99+
*)
100+
AsyncSeq.unfoldAsync getTweetBatch 0
101+
|> AsyncSeq.concatSeq
102+
|> AsyncSeq.filterAsync filterTweet
103+
|> AsyncSeq.iterAsync storeTweet
104+
|> Async.RunSynchronously
105+
(**
106+
The above snippet effectively orchestrates several asynchronous request-reply interactions into a cohesive unit
107+
composed using familiar operations on sequences. Furthermore, it will be executed efficiently in a non-blocking manner.
108+
109+
### Comparison with seq<'T>
110+
111+
The central difference between `seq<'T>` and `AsyncSeq<'T>` can be illustrated by introducing the notion of time.
112+
Suppose that generating subsequent elements of a sequence requires an IO-bound operation. Invoking
113+
long-running IO-bound operations from within a `seq<'T>` will **block** the thread which calls `MoveNext` on the
114+
corresponding `IEnumerator`. An `AsyncSeq`, on the other hand, can use facilities provided by the F# `Async` type to make
115+
more efficient use of system resources.
116+
117+
*)
118+
open System.Threading
119+
120+
let withTime = seq {
121+
  Thread.Sleep(1000) // calling thread will block
122+
  yield 1
123+
  Thread.Sleep(1000) // calling thread will block
124+
  yield 1
125+
}
126+
127+
let withTime2 = asyncSeq {
128+
  do! Async.Sleep 1000 // non-blocking sleep
129+
  yield 1
130+
  do! Async.Sleep 1000 // non-blocking sleep
131+
  yield 2
132+
}
133+
(**
134+
When the asynchronous sequence `withTime2` is iterated, the calls to `Async.Sleep` won't block threads. Instead,
135+
the **continuation** of the sequence will be scheduled by `Async` while the calling thread will be free to perform other work.
136+
Overall, a `seq<'T>` can be viewed as a special case of an `AsyncSeq<'T>` where subsequent elements are retrieved
137+
in a blocking manner.
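
This relationship can be sketched with `AsyncSeq.ofSeq`, which lifts an ordinary sequence into an asynchronous one so that it can be combined with asynchronous operations. The names `numbers` and `doubled` below are illustrative only.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq,4.7.0"
open FSharp.Control

let numbers : seq<int> = seq { 1 .. 3 }

// Lift the synchronous sequence, then continue with an asynchronous mapping.
let doubled =
    numbers
    |> AsyncSeq.ofSeq
    |> AsyncSeq.mapAsync (fun n -> async { return n * 2 })
    |> AsyncSeq.toListAsync
    |> Async.RunSynchronously
// doubled = [2; 4; 6]
```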
138+
139+
### Performance Considerations
140+
141+
While an asynchronous computation obviates the need to block an OS thread for the duration of an operation, it isn't always the case
142+
that this will improve the overall performance of an application. Note, however, that an async computation does not **require** a
143+
non-blocking operation; it simply allows for it. Also of note is that, unlike calling `IEnumerator.MoveNext()`, consuming
144+
an item from an asynchronous sequence requires several allocations. Usually this is greatly outweighed by the
145+
benefits, but it can make a difference in some scenarios.
146+
147+
## Related Articles
148+
149+
* [Programming with F# asynchronous sequences](http://tomasp.net/blog/async-sequences.aspx/)
150+
151+
*)
152+
