Skip to content

Commit cc05229

Browse files
committed
unfold enumerable
1 parent 89df88c commit cc05229

2 files changed

Lines changed: 53 additions & 96 deletions

File tree

src/FSharp.Control.AsyncSeq/AsyncSeq.fs

Lines changed: 36 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ module internal Utils =
4141
| Choice1Of2 a -> f a |> Choice1Of2
4242
| Choice2Of2 e -> Choice2Of2 e
4343

44+
module Disposable =
45+
46+
let empty : IDisposable =
47+
{ new IDisposable with member __.Dispose () = () }
48+
4449
// ----------------------------------------------------------------------------
4550

4651
module internal Observable =
@@ -184,21 +189,6 @@ module AsyncGenerator =
184189
| :? AsyncGeneratorEnumerator<'a> as e -> e.Generator
185190
| _ -> (new AsyncEnumeratorGenerator<_>(e) :> _)
186191

187-
// type private DelayEnumerable<'a> (f:unit -> AsyncSeq<'a>) =
188-
// member x.Delay = f
189-
// interface IAsyncEnumerable<'a> with
190-
// member __.GetEnumerator() =
191-
// let rec unwrap (f:unit -> AsyncSeq<_>) =
192-
// let s = f ()
193-
// match s with
194-
// | :? DelayEnumerable<_> as s -> unwrap s.Delay
195-
// | _ -> s
196-
// let s = unwrap f
197-
// s.GetEnumerator()
198-
199-
// let delay (f:unit -> AsyncSeq<'T>) : AsyncSeq<'T> =
200-
// new DelayEnumerable<'T>(f) :> _
201-
202192
let delay (f:unit -> AsyncSeq<'T>) : AsyncSeq<'T> =
203193
{ new IAsyncEnumerable<'T> with
204194
member x.GetEnumerator() = f().GetEnumerator() }
@@ -216,6 +206,24 @@ module AsyncGenerator =
216206
let append (s1:AsyncSeq<'a>) (s2:AsyncSeq<'a>) : AsyncSeq<'a> =
217207
fromGeneratorDelay (fun () -> bindG (toGenerator s1) (fun () -> toGenerator s2))
218208

209+
[<AutoOpen>]
210+
module AsyncSeqOp =
211+
212+
type UnfoldAsyncEnumerator<'S, 'T> (f:'S -> Async<('T * 'S) option>, init:'S) =
213+
interface IAsyncEnumerable<'T> with
214+
member __.GetEnumerator () =
215+
let s = ref init
216+
{ new IAsyncEnumerator<'T> with
217+
member __.MoveNext () : Async<'T option> = async {
218+
let! next = f !s
219+
match next with
220+
| None ->
221+
return None
222+
| Some (a,s') ->
223+
s := s'
224+
return Some a }
225+
member __.Dispose () = () }
226+
219227

220228

221229
/// Module with helper functions for working with asynchronous sequences
@@ -245,66 +253,8 @@ module AsyncSeq =
245253
let append (inp1: AsyncSeq<'T>) (inp2: AsyncSeq<'T>) : AsyncSeq<'T> =
246254
AsyncGenerator.append inp1 inp2
247255

248-
// [<RequireQualifiedAccess>]
249-
// type AppendState<'T> =
250-
// | NotStarted1 of AsyncSeq<'T> * AsyncSeq<'T>
251-
// | HaveEnumerator1 of IAsyncEnumerator<'T> * AsyncSeq<'T>
252-
// | NotStarted2 of AsyncSeq<'T>
253-
// | HaveEnumerator2 of IAsyncEnumerator<'T>
254-
// | Finished
255-
//
256-
// let append (inp1: AsyncSeq<'T>) (inp2: AsyncSeq<'T>) : AsyncSeq<'T> =
257-
// { new IAsyncEnumerable<'T> with
258-
// member x.GetEnumerator() =
259-
// let state = ref (AppendState.NotStarted1 (inp1, inp2) )
260-
// { new IAsyncEnumerator<'T> with
261-
// member x.MoveNext() =
262-
// async { match !state with
263-
// | AppendState.NotStarted1 (inp1, inp2) ->
264-
// return!
265-
// (let enum1 = inp1.GetEnumerator()
266-
// state := AppendState.HaveEnumerator1 (enum1, inp2)
267-
// x.MoveNext())
268-
// | AppendState.HaveEnumerator1 (enum1, inp2) ->
269-
// let! res = enum1.MoveNext()
270-
// match res with
271-
// | None ->
272-
// return!
273-
// (state := AppendState.NotStarted2 inp2
274-
// dispose enum1
275-
// x.MoveNext())
276-
// | Some _ ->
277-
// return res
278-
// | AppendState.NotStarted2 inp2 ->
279-
// return!
280-
// (let enum2 = inp2.GetEnumerator()
281-
// state := AppendState.HaveEnumerator2 enum2
282-
// x.MoveNext())
283-
// | AppendState.HaveEnumerator2 enum2 ->
284-
// let! res = enum2.MoveNext()
285-
// return (match res with
286-
// | None ->
287-
// state := AppendState.Finished
288-
// dispose enum2
289-
// None
290-
// | Some _ ->
291-
// res)
292-
// | _ ->
293-
// return None }
294-
// member x.Dispose() =
295-
// match !state with
296-
// | AppendState.HaveEnumerator1 (enum, _)
297-
// | AppendState.HaveEnumerator2 enum ->
298-
// state := AppendState.Finished
299-
// dispose enum
300-
// | _ -> () } }
301-
302-
303256
let inline delay (f: unit -> AsyncSeq<'T>) : AsyncSeq<'T> =
304257
AsyncGenerator.delay f
305-
// { new IAsyncEnumerable<'T> with
306-
// member x.GetEnumerator() = f().GetEnumerator() }
307-
308258

309259
[<RequireQualifiedAccess>]
310260
type BindState<'T,'U> =
@@ -660,34 +610,25 @@ module AsyncSeq =
660610
member internal x.For (seq:AsyncSeq<'T>, action:'T -> Async<unit>) =
661611
seq |> iterAsync action
662612

663-
let rec unfoldAsync (f:'State -> Async<('T * 'State) option>) (s:'State) : AsyncSeq<'T> =
664-
asyncSeq {
665-
let s = ref s
666-
let fin = ref false
667-
while not !fin do
668-
let! next = f !s
669-
match next with
670-
| None ->
671-
fin := true
672-
| Some (a,s') ->
673-
yield a
674-
s := s' }
613+
let unfoldAsync (f:'State -> Async<('T * 'State) option>) (s:'State) : AsyncSeq<'T> =
614+
new UnfoldAsyncEnumerator<_, _>(f, s) :> _
675615

676616
let replicateInfinite (v:'T) : AsyncSeq<'T> =
677-
asyncSeq {
678-
while true do
679-
yield v }
617+
let gen _ = async {
618+
return Some (v,0) }
619+
unfoldAsync gen 0
680620

681621
let replicateInfiniteAsync (v:Async<'T>) : AsyncSeq<'T> =
682-
asyncSeq {
683-
while true do
684-
let! v = v
685-
yield v }
622+
let gen _ = async {
623+
let! v = v
624+
return Some (v,0) }
625+
unfoldAsync gen 0
686626

687627
let replicate (count:int) (v:'T) : AsyncSeq<'T> =
688-
asyncSeq {
689-
for i in 1 .. count do
690-
yield v }
628+
let gen i = async {
629+
if i = count then return None
630+
else return Some (v,i + 1) }
631+
unfoldAsync gen 0
691632

692633
let intervalMs (periodMs:int) = asyncSeq {
693634
yield DateTime.UtcNow

tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqPerf.fsx

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,17 @@ N=1000000
3030
unfoldIter
3131
Real: 00:00:08.565, CPU: 00:00:08.562, GC gen0: 889, gen1: 2, gen2: 0
3232
------------------------------------------------------------------------------------------------------------------------
33+
-- handcoded unfold
34+
N=1000000
35+
36+
unfoldIter
37+
Real: 00:00:08.514, CPU: 00:00:08.562, GC gen0: 890, gen1: 3, gen2: 1
38+
Real: 00:00:00.878, CPU: 00:00:00.875, GC gen0: 96, gen1: 2, gen2: 0
39+
40+
replicate
41+
Real: 00:00:01.530, CPU: 00:00:01.531, GC gen0: 156, gen1: 1, gen2: 0
42+
Real: 00:00:00.926, CPU: 00:00:00.937, GC gen0: 97, gen1: 2, gen2: 0
43+
------------------------------------------------------------------------------------------------------------------------
3344
3445
*)
3546
let unfoldIter (N:int) =
@@ -51,5 +62,10 @@ let unfoldChooseIter (N:int) =
5162
|> AsyncSeq.chooseAsync (Some >> async.Return)
5263
|> AsyncSeq.iterAsync (ignore >> async.Return)
5364

54-
run unfoldIter
65+
let replicate (N:int) =
66+
AsyncSeq.replicate N ()
67+
|> AsyncSeq.iterAsync (ignore >> async.Return)
68+
69+
//run unfoldIter
5570
//run unfoldChooseIter
71+
run replicate

0 commit comments

Comments
 (0)