Skip to content

Commit 98406ad

Browse files
committed
AsyncSeq.zipWithAsync + testa
1 parent c10ac2d commit 98406ad

2 files changed

Lines changed: 205 additions & 63 deletions

File tree

src/FSharpx.Async/AsyncSeq.fs

Lines changed: 70 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ module AsyncSeq =
3434
/// Creates an asynchronous sequence that generates a single element and then ends
3535
let singleton (v:'T) : AsyncSeq<'T> =
3636
async { return Cons(v, empty) }
37-
37+
3838
/// Yields all elements of the first asynchronous sequence and then
3939
/// all elements of the second asynchronous sequence.
4040
let rec append (seq1: AsyncSeq<'T>) (seq2: AsyncSeq<'T>) : AsyncSeq<'T> =
@@ -280,6 +280,15 @@ module AsyncSeq =
280280
let filter f (input : AsyncSeq<'T>) =
281281
filterAsync (f >> async.Return) input
282282

283+
/// Generates an async sequence using the specified generator function.
284+
let rec unfoldAsync (f:'State -> Async<('T * 'State) option>) (s:'State) : AsyncSeq<'T> = asyncSeq {
285+
let! r = f s
286+
match r with
287+
| Some (a,s) ->
288+
yield a
289+
yield! unfoldAsync f s
290+
| None -> () }
291+
283292
// --------------------------------------------------------------------------
284293
// Converting from/to synchronous sequences or IObservables
285294

@@ -410,8 +419,19 @@ module AsyncSeq =
410419

411420
// --------------------------------------------------------------------------
412421

422+
/// Threads a state through the mapping over an async sequence using an async function.
423+
let rec threadStateAsync (f:'s -> 'a -> Async<'b * 's>) (st:'s) (s:AsyncSeq<'a>) : AsyncSeq<'b> = asyncSeq {
424+
let! s = s
425+
match s with
426+
| Nil -> ()
427+
| Cons(a,tl) ->
428+
let! b,st' = f st a
429+
yield b
430+
yield! threadStateAsync f st' tl }
431+
413432
/// Combines two asynchronous sequences into a sequence of pairs.
414433
/// The values from sequences are retrieved in parallel.
434+
/// The resulting sequence stops when either of the argument sequences stop.
415435
let rec zip (input1 : AsyncSeq<'T1>) (input2 : AsyncSeq<'T2>) : AsyncSeq<_> = async {
416436
let! ft = input1 |> Async.StartChild
417437
let! s = input2
@@ -421,6 +441,52 @@ module AsyncSeq =
421441
return Cons( (hf, hs), zip tf ts)
422442
| _ -> return Nil }
423443

444+
/// Combines two asynchronous sequences using the specified function.
445+
/// The values from sequences are retrieved in parallel.
446+
/// The resulting sequence stops when either of the argument sequences stop.
447+
let rec zipWithAsync (z:'a -> 'b -> Async<'c>) (a:AsyncSeq<'a>) (b:AsyncSeq<'b>) : AsyncSeq<'c> = async {
448+
let! a,b = Async.Parallel(a, b)
449+
match a,b with
450+
| Cons(a, atl), Cons(b, btl) ->
451+
let! c = z a b
452+
return Cons(c, zipWithAsync z atl btl)
453+
| _ -> return Nil }
454+
455+
/// Combines two asynchronous sequences using the specified function.
456+
/// The values from sequences are retrieved in parallel.
457+
/// The resulting sequence stops when either of the argument sequences stop.
458+
let inline zipWith (z:'a -> 'b -> 'c) (a:AsyncSeq<'a>) (b:AsyncSeq<'b>) : AsyncSeq<'c> =
459+
zipWithAsync (fun a b -> z a b |> async.Return) a b
460+
461+
/// Combines two asynchronous sequences using the specified function to which it also passes the index.
462+
/// The values from sequences are retrieved in parallel.
463+
/// The resulting sequence stops when either of the argument sequences stop.
464+
let inline zipWithIndexAsync (f:int -> 'a -> Async<'b>) (s:AsyncSeq<'a>) : AsyncSeq<'b> =
465+
threadStateAsync (fun i a -> f i a |> Async.map (fun b -> b,i + 1)) 0 s
466+
467+
/// Feeds an async sequence of values into an async sequence of async functions.
468+
let inline applyAsync (fs:AsyncSeq<'a -> Async<'b>>) (s:AsyncSeq<'a>) : AsyncSeq<'b> =
469+
zipWithAsync ((|>)) s fs
470+
471+
/// Feeds an async sequence of values into an async sequence of functions.
472+
let inline apply (fs:AsyncSeq<'a -> 'b>) (s:AsyncSeq<'a>) : AsyncSeq<'b> =
473+
zipWith ((|>)) s fs
474+
475+
/// Returns an async computation which iterates the async sequence for
476+
/// its side-effects.
477+
let inline runAsync (s:AsyncSeq<unit>) : Async<unit> =
478+
s |> iter id
479+
480+
/// Iterates an async sequence using the specified function until it returns false.
481+
let inline iterWhileAsync (f:'a -> Async<bool>) (s:AsyncSeq<'a>) : Async<unit> =
482+
let rec sink() = asyncSeq {
483+
let c = new AsyncResultCell<_>()
484+
yield (fun a -> f a |> Async.map (fun f -> c.RegisterResult(AsyncOk f, true)))
485+
let! flag = c.AsyncResult
486+
if flag then yield! sink()
487+
else () }
488+
applyAsync (sink()) s |> runAsync
489+
424490
/// Returns elements from an asynchronous sequence while the specified
425491
/// predicate holds. The predicate is evaluated asynchronously.
426492
let rec takeWhileAsync p (input : AsyncSeq<'T>) : AsyncSeq<_> = async {
@@ -489,15 +555,6 @@ module AsyncSeq =
489555
|> fold (fun arr a -> a::arr) []
490556
|> Async.map List.rev
491557

492-
/// Generates an async sequence using the specified generator function.
493-
let rec unfoldAsync (f:'State -> Async<('T * 'State) option>) (s:'State) : AsyncSeq<'T> = asyncSeq {
494-
let! r = f s
495-
match r with
496-
| Some (a,s) ->
497-
yield a
498-
yield! unfoldAsync f s
499-
| None -> () }
500-
501558
/// Flattens an AsyncSeq of sequences.
502559
let rec concatSeq (input:AsyncSeq<#seq<'T>>) : AsyncSeq<'T> = asyncSeq {
503560
let! v = input
@@ -526,7 +583,6 @@ module AsyncSeq =
526583

527584
left
528585

529-
530586
/// Buffer items from the async sequence into buffers of a specified size.
531587
/// The last buffer returned may be less than the specified buffer size.
532588
let rec bufferByCount (bufferSize:int) (s:AsyncSeq<'T>) : AsyncSeq<'T[]> =
@@ -549,7 +605,8 @@ module AsyncSeq =
549605
return! loop tl
550606
}
551607
return! loop s
552-
}
608+
}
609+
553610

554611

555612
[<AutoOpen>]
@@ -570,4 +627,4 @@ module Seq =
570627
/// Converts asynchronous sequence to a synchronous blocking sequence.
571628
/// The elements of the asynchronous sequence are consumed lazily.
572629
let ofAsyncSeq (input : AsyncSeq<'T>) =
573-
AsyncSeq.toBlockingSeq input
630+
AsyncSeq.toBlockingSeq input

tests/FSharpx.Async.Tests/AsyncSeqTests.fs

Lines changed: 135 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -3,82 +3,60 @@
33
open NUnit.Framework
44
open FSharpx.Control
55

6-
[<Test>]
7-
let ``skipping should return all elements after the first non-match``() =
8-
let expected = [ 3; 4 ]
9-
let result =
10-
[ 1; 2; 3; 4 ]
11-
|> AsyncSeq.ofSeq
12-
|> AsyncSeq.skipWhile (fun i -> i <= 2)
13-
|> AsyncSeq.toBlockingSeq
14-
|> Seq.toList
15-
Assert.AreEqual(expected, result)
6+
/// Determines equality of two async sequences by convering them to lists, ignoring side-effects.
7+
let EQ (a:AsyncSeq<'a>) (b:AsyncSeq<'a>) =
8+
let exp = a |> AsyncSeq.toList |> Async.RunSynchronously
9+
let act = b |> AsyncSeq.toList |> Async.RunSynchronously
10+
if (exp = act) then true
11+
else
12+
printfn "expected=%A" exp
13+
printfn "actual=%A" act
14+
false
1615

1716

1817
[<Test>]
19-
let ``AsyncSeq.toArray``() =
20-
18+
let ``AsyncSeq.toArray``() =
19+
let ls = [1;2;3]
2120
let s = asyncSeq {
2221
yield 1
2322
yield 2
2423
yield 3
2524
}
26-
2725
let a = s |> AsyncSeq.toArray |> Async.RunSynchronously |> Array.toList
28-
2926
Assert.True(([1;2;3] = a))
3027

3128

3229
[<Test>]
33-
let ``AsyncSeq.toList``() =
34-
30+
let ``AsyncSeq.toList``() =
3531
let s = asyncSeq {
3632
yield 1
3733
yield 2
3834
yield 3
3935
}
40-
4136
let a = s |> AsyncSeq.toList |> Async.RunSynchronously
42-
4337
Assert.True(([1;2;3] = a))
4438

4539

4640
[<Test>]
47-
let ``AsyncSeq.concatSeq``() =
48-
49-
let s = asyncSeq {
50-
yield [1;2]
51-
yield [3;4]
52-
}
53-
54-
let s =
55-
s
56-
|> AsyncSeq.concatSeq
57-
|> AsyncSeq.toList
58-
|> Async.RunSynchronously
59-
60-
Assert.True(([1;2;3;4] = s))
61-
41+
let ``AsyncSeq.concatSeq``() =
42+
let ls = [ [1;2] ; [3;4] ]
43+
let actual = AsyncSeq.ofSeq ls |> AsyncSeq.concatSeq
44+
let expected = ls |> List.concat |> AsyncSeq.ofSeq
45+
Assert.True(EQ expected actual)
6246

6347

6448
[<Test>]
65-
let ``AsyncSeq.unfoldAsync``() =
66-
67-
let gen s =
68-
if s < 3 then (s,s + 1) |> Some |> async.Return
69-
else None |> async.Return
70-
71-
let s =
72-
AsyncSeq.unfoldAsync gen 0
73-
|> AsyncSeq.toList
74-
|> Async.RunSynchronously
75-
76-
Assert.True(([0;1;2] = s))
77-
49+
let ``AsyncSeq.unfoldAsync``() =
50+
let gen s =
51+
if s < 3 then (s,s + 1) |> Some
52+
else None
53+
let expected = Seq.unfold gen 0 |> AsyncSeq.ofSeq
54+
let actual = AsyncSeq.unfoldAsync (gen >> async.Return) 0
55+
Assert.True(EQ expected actual)
7856

7957

8058
[<Test>]
81-
let ``AsyncSeq.interleave``() =
59+
let ``AsyncSeq.interleave``() =
8260
let s1 = AsyncSeq.ofSeq ["a";"b";"c"]
8361
let s2 = AsyncSeq.ofSeq [1;2;3]
8462
let merged = AsyncSeq.interleave s1 s2 |> AsyncSeq.toList |> Async.RunSynchronously
@@ -88,14 +66,121 @@ let ``AsyncSeq.interleave``() =
8866

8967
[<Test>]
9068
let ``AsyncSeq.bufferByCount``() =
91-
9269
let s = asyncSeq {
9370
yield 1
9471
yield 2
9572
yield 3
9673
yield 4
74+
yield 5
9775
}
98-
9976
let s' = s |> AsyncSeq.bufferByCount 2 |> AsyncSeq.toList |> Async.RunSynchronously
77+
Assert.True(([[|1;2|];[|3;4|];[|5|]] = s'))
78+
79+
80+
[<Test>]
81+
let ``AsyncSeq.zip``() =
82+
let la = [1;2;3;4;5]
83+
let lb = [1;2;3;4;5]
84+
let a = la |> AsyncSeq.ofSeq
85+
let b = lb |> AsyncSeq.ofSeq
86+
let actual = AsyncSeq.zip a b
87+
let expected = List.zip la lb |> AsyncSeq.ofSeq
88+
Assert.True(EQ expected actual)
89+
90+
91+
[<Test>]
92+
let ``AsyncSeq.zipWithAsync``() =
93+
let la = [1;2;3;4;5]
94+
let lb = [1;2;3;4;5]
95+
let a = la |> AsyncSeq.ofSeq
96+
let b = lb |> AsyncSeq.ofSeq
97+
let actual = AsyncSeq.zipWithAsync (fun a b -> a + b |> async.Return) a b
98+
let expected = List.zip la lb |> List.map ((<||) (+)) |> AsyncSeq.ofSeq
99+
Assert.True(EQ expected actual)
100+
101+
102+
[<Test>]
103+
let ``AsyncSeq.iterWhileAsync``() =
104+
let s = [1;2;3;4;5] |> AsyncSeq.ofSeq
105+
let seen = ResizeArray<_>()
106+
s |> AsyncSeq.iterWhileAsync (fun x -> seen.Add(x) ; (x < 3) |> async.Return) |> Async.RunSynchronously
107+
Assert.True((seen |> List.ofSeq = [1;2;3]))
108+
109+
110+
[<Test>]
111+
let ``AsyncSeq.skipWhileAsync``() =
112+
let ls = [1;2;3;4;5]
113+
let p i = i <= 2
114+
let actual = ls |> AsyncSeq.ofSeq |> AsyncSeq.skipWhileAsync (p >> async.Return)
115+
let expected = ls |> Seq.skipWhile p |> AsyncSeq.ofSeq
116+
Assert.True(EQ expected actual)
117+
118+
119+
[<Test>]
120+
let ``AsyncSeq.takeWhileAsync``() =
121+
let ls = [1;2;3;4;5]
122+
let p i = i < 4
123+
let actual = ls |> AsyncSeq.ofSeq |> AsyncSeq.takeWhileAsync (p >> async.Return)
124+
let expected = ls |> Seq.takeWhile p |> AsyncSeq.ofSeq
125+
Assert.True(EQ expected actual)
126+
127+
128+
[<Test>]
129+
let ``AsyncSeq.take``() =
130+
let ls = [1;2;3;4;5]
131+
let c = 3
132+
let actual = ls |> AsyncSeq.ofSeq |> AsyncSeq.take c
133+
let expected = ls |> Seq.take c |> AsyncSeq.ofSeq
134+
Assert.True(EQ expected actual)
135+
136+
137+
[<Test>]
138+
let ``AsyncSeq.skip``() =
139+
let ls = [1;2;3;4;5]
140+
let c = 3
141+
let actual = ls |> AsyncSeq.ofSeq |> AsyncSeq.skip c
142+
let expected = ls |> Seq.skip c |> AsyncSeq.ofSeq
143+
Assert.True(EQ expected actual)
144+
145+
146+
[<Test>]
147+
let ``AsyncSeq.threadStateAsync``() =
148+
let ls = [1;2;3;4;5]
149+
let actual = ls |> AsyncSeq.ofSeq |> AsyncSeq.threadStateAsync (fun i a -> async.Return(i + a, i + 1)) 0
150+
let expected = [1;3;5;7;9] |> AsyncSeq.ofSeq
151+
Assert.True(EQ expected actual)
152+
153+
154+
[<Test>]
155+
let ``AsyncSeq.scanAsync``() =
156+
let ls = [1;2;3;4;5]
157+
let f i a = i + a
158+
let z = 0
159+
let actual = ls |> AsyncSeq.ofSeq |> AsyncSeq.scanAsync (fun i a -> f i a |> async.Return) z
160+
let expected = ls |> Seq.scan f z |> Seq.skip 1 |> AsyncSeq.ofSeq
161+
Assert.True(EQ expected actual)
162+
163+
164+
[<Test>]
165+
let ``AsyncSeq.foldAsync``() =
166+
let ls = [1;2;3;4;5]
167+
let f i a = i + a
168+
let z = 0
169+
let actual = ls |> AsyncSeq.ofSeq |> AsyncSeq.foldAsync (fun i a -> f i a |> async.Return) z |> Async.RunSynchronously
170+
let expected = ls |> Seq.fold f z
171+
Assert.True((expected = actual))
172+
173+
174+
[<Test>]
175+
let ``AsyncSeq.filterAsync``() =
176+
let ls = [1;2;3;4;5]
177+
let p i = i > 3
178+
let actual = ls |> AsyncSeq.ofSeq |> AsyncSeq.filterAsync (p >> async.Return)
179+
let expected = ls |> Seq.filter p |> AsyncSeq.ofSeq
180+
Assert.True(EQ expected actual)
181+
182+
183+
184+
185+
100186

101-
Assert.True(([[|1;2|];[|3;4|]] = s'))

0 commit comments

Comments
 (0)