
Commit 080a1d0

Merge pull request #14 from eulerfx/master
Some docs on AsyncSeq and terminology
2 parents 939ac5e + 933fc3a commit 080a1d0

4 files changed

Lines changed: 350 additions & 0 deletions


FSharpx.Async.sln

Lines changed: 7 additions & 0 deletions
@@ -29,12 +29,18 @@ EndProject
 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "content", "content", "{8E6D5255-776D-4B61-85F9-73C37AA1FB9A}"
 	ProjectSection(SolutionItems) = preProject
 		docs\content\index.md = docs\content\index.md
+		docs\content\terminology.md = docs\content\terminology.md
 	EndProjectSection
 EndProject
 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{ED8079DD-2B06-4030-9F0F-DC548F98E1C4}"
 EndProject
 Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "FSharpx.Async.Tests", "tests\FSharpx.Async.Tests\FSharpx.Async.Tests.fsproj", "{ADBD32DB-B7AC-4E81-9507-28F329AF23E7}"
 EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "library", "library", "{B4BB88FA-DB8C-4651-9869-A8FEA79F044E}"
+	ProjectSection(SolutionItems) = preProject
+		docs\content\library\AsyncSeq.fsx = docs\content\library\AsyncSeq.fsx
+	EndProjectSection
+EndProject
 Global
 	GlobalSection(SolutionConfigurationPlatforms) = preSolution
 		Debug|Any CPU = Debug|Any CPU
@@ -57,5 +63,6 @@ Global
 		{83F16175-43B1-4C90-A1EE-8E351C33435D} = {A6A6AF7D-D6E3-442D-9B1E-58CC91879BE1}
 		{8E6D5255-776D-4B61-85F9-73C37AA1FB9A} = {A6A6AF7D-D6E3-442D-9B1E-58CC91879BE1}
 		{ADBD32DB-B7AC-4E81-9507-28F329AF23E7} = {ED8079DD-2B06-4030-9F0F-DC548F98E1C4}
+		{B4BB88FA-DB8C-4651-9869-A8FEA79F044E} = {8E6D5255-776D-4B61-85F9-73C37AA1FB9A}
 	EndGlobalSection
 EndGlobal

docs/content/index.md

Lines changed: 4 additions & 0 deletions
@@ -19,7 +19,11 @@ Samples & documentation
 
 [API Reference](reference/index.html) contains automatically generated documentation for all types, modules and functions in the library.
 This includes additional brief samples on using most of the functions.
+
+[Terminology](terminology.html) is a reference for some of the terminology around F# async.
 
+[AsyncSeq](library/AsyncSeq.html) contains narrative and code samples explaining asynchronous sequences.
+
 Contributing and copyright
 --------------------------
 

docs/content/library/AsyncSeq.fsx

Lines changed: 262 additions & 0 deletions
@@ -0,0 +1,262 @@
(**
# F# Async: AsyncSeq

An AsyncSeq is a sequence in which individual elements are retrieved using an `Async` computation.
It is similar to `seq<'a>` in that subsequent elements are pulled lazily. Structurally it is
similar to `list<'a>`, with the difference that each head and tail node, or empty node, is wrapped
in `Async`. `AsyncSeq` also bears similarity to `IObservable<'a>`, the former being pull-based and the
latter push-based. Analogs of most operations defined for `Seq`, `List` and `IObservable` are also defined for
`AsyncSeq`. The power of `AsyncSeq` lies in the fact that many of these operations also have analogs based on `Async`,
allowing one to compose complex asynchronous workflows.

The `AsyncSeq` type is located in the `FSharpx.Async.dll` assembly, which can be loaded in F# Interactive as follows:
*)

#r "../../../bin/FSharpx.Async.dll"
open FSharpx.Control
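The list-like structure described above can be sketched in a few lines. This is an illustrative definition only — the names are invented for exposition and the actual type in `FSharpx.Control` may differ in details:

```fsharp
// Illustrative sketch: every node, including the empty one, is wrapped
// in Async, so even learning that the sequence has ended is asynchronous.
type AsyncSeqSketch<'T> = Async<AsyncSeqInnerSketch<'T>>
and AsyncSeqInnerSketch<'T> =
    | Nil                               // the empty node
    | Cons of 'T * AsyncSeqSketch<'T>   // a head element and the asynchronous tail
```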
(**
### Generating asynchronous sequences

An `AsyncSeq<'a>` can be generated using computation expression syntax, much like `seq<'a>`:
*)

let asyncS = asyncSeq {
  yield 1
  yield 2
}
(**
Another way to generate an asynchronous sequence is with the `AsyncSeq.unfoldAsync` function. This
function takes another function which generates individual elements based on a state and can
signal completion of the sequence.

For example, suppose that you're writing a program which consumes the Twitter API and stores tweets
which satisfy some criteria into a database. There are several asynchronous request-reply operations at play -
one to retrieve a batch of tweets from the Twitter API, another to determine whether a tweet satisfies some
criteria, and finally an operation to write the desired tweet to a database.

Given the type `Tweet` representing an individual tweet, the operation to retrieve a batch of tweets can
be modeled with a type `int -> Async<(Tweet[] * int) option>`, where the incoming `int` represents the
offset into the tweet stream. The asynchronous result is an `Option` which, when `None`, indicates the
end of the stream, and otherwise contains the batch of retrieved tweets as well as the next offset.

The above function to retrieve a batch of tweets can be used to generate an asynchronous sequence
of tweet batches as follows:
*)

type Tweet = {
  user : string
  message : string
}

let getTweetBatch (offset:int) : Async<(Tweet[] * int) option> =
  failwith "TODO: call Twitter API"

let tweetBatches : AsyncSeq<Tweet[]> =
  AsyncSeq.unfoldAsync getTweetBatch 0
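For a smaller, self-contained illustration of `unfoldAsync`, here is a sketch that generates the numbers 0 through 2 from an integer state, with no IO involved (the name `upToThree` is invented for this example):

```fsharp
// The state is the next number to emit; returning None ends the sequence.
// Note the element comes first in the pair, then the next state.
let upToThree : AsyncSeq<int> =
    AsyncSeq.unfoldAsync
        (fun n -> async {
            if n < 3 then return Some (n, n + 1)
            else return None })
        0
```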
(**
The asynchronous sequence `tweetBatches`, when iterated, will consume the entire tweet stream.

Next, suppose that the tweet filtering function makes a call to a web service which determines
whether a particular tweet should be stored in the database. This function can be modeled with
type `Tweet -> Async<bool>`. We can flatten the `tweetBatches` sequence and then filter it as follows:
*)

let filterTweet (t:Tweet) : Async<bool> =
  failwith "TODO: call web service"

let filteredTweets : AsyncSeq<Tweet> =
  tweetBatches
  |> AsyncSeq.concatSeq               // flatten
  |> AsyncSeq.filterAsync filterTweet // filter

(**
When the resulting sequence `filteredTweets` is consumed, it will lazily consume the underlying
sequence `tweetBatches`, select individual tweets and filter them using the function `filterTweet`.

Finally, the function which stores a tweet in the database can be modeled by the type `Tweet -> Async<unit>`.
We can store all filtered tweets as follows:
*)

let storeTweet (t:Tweet) : Async<unit> =
  failwith "TODO: call database"

let storeFilteredTweets : Async<unit> =
  filteredTweets
  |> AsyncSeq.iterAsync storeTweet
(**
Note that the value `storeFilteredTweets` is an asynchronous computation of type `Async<unit>`. At this point,
it is a **representation** of the workflow which consists of reading batches of tweets, filtering them and storing them
in the database. When executed, the workflow will consume the entire tweet stream. The entire workflow can be
succinctly declared and executed as follows:
*)

AsyncSeq.unfoldAsync getTweetBatch 0
|> AsyncSeq.concatSeq
|> AsyncSeq.filterAsync filterTweet
|> AsyncSeq.iterAsync storeTweet
|> Async.RunSynchronously

(**
The above snippet effectively orchestrates several asynchronous request-reply interactions into a cohesive unit
composed using familiar operations on sequences. Furthermore, it will be executed efficiently in a non-blocking manner.
*)
(**
### Comparison with seq<'a>

The central difference between `seq<'a>` and `AsyncSeq<'a>` can be illustrated by introducing the notion of time.
Suppose that generating subsequent elements of a sequence requires an IO-bound operation. Invoking long-running
IO-bound operations from within a `seq<'a>` will **block** the thread which calls `MoveNext` on the
corresponding `IEnumerator`. An `AsyncSeq` can use the facilities provided by the F# `Async` type to make more efficient
use of system resources.
*)

let withTime = seq {
  System.Threading.Thread.Sleep(1000) // calling thread will block
  yield 1
  System.Threading.Thread.Sleep(1000) // calling thread will block
  yield 2
}

let withTime' = asyncSeq {
  do! Async.Sleep 1000 // non-blocking sleep
  yield 1
  do! Async.Sleep 1000 // non-blocking sleep
  yield 2
}

(**
When the asynchronous sequence `withTime'` is iterated, the calls to `Async.Sleep` won't block threads. Instead,
the **continuation** of the sequence will be scheduled on a `ThreadPool` thread, while the calling thread
will be free to perform other work. Overall, a `seq<'a>` can be viewed as a special case of an `AsyncSeq<'a>`.
*)
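To make the "special case" claim concrete, any `seq<'a>` can be lifted into an `AsyncSeq<'a>` in which no actual asynchrony occurs. This is a sketch — the library may already provide such a conversion under a different name:

```fsharp
// Each element of the underlying seq is yielded immediately; the Async
// wrappers complete synchronously, so no time passes between elements.
let liftSeq (xs: seq<'a>) : AsyncSeq<'a> = asyncSeq {
    for x in xs do
        yield x }
```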
(**
### Comparison with IObservable<'a>

Both `IObservable<'a>` and `AsyncSeq<'a>` represent collections of items and both provide similar operations
for transformation and composition. The central difference between the two is that the former is push-based
and the latter pull-based. Consumers of an `IObservable<'a>` **subscribe** to receive notifications about
new items or completion. By contrast, consumers of an `AsyncSeq<'a>` **retrieve** subsequent items on their own
terms. Some domains are more naturally modeled with one or the other, but it is often unclear which is the more
suitable tool for a specific task. In many cases a combination of the two provides the optimal solution, and
restricting yourself to just one, while simplifying the programming model, can lead one to treat every problem as a nail.

A more specific difference between the two is that `IObservable<'a>` subscribers have the basic type `'a -> unit`
and are therefore inherently synchronous and imperative. An observer can certainly make a blocking call, but this
can defeat the purpose of the observable sequence altogether. Alternatively, the observer can spawn an operation, but
this can break composition because one can no longer rely on the observer operation returning to determine that it has
completed. With the observable model, however, we can express blocking operations through composition on sequences rather
than on observers.

To illustrate, let's try to implement the above tweet retrieval, filtering and storage workflow using observable sequences.
Suppose we already have an observable sequence representing tweets, `IObservable<Tweet>`, and we simply wish
to filter it and store the resulting tweets. The function `Observable.filter` allows one to filter observable
sequences based on a predicate; however, in this case it doesn't quite cut it, because the predicate passed to it must
be synchronous, `'a -> bool`:
*)

open System

let tweetsObs : IObservable<Tweet> =
  failwith "TODO: create observable"

let filteredTweetsObs =
  tweetsObs
  |> Observable.filter (filterTweet >> Async.RunSynchronously) // blocking IO-call!

(**
To remedy the blocking IO call we can better adapt the filtering function to the `IObservable<'a>` model. A value
of type `Async<'a>` can be modeled as an `IObservable<'a>` with one element. Suppose that we have a function
`Tweet -> IObservable<bool>`. We can define a few helper operators on observables to allow filtering using
an asynchronous predicate as follows:
*)

module Observable =

  /// a |> Async.StartAsTask |> (fun t -> t.ToObservable())
  let ofAsync (a:Async<'a>) : IObservable<'a> =
    failwith "TODO"

  /// Observable.SelectMany
  let bind (f:'a -> IObservable<'b>) (o:IObservable<'a>) : IObservable<'b> =
    failwith "TODO"

  let filterObs (f:'a -> IObservable<bool>) : IObservable<'a> -> IObservable<'a> =
    bind <| fun a ->
      f a
      |> Observable.choose (function
        | true -> Some a
        | false -> None
      )

  let filterAsync (f:'a -> Async<bool>) : IObservable<'a> -> IObservable<'a> =
    filterObs (f >> ofAsync)

  let mapAsync (f:'a -> Async<'b>) : IObservable<'a> -> IObservable<'b> =
    bind (f >> ofAsync)

let filteredTweetsObs' : IObservable<Tweet> =
  tweetsObs
  |> Observable.filterAsync filterTweet
(**
With a little effort, we were able to adapt `IObservable<'a>` to our needs. Next, let's try implementing the storage of
filtered tweets. Again, we can adapt the function `storeTweet` defined above to the observable model and bind the
observable of filtered tweets to it:
*)

let storedTweetsObs : IObservable<unit> =
  filteredTweetsObs'
  |> Observable.mapAsync storeTweet

(**
The observable sequence `storedTweetsObs` will produce a value each time a filtered tweet is stored. The entire
workflow can be expressed as follows:
*)

let storedTweetsObs' : IObservable<unit> =
  tweetsObs
  |> Observable.filterAsync filterTweet
  |> Observable.mapAsync storeTweet

(**
Overall, both solutions are succinct and composable, and deciding which one to use can ultimately be a matter of preference.
One thing to consider is push vs. pull semantics. On the one hand, tweets are push-based - the consumer has no control
over their generation. On the other hand, the program at hand will process the tweets on its own terms, regardless of how quickly
they are being generated. Moreover, the underlying Twitter API will likely utilize a request-reply protocol to retrieve batches of
tweets from persistent storage. As such, the distinction between push and pull becomes less interesting. If the underlying source
is truly push-based, then one can buffer its output and consume it using an asynchronous sequence. If the underlying source is pull-based,
then one can turn it into an observable sequence by first pulling, then pushing. In a real-time reactive system, notifications must be pushed
immediately without delay. This point, however, is moot since neither `IObservable<'a>` nor `Async<'a>` is well suited for
real-time systems.
*)
(**
### Performance Considerations

While an asynchronous computation obviates the need to block an OS thread for the duration of an operation, it isn't always the case
that this will improve the overall performance of an application. Note, however, that an async computation does not **require** a
non-blocking operation; it simply allows for it. Also of note is that, unlike calling `IEnumerator.MoveNext()`, consuming
an item from an asynchronous sequence requires several allocations. While this is usually greatly outweighed by the
benefits, it can make a difference in some scenarios.
*)

(**
## Related Articles

* [Programming with F# asynchronous sequences](http://tomasp.net/blog/async-sequences.aspx/)
*)

docs/content/terminology.md

Lines changed: 77 additions & 0 deletions
@@ -0,0 +1,77 @@
Terminology
====

Terminology is frequently a source of confusion. Oftentimes terms have different
meanings depending on the context, different terms are used to refer to the same
concept, and not everyone agrees on any of this. The terminology described here
is scoped to F# and the .NET Framework. The goal is to briefly introduce high-level
concepts and hopefully alleviate some confusion.
## Thread

A thread in this scope refers to a [managed thread](https://msdn.microsoft.com/en-us/library/6kac2kdh(v=vs.110).aspx).
A thread is the basic unit to which an OS allocates CPU resources. A **managed** thread usually maps directly to
an OS thread; however, it is possible for a CLR host to override this behavior. A thread has a corresponding context
consisting of a pointer to the code being executed as well as stack and register state.

A thread can be viewed as a total ordering of instructions. Instructions executed by different threads
are only partially ordered by causality relationships - in some cases it's impossible to tell which ran
before the other. Much of the difficulty in multi-threaded code can be attributed to this lack of
information about ordering.
## Context Switch

A context switch occurs when the OS scheduler changes the thread to which it allocates CPU resources.
In order to do this, it must save the context of the thread which is giving up use of the CPU and reconstitute
the context of the thread which is next in line. Context switches occur for a number of reasons.
They occur naturally as part of preemptive scheduling - threads run for a **time slice** or **quantum** until another
thread is given a chance to run. Another reason is a thread explicitly yielding its time slice, such as with a call to
`Thread.Sleep`.
## Synchronous vs. Asynchronous

An operation is synchronous if the caller must wait for it to complete before making progress.
More specifically, the calling **thread** may **block** until the synchronous operation is complete.
Note that a CPU-bound task exhibits behavior similar to blocking.

An operation is asynchronous if the request to begin the operation and the result of the operation can
be delivered through different channels. This provides a convenient mechanism to encapsulate waiting. In other words,
an asynchronous operation decouples the means of sending the request from the means of receiving the response.

This decoupling in turn allows one to decouple the **logical** notion of an operation from the **physical**
details of how it is executed. For example, an asynchronous operation to download a web page is a single
logical operation. Due to its asynchronous nature, however, the underlying implementation can start the
operation on one thread and then deliver the completion notification on a different thread
(such as an IO completion thread managed by the ThreadPool). In the meantime, the calling thread is free
to perform other work. In fact, the completion notification can even be handled by the same thread
as the calling thread.

By contrast, a synchronous operation will use the calling thread's context to deliver the completion
notification. If the work to be performed is small enough, this can be very efficient. If however the
operation is long running, the OS will perform a context switch to allow other threads to proceed, and
then another context switch to resume the calling thread. Note that synchrony can be viewed as a special
case of asynchrony.

In F#, asynchrony is represented by the `Async` type.
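As a small illustration of this decoupling (the URL is hypothetical; `AsyncDownloadString` is the `WebClient` extension shipped with FSharp.Core), the download below is a single logical operation even though its start and completion may occur on different threads:

```fsharp
open System
open System.Net

// A single logical operation: download a page. The completion
// notification may arrive on an IO completion thread.
let downloadPage (url:string) : Async<string> = async {
    use client = new WebClient()
    return! client.AsyncDownloadString(Uri url) }

let lengthOfPage : Async<int> = async {
    // start the download without blocking the calling thread
    let! pending = Async.StartChild (downloadPage "http://example.org")
    // ... the calling thread is free to do other work here ...
    let! page = pending   // await the completion notification
    return page.Length }
```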
## Blocking vs. Non-blocking

A thread is [blocked](http://www.albahari.com/threading/part2.aspx#_Blocking) when its execution
is paused while it waits for some operation to complete (receiving IO, a lock being released, etc.). Once the operation completes,
the OS will schedule the thread to resume and continue where it left off.

A non-blocking operation is one that does not prevent the calling thread from making progress. In other words,
once a non-blocking operation is started, the calling thread is free to perform other work, such as starting
yet another operation.

It is important to remember that when a thread is blocked, the CPU and the system as a whole can still do other work.
The issue with blocking is that the specific thread which is blocked can't do other work, and the OS must
retain resources for the thread's context so that it can context switch and continue where it left off once the operation being waited on completes.
Since managed threads have a relatively high cost (by default, a .NET thread is allocated 1 MB of stack space), this can
lead to inefficiencies. Non-blocking operations allow one to make more efficient use of system resources.
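The contrast can be sketched in a few lines (the one-second delays are purely illustrative):

```fsharp
open System.Threading

// Blocking: the calling thread is suspended for the full second,
// and the OS holds its stack and context the entire time.
let blockingWait () = Thread.Sleep 1000

// Non-blocking: no thread is held while the sleep is pending; the
// continuation is scheduled on a ThreadPool thread when the timer fires.
let nonBlockingWait : Async<unit> = async { do! Async.Sleep 1000 }
```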
## Further Reading

* [Managed Threading](https://msdn.microsoft.com/en-us/library/3e8s7xdd%28v=vs.110%29.aspx)
* [Threading in C#](http://www.albahari.com/threading/)
* [The Art of Multiprocessor Programming](http://www.amazon.com/Art-Multiprocessor-Programming-Revised-Reprint/dp/0123973376/)
