Skip to content

Commit d23e8ab

Browse files
committed
Adds channel interop functions
1 parent 4210e65 commit d23e8ab

3 files changed

Lines changed: 102 additions & 1 deletion

File tree

src/FSharp.Control.AsyncSeq/AsyncSeq.fs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ open System.Threading.Tasks
1212
open System.Runtime.ExceptionServices
1313
#if !FABLE_COMPILER
1414
open System.Linq
15+
open System.Threading.Channels
1516
#endif
1617

1718
#nowarn "40" "3218"
@@ -375,6 +376,17 @@ module AsyncSeq =
375376
member x.MoveNext() = async { return None }
376377
member x.Dispose() = () } }
377378

379+
let emptyAsync<'T> (action : Async<unit>) : AsyncSeq<'T> =
380+
{ new IAsyncEnumerable<'T> with
381+
member x.GetEnumerator() =
382+
{ new IAsyncEnumerator<'T> with
383+
member x.MoveNext() =
384+
async {
385+
do! action
386+
return None
387+
}
388+
member x.Dispose() = () } }
389+
378390
let singleton (v:'T) : AsyncSeq<'T> =
379391
{ new IAsyncEnumerable<'T> with
380392
member x.GetEnumerator() =
@@ -1811,6 +1823,75 @@ module AsyncSeq =
18111823
#endif
18121824

18131825

1826+
#if !FABLE_COMPILER
1827+
open System.Threading.Channels
1828+
1829+
let toChannel (writer : ChannelWriter<'a>) (xs : AsyncSeq<'a>) : Async<unit> =
1830+
async {
1831+
try
1832+
do!
1833+
xs
1834+
|> iterAsync
1835+
(fun x ->
1836+
async {
1837+
if not (writer.TryWrite(x)) then
1838+
let! ct = Async.CancellationToken
1839+
1840+
do!
1841+
writer.WriteAsync(x, ct).AsTask()
1842+
|> Async.AwaitTask
1843+
})
1844+
1845+
writer.Complete()
1846+
with exn ->
1847+
writer.Complete(error = exn)
1848+
}
1849+
1850+
let fromChannel (reader : ChannelReader<'a>) : AsyncSeq<'a> =
1851+
asyncSeq {
1852+
let mutable keepGoing = true
1853+
1854+
while keepGoing do
1855+
let mutable item = Unchecked.defaultof<'a>
1856+
1857+
if reader.TryRead(&item) then
1858+
yield item
1859+
else
1860+
let! ct = Async.CancellationToken
1861+
1862+
let! hasMoreData =
1863+
reader.WaitToReadAsync(ct).AsTask()
1864+
|> Async.AwaitTask
1865+
1866+
if not hasMoreData then
1867+
keepGoing <- false
1868+
}
1869+
1870+
let prefetch (numberToPrefetch : int) (xs : AsyncSeq<'a>) : AsyncSeq<'a> =
1871+
if numberToPrefetch = 0 then
1872+
xs
1873+
else
1874+
if numberToPrefetch < 1 then
1875+
invalidArg (nameof numberToPrefetch) "must be at least zero"
1876+
asyncSeq {
1877+
let opts = BoundedChannelOptions(numberToPrefetch)
1878+
opts.SingleWriter <- true
1879+
opts.SingleReader <- true
1880+
1881+
let channel = Channel.CreateBounded(opts)
1882+
1883+
let! fillChannelTask =
1884+
toChannel channel.Writer xs
1885+
|> Async.StartChild
1886+
1887+
yield!
1888+
append
1889+
(fromChannel channel.Reader)
1890+
(emptyAsync fillChannelTask)
1891+
}
1892+
1893+
#endif
1894+
18141895

18151896
[<AutoOpen>]
18161897
module AsyncSeqExtensions =

src/FSharp.Control.AsyncSeq/AsyncSeq.fsi

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -462,7 +462,7 @@ module AsyncSeq =
462462
/// large or infinite sequences.
463463
val sortByDescending : projection:('T -> 'Key) -> source:AsyncSeq<'T> -> array<'T> when 'Key : comparison
464464
#endif
465-
465+
466466
/// Interleaves two async sequences of the same type into a resulting sequence. The provided
467467
/// sequences are consumed in lock-step.
468468
val interleave : source1:AsyncSeq<'T> -> source2:AsyncSeq<'T> -> AsyncSeq<'T>
@@ -544,6 +544,25 @@ module AsyncSeq =
544544
#endif
545545
#endif
546546

547+
#if !FABLE_COMPILER
548+
549+
open System.Threading.Channels
550+
551+
/// Fills a channel writer with the values from an async seq.
552+
/// The writer will be closed when the async seq completes or raises an error.
553+
val toChannel<'T> : writer: ChannelWriter<'T> -> source: AsyncSeq<'T> -> Async<unit>
554+
555+
/// Creates an async seq from a channel reader.
556+
/// The async seq will read values from the channel reader until it is closed.
557+
/// If the reader raises an error than the sequence will raise it.
558+
val fromChannel<'T> : reader: ChannelReader<'T> -> AsyncSeq<'T>
559+
560+
/// Transforms an async seq to a new one that fetches values ahead of time to improve throughput.
561+
val prefetch<'T> : numberToPrefetch: int -> source: AsyncSeq<'T> -> AsyncSeq<'T>
562+
563+
#endif
564+
565+
547566
/// An automatically-opened module that contains the `asyncSeq` builder and an extension method
548567
[<AutoOpen>]
549568
module AsyncSeqExtensions =

src/FSharp.Control.AsyncSeq/FSharp.Control.AsyncSeq.fsproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
<ItemGroup>
2424
<PackageReference Update="FSharp.Core" Version="4.7.2" />
2525
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="5.0" />
26+
<PackageReference Include="System.Threading.Channels" Version="*" />
2627
<Content Include="*.fsproj; **\*.fs; **\*.fsi;" PackagePath="fable\" />
2728
</ItemGroup>
2829
</Project>

0 commit comments

Comments
 (0)