Skip to content

Commit 89df88c

Browse files
committed
append generator
1 parent 9115c4e commit 89df88c

3 files changed

Lines changed: 252 additions & 60 deletions

File tree

src/FSharp.Control.AsyncSeq/AsyncSeq.fs

Lines changed: 195 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,137 @@ module internal Utils =
8787
else return! failwith (sprintf "unreachable, i = %d" i) }
8888

8989

90+
// via: https://github.com/fsharp/fsharp/blob/master/src/fsharp/FSharp.Core/seq.fs
91+
module AsyncGenerator =
92+
93+
type Step<'a> =
94+
| Stop
95+
| Yield of 'a
96+
97+
/// Jump to another generator.
98+
| Goto of AsyncGenerator<'a>
99+
100+
and AsyncGenerator<'a> =
101+
abstract Apply : unit -> Async<Step<'a>>
102+
abstract Disposer : (unit -> unit) option
103+
104+
let disposeG (g:AsyncGenerator<'a>) =
105+
match g.Disposer with
106+
| None -> ()
107+
| Some f -> f ()
108+
109+
let appG (g:AsyncGenerator<'a>) = async {
110+
let! res = g.Apply ()
111+
match res with
112+
| Goto next -> return Goto next
113+
| Yield _ -> return res
114+
| Stop ->
115+
disposeG g
116+
return res }
117+
118+
type GenerateCont<'a> (g:AsyncGenerator<'a>, cont:unit -> AsyncGenerator<'a>) =
119+
member __.Generator = g
120+
member __.Cont = cont
121+
interface AsyncGenerator<'a> with
122+
member x.Apply () = async {
123+
let! step = appG g
124+
match step with
125+
| Stop -> return Goto (cont ())
126+
| Yield _ as res -> return res
127+
| Goto next -> return Goto (GenerateCont<_>.Bind (next, cont)) }
128+
member x.Disposer =
129+
g.Disposer
130+
131+
static member Bind (g:AsyncGenerator<'a>, cont:unit -> AsyncGenerator<'a>) =
132+
match g with
133+
| :? GenerateCont<'a> as g -> GenerateCont<_>.Bind (g.Generator, (fun () -> GenerateCont<_>.Bind (g.Cont(), cont)))
134+
| _ -> (new GenerateCont<'a>(g, cont) :> AsyncGenerator<'a>)
135+
136+
/// Right-associating binder.
137+
let bindG (g:AsyncGenerator<'a>) (cont:unit -> AsyncGenerator<'a>) : AsyncGenerator<'a> =
138+
GenerateCont<_>.Bind (g, cont)
139+
140+
141+
/// Converts a generator to an enumerator.
142+
/// The generator can point to another generator using Goto, in which case
143+
/// the enumerator mutates its current generator and continues.
144+
type AsyncGeneratorEnumerator<'a> (g:AsyncGenerator<'a>) =
145+
let mutable g = g
146+
let mutable fin = false
147+
member __.Generator = g
148+
interface IAsyncEnumerator<'a> with
149+
member x.MoveNext () = async {
150+
let! step = appG g
151+
match step with
152+
| Stop ->
153+
fin <- true
154+
return None
155+
| Yield a ->
156+
return Some a
157+
| Goto next ->
158+
g <- next
159+
return! (x :> IAsyncEnumerator<_>).MoveNext() }
160+
member __.Dispose () =
161+
disposeG g
162+
163+
/// Converts an enumerator to a generator.
164+
/// The resulting generator will either yield or stop.
165+
type AsyncEnumeratorGenerator<'a> (enum:IAsyncEnumerator<'a>) =
166+
member __.Enumerator = enum
167+
interface AsyncGenerator<'a> with
168+
member __.Apply () = async {
169+
let! next = enum.MoveNext()
170+
match next with
171+
| Some a ->
172+
return Yield a
173+
| None ->
174+
return Stop }
175+
member __.Disposer = Some ((fun () -> (enum :> IDisposable).Dispose()))
176+
177+
let enumeratorFromGenerator (g:AsyncGenerator<'a>) : IAsyncEnumerator<'a> =
178+
match g with
179+
| :? AsyncEnumeratorGenerator<'a> as g -> g.Enumerator
180+
| _ -> (new AsyncGeneratorEnumerator<_>(g) :> _)
181+
182+
let generatorFromEnumerator (e:IAsyncEnumerator<'a>) : AsyncGenerator<'a> =
183+
match e with
184+
| :? AsyncGeneratorEnumerator<'a> as e -> e.Generator
185+
| _ -> (new AsyncEnumeratorGenerator<_>(e) :> _)
186+
187+
// type private DelayEnumerable<'a> (f:unit -> AsyncSeq<'a>) =
188+
// member x.Delay = f
189+
// interface IAsyncEnumerable<'a> with
190+
// member __.GetEnumerator() =
191+
// let rec unwrap (f:unit -> AsyncSeq<_>) =
192+
// let s = f ()
193+
// match s with
194+
// | :? DelayEnumerable<_> as s -> unwrap s.Delay
195+
// | _ -> s
196+
// let s = unwrap f
197+
// s.GetEnumerator()
198+
199+
// let delay (f:unit -> AsyncSeq<'T>) : AsyncSeq<'T> =
200+
// new DelayEnumerable<'T>(f) :> _
201+
202+
let delay (f:unit -> AsyncSeq<'T>) : AsyncSeq<'T> =
203+
{ new IAsyncEnumerable<'T> with
204+
member x.GetEnumerator() = f().GetEnumerator() }
205+
206+
let emitEnum (e:IAsyncEnumerator<'a>) : AsyncSeq<'a> =
207+
{ new IAsyncEnumerable<_> with
208+
member __.GetEnumerator () = e }
209+
210+
let fromGeneratorDelay (f:unit -> AsyncGenerator<'a>) : AsyncSeq<'a> =
211+
delay (fun () -> emitEnum (enumeratorFromGenerator (f ())))
212+
213+
let toGenerator (s:AsyncSeq<'a>) : AsyncGenerator<'a> =
214+
generatorFromEnumerator (s.GetEnumerator())
215+
216+
let append (s1:AsyncSeq<'a>) (s2:AsyncSeq<'a>) : AsyncSeq<'a> =
217+
fromGeneratorDelay (fun () -> bindG (toGenerator s1) (fun () -> toGenerator s2))
218+
219+
220+
90221
/// Module with helper functions for working with asynchronous sequences
91222
module AsyncSeq =
92223

@@ -111,64 +242,68 @@ module AsyncSeq =
111242
return (if res then Some v else None) }
112243
member x.Dispose() = () } }
113244

114-
[<RequireQualifiedAccess>]
115-
type AppendState<'T> =
116-
| NotStarted1 of AsyncSeq<'T> * AsyncSeq<'T>
117-
| HaveEnumerator1 of IAsyncEnumerator<'T> * AsyncSeq<'T>
118-
| NotStarted2 of AsyncSeq<'T>
119-
| HaveEnumerator2 of IAsyncEnumerator<'T>
120-
| Finished
121-
122245
let append (inp1: AsyncSeq<'T>) (inp2: AsyncSeq<'T>) : AsyncSeq<'T> =
123-
{ new IAsyncEnumerable<'T> with
124-
member x.GetEnumerator() =
125-
let state = ref (AppendState.NotStarted1 (inp1, inp2) )
126-
{ new IAsyncEnumerator<'T> with
127-
member x.MoveNext() =
128-
async { match !state with
129-
| AppendState.NotStarted1 (inp1, inp2) ->
130-
return!
131-
(let enum1 = inp1.GetEnumerator()
132-
state := AppendState.HaveEnumerator1 (enum1, inp2)
133-
x.MoveNext())
134-
| AppendState.HaveEnumerator1 (enum1, inp2) ->
135-
let! res = enum1.MoveNext()
136-
match res with
137-
| None ->
138-
return!
139-
(state := AppendState.NotStarted2 inp2
140-
dispose enum1
141-
x.MoveNext())
142-
| Some _ ->
143-
return res
144-
| AppendState.NotStarted2 inp2 ->
145-
return!
146-
(let enum2 = inp2.GetEnumerator()
147-
state := AppendState.HaveEnumerator2 enum2
148-
x.MoveNext())
149-
| AppendState.HaveEnumerator2 enum2 ->
150-
let! res = enum2.MoveNext()
151-
return (match res with
152-
| None ->
153-
state := AppendState.Finished
154-
dispose enum2
155-
None
156-
| Some _ ->
157-
res)
158-
| _ ->
159-
return None }
160-
member x.Dispose() =
161-
match !state with
162-
| AppendState.HaveEnumerator1 (enum, _)
163-
| AppendState.HaveEnumerator2 enum ->
164-
state := AppendState.Finished
165-
dispose enum
166-
| _ -> () } }
167-
168-
169-
let delay (f: unit -> AsyncSeq<'T>) : AsyncSeq<'T> =
170-
{ new IAsyncEnumerable<'T> with
171-
member x.GetEnumerator() = f().GetEnumerator() }
246+
AsyncGenerator.append inp1 inp2
247+
248+
// [<RequireQualifiedAccess>]
249+
// type AppendState<'T> =
250+
// | NotStarted1 of AsyncSeq<'T> * AsyncSeq<'T>
251+
// | HaveEnumerator1 of IAsyncEnumerator<'T> * AsyncSeq<'T>
252+
// | NotStarted2 of AsyncSeq<'T>
253+
// | HaveEnumerator2 of IAsyncEnumerator<'T>
254+
// | Finished
255+
//
256+
// let append (inp1: AsyncSeq<'T>) (inp2: AsyncSeq<'T>) : AsyncSeq<'T> =
257+
// { new IAsyncEnumerable<'T> with
258+
// member x.GetEnumerator() =
259+
// let state = ref (AppendState.NotStarted1 (inp1, inp2) )
260+
// { new IAsyncEnumerator<'T> with
261+
// member x.MoveNext() =
262+
// async { match !state with
263+
// | AppendState.NotStarted1 (inp1, inp2) ->
264+
// return!
265+
// (let enum1 = inp1.GetEnumerator()
266+
// state := AppendState.HaveEnumerator1 (enum1, inp2)
267+
// x.MoveNext())
268+
// | AppendState.HaveEnumerator1 (enum1, inp2) ->
269+
// let! res = enum1.MoveNext()
270+
// match res with
271+
// | None ->
272+
// return!
273+
// (state := AppendState.NotStarted2 inp2
274+
// dispose enum1
275+
// x.MoveNext())
276+
// | Some _ ->
277+
// return res
278+
// | AppendState.NotStarted2 inp2 ->
279+
// return!
280+
// (let enum2 = inp2.GetEnumerator()
281+
// state := AppendState.HaveEnumerator2 enum2
282+
// x.MoveNext())
283+
// | AppendState.HaveEnumerator2 enum2 ->
284+
// let! res = enum2.MoveNext()
285+
// return (match res with
286+
// | None ->
287+
// state := AppendState.Finished
288+
// dispose enum2
289+
// None
290+
// | Some _ ->
291+
// res)
292+
// | _ ->
293+
// return None }
294+
// member x.Dispose() =
295+
// match !state with
296+
// | AppendState.HaveEnumerator1 (enum, _)
297+
// | AppendState.HaveEnumerator2 enum ->
298+
// state := AppendState.Finished
299+
// dispose enum
300+
// | _ -> () } }
301+
302+
303+
let inline delay (f: unit -> AsyncSeq<'T>) : AsyncSeq<'T> =
304+
AsyncGenerator.delay f
305+
// { new IAsyncEnumerable<'T> with
306+
// member x.GetEnumerator() = f().GetEnumerator() }
172307

173308

174309
[<RequireQualifiedAccess>]
@@ -220,10 +355,11 @@ module AsyncSeq =
220355
member x.YieldFrom(s:AsyncSeq<'T>) = s
221356
member x.Zero () = empty
222357
member x.Bind (inp:Async<'T>, body : 'T -> AsyncSeq<'U>) : AsyncSeq<'U> = bindAsync body inp
223-
member x.Combine (seq1:AsyncSeq<'T>,seq2:AsyncSeq<'T>) = append seq1 seq2
358+
member x.Combine (seq1:AsyncSeq<'T>, seq2:AsyncSeq<'T>) =
359+
AsyncGenerator.append seq1 seq2
224360
member x.While (guard, body:AsyncSeq<'T>) =
225361
// Use F#'s support for Landin's knot for a low-allocation fixed-point
226-
let rec fix = delay (fun () -> if guard() then append body fix else empty)
362+
let rec fix = delay (fun () -> if guard() then AsyncGenerator.append body fix else empty)
227363
fix
228364
member x.Delay (f:unit -> AsyncSeq<'T>) =
229365
delay f
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
#r @"../../bin/FSharp.Control.AsyncSeq.dll"
2+
#nowarn "40"
3+
#time "on"
4+
5+
open System
6+
open System.Diagnostics
7+
open FSharp.Control
8+
9+
let N = 1000000
10+
11+
let run (test:int -> Async<_>) =
12+
let sw = Stopwatch.StartNew()
13+
test N |> Async.RunSynchronously |> ignore
14+
sw.Stop()
15+
16+
(*
17+
18+
------------------------------------------------------------------------------------------------------------------------
19+
-- append generator
20+
N=5000
21+
unfoldIter
22+
Real: 00:00:03.587, CPU: 00:00:03.578, GC gen0: 346, gen1: 2, gen2: 0
23+
Real: 00:00:00.095, CPU: 00:00:00.093, GC gen0: 4, gen1: 2, gen2: 0
24+
------------------------------------------------------------------------------------------------------------------------
25+
N=1000000
26+
unfoldChooseIter
27+
Real: 00:00:10.818, CPU: 00:00:10.828, GC gen0: 1114, gen1: 3, gen2: 0
28+
------------------------------------------------------------------------------------------------------------------------
29+
N=1000000
30+
unfoldIter
31+
Real: 00:00:08.565, CPU: 00:00:08.562, GC gen0: 889, gen1: 2, gen2: 0
32+
------------------------------------------------------------------------------------------------------------------------
33+
34+
*)
35+
let unfoldIter (N:int) =
36+
let generator state = async {
37+
if state < N then
38+
return Some (state, state + 1)
39+
else
40+
return None }
41+
AsyncSeq.unfoldAsync generator 0
42+
|> AsyncSeq.iterAsync (ignore >> async.Return)
43+
44+
let unfoldChooseIter (N:int) =
45+
let generator state = async {
46+
if state < N then
47+
return Some (state, state + 1)
48+
else
49+
return None }
50+
AsyncSeq.unfoldAsync generator 0
51+
|> AsyncSeq.chooseAsync (Some >> async.Return)
52+
|> AsyncSeq.iterAsync (ignore >> async.Return)
53+
54+
run unfoldIter
55+
//run unfoldChooseIter

tests/FSharp.Control.AsyncSeq.Tests/FSharp.Control.AsyncSeq.Tests.fsproj

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,9 @@
5757
<Import Project="$(FSharpTargetsPath)" />
5858
<Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
5959
<ItemGroup>
60-
<Compile Include="AsyncSeqTests.fs" />
6160
<None Include="paket.references" />
61+
<Compile Include="AsyncSeqTests.fs" />
62+
<None Include="AsyncSeqPerf.fsx" />
6263
</ItemGroup>
6364
<ItemGroup>
6465
<Reference Include="mscorlib" />

0 commit comments

Comments
 (0)