Skip to content

Commit 0f308ca

Browse files
authored
Update AsyncSeq.fsx
1 parent a75149f commit 0f308ca

1 file changed

Lines changed: 0 additions & 111 deletions

File tree

docs/AsyncSeq.fsx

Lines changed: 0 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -152,117 +152,6 @@ the *continuation* of the sequence will be scheduled by `Async` while the callin
152152
Overall, a `seq<'a>` can be viewed as a special case of an `AsyncSeq<'a>` where subsequent elements are retrieved
153153
in a blocking manner.
154154
155-
### Comparison with IObservable<'T>
156-
157-
Both `IObservable<'T>` and `AsyncSeq<'T>` represent collections of items and both provide similar operations
158-
for transformation and composition. The central difference between the two is that the former uses a *synchronous push*
159-
to a subscriber and the latter uses an *asynchronous pull* by a consumer.
160-
Consumers of an `IObservable<'T>` *subscribe* to receive notifications about
161-
new items or the end of the sequence. By contrast, consumers of an `AsyncSeq<'T>` *asynchronously retrieve* subsequent items on their own
162-
terms. Some domains are more naturally modeled with one or the other, however it is less clear which is a more
163-
suitable tool for a specific task. In many cases, a combination of the two provides the optimal solution and
164-
restricting yourself to one, while simplifying the programming model, can lead one to view all problems as a nail.
165-
166-
A more specific difference between the two is that `IObservable<'T>` subscribers have the basic type `'T -> unit`
167-
and are therefore inherently synchronous and imperative. The observer can certainly make a blocking call, but this
168-
can defeat the purpose of the observable sequence all together. Alternatively, the observer can spawn an operation, but
169-
this can break composition because one can no longer rely on the observer returning to determine that it has
170-
completed. With the observable model however, we can model blocking operations through composition on sequences rather
171-
than observers.
172-
173-
To illustrate, let's try to implement the above Tweet retrieval, filtering and storage workflow using observable sequences.
174-
Suppose we already have an observable sequence representing tweets `IObservable<Tweet>` and we simply wish
175-
to filter it and store the resulting tweets. The function `Observable.filter` allows one to filter observable
176-
sequences based on a predicate, however in this case it doesn't quite cut it because the predicate passed to it must
177-
be synchronous `'T -> bool`:
178-
*)
179-
180-
open System
181-
182-
let tweetsObs : IObservable<Tweet> =
183-
failwith "TODO: create observable"
184-
185-
let filteredTweetsObs =
186-
tweetsObs
187-
|> Observable.filter (filterTweet >> Async.RunSynchronously) // blocking IO-call!
188-
189-
(**
190-
To remedy the blocking IO-call we can better adapt the filtering function to the `IObservable<'T>` model. A value
191-
of type `Async<'T>` can be modeled as an `IObservable<'T>` with one element. Suppose that we have
192-
`Tweet -> IObservable<bool>`. We can define a few helper operators on observables to allow filtering using
193-
an asynchronous predicate as follows:
194-
*)
195-
196-
module Observable =
197-
198-
/// a |> Async.StartAsTask |> (fun t -> t.ToObservable())
199-
let ofAsync (a:Async<'a>) : IObservable<'a> =
200-
failwith "TODO"
201-
202-
/// Observable.SelectMany
203-
let bind (f:'a -> IObservable<'b>) (o:IObservable<'a>) : IObservable<'b> =
204-
failwith "TODO"
205-
206-
/// Filter an observable sequence using a predicate producing a observable
207-
/// which emits a single boolean value.
208-
let filterObs (f:'a -> IObservable<bool>) : IObservable<'a> -> IObservable<'a> =
209-
bind <| fun a ->
210-
f a
211-
|> Observable.choose (function
212-
| true -> Some a
213-
| false -> None
214-
)
215-
216-
/// Filter an observable sequence using a predicate which returns an async
217-
/// computation producing a boolean value.
218-
let filterAsync (f:'a -> Async<bool>) : IObservable<'a> -> IObservable<'a> =
219-
filterObs (f >> ofAsync)
220-
221-
/// Maps over an observable sequence using an async-returning function.
222-
let mapAsync (f:'a -> Async<'b>) : IObservable<'a> -> IObservable<'b> =
223-
bind (f >> ofAsync)
224-
225-
let filteredTweetsObs' : IObservable<Tweet> =
226-
filteredTweetsObs
227-
|> Observable.filterAsync filterTweet
228-
229-
230-
(**
231-
With a little effort, we were able to adapt `IObservable<'a>` to our needs. Next let's try implementing the storage of
232-
filtered tweets. Again, we can adapt the function `storeTweet` defined above to the observable model and bind the
233-
observable of filtered tweets to it:
234-
*)
235-
236-
let storedTweetsObs : IObservable<unit> =
237-
filteredTweetsObs'
238-
|> Observable.mapAsync storeTweet
239-
240-
(**
241-
The observable sequence `storedTweetsObs` will produces a value each time a filtered tweet is stored. The entire
242-
workflow can be expressed as follows:
243-
*)
244-
245-
let storedTeetsObs' : IObservable<unit> =
246-
tweetsObs
247-
|> Observable.filterAsync filterTweet
248-
|> Observable.mapAsync storeTweet
249-
250-
(**
251-
Overall, both solutions are succinct and composable and deciding which one to use can ultimately be a matter of preference.
252-
Some things to consider are the "synchronous push" vs. "asynchronous pull" semantics. On the one hand, tweets are pushed based - the consumer has no control
253-
over their generation. On the other hand, the program at hand will process the tweets on its own terms regardless of how quickly
254-
they are being generated. Moreover, the underlying Twitter API will likely utilize a request-reply protocol to retrieve batches of
255-
tweets from persistent storage. As such, the distinction between "synchronous push" vs. "asynchronous pull" becomes less interesting. If the underlying source
256-
is truly push-based, then one can buffer its output and consume it using an asynchronous sequence. If the underlying source is pull-based,
257-
then one can turn it into an observable sequence by first pulling, then pushing. Note however that in a true real-time reactive system,
258-
notifications must be pushed immediately without delay.
259-
260-
Upon closer inspection, the consumption approaches between the two models aren't all too different. While `AsyncSeq` is based on an asynchronous-pull operation,
261-
it is usually consumed using an operator such as `AsyncSeq.iterAsync` as shown above. This is a function of type
262-
`('T -> Async<unit>) -> AsyncSeq<'T> -> Async<unit>` where the first argument is a function `'T -> Async<unit>` which performs
263-
some work on an item of the sequence and is applied repeatedly to subsequent items. In a sense, `iterAsync` *pushes* values to this
264-
function. The primary difference from observers of observable sequences is the return type `Async<unit>` rather than simply `unit`.
265-
266155
### Performance Considerations
267156
268157
While an asynchronous computation obviates the need to block an OS thread for the duration of an operation, it isn't always the case

0 commit comments

Comments
 (0)