Skip to content

Commit 696e172

Browse files
authored
Merge pull request #222 from fsprojects/co-maintainer/chunkby-consecutive-grouping-9ea149af742b39a9
[Auto Maintainer Assistant] Add AsyncSeq.chunkBy and AsyncSeq.chunkByAsync
2 parents 0087ccc + 4c8d251 commit 696e172

3 files changed

Lines changed: 67 additions & 0 deletions

File tree

src/FSharp.Control.AsyncSeq/AsyncSeq.fs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1558,6 +1558,33 @@ module AsyncSeq =
15581558
let bufferByCount (bufferSize:int) (source:AsyncSeq<'T>) : AsyncSeq<'T[]> =
15591559
chunkBySize bufferSize source
15601560

1561+
let chunkByAsync (projection:'T -> Async<'Key>) (source:AsyncSeq<'T>) : AsyncSeq<'Key * 'T list> = asyncSeq {
1562+
use ie = source.GetEnumerator()
1563+
let! move = ie.MoveNext()
1564+
let b = ref move
1565+
if b.Value.IsSome then
1566+
let! key0 = projection b.Value.Value
1567+
let mutable currentKey = key0
1568+
let buffer = ResizeArray<'T>()
1569+
buffer.Add b.Value.Value
1570+
let! moveNext = ie.MoveNext()
1571+
b := moveNext
1572+
while b.Value.IsSome do
1573+
let! key = projection b.Value.Value
1574+
if key = currentKey then
1575+
buffer.Add b.Value.Value
1576+
else
1577+
yield (currentKey, buffer |> Seq.toList)
1578+
currentKey <- key
1579+
buffer.Clear()
1580+
buffer.Add b.Value.Value
1581+
let! moveNext = ie.MoveNext()
1582+
b := moveNext
1583+
yield (currentKey, buffer |> Seq.toList) }
1584+
1585+
let chunkBy (projection:'T -> 'Key) (source:AsyncSeq<'T>) : AsyncSeq<'Key * 'T list> =
1586+
chunkByAsync (projection >> async.Return) source
1587+
15611588
#if !FABLE_COMPILER
15621589
let toSortedSeq fn source =
15631590
toArrayAsync source |> Async.map fn |> Async.RunSynchronously

src/FSharp.Control.AsyncSeq/AsyncSeq.fsi

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,14 @@ module AsyncSeq =
484484
[<Obsolete("Use AsyncSeq.chunkBySize instead")>]
485485
val bufferByCount : bufferSize:int -> source:AsyncSeq<'T> -> AsyncSeq<'T []>
486486

487+
/// Groups consecutive elements of the async sequence that share the same key (as computed by an async projection)
488+
/// and yields each group as a pair of the key and a list of elements.
489+
val chunkByAsync<'T, 'Key when 'Key : equality> : projection:('T -> Async<'Key>) -> source:AsyncSeq<'T> -> AsyncSeq<'Key * 'T list>
490+
491+
/// Groups consecutive elements of the async sequence that share the same key (as computed by a projection)
492+
/// and yields each group as a pair of the key and a list of elements.
493+
val chunkBy<'T, 'Key when 'Key : equality> : projection:('T -> 'Key) -> source:AsyncSeq<'T> -> AsyncSeq<'Key * 'T list>
494+
487495
#if !FABLE_COMPILER
488496
/// Buffer items from the async sequence until a specified buffer size is reached or a specified amount of time is elapsed.
489497
val bufferByCountAndTime : bufferSize:int -> timeoutMs:int -> source:AsyncSeq<'T> -> AsyncSeq<'T []>

tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2611,3 +2611,35 @@ let ``AsyncSeqOp.FoldAsync with exception in folder should propagate`` () =
26112611
() // Expected
26122612
} |> Async.RunSynchronously
26132613

2614+
[<Test>]
2615+
let ``AsyncSeq.chunkBy empty sequence returns empty`` () =
2616+
let result = AsyncSeq.chunkBy id AsyncSeq.empty<int> |> AsyncSeq.toListSynchronously
2617+
Assert.AreEqual([], result)
2618+
2619+
[<Test>]
2620+
let ``AsyncSeq.chunkBy single element`` () =
2621+
let source = asyncSeq { yield 42 }
2622+
let result = AsyncSeq.chunkBy id source |> AsyncSeq.toListSynchronously
2623+
Assert.AreEqual([(42, [42])], result)
2624+
2625+
[<Test>]
2626+
let ``AsyncSeq.chunkBy groups consecutive equal keys`` () =
2627+
let source = asyncSeq { yield 1; yield 1; yield 2; yield 2; yield 1 }
2628+
let result = AsyncSeq.chunkBy id source |> AsyncSeq.toListSynchronously
2629+
Assert.AreEqual([(1, [1;1]); (2, [2;2]); (1, [1])], result)
2630+
2631+
[<Test>]
2632+
let ``AsyncSeq.chunkBy with projection`` () =
2633+
let source = asyncSeq { yield 1; yield 3; yield 2; yield 4; yield 5 }
2634+
let result = AsyncSeq.chunkBy (fun x -> x % 2 = 0) source |> AsyncSeq.toListSynchronously
2635+
Assert.AreEqual([(false, [1;3]); (true, [2;4]); (false, [5])], result)
2636+
2637+
[<Test>]
2638+
let ``AsyncSeq.chunkByAsync groups consecutive equal keys`` () =
2639+
async {
2640+
let source = asyncSeq { yield 1; yield 1; yield 2; yield 2 }
2641+
let result = AsyncSeq.chunkByAsync (fun x -> async { return x }) source |> AsyncSeq.toListSynchronously
2642+
Assert.AreEqual([(1, [1;1]); (2, [2;2])], result)
2643+
} |> Async.RunSynchronously
2644+
2645+

0 commit comments

Comments
 (0)