Skip to content

Commit 6a94db5

Browse files
committed
reinstate AsyncSeq.fsx
1 parent 350ea2f commit 6a94db5

1 file changed

Lines changed: 274 additions & 0 deletions

File tree

docs/content/library/AsyncSeq.fsx

Lines changed: 274 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,274 @@
1+
(**
2+
3+
# F# Async: FSharp.Control.AsyncSeq
4+
5+
An AsyncSeq is a sequence in which individual elements are retrieved using an `Async` computation.
6+
It is similar to `seq<'a>` in that subsequent elements are pulled on-demand. Structurally it is
7+
similar to `list<'a>` with the difference being that each head and tail node or empty node is wrapped
8+
in `Async`. `AsyncSeq` also bears similarity to `IObservable<'a>` with the former being based on an "asynchronous pull" and the
9+
latter based on a "synchronous push". Analogs for most operations defined for `Seq`, `List` and `IObservable` are also defined for
10+
`AsyncSeq`. The power of `AsyncSeq` lies in that many of these operations also have analogs based on `Async`
11+
allowing composition of complex asynchronous workflows.
12+
13+
The `AsyncSeq` type is located in the `FSharp.Control.AsyncSeq.dll` assembly which can be loaded in F# Interactive as follows:
14+
*)
15+
16+
#r "../../../bin/FSharp.Control.AsyncSeq.dll"
17+
open FSharp.Control
18+
19+
20+
21+
(**
22+
### Generating asynchronous sequences
23+
24+
An `AsyncSeq<'a>` can be generated using computation expression syntax much like `seq<'a>`:
25+
*)
26+
27+
let asyncS = asyncSeq {
28+
yield 1
29+
yield 2
30+
}
31+
32+
(**
33+
Another way to generate an asynchronous sequence is using the `Async.unfoldAsync` function. This
34+
function accepts as an argument a function which can generate individual elements based on a state and
35+
signal completion of the sequence.
36+
37+
For example, suppose that you're writing a program which consumes the Twitter API and stores tweets
38+
which satisfy some criteria into a database. There are several asynchronous request-reply interactions at play -
39+
one to retrieve a batch of tweets from the Twitter API, another to determine whether a tweet satisfies some
40+
criteria and finally an operation to write the desired tweet to a database.
41+
42+
Given the type `Tweet` to represent an individual tweet, the operation to retrieve a batch of tweets can
43+
be modeled with type `int -> Async<(Tweet[] * int) option>` where the incoming `int` represents the
44+
offset into the tweet stream. The asynchronous result is an `Option` which when `None` indicates the
45+
end of the stream, and otherwise contains the batch of retrieved tweets as well as the next offset.
46+
47+
The above function to retrieve a batch of tweets can be used to generate an asynchronous sequence
48+
of tweet batches as follows:
49+
*)
50+
51+
type Tweet = {
52+
user : string
53+
message : string
54+
}
55+
56+
let getTweetBatch (offset:int) : Async<(Tweet[] * int) option> =
57+
failwith "TODO: call Twitter API"
58+
59+
let tweetBatches : AsyncSeq<Tweet[]> =
60+
AsyncSeq.unfoldAsync getTweetBatch 0
61+
62+
(**
63+
The asynchronous sequence `tweetBatches` will when iterated, incrementally consume the entire tweet stream.
64+
65+
Next, suppose that the tweet filtering function makes a call to a web service which determines
66+
whether a particular tweet is of interest and should be stored in the database. This function can be modeled with
67+
type `Tweet -> Async<bool>`. We can flatten the `tweetBatches` sequence and then filter it as follows:
68+
*)
69+
70+
let filterTweet (t:Tweet) : Async<bool> =
71+
failwith "TODO: call web service"
72+
73+
let filteredTweets : AsyncSeq<Tweet> =
74+
tweetBatches
75+
|> AsyncSeq.concatSeq // flatten
76+
|> AsyncSeq.filterAsync filterTweet // filter
77+
78+
(**
79+
When the resulting sequence `filteredTweets` is consumed, it will lazily consume the underlying
80+
sequence `tweetBatches`, select individual tweets and filter them using the function `filterTweets`.
81+
82+
Finally, the function which stores a tweet in the database can be modeled by type `Tweet -> Async<unit>`.
83+
We can store all filtered tweets as follows:
84+
*)
85+
86+
let storeTweet (t:Tweet) : Async<unit> =
87+
failwith "TODO: call database"
88+
89+
let storeFilteredTweets : Async<unit> =
90+
filteredTweets
91+
|> AsyncSeq.iterAsync storeTweet
92+
93+
(**
94+
Note that the value `storeFilteredTweets` is an asynchronous computation of type `Async<unit>`. At this point,
95+
it is a *representation* of the workflow which consists of reading batches of tweets, filtering them and storing them
96+
in the database. When executed, the workflow will consume the entire tweet stream. The entire workflow can be
97+
succinctly declared and executed as follows:
98+
*)
99+
100+
AsyncSeq.unfoldAsync getTweetBatch 0
101+
|> AsyncSeq.concatSeq
102+
|> AsyncSeq.filterAsync filterTweet
103+
|> AsyncSeq.iterAsync storeTweet
104+
|> Async.RunSynchronously
105+
106+
(**
107+
The above snippet effectively orchestrates several asynchronous request-reply interactions into a cohesive unit
108+
composed using familiar operations on sequences. Furthermore, it will be executed efficiently in a non-blocking manner.
109+
*)
110+
111+
(**
112+
### Comparison with seq<'a>
113+
114+
The central difference between `seq<'a>` and `AsyncSeq<'a>` two can be illustrated by introducing the notion of time.
115+
Suppose that generating subsequent elements of a sequence requires an IO-bound operation. Invoking long
116+
running IO-bound operations from within a `seq<'a>` will *block* the thread which calls `MoveNext` on the
117+
corresponding `IEnumerator`. An `AsyncSeq` on the other hand can use facilities provided by the F# `Async` type to make
118+
more efficient use of system resources.
119+
*)
120+
121+
let withTime = seq {
122+
System.Threading.Thread.Sleep(1000) // calling thread will block
123+
yield 1
124+
System.Threading.Thread.Sleep(1000) // calling thread will block
125+
yield 1
126+
}
127+
128+
let withTime' = asyncSeq {
129+
do! Async.Sleep 1000 // non-blocking sleep
130+
yield 1
131+
do! Async.Sleep 1000 // non-blocking sleep
132+
yield 2
133+
}
134+
135+
(**
136+
When the asynchronous sequence `withTime'` is iterated, the calls to `Async.Sleep` won't block threads. Instead,
137+
the *continuation* of the sequence will be scheduled by `Async` while the calling thread will be free to perform other work.
138+
Overall, a `seq<'a>` can be viewed as a special case of an `AsyncSeq<'a>` where subsequent elements are retrieved
139+
in a blocking manner.
140+
*)
141+
142+
143+
(**
144+
### Comparison with IObservable<'a>
145+
146+
Both `IObservable<'a>` and `AsyncSeq<'a>` represent collections of items and both provide similar operations
147+
for transformation and composition. The central difference between the two is that the former uses a *synchronous push*
148+
to a subscriber and the latter uses an *asynchronous pull* by a consumer.
149+
Consumers of an `IObservable<'a>` *subscribe* to receive notifications about
150+
new items or the end of the sequence. By contrast, consumers of an `AsyncSeq<'a>` *asynchronously retrieve* subsequent items on their own
151+
terms. Some domains are more naturally modeled with one or the other, however it is less clear which is a more
152+
suitable tool for a specific task. In many cases, a combination of the two provides the optimal solution and
153+
restricting yourself to one, while simplifying the programming model, can lead one to view all problems as a nail.
154+
155+
A more specific difference between the two is that `IObservable<'a>` subscribers have the basic type `'a -> unit`
156+
and are therefore inherently synchronous and imperative. The observer can certainly make a blocking call, but this
157+
can defeat the purpose of the observable sequence all together. Alternatively, the observer can spawn an operation, but
158+
this can break composition because one can no longer rely on the observer returning to determine that it has
159+
completed. With the observable model however, we can model blocking operations through composition on sequences rather
160+
than observers.
161+
162+
To illustrate, lets try to implement the above Tweet retrieval, filtering and storage workflow using observable sequences.
163+
Suppose we already have an observable sequence representing tweets `IObservable<Tweet>` and we simply wish
164+
to filter it and store the resulting tweets. The function `Observable.filter` allows one to filter observable
165+
sequences based on a predicate, however in this case it doesn't quite cut it because the predicate passed to it must
166+
be synchronous `'a -> bool`:
167+
*)
168+
169+
open System
170+
171+
let tweetsObs : IObservable<Tweet> =
172+
failwith "TODO: create observable"
173+
174+
let filteredTweetsObs =
175+
tweetsObs
176+
|> Observable.filter (filterTweet >> Async.RunSynchronously) // blocking IO-call!
177+
178+
(**
179+
To remedy the blocking IO-call we can better adapt the filtering function to the `IObservable<'a>` model. A value
180+
of type `Async<'a>` can be modeled as an `IObservable<'a>` with one element. Suppose that we have
181+
`Tweet -> IObservable<bool>`. We can define a few helper operators on observables to allow filtering using
182+
an asynchronous predicate as follows:
183+
*)
184+
185+
module Observable =
186+
187+
/// a |> Async.StartAsTask |> (fun t -> t.ToObservable())
188+
let ofAsync (a:Async<'a>) : IObservable<'a> =
189+
failwith "TODO"
190+
191+
/// Observable.SelectMany
192+
let bind (f:'a -> IObservable<'b>) (o:IObservable<'a>) : IObservable<'b> =
193+
failwith "TODO"
194+
195+
/// Filter an observable sequence using a predicate producing a observable
196+
/// which emits a single boolean value.
197+
let filterObs (f:'a -> IObservable<bool>) : IObservable<'a> -> IObservable<'a> =
198+
bind <| fun a ->
199+
f a
200+
|> Observable.choose (function
201+
| true -> Some a
202+
| false -> None
203+
)
204+
205+
/// Filter an observable sequence using a predicate which returns an async
206+
/// computation producing a boolean value.
207+
let filterAsync (f:'a -> Async<bool>) : IObservable<'a> -> IObservable<'a> =
208+
filterObs (f >> ofAsync)
209+
210+
/// Maps over an observable sequence using an async-returning function.
211+
let mapAsync (f:'a -> Async<'b>) : IObservable<'a> -> IObservable<'b> =
212+
bind (f >> ofAsync)
213+
214+
let filteredTweetsObs' : IObservable<Tweet> =
215+
filteredTweetsObs
216+
|> Observable.filterAsync filterTweet
217+
218+
219+
(**
220+
With a little effort, we were able to adapt `IObservable<'a>` to our needs. Next lets try implementing the storage of
221+
filtered tweets. Again, we can adapt the function `storeTweet` defined above to the observable model and bind the
222+
observable of filtered tweets to it:
223+
*)
224+
225+
let storedTweetsObs : IObservable<unit> =
226+
filteredTweetsObs'
227+
|> Observable.mapAsync storeTweet
228+
229+
(**
230+
The observable sequence `storedTweetsObs` will produces a value each time a filtered tweet is stored. The entire
231+
workflow can be expressed as follows:
232+
*)
233+
234+
let storedTeetsObs' : IObservable<unit> =
235+
tweetsObs
236+
|> Observable.filterAsync filterTweet
237+
|> Observable.mapAsync storeTweet
238+
239+
(**
240+
Overall, both solutions are succinct and composable and deciding which one to use can ultimately be a matter of preference.
241+
Some things to consider are the "synchronous push" vs. "asynchronous pull" semantics. On the one hand, tweets are pushed based - the consumer has no control
242+
over their generation. On the other hand, the program at hand will process the tweets on its own terms regardless of how quickly
243+
they are being generated. Moreover, the underlying Twitter API will likely utilize a request-reply protocol to retrieve batches of
244+
tweets from persistent storage. As such, the distinction between "synchronous push" vs. "asynchronous pull" becomes less interesting. If the underlying source
245+
is truly push-based, then one can buffer its output and consume it using an asynchronous sequence. If the underlying source is pull-based,
246+
then one can turn it into an observable sequence by first pulling, then pushing. Note however that in a true real-time reactive system,
247+
notifications must be pushed immediately without delay.
248+
249+
Upon closer inspection, the consumption approaches between the two models aren't all too different. While `AsyncSeq` is based on an asynchronous-pull operation,
250+
it is usually consumed using an operator such as `AsyncSeq.iterAsync` as shown above. This is a function of type
251+
`('a -> Async<unit>) -> AsyncSeq<'a> -> Async<unit>` where the first argument is a function `'a -> Async<unit>` which performs
252+
some work on an item of the sequence and is applied repeatedly to subsequent items. In a sense, `iterAsync` *pushes* values to this
253+
function. The primary difference from observers of observable sequences is the return type `Async<unit>` rather than simply `unit`.
254+
*)
255+
256+
257+
(**
258+
### Performance Considerations
259+
260+
While an asynchronous computation obviates the need to block an OS thread for the duration of an operation, it isn't always the case
261+
that this will improve the overall performance of an application. Note however that an async computation does not *require* a
262+
non-blocking operation, it simply allows for it. Also of note is that unlike calling `IEnumerable.MoveNext()`, consuming
263+
and item from an asynchronous sequence requires several allocations. Usually this is greatly outweighed by the
264+
benefits, it can make a difference in some scenarios.
265+
266+
*)
267+
268+
269+
(**
270+
## Related Articles
271+
272+
* [Programming with F# asynchronous sequences](http://tomasp.net/blog/async-sequences.aspx/)
273+
274+
*)

0 commit comments

Comments
 (0)