Skip to content

Commit 89300e2

Browse files
authored
Merge pull request fsprojects#136 from thednaz/master
Fixed issue with iterAsyncParallel/iterAsyncParallelThrottled/mapAsyncParallel not respecting cancellation requests
2 parents 4698c82 + ddb5f9a commit 89300e2

2 files changed

Lines changed: 43 additions & 2 deletions

File tree

src/FSharp.Control.AsyncSeq/AsyncSeq.fs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,8 @@ module internal Utils =
167167
let ivar = TaskCompletionSource<_>()
168168
if t.IsFaulted then
169169
ivar.SetException t.Exception
170+
else if t.IsCanceled then
171+
ivar.SetCanceled()
170172
ivar.Task)
171173
|> join
172174
#endif
@@ -1060,6 +1062,7 @@ module AsyncSeq =
10601062
mb.Post (Some b) })
10611063
|> Async.map (fun _ -> mb.Post None)
10621064
|> Async.StartChildAsTask
1065+
10631066
return!
10641067
replicateUntilNoneAsync (Task.chooseTask (err |> Task.taskFault) (async.Delay mb.Receive))
10651068
|> iterAsync id }

tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,6 @@ type Assert with
122122
| _ ->
123123
Assert.Fail(message)
124124

125-
126-
127125
[<Test>]
128126
let ``AsyncSeq.never should equal itself`` () =
129127
Assert.AreEqual(AsyncSeq.never<int>, AsyncSeq.never<int>, timeout=100, exnEq=AreCancellationExns)
@@ -1512,6 +1510,46 @@ let ``AsyncSeq.iterAsyncParallel should propagate exception`` () =
15121510
| Choice2Of2 _ -> ()
15131511
| Choice1Of2 _ -> Assert.Fail ("error expected")
15141512

1513+
[<Test>]
1514+
let ``AsyncSeq.iterAsyncParallel should cancel and not block forever when run in parallel with another exception-throwing Async`` () =
1515+
1516+
let handle x = async {
1517+
do! Async.Sleep 50
1518+
}
1519+
1520+
let fakeAsync = async {
1521+
do! Async.Sleep 500
1522+
return "fakeAsync"
1523+
}
1524+
1525+
let makeAsyncSeqBatch () =
1526+
let rec loop() = asyncSeq {
1527+
let! batch = fakeAsync |> Async.Catch
1528+
match batch with
1529+
| Choice1Of2 batch ->
1530+
if (Seq.isEmpty batch) then
1531+
do! Async.Sleep 500
1532+
yield! loop()
1533+
else
1534+
yield batch
1535+
yield! loop()
1536+
| Choice2Of2 err ->
1537+
printfn "Problem getting batch: %A" err
1538+
}
1539+
1540+
loop()
1541+
1542+
let x = makeAsyncSeqBatch () |> AsyncSeq.concatSeq |> AsyncSeq.iterAsyncParallel handle
1543+
let exAsync = async {
1544+
do! Async.Sleep 2000
1545+
failwith "error"
1546+
}
1547+
1548+
let t = [x; exAsync] |> Async.Parallel |> Async.Ignore |> Async.StartAsTask
1549+
1550+
// should fail after 2 seconds
1551+
Assert.Throws<AggregateException>(fun _ -> t.Wait(4000) |> ignore) |> ignore
1552+
15151553
[<Test>]
15161554
let ``AsyncSeq.iterAsyncParallelThrottled should propagate handler exception`` () =
15171555

0 commit comments

Comments
 (0)