22
33# F# Async: AsyncSeq
44
5- An AsyncSeq is an sequence in which individual elements are retrieved using an ` Async ` computation.
5+ An AsyncSeq is a sequence in which individual elements are retrieved using an ` Async ` computation.
66It is similar to ` seq<'a> ` in that subsequent elements are pulled lazily. Structurally it is
77similar to ` list<'a> ` with the difference being that each head and tail node or empty node is wrapped
88in ` Async ` . ` AsyncSeq ` also bears similarity to ` IObservable<'a> ` with the former being pull-based and the
@@ -31,7 +31,7 @@ let asyncS = asyncSeq {
3131
3232(**
3333Another way to generate an asynchronous sequence is using the ` Async.unfoldAsync ` function. This
34- function takes another function which can generate individual elements based on a state. It can
34+ function takes another function which can generate individual elements based on a state and
3535signal completion of the sequence.
3636
3737For example, suppose that you're writing a program which consumes the Twitter API and stores tweets
@@ -64,8 +64,7 @@ The asynchronous sequence `tweetBatches` will when iterated consume the entire t
6464
6565Next, suppose that the tweet filtering function makes a call to a web service which determines
6666whether a particular tweet should be stored in the database. This function can be modeled with
67- type ` Tweet -> Async<bool> ` . We can flatten the ` tweetBatches ` sequence and then filter it using
68- this function:
67+ type ` Tweet -> Async<bool> ` . We can flatten the ` tweetBatches ` sequence and then filter it as follows:
6968*)
7069
7170let filterTweet ( t : Tweet ) : Async < bool > =
@@ -95,7 +94,7 @@ let storeFilteredTweets : Async<unit> =
9594Note that the value ` storeFilteredTweets ` is an asynchronous computation of type ` Async<unit> ` . At this point,
9695it is a ** representation** of the workflow which consists of reading batches of tweets, filtering them and storing them
9796in the database. When executed, the workflow will consume the entire tweet stream. The entire workflow can be
98- succinctly expressed and executed as follows:
97+ succinctly declared and executed as follows:
9998*)
10099
101100AsyncSeq.unfoldAsync getTweetBatch 0
@@ -106,7 +105,7 @@ AsyncSeq.unfoldAsync getTweetBatch 0
106105
107106(**
108107The above snippet effectively orchestrates several asynchronous request-reply interactions into a cohesive unit
109- composed with familiar operations on sequences. Furthermore, it can be executed efficiently in a non-blocking manner.
108+ composed using familiar operations on sequences. Furthermore, it will be executed efficiently in a non-blocking manner.
110109*)
111110
112111(**
@@ -136,7 +135,7 @@ let withTime' = asyncSeq {
136135(**
137136When the asynchronous sequence ` withTime' ` is iterated, the calls to ` Async.Sleep ` won't block threads. Instead,
138137the ** continuation** of the sequence will be scheduled by a ` ThreadPool ` thread, while the calling thread
139- will be free to perform other work.
138+ will be free to perform other work. Overall, a ` seq<'a> ` can be viewed as a special case of an ` AsyncSeq<'a> ` .
140139*)
141140
142141
@@ -149,19 +148,20 @@ and the latter is pull-based. Consumers of an `IObservable<'a>` **subscribe** to
149148new items or completion. By contrast, consumers of an ` AsyncSeq<'a> ` ** retrieve** subsequent items on their own
150149terms. Some domains are more naturally modeled with one or the other, however it is less clear which is a more
151150suitable tool for a specific task. In many cases, a combination of the two provides the optimal solution and
152- restricting yourself to one, while simplifying the programming model, can lead one two view all problems as a nail.
151+ restricting yourself to one, while simplifying the programming model, can lead one to view all problems as a nail.
153152
154153A more specific difference between the two is that ` IObservable<'a> ` subscribers have the basic type ` 'a -> unit `
155154and are therefore inherently synchronous and imperative. The observer can certainly make a blocking call, but this
156155can defeat the purpose of the observable sequence all together. Alternatively, the observer can spawn an operation, but
157156this can break composition because one can no longer rely on the observer operation returning to determine that it has
158- completed. With the observable model however, we can model blocking operations through composition.
157+ completed. With the observable model however, we can model blocking operations through composition on sequences rather
158+ than observers.
159159
160160To illustrate, lets try to implement the above Tweet retrieval, filtering and storage workflow using observable sequences.
161161Suppose we already have an observable sequence representing tweets ` IObservable<Tweet> ` and we simply wish
162162to filter it and store the resulting tweets. The function ` Observable.filter ` allows one to filter observable
163- sequences based on a predicate, however in this case it doesn't quite cut it because the predicate is synchronous
164- ` 'a -> bool ` :
163+ sequences based on a predicate, however in this case it doesn't quite cut it because the predicate passed to it must
164+ be synchronous ` 'a -> bool ` :
165165*)
166166
167167open System
@@ -174,17 +174,19 @@ let filteredTweetsObs =
174174 |> Observable.filter ( filterTweet >> Async.RunSynchronously) // blocking IO-call!
175175
176176(**
177- To remedy the blocking IO-call we can adapt the filtering function to the ` IObservable<'a> ` model. An ` Async<'a> `
178- can be modeled as an ` IObservable<'a> ` with one element so suppose that we have ` Tweet -> IObservable<bool> ` . We can
179- then compose an observable that filters tweets using this function as follows:
177+ To remedy the blocking IO-call we can better adapt the filtering function to the ` IObservable<'a> ` model. A value
178+ of type ` Async<'a> ` can be modeled as an ` IObservable<'a> ` with one element. Suppose that we have
179+ ` Tweet -> IObservable<bool> ` . We can define a few helper operators on observables to allow filtering using
180+ an asynchronous predicate as follows:
180181*)
181182
182183module Observable =
183184
185+ /// a |> Async.StartAsTask |> (fun t -> t.ToObservable())
184186 let ofAsync ( a : Async < 'a >) : IObservable < 'a > =
185187 failwith " TODO"
186188
187- /// Observable.SelectMany in Rx
189+ /// Observable.SelectMany
188190 let bind ( f : 'a -> IObservable < 'b >) ( o : IObservable < 'a >) : IObservable < 'b > =
189191 failwith " TODO"
190192
@@ -208,7 +210,7 @@ let filteredTweetsObs' : IObservable<Tweet> =
208210
209211
210212(**
211- With little effort we were able to adapt ` IObservable<'a> ` to our needs. Next lets try implementing the storage of
213+ With a little effort, we were able to adapt ` IObservable<'a> ` to our needs. Next lets try implementing the storage of
212214filtered tweets. Again, we can adapt the function ` storeTweet ` defined above to the observable model and bind the
213215observable of filtered tweets to it:
214216*)
@@ -228,13 +230,13 @@ let storedTeetsObs' : IObservable<unit> =
228230 |> Observable.mapAsync storeTweet
229231
230232(**
231- Overall, both solutions are succinct and composable and can ultimately be a matter of preference. Some things to consider
232- are the push vs. pull semantics. On the one hand, tweets are pushed based - the consumer has no control over their generation.
233- On the other hand, the program at hand will process the tweets on its own terms regardless of how quickly they are being generated.
234- Moreover, the underlying Twitter API will likely utilize a request-reply protocol to retrieve batches of tweets from persistent
235- storage. As such, the distinction between push vs. pull becomes less interesting. If the underlying source is truly push-based, then
236- one can buffer its output and consume it using an asynchronous sequence. If the underlying source is pull-based, then one can turn
237- it into an observable sequence by first pulling, then pushing. In a real-time reactive system, notifications must be pushed
233+ Overall, both solutions are succinct and composable and deciding which one to use can ultimately be a matter of preference.
234+ Some things to consider are the push vs. pull semantics. On the one hand, tweets are pushed based - the consumer has no control
235+ over their generation. On the other hand, the program at hand will process the tweets on its own terms regardless of how quickly
236+ they are being generated. Moreover, the underlying Twitter API will likely utilize a request-reply protocol to retrieve batches of
237+ tweets from persistent storage. As such, the distinction between push vs. pull becomes less interesting. If the underlying source
238+ is truly push-based, then one can buffer its output and consume it using an asynchronous sequence. If the underlying source is pull-based,
239+ then one can turn it into an observable sequence by first pulling, then pushing. In a real-time reactive system, notifications must be pushed
238240immediately without delay. This point however is moot since neither ` IObservable<'a> ` nor ` Async<'a> ` are well suited for
239241real-time systems.
240242*)
@@ -243,9 +245,11 @@ real-time systems.
243245(**
244246### Performance Considerations
245247
246- While an async computation obviates the need to block an OS thread for the duration of an operation, it isn't always the case
248+ While an asynchronous computation obviates the need to block an OS thread for the duration of an operation, it isn't always the case
247249that this will improve the overall performance of an application. Note however that an async computation does not ** require** a
248- non-blocking operation, it simply allows for it.
250+ non-blocking operation, it simply allows for it. Also of note is that unlike calling ` IEnumerable.MoveNext() ` , consuming
251+ and item from an asynchronous sequence requires several allocations. Usually this is greatly outweighed by the
252+ benefits, it can make a difference in some scenarios.
249253
250254*)
251255
0 commit comments