Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
### 4.16.0

* Performance: Replaced `ref` cells with `mutable` locals in the `ofSeq`, `tryWith`, and `tryFinally` enumerator state machines. Each call to `ofSeq` (or any async CE block using `try...with` / `try...finally` / `use`) previously heap-allocated a `Ref<T>` wrapper object per enumerator; it now uses a direct mutable field in the generated class, reducing GC pressure. The change is equivalent to the `mutable`-for-`ref` improvement introduced in 4.11.0 for other enumerators.
* Added `AsyncSeq.unzip` β€” splits an async sequence of pairs into two arrays. Mirrors `List.unzip`.
* Added `AsyncSeq.unzip3` β€” splits an async sequence of triples into three arrays. Mirrors `List.unzip3`.
* Added `AsyncSeq.map2` β€” applies a function to corresponding elements of two async sequences; stops when either is exhausted. Mirrors `Seq.map2`.
* Added `AsyncSeq.map3` β€” applies a function to corresponding elements of three async sequences; stops when any is exhausted. Mirrors `List.map3`.

### 4.15.0

Expand Down
36 changes: 36 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1994,6 +1994,42 @@
let! next1 = ie1.MoveNext()
b1 <- next1 }

let unzip (source: AsyncSeq<'T1 * 'T2>) : Async<'T1[] * 'T2[]> = async {
let as1 = System.Collections.Generic.List<'T1>()
let as2 = System.Collections.Generic.List<'T2>()
use ie = source.GetEnumerator()
let! move = ie.MoveNext()
let mutable cur = move
while cur.IsSome do
let (a, b) = cur.Value
as1.Add(a)
as2.Add(b)
let! next = ie.MoveNext()
cur <- next
return (as1.ToArray(), as2.ToArray()) }

