Skip to content

Commit cf8e7d2

Browse files
committed
tests, misc docs, pick
1 parent 59a40b4 commit cf8e7d2

3 files changed

Lines changed: 104 additions & 39 deletions

File tree

src/FSharp.Control.AsyncSeq/AsyncSeq.fs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -706,6 +706,7 @@ module AsyncSeq =
706706
yield! loop () }
707707
yield! loop () }
708708

709+
// TODO
709710
let mapAsyncParallelBounded (parallelism:int) (f:'a -> Async<'b>) (s:AsyncSeq<'a>) = asyncSeq {
710711
use mb = BoundedMb.create (parallelism)
711712
do! s |> iterAsync (fun a -> async {
@@ -794,6 +795,26 @@ module AsyncSeq =
794795
let! moven = ie.MoveNext()
795796
b := moven }
796797

798+
let pickAsync (f:'T -> Async<'U option>) (source:AsyncSeq<'T>) = async {
799+
use ie = source.GetEnumerator()
800+
let! v = ie.MoveNext()
801+
let b = ref v
802+
let res = ref None
803+
while b.Value.IsSome && not res.Value.IsSome do
804+
let! fv = f b.Value.Value
805+
match fv with
806+
| None ->
807+
let! moven = ie.MoveNext()
808+
b := moven
809+
| Some _ as r ->
810+
res := r
811+
match res.Value with
812+
| Some _ -> return res.Value.Value
813+
| None -> return raise(KeyNotFoundException()) }
814+
815+
let pick f (source:AsyncSeq<'T>) =
816+
pickAsync (f >> async.Return) source
817+
797818
let tryPickAsync f (source : AsyncSeq<'T>) = async {
798819
use ie = source.GetEnumerator()
799820
let! v = ie.MoveNext()
@@ -1390,7 +1411,7 @@ module AsyncSeq =
13901411
let close (s:AsyncSeqSrc<'a>) : unit =
13911412
s.tail.tcs.SetResult(None)
13921413

1393-
let fail (ex:exn) (s:AsyncSeqSrc<'a>) : unit =
1414+
let error (ex:exn) (s:AsyncSeqSrc<'a>) : unit =
13941415
s.tail.tcs.SetException(ex)
13951416

13961417
let rec private toAsyncSeqImpl (s:AsyncSeqSrcNode<'a>) : AsyncSeq<'a> =
@@ -1462,7 +1483,7 @@ module AsyncSeqSrc =
14621483
let put a s = AsyncSeq.AsyncSeqSrcImpl.put a s
14631484
let close s = AsyncSeq.AsyncSeqSrcImpl.close s
14641485
let toAsyncSeq s = AsyncSeq.AsyncSeqSrcImpl.toAsyncSeq s
1465-
let fail e s = AsyncSeq.AsyncSeqSrcImpl.fail e s
1486+
let error e s = AsyncSeq.AsyncSeqSrcImpl.error e s
14661487

14671488
module Seq =
14681489

src/FSharp.Control.AsyncSeq/AsyncSeq.fsi

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@ type AsyncSeq<'T> = IAsyncEnumerable<'T>
1919

2020
[<RequireQualifiedAccess>]
2121
module AsyncSeq =
22-
/// Creates an empty asynchronou sequence that immediately ends
22+
/// Creates an empty asynchronous sequence that immediately ends.
2323
[<GeneralizableValueAttribute>]
2424
val empty<'T> : AsyncSeq<'T>
2525

26-
/// Creates an asynchronous sequence that generates a single element and then ends
26+
/// Creates an asynchronous sequence that generates a single element and then ends.
2727
val singleton : v:'T -> AsyncSeq<'T>
2828

2929
/// Generates a finite async sequence using the specified asynchronous initialization function.
@@ -32,10 +32,10 @@ module AsyncSeq =
3232
/// Generates a finite async sequence using the specified initialization function.
3333
val init : count: int64 -> mapping:(int64 -> 'T) -> AsyncSeq<'T>
3434

35-
/// Generates an async sequence using the specified asynchronous initialization function.
35+
/// Generates an infinite async sequence using the specified asynchronous initialization function.
3636
val initInfiniteAsync : mapping:(int64 -> Async<'T>) -> AsyncSeq<'T>
3737

38-
/// Generates an async sequence using the specified initialization function.
38+
/// Generates an infinite async sequence using the specified initialization function.
3939
val initInfinite : mapping:(int64 -> 'T) -> AsyncSeq<'T>
4040

4141
/// Generates an async sequence using the specified asynchronous generator function.
@@ -47,7 +47,7 @@ module AsyncSeq =
4747
/// Creates an async sequence which repeats the specified value the indicated number of times.
4848
val replicate : count: int -> v:'T -> AsyncSeq<'T>
4949

50-
/// Creates an async sequence which repeats the specified value indefinitely.
50+
/// Creates an infinite async sequence which repeats the specified value.
5151
val replicateInfinite : v:'T -> AsyncSeq<'T>
5252

5353
/// Yields all elements of the first asynchronous sequence and then
@@ -185,9 +185,20 @@ module AsyncSeq =
185185
/// Asynchronously determine if the sequence contains the given value
186186
val contains : value:'T -> source:AsyncSeq<'T> -> Async<bool> when 'T : equality
187187

188-
/// Asynchronously pick a value from a sequence
188+
/// Asynchronously pick a value from a sequence based on the specified chooser function.
189+
val tryPickAsync : chooser:('T -> Async<'TResult option>) -> source:AsyncSeq<'T> -> Async<'TResult option>
190+
191+
/// Asynchronously pick a value from a sequence based on the specified chooser function.
189192
val tryPick : chooser:('T -> 'TResult option) -> source:AsyncSeq<'T> -> Async<'TResult option>
190193

194+
/// Asynchronously pick a value from a sequence based on the specified chooser function.
195+
/// Raises KeyNotFoundException if the chooser function can't find a matching key.
196+
val pickAsync : chooser:('T -> Async<'TResult option>) -> source:AsyncSeq<'T> -> Async<'TResult>
197+
198+
/// Asynchronously pick a value from a sequence based on the specified chooser function.
199+
/// Raises KeyNotFoundException if the chooser function can't find a matching key.
200+
val pick : chooser:('T -> 'TResult option) -> source:AsyncSeq<'T> -> Async<'TResult>
201+
191202
/// Asynchronously find the first value in a sequence for which the predicate returns true
192203
val tryFind : predicate:('T -> bool) -> source:AsyncSeq<'T> -> Async<'T option>
193204

@@ -420,7 +431,7 @@ module Seq =
420431
val ofAsyncSeq : source:AsyncSeq<'T> -> seq<'T>
421432

422433

423-
/// An async sequence source.
434+
/// An async sequence source produces async sequences.
424435
type AsyncSeqSrc<'T>
425436

426437
/// Operations on async sequence sources.
@@ -429,15 +440,16 @@ module AsyncSeqSrc =
429440
/// Creates a new async sequence source.
430441
val create : unit -> AsyncSeqSrc<'T>
431442

432-
/// Puts an item into the async sequence source causing any created async sequences to yield the item.
443+
/// Causes any async sequences created before the call to yield the item.
433444
val put : item:'T -> src:AsyncSeqSrc<'T> -> unit
434445

435-
/// Closes the async sequence source casuing any created async sequences to terminate.
446+
/// Closes the async sequence source casuing any created async sequences to complete.
436447
val close : src:AsyncSeqSrc<'T> -> unit
437448

438-
/// Causes async sequence created immediately before the call to raise an exception.
439-
val fail : exn:exn -> src:AsyncSeqSrc<'T> -> unit
449+
/// Causes async sequence created before the call to raise an exception.
450+
val error : exn:exn -> src:AsyncSeqSrc<'T> -> unit
440451

441452
/// Creates an async sequence which yields values as they are put into the source and terminates
442453
/// when the source is closed. This sequence will yield items starting with the next put.
454+
/// Many async sequences can be created from once source.
443455
val toAsyncSeq : src:AsyncSeqSrc<'T> -> AsyncSeq<'T>

tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

Lines changed: 58 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ module AsyncOps =
1818
let unit = async.Return()
1919
let never = Async.Sleep(-1)
2020

21+
let catch (f:'a -> 'b) : 'a -> Choice<'b, exn> =
22+
fun a ->
23+
try f a |> Choice1Of2
24+
with ex -> ex |> Choice2Of2
25+
2126
/// Determines equality of two async sequences by convering them to lists, ignoring side-effects.
2227
let EQ (a:AsyncSeq<'a>) (b:AsyncSeq<'a>) =
2328
let exp = a |> AsyncSeq.toList
@@ -49,6 +54,20 @@ type Assert with
4954
Assert.True((exnEq exp act), message)
5055
| _ ->
5156
Assert.Fail(message)
57+
58+
static member AreEqual (expected:unit -> 'a, actual:unit -> 'a) =
59+
let expected = (catch expected) ()
60+
let actual = (catch actual) ()
61+
let message = sprintf "expected=%A actual=%A" expected actual
62+
match expected,actual with
63+
| Choice1Of2 exp, Choice1Of2 act ->
64+
Assert.True((exp = act), message)
65+
| Choice2Of2 exp, Choice2Of2 act ->
66+
()
67+
| _ ->
68+
Assert.Fail(message)
69+
70+
5271

5372

5473

@@ -117,6 +136,16 @@ let ``AsyncSeq.tryPick works``() =
117136
let expected = ls |> Seq.tryPick (fun x -> if x = j then Some (string (x+1)) else None)
118137
Assert.True((expected = actual))
119138

139+
[<Test>]
140+
let ``AsyncSeq.pick works``() =
141+
for i in 0 .. 10 do
142+
let ls = [ 1 .. i ]
143+
for j in [0;i;i+1] do
144+
let chooser x = if x = j then Some (string (x+1)) else None
145+
let actual () = AsyncSeq.ofSeq ls |> AsyncSeq.pick chooser |> Async.RunSynchronously
146+
let expected () = ls |> Seq.pick chooser
147+
Assert.AreEqual(actual, expected)
148+
120149
[<Test>]
121150
let ``AsyncSeq.tryFind works``() =
122151
for i in 0 .. 10 do
@@ -1093,16 +1122,17 @@ let ``AsyncSeq.take should work``() =
10931122

10941123
[<Test>]
10951124
let ``AsyncSeq.mapParallelAsync should maintain order`` () =
1096-
let ls = List.init 500 id
1097-
let expected =
1098-
ls
1099-
|> AsyncSeq.ofSeq
1100-
|> AsyncSeq.mapAsync (async.Return)
1101-
let actual =
1102-
ls
1103-
|> AsyncSeq.ofSeq
1104-
|> AsyncSeq.mapAsyncParallel (async.Return)
1105-
Assert.AreEqual(expected, actual)
1125+
for i in 0..100 do
1126+
let ls = List.init i id
1127+
let expected =
1128+
ls
1129+
|> AsyncSeq.ofSeq
1130+
|> AsyncSeq.mapAsync (async.Return)
1131+
let actual =
1132+
ls
1133+
|> AsyncSeq.ofSeq
1134+
|> AsyncSeq.mapAsyncParallel (async.Return)
1135+
Assert.AreEqual(expected, actual)
11061136

11071137
//[<Test>]
11081138
let ``AsyncSeq.mapParallelAsync should be parallel`` () =
@@ -1113,7 +1143,7 @@ let ``AsyncSeq.mapParallelAsync should be parallel`` () =
11131143
s |> AsyncSeq.map id
11141144
let actual =
11151145
s
1116-
|> AsyncSeq.mapAsyncParallel (fun i -> async { barrier.SignalAndWait () ; return i })
1146+
|> AsyncSeq.mapAsyncParallel (fun i -> async { barrier.SignalAndWait () ; return i }) // can deadlock
11171147
Assert.AreEqual(expected, actual, timeout=200)
11181148

11191149
//[<Test>]
@@ -1164,27 +1194,29 @@ let ``AsyncSeqSrc.fail should throw`` () =
11641194
let item = 1
11651195
let src = AsyncSeqSrc.create ()
11661196
let actual = src |> AsyncSeqSrc.toAsyncSeq
1167-
src |> AsyncSeqSrc.fail (exn("test"))
1197+
src |> AsyncSeqSrc.error (exn("test"))
11681198
let expected = asyncSeq { raise (exn("test")) }
11691199
Assert.AreEqual (expected, actual)
11701200

11711201

11721202
[<Test>]
11731203
let ``AsyncSeq.groupBy should work``() =
1174-
let ls = List.init 100 id
1175-
let p i = i % 3
1176-
let expected =
1177-
ls
1178-
|> Seq.groupBy p
1179-
|> Seq.map (snd >> Seq.toList)
1180-
|> Seq.toList
1181-
|> AsyncSeq.ofSeq
1182-
let actual =
1183-
ls
1184-
|> AsyncSeq.ofSeq
1185-
|> AsyncSeq.groupBy p
1186-
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
1187-
Assert.AreEqual(expected, actual)
1204+
for i in 0..100 do
1205+
for j in 1..3 do
1206+
let ls = List.init i id
1207+
let p x = x % j
1208+
let expected =
1209+
ls
1210+
|> Seq.groupBy p
1211+
|> Seq.map (snd >> Seq.toList)
1212+
|> Seq.toList
1213+
|> AsyncSeq.ofSeq
1214+
let actual =
1215+
ls
1216+
|> AsyncSeq.ofSeq
1217+
|> AsyncSeq.groupBy p
1218+
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
1219+
Assert.AreEqual(expected, actual)
11881220

11891221
[<Test>]
11901222
let ``AsyncSeq.groupBy should propagate exception and terminate all groups``() =

0 commit comments

Comments
 (0)