Skip to content

Commit 2172fda

Browse files
committed
Merge pull request #47 from eulerfx/groupby
AsyncSeq.groupBy, AsyncSeqSrc, docs
2 parents 040b645 + c66983d commit 2172fda

6 files changed

Lines changed: 555 additions & 12 deletions

File tree

FSharp.Control.AsyncSeq.sln

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ EndProject
3939
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "library", "library", "{B4BB88FA-DB8C-4651-9869-A8FEA79F044E}"
4040
ProjectSection(SolutionItems) = preProject
4141
docs\content\library\AsyncSeq.fsx = docs\content\library\AsyncSeq.fsx
42+
docs\content\library\AsyncSeqExamples.fsx = docs\content\library\AsyncSeqExamples.fsx
4243
EndProjectSection
4344
EndProject
4445
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "FSharp.Control.AsyncSeq.Profile7", "src\FSharp.Control.AsyncSeq.Profile7\FSharp.Control.AsyncSeq.Profile7.fsproj", "{114DCF3C-D7CC-4E53-AFF0-AFBFC0C8B966}"

docs/content/index.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ This includes additional brief samples on using most of the functions.
2222

2323
[Terminology](terminology.html) a reference for some of the terminology around F# async.
2424

25-
[FSharp.Control.AsyncSeq](library/AsyncSeq.html) contains narrative and code samples explaining asynchronous sequences.
25+
[AsyncSeq](library/AsyncSeq.html) contains narrative and code samples explaining asynchronous sequences.
26+
27+
[AsyncSeq Examples](library/AsyncSeqExamples.html) contains examples.
2628

2729
Contributing and copyright
2830
--------------------------
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
(**
2+
3+
# F# AsyncSeq Examples
4+
5+
*)
6+
7+
#r "../../../bin/FSharp.Control.AsyncSeq.dll"
8+
open System
9+
open FSharp.Control
10+
11+
12+
(**
13+
14+
### Group By
15+
16+
Suppose we would like to consume a stream of events `AsyncSeq<Event>` and perform an operation on each event.
17+
The operation on each event is of type `Event -> Async<unit>`. This can be done as follows:
18+
19+
*)
20+
21+
22+
// An event in the stream; `entityId` identifies the entity the event belongs to.
type Event =
  { entityId : int64
    data : string }

// Placeholder for the source of events (left undefined in this example).
let stream : AsyncSeq<Event> =
  failwith "undefined"

// Placeholder for the asynchronous operation performed on each event.
let action (e:Event) : Async<unit> =
  failwith "undefined"

// Apply `action` to every event, one after another.
AsyncSeq.iterAsync action stream
35+
36+
37+
(**
38+
39+
The above workflow will read an event from the stream, perform an operation and then read the next event.
40+
While the read operation and the operation on the event are *asynchronous*, the stream is processed *sequentially*.
41+
It may be desirable to parallelize the processing of the stream. Suppose that events correspond to some entity,
42+
such as a shopping cart. Events belonging to some shopping cart must be processed in a sequential order, however they
43+
are independent from events belonging to other shopping carts. Therefore, events belonging to distinct shopping carts
44+
can be processed in parallel. Using `AsyncSeq.groupBy`, we can partition the stream into a fixed set of sub-streams
45+
and then process the sub-streams in parallel using `AsyncSeq.mapAsyncParallel`:
46+
47+
*)
48+
49+
stream
// Take the remainder on the int64 itself and drop its sign, so the key is always
// in 0..3 (exactly four groups), including for negative or very large entity ids.
|> AsyncSeq.groupBy (fun e -> int (abs (e.entityId % 4L)))
// Each sub-stream is consumed sequentially; the sub-streams run in parallel.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.iterAsync action)
|> AsyncSeq.iter ignore
53+
54+
(**
55+
56+
`AsyncSeq.groupBy` partitions the input sequence into sub-sequences based on a key returned by a projection function.
57+
The resulting sub-sequences emit elements when the source sequence emits an element corresponding to the key of the
58+
sub-sequence. Elements of the resulting sequence are pairs of keys and sub-sequences, in this case `int * AsyncSeq<Event>`. Since by definition, these sub-sequences are independent, they can be processed in parallel. In fact, the sub-sequences *must* be processed in parallel, because it isn't possible to complete the processing of a sub-sequence until all elements of the source sequence are exhausted.
59+
60+
To continue improving the efficiency of our workflow, we can make use of batching. Specifically, we can read the incoming
61+
events in batches and we can perform actions on entire batches of events.
62+
63+
*)
64+
65+
// Placeholder for a source that delivers events in batches (left undefined in this example).
let batchStream : AsyncSeq<Event[]> = failwith "undefined"

