Skip to content

Commit 11deb73

Browse files
committed
fix perf of AsyncSeq.ofSeq
1 parent e314f34 commit 11deb73

2 files changed

Lines changed: 106 additions & 33 deletions

File tree

src/FSharp.Control.AsyncSeq/AsyncSeq.fs

Lines changed: 63 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ open System.IO
99
open System.Threading
1010
open System.Threading.Tasks
1111

12+
#nowarn "40"
13+
1214
// ----------------------------------------------------------------------------
1315

1416
type IAsyncEnumerator<'T> =
@@ -185,16 +187,21 @@ module AsyncSeq =
185187
| _ -> () } }
186188

187189

188-
let make (f: unit -> Async<AsyncSeq<'T>>) : AsyncSeq<'T> =
190+
let delay (f: unit -> AsyncSeq<'T>) : AsyncSeq<'T> =
189191
{ new IAsyncEnumerable<'T> with
192+
member x.GetEnumerator() = f().GetEnumerator() }
193+
194+
let bindAsync (f: 'T -> AsyncSeq<'U>) (inp : Async<'T>) : AsyncSeq<'U> =
195+
{ new IAsyncEnumerable<'U> with
190196
member x.GetEnumerator() =
191197
let state = ref -1
192-
let enum = ref Unchecked.defaultof<IAsyncEnumerator<'T>>
193-
{ new IAsyncEnumerator<'T> with
198+
let enum = ref Unchecked.defaultof<IAsyncEnumerator<'U>>
199+
{ new IAsyncEnumerator<'U> with
194200
member x.MoveNext() =
195201
async { match !state with
196202
| -1 ->
197-
let! s = f()
203+
let! v = inp
204+
let s = f v
198205
let e = s.GetEnumerator()
199206
enum := e
200207
state := 0
@@ -219,11 +226,6 @@ module AsyncSeq =
219226
dispose e
220227
| _ -> () } }
221228

222-
let delay (f: unit -> AsyncSeq<'T>) : AsyncSeq<'T> =
223-
make (fun () -> async { return f() })
224-
225-
let bindAsync (f: 'T -> AsyncSeq<'U>) (inp : Async<'T>) : AsyncSeq<'U> =
226-
make (fun () -> async { let! v = inp in return f v })
227229

228230

229231
type AsyncSeqBuilder() =
@@ -237,12 +239,11 @@ module AsyncSeq =
237239
member x.Return _ = empty
238240
member x.YieldFrom(s:AsyncSeq<'T>) = s
239241
member x.Zero () = empty
240-
member x.Bind (inp:Async<'T>, body : 'T -> AsyncSeq<'U>) : AsyncSeq<'U> =
241-
bindAsync body inp
242-
member x.Combine (seq1:AsyncSeq<'T>,seq2:AsyncSeq<'T>) =
243-
append seq1 seq2
242+
member x.Bind (inp:Async<'T>, body : 'T -> AsyncSeq<'U>) : AsyncSeq<'U> = bindAsync body inp
243+
member x.Combine (seq1:AsyncSeq<'T>,seq2:AsyncSeq<'T>) = append seq1 seq2
244244
member x.While (guard, body:AsyncSeq<'T>) =
245-
if guard() then x.Combine(body,x.Delay(fun () -> x.While (guard, body))) else x.Zero()
245+
let rec fix = delay (fun () -> if guard() then append body fix else empty)
246+
fix
246247
member x.Delay (f:unit -> AsyncSeq<'T>) =
247248
delay f
248249

@@ -417,16 +418,53 @@ module AsyncSeq =
417418
x.Dispose()
418419
| _ -> () } }
419420

420-
let iteriAsync f (source: AsyncSeq<_>) = async {
421-
use ie = source.GetEnumerator()
422-
let count = ref 0
423-
let! move = ie.MoveNext()
424-
let b = ref move
425-
while b.Value.IsSome do
426-
do! f !count b.Value.Value
427-
incr count
428-
let! moven = ie.MoveNext()
429-
b := moven }
421+
let ofSeq (inp: seq<'T>) : AsyncSeq<'T> =
422+
{ new IAsyncEnumerable<'T> with
423+
member x.GetEnumerator() =
424+
let state = ref -1
425+
let enum = ref Unchecked.defaultof<System.Collections.Generic.IEnumerator<'T>>
426+
{ new IAsyncEnumerator<'T> with
427+
member x.MoveNext() =
428+
async { match !state with
429+
| -1 ->
430+
enum := inp.GetEnumerator()
431+
state := 0
432+
return! x.MoveNext()
433+
| 0 ->
434+
let e1 = enum.Value
435+
if e1.MoveNext() then
436+
return Some enum.Value.Current
437+
else
438+
enum := Unchecked.defaultof<_>
439+
dispose e1
440+
state := 1
441+
return None
442+
| 1 ->
443+
return None
444+
| _ ->
445+
return! invalidOp "enumerator already finished" }
446+
member x.Dispose() =
447+
match !state with
448+
| 0 ->
449+
let e = enum.Value
450+
state := 1
451+
enum := Unchecked.defaultof<_>
452+
dispose e
453+
| _ -> () } }
454+
455+
let iteriAsync f (source : AsyncSeq<_>) =
456+
async {
457+
use ie = source.GetEnumerator()
458+
let count = ref 0
459+
let! move = ie.MoveNext()
460+
let b = ref move
461+
while b.Value.IsSome do
462+
do! f !count b.Value.Value
463+
let! moven = ie.MoveNext()
464+
do incr count
465+
b := moven
466+
}
467+
430468

431469
let iterAsync (f: 'T -> Async<unit>) (inp: AsyncSeq<'T>) = iteriAsync (fun i x -> f x) inp
432470

@@ -447,11 +485,7 @@ module AsyncSeq =
447485
if box resource <> null then dispose resource)
448486

449487
member x.For(seq:seq<'T>, action:'T -> AsyncSeq<'TResult>) =
450-
x.Delay(fun () ->
451-
let enum = seq.GetEnumerator()
452-
x.TryFinally(x.While((fun () -> enum.MoveNext()), x.Delay(fun () ->
453-
action enum.Current)), (fun () ->
454-
if enum <> null then enum.Dispose() )))
488+
collect action (ofSeq seq)
455489

456490
member x.For (seq:AsyncSeq<'T>, action:'T -> AsyncSeq<'TResult>) =
457491
collect action seq
@@ -608,9 +642,6 @@ module AsyncSeq =
608642
// --------------------------------------------------------------------------
609643
// Converting from/to synchronous sequences or IObservables
610644

611-
let ofSeq (source : seq<'T>) = asyncSeq {
612-
for el in source do
613-
yield el }
614645

615646
let ofObservableBuffered (source : System.IObservable<_>) =
616647
asyncSeq {

tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
1-
module AsyncSeqTests
1+
#if INTERACTIVE
2+
#r @"../../bin/FSharp.Control.AsyncSeq.dll"
3+
#r @"../../packages/NUnit/lib/nunit.framework.dll"
4+
#time "on"
5+
#else
6+
module AsyncSeqTests
7+
#endif
28

39
open NUnit.Framework
410
open FSharp.Control
@@ -699,3 +705,39 @@ let ``asyncSeq.For should delay``() =
699705
interface System.Collections.IEnumerable with
700706
member x.GetEnumerator() = failwith "fail" }
701707
Assert.DoesNotThrow(fun _ -> asyncSeq.For(s, (fun v -> AsyncSeq.empty)) |> ignore)
708+
709+
710+
711+
let empty = async { return () }
712+
let perfTest1() =
713+
Seq.init 6000 id
714+
|> AsyncSeq.ofSeq
715+
|> AsyncSeq.iterAsync (fun _ -> empty )
716+
|> Async.RunSynchronously
717+
718+
let perfTest2 n =
719+
Seq.init n id
720+
|> AsyncSeq.ofSeq
721+
|> AsyncSeq.toArray
722+
723+
//perfTest2 1000
724+
//perfTest2 2000
725+
//perfTest2 3000
726+
//perfTest2 4000
727+
//perfTest2 1000000
728+
729+
// 1000 - 0.227 - 0.038
730+
// 1000 - 0.905
731+
// 3000 - 2.154
732+
// 4000 - 3.757
733+
// 5000 - 6.197
734+
735+
//perfTest1()
736+
//1000 - 0.244
737+
//2000 - 0.922
738+
//3000 - 2.091
739+
//4000 - 3.811
740+
//5000 - 6.311
741+
//6000 - 10.071
742+
743+

0 commit comments

Comments
 (0)