Skip to content

Commit d51fd59

Browse files
committed
AsyncSeq.bufferByCount / fix toList
1 parent 4afba84 commit d51fd59

2 files changed

Lines changed: 45 additions & 3 deletions

File tree

src/FSharpx.Async/AsyncSeq.fs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -485,7 +485,9 @@ module AsyncSeq =
485485

486486
/// Creates an async computation which iterates the AsyncSeq and collects the output into a list.
487487
let toList (input:AsyncSeq<'T>) : Async<'T list> =
488-
input |> fold (fun arr a -> a::arr) []
488+
input
489+
|> fold (fun arr a -> a::arr) []
490+
|> Async.map List.rev
489491

490492
/// Generates an async sequence using the specified generator function.
491493
let rec unfoldAsync (f:'State -> Async<('T * 'State) option>) (s:'State) : AsyncSeq<'T> = asyncSeq {
@@ -525,6 +527,31 @@ module AsyncSeq =
525527
left
526528

527529

530+
/// Buffer items from the async sequence into buffers of a specified size.
531+
/// The last buffer returned may be less than the specified buffer size.
532+
let rec bufferByCount (bufferSize:int) (s:AsyncSeq<'T>) : AsyncSeq<'T[]> =
533+
if (bufferSize < 1) then invalidArg "bufferSize" "must be positive"
534+
async {
535+
let buffer = ResizeArray<_>()
536+
let rec loop s = async {
537+
let! step = s
538+
match step with
539+
| Nil ->
540+
if (buffer.Count > 0) then return Cons(buffer.ToArray(),async.Return Nil)
541+
else return Nil
542+
| Cons(a,tl) ->
543+
buffer.Add(a)
544+
if buffer.Count = bufferSize then
545+
let buf = buffer.ToArray()
546+
buffer.Clear()
547+
return Cons(buf, loop tl)
548+
else
549+
return! loop tl
550+
}
551+
return! loop s
552+
}
553+
554+
528555
[<AutoOpen>]
529556
module AsyncSeqExtensions =
530557
/// Builds an asynchronou sequence using the computation builder syntax

tests/FSharpx.Async.Tests/AsyncSeqTests.fs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,24 @@ let ``unfoldAsync should generate a sequence``() =
7878

7979

8080
[<Test>]
81-
let ``Interleave should interleave two sequences``() =
81+
let ``interleave should interleave two sequences``() =
8282
let s1 = AsyncSeq.ofSeq ["a";"b";"c"]
8383
let s2 = AsyncSeq.ofSeq [1;2;3]
8484
let merged = AsyncSeq.interleave s1 s2 |> AsyncSeq.toList |> Async.RunSynchronously
8585
printfn "%A" merged
86-
Assert.True([Choice1Of2 "a" ; Choice2Of2 1 ; Choice1Of2 "b" ; Choice2Of2 2 ; Choice1Of2 "c" ; Choice2Of2 3] = merged)
86+
Assert.True([Choice1Of2 "a" ; Choice2Of2 1 ; Choice1Of2 "b" ; Choice2Of2 2 ; Choice1Of2 "c" ; Choice2Of2 3] = merged)
87+
88+
89+
[<Test>]
90+
let ``bufferByCount should buffer``() =
91+
92+
let s = asyncSeq {
93+
yield 1
94+
yield 2
95+
yield 3
96+
yield 4
97+
}
98+
99+
let s' = s |> AsyncSeq.bufferByCount 2 |> AsyncSeq.toList |> Async.RunSynchronously
100+
101+
Assert.True(([[|1;2|];[|3;4|]] = s'))

0 commit comments

Comments
 (0)