@@ -986,6 +986,44 @@ module AsyncSeq =
986986 if ( buffer.Count > 0 ) then
987987 yield buffer.ToArray() }
988988
989+ let bufferByCountAndTime ( bufferSize : int ) ( timeoutMs : int ) ( source : AsyncSeq < 'T >) : AsyncSeq < 'T []> =
990+ if ( bufferSize < 1 ) then invalidArg " bufferSize" " must be positive"
991+ if ( timeoutMs < 1 ) then invalidArg " timeoutMs" " must be positive"
992+ asyncSeq {
993+ let buffer = new ResizeArray<_>()
994+ use ie = source.GetEnumerator()
995+ let rec loop rem rt = asyncSeq {
996+ let! move =
997+ match rem with
998+ | Some rem -> async.Return rem
999+ | None -> Async.StartChildAsTask( ie.MoveNext())
1000+ let t = DateTime.Now
1001+ let! time = Async.StartChildAsTask( Async.Sleep ( max 0 rt))
1002+ let! moveOr = Async.chooseTasks move time
1003+ let delta = int ( DateTime.Now - t). TotalMilliseconds
1004+ match moveOr with
1005+ | Choice1Of2 ( None, _) ->
1006+ if buffer.Count > 0 then
1007+ yield buffer.ToArray()
1008+ | Choice1Of2 ( Some v, _) ->
1009+ buffer.Add v
1010+ if buffer.Count = bufferSize then
1011+ yield buffer.ToArray()
1012+ buffer.Clear()
1013+ yield ! loop None timeoutMs
1014+ else
1015+ yield ! loop None ( rt - delta)
1016+ | Choice2Of2 (_, rest) ->
1017+ if buffer.Count > 0 then
1018+ yield buffer.ToArray()
1019+ buffer.Clear()
1020+ yield ! loop ( Some rest) timeoutMs
1021+ else
1022+ yield ! loop ( Some rest) timeoutMs
1023+ }
1024+ yield ! loop None timeoutMs
1025+ }
1026+
9891027 let mergeChoice ( source1 : AsyncSeq < 'T1 >) ( source2 : AsyncSeq < 'T2 >) : AsyncSeq < Choice < 'T1 , 'T2 >> = asyncSeq {
9901028 use ie1 = source1.GetEnumerator()
9911029 use ie2 = source2.GetEnumerator()
0 commit comments