Skip to content

Commit cef5b3b

Browse files
authored
Merge branch 'main' into repo-assist/improve-minmax-20260223-54863d68fedaee21
2 parents 5f75e72 + 7e95dcc commit cef5b3b

3 files changed

Lines changed: 234 additions & 0 deletions

File tree

src/FSharp.Control.AsyncSeq/AsyncSeq.fs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1087,6 +1087,21 @@ module AsyncSeq =
10871087
let! moven = ie.MoveNext()
10881088
b := moven }
10891089

1090+
let windowed (windowSize:int) (source:AsyncSeq<'T>) : AsyncSeq<'T[]> =
1091+
if windowSize < 1 then invalidArg (nameof windowSize) "must be positive"
1092+
asyncSeq {
1093+
let window = System.Collections.Generic.Queue<'T>(windowSize)
1094+
use ie = source.GetEnumerator()
1095+
let! move = ie.MoveNext()
1096+
let b = ref move
1097+
while b.Value.IsSome do
1098+
window.Enqueue(b.Value.Value)
1099+
if window.Count = windowSize then
1100+
yield window.ToArray()
1101+
window.Dequeue() |> ignore
1102+
let! moven = ie.MoveNext()
1103+
b := moven }
1104+
10901105
let pickAsync (f:'T -> Async<'U option>) (source:AsyncSeq<'T>) = async {
10911106
use ie = source.GetEnumerator()
10921107
let! v = ie.MoveNext()
@@ -1145,6 +1160,25 @@ module AsyncSeq =
11451160
let fold f (state:'State) (source : AsyncSeq<'T>) =
11461161
foldAsync (fun st v -> f st v |> async.Return) state source
11471162

1163+
let reduceAsync (f: 'T -> 'T -> Async<'T>) (source: AsyncSeq<'T>) : Async<'T> = async {
1164+
use ie = source.GetEnumerator()
1165+
let! first = ie.MoveNext()
1166+
match first with
1167+
| None -> return raise (InvalidOperationException("The input sequence was empty."))
1168+
| Some v ->
1169+
let acc = ref v
1170+
let! next = ie.MoveNext()
1171+
let b = ref next
1172+
while b.Value.IsSome do
1173+
let! newAcc = f acc.Value b.Value.Value
1174+
acc := newAcc
1175+
let! more = ie.MoveNext()
1176+
b := more
1177+
return acc.Value }
1178+
1179+
let reduce (f: 'T -> 'T -> 'T) (source: AsyncSeq<'T>) : Async<'T> =
1180+
reduceAsync (fun a b -> f a b |> async.Return) source
1181+
11481182
let length (source : AsyncSeq<'T>) =
11491183
fold (fun st _ -> st + 1L) 0L source
11501184

@@ -1191,6 +1225,33 @@ module AsyncSeq =
11911225
let max (source: AsyncSeq<'T>) : Async<'T> =
11921226
maxBy id source
11931227

1228+
let inline sumBy (projection : 'T -> ^U) (source : AsyncSeq<'T>) : Async<^U> =
1229+
fold (fun s x -> s + projection x) LanguagePrimitives.GenericZero source
1230+
1231+
let inline sumByAsync (projection : 'T -> Async<^U>) (source : AsyncSeq<'T>) : Async<^U> =
1232+
foldAsync (fun s x -> async { let! v = projection x in return s + v }) LanguagePrimitives.GenericZero source
1233+
1234+
let inline average (source : AsyncSeq<^T>) : Async<^T> =
1235+
async {
1236+
let! sum, count = fold (fun (s, n) x -> (s + x, n + 1)) (LanguagePrimitives.GenericZero, 0) source
1237+
if count = 0 then invalidArg "source" "The input sequence was empty."
1238+
return LanguagePrimitives.DivideByInt sum count
1239+
}
1240+
1241+
let inline averageBy (projection : 'T -> ^U) (source : AsyncSeq<'T>) : Async<^U> =
1242+
async {
1243+
let! sum, count = fold (fun (s, n) x -> (s + projection x, n + 1)) (LanguagePrimitives.GenericZero, 0) source
1244+
if count = 0 then invalidArg "source" "The input sequence was empty."
1245+
return LanguagePrimitives.DivideByInt sum count
1246+
}
1247+
1248+
let inline averageByAsync (projection : 'T -> Async<^U>) (source : AsyncSeq<'T>) : Async<^U> =
1249+
async {
1250+
let! sum, count = foldAsync (fun (s, n) x -> async { let! v = projection x in return (s + v, n + 1) }) (LanguagePrimitives.GenericZero, 0) source
1251+
if count = 0 then invalidArg "source" "The input sequence was empty."
1252+
return LanguagePrimitives.DivideByInt sum count
1253+
}
1254+
11941255
let scan f (state:'State) (source : AsyncSeq<'T>) =
11951256
scanAsync (fun st v -> f st v |> async.Return) state source
11961257

src/FSharp.Control.AsyncSeq/AsyncSeq.fsi

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,14 @@ module AsyncSeq =
198198
/// singleton input sequence.
199199
val pairwise : source:AsyncSeq<'T> -> AsyncSeq<'T * 'T>
200200

201+
/// Returns an asynchronous sequence that yields sliding windows of the given size
202+
/// over the source sequence, each yielded as an array. The first window is emitted
203+
/// once <c>windowSize</c> elements have been consumed; subsequent windows slide one
204+
/// element at a time. The sequence is empty when the source has fewer than
205+
/// <c>windowSize</c> elements. Raises <c>System.ArgumentException</c> if
206+
/// <c>windowSize</c> is less than 1.
207+
val windowed : windowSize:int -> source:AsyncSeq<'T> -> AsyncSeq<'T []>
208+
201209
/// Asynchronously aggregate the elements of the input asynchronous sequence using the
202210
/// specified asynchronous 'aggregation' function.
203211
val foldAsync : folder:('State -> 'T -> Async<'State>) -> state:'State -> source:AsyncSeq<'T> -> Async<'State>
@@ -206,6 +214,14 @@ module AsyncSeq =
206214
/// specified 'aggregation' function.
207215
val fold : folder:('State -> 'T -> 'State) -> state:'State -> source:AsyncSeq<'T> -> Async<'State>
208216

217+
/// Asynchronously reduce the elements of the input asynchronous sequence using the
218+
/// specified asynchronous 'reduction' function. Raises InvalidOperationException if the sequence is empty.
219+
val reduceAsync : reduction:('T -> 'T -> Async<'T>) -> source:AsyncSeq<'T> -> Async<'T>
220+
221+
/// Asynchronously reduce the elements of the input asynchronous sequence using the
222+
/// specified 'reduction' function. Raises InvalidOperationException if the sequence is empty.
223+
val reduce : reduction:('T -> 'T -> 'T) -> source:AsyncSeq<'T> -> Async<'T>
224+
209225
/// Asynchronously sum the elements of the input asynchronous sequence using the specified function.
210226
val inline sum : source:AsyncSeq< ^T > -> Async< ^T>
211227
when ^T : (static member ( + ) : ^T * ^T -> ^T)
@@ -229,6 +245,37 @@ module AsyncSeq =
229245
/// Asynchronously find the maximum element. Raises InvalidOperationException if the sequence is empty.
230246
val max : source:AsyncSeq<'T> -> Async<'T> when 'T : comparison
231247

248+
/// Asynchronously sum the mapped elements of an asynchronous sequence using a synchronous projection.
249+
val inline sumBy : projection:('T -> ^U) -> source:AsyncSeq<'T> -> Async< ^U>
250+
when ^U : (static member ( + ) : ^U * ^U -> ^U)
251+
and ^U : (static member Zero : ^U)
252+
253+
/// Asynchronously sum the mapped elements of an asynchronous sequence using an asynchronous projection.
254+
val inline sumByAsync : projection:('T -> Async< ^U>) -> source:AsyncSeq<'T> -> Async< ^U>
255+
when ^U : (static member ( + ) : ^U * ^U -> ^U)
256+
and ^U : (static member Zero : ^U)
257+
258+
/// Asynchronously compute the average of the elements of the input asynchronous sequence.
259+
/// Raises InvalidArgumentException if the sequence is empty.
260+
val inline average : source:AsyncSeq< ^T> -> Async< ^T>
261+
when ^T : (static member ( + ) : ^T * ^T -> ^T)
262+
and ^T : (static member DivideByInt : ^T * int -> ^T)
263+
and ^T : (static member Zero : ^T)
264+
265+
/// Asynchronously compute the average of the mapped elements of an asynchronous sequence using a synchronous projection.
266+
/// Raises InvalidArgumentException if the sequence is empty.
267+
val inline averageBy : projection:('T -> ^U) -> source:AsyncSeq<'T> -> Async< ^U>
268+
when ^U : (static member ( + ) : ^U * ^U -> ^U)
269+
and ^U : (static member DivideByInt : ^U * int -> ^U)
270+
and ^U : (static member Zero : ^U)
271+
272+
/// Asynchronously compute the average of the mapped elements of an asynchronous sequence using an asynchronous projection.
273+
/// Raises InvalidArgumentException if the sequence is empty.
274+
val inline averageByAsync : projection:('T -> Async< ^U>) -> source:AsyncSeq<'T> -> Async< ^U>
275+
when ^U : (static member ( + ) : ^U * ^U -> ^U)
276+
and ^U : (static member DivideByInt : ^U * int -> ^U)
277+
and ^U : (static member Zero : ^U)
278+
232279
/// Asynchronously determine if the sequence contains the given value
233280
val contains : value:'T -> source:AsyncSeq<'T> -> Async<bool> when 'T : equality
234281

tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,53 @@ let ``AsyncSeq.maxByAsync uses async projection``() =
259259
let actual = AsyncSeq.ofSeq ls |> AsyncSeq.maxByAsync (fun x -> async.Return x) |> Async.RunSynchronously
260260
Assert.AreEqual(9, actual)
261261

262+
[<Test>]
263+
let ``AsyncSeq.sumBy works``() =
264+
for i in 0 .. 10 do
265+
let ls = [ 1 .. i ]
266+
let actual = AsyncSeq.ofSeq ls |> AsyncSeq.sumBy float |> Async.RunSynchronously
267+
let expected = ls |> List.sumBy float
268+
Assert.AreEqual(expected, actual)
269+
270+
[<Test>]
271+
let ``AsyncSeq.sumByAsync works``() =
272+
for i in 0 .. 10 do
273+
let ls = [ 1 .. i ]
274+
let actual = AsyncSeq.ofSeq ls |> AsyncSeq.sumByAsync (float >> async.Return) |> Async.RunSynchronously
275+
let expected = ls |> List.sumBy float
276+
Assert.AreEqual(expected, actual)
277+
278+
[<Test>]
279+
let ``AsyncSeq.average works``() =
280+
for i in 1 .. 10 do
281+
let ls = [ 1.0 .. float i ]
282+
let actual = AsyncSeq.ofSeq ls |> AsyncSeq.average |> Async.RunSynchronously
283+
let expected = ls |> List.average
284+
Assert.AreEqual(expected, actual)
285+
286+
[<Test>]
287+
let ``AsyncSeq.average raises on empty sequence``() =
288+
Assert.Throws<System.ArgumentException>(fun () ->
289+
AsyncSeq.empty<float> |> AsyncSeq.average |> Async.RunSynchronously |> ignore
290+
) |> ignore
291+
292+
[<Test>]
293+
let ``AsyncSeq.averageBy works``() =
294+
for i in 1 .. 10 do
295+
let ls = [ 1 .. i ]
296+
let actual = AsyncSeq.ofSeq ls |> AsyncSeq.averageBy float |> Async.RunSynchronously
297+
let expected = ls |> List.averageBy float
298+
Assert.AreEqual(expected, actual)
299+
300+
[<Test>]
301+
let ``AsyncSeq.averageByAsync works``() =
302+
for i in 1 .. 10 do
303+
let ls = [ 1 .. i ]
304+
let actual = AsyncSeq.ofSeq ls |> AsyncSeq.averageByAsync (float >> async.Return) |> Async.RunSynchronously
305+
let expected = ls |> List.averageBy float
306+
Assert.AreEqual(expected, actual)
307+
308+
262309
[<Test>]
263310
let ``AsyncSeq.length works``() =
264311
for i in 0 .. 10 do
@@ -2376,6 +2423,45 @@ let ``AsyncSeq.fold with empty sequence should return seed``() =
23762423
|> Async.RunSynchronously
23772424
Assert.AreEqual(10, result)
23782425

2426+
[<Test>]
2427+
let ``AsyncSeq.reduce sums non-empty sequence`` () =
2428+
let result = asyncSeq { yield 1; yield 2; yield 3; yield 4; yield 5 }
2429+
|> AsyncSeq.reduce (+)
2430+
|> Async.RunSynchronously
2431+
Assert.AreEqual(15, result)
2432+
2433+
[<Test>]
2434+
let ``AsyncSeq.reduce single element returns that element`` () =
2435+
let result = asyncSeq { yield 42 }
2436+
|> AsyncSeq.reduce (+)
2437+
|> Async.RunSynchronously
2438+
Assert.AreEqual(42, result)
2439+
2440+
[<Test>]
2441+
let ``AsyncSeq.reduce empty sequence raises InvalidOperationException`` () =
2442+
Assert.Throws<InvalidOperationException>(fun () ->
2443+
AsyncSeq.empty<int>
2444+
|> AsyncSeq.reduce (+)
2445+
|> Async.RunSynchronously
2446+
|> ignore) |> ignore
2447+
2448+
[<Test>]
2449+
let ``AsyncSeq.reduceAsync accumulates with async function`` () =
2450+
async {
2451+
let result =
2452+
asyncSeq { yield 10; yield 3; yield 2 }
2453+
|> AsyncSeq.reduceAsync (fun a b -> async { return a - b })
2454+
|> Async.RunSynchronously
2455+
Assert.AreEqual(5, result)
2456+
} |> Async.RunSynchronously
2457+
2458+
[<Test>]
2459+
let ``AsyncSeq.reduce matches Seq.reduce`` () =
2460+
for ls in [ [1]; [1;2]; [3;1;4;1;5;9;2;6] ] do
2461+
let expected = Seq.reduce (+) ls
2462+
let actual = AsyncSeq.ofSeq ls |> AsyncSeq.reduce (+) |> Async.RunSynchronously
2463+
Assert.AreEqual(expected, actual)
2464+
23792465
[<Test>]
23802466
let ``AsyncSeq.ofSeq should work with large sequence``() =
23812467
let largeSeq = seq { 1 .. 100 }
@@ -2600,6 +2686,46 @@ let ``AsyncSeq.pairwise with three elements should produce two pairs`` () =
26002686
let result = AsyncSeq.pairwise source |> AsyncSeq.toListSynchronously
26012687
Assert.AreEqual([(1, 2); (2, 3)], result)
26022688

2689+
[<Test>]
2690+
let ``AsyncSeq.windowed empty sequence returns empty`` () =
2691+
let result = AsyncSeq.windowed 3 AsyncSeq.empty<int> |> AsyncSeq.toListSynchronously
2692+
Assert.AreEqual([], result)
2693+
2694+
[<Test>]
2695+
let ``AsyncSeq.windowed fewer elements than window returns empty`` () =
2696+
let source = asyncSeq { yield 1; yield 2 }
2697+
let result = AsyncSeq.windowed 3 source |> AsyncSeq.toListSynchronously
2698+
Assert.AreEqual([], result)
2699+
2700+
[<Test>]
2701+
let ``AsyncSeq.windowed exact window size returns single window`` () =
2702+
let source = asyncSeq { yield 1; yield 2; yield 3 }
2703+
let result = AsyncSeq.windowed 3 source |> AsyncSeq.toListSynchronously
2704+
Assert.AreEqual([[|1; 2; 3|]], result)
2705+
2706+
[<Test>]
2707+
let ``AsyncSeq.windowed sliding window produces correct windows`` () =
2708+
let source = asyncSeq { yield 1; yield 2; yield 3; yield 4; yield 5 }
2709+
let result = AsyncSeq.windowed 3 source |> AsyncSeq.toListSynchronously
2710+
Assert.AreEqual([[|1;2;3|]; [|2;3;4|]; [|3;4;5|]], result)
2711+
2712+
[<Test>]
2713+
let ``AsyncSeq.windowed size 1 returns each element as singleton array`` () =
2714+
let source = asyncSeq { yield 10; yield 20; yield 30 }
2715+
let result = AsyncSeq.windowed 1 source |> AsyncSeq.toListSynchronously
2716+
Assert.AreEqual([[|10|]; [|20|]; [|30|]], result)
2717+
2718+
[<Test>]
2719+
let ``AsyncSeq.windowed size 2 is equivalent to pairwise as arrays`` () =
2720+
let source = asyncSeq { yield 1; yield 2; yield 3; yield 4 }
2721+
let result = AsyncSeq.windowed 2 source |> AsyncSeq.toListSynchronously
2722+
Assert.AreEqual([[|1;2|]; [|2;3|]; [|3;4|]], result)
2723+
2724+
[<Test>]
2725+
let ``AsyncSeq.windowed with size 0 raises ArgumentException`` () =
2726+
Assert.Throws<System.ArgumentException>(fun () ->
2727+
AsyncSeq.windowed 0 (asyncSeq { yield 1 }) |> AsyncSeq.toListSynchronously |> ignore) |> ignore
2728+
26032729
[<Test>]
26042730
let ``AsyncSeq.distinctUntilChangedWith should work with custom equality`` () =
26052731
let source = asyncSeq { yield "a"; yield "A"; yield "B"; yield "b"; yield "c" }

0 commit comments

Comments
 (0)