Skip to content

Commit 48f0dbd

Browse files
committed
AsyncStream
1 parent 59c59c9 commit 48f0dbd

4 files changed

Lines changed: 315 additions & 0 deletions

File tree

src/FSharpx.Async/AsyncStream.fs

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
namespace FSharpx.Control
2+
3+
4+
/// An infinite async sequence.
5+
type AsyncStream<'a> = Async<AsyncStreamNode<'a>>
6+
7+
/// A node of an async stream consisting of an element and the rest of the stream.
8+
and AsyncStreamNode<'a> = ASN of 'a * AsyncStream<'a>
9+
10+
/// Operations on async stream nodes.
11+
module AsyncStreamNode =
12+
13+
let inline un (ASN(hd,tl)) = hd,tl
14+
15+
let inline create hd tl = ASN(hd,tl)
16+
17+
let inline head (ASN(a,_)) = a
18+
19+
let inline tail (ASN(_,tl)) = tl
20+
21+
let rec map (f:'a -> 'b) (ASN(a,tail)) : AsyncStreamNode<'b> =
22+
ASN(f a, tail |> Async.map (map f))
23+
24+
let rec repeat (a:'a) : AsyncStreamNode<'a> =
25+
ASN (a, async.Delay(fun() -> repeat a |> async.Return))
26+
27+
let rec toAsyncSeq (ASN(a,tail)) : AsyncSeq<'a> = asyncSeq {
28+
yield a
29+
yield! tail |> Async.bind toAsyncSeq }
30+
31+
32+
/// Operations on async streams.
33+
module AsyncStream =
34+
35+
/// Creates an async stream given a head and tail.
36+
let inline create hd tl =
37+
AsyncStreamNode.create hd tl |> async.Return
38+
39+
/// Creates an async stream which repeatedly returns the provided value.
40+
let repeat (a:'a) : AsyncStream<'a> =
41+
AsyncStreamNode.repeat a |> async.Return
42+
43+
/// Generates an async stream.
44+
let rec unfoldAsync (f:'s -> Async<'a * 's>) (z:'s) : AsyncStream<'a> =
45+
f z |> Async.map (fun (a,s') -> AsyncStreamNode.create a (unfoldAsync f s'))
46+
47+
/// Returns infinite repetition of the specified list.
48+
let cycleList (xs:'a list) : AsyncStream<'a> =
49+
let rec loop = function
50+
| [] -> loop xs
51+
| hd::tl -> create hd (async.Delay(fun() -> loop tl))
52+
loop xs
53+
54+
/// Prepends a list to a stream.
55+
let rec prefixList (xs:'a list) (a:AsyncStream<'a>) : AsyncStream<'a> =
56+
match xs with
57+
| [] -> a
58+
| hd::tl -> create hd (prefixList tl a)
59+
60+
/// Prepends an async sequence to a stream.
61+
let rec prefixAsyncSeq (s:AsyncSeq<'a>) (a:AsyncStream<'a>) : AsyncStream<'a> =
62+
s |> Async.bind (function
63+
| Nil -> a
64+
| Cons(hd,tl) -> create hd (prefixAsyncSeq tl a))
65+
66+
/// Produces the infinite sequence of repeated applications of f.
67+
let rec iterate (f:'a -> 'a) (a:'a) : AsyncStream<'a> =
68+
let a' = f a in
69+
create a' (iterate f a')
70+
71+
/// Returns the first element of the stream.
72+
let head (s:AsyncStream<'a>) : Async<'a> =
73+
s |> Async.map (AsyncStreamNode.head)
74+
75+
/// Creates a stream which skips the first element of the provided stream.
76+
let tail (s:AsyncStream<'a>) : AsyncStream<'a> =
77+
s |> Async.bind (fun (ASN(_,tl)) -> tl)
78+
79+
/// Creates a stream of tails of the specified stream.
80+
let rec tails (s:AsyncStream<'a>) : AsyncStream<AsyncStream<'a>> =
81+
s |> Async.bind (fun (ASN(_,tl)) -> create tl (tails tl))
82+
83+
/// Maps a function over an async stream.
84+
let rec mapAsync (f:'a -> Async<'b>) (s:AsyncStream<'a>) : AsyncStream<'b> = async {
85+
let! (ASN(a,rest)) = s in
86+
let! b = f a
87+
return ASN(b, mapAsync f s)
88+
}
89+
90+
/// Maps each element of an async stream onto an async sequences returning a stream
91+
/// containing consecutive elements of the genereated async sequences.
92+
let rec mapAsyncSeq (f:'a -> AsyncSeq<'b>) (s:AsyncStream<'a>) : AsyncStream<'b> =
93+
s |> Async.bind (fun (ASN(hd,tl)) -> prefixAsyncSeq (f hd) (mapAsyncSeq f tl))
94+
95+
/// Creates an infinite async sequence from the stream.
96+
let toAsyncSeq (s:AsyncStream<'a>) : AsyncSeq<'a> =
97+
s |> Async.bind (AsyncStreamNode.toAsyncSeq)
98+
99+
/// Creates an async sequence which iterates through the first n elements from the stream.
100+
let take (n:int) (s:AsyncStream<'a>) : AsyncSeq<'a> =
101+
s |> toAsyncSeq |> AsyncSeq.take n
102+
103+
/// Takes elements from the stream until the specified predicate is no longer satisfied.
104+
let takeWhileAsync (p:'a -> Async<bool>) (s:AsyncStream<'a>) : AsyncSeq<'a> =
105+
s |> toAsyncSeq |> AsyncSeq.takeWhileAsync p
106+
107+
/// Drops the first n items from the stream.
108+
let rec drop (n:int) (s:AsyncStream<'a>) : AsyncStream<'a> =
109+
if n <= 0 then s
110+
else s |> Async.bind (fun (ASN(_,tl)) -> drop (n - 1) tl)
111+
112+
/// Returns a pair consisting of the prefix of the stream of the specified length
113+
/// and the remaining stream immediately following this prefix.
114+
let splitAtList (n:int) (s:AsyncStream<'a>) : Async<'a list * AsyncStream<'a>> =
115+
let rec loop n s xs =
116+
if n <= 0 then (xs |> List.rev,s) |> async.Return
117+
else s |> Async.bind (fun (ASN(hd,tl)) -> loop (n - 1) tl (hd::xs) )
118+
loop n s []
119+
120+
/// Filters a stream based on the specified predicate.
121+
let rec filterAsync (p:'a -> Async<bool>) (s:AsyncStream<'a>) : AsyncStream<'a> =
122+
s
123+
|> Async.bind (fun (ASN(hd,tl)) ->
124+
p hd
125+
|> Async.bind (function
126+
| true -> ASN(hd, filterAsync p tl) |> async.Return
127+
| false -> filterAsync p tl
128+
)
129+
)
130+
131+
/// Filters and maps a stream using the specified choose function.
132+
let rec chooseAsync (f:'a -> Async<'b option>) (s:AsyncStream<'a>) : AsyncStream<'b> =
133+
s
134+
|> Async.bind (fun (ASN(a,tl)) ->
135+
f a
136+
|> Async.bind (function
137+
| Some b -> create b (chooseAsync f tl)
138+
| None -> chooseAsync f tl
139+
)
140+
)
141+
142+
/// Scans the stream applying the specified function to consecutive elements and
143+
/// returning the stream of results.
144+
let rec scanAsync (f:'a -> 'b -> Async<'b>) (z:'b) (s:AsyncStream<'a>) : AsyncStream<'b> =
145+
s
146+
|> Async.bind (fun (ASN(hd,tl)) ->
147+
f hd z
148+
|> Async.map (fun b -> ASN(b, scanAsync f b tl))
149+
)
150+
151+
/// Creates a computation which applies the function f to elements of the stream forever.
152+
let rec iterAsync (f:'a -> Async<unit>) (s:AsyncStream<'a>) : Async<unit> =
153+
s |> Async.bind (fun (ASN(hd,tl)) -> f hd |> Async.bind (fun _ -> iterAsync f tl))
154+
155+
/// Zips two streams using the specified function.
156+
let rec zipWith (f:'a -> 'b -> 'c) (a:AsyncStream<'a>) (b:AsyncStream<'b>) : AsyncStream<'c> =
157+
Async.Parallel(a, b)
158+
|> Async.map (fun (ASN(a,atl),ASN(b,btl)) -> AsyncStreamNode.create (f a b) (zipWith f atl btl))
159+
160+
/// Zips two streams into a stream of pairs.
161+
let rec zip (a:AsyncStream<'a>) (b:AsyncStream<'b>) : AsyncStream<'a * 'b> =
162+
zipWith (fun a b -> (a,b)) a b
163+
164+
/// Takes a list of streams and produces a stream of lists.
165+
let rec distributeList (xs:AsyncStream<'a> list) : AsyncStream<'a list> =
166+
xs
167+
|> Async.Parallel
168+
|> Async.bind (fun xs ->
169+
let items,tails =
170+
xs
171+
|> List.ofArray
172+
|> List.map (AsyncStreamNode.un)
173+
|> List.unzip
174+
create items (distributeList tails)
175+
)

src/FSharpx.Async/FSharpx.Async.fsproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
<Compile Include="AsyncStreamReader.fsi" />
8080
<Compile Include="AsyncStreamReader.fs" />
8181
<Compile Include="AsyncSeq.fs" />
82+
<Compile Include="AsyncStream.fs" />
8283
<Compile Include="Async.IO.fs" />
8384
<None Include="paket.references" />
8485
</ItemGroup>
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
module AsyncStreamTests
2+
3+
open FSharpx.Control
4+
open NUnit.Framework
5+
6+
[<Test>]
7+
let ``AsyncStream.repeat``() =
8+
9+
let n = 3
10+
11+
let s =
12+
AsyncStream.repeat 1
13+
|> AsyncStream.take n
14+
|> AsyncSeq.toList
15+
|> Async.RunSynchronously
16+
17+
Assert.True ((List.init n (fun _ -> 1) = s))
18+
19+
[<Test>]
20+
let ``AsyncStream.unfoldAsync``() =
21+
22+
let s =
23+
AsyncStream.unfoldAsync (fun i -> (i,i + 1) |> async.Return) 0
24+
|> AsyncStream.take 3
25+
|> AsyncSeq.toList
26+
|> Async.RunSynchronously
27+
28+
Assert.True ((List.init 3 id = s))
29+
30+
[<Test>]
31+
let ``AsyncStream.mapAsync``() =
32+
33+
let s =
34+
AsyncStream.repeat 1
35+
|> AsyncStream.mapAsync (fun a -> a.ToString() |> async.Return)
36+
|> AsyncStream.take 3
37+
|> AsyncSeq.toList
38+
|> Async.RunSynchronously
39+
40+
Assert.True ((List.init 3 (fun _ -> "1") = s))
41+
42+
[<Test>]
43+
let ``AsyncStream.cycleList``() =
44+
45+
let s =
46+
AsyncStream.cycleList [1;2;3]
47+
|> AsyncStream.take 6
48+
|> AsyncSeq.toList
49+
|> Async.RunSynchronously
50+
51+
Assert.True(([1;2;3;1;2;3] = s))
52+
53+
[<Test>]
54+
let ``AsyncStream.tails``() =
55+
56+
let s =
57+
AsyncStream.cycleList [1;2;3;4]
58+
|> AsyncStream.tails
59+
|> AsyncStream.take 3
60+
|> AsyncSeq.mapAsync (AsyncStream.take 3 >> AsyncSeq.toList)
61+
|> AsyncSeq.toList
62+
|> Async.RunSynchronously
63+
64+
Assert.True(([ [2;3;4] ; [3;4;1] ; [4;1;2] ] = s))
65+
66+
[<Test>]
67+
let ``AsyncStream.prefixAsyncSeq``() =
68+
69+
let s =
70+
AsyncStream.repeat 3
71+
|> AsyncStream.prefixAsyncSeq (AsyncSeq.ofSeq [1;2])
72+
|> AsyncStream.take 3
73+
|> AsyncSeq.toList
74+
|> Async.RunSynchronously
75+
76+
Assert.True(([1;2;3] = s))
77+
78+
[<Test>]
79+
let ``AsyncStream.splitAtList``() =
80+
81+
let (ls,tl) =
82+
AsyncStream.cycleList [1;2;3]
83+
|> AsyncStream.splitAtList 3
84+
|> Async.RunSynchronously
85+
86+
Assert.True(([1;2;3] = ls))
87+
88+
[<Test>]
89+
let ``AsyncStream.filterAsync``() =
90+
91+
let s =
92+
AsyncStream.cycleList [1;2;3;4]
93+
|> AsyncStream.filterAsync (fun a -> (a % 2 = 0) |> async.Return)
94+
|> AsyncStream.take 4
95+
|> AsyncSeq.toList
96+
|> Async.RunSynchronously
97+
98+
Assert.True([2;4;2;4] = s)
99+
100+
[<Test>]
101+
let ``AsyncStream.chooseAsync``() =
102+
103+
let s =
104+
AsyncStream.cycleList [1;2;3;4]
105+
|> AsyncStream.chooseAsync (fun a -> if (a % 2 = 0) then Some (a.ToString()) |> async.Return else async.Return None)
106+
|> AsyncStream.take 4
107+
|> AsyncSeq.toList
108+
|> Async.RunSynchronously
109+
110+
Assert.True(["2";"4";"2";"4"] = s)
111+
112+
[<Test>]
113+
let ``AsyncStream.scanAsync``() =
114+
115+
let s =
116+
AsyncStream.cycleList [1;2;3;4]
117+
|> AsyncStream.scanAsync (fun a b -> b @ [a] |> async.Return) []
118+
|> AsyncStream.take 4
119+
|> AsyncSeq.toList
120+
|> Async.RunSynchronously
121+
122+
Assert.True([[1];[1;2];[1;2;3];[1;2;3;4]] = s)
123+
124+
[<Test>]
125+
let ``AsyncStream.iterAsync``() =
126+
let items = ResizeArray<_>()
127+
use cts = new System.Threading.CancellationTokenSource()
128+
let s =
129+
AsyncStream.cycleList [1;2;3]
130+
|> AsyncStream.iterAsync (fun a -> async {
131+
items.Add(a)
132+
if a = 3 then cts.Cancel()
133+
})
134+
try Async.RunSynchronously (s, -1, cts.Token)
135+
with _ -> ()
136+
Assert.True((List.ofSeq items = [1;2;3]))
137+
138+

tests/FSharpx.Async.Tests/FSharpx.Async.Tests.fsproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
<ItemGroup>
6060
<Compile Include="AsyncTest.fs" />
6161
<Compile Include="AsyncSeqTests.fs" />
62+
<Compile Include="AsyncStreamTests.fs" />
6263
<None Include="paket.references" />
6364
</ItemGroup>
6465
<ItemGroup>

0 commit comments

Comments
 (0)