
Commit aa86ddb

AsyncSeq docs, terminology docs
1 parent 47b3050 commit aa86ddb

4 files changed

Lines changed: 346 additions & 0 deletions


FSharpx.Async.sln

Lines changed: 7 additions & 0 deletions
```diff
@@ -30,12 +30,18 @@ EndProject
 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "content", "content", "{8E6D5255-776D-4B61-85F9-73C37AA1FB9A}"
 	ProjectSection(SolutionItems) = preProject
 		docs\content\index.md = docs\content\index.md
+		docs\content\terminology.md = docs\content\terminology.md
 	EndProjectSection
 EndProject
 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{ED8079DD-2B06-4030-9F0F-DC548F98E1C4}"
 EndProject
 Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "FSharpx.Async.Tests", "tests\FSharpx.Async.Tests\FSharpx.Async.Tests.fsproj", "{ADBD32DB-B7AC-4E81-9507-28F329AF23E7}"
 EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "library", "library", "{B4BB88FA-DB8C-4651-9869-A8FEA79F044E}"
+	ProjectSection(SolutionItems) = preProject
+		docs\content\library\AsyncSeq.fsx = docs\content\library\AsyncSeq.fsx
+	EndProjectSection
+EndProject
 Global
 	GlobalSection(SolutionConfigurationPlatforms) = preSolution
 		Debug|Any CPU = Debug|Any CPU
@@ -58,5 +64,6 @@ Global
 		{83F16175-43B1-4C90-A1EE-8E351C33435D} = {A6A6AF7D-D6E3-442D-9B1E-58CC91879BE1}
 		{8E6D5255-776D-4B61-85F9-73C37AA1FB9A} = {A6A6AF7D-D6E3-442D-9B1E-58CC91879BE1}
 		{ADBD32DB-B7AC-4E81-9507-28F329AF23E7} = {ED8079DD-2B06-4030-9F0F-DC548F98E1C4}
+		{B4BB88FA-DB8C-4651-9869-A8FEA79F044E} = {8E6D5255-776D-4B61-85F9-73C37AA1FB9A}
 	EndGlobalSection
 EndGlobal
```

docs/content/index.md

Lines changed: 4 additions & 0 deletions
```diff
@@ -19,7 +19,11 @@ Samples & documentation
 
 [API Reference](reference/index.html) contains automatically generated documentation for all types, modules and functions in the library.
 This includes additional brief samples on using most of the functions.
+
+[Terminology](terminology.html) is a reference for some of the terminology around F# async.
 
+[AsyncSeq](library/AsyncSeq.html) contains narrative and code samples explaining asynchronous sequences.
+
 Contributing and copyright
 --------------------------
 
```
docs/content/library/AsyncSeq.fsx

Lines changed: 258 additions & 0 deletions
(**
# F# Async: AsyncSeq

An `AsyncSeq` is a sequence in which individual elements are retrieved using an `Async` computation.
It is similar to `seq<'a>` in that subsequent elements are pulled lazily. Structurally it is
similar to `list<'a>`, with the difference being that each head-and-tail node, or empty node, is wrapped
in `Async`. `AsyncSeq` also bears similarity to `IObservable<'a>`, the former being pull-based and the
latter push-based. Analogs of most operations defined for `Seq`, `List` and `IObservable` are also defined for
`AsyncSeq`. The power of `AsyncSeq` lies in the fact that many of these operations also have analogs based on `Async`,
allowing one to compose complex asynchronous workflows.

The `AsyncSeq` type is located in the `FSharpx.Async.dll` assembly, which can be loaded in F# Interactive as follows:
*)

#r "../../../bin/FSharpx.Async.dll"
open FSharpx.Control
(**
### Generating asynchronous sequences

An `AsyncSeq<'a>` can be generated using computation expression syntax much like `seq<'a>`:
*)

let asyncS = asyncSeq {
  yield 1
  yield 2
}
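To see the elements, the sequence can be consumed with `AsyncSeq.iterAsync` (the same combinator used in the tweet examples below) and run synchronously; this is a minimal sketch:

```fsharp
// Consume the sequence, printing each element as it is retrieved.
asyncS
|> AsyncSeq.iterAsync (fun x -> async { printfn "%d" x })
|> Async.RunSynchronously
```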
(**
Another way to generate an asynchronous sequence is with the `AsyncSeq.unfoldAsync` function. This
function takes another function which generates individual elements based on a state and can
signal completion of the sequence.

For example, suppose that you're writing a program which consumes the Twitter API and stores tweets
which satisfy some criteria into a database. There are several asynchronous request-reply operations at play -
one to retrieve a batch of tweets from the Twitter API, another to determine whether a tweet satisfies some
criteria and finally an operation to write the desired tweet to a database.

Given the type `Tweet` to represent an individual tweet, the operation to retrieve a batch of tweets can
be modeled with a type `int -> Async<(Tweet[] * int) option>` where the incoming `int` represents the
offset into the tweet stream. The asynchronous result is an `Option` which when `None` indicates the
end of the stream, and otherwise contains the batch of retrieved tweets as well as the next offset.

The above function to retrieve a batch of tweets can be used to generate an asynchronous sequence
of tweet batches as follows:
*)

type Tweet = {
  user : string
  message : string
}

let getTweetBatch (offset:int) : Async<(Tweet[] * int) option> =
  failwith "TODO: call Twitter API"

let tweetBatches : AsyncSeq<Tweet[]> =
  AsyncSeq.unfoldAsync getTweetBatch 0
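As a self-contained illustration of `unfoldAsync` with a concrete state (no external service involved), a finite counting sequence can be generated like so:

```fsharp
// The state is the next number to emit; the element is paired with the
// next state, and returning None terminates the sequence.
let countTo5 : AsyncSeq<int> =
  AsyncSeq.unfoldAsync
    (fun n -> async {
      if n < 5 then return Some (n, n + 1)
      else return None })
    0
```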
(**
The asynchronous sequence `tweetBatches` will, when iterated, consume the entire tweet stream.

Next, suppose that the tweet filtering function makes a call to a web service which determines
whether a particular tweet should be stored in the database. This function can be modeled with
type `Tweet -> Async<bool>`. We can flatten the `tweetBatches` sequence and then filter it using
this function:
*)

let filterTweet (t:Tweet) : Async<bool> =
  failwith "TODO: call web service"

let filteredTweets : AsyncSeq<Tweet> =
  tweetBatches
  |> AsyncSeq.concatSeq               // flatten
  |> AsyncSeq.filterAsync filterTweet // filter
(**
When the resulting sequence `filteredTweets` is consumed, it will lazily consume the underlying
sequence `tweetBatches`, select individual tweets and filter them using the function `filterTweet`.

Finally, the function which stores a tweet in the database can be modeled by the type `Tweet -> Async<unit>`.
We can store all filtered tweets as follows:
*)

let storeTweet (t:Tweet) : Async<unit> =
  failwith "TODO: call database"

let storeFilteredTweets : Async<unit> =
  filteredTweets
  |> AsyncSeq.iterAsync storeTweet
(**
Note that the value `storeFilteredTweets` is an asynchronous computation of type `Async<unit>`. At this point,
it is a **representation** of the workflow which consists of reading batches of tweets, filtering them and storing them
in the database. When executed, the workflow will consume the entire tweet stream. The entire workflow can be
succinctly expressed and executed as follows:
*)

AsyncSeq.unfoldAsync getTweetBatch 0
|> AsyncSeq.concatSeq
|> AsyncSeq.filterAsync filterTweet
|> AsyncSeq.iterAsync storeTweet
|> Async.RunSynchronously

(**
The above snippet effectively orchestrates several asynchronous request-reply interactions into a cohesive unit
composed with familiar operations on sequences. Furthermore, it can be executed efficiently in a non-blocking manner.
*)
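If blocking the calling thread with `Async.RunSynchronously` is undesirable, the same workflow can instead be launched in the background with the standard `Async.Start`; a sketch:

```fsharp
// Fire-and-forget execution: the workflow runs on the ThreadPool and the
// calling thread continues immediately.
AsyncSeq.unfoldAsync getTweetBatch 0
|> AsyncSeq.concatSeq
|> AsyncSeq.filterAsync filterTweet
|> AsyncSeq.iterAsync storeTweet
|> Async.Start
```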
(**
### Comparison with seq<'a>

The central difference between `seq<'a>` and `AsyncSeq<'a>` can be illustrated by introducing the notion of time.
Suppose that generating subsequent elements of a sequence requires an IO-bound operation. Invoking long-running
IO-bound operations from within a `seq<'a>` will **block** the thread which calls `MoveNext` on the
corresponding `IEnumerator`. An `AsyncSeq` can use the facilities provided by the F# `Async` type to make more efficient
use of system resources.
*)

let withTime = seq {
  System.Threading.Thread.Sleep(1000) // calling thread will block
  yield 1
  System.Threading.Thread.Sleep(1000) // calling thread will block
  yield 2
}

let withTime' = asyncSeq {
  do! Async.Sleep 1000 // non-blocking sleep
  yield 1
  do! Async.Sleep 1000 // non-blocking sleep
  yield 2
}

(**
When the asynchronous sequence `withTime'` is iterated, the calls to `Async.Sleep` won't block threads. Instead,
the **continuation** of the sequence will be scheduled on a `ThreadPool` thread, while the calling thread
will be free to perform other work.
*)
(**
### Comparison with IObservable<'a>

Both `IObservable<'a>` and `AsyncSeq<'a>` represent collections of items and both provide similar operations
for transformation and composition. The central difference between the two is that the former is push-based
and the latter is pull-based. Consumers of an `IObservable<'a>` **subscribe** to receive notifications about
new items or completion. By contrast, consumers of an `AsyncSeq<'a>` **retrieve** subsequent items on their own
terms. Some domains are more naturally modeled with one or the other, and it is often unclear which is the more
suitable tool for a specific task. In many cases, a combination of the two provides the optimal solution, and
restricting oneself to one, while simplifying the programming model, can lead one to view every problem as a nail.

A more specific difference between the two is that `IObservable<'a>` subscribers have the basic type `'a -> unit`
and are therefore inherently synchronous and imperative. An observer can certainly make a blocking call, but this
can defeat the purpose of the observable sequence altogether. Alternatively, the observer can spawn an operation, but
this can break composition because one can no longer rely on the observer operation returning to determine that it has
completed. We can, however, model asynchronous operations in the observable model through composition.

To illustrate, let's try to implement the above tweet retrieval, filtering and storage workflow using observable sequences.
Suppose we already have an observable sequence representing tweets, `IObservable<Tweet>`, and we simply wish
to filter it and store the resulting tweets. The function `Observable.filter` allows one to filter observable
sequences based on a predicate, but in this case it doesn't quite cut it because the predicate it accepts is synchronous,
`'a -> bool`:
*)
open System

let tweetsObs : IObservable<Tweet> =
  failwith "TODO: create observable"

let filteredTweetsObs =
  tweetsObs
  |> Observable.filter (filterTweet >> Async.RunSynchronously) // blocking IO call!
(**
To remedy the blocking IO call we can adapt the filtering function to the `IObservable<'a>` model. An `Async<'a>`
can be modeled as an `IObservable<'a>` with one element, so suppose that we have `Tweet -> IObservable<bool>`. We can
then compose an observable that filters tweets using this function as follows:
*)
module Observable =

  let ofAsync (a:Async<'a>) : IObservable<'a> =
    failwith "TODO"

  /// Observable.SelectMany in Rx
  let bind (f:'a -> IObservable<'b>) (o:IObservable<'a>) : IObservable<'b> =
    failwith "TODO"

  let filterObs (f:'a -> IObservable<bool>) : IObservable<'a> -> IObservable<'a> =
    bind <| fun a ->
      f a
      |> Observable.choose (function
        | true -> Some a
        | false -> None)

  let filterAsync (f:'a -> Async<bool>) : IObservable<'a> -> IObservable<'a> =
    filterObs (f >> ofAsync)

  let mapAsync (f:'a -> Async<'b>) : IObservable<'a> -> IObservable<'b> =
    bind (f >> ofAsync)

let filteredTweetsObs' : IObservable<Tweet> =
  tweetsObs
  |> Observable.filterAsync filterTweet
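The `ofAsync` adapter above is left as a TODO; one possible (simplified) implementation is sketched below. This is a hypothetical illustration, not the FSharpx implementation: when subscribed, it starts the computation with the standard `Async.StartWithContinuations`, pushes the single result, then completes. Cancellation and unsubscription are ignored for brevity.

```fsharp
// A hedged sketch of ofAsync: an Async<'a> viewed as a one-element observable.
let ofAsyncSketch (a:Async<'a>) : IObservable<'a> =
  { new IObservable<'a> with
      member __.Subscribe(obs:IObserver<'a>) =
        Async.StartWithContinuations(a,
          (fun x  -> obs.OnNext x; obs.OnCompleted()), // success: push, complete
          (fun ex -> obs.OnError ex),                  // error
          (fun _  -> ()))                              // cancellation: ignored here
        { new IDisposable with member __.Dispose() = () } }
```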
(**
With little effort we were able to adapt `IObservable<'a>` to our needs. Next, let's try implementing the storage of
filtered tweets. Again, we can adapt the function `storeTweet` defined above to the observable model and bind the
observable of filtered tweets to it:
*)

let storedTweetsObs : IObservable<unit> =
  filteredTweetsObs'
  |> Observable.mapAsync storeTweet

(**
The observable sequence `storedTweetsObs` will produce a value each time a filtered tweet is stored. The entire
workflow can be expressed as follows:
*)

let storedTweetsObs' : IObservable<unit> =
  tweetsObs
  |> Observable.filterAsync filterTweet
  |> Observable.mapAsync storeTweet
(**
Overall, both solutions are succinct and composable, and choosing between them can ultimately be a matter of preference. One
thing to consider is push vs. pull semantics. On the one hand, tweets are push-based - the consumer has no control over their generation.
On the other hand, the program at hand will process the tweets on its own terms regardless of how quickly they are being generated.
Moreover, the underlying Twitter API will likely utilize a request-reply protocol to retrieve batches of tweets from persistent
storage. As such, the distinction between push and pull becomes less interesting. If the underlying source is truly push-based, then
one can buffer its output and consume it using an asynchronous sequence. If the underlying source is pull-based, then one can turn
it into an observable sequence by first pulling, then pushing. In a real-time reactive system, notifications must be pushed
immediately without delay. This point however is moot since neither `IObservable<'a>` nor `Async<'a>` is well suited for
real-time systems.
*)
(**
### Performance Considerations

While an async computation obviates the need to block an OS thread for the duration of an operation, it isn't always the case
that this will improve the overall performance of an application. Note, however, that an async computation does not **require**
a non-blocking operation; it simply allows for it.
*)

(**
## Related Articles

 * [Programming with F# asynchronous sequences](http://tomasp.net/blog/async-sequences.aspx/)
*)

docs/content/terminology.md

Lines changed: 77 additions & 0 deletions
Terminology
====

Terminology is frequently a source of confusion. Often, terms have different
meanings depending on the context, different terms are used to refer to the same
concept, and not everyone agrees on any of this. The terminology described here
is scoped to F# and the .NET Framework. The goal is to briefly introduce high-level
concepts and hopefully alleviate some confusion.
## Thread

A thread in this context refers to a [managed thread](https://msdn.microsoft.com/en-us/library/6kac2kdh(v=vs.110).aspx).
A thread is a basic unit to which an OS allocates CPU resources. A **managed** thread usually maps directly to
an OS thread, though it is possible for a CLR host to override this behavior. A thread has a corresponding context
consisting of a pointer to the code being executed as well as stack and register state.

A thread can be viewed as a total ordering of instructions. Instructions executed by different threads
are only partially ordered by causality relationships - in some cases it's impossible to tell which ran
before the other. Much of the difficulty in multi-threaded code can be attributed to this lack of
information about ordering.
## Context Switch

A context switch occurs when the OS scheduler changes the thread to which it allocates CPU resources.
In order to do this, it must save the context of the thread which is giving up use of the CPU and reconstitute
the context of the thread which will make use of the CPU. Context switches occur for a number of reasons.
They occur naturally as part of preemptive scheduling - threads run for a **time slice** or **quantum** until another
thread is given a chance to run. Another cause is a thread explicitly yielding its time slice with a call to
`Thread.Sleep`.
## Synchronous vs. Asynchronous

An operation is synchronous if the caller must wait for it to complete before making progress.
More specifically, the calling **thread** may **block** until the synchronous operation is complete.
Note that a CPU-bound task exhibits behavior similar to blocking.

An operation is asynchronous if the request to begin the operation and the result of the operation can
be delivered through different channels. This provides a convenient mechanism to encapsulate waiting. In other words,
an asynchronous operation decouples the means of sending the request from the means of receiving the response.

This decoupling in turn allows one to decouple the **logical** notion of an operation from the **physical**
details of how it is executed. For example, an asynchronous operation to download a web page is a single
logical operation. Due to its asynchronous nature, however, the underlying implementation can start the
operation on one thread and then deliver the completion notification through a different thread
(such as an IO completion thread managed by the ThreadPool). In the meantime, the calling thread is free
to perform other work. In fact, the completion notification can even be handled by the same thread
as the calling thread.

By contrast, a synchronous operation will use the calling thread's context to deliver the completion
notification. If the work to be performed is small enough, this can be very efficient. If however the
operation is long-running, the OS will perform a context switch to allow other threads to proceed, and
then another context switch to resume the calling thread. Note that synchrony can be viewed as a special
form of asynchrony.

In F#, asynchrony is represented by the `Async` type.
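The request/response decoupling can be made concrete with the standard F# `Async.StartWithContinuations`, which starts an operation and delivers its result to a continuation (possibly on a different thread) while the caller proceeds; a minimal sketch:

```fsharp
// An IO-bound operation stood in for by a non-blocking sleep.
let download : Async<string> = async {
  do! Async.Sleep 100
  return "page contents" }

// Start the operation; the result is delivered to the success continuation
// rather than returned to the caller, which is free to continue immediately.
Async.StartWithContinuations(
  download,
  (fun result -> printfn "completed: %s" result),   // success
  (fun ex     -> printfn "failed: %s" ex.Message),  // error
  (fun _      -> printfn "cancelled"))              // cancellation

printfn "caller continues immediately"
```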
## Blocking vs. Non-blocking

A thread is [blocked](http://www.albahari.com/threading/part2.aspx#_Blocking) when its execution
is paused while it waits for some operation to complete (receiving IO, a lock being released, etc.). Once the operation completes,
the OS will schedule the thread to resume and continue where it left off.

A non-blocking operation is one that does not prevent the calling thread from making progress. In other words,
once a non-blocking operation is started, the calling thread is free to perform other work, such as starting
yet another operation.

It is important to remember that when a thread is blocked, the CPU and the system as a whole can still do other work.
The issue with blocking is that the specific thread which is blocked can't do other work, and the OS must
keep resources allocated for the thread's context so that, after a context switch, it can continue where it left off once the operation being waited on is complete.
Since managed threads have a relatively high cost (by default, a .NET thread is allocated 1 MB of stack space), this can
lead to inefficiencies. Non-blocking operations allow one to make more efficient use of system resources.
## Further Reading

 * [Managed Threading](https://msdn.microsoft.com/en-us/library/3e8s7xdd%28v=vs.110%29.aspx)
 * [Threading in C#](http://www.albahari.com/threading/)
 * [The Art of Multiprocessor Programming](http://www.amazon.com/Art-Multiprocessor-Programming-Revised-Reprint/dp/0123973376/)