Skip to content

Commit d8cf5dd

Browse files
Vasily KirichenkoVasily Kirichenko
authored andcommitted
AsyncSeq.toBlockingSeq does not hung forever if an exception is thrown and reraise it outside
1 parent dab5fed commit d8cf5dd

2 files changed

Lines changed: 17 additions & 5 deletions

File tree

src/FSharpx.Async/AsyncSeq.fs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -378,14 +378,17 @@ module AsyncSeq =
378378
let toBlockingSeq (input : AsyncSeq<'T>) =
379379
// Write all elements to a blocking buffer and then add None to denote end
380380
let buf = new BlockingQueueAgent<_>(1)
381-
async {
382-
do! iterAsync (Some >> buf.AsyncAdd) input
383-
do! buf.AsyncAdd(None) } |> Async.Start
381+
let iterator =
382+
async {
383+
let! res = iterAsync (Some >> buf.AsyncAdd) input |> Async.Catch
384+
do! buf.AsyncAdd(None)
385+
return match res with Choice2Of2 e -> raise e | _ -> ()
386+
} |> Async.StartAsTask
384387

385388
// Read elements from the blocking buffer & return a sequences
386389
let rec loop () = seq {
387390
match buf.Get() with
388-
| None -> ()
391+
| None -> iterator.Result
389392
| Some v ->
390393
yield v
391394
yield! loop() }

tests/FSharpx.Async.Tests/AsyncSeqTests.fs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
open NUnit.Framework
44
open FSharpx.Control
5+
open System
56

67
[<Test>]
78
let ``skipping should return all elements after the first non-match``() =
@@ -98,4 +99,12 @@ let ``AsyncSeq.bufferByCount``() =
9899

99100
let s' = s |> AsyncSeq.bufferByCount 2 |> AsyncSeq.toList |> Async.RunSynchronously
100101

101-
Assert.True(([[|1;2|];[|3;4|]] = s'))
102+
Assert.True(([[|1;2|];[|3;4|]] = s'))
103+
104+
[<Test>]
105+
let ``AsyncSeq.toBlockingSeq does not hung forever and rethrows exception``() =
106+
let s = asyncSeq {
107+
yield 1
108+
failwith "error"
109+
}
110+
Assert.Throws<AggregateException>(fun _ -> s |> AsyncSeq.toBlockingSeq |> Seq.toList |> ignore) |> ignore

0 commit comments

Comments
 (0)