Skip to content

Commit 7e95dcc

Browse files
github-actions[bot]Repo AssistCopilot
authored
Add AsyncSeq.reduce and AsyncSeq.reduceAsync (#242)
Adds two new combinators: - reduce: ('T -> 'T -> 'T) -> AsyncSeq<'T> -> Async<'T> - reduceAsync: ('T -> 'T -> Async<'T>) -> AsyncSeq<'T> -> Async<'T> These mirror Seq.reduce: aggregate without an initial state, raising InvalidOperationException on empty sequences. reduce is implemented in terms of reduceAsync for consistency. 5 new tests added covering: non-empty sequences, single-element sequences, empty sequence exception, async reduction, and parity with Seq.reduce. Co-authored-by: Repo Assist <[email protected]> Co-authored-by: Copilot <[email protected]>
1 parent 2194542 commit 7e95dcc

3 files changed

Lines changed: 66 additions & 0 deletions

File tree

src/FSharp.Control.AsyncSeq/AsyncSeq.fs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1160,6 +1160,25 @@ module AsyncSeq =
11601160
let fold f (state:'State) (source : AsyncSeq<'T>) =
11611161
foldAsync (fun st v -> f st v |> async.Return) state source
11621162

1163+
let reduceAsync (f: 'T -> 'T -> Async<'T>) (source: AsyncSeq<'T>) : Async<'T> = async {
1164+
use ie = source.GetEnumerator()
1165+
let! first = ie.MoveNext()
1166+
match first with
1167+
| None -> return raise (InvalidOperationException("The input sequence was empty."))
1168+
| Some v ->
1169+
let acc = ref v
1170+
let! next = ie.MoveNext()
1171+
let b = ref next
1172+
while b.Value.IsSome do
1173+
let! newAcc = f acc.Value b.Value.Value
1174+
acc := newAcc
1175+
let! more = ie.MoveNext()
1176+
b := more
1177+
return acc.Value }
1178+
1179+
let reduce (f: 'T -> 'T -> 'T) (source: AsyncSeq<'T>) : Async<'T> =
1180+
reduceAsync (fun a b -> f a b |> async.Return) source
1181+
11631182
let length (source : AsyncSeq<'T>) =
11641183
fold (fun st _ -> st + 1L) 0L source
11651184

src/FSharp.Control.AsyncSeq/AsyncSeq.fsi

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,14 @@ module AsyncSeq =
214214
/// specified 'aggregation' function.
215215
val fold : folder:('State -> 'T -> 'State) -> state:'State -> source:AsyncSeq<'T> -> Async<'State>
216216

217+
/// Asynchronously reduce the elements of the input asynchronous sequence using the
218+
/// specified asynchronous 'reduction' function. Raises InvalidOperationException if the sequence is empty.
219+
val reduceAsync : reduction:('T -> 'T -> Async<'T>) -> source:AsyncSeq<'T> -> Async<'T>
220+
221+
/// Asynchronously reduce the elements of the input asynchronous sequence using the
222+
/// specified 'reduction' function. Raises InvalidOperationException if the sequence is empty.
223+
val reduce : reduction:('T -> 'T -> 'T) -> source:AsyncSeq<'T> -> Async<'T>
224+
217225
/// Asynchronously sum the elements of the input asynchronous sequence using the specified function.
218226
val inline sum : source:AsyncSeq< ^T > -> Async< ^T>
219227
when ^T : (static member ( + ) : ^T * ^T -> ^T)

tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2375,6 +2375,45 @@ let ``AsyncSeq.fold with empty sequence should return seed``() =
23752375
|> Async.RunSynchronously
23762376
Assert.AreEqual(10, result)
23772377

2378+
[<Test>]
2379+
let ``AsyncSeq.reduce sums non-empty sequence`` () =
2380+
let result = asyncSeq { yield 1; yield 2; yield 3; yield 4; yield 5 }
2381+
|> AsyncSeq.reduce (+)
2382+
|> Async.RunSynchronously
2383+
Assert.AreEqual(15, result)
2384+
2385+
[<Test>]
2386+
let ``AsyncSeq.reduce single element returns that element`` () =
2387+
let result = asyncSeq { yield 42 }
2388+
|> AsyncSeq.reduce (+)
2389+
|> Async.RunSynchronously
2390+
Assert.AreEqual(42, result)
2391+
2392+
[<Test>]
2393+
let ``AsyncSeq.reduce empty sequence raises InvalidOperationException`` () =
2394+
Assert.Throws<InvalidOperationException>(fun () ->
2395+
AsyncSeq.empty<int>
2396+
|> AsyncSeq.reduce (+)
2397+
|> Async.RunSynchronously
2398+
|> ignore) |> ignore
2399+
2400+
[<Test>]
2401+
let ``AsyncSeq.reduceAsync accumulates with async function`` () =
2402+
async {
2403+
let result =
2404+
asyncSeq { yield 10; yield 3; yield 2 }
2405+
|> AsyncSeq.reduceAsync (fun a b -> async { return a - b })
2406+
|> Async.RunSynchronously
2407+
Assert.AreEqual(5, result)
2408+
} |> Async.RunSynchronously
2409+
2410+
[<Test>]
2411+
let ``AsyncSeq.reduce matches Seq.reduce`` () =
2412+
for ls in [ [1]; [1;2]; [3;1;4;1;5;9;2;6] ] do
2413+
let expected = Seq.reduce (+) ls
2414+
let actual = AsyncSeq.ofSeq ls |> AsyncSeq.reduce (+) |> Async.RunSynchronously
2415+
Assert.AreEqual(expected, actual)
2416+
23782417
[<Test>]
23792418
let ``AsyncSeq.ofSeq should work with large sequence``() =
23802419
let largeSeq = seq { 1 .. 100 }

0 commit comments

Comments
 (0)