Skip to content

Commit 41b061a

Browse files
Add tests for AsyncSeq.mapAsyncUnorderedParallelThrottled
Tests cover: - All results are produced (correctness) - Handler exceptions propagate - Concurrency is throttled to the specified parallelism Co-authored-by: Copilot <[email protected]>
1 parent 4028f38 commit 41b061a

1 file changed

Lines changed: 53 additions & 0 deletions

File tree

tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1717,6 +1717,59 @@ let ``AsyncSeq.mapAsyncUnorderedParallel should not preserve order`` () =
17171717
Assert.IsTrue(allPresent, "All input elements should be present in results")
17181718

17191719

1720+
[<Test>]
1721+
let ``AsyncSeq.mapAsyncUnorderedParallelThrottled should produce all results`` () =
1722+
let input = [1; 2; 3; 4; 5]
1723+
let expected = [2; 4; 6; 8; 10] |> Set.ofList
1724+
1725+
let actual =
1726+
input
1727+
|> AsyncSeq.ofSeq
1728+
|> AsyncSeq.mapAsyncUnorderedParallelThrottled 3 (fun x -> async {
1729+
do! Async.Sleep(10)
1730+
return x * 2
1731+
})
1732+
|> AsyncSeq.toListAsync
1733+
|> runTest
1734+
|> Set.ofList
1735+
1736+
Assert.AreEqual(expected, actual)
1737+
1738+
[<Test>]
1739+
let ``AsyncSeq.mapAsyncUnorderedParallelThrottled should propagate handler exception`` () =
1740+
let res =
1741+
AsyncSeq.init 100L id
1742+
|> AsyncSeq.mapAsyncUnorderedParallelThrottled 10 (fun i -> async {
1743+
if i = 50L then return failwith "oh no"
1744+
else return i * 2L
1745+
})
1746+
|> AsyncSeq.toListAsync
1747+
|> Async.Catch
1748+
|> (fun x -> Async.RunSynchronously (x, timeout = 10000))
1749+
1750+
match res with
1751+
| Choice2Of2 _ -> ()
1752+
| Choice1Of2 _ -> Assert.Fail ("error expected")
1753+
1754+
[<Test>]
1755+
let ``AsyncSeq.mapAsyncUnorderedParallelThrottled should throttle`` () =
1756+
let count = ref 0
1757+
let parallelism = 5
1758+
1759+
let result =
1760+
AsyncSeq.init 50L id
1761+
|> AsyncSeq.mapAsyncUnorderedParallelThrottled parallelism (fun i -> async {
1762+
let c = Interlocked.Increment count
1763+
if c > parallelism then
1764+
return failwith (sprintf "concurrency exceeded: %d > %d" c parallelism)
1765+
do! Async.Sleep 5
1766+
Interlocked.Decrement count |> ignore
1767+
return i * 2L })
1768+
|> AsyncSeq.toListAsync
1769+
|> Async.RunSynchronously
1770+
1771+
Assert.AreEqual(50, result.Length)
1772+
17201773
//[<Test>]
17211774
//let ``AsyncSeq.mapParallelAsyncBounded should maintain order`` () =
17221775
// let ls = List.init 500 id

0 commit comments

Comments
 (0)