Skip to content

Commit 4fe2f1a

Browse files
Perf: optimise mapiAsync with direct enumerator
Replace the asyncSeq-builder + collect implementation of mapiAsync with a new OptimizedMapiAsyncEnumerator that iterates the source directly and maintains the index in a mutable field. This mirrors the existing OptimizedMapAsyncEnumerator and eliminates the collect/For overhead that was previously incurred for every element. Also adds AsyncSeqMapiBenchmarks to measure mapAsync vs mapiAsync vs mapi. Co-authored-by: Copilot <[email protected]>
1 parent bc5c1f0 commit 4fe2f1a

3 files changed

Lines changed: 62 additions & 6 deletions

File tree

RELEASE_NOTES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
### 4.11.0
2+
3+
* Performance: `mapiAsync` — replaced `asyncSeq`-builder + `collect` implementation with a direct optimised enumerator (`OptimizedMapiAsyncEnumerator`), eliminating `collect` overhead and bringing per-element cost in line with `mapAsync`. Benchmarks added in `AsyncSeqMapiBenchmarks`.
4+
15
### 4.10.0
26

37
* Added `AsyncSeq.withCancellation` — returns a new `AsyncSeq` that passes the given `CancellationToken` to `GetAsyncEnumerator`, overriding whatever token would otherwise be supplied. Mirrors `TaskSeq.withCancellation` and is useful when consuming sequences from libraries (e.g. Entity Framework) that accept a cancellation token through `GetAsyncEnumerator`. Part of ongoing design-parity work with FSharp.Control.TaskSeq (see #277).

src/FSharp.Control.AsyncSeq/AsyncSeq.fs

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -924,6 +924,29 @@ module AsyncSeq =
924924
disposed <- true
925925
source.Dispose()
926926

927+
// Optimized mapiAsync enumerator: avoids asyncSeq builder + collect overhead by
928+
// maintaining the index in a mutable field and iterating the source directly.
929+
type private OptimizedMapiAsyncEnumerator<'T, 'TResult>(source: IAsyncSeqEnumerator<'T>, f: int64 -> 'T -> Async<'TResult>) =
930+
let mutable disposed = false
931+
let mutable index = 0L
932+
933+
interface IAsyncSeqEnumerator<'TResult> with
934+
member _.MoveNext() = async {
935+
let! moveResult = source.MoveNext()
936+
match moveResult with
937+
| None -> return None
938+
| Some value ->
939+
let i = index
940+
index <- index + 1L
941+
let! mapped = f i value
942+
return Some mapped
943+
}
944+
945+
member _.Dispose() =
946+
if not disposed then
947+
disposed <- true
948+
source.Dispose()
949+
927950
// Optimized filterAsync enumerator that avoids computation builder overhead
928951
type private OptimizedFilterAsyncEnumerator<'T>(source: IAsyncSeqEnumerator<'T>, f: 'T -> Async<bool>) =
929952
let mutable disposed = false
@@ -980,12 +1003,8 @@ module AsyncSeq =
9801003
| _ ->
9811004
AsyncSeqImpl(fun () -> new OptimizedMapAsyncEnumerator<'T, 'TResult>(source.GetEnumerator(), f) :> IAsyncSeqEnumerator<'TResult>) :> AsyncSeq<'TResult>
9821005

983-
let mapiAsync f (source : AsyncSeq<'T>) : AsyncSeq<'TResult> = asyncSeq {
984-
let i = ref 0L
985-
for itm in source do
986-
let! v = f i.Value itm
987-
i := i.Value + 1L
988-
yield v }
1006+
let mapiAsync f (source : AsyncSeq<'T>) : AsyncSeq<'TResult> =
1007+
AsyncSeqImpl(fun () -> new OptimizedMapiAsyncEnumerator<'T, 'TResult>(source.GetEnumerator(), f) :> IAsyncSeqEnumerator<'TResult>) :> AsyncSeq<'TResult>
9891008

9901009
#if !FABLE_COMPILER
9911010
let mapAsyncParallel (f:'a -> Async<'b>) (s:AsyncSeq<'a>) : AsyncSeq<'b> = asyncSeq {

tests/FSharp.Control.AsyncSeq.Benchmarks/AsyncSeqBenchmarks.fs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,39 @@ type AsyncSeqPipelineBenchmarks() =
179179
|> Async.RunSynchronously
180180
|> ignore
181181

182+
/// Benchmarks for map and mapi variants — ensures the direct-enumerator optimisation
183+
/// for mapiAsync is visible and comparable against mapAsync.
184+
[<MemoryDiagnoser>]
185+
[<SimpleJob(RuntimeMoniker.Net80)>]
186+
type AsyncSeqMapiBenchmarks() =
187+
188+
[<Params(1000, 10000)>]
189+
member val ElementCount = 0 with get, set
190+
191+
/// Baseline: mapAsync (already uses direct enumerator)
192+
[<Benchmark(Baseline = true)>]
193+
member this.MapAsync() =
194+
AsyncSeq.replicate this.ElementCount 1
195+
|> AsyncSeq.mapAsync (fun x -> async.Return (x * 2))
196+
|> AsyncSeq.iterAsync (fun _ -> async.Return())
197+
|> Async.RunSynchronously
198+
199+
/// mapiAsync — now uses direct enumerator; should be close to mapAsync cost
200+
[<Benchmark>]
201+
member this.MapiAsync() =
202+
AsyncSeq.replicate this.ElementCount 1
203+
|> AsyncSeq.mapiAsync (fun i x -> async.Return (i, x * 2))
204+
|> AsyncSeq.iterAsync (fun _ -> async.Return())
205+
|> Async.RunSynchronously
206+
207+
/// mapi — synchronous projection variant; dispatches through mapiAsync
208+
[<Benchmark>]
209+
member this.Mapi() =
210+
AsyncSeq.replicate this.ElementCount 1
211+
|> AsyncSeq.mapi (fun i x -> (i, x * 2))
212+
|> AsyncSeq.iterAsync (fun _ -> async.Return())
213+
|> Async.RunSynchronously
214+
182215
/// Entry point for running benchmarks.
183216
/// Delegates directly to BenchmarkSwitcher so all BenchmarkDotNet CLI options
184217
/// (--filter, --job short, --exporters, etc.) work out of the box.

0 commit comments

Comments
 (0)