Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
### 4.15.0

* Bug fix: `AsyncSeq.removeAt` and `AsyncSeq.updateAt` now raise `ArgumentException` when the index is greater than or equal to the sequence length, consistent with `List.removeAt`, `Array.removeAt`, and `AsyncSeq.insertAt`. Previously they silently returned the sequence unchanged.

### 4.14.0

* Added `AsyncSeq.mapAsyncParallelThrottled` β€” ordered, bounded-concurrency parallel map. Like `mapAsyncParallel` but limits the number of in-flight operations to `parallelism`, preventing unbounded resource use on large or infinite sequences.
Expand Down
8 changes: 6 additions & 2 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1706,15 +1706,19 @@
let mutable i = 0
for x in source do
if i <> index then yield x
i <- i + 1 }
i <- i + 1
if i <= index then
invalidArg "index" "The index is outside the range of elements in the collection." }

let updateAt (index : int) (value : 'T) (source : AsyncSeq<'T>) : AsyncSeq<'T> = asyncSeq {
if index < 0 then invalidArg "index" "must be non-negative"
let mutable i = 0
for x in source do
if i = index then yield value
else yield x
i <- i + 1 }
i <- i + 1
if i <= index then
invalidArg "index" "The index is outside the range of elements in the collection." }

let insertAt (index : int) (value : 'T) (source : AsyncSeq<'T>) : AsyncSeq<'T> = asyncSeq {
if index < 0 then invalidArg "index" "must be non-negative"
Expand Down Expand Up @@ -2725,7 +2729,7 @@

[<CompilerMessage("The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.", 9999)>]
let groupBy (p:'a -> 'k) (s:AsyncSeq<'a>) : AsyncSeq<'k * AsyncSeq<'a>> =
groupByAsync (p >> async.Return) s

Check warning on line 2732 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.

Check warning on line 2732 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
#endif
#endif

Expand Down
36 changes: 36 additions & 0 deletions tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2094,7 +2094,7 @@
let actual =
ls
|> AsyncSeq.ofSeq
|> AsyncSeq.groupBy p

Check warning on line 2097 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand All @@ -2103,7 +2103,7 @@
let expected = asyncSeq { raise (exn("test")) }
let actual =
asyncSeq { raise (exn("test")) }
|> AsyncSeq.groupBy (fun i -> i % 3)

Check warning on line 2106 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand Down Expand Up @@ -3784,6 +3784,24 @@
|> Async.RunSynchronously |> ignore)
|> ignore

[<Test>]
let ``AsyncSeq.removeAt raises ArgumentException when index is out of range`` () =
Assert.Throws<System.ArgumentException>(fun () ->
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.removeAt 10
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously |> ignore)
|> ignore

[<Test>]
let ``AsyncSeq.removeAt raises ArgumentException when index equals sequence length`` () =
Assert.Throws<System.ArgumentException>(fun () ->
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.removeAt 3
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously |> ignore)
|> ignore

// ===== updateAt =====

[<Test>]
Expand Down Expand Up @@ -3822,6 +3840,24 @@
|> Async.RunSynchronously |> ignore)
|> ignore

[<Test>]
let ``AsyncSeq.updateAt raises ArgumentException when index is out of range`` () =
Assert.Throws<System.ArgumentException>(fun () ->
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.updateAt 10 99
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously |> ignore)
|> ignore

[<Test>]
let ``AsyncSeq.updateAt raises ArgumentException when index equals sequence length`` () =
Assert.Throws<System.ArgumentException>(fun () ->
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.updateAt 3 99
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously |> ignore)
|> ignore

// ===== insertAt =====

[<Test>]
Expand Down Expand Up @@ -4454,7 +4490,7 @@
let ``AsyncSeq.groupByAsync groups elements by async projection`` () =
let result =
AsyncSeq.ofSeq [1..6]
|> AsyncSeq.groupByAsync (fun x -> async { return x % 2 })

Check warning on line 4493 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (fun (key, grp) -> async {
let! items = AsyncSeq.toArrayAsync grp
return key, Array.sort items })
Expand All @@ -4467,7 +4503,7 @@
let ``AsyncSeq.groupByAsync on empty sequence returns empty`` () =
let result =
AsyncSeq.empty<int>
|> AsyncSeq.groupByAsync (fun x -> async { return x % 2 })

Check warning on line 4506 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([||], result)
Expand All @@ -4476,7 +4512,7 @@
let ``AsyncSeq.groupByAsync with all-same key produces single group`` () =
let result =
AsyncSeq.ofSeq [1; 2; 3]
|> AsyncSeq.groupByAsync (fun _ -> async { return "same" })

Check warning on line 4515 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (fun (key, grp) -> async {
let! items = AsyncSeq.toArrayAsync grp
return key, Array.sort items })
Expand Down
2 changes: 1 addition & 1 deletion version.props
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<Project>
<PropertyGroup>
<Version>4.14.0</Version>
<Version>4.15.0</Version>
</PropertyGroup>
</Project>
Loading