Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
### 4.11.0

* Performance: `mapiAsync` — replaced the `asyncSeq`-builder + `collect` implementation with a direct, optimised enumerator (`OptimizedMapiAsyncEnumerator`), eliminating the `collect` overhead and bringing per-element cost in line with `mapAsync`. Benchmarks added in `AsyncSeqMapiBenchmarks`.

### 4.10.0

* Added `AsyncSeq.withCancellation` — returns a new `AsyncSeq` that passes the given `CancellationToken` to `GetAsyncEnumerator`, overriding whatever token would otherwise be supplied. Mirrors `TaskSeq.withCancellation` and is useful when consuming sequences from libraries (e.g. Entity Framework) that accept a cancellation token through `GetAsyncEnumerator`. Part of ongoing design-parity work with FSharp.Control.TaskSeq (see #277).
Expand Down
31 changes: 25 additions & 6 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,29 @@
disposed <- true
source.Dispose()

// Optimized mapiAsync enumerator: bypasses the asyncSeq builder + collect path by
// walking the source enumerator directly and tracking the element index in a
// mutable field. The index is captured *before* being bumped, so the projection
// always observes the 0-based position of the element it receives.
type private OptimizedMapiAsyncEnumerator<'T, 'TResult>(source: IAsyncSeqEnumerator<'T>, f: int64 -> 'T -> Async<'TResult>) =
    let mutable disposed = false
    let mutable index = 0L

    interface IAsyncSeqEnumerator<'TResult> with
        member _.MoveNext() = async {
            match! source.MoveNext() with
            | None ->
                // Source exhausted; propagate termination.
                return None
            | Some item ->
                let currentIndex = index
                index <- index + 1L
                let! projected = f currentIndex item
                return Some projected
        }

        member _.Dispose() =
            // Idempotent: only the first call reaches the underlying source.
            if not disposed then
                disposed <- true
                source.Dispose()

// Optimized filterAsync enumerator that avoids computation builder overhead
type private OptimizedFilterAsyncEnumerator<'T>(source: IAsyncSeqEnumerator<'T>, f: 'T -> Async<bool>) =
let mutable disposed = false
Expand Down Expand Up @@ -980,12 +1003,8 @@
| _ ->
AsyncSeqImpl(fun () -> new OptimizedMapAsyncEnumerator<'T, 'TResult>(source.GetEnumerator(), f) :> IAsyncSeqEnumerator<'TResult>) :> AsyncSeq<'TResult>

let mapiAsync f (source : AsyncSeq<'T>) : AsyncSeq<'TResult> = asyncSeq {
let i = ref 0L
for itm in source do
let! v = f i.Value itm
i := i.Value + 1L
yield v }
/// Maps each element of [source] together with its 0-based 64-bit index via the
/// asynchronous projection [f]. Implemented with the direct
/// OptimizedMapiAsyncEnumerator, avoiding asyncSeq-builder + collect overhead.
let mapiAsync f (source : AsyncSeq<'T>) : AsyncSeq<'TResult> =
    AsyncSeqImpl(fun () -> new OptimizedMapiAsyncEnumerator<'T, 'TResult>(source.GetEnumerator(), f) :> IAsyncSeqEnumerator<'TResult>) :> AsyncSeq<'TResult>

#if !FABLE_COMPILER
let mapAsyncParallel (f:'a -> Async<'b>) (s:AsyncSeq<'a>) : AsyncSeq<'b> = asyncSeq {
Expand Down Expand Up @@ -2450,7 +2469,7 @@

/// Groups elements of [s] by the synchronous key projection [p]. Thin wrapper
/// over groupByAsync with [p] lifted into Async via async.Return; the same
/// parallel-consumption requirement (see the compiler message) applies.
[<CompilerMessage("The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.", 9999)>]
let groupBy (p:'a -> 'k) (s:AsyncSeq<'a>) : AsyncSeq<'k * AsyncSeq<'a>> =
    groupByAsync (p >> async.Return) s

Check warning on line 2472 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.

Check warning on line 2472 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
#endif
#endif

Expand Down
33 changes: 33 additions & 0 deletions tests/FSharp.Control.AsyncSeq.Benchmarks/AsyncSeqBenchmarks.fs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,39 @@ type AsyncSeqPipelineBenchmarks() =
|> Async.RunSynchronously
|> ignore

/// Benchmarks for map and mapi variants — ensures the direct-enumerator optimisation
/// for mapiAsync is visible and comparable against mapAsync.
/// Each benchmark drains a replicated sequence through iterAsync so the whole
/// pipeline (enumerator creation + per-element dispatch) is measured.
[<MemoryDiagnoser>]
[<SimpleJob(RuntimeMoniker.Net80)>]
type AsyncSeqMapiBenchmarks() =

    // Sequence lengths exercised per benchmark; BenchmarkDotNet assigns each
    // value in turn via the Params attribute before running.
    [<Params(1000, 10000)>]
    member val ElementCount = 0 with get, set

    /// Baseline: mapAsync (already uses direct enumerator)
    [<Benchmark(Baseline = true)>]
    member this.MapAsync() =
        AsyncSeq.replicate this.ElementCount 1
        |> AsyncSeq.mapAsync (fun x -> async.Return (x * 2))
        |> AsyncSeq.iterAsync (fun _ -> async.Return())
        |> Async.RunSynchronously

    /// mapiAsync — now uses direct enumerator; should be close to mapAsync cost
    [<Benchmark>]
    member this.MapiAsync() =
        AsyncSeq.replicate this.ElementCount 1
        |> AsyncSeq.mapiAsync (fun i x -> async.Return (i, x * 2))
        |> AsyncSeq.iterAsync (fun _ -> async.Return())
        |> Async.RunSynchronously

    /// mapi — synchronous projection variant; dispatches through mapiAsync
    [<Benchmark>]
    member this.Mapi() =
        AsyncSeq.replicate this.ElementCount 1
        |> AsyncSeq.mapi (fun i x -> (i, x * 2))
        |> AsyncSeq.iterAsync (fun _ -> async.Return())
        |> Async.RunSynchronously

/// Entry point for running benchmarks.
/// Delegates directly to BenchmarkSwitcher so all BenchmarkDotNet CLI options
/// (--filter, --job short, --exporters, etc.) work out of the box.
Expand Down
Loading