Skip to content

Commit 24d9dc9

Browse files
committed
Merge pull request #24 from dsyme/merge-perf
Fix perf of AsyncSeq.mergeAll
2 parents 6cda345 + b6cded8 commit 24d9dc9

5 files changed

Lines changed: 278 additions & 104 deletions

File tree

RELEASE_NOTES.md

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,17 @@
1-
### 1.16.0 - 13.05.2015
1+
### 2.0.0 - 28.05.2015
22
* Simplify ofObservableBuffered and toBlockingSeq
33
* Move to IAsyncEnumerable model to support try/finally and try/with
44
* Rename replicate to replicateInfinite
55
* Rename toList to toListAsync
66
* Rename toArray to toArrayAsync
77
* Rename zipWithIndexAsync to mapiAsync
8-
* Add init, initInfinite, initAsync, initInfiniteAsync, replicateInfinite
8+
* Rename interleave to interleaveChoice
9+
* Add interleave
10+
* Add mergeChoice
11+
* Fix performance of mergeAll
12+
* Add init, initInfinite
13+
* Add initAsync, initInfiniteAsync, replicateInfinite
14+
* Add RequireQualifiedAccess to AsyncSeq
915

1016
### 1.15.0 - 30.03.2015
1117
* Add AsyncSeq.getIterator (unblocks use of AsyncSeq in FSharpx.Async)

src/FSharp.Control.AsyncSeq/AssemblyInfo.fs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ open System.Reflection
44
[<assembly: AssemblyTitleAttribute("FSharp.Control.AsyncSeq")>]
55
[<assembly: AssemblyProductAttribute("FSharp.Control.AsyncSeq")>]
66
[<assembly: AssemblyDescriptionAttribute("Asynchronous sequences for F#")>]
7-
[<assembly: AssemblyVersionAttribute("1.16.0")>]
8-
[<assembly: AssemblyFileVersionAttribute("1.16.0")>]
7+
[<assembly: AssemblyVersionAttribute("2.0.0")>]
8+
[<assembly: AssemblyFileVersionAttribute("2.0.0")>]
99
do ()
1010

1111
module internal AssemblyVersionInformation =
12-
let [<Literal>] Version = "1.16.0"
12+
let [<Literal>] Version = "2.0.0"

src/FSharp.Control.AsyncSeq/AsyncSeq.fs

Lines changed: 84 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -67,54 +67,15 @@ module internal Utils =
6767
{ new IDisposable with
6868
member x.Dispose() = ct.Cancel() }
6969

70-
/// Creates an async computations which runs the specified computations
71-
/// in parallel and returns their results.
72-
static member Parallel(a:Async<'T>, b:Async<'U>) : Async<'T * 'U> = async {
73-
let! a = a |> Async.StartChild
74-
let! b = b |> Async.StartChild
75-
let! a = a
76-
let! b = b
77-
return a,b }
78-
79-
80-
/// Creates an async computation which maps a function f over the
81-
/// value produced by the specified asynchronous computation.
8270
static member map f a = async.Bind(a, f >> async.Return)
8371

