Skip to content

Commit 3e7629c

Browse files
authored
Merge pull request #225 from fsprojects/repo-assist/add-mapasyncunorderedparallelthrottled-001ed43df569b4c9
[Repo Assist] Add AsyncSeq.mapAsyncUnorderedParallelThrottled
2 parents 0f5430c + 674f744 commit 3e7629c

3 files changed

Lines changed: 89 additions & 0 deletions

File tree

src/FSharp.Control.AsyncSeq/AsyncSeq.fs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -925,6 +925,34 @@ module AsyncSeq =
925925
| Choice1Of2 value -> return value
926926
| Choice2Of2 ex -> return raise ex })
927927
}
928+
929+
let mapAsyncUnorderedParallelThrottled (parallelism:int) (f:'a -> Async<'b>) (s:AsyncSeq<'a>) : AsyncSeq<'b> = asyncSeq {
930+
use mb = MailboxProcessor.Start (fun _ -> async.Return())
931+
use sm = new SemaphoreSlim(parallelism)
932+
let! err =
933+
s
934+
|> iterAsync (fun a -> async {
935+
do! sm.WaitAsync () |> Async.awaitTaskUnitCancellationAsError
936+
let! b = Async.StartChild (async {
937+
try
938+
let! result = f a
939+
sm.Release() |> ignore
940+
return Choice1Of2 result
941+
with ex ->
942+
sm.Release() |> ignore
943+
return Choice2Of2 ex
944+
})
945+
mb.Post (Some b) })
946+
|> Async.map (fun _ -> mb.Post None)
947+
|> Async.StartChildAsTask
948+
yield!
949+
replicateUntilNoneAsync (Task.chooseTask (err |> Task.taskFault) (async.Delay mb.Receive))
950+
|> mapAsync (fun childAsync -> async {
951+
let! result = childAsync
952+
match result with
953+
| Choice1Of2 value -> return value
954+
| Choice2Of2 ex -> return raise ex })
955+
}
928956
#endif
929957

930958
let chooseAsync f (source:AsyncSeq<'T>) =

src/FSharp.Control.AsyncSeq/AsyncSeq.fsi

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,14 @@ module AsyncSeq =
543543
/// Parallelism is bound by the ThreadPool.
544544
val mapAsyncUnorderedParallel : mapping:('T -> Async<'U>) -> s:AsyncSeq<'T> -> AsyncSeq<'U>
545545

546+
/// Builds a new asynchronous sequence whose elements are generated by
547+
/// applying the specified function to all elements of the input sequence,
548+
/// with at most <c>parallelism</c> mapping operations running concurrently.
549+
///
550+
/// The function is applied to elements in parallel (throttled), and results are emitted
551+
/// in the order they complete (unordered), without preserving the original order.
552+
val mapAsyncUnorderedParallelThrottled : parallelism:int -> mapping:('T -> Async<'U>) -> s:AsyncSeq<'T> -> AsyncSeq<'U>
553+
546554
/// Applies a key-generating function to each element and returns an async sequence containing unique keys
547555
/// and async sequences containing elements corresponding to the key.
548556
///

tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1717,6 +1717,59 @@ let ``AsyncSeq.mapAsyncUnorderedParallel should not preserve order`` () =
17171717
Assert.IsTrue(allPresent, "All input elements should be present in results")
17181718

17191719

1720+
[<Test>]
1721+
let ``AsyncSeq.mapAsyncUnorderedParallelThrottled should produce all results`` () =
1722+
let input = [1; 2; 3; 4; 5]
1723+
let expected = [2; 4; 6; 8; 10] |> Set.ofList
1724+
1725+
let actual =
1726+
input
1727+
|> AsyncSeq.ofSeq
1728+
|> AsyncSeq.mapAsyncUnorderedParallelThrottled 3 (fun x -> async {
1729+
do! Async.Sleep(10)
1730+
return x * 2
1731+
})
1732+
|> AsyncSeq.toListAsync
1733+
|> runTest
1734+
|> Set.ofList
1735+
1736+
Assert.AreEqual(expected, actual)
1737+
1738+
[<Test>]
1739+
let ``AsyncSeq.mapAsyncUnorderedParallelThrottled should propagate handler exception`` () =
1740+
let res =
1741+
AsyncSeq.init 100L id
1742+
|> AsyncSeq.mapAsyncUnorderedParallelThrottled 10 (fun i -> async {
1743+
if i = 50L then return failwith "oh no"
1744+
else return i * 2L
1745+
})
1746+
|> AsyncSeq.toListAsync
1747+
|> Async.Catch
1748+
|> (fun x -> Async.RunSynchronously (x, timeout = 10000))
1749+
1750+
match res with
1751+
| Choice2Of2 _ -> ()
1752+
| Choice1Of2 _ -> Assert.Fail ("error expected")
1753+
1754+
[<Test>]
1755+
let ``AsyncSeq.mapAsyncUnorderedParallelThrottled should throttle`` () =
1756+
let count = ref 0
1757+
let parallelism = 5
1758+
1759+
let result =
1760+
AsyncSeq.init 50L id
1761+
|> AsyncSeq.mapAsyncUnorderedParallelThrottled parallelism (fun i -> async {
1762+
let c = Interlocked.Increment count
1763+
if c > parallelism then
1764+
return failwith (sprintf "concurrency exceeded: %d > %d" c parallelism)
1765+
do! Async.Sleep 5
1766+
Interlocked.Decrement count |> ignore
1767+
return i * 2L })
1768+
|> AsyncSeq.toListAsync
1769+
|> Async.RunSynchronously
1770+
1771+
Assert.AreEqual(50, result.Length)
1772+
17201773
//[<Test>]
17211774
//let ``AsyncSeq.mapParallelAsyncBounded should maintain order`` () =
17221775
// let ls = List.init 500 id

0 commit comments

Comments
 (0)