Skip to content

Commit e61b457

Browse files
committed
merge upstream
2 parents 4fd12b3 + fa32ff0 commit e61b457

3 files changed

Lines changed: 21 additions & 9 deletions

File tree

src/FSharpx.Async/AssemblyInfo.fs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ open System.Reflection
44
[<assembly: AssemblyTitleAttribute("FSharpx.Async")>]
55
[<assembly: AssemblyProductAttribute("FSharpx.Async")>]
66
[<assembly: AssemblyDescriptionAttribute("Async extensions for F#")>]
7-
[<assembly: AssemblyVersionAttribute("1.11.0")>]
8-
[<assembly: AssemblyFileVersionAttribute("1.11.0")>]
7+
[<assembly: AssemblyVersionAttribute("1.11.1")>]
8+
[<assembly: AssemblyFileVersionAttribute("1.11.1")>]
99
do ()
1010

1111
module internal AssemblyVersionInformation =
12-
let [<Literal>] Version = "1.11.0"
12+
let [<Literal>] Version = "1.11.1"

src/FSharpx.Async/AsyncSeq.fs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -390,14 +390,17 @@ module AsyncSeq =
390390
let toBlockingSeq (input : AsyncSeq<'T>) =
391391
// Write all elements to a blocking buffer and then add None to denote end
392392
let buf = new BlockingQueueAgent<_>(1)
393-
async {
394-
do! iterAsync (Some >> buf.AsyncAdd) input
395-
do! buf.AsyncAdd(None) } |> Async.Start
393+
let iterator =
394+
async {
395+
let! res = iterAsync (Some >> buf.AsyncAdd) input |> Async.Catch
396+
do! buf.AsyncAdd(None)
397+
return match res with Choice2Of2 e -> raise e | _ -> ()
398+
} |> Async.StartAsTask
396399

397400
// Read elements from the blocking buffer & return a sequences
398401
let rec loop () = seq {
399402
match buf.Get() with
400-
| None -> ()
403+
| None -> iterator.Result
401404
| Some v ->
402405
yield v
403406
yield! loop() }
@@ -674,4 +677,4 @@ module Seq =
674677
/// Converts asynchronous sequence to a synchronous blocking sequence.
675678
/// The elements of the asynchronous sequence are consumed lazily.
676679
let ofAsyncSeq (input : AsyncSeq<'T>) =
677-
AsyncSeq.toBlockingSeq input
680+
AsyncSeq.toBlockingSeq input

tests/FSharpx.Async.Tests/AsyncSeqTests.fs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
open NUnit.Framework
44
open FSharpx.Control
5+
open System
56

67
/// Determines equality of two async sequences by convering them to lists, ignoring side-effects.
78
let EQ (a:AsyncSeq<'a>) (b:AsyncSeq<'a>) =
@@ -236,4 +237,12 @@ let ``AsyncSeq.traverseChoiceAsync``() =
236237
| Choice2Of2 e ->
237238
Assert.AreEqual("oh no", e)
238239
Assert.True(([1;2] = (seen |> List.ofSeq)))
239-
240+
241+
242+
[<Test>]
243+
let ``AsyncSeq.toBlockingSeq does not hung forever and rethrows exception``() =
244+
let s = asyncSeq {
245+
yield 1
246+
failwith "error"
247+
}
248+
Assert.Throws<AggregateException>(fun _ -> s |> AsyncSeq.toBlockingSeq |> Seq.toList |> ignore) |> ignore

0 commit comments

Comments
 (0)