Skip to content

Commit 7a7ce72

Browse files
committed
AsyncSeq.merge
1 parent 98406ad commit 7a7ce72

3 files changed

Lines changed: 61 additions & 3 deletions

File tree

src/FSharpx.Async/Async.fs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
namespace FSharpx.Control
66
open System
77
open System.Threading
8+
open System.Threading.Tasks
89

910
// ----------------------------------------------------------------------------
1011

@@ -92,4 +93,25 @@ module AsyncExtensions =
9293
static member bindChoices (f:'a -> Async<Choice<'b, 'e2>>) (a:Async<Choice<'a, 'e1>>) : Async<Choice<'b, Choice<'e1, 'e2>>> =
9394
a |> Async.bind (function
9495
| Choice1Of2 a' -> f a' |> Async.map (function Choice1Of2 b -> Choice1Of2 b | Choice2Of2 e2 -> Choice2Of2 (Choice2Of2 e2))
95-
| Choice2Of2 e1 -> Choice2Of2 (Choice1Of2 e1) |> async.Return)
96+
| Choice2Of2 e1 -> Choice2Of2 (Choice1Of2 e1) |> async.Return)
97+
98+
/// Creates a computation which produces a tuple consiting of the value produces by the first
99+
/// argument computation to complete and a handle to the other computation. The second computation
100+
/// to complete is memoized.
101+
static member internal chooseBoth (a:Async<'a>) (b:Async<'a>) : Async<'a * Async<'a>> =
102+
Async.FromContinuations <| fun (ok,err,cnc) ->
103+
let state = ref 0
104+
let tcs = TaskCompletionSource<'a>()
105+
let inline ok a =
106+
if (Interlocked.CompareExchange(state, 1, 0) = 0) then
107+
ok (a, tcs.Task |> Async.AwaitTask)
108+
else
109+
tcs.SetResult a
110+
let inline err (ex:exn) =
111+
if (Interlocked.CompareExchange(state, 1, 0) = 0) then err ex
112+
else tcs.SetException ex
113+
let inline cnc ex =
114+
if (Interlocked.CompareExchange(state, 1, 0) = 0) then cnc ex
115+
else tcs.SetCanceled()
116+
Async.StartWithContinuations(a, ok, err, cnc)
117+
Async.StartWithContinuations(b, ok, err, cnc)

src/FSharpx.Async/AsyncSeq.fs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -605,7 +605,23 @@ module AsyncSeq =
605605
return! loop tl
606606
}
607607
return! loop s
608-
}
608+
}
609+
610+
/// Merges two async sequences into an async sequence non-deterministically.
611+
let rec merge (a:AsyncSeq<'a>) (b:AsyncSeq<'a>) : AsyncSeq<'a> = async {
612+
let! one,other = Async.chooseBoth a b
613+
match one with
614+
| Nil -> return! other
615+
| Cons(hd,tl) ->
616+
return Cons(hd, merge tl other) }
617+
618+
/// Merges all specified async sequences into an async sequence non-deterministically.
619+
let rec mergeAll (ss:AsyncSeq<'a> list) : AsyncSeq<'a> =
620+
match ss with
621+
| [] -> empty
622+
| [s] -> s
623+
| [a;b] -> merge a b
624+
| hd::tl -> merge hd (mergeAll tl)
609625

610626

611627

tests/FSharpx.Async.Tests/AsyncSeqTests.fs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@ let ``AsyncSeq.interleave``() =
6060
let s1 = AsyncSeq.ofSeq ["a";"b";"c"]
6161
let s2 = AsyncSeq.ofSeq [1;2;3]
6262
let merged = AsyncSeq.interleave s1 s2 |> AsyncSeq.toList |> Async.RunSynchronously
63-
printfn "%A" merged
6463
Assert.True([Choice1Of2 "a" ; Choice2Of2 1 ; Choice1Of2 "b" ; Choice2Of2 2 ; Choice1Of2 "c" ; Choice2Of2 3] = merged)
6564

6665

@@ -180,6 +179,27 @@ let ``AsyncSeq.filterAsync``() =
180179
Assert.True(EQ expected actual)
181180

182181

182+
[<Test>]
183+
let ``AsyncSeq.merge``() =
184+
let ls1 = [1;2;3;4;5]
185+
let ls2 = [6;7;8;9;10]
186+
let actual = AsyncSeq.merge (AsyncSeq.ofSeq ls1) (AsyncSeq.ofSeq ls2) |> AsyncSeq.toList |> Async.RunSynchronously |> Set.ofList
187+
let expected = ls1 @ ls2 |> Set.ofList
188+
Assert.True((expected = actual))
189+
190+
191+
[<Test>]
192+
let ``AsyncSeq.merge should be fair``() =
193+
let s1 = asyncSeq {
194+
do! Async.Sleep 10
195+
yield 1
196+
}
197+
let s2 = asyncSeq {
198+
yield 2
199+
}
200+
let actual = AsyncSeq.merge s1 s2
201+
let expected = [2;1] |> AsyncSeq.ofSeq
202+
Assert.True(EQ expected actual)
183203

184204

185205

0 commit comments

Comments
 (0)