Skip to content

Commit d4617de

Browse files
GitHub CopilotCopilot
authored andcommitted
Add AsyncSeq.mapAsyncUnorderedParallelThrottled
Adds a new function that applies an async mapping in parallel (unordered results) while limiting concurrency to a specified number of tasks. Combines the patterns from mapAsyncUnorderedParallel (unordered emission) and iterAsyncParallelThrottled (SemaphoreSlim-based throttling). Closes #31 Co-authored-by: Copilot <[email protected]>
1 parent 7a8c9bc commit d4617de

2 files changed

Lines changed: 36 additions & 0 deletions

File tree

src/FSharp.Control.AsyncSeq/AsyncSeq.fs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -925,6 +925,34 @@ module AsyncSeq =
925925
| Choice1Of2 value -> return value
926926
| Choice2Of2 ex -> return raise ex })
927927
}
928+
929+
let mapAsyncUnorderedParallelThrottled (parallelism:int) (f:'a -> Async<'b>) (s:AsyncSeq<'a>) : AsyncSeq<'b> = asyncSeq {
930+
use mb = MailboxProcessor.Start (fun _ -> async.Return())
931+
use sm = new SemaphoreSlim(parallelism)
932+
let! err =
933+
s
934+
|> iterAsync (fun a -> async {
935+
do! sm.WaitAsync () |> Async.awaitTaskUnitCancellationAsError
936+
let! b = Async.StartChild (async {
937+
try
938+
let! result = f a
939+
sm.Release() |> ignore
940+
return Choice1Of2 result
941+
with ex ->
942+
sm.Release() |> ignore
943+
return Choice2Of2 ex
944+
})
945+
mb.Post (Some b) })
946+
|> Async.map (fun _ -> mb.Post None)
947+
|> Async.StartChildAsTask
948+
yield!
949+
replicateUntilNoneAsync (Task.chooseTask (err |> Task.taskFault) (async.Delay mb.Receive))
950+
|> mapAsync (fun childAsync -> async {
951+
let! result = childAsync
952+
match result with
953+
| Choice1Of2 value -> return value
954+
| Choice2Of2 ex -> return raise ex })
955+
}
928956
#endif
929957

930958
let chooseAsync f (source:AsyncSeq<'T>) =

src/FSharp.Control.AsyncSeq/AsyncSeq.fsi

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,14 @@ module AsyncSeq =
543543
/// Parallelism is bound by the ThreadPool.
544544
val mapAsyncUnorderedParallel : mapping:('T -> Async<'U>) -> s:AsyncSeq<'T> -> AsyncSeq<'U>
545545

546+
/// Builds a new asynchronous sequence whose elements are generated by
547+
/// applying the specified function to all elements of the input sequence,
548+
/// with at most <c>parallelism</c> mapping operations running concurrently.
549+
///
550+
/// The function is applied to elements in parallel (throttled), and results are emitted
551+
/// in the order they complete (unordered), without preserving the original order.
552+
val mapAsyncUnorderedParallelThrottled : parallelism:int -> mapping:('T -> Async<'U>) -> s:AsyncSeq<'T> -> AsyncSeq<'U>
553+
546554
/// Applies a key-generating function to each element and returns an async sequence containing unique keys
547555
/// and async sequences containing elements corresponding to the key.
548556
///

0 commit comments

Comments
 (0)