@@ -12,6 +12,7 @@ open System.Threading.Tasks
1212open System.Runtime .ExceptionServices
1313#if ! FABLE_ COMPILER
1414open System.Linq
15+ open System.Threading .Channels
1516#endif
1617
1718#nowarn " 40" " 3218"
@@ -374,6 +375,17 @@ module AsyncSeq =
374375 member x.MoveNext () = async { return None }
375376 member x.Dispose () = () } }
376377
378+ let emptyAsync < 'T > ( action : Async < unit >) : AsyncSeq < 'T > =
379+ { new IAsyncEnumerable< 'T> with
380+ member x.GetEnumerator () =
381+ { new IAsyncEnumerator< 'T> with
382+ member x.MoveNext () =
383+ async {
384+ do ! action
385+ return None
386+ }
387+ member x.Dispose () = () } }
388+
377389 let singleton ( v : 'T ) : AsyncSeq < 'T > =
378390 { new IAsyncEnumerable< 'T> with
379391 member x.GetEnumerator () =
@@ -1946,6 +1958,75 @@ module AsyncSeq =
19461958 #endif
19471959
19481960
1961+ #if ! FABLE_ COMPILER
1962+ open System.Threading .Channels
1963+
1964+ let toChannel ( writer : ChannelWriter < 'a >) ( xs : AsyncSeq < 'a >) : Async < unit > =
1965+ async {
1966+ try
1967+ do !
1968+ xs
1969+ |> iterAsync
1970+ ( fun x ->
1971+ async {
1972+ if not ( writer.TryWrite( x)) then
1973+ let! ct = Async.CancellationToken
1974+
1975+ do !
1976+ writer.WriteAsync( x, ct). AsTask()
1977+ |> Async.AwaitTask
1978+ })
1979+
1980+ writer.Complete()
1981+ with exn ->
1982+ writer.Complete( error = exn)
1983+ }
1984+
1985+ let fromChannel ( reader : ChannelReader < 'a >) : AsyncSeq < 'a > =
1986+ asyncSeq {
1987+ let mutable keepGoing = true
1988+
1989+ while keepGoing do
1990+ let mutable item = Unchecked.defaultof< 'a>
1991+
1992+ if reader.TryRead(& item) then
1993+ yield item
1994+ else
1995+ let! ct = Async.CancellationToken
1996+
1997+ let! hasMoreData =
1998+ reader.WaitToReadAsync( ct). AsTask()
1999+ |> Async.AwaitTask
2000+
2001+ if not hasMoreData then
2002+ keepGoing <- false
2003+ }
2004+
2005+ let prefetch ( numberToPrefetch : int ) ( xs : AsyncSeq < 'a >) : AsyncSeq < 'a > =
2006+ if numberToPrefetch = 0 then
2007+ xs
2008+ else
2009+ if numberToPrefetch < 1 then
2010+ invalidArg ( nameof numberToPrefetch) " must be at least zero"
2011+ asyncSeq {
2012+ let opts = BoundedChannelOptions( numberToPrefetch)
2013+ opts.SingleWriter <- true
2014+ opts.SingleReader <- true
2015+
2016+ let channel = Channel.CreateBounded( opts)
2017+
2018+ let! fillChannelTask =
2019+ toChannel channel.Writer xs
2020+ |> Async.StartChild
2021+
2022+ yield !
2023+ append
2024+ ( fromChannel channel.Reader)
2025+ ( emptyAsync fillChannelTask)
2026+ }
2027+
2028+ #endif
2029+
19492030
19502031[<AutoOpen>]
19512032module AsyncSeqExtensions =
0 commit comments