Skip to content

Commit 97be330

Browse files
committed
combineLatest
1 parent b740b4a commit 97be330

3 files changed

Lines changed: 72 additions & 10 deletions

File tree

src/FSharp.Control.AsyncSeq/AsyncSeq.fs

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -887,7 +887,7 @@ module AsyncSeq =
887887

888888
let zapp (fs:AsyncSeq<'T -> 'U>) (s:AsyncSeq<'T>) : AsyncSeq<'U> =
889889
zipWith (|>) s fs
890-
890+
891891
let takeWhileAsync p (source : AsyncSeq<'T>) : AsyncSeq<_> = asyncSeq {
892892
use ie = source.GetEnumerator()
893893
let! move = ie.MoveNext()
@@ -1095,9 +1095,7 @@ module AsyncSeq =
10951095
yield! loop None timeoutMs
10961096
}
10971097

1098-
let mergeChoice (source1:AsyncSeq<'T1>) (source2:AsyncSeq<'T2>) : AsyncSeq<Choice<'T1,'T2>> = asyncSeq {
1099-
use ie1 = source1.GetEnumerator()
1100-
use ie2 = source2.GetEnumerator()
1098+
let private mergeChoiceEnum (ie1:IAsyncEnumerator<'T1>) (ie2:IAsyncEnumerator<'T2>) : AsyncSeq<Choice<'T1,'T2>> = asyncSeq {
11011099
let! move1T = Async.StartChildAsTask (ie1.MoveNext())
11021100
let! move2T = Async.StartChildAsTask (ie2.MoveNext())
11031101
let! move = Async.chooseTasks move1T move2T
@@ -1134,6 +1132,10 @@ module AsyncSeq =
11341132
b1 := move1n
11351133
| _ -> failwith "unreachable" }
11361134

1135+
let mergeChoice (source1:AsyncSeq<'T1>) (source2:AsyncSeq<'T2>) : AsyncSeq<Choice<'T1,'T2>> = asyncSeq {
1136+
use ie1 = source1.GetEnumerator()
1137+
use ie2 = source2.GetEnumerator()
1138+
yield! mergeChoiceEnum ie1 ie2 }
11371139

11381140
let merge (source1:AsyncSeq<'T>) (source2:AsyncSeq<'T>) : AsyncSeq<'T> =
11391141
mergeChoice source1 source2 |> map (function Choice1Of2 x -> x | Choice2Of2 x -> x)
@@ -1174,6 +1176,37 @@ module AsyncSeq =
11741176
fin := fin.Value - 1
11751177
}
11761178

1179+
let combineLatestAsync (f:'a -> 'b -> Async<'c>) (source1:AsyncSeq<'a>) (source2:AsyncSeq<'b>) : AsyncSeq<'c> =
1180+
asyncSeq {
1181+
use en1 = source1.GetEnumerator()
1182+
use en2 = source2.GetEnumerator()
1183+
let! a = Async.StartChild (en1.MoveNext())
1184+
let! b = Async.StartChild (en2.MoveNext())
1185+
let! a = a
1186+
let! b = b
1187+
match a,b with
1188+
| Some a, Some b ->
1189+
let! c = f a b
1190+
yield c
1191+
let merged = mergeChoiceEnum en1 en2
1192+
use mergedEnum = merged.GetEnumerator()
1193+
let rec loop (prevA:'a, prevB:'b) = asyncSeq {
1194+
let! next = mergedEnum.MoveNext ()
1195+
match next with
1196+
| None -> ()
1197+
| Some (Choice1Of2 nextA) ->
1198+
let! c = f nextA prevB
1199+
yield c
1200+
yield! loop (nextA,prevB)
1201+
| Some (Choice2Of2 nextB) ->
1202+
let! c = f prevA nextB
1203+
yield c
1204+
yield! loop (prevA,nextB) }
1205+
yield! loop (a,b)
1206+
| _ -> () }
1207+
1208+
let combineLatest (f:'a -> 'b -> 'c) (source1:AsyncSeq<'a>) (source2:AsyncSeq<'b>) : AsyncSeq<'c> =
1209+
combineLatestAsync (fun a b -> f a b |> async.Return) source1 source2
11771210

11781211
let distinctUntilChangedWithAsync (f:'T -> 'T -> Async<bool>) (source:AsyncSeq<'T>) : AsyncSeq<'T> = asyncSeq {
11791212
use ie = source.GetEnumerator()
@@ -1242,7 +1275,6 @@ module AsyncSeq =
12421275
return Choice1Of2 (asyncSeq { for v in res do yield v })
12431276
}
12441277

1245-
12461278
module AsyncSeqSrcImpl =
12471279

12481280
let private createNode () =

src/FSharp.Control.AsyncSeq/AsyncSeq.fsi

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,16 @@ module AsyncSeq =
294294
/// Feeds an async sequence of values into an async sequence of functions.
295295
val zapp : functions:AsyncSeq<('T -> 'U)> -> source:AsyncSeq<'T> -> AsyncSeq<'U>
296296

297+
/// Merges two async sequences using the specified combine function. The resulting async sequence produces an element when either
298+
/// input sequence produces an element, passing the new element from the emitting sequence and the previously emitted element from the other sequence.
299+
/// If either of the input sequences is empty, the resulting sequence is empty.
300+
val combineLatestAsync : combine:('T -> 'U -> Async<'V>) -> source1:AsyncSeq<'T> -> source2:AsyncSeq<'U> -> AsyncSeq<'V>
301+
302+
/// Merges two async sequences using the specified combine function. The resulting async sequence produces an element when either
303+
/// input sequence produces an element, passing the new element from the emitting sequence and the previously emitted element from the other sequence.
304+
/// If either of the input sequences is empty, the resulting sequence is empty.
305+
val combineLatest : combine:('T -> 'U -> 'V) -> source1:AsyncSeq<'T> -> source2:AsyncSeq<'U> -> AsyncSeq<'V>
306+
297307
/// Traverses an async sequence an applies to specified function such that if None is returned the traversal short-circuits
298308
/// and None is returned as the result. Otherwise, the entire sequence is traversed and the result returned as Some.
299309
val traverseOptionAsync : mapping:('T -> Async<'U option>) -> source:AsyncSeq<'T> -> Async<AsyncSeq<'U> option>

tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,18 +35,23 @@ let EQ (a:AsyncSeq<'a>) (b:AsyncSeq<'a>) =
3535

3636
type Assert with
3737
/// Determines equality of two async sequences by convering them to lists, ignoring side-effects.
38+
static member AreEqual (expected:AsyncSeq<'a>, actual:AsyncSeq<'a>, message:string) =
39+
Assert.AreEqual (expected, actual, 2000, exnEq=(fun _ _ -> true), message=message)
40+
/// Determines equality of two async sequences by convering them to lists, ignoring side-effects.
3841
static member AreEqual (expected:AsyncSeq<'a>, actual:AsyncSeq<'a>) =
39-
Assert.AreEqual (expected, actual, 1000, exnEq=(fun _ _ -> true))
42+
Assert.AreEqual (expected, actual, 2000, exnEq=(fun _ _ -> true), message=null)
4043
/// Determines equality of two async sequences by convering them to lists, ignoring side-effects.
4144
static member AreEqual (expected:AsyncSeq<'a>, actual:AsyncSeq<'a>, timeout) =
42-
Assert.AreEqual (expected, actual, timeout=timeout, exnEq=(fun _ _ -> true))
45+
Assert.AreEqual (expected, actual, timeout=timeout, exnEq=(fun _ _ -> true), message=null)
4346
/// Determines equality of two async sequences by convering them to lists, ignoring side-effects.
44-
static member AreEqual (expected:AsyncSeq<'a>, actual:AsyncSeq<'a>, timeout, exnEq:exn -> exn -> bool) =
47+
static member AreEqual (expected:AsyncSeq<'a>, actual:AsyncSeq<'a>, timeout, exnEq:exn -> exn -> bool, message:string) =
4548
let exp = expected |> AsyncSeq.toListAsync |> Async.Catch
4649
let exp = Async.RunSynchronously (exp, timeout)
4750
let act = actual |> AsyncSeq.toListAsync |> Async.Catch
4851
let act = Async.RunSynchronously(act, timeout)
49-
let message = sprintf "expected=%A actual=%A" exp act
52+
let message =
53+
if message = null then sprintf "expected=%A actual=%A" exp act
54+
else sprintf "message=%s expected=%A actual=%A" message exp act
5055
match exp,act with
5156
| Choice1Of2 exp, Choice1Of2 act ->
5257
Assert.True((exp = act), message)
@@ -1219,4 +1224,19 @@ let ``AsyncSeq.groupBy should propagate exception and terminate all groups``() =
12191224
asyncSeq { raise (exn("test")) }
12201225
|> AsyncSeq.groupBy (fun i -> i % 3)
12211226
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
1222-
Assert.AreEqual(expected, actual)
1227+
Assert.AreEqual(expected, actual)
1228+
1229+
[<Test>]
1230+
let ``AsyncSeq.combineLatest should behave like merge after initial``() =
1231+
for n in 0..20 do
1232+
for m in 0..10 do
1233+
let ls1 = List.init n id
1234+
let ls2 = List.init m id
1235+
// expect each element to increase combined sum by 1
1236+
// expected count is sum of source counts minus 1 for first result
1237+
let expectedCount =
1238+
if n = 0 || m = 0 then 0
1239+
else max (n + m - 1) 0
1240+
let expected = List.init expectedCount id |> AsyncSeq.ofSeq
1241+
let actual = AsyncSeq.combineLatest (+) (AsyncSeq.ofSeq ls1) (AsyncSeq.ofSeq ls2)
1242+
Assert.AreEqual(expected, actual, (sprintf "n=%i m=%i" n m))

0 commit comments

Comments
 (0)