Skip to content

Commit 66d3ac0

Browse files
committed
Merge pull request #26 from eulerfx/master
bufferByCountAndTime
2 parents 1745b03 + 895b112 commit 66d3ac0

3 files changed

Lines changed: 77 additions & 5 deletions

File tree

src/FSharp.Control.AsyncSeq/AsyncSeq.fs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -986,6 +986,44 @@ module AsyncSeq =
986986
if (buffer.Count > 0) then
987987
yield buffer.ToArray() }
988988

989+
let bufferByCountAndTime (bufferSize:int) (timeoutMs:int) (source:AsyncSeq<'T>) : AsyncSeq<'T[]> =
990+
if (bufferSize < 1) then invalidArg "bufferSize" "must be positive"
991+
if (timeoutMs < 1) then invalidArg "timeoutMs" "must be positive"
992+
asyncSeq {
993+
let buffer = new ResizeArray<_>()
994+
use ie = source.GetEnumerator()
995+
let rec loop rem rt = asyncSeq {
996+
let! move =
997+
match rem with
998+
| Some rem -> async.Return rem
999+
| None -> Async.StartChildAsTask(ie.MoveNext())
1000+
let t = DateTime.Now
1001+
let! time = Async.StartChildAsTask(Async.Sleep (max 0 rt))
1002+
let! moveOr = Async.chooseTasks move time
1003+
let delta = int (DateTime.Now - t).TotalMilliseconds
1004+
match moveOr with
1005+
| Choice1Of2 (None, _) ->
1006+
if buffer.Count > 0 then
1007+
yield buffer.ToArray()
1008+
| Choice1Of2 (Some v, _) ->
1009+
buffer.Add v
1010+
if buffer.Count = bufferSize then
1011+
yield buffer.ToArray()
1012+
buffer.Clear()
1013+
yield! loop None timeoutMs
1014+
else
1015+
yield! loop None (rt - delta)
1016+
| Choice2Of2 (_, rest) ->
1017+
if buffer.Count > 0 then
1018+
yield buffer.ToArray()
1019+
buffer.Clear()
1020+
yield! loop (Some rest) timeoutMs
1021+
else
1022+
yield! loop (Some rest) timeoutMs
1023+
}
1024+
yield! loop None timeoutMs
1025+
}
1026+
9891027
let mergeChoice (source1:AsyncSeq<'T1>) (source2:AsyncSeq<'T2>) : AsyncSeq<Choice<'T1,'T2>> = asyncSeq {
9901028
use ie1 = source1.GetEnumerator()
9911029
use ie2 = source2.GetEnumerator()

src/FSharp.Control.AsyncSeq/AsyncSeq.fsi

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,11 +106,11 @@ module AsyncSeq =
106106
member Zero : unit -> AsyncSeq<'T>
107107

108108

109-
/// Creates an asynchronous sequence that iterates over the given input sequence.
110-
/// For every input element, it calls the the specified function and iterates
111-
/// over all elements generated by that asynchronous sequence.
112-
/// This is the 'bind' operation of the computation expression (exposed using
113-
/// the 'for' keyword in asyncSeq computation).
109+
/// Creates an asynchronous sequence that iterates over the given input sequence.
110+
/// For every input element, it calls the the specified function and iterates
111+
/// over all elements generated by that asynchronous sequence.
112+
/// This is the 'bind' operation of the computation expression (exposed using
113+
/// the 'for' keyword in asyncSeq computation).
114114
val collect : mapping:('T -> AsyncSeq<'TResult>) -> source:AsyncSeq<'T> -> AsyncSeq<'TResult>
115115

116116
/// Builds a new asynchronous sequence whose elements are generated by
@@ -334,6 +334,9 @@ module AsyncSeq =
334334
/// The last buffer returned may be less than the specified buffer size.
335335
val bufferByCount : bufferSize:int -> source:AsyncSeq<'T> -> AsyncSeq<'T []>
336336

337+
/// Buffer items from the async sequence until a specified buffer size is reached or a specified amount of time is elapsed.
338+
val bufferByCountAndTime : bufferSize:int -> timeoutMs:int -> source:AsyncSeq<'T> -> AsyncSeq<'T []>
339+
337340
/// Merges two async sequences into an async sequence non-deterministically.
338341
val mergeChoice: source1:AsyncSeq<'T1> -> source2:AsyncSeq<'T2> -> AsyncSeq<Choice<'T1,'T2>>
339342

tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,37 @@ let ``AsyncSeq.bufferByCount empty``() =
205205
let s' = s |> AsyncSeq.bufferByCount 2 |> AsyncSeq.toList
206206
Assert.True(([] = s'))
207207

208+
209+
[<Test>]
210+
let ``AsyncSeq.bufferByTimeAndCount``() =
211+
let s = asyncSeq {
212+
yield 1
213+
yield 2
214+
yield 3
215+
do! Async.Sleep 250
216+
yield 4
217+
yield 5
218+
}
219+
let actual = AsyncSeq.bufferByCountAndTime 2 50 s |> AsyncSeq.toList
220+
Assert.True((actual = [ [|1;2|] ; [|3|] ; [|4;5|] ]))
221+
222+
[<Test>]
223+
let ``AsyncSeq.bufferByCountAndTime various sizes``() =
224+
for sz in 0 .. 10 do
225+
let s = asyncSeq {
226+
for i in 1 .. sz do
227+
yield i
228+
}
229+
let s' = s |> AsyncSeq.bufferByCountAndTime 1 1 |> AsyncSeq.toList
230+
Assert.True(([for i in 1 .. sz -> [|i|]] = s'))
231+
232+
[<Test>]
233+
let ``AsyncSeq.bufferByTimeAndCount empty``() =
234+
let s = AsyncSeq.empty<int>
235+
let actual = AsyncSeq.bufferByCountAndTime 2 10 s |> AsyncSeq.toList
236+
Assert.True((actual = []))
237+
238+
208239
[<Test>]
209240
let ``try finally works no exception``() =
210241
let x = ref 0

0 commit comments

Comments
 (0)