Skip to content

Commit 63baea3

Browse files
Repo AssistCopilot
authored andcommitted
Add AsyncSeq.reduce and AsyncSeq.reduceAsync
Adds two new combinators: - reduce: ('T -> 'T -> 'T) -> AsyncSeq<'T> -> Async<'T> - reduceAsync: ('T -> 'T -> Async<'T>) -> AsyncSeq<'T> -> Async<'T> These mirror Seq.reduce: aggregate without an initial state, raising InvalidOperationException on empty sequences. reduce is implemented in terms of reduceAsync for consistency. 5 new tests added covering: non-empty sequences, single-element sequences, empty sequence exception, async reduction, and parity with Seq.reduce. Co-authored-by: Copilot <[email protected]>
1 parent 44f16f0 commit 63baea3

3 files changed

Lines changed: 66 additions & 0 deletions

File tree

src/FSharp.Control.AsyncSeq/AsyncSeq.fs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1145,6 +1145,25 @@ module AsyncSeq =
11451145
let fold f (state:'State) (source : AsyncSeq<'T>) =
11461146
foldAsync (fun st v -> f st v |> async.Return) state source
11471147

1148+
let reduceAsync (f: 'T -> 'T -> Async<'T>) (source: AsyncSeq<'T>) : Async<'T> = async {
1149+
use ie = source.GetEnumerator()
1150+
let! first = ie.MoveNext()
1151+
match first with
1152+
| None -> return raise (InvalidOperationException("The input sequence was empty."))
1153+
| Some v ->
1154+
let acc = ref v
1155+
let! next = ie.MoveNext()
1156+
let b = ref next
1157+
while b.Value.IsSome do
1158+
let! newAcc = f acc.Value b.Value.Value
1159+
acc := newAcc
1160+
let! more = ie.MoveNext()
1161+
b := more
1162+
return acc.Value }
1163+
1164+
let reduce (f: 'T -> 'T -> 'T) (source: AsyncSeq<'T>) : Async<'T> =
1165+
reduceAsync (fun a b -> f a b |> async.Return) source
1166+
11481167
let length (source : AsyncSeq<'T>) =
11491168
fold (fun st _ -> st + 1L) 0L source
11501169

src/FSharp.Control.AsyncSeq/AsyncSeq.fsi

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,14 @@ module AsyncSeq =
206206
/// specified 'aggregation' function.
207207
val fold : folder:('State -> 'T -> 'State) -> state:'State -> source:AsyncSeq<'T> -> Async<'State>
208208

209+
/// Asynchronously reduce the elements of the input asynchronous sequence using the
210+
/// specified asynchronous 'reduction' function. Raises InvalidOperationException if the sequence is empty.
211+
val reduceAsync : reduction:('T -> 'T -> Async<'T>) -> source:AsyncSeq<'T> -> Async<'T>
212+
213+
/// Asynchronously reduce the elements of the input asynchronous sequence using the
214+
/// specified 'reduction' function. Raises InvalidOperationException if the sequence is empty.
215+
val reduce : reduction:('T -> 'T -> 'T) -> source:AsyncSeq<'T> -> Async<'T>
216+
209217
/// Asynchronously sum the elements of the input asynchronous sequence using the specified function.
210218
val inline sum : source:AsyncSeq< ^T > -> Async< ^T>
211219
when ^T : (static member ( + ) : ^T * ^T -> ^T)

tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2329,6 +2329,45 @@ let ``AsyncSeq.fold with empty sequence should return seed``() =
23292329
|> Async.RunSynchronously
23302330
Assert.AreEqual(10, result)
23312331

2332+
[<Test>]
2333+
let ``AsyncSeq.reduce sums non-empty sequence`` () =
2334+
let result = asyncSeq { yield 1; yield 2; yield 3; yield 4; yield 5 }
2335+
|> AsyncSeq.reduce (+)
2336+
|> Async.RunSynchronously
2337+
Assert.AreEqual(15, result)
2338+
2339+
[<Test>]
2340+
let ``AsyncSeq.reduce single element returns that element`` () =
2341+
let result = asyncSeq { yield 42 }
2342+
|> AsyncSeq.reduce (+)
2343+
|> Async.RunSynchronously
2344+
Assert.AreEqual(42, result)
2345+
2346+
[<Test>]
2347+
let ``AsyncSeq.reduce empty sequence raises InvalidOperationException`` () =
2348+
Assert.Throws<InvalidOperationException>(fun () ->
2349+
AsyncSeq.empty<int>
2350+
|> AsyncSeq.reduce (+)
2351+
|> Async.RunSynchronously
2352+
|> ignore) |> ignore
2353+
2354+
[<Test>]
2355+
let ``AsyncSeq.reduceAsync accumulates with async function`` () =
2356+
async {
2357+
let result =
2358+
asyncSeq { yield 10; yield 3; yield 2 }
2359+
|> AsyncSeq.reduceAsync (fun a b -> async { return a - b })
2360+
|> Async.RunSynchronously
2361+
Assert.AreEqual(5, result)
2362+
} |> Async.RunSynchronously
2363+
2364+
[<Test>]
2365+
let ``AsyncSeq.reduce matches Seq.reduce`` () =
2366+
for ls in [ [1]; [1;2]; [3;1;4;1;5;9;2;6] ] do
2367+
let expected = Seq.reduce (+) ls
2368+
let actual = AsyncSeq.ofSeq ls |> AsyncSeq.reduce (+) |> Async.RunSynchronously
2369+
Assert.AreEqual(expected, actual)
2370+
23322371
[<Test>]
23332372
let ``AsyncSeq.ofSeq should work with large sequence``() =
23342373
let largeSeq = seq { 1 .. 100 }

0 commit comments

Comments
 (0)