You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
`AsyncSeq.groupBy` partitions an input sequence into sub-sequences with respect to the specified `projection` function. This operation is the asynchronous analog to `Seq.groupBy`.
32
30
33
-
34
31
### Example Execution
35
32
36
33
An example execution can be depicted visually as follows:
@@ -69,7 +61,6 @@ let action (e:Event) : Async<unit> =
69
61
stream
70
62
|> AsyncSeq.iterAsync action
71
63
72
-
73
64
(**
74
65
75
66
The above workflow will read an event from the stream, perform an operation and then read the next event.
@@ -104,7 +95,6 @@ let batchStream : AsyncSeq<Event[]> =
104
95
letbatchAction(es:Event[]):Async<unit>=
105
96
failwith "undefined"
106
97
107
-
108
98
(**
109
99
110
100
Ordering is still important. For example, the batch action could write events into a full-text search index. We would like the full-text search index to be sequentially consistent. As such, the events need to be applied in the order they were emitted. The following workflow has the desired properties:
@@ -119,7 +109,6 @@ batchStream
119
109
>> AsyncSeq.iterAsync batchAction)// perform the batch operation
120
110
|> AsyncSeq.iter ignore
121
111
122
-
123
112
(**
124
113
125
114
The above workflow:
@@ -130,14 +119,6 @@ The above workflow:
130
119
4. Buffers elements of each sub-sequence by time and space.
131
120
5. Processes the sub-sequences in parallel, but individual sub-sequences sequentially.
132
121
133
-
---
134
-
135
-
*)
136
-
137
-
138
-
139
-
(**
140
-
141
122
## Merge
142
123
143
124
`AsyncSeq.merge` non-deterministically merges two async sequences into one. It is non-deterministic in the sense that the resulting sequence emits elements whenever *either* input sequence emits a value. Since it isn't always known which will emit a value first, if at all, the operation is non-deterministic. This operation is in contrast to `AsyncSeq.zip` which also takes two async sequences and returns a single async sequence, but as opposed to emitting an element when *either* input sequence produces a value, it emits an element when *both* sequences emit a value. This operation is also in contrast to `AsyncSeq.append` which concatenates two async sequences, emitting all element of one, followed by all elements of the another.
@@ -146,17 +127,18 @@ The above workflow:
146
127
147
128
An example execution can be depicted visually as follows:
148
129
130
+
```
149
131
-----------------------------------------
150
132
| source1 | t0 | | t1 | | | t2 |
151
133
| source2 | | u0 | | | u1 | |
152
134
| result | t0 | u0 | t1 | | u1 | t2 |
153
135
-----------------------------------------
136
+
```
154
137
155
138
### Use Case
156
139
157
140
Suppose you wish to perform an operation when either of two async sequences emits an element. One way to do this is two start consuming both async sequences in parallel. If we would like to perform only one operation at a time, we can use `AsyncSeq.merge` as follows:
158
-
159
-
```
141
+
*)
160
142
161
143
/// Represents an stream emitting elements on a specified interval.
162
144
letintervalMs(periodMs:int)= asyncSeq {
@@ -168,18 +150,10 @@ let intervalMs (periodMs:int) = asyncSeq {
168
150
leteither:AsyncSeq<DateTime>=
169
151
AsyncSeq.merge (intervalMs 20)(intervalMs 30)
170
152
171
-
The sequence `either` emits an element every 20ms and every 30ms.
172
-
173
-
```
174
-
175
-
---
176
-
177
-
*)
178
-
179
-
180
-
181
153
(**
182
154
155
+
The sequence `either` emits an element every 20ms and every 30ms.
156
+
183
157
## Combine Latest
184
158
185
159
@@ -202,7 +176,6 @@ c0 = f a0 b0
202
176
c1 = f a0 b1
203
177
c2 = f a1 b1
204
178
c3 = f a2 b1
205
-
206
179
```
207
180
208
181
### Use Case
@@ -257,15 +230,6 @@ let changesOrInterval : AsyncSeq<Value> =
257
230
258
231
We can now consume this async sequence and use it to trigger downstream operations, such as updating the configuration of a running program, in flight.
259
232
260
-
---
261
-
262
-
*)
263
-
264
-
265
-
266
-
267
-
268
-
(**
269
233
270
234
## Distinct Until Changed
271
235
@@ -275,10 +239,12 @@ We can now consume this async sequence and use it to trigger downstream operatio
275
239
276
240
An example execution can be visualized as follows:
277
241
242
+
```
278
243
-----------------------------------
279
244
| source | a | a | b | b | b | a |
280
245
| result | a | | b | | | a |
281
246
-----------------------------------
247
+
```
282
248
283
249
### Use Case
284
250
@@ -327,13 +293,6 @@ let result : Async<string> =
327
293
if st.finished then Some st.result
328
294
else None)
329
295
330
-
(**
331
-
332
-
---
333
-
334
-
*)
335
-
336
-
337
296
(**
338
297
339
298
## Zip
@@ -374,13 +333,6 @@ let eventsAtLeastOneSec =
374
333
375
334
The resulting async sequence `eventsAtLeastOneSec` will emit an element at-most every second. Note that the input sequence of timeouts is infinite - this is to allow the other sequence to have any length since `AsyncSeq.zipWith` will terminate when either input sequence terminates.
376
335
377
-
*)
378
-
379
-
380
-
381
-
382
-
(**
383
-
384
336
## Buffer by Time and Count
385
337
386
338
`AsyncSeq.bufferByTimeAndCount` consumes the input sequence until a specified number of elements are consumed or a timeout expires at which point the resulting sequence emits the buffered of elements, unless no elements have been buffered. It is similar to `AsyncSeq.bufferByCount` but allows a buffer to be emitted base on a timeout in addition to buffer size. Both are useful for batching inputs before performing an operation. `AsyncSeq.bufferByTimeAndCount` allows an async workflow to proceed even if there are no inputs received during a certain time period.
@@ -397,9 +349,6 @@ An example execution can be visually depicted as follows:
397
349
```
398
350
The last event `a4` is emitted after a timeout.
399
351
400
-
401
-
402
-
403
352
### Use Case
404
353
405
354
Suppose we're writing a service which consumes a stream of events and indexes them into full-text search index. We can index each event one by one, however we get a performance improvement if we buffer events into small batches. We can buffer into fixed size batches using `AsyncSeq.bufferByCount`. However, the source event stream may stop emitting events half way through a batch which would leave those events in the buffer until more events arrive. `AsyncSeq.bufferByTimeAndCount` allows the async workflow to make progress by imposing a bound on how long a non-empty but incomplete buffer can wait more additional items.
0 commit comments