84-
/// Creates an async computation which binds the result of the specified
85-
/// async computation to the specified function. The computation produced
86-
/// by the specified function is returned.
87-
static member bind f a = async.Bind(a, f)
88-
89-
/// Creates a computation which produces a tuple consiting of the value produces by the first
90-
/// argument computation to complete and a handle to the other computation. The second computation
91-
/// to complete is memoized.
92-
static member internal chooseBoth (a:Async<'T>) (b:Async<'T>) : Async<'T * Async<'T>> =
93-
Async.FromContinuations <| fun (ok,err,cnc) ->
94-
let state = ref 0
95-
let tcs = TaskCompletionSource<'T>()
96-
let inline ok a =
97-
if (Interlocked.CompareExchange(state, 1, 0) = 0) then
98-
ok (a, tcs.Task |> Async.AwaitTask)
99-
else
100-
tcs.SetResult a
101-
let inline err (ex:exn) =
102-
if (Interlocked.CompareExchange(state, 1, 0) = 0) then err ex
103-
else tcs.SetException ex
104-
let inline cnc ex =
105-
if (Interlocked.CompareExchange(state, 1, 0) = 0) then cnc ex
106-
else tcs.SetCanceled()
107-
Async.StartWithContinuations(a, ok, err, cnc)
108-
Async.StartWithContinuations(b, ok, err, cnc)
109-
110-
static member internal chooseBoths (a:Async<'T>) (b:Async<'U>) : Async<Choice<'T * Async<'U>, 'U * Async<'T>>> =
111-
Async.chooseBoth (a |> Async.map Choice1Of2) (b |> Async.map Choice2Of2)
112-
|> Async.map (fun (first,second) ->
113-
match first with
114-
| Choice1Of2 a -> (a,(second |> Async.map (function Choice2Of2 b -> b | _ -> failwith "invalid state"))) |> Choice1Of2
115-
| Choice2Of2 b -> (b,(second |> Async.map (function Choice1Of2 a -> a | _ -> failwith "invalid state"))) |> Choice2Of2
116-
)
117-
72+
static member internal chooseTasks (a:Task<'T>) (b:Task<'U>) : Async<Choice<'T * Task<'U>, 'U * Task<'T>>> =
73+
async {
74+
let! ct = Async.CancellationToken
75+
let i = Task.WaitAny( [| (a :> Task);(b :> Task) |],ct)
76+
if i = 0 then return (Choice1Of2 (a.Result, b))
77+
elif i = 1 then return (Choice2Of2 (b.Result, a))
78+
else return! failwith (sprintf "unreachable, i = %d" i) }
11879

11980
/// Module with helper functions for working with asynchronous sequences
12081
module AsyncSeq =
@@ -750,16 +711,16 @@ module AsyncSeq =
750711
let ofObservable (source : System.IObservable<'T>) : AsyncSeq<'T> = failwith "no longer supported"
751712

752713
let toObservable (aseq:AsyncSeq<_>) =
753-
let start (obs:IObserver<_>) =
754-
async {
755-
try
756-
for v in aseq do obs.OnNext(v)
757-
obs.OnCompleted()
758-
with e ->
759-
obs.OnError(e) }
760-
|> Async.StartDisposable
761714
{ new IObservable<_> with
762-
member x.Subscribe(obs) = start obs }
715+
member x.Subscribe(obs) =
716+
async {
717+
try
718+
for v in aseq do
719+
obs.OnNext(v)
720+
obs.OnCompleted()
721+
with e ->
722+
obs.OnError(e) }
723+
|> Async.StartDisposable }
763724

