@@ -8,9 +8,9 @@ similar to `list<'a>` with the difference being that each head and tail node or
88in ` Async ` . ` AsyncSeq ` also bears similarity to ` IObservable<'a> ` with the former being pull-based and the
99latter push-based. Analogs for most operations defined for ` Seq ` , ` List ` and ` IObservable ` are also defined for
1010` AsyncSeq ` . The power of ` AsyncSeq ` lies in that many of these operations also have analogs based on ` Async `
11- allowing one to compose complex asynchronous workflows.
11+ allowing composition of complex asynchronous workflows.
1212
13- The ` AsyncSeq ` type is located in the `FSharpx.Async.dll assembly which can be loaded in F# Interactive as follows:
13+ The ` AsyncSeq ` type is located in the ` FSharpx.Async.dll ` assembly which can be loaded in F# Interactive as follows:
1414*)
1515
1616#r " ../../../bin/FSharpx.Async.dll"
@@ -31,16 +31,16 @@ let asyncS = asyncSeq {
3131
3232(**
3333Another way to generate an asynchronous sequence is using the ` Async.unfoldAsync ` function. This
34- function takes another function which can generate individual elements based on a state and
34+ function accepts as an argument a function which can generate individual elements based on a state and
3535signal completion of the sequence.
3636
3737For example, suppose that you're writing a program which consumes the Twitter API and stores tweets
38- which satisfy some criteria into a database. There are several asynchronous request-reply operations at play -
38+ which satisfy some criteria into a database. There are several asynchronous request-reply interactions at play -
3939one to retrieve a batch of tweets from the Twitter API, another to determine whether a tweet satisfies some
4040criteria and finally an operation to write the desired tweet to a database.
4141
4242Given the type ` Tweet ` to represent an individual tweet, the operation to retrieve a batch of tweets can
43- be modeled with a type ` int -> Async<(Tweet[] * int) option> ` where the incoming ` int ` represents the
43+ be modeled with type ` int -> Async<(Tweet[] * int) option> ` where the incoming ` int ` represents the
4444offset into the tweet stream. The asynchronous result is an ` Option ` which when ` None ` indicates the
4545end of the stream, and otherwise contains the batch of retrieved tweets as well as the next offset.
4646
@@ -60,10 +60,10 @@ let tweetBatches : AsyncSeq<Tweet[]> =
6060 AsyncSeq.unfoldAsync getTweetBatch 0
6161
6262(**
63- The asynchronous sequence ` tweetBatches ` will when iterated consume the entire tweet stream.
63+ The asynchronous sequence ` tweetBatches ` will when iterated, incrementally consume the entire tweet stream.
6464
6565Next, suppose that the tweet filtering function makes a call to a web service which determines
66- whether a particular tweet should be stored in the database. This function can be modeled with
66+ whether a particular tweet is of interest and should be stored in the database. This function can be modeled with
6767type ` Tweet -> Async<bool> ` . We can flatten the ` tweetBatches ` sequence and then filter it as follows:
6868*)
6969
@@ -92,7 +92,7 @@ let storeFilteredTweets : Async<unit> =
9292
9393(**
9494Note that the value ` storeFilteredTweets ` is an asynchronous computation of type ` Async<unit> ` . At this point,
95- it is a ** representation* * of the workflow which consists of reading batches of tweets, filtering them and storing them
95+ it is a * representation* of the workflow which consists of reading batches of tweets, filtering them and storing them
9696in the database. When executed, the workflow will consume the entire tweet stream. The entire workflow can be
9797succinctly declared and executed as follows:
9898*)
@@ -113,9 +113,9 @@ composed using familiar operations on sequences. Furthermore, it will be execute
113113
114114The central difference between ` seq<'a> ` and ` AsyncSeq<'a> ` two can be illustrated by introducing the notion of time.
115115Suppose that generating subsequent elements of a sequence requires an IO-bound operation. Invoking long
116- running IO-bound operations from within a ` seq<'a> ` will ** block* * the thread which calls ` MoveNext ` on the
117- corresponding ` IEnumerator ` . An ` AsyncSeq ` can use facilities provided by the F# ` Async ` type to make more efficient
118- use of system resources.
116+ running IO-bound operations from within a ` seq<'a> ` will * block* the thread which calls ` MoveNext ` on the
117+ corresponding ` IEnumerator ` . An ` AsyncSeq ` on the other hand can use facilities provided by the F# ` Async ` type to make
118+ more efficient use of system resources.
119119*)
120120
121121let withTime = seq {
@@ -134,8 +134,9 @@ let withTime' = asyncSeq {
134134
135135(**
136136When the asynchronous sequence ` withTime' ` is iterated, the calls to ` Async.Sleep ` won't block threads. Instead,
137- the ** continuation** of the sequence will be scheduled by a ` ThreadPool ` thread, while the calling thread
138- will be free to perform other work. Overall, a ` seq<'a> ` can be viewed as a special case of an ` AsyncSeq<'a> ` .
137+ the * continuation* of the sequence will be scheduled by ` Async ` while the calling thread will be free to perform other work.
138+ Overall, a ` seq<'a> ` can be viewed as a special case of an ` AsyncSeq<'a> ` where subsequent elements are retrieved
139+ in a blocking manner.
139140*)
140141
141142
@@ -144,16 +145,16 @@ will be free to perform other work. Overall, a `seq<'a>` can be viewed as a spec
144145
145146Both ` IObservable<'a> ` and ` AsyncSeq<'a> ` represent collections of items and both provide similar operations
146147for transformation and composition. The central difference between the two is that the former is push-based
147- and the latter is pull-based. Consumers of an ` IObservable<'a> ` ** subscribe* * to receive notifications about
148- new items or completion . By contrast, consumers of an ` AsyncSeq<'a> ` ** retrieve* * subsequent items on their own
148+ and the latter is pull-based. Consumers of an ` IObservable<'a> ` * subscribe* to receive notifications about
149+ new items or the end of the sequence . By contrast, consumers of an ` AsyncSeq<'a> ` * retrieve* subsequent items on their own
149150terms. Some domains are more naturally modeled with one or the other, however it is less clear which is a more
150151suitable tool for a specific task. In many cases, a combination of the two provides the optimal solution and
151152restricting yourself to one, while simplifying the programming model, can lead one to view all problems as a nail.
152153
153154A more specific difference between the two is that ` IObservable<'a> ` subscribers have the basic type ` 'a -> unit `
154155and are therefore inherently synchronous and imperative. The observer can certainly make a blocking call, but this
155156can defeat the purpose of the observable sequence all together. Alternatively, the observer can spawn an operation, but
156- this can break composition because one can no longer rely on the observer operation returning to determine that it has
157+ this can break composition because one can no longer rely on the observer returning to determine that it has
157158completed. With the observable model however, we can model blocking operations through composition on sequences rather
158159than observers.
159160
@@ -190,6 +191,8 @@ module Observable =
190191 let bind ( f : 'a -> IObservable < 'b >) ( o : IObservable < 'a >) : IObservable < 'b > =
191192 failwith " TODO"
192193
194+ /// Filter an observable sequence using a predicate producing a observable
195+ /// which emits a single boolean value.
193196 let filterObs ( f : 'a -> IObservable < bool >) : IObservable < 'a > -> IObservable < 'a > =
194197 bind <| fun a ->
195198 f a
@@ -198,9 +201,12 @@ module Observable =
198201 | false -> None
199202 )
200203
204+ /// Filter an observable sequence using a predicate which returns an async
205+ /// computation producing a boolean value.
201206 let filterAsync ( f : 'a -> Async < bool >) : IObservable < 'a > -> IObservable < 'a > =
202207 filterObs ( f >> ofAsync)
203208
209+ /// Maps over an observable sequence using an async-returning function.
204210 let mapAsync ( f : 'a -> Async < 'b >) : IObservable < 'a > -> IObservable < 'b > =
205211 bind ( f >> ofAsync)
206212
@@ -236,17 +242,22 @@ over their generation. On the other hand, the program at hand will process the t
236242they are being generated. Moreover, the underlying Twitter API will likely utilize a request-reply protocol to retrieve batches of
237243tweets from persistent storage. As such, the distinction between push vs. pull becomes less interesting. If the underlying source
238244is truly push-based, then one can buffer its output and consume it using an asynchronous sequence. If the underlying source is pull-based,
239- then one can turn it into an observable sequence by first pulling, then pushing. In a real-time reactive system, notifications must be pushed
240- immediately without delay. This point however is moot since neither ` IObservable<'a> ` nor ` Async<'a> ` are well suited for
241- real-time systems.
245+ then one can turn it into an observable sequence by first pulling, then pushing. Note however that in a true real-time reactive system,
246+ notifications must be pushed immediately without delay.
247+
248+ Upon closer inspection, the consumption approaches between the two models aren't all too different. While ` AsyncSeq ` is pull based,
249+ it is usually consumed using an operator such as ` AsyncSeq.iterAsync ` as shown above. This is a function of type
250+ ` ('a -> Async<unit>) -> AsyncSeq<'a> -> Async<unit> ` where the first argument is a function ` 'a -> Async<unit> ` which performs
251+ some work on an item of the sequence and is applied repeatedly to subsequent items. In a sense, ` iterAsync ` * pushes* values to this
252+ function. The primary difference from observers of observable sequences is the return type ` Async<unit> ` rather than simply ` unit ` .
242253*)
243254
244255
245256(**
246257### Performance Considerations
247258
248259While an asynchronous computation obviates the need to block an OS thread for the duration of an operation, it isn't always the case
249- that this will improve the overall performance of an application. Note however that an async computation does not ** require* * a
260+ that this will improve the overall performance of an application. Note however that an async computation does not * require* a
250261non-blocking operation, it simply allows for it. Also of note is that unlike calling ` IEnumerable.MoveNext() ` , consuming
251262and item from an asynchronous sequence requires several allocations. Usually this is greatly outweighed by the
252263benefits, it can make a difference in some scenarios.
0 commit comments