Skip to content

Commit ddb5f9a

Browse files
committed
Fixed issue with iterAsyncParallel/iterAsyncParallelThrottled/mapAsyncParallel not respecting cancellation requests
1 parent 8064abd commit ddb5f9a

2 files changed

Lines changed: 43 additions & 2 deletions

File tree

src/FSharp.Control.AsyncSeq/AsyncSeq.fs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,8 @@ module internal Utils =
167167
let ivar = TaskCompletionSource<_>()
168168
if t.IsFaulted then
169169
ivar.SetException t.Exception
170+
else if t.IsCanceled then
171+
ivar.SetCanceled()
170172
ivar.Task)
171173
|> join
172174
#endif
@@ -1060,6 +1062,7 @@ module AsyncSeq =
10601062
mb.Post (Some b) })
10611063
|> Async.map (fun _ -> mb.Post None)
10621064
|> Async.StartChildAsTask
1065+
10631066
return!
10641067
replicateUntilNoneAsync (Task.chooseTask (err |> Task.taskFault) (async.Delay mb.Receive))
10651068
|> iterAsync id }

tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,6 @@ type Assert with
126126
| _ ->
127127
Assert.Fail(message)
128128

129-
130-
131129
[<Test>]
132130
let ``AsyncSeq.never should equal itself`` () =
133131
Assert.AreEqual(AsyncSeq.never<int>, AsyncSeq.never<int>, timeout=100, exnEq=AreCancellationExns)
@@ -1516,6 +1514,46 @@ let ``AsyncSeq.iterAsyncParallel should propagate exception`` () =
15161514
| Choice2Of2 _ -> ()
15171515
| Choice1Of2 _ -> Assert.Fail ("error expected")
15181516

1517+
[<Test>]
1518+
let ``AsyncSeq.iterAsyncParallel should cancel and not block forever when run in parallel with another exception-throwing Async`` () =
1519+
1520+
let handle x = async {
1521+
do! Async.Sleep 50
1522+
}
1523+
1524+
let fakeAsync = async {
1525+
do! Async.Sleep 500
1526+
return "fakeAsync"
1527+
}
1528+
1529+
let makeAsyncSeqBatch () =
1530+
let rec loop() = asyncSeq {
1531+
let! batch = fakeAsync |> Async.Catch
1532+
match batch with
1533+
| Choice1Of2 batch ->
1534+
if (Seq.isEmpty batch) then
1535+
do! Async.Sleep 500
1536+
yield! loop()
1537+
else
1538+
yield batch
1539+
yield! loop()
1540+
| Choice2Of2 err ->
1541+
printfn "Problem getting batch: %A" err
1542+
}
1543+
1544+
loop()
1545+
1546+
let x = makeAsyncSeqBatch () |> AsyncSeq.concatSeq |> AsyncSeq.iterAsyncParallel handle
1547+
let exAsync = async {
1548+
do! Async.Sleep 2000
1549+
failwith "error"
1550+
}
1551+
1552+
let t = [x; exAsync] |> Async.Parallel |> Async.Ignore |> Async.StartAsTask
1553+
1554+
// should fail after 2 seconds
1555+
Assert.Throws<AggregateException>(fun _ -> t.Wait(4000) |> ignore) |> ignore
1556+
15191557
[<Test>]
15201558
let ``AsyncSeq.iterAsyncParallelThrottled should propagate handler exception`` () =
15211559

0 commit comments

Comments
 (0)