Skip to content

Commit 12224aa

Browse files
authored
Merge branch 'main' into repo-assist/add-contributing-guidelines-c9f0ce702280ab97
2 parents e510a68 + 7e95dcc commit 12224aa

3 files changed

Lines changed: 233 additions & 0 deletions

File tree

src/FSharp.Control.AsyncSeq/AsyncSeq.fs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1087,6 +1087,21 @@ module AsyncSeq =
10871087
let! moven = ie.MoveNext()
10881088
b := moven }
10891089

1090+
let windowed (windowSize:int) (source:AsyncSeq<'T>) : AsyncSeq<'T[]> =
1091+
if windowSize < 1 then invalidArg (nameof windowSize) "must be positive"
1092+
asyncSeq {
1093+
let window = System.Collections.Generic.Queue<'T>(windowSize)
1094+
use ie = source.GetEnumerator()
1095+
let! move = ie.MoveNext()
1096+
let b = ref move
1097+
while b.Value.IsSome do
1098+
window.Enqueue(b.Value.Value)
1099+
if window.Count = windowSize then
1100+
yield window.ToArray()
1101+
window.Dequeue() |> ignore
1102+
let! moven = ie.MoveNext()
1103+
b := moven }
1104+
10901105
let pickAsync (f:'T -> Async<'U option>) (source:AsyncSeq<'T>) = async {
10911106
use ie = source.GetEnumerator()
10921107
let! v = ie.MoveNext()
@@ -1145,12 +1160,58 @@ module AsyncSeq =
11451160
let fold f (state:'State) (source : AsyncSeq<'T>) =
11461161
foldAsync (fun st v -> f st v |> async.Return) state source
11471162

1163+
let reduceAsync (f: 'T -> 'T -> Async<'T>) (source: AsyncSeq<'T>) : Async<'T> = async {
1164+
use ie = source.GetEnumerator()
1165+
let! first = ie.MoveNext()
1166+
match first with
1167+
| None -> return raise (InvalidOperationException("The input sequence was empty."))
1168+
| Some v ->
1169+
let acc = ref v
1170+
let! next = ie.MoveNext()
1171+
let b = ref next
1172+
while b.Value.IsSome do
1173+
let! newAcc = f acc.Value b.Value.Value
1174+
acc := newAcc
1175+
let! more = ie.MoveNext()
1176+
b := more
1177+
return acc.Value }
1178+
1179+
let reduce (f: 'T -> 'T -> 'T) (source: AsyncSeq<'T>) : Async<'T> =
1180+
reduceAsync (fun a b -> f a b |> async.Return) source
1181+
11481182
let length (source : AsyncSeq<'T>) =
11491183
fold (fun st _ -> st + 1L) 0L source
11501184

11511185
let inline sum (source : AsyncSeq<'T>) : Async<'T> =
11521186
(LanguagePrimitives.GenericZero, source) ||> fold (+)
11531187

1188+
let inline sumBy (projection : 'T -> ^U) (source : AsyncSeq<'T>) : Async<^U> =
1189+
fold (fun s x -> s + projection x) LanguagePrimitives.GenericZero source
1190+
1191+
let inline sumByAsync (projection : 'T -> Async<^U>) (source : AsyncSeq<'T>) : Async<^U> =
1192+
foldAsync (fun s x -> async { let! v = projection x in return s + v }) LanguagePrimitives.GenericZero source
1193+
1194+
let inline average (source : AsyncSeq<^T>) : Async<^T> =
1195+
async {
1196+
let! sum, count = fold (fun (s, n) x -> (s + x, n + 1)) (LanguagePrimitives.GenericZero, 0) source
1197+
if count = 0 then invalidArg "source" "The input sequence was empty."
1198+
return LanguagePrimitives.DivideByInt sum count
1199+
}
1200+
1201+
let inline averageBy (projection : 'T -> ^U) (source : AsyncSeq<'T>) : Async<^U> =
1202+
async {
1203+
let! sum, count = fold (fun (s, n) x -> (s + projection x, n + 1)) (LanguagePrimitives.GenericZero, 0) source
1204+
if count = 0 then invalidArg "source" "The input sequence was empty."
1205+
return LanguagePrimitives.DivideByInt sum count
1206+
}
1207+
1208+
let inline averageByAsync (projection : 'T -> Async<^U>) (source : AsyncSeq<'T>) : Async<^U> =
1209+
async {
1210+
let! sum, count = foldAsync (fun (s, n) x -> async { let! v = projection x in return (s + v, n + 1) }) (LanguagePrimitives.GenericZero, 0) source
1211+
if count = 0 then invalidArg "source" "The input sequence was empty."
1212+
return LanguagePrimitives.DivideByInt sum count
1213+
}
1214+
11541215
let scan f (state:'State) (source : AsyncSeq<'T>) =
11551216
scanAsync (fun st v -> f st v |> async.Return) state source
11561217

src/FSharp.Control.AsyncSeq/AsyncSeq.fsi

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,14 @@ module AsyncSeq =
198198
/// singleton input sequence.
199199
val pairwise : source:AsyncSeq<'T> -> AsyncSeq<'T * 'T>
200200

201+
/// Returns an asynchronous sequence that yields sliding windows of the given size
202+
/// over the source sequence, each yielded as an array. The first window is emitted
203+
/// once <c>windowSize</c> elements have been consumed; subsequent windows slide one
204+
/// element at a time. The sequence is empty when the source has fewer than
205+
/// <c>windowSize</c> elements. Raises <c>System.ArgumentException</c> if
206+
/// <c>windowSize</c> is less than 1.
207+
val windowed : windowSize:int -> source:AsyncSeq<'T> -> AsyncSeq<'T []>
208+
201209
/// Asynchronously aggregate the elements of the input asynchronous sequence using the
202210
/// specified asynchronous 'aggregation' function.
203211
val foldAsync : folder:('State -> 'T -> Async<'State>) -> state:'State -> source:AsyncSeq<'T> -> Async<'State>
@@ -206,11 +214,50 @@ module AsyncSeq =
206214
/// specified 'aggregation' function.
207215
val fold : folder:('State -> 'T -> 'State) -> state:'State -> source:AsyncSeq<'T> -> Async<'State>
208216

217+
/// Asynchronously reduce the elements of the input asynchronous sequence using the
218+
/// specified asynchronous 'reduction' function. Raises InvalidOperationException if the sequence is empty.
219+
val reduceAsync : reduction:('T -> 'T -> Async<'T>) -> source:AsyncSeq<'T> -> Async<'T>
220+
221+
/// Asynchronously reduce the elements of the input asynchronous sequence using the
222+
/// specified 'reduction' function. Raises InvalidOperationException if the sequence is empty.
223+
val reduce : reduction:('T -> 'T -> 'T) -> source:AsyncSeq<'T> -> Async<'T>
224+
209225
/// Asynchronously sum the elements of the input asynchronous sequence using the specified function.
210226
val inline sum : source:AsyncSeq< ^T > -> Async< ^T>
211227
when ^T : (static member ( + ) : ^T * ^T -> ^T)
212228
and ^T : (static member Zero : ^T)
213229

230+
/// Asynchronously sum the mapped elements of an asynchronous sequence using a synchronous projection.
231+
val inline sumBy : projection:('T -> ^U) -> source:AsyncSeq<'T> -> Async< ^U>
232+
when ^U : (static member ( + ) : ^U * ^U -> ^U)
233+
and ^U : (static member Zero : ^U)
234+
235+
/// Asynchronously sum the mapped elements of an asynchronous sequence using an asynchronous projection.
236+
val inline sumByAsync : projection:('T -> Async< ^U>) -> source:AsyncSeq<'T> -> Async< ^U>
237+
when ^U : (static member ( + ) : ^U * ^U -> ^U)
238+
and ^U : (static member Zero : ^U)
239+
240+
/// Asynchronously compute the average of the elements of the input asynchronous sequence.
241+
/// Raises InvalidArgumentException if the sequence is empty.
242+
val inline average : source:AsyncSeq< ^T> -> Async< ^T>
243+
when ^T : (static member ( + ) : ^T * ^T -> ^T)
244+
and ^T : (static member DivideByInt : ^T * int -> ^T)
245+
and ^T : (static member Zero : ^T)
246+
247+
/// Asynchronously compute the average of the mapped elements of an asynchronous sequence using a synchronous projection.
248+
/// Raises InvalidArgumentException if the sequence is empty.
249+
val inline averageBy : projection:('T -> ^U) -> source:AsyncSeq<'T> -> Async< ^U>
250+
when ^U : (static member ( + ) : ^U * ^U -> ^U)
251+
and ^U : (static member DivideByInt : ^U * int -> ^U)
252+
and ^U : (static member Zero : ^U)
253+
254+
/// Asynchronously compute the average of the mapped elements of an asynchronous sequence using an asynchronous projection.
255+
/// Raises InvalidArgumentException if the sequence is empty.
256+
val inline averageByAsync : projection:('T -> Async< ^U>) -> source:AsyncSeq<'T> -> Async< ^U>
257+
when ^U : (static member ( + ) : ^U * ^U -> ^U)
258+
and ^U : (static member DivideByInt : ^U * int -> ^U)
259+
and ^U : (static member Zero : ^U)
260+
214261
/// Asynchronously determine if the sequence contains the given value
215262
val contains : value:'T -> source:AsyncSeq<'T> -> Async<bool> when 'T : equality
216263

tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,52 @@ let ``AsyncSeq.sum works``() =
211211
let expected = ls |> List.sum
212212
Assert.True((expected = actual))
213213

214+
[<Test>]
215+
let ``AsyncSeq.sumBy works``() =
216+
for i in 0 .. 10 do
217+
let ls = [ 1 .. i ]
218+
let actual = AsyncSeq.ofSeq ls |> AsyncSeq.sumBy float |> Async.RunSynchronously
219+
let expected = ls |> List.sumBy float
220+
Assert.AreEqual(expected, actual)
221+
222+
[<Test>]
223+
let ``AsyncSeq.sumByAsync works``() =
224+
for i in 0 .. 10 do
225+
let ls = [ 1 .. i ]
226+
let actual = AsyncSeq.ofSeq ls |> AsyncSeq.sumByAsync (float >> async.Return) |> Async.RunSynchronously
227+
let expected = ls |> List.sumBy float
228+
Assert.AreEqual(expected, actual)
229+
230+
[<Test>]
231+
let ``AsyncSeq.average works``() =
232+
for i in 1 .. 10 do
233+
let ls = [ 1.0 .. float i ]
234+
let actual = AsyncSeq.ofSeq ls |> AsyncSeq.average |> Async.RunSynchronously
235+
let expected = ls |> List.average
236+
Assert.AreEqual(expected, actual)
237+
238+
[<Test>]
239+
let ``AsyncSeq.average raises on empty sequence``() =
240+
Assert.Throws<System.ArgumentException>(fun () ->
241+
AsyncSeq.empty<float> |> AsyncSeq.average |> Async.RunSynchronously |> ignore
242+
) |> ignore
243+
244+
[<Test>]
245+
let ``AsyncSeq.averageBy works``() =
246+
for i in 1 .. 10 do
247+
let ls = [ 1 .. i ]
248+
let actual = AsyncSeq.ofSeq ls |> AsyncSeq.averageBy float |> Async.RunSynchronously
249+
let expected = ls |> List.averageBy float
250+
Assert.AreEqual(expected, actual)
251+
252+
[<Test>]
253+
let ``AsyncSeq.averageByAsync works``() =
254+
for i in 1 .. 10 do
255+
let ls = [ 1 .. i ]
256+
let actual = AsyncSeq.ofSeq ls |> AsyncSeq.averageByAsync (float >> async.Return) |> Async.RunSynchronously
257+
let expected = ls |> List.averageBy float
258+
Assert.AreEqual(expected, actual)
259+
214260

215261
[<Test>]
216262
let ``AsyncSeq.length works``() =
@@ -2329,6 +2375,45 @@ let ``AsyncSeq.fold with empty sequence should return seed``() =
23292375
|> Async.RunSynchronously
23302376
Assert.AreEqual(10, result)
23312377

2378+
[<Test>]
2379+
let ``AsyncSeq.reduce sums non-empty sequence`` () =
2380+
let result = asyncSeq { yield 1; yield 2; yield 3; yield 4; yield 5 }
2381+
|> AsyncSeq.reduce (+)
2382+
|> Async.RunSynchronously
2383+
Assert.AreEqual(15, result)
2384+
2385+
[<Test>]
2386+
let ``AsyncSeq.reduce single element returns that element`` () =
2387+
let result = asyncSeq { yield 42 }
2388+
|> AsyncSeq.reduce (+)
2389+
|> Async.RunSynchronously
2390+
Assert.AreEqual(42, result)
2391+
2392+
[<Test>]
2393+
let ``AsyncSeq.reduce empty sequence raises InvalidOperationException`` () =
2394+
Assert.Throws<InvalidOperationException>(fun () ->
2395+
AsyncSeq.empty<int>
2396+
|> AsyncSeq.reduce (+)
2397+
|> Async.RunSynchronously
2398+
|> ignore) |> ignore
2399+
2400+
[<Test>]
2401+
let ``AsyncSeq.reduceAsync accumulates with async function`` () =
2402+
async {
2403+
let result =
2404+
asyncSeq { yield 10; yield 3; yield 2 }
2405+
|> AsyncSeq.reduceAsync (fun a b -> async { return a - b })
2406+
|> Async.RunSynchronously
2407+
Assert.AreEqual(5, result)
2408+
} |> Async.RunSynchronously
2409+
2410+
[<Test>]
2411+
let ``AsyncSeq.reduce matches Seq.reduce`` () =
2412+
for ls in [ [1]; [1;2]; [3;1;4;1;5;9;2;6] ] do
2413+
let expected = Seq.reduce (+) ls
2414+
let actual = AsyncSeq.ofSeq ls |> AsyncSeq.reduce (+) |> Async.RunSynchronously
2415+
Assert.AreEqual(expected, actual)
2416+
23322417
[<Test>]
23332418
let ``AsyncSeq.ofSeq should work with large sequence``() =
23342419
let largeSeq = seq { 1 .. 100 }
@@ -2553,6 +2638,46 @@ let ``AsyncSeq.pairwise with three elements should produce two pairs`` () =
25532638
let result = AsyncSeq.pairwise source |> AsyncSeq.toListSynchronously
25542639
Assert.AreEqual([(1, 2); (2, 3)], result)
25552640

2641+
[<Test>]
2642+
let ``AsyncSeq.windowed empty sequence returns empty`` () =
2643+
let result = AsyncSeq.windowed 3 AsyncSeq.empty<int> |> AsyncSeq.toListSynchronously
2644+
Assert.AreEqual([], result)
2645+
2646+
[<Test>]
2647+
let ``AsyncSeq.windowed fewer elements than window returns empty`` () =
2648+
let source = asyncSeq { yield 1; yield 2 }
2649+
let result = AsyncSeq.windowed 3 source |> AsyncSeq.toListSynchronously
2650+
Assert.AreEqual([], result)
2651+
2652+
[<Test>]
2653+
let ``AsyncSeq.windowed exact window size returns single window`` () =
2654+
let source = asyncSeq { yield 1; yield 2; yield 3 }
2655+
let result = AsyncSeq.windowed 3 source |> AsyncSeq.toListSynchronously
2656+
Assert.AreEqual([[|1; 2; 3|]], result)
2657+
2658+
[<Test>]
2659+
let ``AsyncSeq.windowed sliding window produces correct windows`` () =
2660+
let source = asyncSeq { yield 1; yield 2; yield 3; yield 4; yield 5 }
2661+
let result = AsyncSeq.windowed 3 source |> AsyncSeq.toListSynchronously
2662+
Assert.AreEqual([[|1;2;3|]; [|2;3;4|]; [|3;4;5|]], result)
2663+
2664+
[<Test>]
2665+
let ``AsyncSeq.windowed size 1 returns each element as singleton array`` () =
2666+
let source = asyncSeq { yield 10; yield 20; yield 30 }
2667+
let result = AsyncSeq.windowed 1 source |> AsyncSeq.toListSynchronously
2668+
Assert.AreEqual([[|10|]; [|20|]; [|30|]], result)
2669+
2670+
[<Test>]
2671+
let ``AsyncSeq.windowed size 2 is equivalent to pairwise as arrays`` () =
2672+
let source = asyncSeq { yield 1; yield 2; yield 3; yield 4 }
2673+
let result = AsyncSeq.windowed 2 source |> AsyncSeq.toListSynchronously
2674+
Assert.AreEqual([[|1;2|]; [|2;3|]; [|3;4|]], result)
2675+
2676+
[<Test>]
2677+
let ``AsyncSeq.windowed with size 0 raises ArgumentException`` () =
2678+
Assert.Throws<System.ArgumentException>(fun () ->
2679+
AsyncSeq.windowed 0 (asyncSeq { yield 1 }) |> AsyncSeq.toListSynchronously |> ignore) |> ignore
2680+
25562681
[<Test>]
25572682
let ``AsyncSeq.distinctUntilChangedWith should work with custom equality`` () =
25582683
let source = asyncSeq { yield "a"; yield "A"; yield "B"; yield "b"; yield "c" }

0 commit comments

Comments
 (0)