// Placeholder for an operation that handles a whole batch of events at once.
let batchAction (es:Event[]) : Async<unit> = failwith "undefined"
70+
71+
72+
(**
73+
74+
Ordering is still important. For example, the batch action could write events into a full-text search index. We would like the full-text search index to be sequentially consistent. As such, the events need to be applied in the order they were emitted. The following workflow has the desired properties:
75+
76+
*)
77+
78+
batchStream
|> AsyncSeq.concatSeq // flatten the sequence of event arrays
// The remainder is taken on the int64 and its sign dropped, so keys are always 0..3.
|> AsyncSeq.groupBy (fun e -> int (abs (e.entityId % 4L))) // partition into 4 groups
|> AsyncSeq.mapAsyncParallel (snd
  >> AsyncSeq.bufferByCountAndTime 500 1000 // buffer sub-sequences
  >> AsyncSeq.iterAsync batchAction) // perform the batch operation
|> AsyncSeq.iter ignore
85+
86+
87+
(**
88+
89+
The above workflow:
90+
91+
1. Reads events in batches.
92+
2. Flattens the batches.
93+
3. Partitions the events into mutually exclusive sub-sequences.
94+
4. Buffers elements of each sub-sequence by count and time.
95+
5. Processes the sub-sequences in parallel, but individual sub-sequences sequentially.
96+
97+
*)
98+
99+
100+
101+
(**
102+
103+
### Merge
104+
105+
`AsyncSeq.merge` non-deterministically merges two async sequences into one. It is non-deterministic in the sense that the resulting sequence emits elements
106+
whenever *either* input sequence emits a value. Since it isn't always known which will emit a value first, if at all, the operation is non-deterministic. This operation
107+
is in contrast to `AsyncSeq.zip` which also takes two async sequences and returns a single async sequence, but as opposed to emitting an element when *either* input
108+
sequence produces a value, it emits an element when *both* sequences emit a value.
109+
110+
Suppose we would like to trigger an operation whenever a change occurs. We can represent changes as an `AsyncSeq`. To gain intuition for this, consider the [Consul](https://www.consul.io/)
111+
configuration management system. It stores configuration information in a tree-like structure. For the purpose of this discussion, it can be thought of as a key-value store
112+
exposed via HTTP. In addition, `Consul` supports change notifications using HTTP long-polling - when an HTTP GET request is made to retrieve the value of a key,
113+
if the request specified a modify-index, `Consul` won't respond to the request until a change has occurred *since* the modify-index. We can represent this operation using
114+
the type `Key * ModifyIndex -> Async<Value * ModifyIndex>`. Next, we can take this operation and turn it into an `AsyncSeq` of changes as follows:
115+
*)
116+
117+
// Aliases that name the pieces of the long-polling operation.
type Key = string
type Value = string
type ModifyIndex = int64

// Placeholder for the long-poll request (left undefined in this example):
// given a key and a modify-index, yields a value together with a modify-index.
let longPollKey (key:Key, mi:ModifyIndex) : Async<Value * ModifyIndex> =
  failwith "undefined"

// Turns repeated long-polls of `key`, starting from `mi`, into a sequence of values.
let changes (key:Key, mi:ModifyIndex) : AsyncSeq<Value> =
  // One step of the unfold: poll from the last index seen, emit the value and
  // carry the returned index forward as the next state. Never returns None.
  let pollNext (lastSeen:ModifyIndex) = async {
    let! value, newIndex = longPollKey (key, lastSeen)
    return Some (value, newIndex) }
  mi |> AsyncSeq.unfoldAsync pollNext
132+
133+
(**
134+
135+
The function `changes` produces an async sequence which emits elements whenever the value corresponding to the key changes. Suppose also that we would like to trigger an operation
136+
whenever the key changes or whenever a fixed interval elapses. We can represent a fixed interval as an async sequence as follows:
137+
138+
*)
139+
140+
// Emits the current UTC time immediately and then again after every
// `periodMs` milliseconds, without end.
let intervalMs (periodMs:int) =
  let rec ticks () = asyncSeq {
    do! Async.Sleep periodMs
    yield DateTime.UtcNow
    yield! ticks () }
  asyncSeq {
    yield DateTime.UtcNow
    yield! ticks () }
145+
146+
(**
147+
148+
Putting it all together:
149+
150+
*)
151+
152+
// Merges the changes of "myKey" with a one-minute timer; Choice1Of2 carries a
// changed value, Choice2Of2 carries a timer tick.
let changesOrInterval : AsyncSeq<Choice<Value, DateTime>> =
  let keyChanges = changes ("myKey", 0L)
  let everyMinute = intervalMs (1000 * 60)
  AsyncSeq.mergeChoice keyChanges everyMinute
154+
155+
156+
(**
157+
158+
We can now consume this async sequence and use it to trigger downstream operations, such as updating the configuration of a running program, in flight.
159+
160+
*)

src/FSharp.Control.AsyncSeq/AsyncSeq.fs

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@ type IAsyncEnumerable<'T> =
2525
type AsyncSeq<'T> = IAsyncEnumerable<'T>
2626
// abstract GetEnumerator : unit -> IAsyncEnumerator<'T>
2727

28+
/// A source that values can be pushed into; holds a reference to the current tail node.
type AsyncSeqSrc<'a> = private { tail : AsyncSeqSrcNode<'a> ref }

