Skip to content

Commit f667658

Browse files
committed
Conflicts: RELEASE_NOTES.md src/FSharpx.Async/AssemblyInfo.fs
2 parents 8ddc1b4 + 47b3050 commit f667658

8 files changed

Lines changed: 524 additions & 5 deletions

File tree

RELEASE_NOTES.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
1+
### 1.11.0 - 27.02.2015
2+
* Added Async.map, Async.bind, Async.unit
3+
* Added AsyncSeq.toList, AsyncSeq.toArray, AsyncSeq.bufferByCount, AsyncSeq.unfoldAsync, AsyncSeq.concatSeq, AsyncSeq.interleave
4+
15
### 1.10.0 - 25.02.2015
26
* Use Paket instead of NuGet
37

8+
### 1.9.9 - 23.02.2015
9+
* BUGFIX: AsyncSeq.skipWhile skips an extra item - https://github.com/fsprojects/FSharpx.Async/pull/2
10+
411
### 1.9.9 - 23.02.2015
512
* Copied the async helpers from FSharpx
6-
* BUGFIX: AsyncSeq.skipWhile skips an extra item - https://github.com/fsprojects/FSharpx.Async/pull/2
13+
* BUGFIX: AsyncSeq.skipWhile skips an extra item - https://github.com/fsprojects/FSharpx.Async/pull/2

src/FSharpx.Async/Async.fs

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,14 @@ open System.Threading
88

99
// ----------------------------------------------------------------------------
1010

11-
[<AutoOpen>]
12-
module AsyncExtensions =
13-
type Microsoft.FSharp.Control.Async with
11+
module AsyncOps =
12+
13+
let unit : Async<unit> = async.Return()
14+
1415

16+
[<AutoOpen>]
17+
module AsyncExtensions =
18+
type Microsoft.FSharp.Control.Async with
1519
/// Creates an asynchronous workflow that runs the asynchronous workflow
1620
/// given as an argument at most once. When the returned workflow is
1721
/// started for the second time, it reuses the result of the
@@ -34,4 +38,36 @@ module AsyncExtensions =
3438
let ct = new System.Threading.CancellationTokenSource()
3539
Async.Start(op, ct.Token)
3640
{ new IDisposable with
37-
member x.Dispose() = ct.Cancel() }
41+
member x.Dispose() = ct.Cancel() }
42+
43+
/// Creates an async computations which runs the specified computations
44+
/// in parallel and returns their results.
45+
static member Parallel(a:Async<'a>, b:Async<'b>) : Async<'a * 'b> = async {
46+
let! a = a |> Async.StartChild
47+
let! b = b |> Async.StartChild
48+
let! a = a
49+
let! b = b
50+
return a,b }
51+
52+
/// Creates an async computations which runs the specified computations
53+
/// in parallel and returns their results.
54+
static member Parallel(a:Async<'a>, b:Async<'b>, c:Async<'c>) : Async<'a * 'b * 'c> = async {
55+
let! a = a |> Async.StartChild
56+
let! b = b |> Async.StartChild
57+
let! c = c |> Async.StartChild
58+
let! a = a
59+
let! b = b
60+
let! c = c
61+
return a,b,c }
62+
63+
/// An async computation which does nothing.
64+
static member inline unit = AsyncOps.unit
65+
66+
/// Creates an async computation which maps a function f over the
67+
/// value produced by the specified asynchronous computation.
68+
static member inline map f a = async.Bind(a, f >> async.Return)
69+
70+
/// Creates an async computation which binds the result of the specified
71+
/// async computation to the specified function. The computation produced
72+
/// by the specified function is returned.
73+
static member inline bind f a = async.Bind(a, f)

src/FSharpx.Async/AsyncSeq.fs

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,81 @@ module AsyncSeq =
477477
| Nil -> return Nil
478478
else return! input }
479479