764725
let toBlockingSeq (source : AsyncSeq<'T>) =
765726
seq {
@@ -792,10 +753,10 @@ module AsyncSeq =
792753
asyncSeq {
793754
use ie = source.GetEnumerator()
794755
let iRef = ref 0
795-
let lockTaken = ref false
796756
let fin = ref false
797757
while not fin.Value do
798758
let i = iRef.Value
759+
let lockTaken = ref false
799760
try
800761
System.Threading.Monitor.Enter(iRef, lockTaken);
801762
if i >= cache.Count then
@@ -842,7 +803,7 @@ module AsyncSeq =
842803
let zip (source1 : AsyncSeq<'T1>) (source2 : AsyncSeq<'T2>) : AsyncSeq<_> =
843804
zipWithAsync (fun a b -> async.Return (a,b)) source1 source2
844805

845-
let inline zipWith (z:'T1 -> 'T2 -> 'U) (a:AsyncSeq<'T1>) (b:AsyncSeq<'T2>) : AsyncSeq<'U> =
806+
let zipWith (z:'T1 -> 'T2 -> 'U) (a:AsyncSeq<'T1>) (b:AsyncSeq<'T2>) : AsyncSeq<'U> =
846807
zipWithAsync (fun a b -> z a b |> async.Return) a b
847808

848809
let mapiAsync (f:int -> 'T -> Async<'U>) (source:AsyncSeq<'T>) : AsyncSeq<'U> =
@@ -870,14 +831,20 @@ module AsyncSeq =
870831
else
871832
b := None }
872833

834+
let takeWhile p (source : AsyncSeq<'T>) =
835+
takeWhileAsync (p >> async.Return) source
836+
873837
let takeUntilSignal (signal:Async<unit>) (source:AsyncSeq<'T>) : AsyncSeq<'T> = asyncSeq {
874838
use ie = source.GetEnumerator()
875-
let! move = Async.chooseBoths signal (ie.MoveNext())
839+
let! signalT = Async.StartChildAsTask signal
840+
let! moveT = Async.StartChildAsTask (ie.MoveNext())
841+
let! move = Async.chooseTasks signalT moveT
876842
let b = ref move
877843
while (match b.Value with Choice2Of2 (Some _,_) -> true | _ -> false) do
878844
let v,sg = (match b.Value with Choice2Of2 (Some v,sg) -> v,sg | _ -> failwith "unreachable")
879845
yield v
880-
let! move = Async.chooseBoths sg (ie.MoveNext())
846+
let! moveT = Async.StartChildAsTask (ie.MoveNext())
847+
let! move = Async.chooseTasks sg moveT
881848
b := move }
882849

883850
let takeUntil signal source = takeUntilSignal signal source
@@ -901,17 +868,20 @@ module AsyncSeq =
901868

902869
let skipUntilSignal (signal:Async<unit>) (source:AsyncSeq<'T>) : AsyncSeq<'T> = asyncSeq {
903870
use ie = source.GetEnumerator()
904-
let! move = Async.chooseBoths signal (ie.MoveNext())
871+
let! signalT = Async.StartChildAsTask signal
872+
let! moveT = Async.StartChildAsTask (ie.MoveNext())
873+
let! move = Async.chooseTasks signalT moveT
905874
let b = ref move
906875
while (match b.Value with Choice2Of2 (Some _,_) -> true | _ -> false) do
907876
let v,sg = (match b.Value with Choice2Of2 (Some v,sg) -> v,sg | _ -> failwith "unreachable")
908-
let! move = Async.chooseBoths sg (ie.MoveNext())
877+
let! moveT = Async.StartChildAsTask (ie.MoveNext())
878+
let! move = Async.chooseTasks sg moveT
909879
b := move
910880
match b.Value with
911881
| Choice2Of2 (None,_) ->
912882
()
913883
| Choice1Of2 (_,rest) ->
914-
let! move = rest
884+
let! move = Async.AwaitTask rest
915885
let b2 = ref move
916886
// Yield the rest of the sequence
917887
while b2.Value.IsSome do
@@ -923,9 +893,6 @@ module AsyncSeq =
923893

924894
let skipUntil signal source = skipUntilSignal signal source
925895

926-
let takeWhile p (source : AsyncSeq<'T>) =
927-
takeWhileAsync (p >> async.Return) source
928-
929896
let skipWhile p (source : AsyncSeq<'T>) =
930897
skipWhileAsync (p >> async.Return) source
931898

@@ -983,7 +950,7 @@ module AsyncSeq =
983950
let! moven = ie.MoveNext()
984951
b := moven }
985952

986-
let interleave (source1: AsyncSeq<'T1>) (source2: AsyncSeq<'T2>) = asyncSeq {
953+
let interleaveChoice (source1: AsyncSeq<'T1>) (source2: AsyncSeq<'T2>) = asyncSeq {
987954
use ie1 = (source1 |> map Choice1Of2).GetEnumerator()
988955
use ie2 = (source2 |> map Choice2Of2).GetEnumerator()
989956
let! move = ie1.MoveNext()
@@ -998,6 +965,9 @@ module AsyncSeq =
998965
yield! emitEnumerator (if is1.Value then ie2 else ie1)
999966
}
1000967

968+
let interleave (source1:AsyncSeq<'T>) (source2:AsyncSeq<'T>) : AsyncSeq<'T> =
969+
interleaveChoice source1 source2 |> map (function Choice1Of2 x -> x | Choice2Of2 x -> x)
970+
1001971

1002972
let bufferByCount (bufferSize:int) (source:AsyncSeq<'T>) : AsyncSeq<'T[]> =
1003973
if (bufferSize < 1) then invalidArg "bufferSize" "must be positive"
@@ -1019,30 +989,34 @@ module AsyncSeq =
1019989
let mergeChoice (source1:AsyncSeq<'T1>) (source2:AsyncSeq<'T2>) : AsyncSeq<Choice<'T1,'T2>> = asyncSeq {
1020990
use ie1 = source1.GetEnumerator()
1021991
use ie2 = source2.GetEnumerator()
1022-
let! move = Async.chooseBoths (ie1.MoveNext()) (ie2.MoveNext())
992+
let! move1T = Async.StartChildAsTask (ie1.MoveNext())
993+
let! move2T = Async.StartChildAsTask (ie2.MoveNext())
994+
let! move = Async.chooseTasks move1T move2T
1023995
let b = ref move
1024-
while (match b.Value with Choice1Of2 (Some _,_) -> true | Choice2Of2 (Some _,_) -> true | _ -> false) do
996+
while (match b.Value with Choice1Of2 (Some _,_) | Choice2Of2 (Some _,_) -> true | _ -> false) do
1025997
match b.Value with
1026998
| Choice1Of2 (Some v1, rest2) ->
1027999
yield Choice1Of2 v1
1028-
let! move = Async.chooseBoths (ie1.MoveNext()) rest2
1000+
let! move1T = Async.StartChildAsTask (ie1.MoveNext())
1001+
let! move = Async.chooseTasks move1T rest2
10291002
b := move
10301003
| Choice2Of2 (Some v2, rest1) ->
10311004
yield Choice2Of2 v2
1032-
let! move = Async.chooseBoths rest1 (ie2.MoveNext())
1005+
let! move2T = Async.StartChildAsTask (ie2.MoveNext())
1006+
let! move = Async.chooseTasks rest1 move2T
10331007
b := move
10341008
| _ -> failwith "unreachable"
10351009
match b.Value with
10361010
| Choice1Of2 (None, rest2) ->
1037-
let! move2 = rest2
1011+
let! move2 = Async.AwaitTask rest2
10381012
let b2 = ref move2
10391013
while b2.Value.IsSome do
10401014
let v2 = b2.Value.Value
10411015
yield Choice2Of2 v2
10421016
let! move2n = ie2.MoveNext()
10431017
b2 := move2n
10441018
| Choice2Of2 (None, rest1) ->
1045-
let! move1 = rest1
1019+
let! move1 = Async.AwaitTask rest1
10461020
let b1 = ref move1
10471021
while b1.Value.IsSome do
10481022
let v1 = b1.Value.Value
@@ -1054,14 +1028,44 @@ module AsyncSeq =
10541028

10551029
let merge (source1:AsyncSeq<'T>) (source2:AsyncSeq<'T>) : AsyncSeq<'T> =
10561030
mergeChoice source1 source2 |> map (function Choice1Of2 x -> x | Choice2Of2 x -> x)
1057-
1058-
let rec mergeAll (ss:AsyncSeq<'T> list) : AsyncSeq<'T> =
1059-
match ss with
1060-
| [] -> empty
1061-
| [s] -> s
1062-
| [a;b] -> merge a b
1063-
| hd::tl -> merge hd (mergeAll tl)
10641031

1032+
type Disposables<'T when 'T :> IDisposable> (ss: 'T[]) =
1033+
interface System.IDisposable with
1034+
member x.Dispose() =
1035+
let err = ref None
1036+
for i in ss.Length - 1 .. -1 .. 0 do
1037+
try dispose ss.[i] with e -> err := Some e
1038+
match !err with
1039+
| Some e -> raise e
1040+
| None -> ()
1041+
1042+
let mergeAll (ss:AsyncSeq<'T> list) : AsyncSeq<'T> =
1043+
asyncSeq {
1044+
let n = ss.Length
1045+
if n > 0 then
1046+
let ies = [| for source in ss -> source.GetEnumerator() |]
1047+
use _ies = new Disposables<_>(ies)
1048+
let tasks = Array.zeroCreate n
1049+
for i in 0 .. ss.Length - 1 do
1050+
let! task = Async.StartChildAsTask (ies.[i].MoveNext())
1051+
do tasks.[i] <- (task :> Task)
1052+
let fin = ref n
1053+
while fin.Value > 0 do
1054+
let! ct = Async.CancellationToken
1055+
let i = Task.WaitAny(tasks, ct)
1056+
let v = (tasks.[i] :?> Task<'T option>).Result
1057+
match v with
1058+
| Some res ->
1059+
yield res
1060+
let! task = Async.StartChildAsTask (ies.[i].MoveNext())
1061+
do tasks.[i] <- (task :> Task)
1062+
| None ->
1063+
let t = System.Threading.Tasks.TaskCompletionSource()
1064+
tasks.[i] <- (t.Task :> Task) // result never gets set
1065+
fin := fin.Value - 1
1066+
}
1067+
1068+
10651069
let distinctUntilChangedWithAsync (f:'T -> 'T -> Async<bool>) (source:AsyncSeq<'T>) : AsyncSeq<'T> = asyncSeq {
10661070
use ie = source.GetEnumerator()
10671071
let! move = ie.MoveNext()
@@ -1144,6 +1148,6 @@ module Seq =
11441148
let ofAsyncSeq (source : AsyncSeq<'T>) =
11451149
AsyncSeq.toBlockingSeq source
11461150

1147-
11481151
[<assembly:System.Runtime.CompilerServices.InternalsVisibleTo("FSharp.Control.AsyncSeq.Tests")>]
11491152
do ()
1153+

src/FSharp.Control.AsyncSeq/AsyncSeq.fsi

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ type IAsyncEnumerable<'T> =
1717
/// started to give an enumerator for pulling results asynchronously
1818
type AsyncSeq<'T> = IAsyncEnumerable<'T>
1919

20+
[<RequireQualifiedAccess>]
2021
module AsyncSeq =
2122
/// Creates an empty asynchronou sequence that immediately ends
2223
[<GeneralizableValueAttribute>]
@@ -242,7 +243,7 @@ module AsyncSeq =
242243
/// Combines two asynchronous sequences using the specified function.
243244
/// The values from sequences are retrieved in parallel.
244245
/// The resulting sequence stops when either of the argument sequences stop.
245-
val inline zipWith : mapping:('T1 -> 'T2 -> 'U) -> source1:AsyncSeq<'T1> -> source2:AsyncSeq<'T2> -> AsyncSeq<'U>
246+
val zipWith : mapping:('T1 -> 'T2 -> 'U) -> source1:AsyncSeq<'T1> -> source2:AsyncSeq<'T2> -> AsyncSeq<'U>
246247

247248
/// Builds a new asynchronous sequence whose elements are generated by
248249
/// applying the specified function to all elements of the input sequence.
@@ -321,9 +322,13 @@ module AsyncSeq =
321322
/// Flattens an AsyncSeq of sequences.
322323
val concatSeq : source:AsyncSeq<#seq<'T>> -> AsyncSeq<'T>
323324

325+
/// Interleaves two async sequences of the same type into a resulting sequence. The provided
326+
/// sequences are consumed in lock-step.
327+
val interleave : source1:AsyncSeq<'T> -> source2:AsyncSeq<'T> -> AsyncSeq<'T>
328+
324329
/// Interleaves two async sequences into a resulting sequence. The provided
325330
/// sequences are consumed in lock-step.
326-
val interleave : source1:AsyncSeq<'T1> -> source2:AsyncSeq<'T2> -> AsyncSeq<Choice<'T1,'T2>>
331+
val interleaveChoice : source1:AsyncSeq<'T1> -> source2:AsyncSeq<'T2> -> AsyncSeq<Choice<'T1,'T2>>
327332

328333
/// Buffer items from the async sequence into buffers of a specified size.
329334
/// The last buffer returned may be less than the specified buffer size.

0 commit comments

Comments
 (0)