Commit 65cc949 (parent 6785d77): groupBy example. 1 file changed, 93 additions & 0 deletions.
(**
# F# AsyncSeq Examples
*)

(**
### Group By

Suppose we would like to consume a stream of events `AsyncSeq<Event>` and perform an operation on each event.
The operation on each event is of type `Event -> Async<unit>`. This can be done as follows:
*)
open FSharp.Control // AsyncSeq lives in the FSharp.Control namespace

type Event = {
  entityId : int64
  data : string
}

let stream : AsyncSeq<Event> =
  failwith "undefined"

let action (e:Event) : Async<unit> =
  failwith "undefined"

stream
|> AsyncSeq.iterAsync action
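As a concrete, runnable stand-in (a hypothetical in-memory stream of strings rather than events; the `FSharp.Control.AsyncSeq` package is assumed to be available via NuGet), a minimal sketch of the same sequential pipeline:

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

// Hypothetical in-memory stand-in for the event stream.
let handled = ResizeArray<string> ()

AsyncSeq.ofSeq [ "one"; "two"; "three" ]
|> AsyncSeq.iterAsync (fun s -> async { handled.Add s }) // one element at a time, in order
|> Async.RunSynchronously
```

Because `iterAsync` awaits each action before pulling the next element, `handled` ends up in source order.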
(**
The above workflow reads an event from the stream, performs the operation, and then reads the next event.
While the read operation and the operation on the event are *asynchronous*, the stream is processed *sequentially*.
It may be desirable to parallelize the processing of the stream. Suppose that events correspond to some entity,
such as a shopping cart. Events belonging to the same shopping cart must be processed in sequential order, but they
are independent of events belonging to other shopping carts. Therefore, events belonging to distinct shopping carts
can be processed in parallel. Using `AsyncSeq.groupBy`, we can partition the stream into a fixed set of sub-streams
and then process the sub-streams in parallel using `AsyncSeq.mapAsyncParallel`:
*)
stream
|> AsyncSeq.groupBy (fun e -> e.entityId % 4L) // entityId is an int64, so the modulus must be too
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.iterAsync action)
|> AsyncSeq.iter ignore
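To make the partitioning behaviour concrete, here is a minimal sketch using plain lists and hypothetical sample events; `List.groupBy` models what `AsyncSeq.groupBy` does, minus the streaming, and shows that events for any single entity stay in their original order:

```fsharp
// Hypothetical sample events; the record mirrors the Event type above.
type Ev = { entityId : int64; data : string }

let events =
  [ { entityId = 1L; data = "a" }
    { entityId = 2L; data = "b" }
    { entityId = 1L; data = "c" }
    { entityId = 5L; data = "d" } ] // 5L % 4L = 1L: same partition as entity 1

// Each key maps to the sub-sequence of its events, in source order.
let partitions = events |> List.groupBy (fun e -> e.entityId % 4L)
```

Partition `1L` holds entity 1's events (`"a"` then `"c"`) in their emitted order, interleaved with entity 5's event.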
(**
`AsyncSeq.groupBy` partitions the input sequence into sub-sequences based on a key returned by a projection function.
A sub-sequence emits an element whenever the source sequence emits an element corresponding to that sub-sequence's
key. Elements of the resulting sequence are pairs of keys and sub-sequences, in this case `int64 * AsyncSeq<Event>`.
Since these sub-sequences are independent by definition, they can be processed in parallel. In fact, the
sub-sequences *must* be processed in parallel, because it isn't possible to complete the processing of one
sub-sequence until all elements of the source sequence have been exhausted.

To further improve the efficiency of our workflow, we can make use of batching. Specifically, we can read the
incoming events in batches, and we can perform actions on entire batches of events.
*)
let batchStream : AsyncSeq<Event[]> =
  failwith "undefined"

let batchAction (es:Event[]) : Async<unit> =
  failwith "undefined"
(**
Ordering is still important. For example, the batch action could write events into a full-text search index that
we would like to keep sequentially consistent. As such, the events need to be applied in the order in which they
were emitted. The following workflow has the desired properties:
*)
batchStream
|> AsyncSeq.concatSeq                            // flatten the sequence of event arrays
|> AsyncSeq.groupBy (fun e -> e.entityId % 4L)   // partition into 4 groups
|> AsyncSeq.mapAsyncParallel (
     snd
     >> AsyncSeq.bufferByTimeAndCount 500 1000   // buffer each sub-sequence by time and count
     >> AsyncSeq.iterAsync batchAction)          // perform the batch operation
|> AsyncSeq.iter ignore
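The count half of the buffering step can be pictured with plain lists: a flattened sequence is cut into fixed-size batches. Here `List.chunkBySize` stands in for `AsyncSeq.bufferByTimeAndCount`, omitting the time-based flush:

```fsharp
// A stand-in for a flattened sub-sequence of events.
let flat = [ 1 .. 10 ]

// Cut into batches of up to 4 elements; the final batch may be short.
let batches = flat |> List.chunkBySize 4
// batches = [ [1;2;3;4]; [5;6;7;8]; [9;10] ]
```

The time component matters for a live stream: without it, a slow trickle of events could leave a partial batch waiting indefinitely for the count threshold.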
(**
The above workflow:

1. Reads events in batches.
2. Flattens the batches.
3. Partitions the events into mutually exclusive sub-sequences.
4. Buffers elements of each sub-sequence by time and count.
5. Processes the sub-sequences in parallel, but each individual sub-sequence sequentially.
*)
