Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
### 4.16.0

* Performance: Replaced `ref` cells with `mutable` locals in the `ofSeq`, `tryWith`, and `tryFinally` enumerator state machines. Each call to `ofSeq` (or any async CE block using `try...with` / `try...finally` / `use`) previously heap-allocated a `Ref<T>` wrapper object per enumerator; it now uses a direct mutable field in the generated class, reducing GC pressure. The change is equivalent to the `mutable`-for-`ref` improvement introduced in 4.11.0 for other enumerators.

### 4.15.0

* Bug fix: `AsyncSeq.removeAt` and `AsyncSeq.updateAt` now raise `ArgumentException` when the index is greater than or equal to the sequence length, consistent with `List.removeAt`, `Array.removeAt`, and `AsyncSeq.insertAt`. Previously they silently returned the sequence unchanged.
Expand Down
50 changes: 25 additions & 25 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -548,35 +548,35 @@
let tryWith (inp: AsyncSeq<'T>) (handler : exn -> AsyncSeq<'T>) : AsyncSeq<'T> =
// Note: this is put outside the object deliberately, so the object doesn't permanently capture inp1 and inp2
AsyncSeqImpl(fun () ->
let state = ref (TryWithState.NotStarted inp)
let mutable state = TryWithState.NotStarted inp
{ new IAsyncSeqEnumerator<'T> with
member x.MoveNext() =
async { match state.Value with
async { match state with
| TryWithState.NotStarted inp ->
let res = ref Unchecked.defaultof<_>
let mutable res = Unchecked.defaultof<_>
try
res.Value <- Choice1Of2 (inp.GetEnumerator())
res <- Choice1Of2 (inp.GetEnumerator())
with exn ->
res.Value <- Choice2Of2 exn
match res.Value with
res <- Choice2Of2 exn
match res with
| Choice1Of2 r ->
return!
(state.Value <- TryWithState.HaveBodyEnumerator r
(state <- TryWithState.HaveBodyEnumerator r
x.MoveNext())
| Choice2Of2 exn ->
return!
(x.Dispose()
let enum = (handler exn).GetEnumerator()
state.Value <- TryWithState.HaveHandlerEnumerator enum
state <- TryWithState.HaveHandlerEnumerator enum
x.MoveNext())
| TryWithState.HaveBodyEnumerator e ->
let res = ref Unchecked.defaultof<_>
let mutable res = Unchecked.defaultof<_>
try
let! r = e.MoveNext()
res.Value <- Choice1Of2 r
res <- Choice1Of2 r
with exn ->
res.Value <- Choice2Of2 exn
match res.Value with
res <- Choice2Of2 exn
match res with
| Choice1Of2 res ->
return
(match res with
Expand All @@ -587,7 +587,7 @@
return!
(x.Dispose()
let e = (handler exn).GetEnumerator()
state.Value <- TryWithState.HaveHandlerEnumerator e
state <- TryWithState.HaveHandlerEnumerator e
x.MoveNext())
| TryWithState.HaveHandlerEnumerator e ->
let! res = e.MoveNext()
Expand All @@ -597,9 +597,9 @@
| _ ->
return None }
member x.Dispose() =
match state.Value with
match state with
| TryWithState.HaveBodyEnumerator e | TryWithState.HaveHandlerEnumerator e ->
state.Value <- TryWithState.Finished
state <- TryWithState.Finished
dispose e
| _ -> () }) :> AsyncSeq<'T>

Expand All @@ -614,14 +614,14 @@
// The (synchronous) compensation is run when the Dispose() is called
let tryFinally (inp: AsyncSeq<'T>) (compensation : unit -> unit) : AsyncSeq<'T> =
AsyncSeqImpl(fun () ->
let state = ref (TryFinallyState.NotStarted inp)
let mutable state = TryFinallyState.NotStarted inp
{ new IAsyncSeqEnumerator<'T> with
member x.MoveNext() =
async { match state.Value with
async { match state with
| TryFinallyState.NotStarted inp ->
return!
(let e = inp.GetEnumerator()
state.Value <- TryFinallyState.HaveBodyEnumerator e
state <- TryFinallyState.HaveBodyEnumerator e
x.MoveNext())
| TryFinallyState.HaveBodyEnumerator e ->
let! res = e.MoveNext()
Expand All @@ -633,9 +633,9 @@
| _ ->
return None }
member x.Dispose() =
match state.Value with
match state with
| TryFinallyState.HaveBodyEnumerator e->
state.Value <- TryFinallyState.Finished
state <- TryFinallyState.Finished
dispose e
compensation()
| _ -> () }) :> AsyncSeq<'T>
Expand Down Expand Up @@ -767,13 +767,13 @@

let ofSeq (inp: seq<'T>) : AsyncSeq<'T> =
AsyncSeqImpl(fun () ->
let state = ref (MapState.NotStarted inp)
let mutable state = MapState.NotStarted inp
{ new IAsyncSeqEnumerator<'T> with
member x.MoveNext() =
async { match state.Value with
async { match state with
| MapState.NotStarted inp ->
let e = inp.GetEnumerator()
state.Value <- MapState.HaveEnumerator e
state <- MapState.HaveEnumerator e
return! x.MoveNext()
| MapState.HaveEnumerator e ->
return
Expand All @@ -784,9 +784,9 @@
None)
| _ -> return None }
member x.Dispose() =
match state.Value with
match state with
| MapState.HaveEnumerator e ->
state.Value <- MapState.Finished
state <- MapState.Finished
dispose e
| _ -> () }) :> AsyncSeq<'T>

Expand Down Expand Up @@ -2729,7 +2729,7 @@

[<CompilerMessage("The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.", 9999)>]
let groupBy (p:'a -> 'k) (s:AsyncSeq<'a>) : AsyncSeq<'k * AsyncSeq<'a>> =
groupByAsync (p >> async.Return) s

Check warning on line 2732 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.

Check warning on line 2732 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
#endif
#endif

Expand Down
37 changes: 37 additions & 0 deletions tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2094,7 +2094,7 @@
let actual =
ls
|> AsyncSeq.ofSeq
|> AsyncSeq.groupBy p

Check warning on line 2097 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand All @@ -2103,7 +2103,7 @@
let expected = asyncSeq { raise (exn("test")) }
let actual =
asyncSeq { raise (exn("test")) }
|> AsyncSeq.groupBy (fun i -> i % 3)

Check warning on line 2106 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand Down Expand Up @@ -4490,7 +4490,7 @@
let ``AsyncSeq.groupByAsync groups elements by async projection`` () =
let result =
AsyncSeq.ofSeq [1..6]
|> AsyncSeq.groupByAsync (fun x -> async { return x % 2 })

Check warning on line 4493 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (fun (key, grp) -> async {
let! items = AsyncSeq.toArrayAsync grp
return key, Array.sort items })
Expand All @@ -4503,7 +4503,7 @@
let ``AsyncSeq.groupByAsync on empty sequence returns empty`` () =
let result =
AsyncSeq.empty<int>
|> AsyncSeq.groupByAsync (fun x -> async { return x % 2 })

Check warning on line 4506 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([||], result)
Expand All @@ -4512,7 +4512,7 @@
let ``AsyncSeq.groupByAsync with all-same key produces single group`` () =
let result =
AsyncSeq.ofSeq [1; 2; 3]
|> AsyncSeq.groupByAsync (fun _ -> async { return "same" })

Check warning on line 4515 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (fun (key, grp) -> async {
let! items = AsyncSeq.toArrayAsync grp
return key, Array.sort items })
Expand Down Expand Up @@ -4585,3 +4585,40 @@
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([| 42; 42; 42; 42; 42 |], result)

// ===== ofSeq: re-enumeration and empty-sequence edge cases =====

[<Test>]
let ``AsyncSeq.ofSeq empty returns empty`` () =
let result = AsyncSeq.ofSeq Seq.empty<int> |> AsyncSeq.toArrayAsync |> Async.RunSynchronously
Assert.AreEqual([||], result)

[<Test>]
let ``AsyncSeq.ofSeq can be enumerated multiple times`` () =
let s = AsyncSeq.ofSeq [1; 2; 3]
let r1 = s |> AsyncSeq.toArrayAsync |> Async.RunSynchronously
let r2 = s |> AsyncSeq.toArrayAsync |> Async.RunSynchronously
Assert.AreEqual([| 1; 2; 3 |], r1)
Assert.AreEqual([| 1; 2; 3 |], r2)

// ===== tryFinally: compensation runs even when downstream stops early =====

[<Test>]
let ``asyncSeq use releases resource on early termination`` () =
let disposed = ref false
let resource = { new System.IDisposable with member _.Dispose() = disposed := true }
let s = asyncSeq {
use _r = resource
yield 1; yield 2; yield 3 }
s |> AsyncSeq.take 1 |> AsyncSeq.toArrayAsync |> Async.RunSynchronously |> ignore
Assert.IsTrue(disposed.Value)

// ===== tryWith: handler receives exception and yields elements =====

[<Test>]
let ``asyncSeq try-with handler can yield elements`` () =
let s = asyncSeq {
try failwith "boom"
with _ -> yield 42 }
let result = s |> AsyncSeq.toArrayAsync |> Async.RunSynchronously
Assert.AreEqual([| 42 |], result)
Loading