let unzip3 (source: AsyncSeq<'T1 * 'T2 * 'T3>) : Async<'T1[] * 'T2[] * 'T3[]> = async {
let as1 = System.Collections.Generic.List<'T1>()
let as2 = System.Collections.Generic.List<'T2>()
let as3 = System.Collections.Generic.List<'T3>()
use ie = source.GetEnumerator()
let! move = ie.MoveNext()
let mutable cur = move
while cur.IsSome do
let (a, b, c) = cur.Value
as1.Add(a)
as2.Add(b)
as3.Add(c)
let! next = ie.MoveNext()
cur <- next
return (as1.ToArray(), as2.ToArray(), as3.ToArray()) }

let map2 (mapping: 'T1 -> 'T2 -> 'U) (source1: AsyncSeq<'T1>) (source2: AsyncSeq<'T2>) : AsyncSeq<'U> =
zipWith mapping source1 source2

let map3 (mapping: 'T1 -> 'T2 -> 'T3 -> 'U) (source1: AsyncSeq<'T1>) (source2: AsyncSeq<'T2>) (source3: AsyncSeq<'T3>) : AsyncSeq<'U> =
zipWith3 mapping source1 source2 source3

let zappAsync (fs:AsyncSeq<'T -> Async<'U>>) (s:AsyncSeq<'T>) : AsyncSeq<'U> =
zipWithAsync (|>) s fs

Expand Down Expand Up @@ -2729,7 +2765,7 @@

[<CompilerMessage("The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.", 9999)>]
let groupBy (p:'a -> 'k) (s:AsyncSeq<'a>) : AsyncSeq<'k * AsyncSeq<'a>> =
groupByAsync (p >> async.Return) s

Check warning on line 2768 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.

Check warning on line 2768 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
#endif
#endif

Expand Down
16 changes: 16 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,22 @@ module AsyncSeq =
/// The second sequence is fully buffered before iteration begins, mirroring Seq.allPairs.
val allPairs : source1:AsyncSeq<'T1> -> source2:AsyncSeq<'T2> -> AsyncSeq<'T1 * 'T2>

/// Splits an async sequence of pairs into two arrays. Mirrors List.unzip.
val unzip : source:AsyncSeq<'T1 * 'T2> -> Async<'T1[] * 'T2[]>

/// Splits an async sequence of triples into three arrays. Mirrors List.unzip3.
val unzip3 : source:AsyncSeq<'T1 * 'T2 * 'T3> -> Async<'T1[] * 'T2[] * 'T3[]>

/// Builds a new async sequence whose elements are the results of applying the given
/// function to the corresponding elements of the two sequences. Stops when either
/// sequence is exhausted. Mirrors Seq.map2.
val map2 : mapping:('T1 -> 'T2 -> 'U) -> source1:AsyncSeq<'T1> -> source2:AsyncSeq<'T2> -> AsyncSeq<'U>

/// Builds a new async sequence whose elements are the results of applying the given
/// function to the corresponding elements of three sequences. Stops when any
/// sequence is exhausted. Mirrors List.map3.
val map3 : mapping:('T1 -> 'T2 -> 'T3 -> 'U) -> source1:AsyncSeq<'T1> -> source2:AsyncSeq<'T2> -> source3:AsyncSeq<'T3> -> AsyncSeq<'U>

/// Builds a new asynchronous sequence whose elements are generated by
/// applying the specified function to all elements of the input sequence.
///
Expand Down
74 changes: 73 additions & 1 deletion tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2094,7 +2094,7 @@
let actual =
ls
|> AsyncSeq.ofSeq
|> AsyncSeq.groupBy p

Check warning on line 2097 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand All @@ -2103,7 +2103,7 @@
let expected = asyncSeq { raise (exn("test")) }
let actual =
asyncSeq { raise (exn("test")) }
|> AsyncSeq.groupBy (fun i -> i % 3)

Check warning on line 2106 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand Down Expand Up @@ -3684,7 +3684,79 @@
|> AsyncSeq.toArrayAsync |> Async.RunSynchronously
Assert.AreEqual([||], result)

// ── AsyncSeq.rev ─────────────────────────────────────────────────────────────
// ── AsyncSeq.unzip ───────────────────────────────────────────────────────────

[<Test>]
let ``AsyncSeq.unzip splits pairs into two arrays`` () =
let source = asyncSeq { yield (1, 'a'); yield (2, 'b'); yield (3, 'c') }
let (lefts, rights) = AsyncSeq.unzip source |> Async.RunSynchronously
Assert.AreEqual([| 1; 2; 3 |], lefts)
Assert.AreEqual([| 'a'; 'b'; 'c' |], rights)

[<Test>]
let ``AsyncSeq.unzip empty source returns two empty arrays`` () =
let (lefts, rights) = AsyncSeq.unzip AsyncSeq.empty<int * string> |> Async.RunSynchronously
Assert.AreEqual([||], lefts)
Assert.AreEqual([||], rights)

[<Test>]
let ``AsyncSeq.unzip mirrors List.unzip`` () =
let pairs = [ (1, 'x'); (2, 'y'); (3, 'z') ]
let (expL, expR) = List.unzip pairs
let (actL, actR) = AsyncSeq.unzip (AsyncSeq.ofList pairs) |> Async.RunSynchronously
Assert.AreEqual(expL |> Array.ofList, actL)
Assert.AreEqual(expR |> Array.ofList, actR)

// ── AsyncSeq.unzip3 ──────────────────────────────────────────────────────────

[<Test>]
let ``AsyncSeq.unzip3 splits triples into three arrays`` () =
let source = asyncSeq { yield (1, 'a', true); yield (2, 'b', false); yield (3, 'c', true) }
let (as1, as2, as3) = AsyncSeq.unzip3 source |> Async.RunSynchronously
Assert.AreEqual([| 1; 2; 3 |], as1)
Assert.AreEqual([| 'a'; 'b'; 'c' |], as2)
Assert.AreEqual([| true; false; true |], as3)

[<Test>]
let ``AsyncSeq.unzip3 empty source returns three empty arrays`` () =
let (as1, as2, as3) = AsyncSeq.unzip3 AsyncSeq.empty<int * string * bool> |> Async.RunSynchronously
Assert.AreEqual([||], as1)
Assert.AreEqual([||], as2)
Assert.AreEqual([||], as3)

// ── AsyncSeq.map2 ────────────────────────────────────────────────────────────

[<Test>]
let ``AsyncSeq.map2 applies function pairwise`` () =
let s1 = asyncSeq { yield 1; yield 2; yield 3 }
let s2 = asyncSeq { yield 10; yield 20; yield 30 }
let result = AsyncSeq.map2 (+) s1 s2 |> AsyncSeq.toArrayAsync |> Async.RunSynchronously
Assert.AreEqual([| 11; 22; 33 |], result)

[<Test>]
let ``AsyncSeq.map2 stops at shorter sequence`` () =
let s1 = asyncSeq { yield 1; yield 2; yield 3 }
let s2 = asyncSeq { yield 10; yield 20 }
let result = AsyncSeq.map2 (+) s1 s2 |> AsyncSeq.toArrayAsync |> Async.RunSynchronously
Assert.AreEqual([| 11; 22 |], result)

// ── AsyncSeq.map3 ────────────────────────────────────────────────────────────

[<Test>]
let ``AsyncSeq.map3 applies function to three sequences`` () =
let s1 = asyncSeq { yield 1; yield 2; yield 3 }
let s2 = asyncSeq { yield 10; yield 20; yield 30 }
let s3 = asyncSeq { yield 100; yield 200; yield 300 }
let result = AsyncSeq.map3 (fun a b c -> a + b + c) s1 s2 s3 |> AsyncSeq.toArrayAsync |> Async.RunSynchronously
Assert.AreEqual([| 111; 222; 333 |], result)

[<Test>]
let ``AsyncSeq.map3 stops at shortest sequence`` () =
let s1 = asyncSeq { yield 1; yield 2; yield 3 }
let s2 = asyncSeq { yield 10; yield 20; yield 30 }
let s3 = asyncSeq { yield 100 }
let result = AsyncSeq.map3 (fun a b c -> a + b + c) s1 s2 s3 |> AsyncSeq.toArrayAsync |> Async.RunSynchronously
Assert.AreEqual([| 111 |], result)

[<Test>]
let ``AsyncSeq.rev reverses a sequence`` () =
Expand Down Expand Up @@ -4490,7 +4562,7 @@
let ``AsyncSeq.groupByAsync groups elements by async projection`` () =
let result =
AsyncSeq.ofSeq [1..6]
|> AsyncSeq.groupByAsync (fun x -> async { return x % 2 })

Check warning on line 4565 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (fun (key, grp) -> async {
let! items = AsyncSeq.toArrayAsync grp
return key, Array.sort items })
Expand All @@ -4503,7 +4575,7 @@
let ``AsyncSeq.groupByAsync on empty sequence returns empty`` () =
let result =
AsyncSeq.empty<int>
|> AsyncSeq.groupByAsync (fun x -> async { return x % 2 })

Check warning on line 4578 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([||], result)
Expand All @@ -4512,7 +4584,7 @@
let ``AsyncSeq.groupByAsync with all-same key produces single group`` () =
let result =
AsyncSeq.ofSeq [1; 2; 3]
|> AsyncSeq.groupByAsync (fun _ -> async { return "same" })

Check warning on line 4587 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (fun (key, grp) -> async {
let! items = AsyncSeq.toArrayAsync grp
return key, Array.sort items })
Expand Down
Loading