480+
/// Creates an async computation which iterates the AsyncSeq and collects the output into an array.
481+
let toArray (input:AsyncSeq<'T>) : Async<'T[]> =
482+
input
483+
|> fold (fun (arr:ResizeArray<_>) a -> arr.Add(a) ; arr) (new ResizeArray<_>())
484+
|> Async.map (fun arr -> arr.ToArray())
485+
486+
/// Creates an async computation which iterates the AsyncSeq and collects the output into a list.
487+
let toList (input:AsyncSeq<'T>) : Async<'T list> =
488+
input
489+
|> fold (fun arr a -> a::arr) []
490+
|> Async.map List.rev
491+
492+
/// Generates an async sequence using the specified generator function.
493+
let rec unfoldAsync (f:'State -> Async<('T * 'State) option>) (s:'State) : AsyncSeq<'T> = asyncSeq {
494+
let! r = f s
495+
match r with
496+
| Some (a,s) ->
497+
yield a
498+
yield! unfoldAsync f s
499+
| None -> () }
500+
501+
/// Flattens an AsyncSeq of sequences.
502+
let rec concatSeq (input:AsyncSeq<#seq<'T>>) : AsyncSeq<'T> = asyncSeq {
503+
let! v = input
504+
match v with
505+
| Nil -> ()
506+
| Cons (hd, tl) ->
507+
for item in hd do
508+
yield item
509+
yield! concatSeq tl }
510+
511+
/// Interleaves two async sequences into a resulting sequence. The provided
512+
/// sequences are consumed in lock-step.
513+
let interleave =
514+
515+
let rec left (a:AsyncSeq<'a>) (b:AsyncSeq<'b>) : AsyncSeq<Choice<_,_>> = async {
516+
let! a = a
517+
match a with
518+
| Cons (a1, t1) -> return Cons (Choice1Of2 a1, right t1 b)
519+
| Nil -> return! b |> map Choice2Of2 }
520+
521+
and right (a:AsyncSeq<'a>) (b:AsyncSeq<'b>) : AsyncSeq<Choice<_,_>> = async {
522+
let! b = b
523+
match b with
524+
| Cons (a2, t2) -> return Cons (Choice2Of2 a2, left a t2)
525+
| Nil -> return! a |> map Choice1Of2 }
526+
527+
left
528+
529+
530+
/// Buffer items from the async sequence into buffers of a specified size.
531+
/// The last buffer returned may be less than the specified buffer size.
532+
let rec bufferByCount (bufferSize:int) (s:AsyncSeq<'T>) : AsyncSeq<'T[]> =
533+
if (bufferSize < 1) then invalidArg "bufferSize" "must be positive"
534+
async {
535+
let buffer = ResizeArray<_>()
536+
let rec loop s = async {
537+
let! step = s
538+
match step with
539+
| Nil ->
540+
if (buffer.Count > 0) then return Cons(buffer.ToArray(),async.Return Nil)
541+
else return Nil
542+
| Cons(a,tl) ->
543+
buffer.Add(a)
544+
if buffer.Count = bufferSize then
545+
let buf = buffer.ToArray()
546+
buffer.Clear()
547+
return Cons(buf, loop tl)
548+
else
549+
return! loop tl
550+
}
551+
return! loop s
552+
}
553+
554+
480555
[<AutoOpen>]
481556
module AsyncSeqExtensions =
482557
/// Builds an asynchronou sequence using the computation builder syntax

src/FSharpx.Async/AsyncStream.fs

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
namespace FSharpx.Control
2+
3+
4+
/// An infinite async sequence.
5+
type AsyncStream<'a> = Async<AsyncStreamNode<'a>>
6+
7+
/// A node of an async stream consisting of an element and the rest of the stream.
8+
and AsyncStreamNode<'a> = ASN of 'a * AsyncStream<'a>
9+
10+
/// Operations on async stream nodes.
11+
module AsyncStreamNode =
12+
13+
let inline un (ASN(hd,tl)) = hd,tl
14+
15+
let inline create hd tl = ASN(hd,tl)
16+
17+
let inline head (ASN(a,_)) = a
18+
19+
let inline tail (ASN(_,tl)) = tl
20+
21+
let rec map (f:'a -> 'b) (ASN(a,tail)) : AsyncStreamNode<'b> =
22+
ASN(f a, tail |> Async.map (map f))
23+
24+
let rec repeat (a:'a) : AsyncStreamNode<'a> =
25+
ASN (a, async.Delay(fun() -> repeat a |> async.Return))
26+
27+
let rec toAsyncSeq (ASN(a,tail)) : AsyncSeq<'a> = asyncSeq {
28+
yield a
29+
yield! tail |> Async.bind toAsyncSeq }
30+
31+
32+
/// Operations on async streams.
33+
module AsyncStream =
34+
35+
/// Creates an async stream given a head and tail.
36+
let inline create hd tl =
37+
AsyncStreamNode.create hd tl |> async.Return
38+
39+
/// Creates an async stream which repeatedly returns the provided value.
40+
let repeat (a:'a) : AsyncStream<'a> =
41+
AsyncStreamNode.repeat a |> async.Return
42+
43+
/// Generates an async stream.
44+
let rec unfoldAsync (f:'s -> Async<'a * 's>) (z:'s) : AsyncStream<'a> =
45+
f z |> Async.map (fun (a,s') -> AsyncStreamNode.create a (unfoldAsync f s'))
46+
47+
/// Returns infinite repetition of the specified list.
48+
let cycleList (xs:'a list) : AsyncStream<'a> =
49+
let rec loop = function
50+
| [] -> loop xs
51+
| hd::tl -> create hd (async.Delay(fun() -> loop tl))
52+
loop xs
53+
54+
/// Prepends a list to a stream.
55+
let rec prefixList (xs:'a list) (a:AsyncStream<'a>) : AsyncStream<'a> =
56+
match xs with
57+
| [] -> a
58+
| hd::tl -> create hd (prefixList tl a)
59+
60+
/// Prepends an async sequence to a stream.
61+
let rec prefixAsyncSeq (s:AsyncSeq<'a>) (a:AsyncStream<'a>) : AsyncStream<'a> =
62+
s |> Async.bind (function
63+
| Nil -> a
64+
| Cons(hd,tl) -> create hd (prefixAsyncSeq tl a))
65+
66+
/// Produces the infinite sequence of repeated applications of f.
67+
let rec iterate (f:'a -> 'a) (a:'a) : AsyncStream<'a> =
68+
let a' = f a in
69+
create a' (iterate f a')
70+
71+
/// Returns the first element of the stream.
72+
let head (s:AsyncStream<'a>) : Async<'a> =
73+
s |> Async.map (AsyncStreamNode.head)
74+
75+
/// Creates a stream which skips the first element of the provided stream.
76+
let tail (s:AsyncStream<'a>) : AsyncStream<'a> =
77+
s |> Async.bind (fun (ASN(_,tl)) -> tl)
78+
79+
/// Creates a stream of tails of the specified stream.
80+
let rec tails (s:AsyncStream<'a>) : AsyncStream<AsyncStream<'a>> =
81+
s |> Async.bind (fun (ASN(_,tl)) -> create tl (tails tl))
82+
83+
/// Maps a function over an async stream.
84+
let rec mapAsync (f:'a -> Async<'b>) (s:AsyncStream<'a>) : AsyncStream<'b> = async {
85+
let! (ASN(a,rest)) = s in
86+
let! b = f a
87+
return ASN(b, mapAsync f s)
88+
}
89+
90+
/// Maps each element of an async stream onto an async sequences returning a stream
91+
/// containing consecutive elements of the genereated async sequences.
92+
let rec mapAsyncSeq (f:'a -> AsyncSeq<'b>) (s:AsyncStream<'a>) : AsyncStream<'b> =
93+
s |> Async.bind (fun (ASN(hd,tl)) -> prefixAsyncSeq (f hd) (mapAsyncSeq f tl))
94+
95+
/// Creates an infinite async sequence from the stream.
96+
let toAsyncSeq (s:AsyncStream<'a>) : AsyncSeq<'a> =
97+
s |> Async.bind (AsyncStreamNode.toAsyncSeq)
98+
99+
/// Creates an async sequence which iterates through the first n elements from the stream.
100+
let take (n:int) (s:AsyncStream<'a>) : AsyncSeq<'a> =
101+
s |> toAsyncSeq |> AsyncSeq.take n
102+
103+
/// Takes elements from the stream until the specified predicate is no longer satisfied.
104+
let takeWhileAsync (p:'a -> Async<bool>) (s:AsyncStream<'a>) : AsyncSeq<'a> =
105+
s |> toAsyncSeq |> AsyncSeq.takeWhileAsync p
106+
107+
/// Drops the first n items from the stream.
108+
let rec drop (n:int) (s:AsyncStream<'a>) : AsyncStream<'a> =
109+
if n <= 0 then s
110+
else s |> Async.bind (fun (ASN(_,tl)) -> drop (n - 1) tl)
111+
112+
/// Returns a pair consisting of the prefix of the stream of the specified length
113+
/// and the remaining stream immediately following this prefix.
114+
let splitAtList (n:int) (s:AsyncStream<'a>) : Async<'a list * AsyncStream<'a>> =
115+
let rec loop n s xs =
116+
if n <= 0 then (xs |> List.rev,s) |> async.Return
117+
else s |> Async.bind (fun (ASN(hd,tl)) -> loop (n - 1) tl (hd::xs) )
118+
loop n s []
119+
120+
/// Filters a stream based on the specified predicate.
121+
let rec filterAsync (p:'a -> Async<bool>) (s:AsyncStream<'a>) : AsyncStream<'a> =
122+
s
123+
|> Async.bind (fun (ASN(hd,tl)) ->
124+
p hd
125+
|> Async.bind (function
126+
| true -> ASN(hd, filterAsync p tl) |> async.Return
127+
| false -> filterAsync p tl
128+
)
129+
)
130+
131+
/// Filters and maps a stream using the specified choose function.
132+
let rec chooseAsync (f:'a -> Async<'b option>) (s:AsyncStream<'a>) : AsyncStream<'b> =
133+
s
134+
|> Async.bind (fun (ASN(a,tl)) ->
135+
f a
136+
|> Async.bind (function
137+
| Some b -> create b (chooseAsync f tl)
138+
| None -> chooseAsync f tl
139+
)
140+
)
141+
142+
/// Scans the stream applying the specified function to consecutive elements and
143+
/// returning the stream of results.
144+
let rec scanAsync (f:'a -> 'b -> Async<'b>) (z:'b) (s:AsyncStream<'a>) : AsyncStream<'b> =
145+
s
146+
|> Async.bind (fun (ASN(hd,tl)) ->
147+
f hd z
148+
|> Async.map (fun b -> ASN(b, scanAsync f b tl))
149+
)
150+
151+
/// Creates a computation which applies the function f to elements of the stream forever.
152+
let rec iterAsync (f:'a -> Async<unit>) (s:AsyncStream<'a>) : Async<unit> =
153+
s |> Async.bind (fun (ASN(hd,tl)) -> f hd |> Async.bind (fun _ -> iterAsync f tl))
154+
155+
/// Zips two streams using the specified function.
156+
let rec zipWith (f:'a -> 'b -> 'c) (a:AsyncStream<'a>) (b:AsyncStream<'b>) : AsyncStream<'c> =
157+
Async.Parallel(a, b)
158+
|> Async.map (fun (ASN(a,atl),ASN(b,btl)) -> AsyncStreamNode.create (f a b) (zipWith f atl btl))
159+
160+
/// Zips two streams into a stream of pairs.
161+
let rec zip (a:AsyncStream<'a>) (b:AsyncStream<'b>) : AsyncStream<'a * 'b> =
162+
zipWith (fun a b -> (a,b)) a b
163+
164+
/// Takes a list of streams and produces a stream of lists.
165+
let rec distributeList (xs:AsyncStream<'a> list) : AsyncStream<'a list> =
166+
xs
167+
|> Async.Parallel
168+
|> Async.bind (fun xs ->
169+
let items,tails =
170+
xs
171+
|> List.ofArray
172+
|> List.map (AsyncStreamNode.un)
173+
|> List.unzip
174+
create items (distributeList tails)
175+
)

src/FSharpx.Async/FSharpx.Async.fsproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
<Compile Include="AsyncStreamReader.fsi" />
8080
<Compile Include="AsyncStreamReader.fs" />
8181
<Compile Include="AsyncSeq.fs" />
82+
<Compile Include="AsyncStream.fs" />
8283
<Compile Include="Async.IO.fs" />
8384
<None Include="paket.references" />
8485
<None Include="paket.template" />

0 commit comments

Comments
 (0)