/// A node of the source: its completion source resolves to `Some (value, nextNode)`
/// or to `None`.
and private AsyncSeqSrcNode<'a> (tcs : TaskCompletionSource<('a * AsyncSeqSrcNode<'a>) option>) =
  member __.tcs = tcs
33+
2834
[<AutoOpen>]
2935
module internal Utils =
3036
module internal Choice =
@@ -79,6 +85,51 @@ module internal Utils =
7985
elif i = 1 then return (Choice2Of2 (b.Result, a))
8086
else return! failwith (sprintf "unreachable, i = %d" i) }
8187

88+
89+
/// Requests understood by the mailbox agent.
type internal MbReq<'a> =
  /// Store a message.
  | Put of 'a
  /// Ask for a message; the reply channel receives it.
  | Take of AsyncReplyChannel<'a>

/// An unbounded FIFO mailbox.
type Mb<'a> internal () =

  // The agent alternates between two states: waiting for a Put, then waiting
  // for a Take to hand that item to. Scan leaves non-matching requests queued.
  let agent = MailboxProcessor.Start (fun inbox ->
    let rec awaitPut () = async {
      return!
        inbox.Scan (fun req ->
          match req with
          | Put item -> Some (awaitTake item)
          | Take _ -> None) }
    and awaitTake (item:'a) = async {
      return!
        inbox.Scan (fun req ->
          match req with
          | Take reply ->
            reply.Reply item
            Some (awaitPut ())
          | Put _ -> None) }
    awaitPut ())

  /// Posts a message to the mailbox.
  member __.Put (a:'a) =
    agent.Post (Put a)

  /// An async computation that posts a Take request and returns the reply.
  member __.Take =
    agent.PostAndAsyncReply (fun reply -> Take reply)

  interface IDisposable with
    member __.Dispose () =
      let disposable = agent :> IDisposable
      disposable.Dispose()
117+
118+
119+
/// Function-style wrappers over the members of an unbounded FIFO mailbox.
module Mb =

  /// Creates a new unbounded mailbox.
  let create () : Mb<'a> = new Mb<_> ()

  /// Puts a message into a mailbox without waiting.
  let put (a:'a) (mb:Mb<'a>) : unit = a |> mb.Put

  /// Returns the mailbox's `Take` computation, which yields a message from the mailbox.
  let take (mb:Mb<'a>) : Async<'a> = mb.Take
130+
131+
132+
82133
/// Module with helper functions for working with asynchronous sequences
83134
module AsyncSeq =
84135

@@ -549,6 +600,22 @@ module AsyncSeq =
549600
i := i.Value + 1L
550601
yield v }
551602

603+
/// Starts `f` on each element of `s` as a child computation while iterating the
/// source, then emits the results in source order.
/// Note: the source is iterated to its end before the first result is emitted.
let mapAsyncParallel (f:'a -> Async<'b>) (s:AsyncSeq<'a>) = asyncSeq {
  use mb = Mb.create ()
  // Start the mapping for one element and queue the handle to its result.
  let startAndEnqueue (a:'a) = async {
    let! pending = Async.StartChild (f a)
    Mb.put (Some pending) mb }
  do! iterAsync startAndEnqueue s
  // None marks the end of the queued results.
  Mb.put None mb
  let rec drain () = asyncSeq {
    let! queued = Mb.take mb
    match queued with
    | Some pending ->
      let! result = pending
      yield result
      yield! drain ()
    | None -> () }
  yield! drain () }
618+
552619
let chooseAsync f (source : AsyncSeq<'T>) : AsyncSeq<'R> = asyncSeq {
553620
for itm in source do
554621
let! v = f itm
@@ -621,6 +688,26 @@ module AsyncSeq =
621688
let! moven = ie.MoveNext()
622689
b := moven }
623690

691+
/// Applies `f` to the elements of `source` in order and returns the first
/// result that is `Some`. Elements after the match are not requested.
/// Raises KeyNotFoundException when the sequence ends without a match.
let pickAsync (f:'T -> Async<'U option>) (source:AsyncSeq<'T>) = async {
  use ie = source.GetEnumerator()
  let rec search (current:'T option) = async {
    match current with
    | None -> return raise (KeyNotFoundException())
    | Some item ->
      let! picked = f item
      match picked with
      | Some result -> return result
      | None ->
        let! next = ie.MoveNext()
        return! search next }
  let! first = ie.MoveNext()
  return! search first }
707+
708+
/// Synchronous-chooser variant of `pickAsync`: lifts `f` into an async and delegates.
let pick f (source:AsyncSeq<'T>) =
  source |> pickAsync (fun item -> async.Return (f item))
710+
624711
let tryPickAsync f (source : AsyncSeq<'T>) = async {
625712
use ie = source.GetEnumerator()
626713
let! v = ie.MoveNext()
@@ -1200,6 +1287,79 @@ module AsyncSeq =
12001287
}
12011288

12021289

1290+
/// Implementation of AsyncSeqSrc: a linked chain of nodes, each resolved
/// through its TaskCompletionSource.
module AsyncSeqSrcImpl =

  /// Allocates a node whose completion source has not been resolved yet.
  let private createNode () =
    let pending = new TaskCompletionSource<_>()
    new AsyncSeqSrcNode<_>(pending)

  /// Creates a source whose tail is a fresh, unresolved node.
  let create () : AsyncSeqSrc<'a> =
    let initial = createNode ()
    { tail = ref initial }

  /// Swaps in a fresh tail and resolves the previous tail with the value
  /// and a link to the fresh tail.
  let put (a:'a) (s:AsyncSeqSrc<'a>) =
    let fresh = createNode ()
    let previous = Interlocked.Exchange(s.tail, fresh)
    previous.tcs.SetResult(Some (a, fresh))

  /// Resolves the current tail with None, which ends sequences reading from the source.
  let close (s:AsyncSeqSrc<'a>) : unit =
    let current = s.tail.Value
    current.tcs.SetResult(None)

  /// Faults the current tail with the given exception.
  let error (ex:exn) (s:AsyncSeqSrc<'a>) : unit =
    let current = s.tail.Value
    current.tcs.SetException(ex)

  /// Walks the node chain from `s`, awaiting each node and yielding its value
  /// until a node resolves to None.
  let rec private toAsyncSeqImpl (s:AsyncSeqSrcNode<'a>) : AsyncSeq<'a> =
    asyncSeq {
      let! next = Async.AwaitTask s.tcs.Task
      match next with
      | Some (a, tl) ->
        yield a
        yield! toAsyncSeqImpl tl
      | None -> () }

  /// A sequence that starts at the source's tail at the time of this call.
  let toAsyncSeq (s:AsyncSeqSrc<'a>) : AsyncSeq<'a> =
    s.tail.Value |> toAsyncSeqImpl
1320+
1321+
1322+
1323+
/// A group tracked by groupByAsync: its key and the source feeding its sub-sequence.
type private Group<'k, 'a> =
  { key : 'k
    src : AsyncSeqSrc<'a> }
1324+
1325+
/// Partitions `s` into sub-sequences keyed by the asynchronous projection `p`.
/// A `(key, subSequence)` pair is emitted the first time a key is seen; later
/// elements with that key are pushed into the existing sub-sequence.
/// When the source ends, every sub-sequence is completed. When the source or
/// the projection fails, every sub-sequence is faulted with that exception and
/// the exception is re-raised to the consumer of the outer sequence.
let groupByAsync (p:'a -> Async<'k>) (s:AsyncSeq<'a>) : AsyncSeq<'k * AsyncSeq<'a>> = asyncSeq {
  let groups = Collections.Generic.Dictionary<'k, Group<'k, 'a>>()
  // Removes every group and finishes its source with `finish`. The values are
  // snapshotted first because the dictionary is mutated while iterating.
  let finishAll (finish : AsyncSeqSrc<'a> -> unit) =
    for group in groups.Values |> Seq.toArray do
      groups.Remove(group.key) |> ignore
      finish group.src
  use enum = s.GetEnumerator()
  let rec go () = asyncSeq {
    let! next = enum.MoveNext ()
    match next with
    | None -> finishAll AsyncSeqSrcImpl.close
    | Some a ->
      let! key = p a
      let mutable group = Unchecked.defaultof<_>
      if groups.TryGetValue(key, &group) then
        AsyncSeqSrcImpl.put a group.src
        yield! go ()
      else
        let src = AsyncSeqSrcImpl.create ()
        // Take the sub-sequence before the first put so it starts at this element.
        let subSeq = src |> AsyncSeqSrcImpl.toAsyncSeq
        AsyncSeqSrcImpl.put a src
        groups.Add(key, { key = key ; src = src })
        yield key,subSeq
        yield! go () }
  // One handler around the whole loop. Placing it inside `go` would nest a new
  // handler per element, because `yield! go ()` would sit inside the try.
  try
    yield! go ()
  with ex ->
    // Propagate the failure to the sub-sequences instead of completing them
    // as if the source had ended normally.
    finishAll (AsyncSeqSrcImpl.error ex)
    raise ex }
1356+
1357+
/// Synchronous-projection variant of `groupByAsync`: lifts `p` into an async and delegates.
let groupBy (p:'a -> 'k) (s:AsyncSeq<'a>) : AsyncSeq<'k * AsyncSeq<'a>> =
  s |> groupByAsync (fun a -> async.Return (p a))
1359+
1360+
1361+
1362+
12031363
[<AutoOpen>]
12041364
module AsyncSeqExtensions =
12051365
let asyncSeq = new AsyncSeq.AsyncSeqBuilder()
@@ -1209,6 +1369,14 @@ module AsyncSeqExtensions =
12091369
member x.For (seq:AsyncSeq<'T>, action:'T -> Async<unit>) =
12101370
seq |> AsyncSeq.iterAsync action
12111371

1372+
/// Public entry points for AsyncSeqSrc; each forwards to AsyncSeq.AsyncSeqSrcImpl.
module AsyncSeqSrc =

  /// Creates a new, empty source.
  let create () = AsyncSeq.AsyncSeqSrcImpl.create ()

  /// Pushes the value `a` into the source `s`.
  let put a s = s |> AsyncSeq.AsyncSeqSrcImpl.put a

  /// Completes the source `s`.
  let close s = s |> AsyncSeq.AsyncSeqSrcImpl.close

  /// Faults the source `s` with the exception `e`.
  let error e s = s |> AsyncSeq.AsyncSeqSrcImpl.error e

  /// Exposes the source `s` as an async sequence.
  let toAsyncSeq s = s |> AsyncSeq.AsyncSeqSrcImpl.toAsyncSeq
1379+
12121380
module Seq =
12131381

12141382
let ofAsyncSeq (source : AsyncSeq<'T>) =

0 commit comments

Comments
 (0)