Skip to content

Commit 3c4a28c

Browse files
committed
Async.ParallelIgnore
1 parent 7a7ce72 commit 3c4a28c

2 files changed

Lines changed: 87 additions & 2 deletions

File tree

src/FSharpx.Async/Async.fs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,50 @@ module AsyncExtensions =
6161
let! c = c
6262
return a,b,c }
6363

64+
/// Creates an async computation which runs the provided sequence of computations and completes
65+
/// when all computations in the sequence complete. Up to parallelism computations will
66+
/// be in-flight at any given point in time. Error or cancellation of any computation in
67+
/// the sequence causes the resulting computation to error or cancel, respectively.
68+
static member ParallelIgnore (parallelism:int) (xs:seq<Async<_>>) = async {
69+
70+
let sm = new SemaphoreSlim(parallelism)
71+
let cde = new CountdownEvent(1)
72+
let tcs = new TaskCompletionSource<unit>()
73+
74+
let inline ok _ =
75+
sm.Release() |> ignore
76+
if (cde.Signal()) then
77+
tcs.SetResult(())
78+
79+
let inline err (ex:exn) =
80+
tcs.SetException ex
81+
sm.Release() |> ignore
82+
83+
let inline cnc (ex:OperationCanceledException) =
84+
tcs.SetCanceled()
85+
sm.Release() |> ignore
86+
87+
try
88+
89+
for computation in xs do
90+
sm.Wait()
91+
cde.AddCount(1)
92+
// the following decreases throughput 3x but avoids blocking
93+
// do! sm.WaitAsync() |> Async.AwaitTask
94+
Async.StartWithContinuations(computation, ok, err, cnc)
95+
96+
if (cde.Signal()) then
97+
tcs.SetResult(())
98+
99+
do! tcs.Task |> Async.AwaitTask
100+
101+
finally
102+
103+
cde.Dispose()
104+
sm.Dispose()
105+
106+
}
107+
64108
/// An async computation which does nothing.
65109
static member inline unit = AsyncOps.unit
66110

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,46 @@
1-
21
module FSharpx.Async.Tests.AsyncTest
32

3+
open System
4+
open System.Threading
5+
open System.Threading.Tasks
6+
open NUnit.Framework
7+
open FSharpx.Control
8+
9+
[<Test>]
10+
let ``Async.ParallelIgnore should run argument computations``() =
11+
let bag = System.Collections.Concurrent.ConcurrentBag<_>()
12+
let s = Seq.init 10 id |> Set.ofSeq
13+
s
14+
|> Seq.map (fun i -> bag.Add i ; Async.unit)
15+
|> Async.ParallelIgnore 1
16+
|> Async.RunSynchronously
17+
Assert.True((s = (bag |> Set.ofSeq)))
18+
19+
[<Test>]
20+
let ``Async.ParallelIgnore should fail upon first failure``() =
21+
let s =
22+
[
23+
async { return failwith "catch me if you can" }
24+
]
25+
Assert.Throws<AggregateException>(fun() ->
26+
s
27+
|> Async.ParallelIgnore 1
28+
|> Async.RunSynchronously
29+
)
30+
|> ignore
431

5-
// TODO:
32+
[<Test>]
33+
let ``Async.ParallelIgnore should cancel upon first cancellation``() =
34+
let tcs = new TaskCompletionSource<unit>()
35+
let s =
36+
[
37+
tcs.Task |> Async.AwaitTask
38+
]
39+
tcs.SetCanceled()
40+
Assert.Throws<OperationCanceledException>(fun() ->
41+
s
42+
|> Async.ParallelIgnore 1
43+
|> Async.RunSynchronously
44+
)
45+
|> ignore
46+

0 commit comments

Comments
 (0)