Skip to content

Commit 6a04b4a

Browse files
committed
add new operators and tests
1 parent 03545ad commit 6a04b4a

4 files changed

Lines changed: 469 additions & 162 deletions

File tree

RELEASE_NOTES.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
### 1.16.0 - 13.05.2015
22
* Simplify ofObservableBuffered and toBlockingSeq
33
* Move to IAsyncEnumerable model to support try/finally and try/with
4+
* Rename replicate to replicateInfinite
5+
* Rename toList to toListAsync
6+
* Rename toArray to toArrayAsync
7+
* Rename zipWithIndexAsync to mapiAsync
8+
* Add init, initInfinite, initAsync, initInfiniteAsync, replicateInfinite
49

510
### 1.15.0 - 30.03.2015
611
* Add AsyncSeq.getIterator (unblocks use of AsyncSeq in FSharpx.Async)

src/FSharp.Control.AsyncSeq/AsyncSeq.fs

Lines changed: 81 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ module AsyncSeq =
119119

120120
let private dispose (d:System.IDisposable) = match d with null -> () | _ -> d.Dispose()
121121

122+
122123
[<GeneralizableValue>]
123124
let empty<'T> : AsyncSeq<'T> =
124125
{ new IAsyncEnumerable<'T> with
@@ -248,20 +249,14 @@ module AsyncSeq =
248249

249250
let asyncSeq = new AsyncSeqBuilder()
250251

251-
let rec unfoldAsync (f:'State -> Async<('T * 'State) option>) (s:'State) : AsyncSeq<'T> =
252-
asyncSeq {
253-
let! v = f s
254-
match v with
255-
| None -> ()
256-
| Some (v,s2) ->
257-
yield v
258-
yield! unfoldAsync f s2 }
259-
260-
let rec replicate (v:'T) : AsyncSeq<'T> =
261-
asyncSeq {
262-
yield v
263-
yield! replicate v }
264252

253+
let emitEnumerator (ie: IAsyncEnumerator<'T>) = asyncSeq {
254+
let! moven = ie.MoveNext()
255+
let b = ref moven
256+
while b.Value.IsSome do
257+
yield b.Value.Value
258+
let! moven = ie.MoveNext()
259+
b := moven }
265260

266261
/// Implements the 'TryWith' functionality for computation builder
267262
// this pushes the handler through all the async computations
@@ -467,6 +462,24 @@ module AsyncSeq =
467462
member internal x.For (seq:AsyncSeq<'T>, action:'T -> Async<unit>) =
468463
seq |> iterAsync action
469464

465+
let rec unfoldAsync (f:'State -> Async<('T * 'State) option>) (s:'State) : AsyncSeq<'T> =
466+
asyncSeq {
467+
let! v = f s
468+
match v with
469+
| None -> ()
470+
| Some (v,s2) ->
471+
yield v
472+
yield! unfoldAsync f s2 }
473+
474+
let replicateInfinite (v:'T) : AsyncSeq<'T> =
475+
asyncSeq {
476+
while true do
477+
yield v }
478+
479+
let replicate (count:int) (v:'T) : AsyncSeq<'T> =
480+
asyncSeq {
481+
for i in 1 .. count do
482+
yield v }
470483
// --------------------------------------------------------------------------
471484
// Additional combinators (implemented as async/asyncSeq computations)
472485

@@ -487,26 +500,38 @@ module AsyncSeq =
487500
let! b = f v
488501
if b then yield v }
489502

490-
let lastOrDefault def (source : AsyncSeq<'T>) = async {
503+
let tryLast (source : AsyncSeq<'T>) = async {
491504
use ie = source.GetEnumerator()
492505
let! v = ie.MoveNext()
493506
let b = ref v
494-
let res = ref def
507+
let res = ref None
495508
while b.Value.IsSome do
496-
res := b.Value.Value
509+
res := b.Value
497510
let! moven = ie.MoveNext()
498511
b := moven
499512
return res.Value }
500513

501-
let firstOrDefault def (source : AsyncSeq<'T>) = async {
514+
let lastOrDefault def (source : AsyncSeq<'T>) = async {
515+
let! v = tryLast source
516+
match v with
517+
| None -> return def
518+
| Some v -> return v }
519+
520+
521+
let tryFirst (source : AsyncSeq<'T>) = async {
502522
use ie = source.GetEnumerator()
503523
let! v = ie.MoveNext()
504524
let b = ref v
505525
if b.Value.IsSome then
506-
return b.Value.Value
526+
return b.Value
507527
else
508-
return def }
528+
return None }
509529

530+
let firstOrDefault def (source : AsyncSeq<'T>) = async {
531+
let! v = tryFirst source
532+
match v with
533+
| None -> return def
534+
| Some v -> return v }
510535

511536
let scanAsync f (state:'TState) (source : AsyncSeq<'T>) = asyncSeq {
512537
yield state
@@ -545,6 +570,29 @@ module AsyncSeq =
545570
let scan f (state:'State) (source : AsyncSeq<'T>) =
546571
scanAsync (fun st v -> f st v |> async.Return) state source
547572

573+
let unfold f (state:'State) =
574+
unfoldAsync (f >> async.Return) state
575+
576+
let initInfiniteAsync f =
577+
0L |> unfoldAsync (fun n ->
578+
async { let! x = f n
579+
return Some (x,n+1L) })
580+
581+
let initAsync (count:int64) f =
582+
0L |> unfoldAsync (fun n ->
583+
async {
584+
if n >= count then return None
585+
else
586+
let! x = f n
587+
return Some (x,n+1L) })
588+
589+
590+
let init count f =
591+
initAsync count (f >> async.Return)
592+
593+
let initInfinite f =
594+
initInfiniteAsync (f >> async.Return)
595+
548596
let map f (source : AsyncSeq<'T>) =
549597
mapAsync (f >> async.Return) source
550598

@@ -710,7 +758,7 @@ module AsyncSeq =
710758
else
711759
b := None }
712760

713-
let takeUntil (signal:Async<unit>) (source:AsyncSeq<'T>) : AsyncSeq<'T> = asyncSeq {
761+
let takeUntilSignal (signal:Async<unit>) (source:AsyncSeq<'T>) : AsyncSeq<'T> = asyncSeq {
714762
use ie = source.GetEnumerator()
715763
let! move = Async.chooseBoths signal (ie.MoveNext())
716764
let b = ref move
@@ -720,6 +768,8 @@ module AsyncSeq =
720768
let! move = Async.chooseBoths sg (ie.MoveNext())
721769
b := move }
722770

771+
let takeUntil signal source = takeUntilSignal signal source
772+
723773
let skipWhileAsync p (source : AsyncSeq<'T>) : AsyncSeq<_> = asyncSeq {
724774
use ie = source.GetEnumerator()
725775
let! move = ie.MoveNext()
@@ -737,7 +787,7 @@ module AsyncSeq =
737787
let! moven = ie.MoveNext()
738788
b := moven }
739789

740-
let skipUntil (signal:Async<unit>) (source:AsyncSeq<'T>) : AsyncSeq<'T> = asyncSeq {
790+
let skipUntilSignal (signal:Async<unit>) (source:AsyncSeq<'T>) : AsyncSeq<'T> = asyncSeq {
741791
use ie = source.GetEnumerator()
742792
let! move = Async.chooseBoths signal (ie.MoveNext())
743793
let b = ref move
@@ -759,6 +809,8 @@ module AsyncSeq =
759809
b2 := moven
760810
| Choice2Of2 (Some _,_) -> failwith "unreachable" }
761811

812+
let skipUntil signal source = skipUntilSignal signal source
813+
762814
let takeWhile p (source : AsyncSeq<'T>) =
763815
takeWhileAsync (p >> async.Return) source
764816

@@ -794,7 +846,7 @@ module AsyncSeq =
794846
let! moven = ie.MoveNext()
795847
b := moven }
796848

797-
let toArray (source : AsyncSeq<'T>) : Async<'T[]> = async {
849+
let toArrayAsync (source : AsyncSeq<'T>) : Async<'T[]> = async {
798850
let ra = (new ResizeArray<_>())
799851
use ie = source.GetEnumerator()
800852
let! move = ie.MoveNext()
@@ -805,7 +857,9 @@ module AsyncSeq =
805857
b := moven
806858
return ra.ToArray() }
807859

808-
let toList (source:AsyncSeq<'T>) : Async<'T list> = toArray source |> Async.map Array.toList
860+
let toListAsync (source:AsyncSeq<'T>) : Async<'T list> = toArrayAsync source |> Async.map Array.toList
861+
let toList (source:AsyncSeq<'T>) = toListAsync source |> Async.RunSynchronously
862+
let toArray (source:AsyncSeq<'T>) = toArrayAsync source |> Async.RunSynchronously
809863

810864
let concatSeq (source:AsyncSeq<#seq<'T>>) : AsyncSeq<'T> = asyncSeq {
811865
use ie = source.GetEnumerator()
@@ -827,7 +881,10 @@ module AsyncSeq =
827881
yield b.Value.Value
828882
is1 := not is1.Value
829883
let! moven = (if is1.Value then ie1.MoveNext() else ie2.MoveNext())
830-
b := moven }
884+
b := moven
885+
// emit the rest
886+
yield! emitEnumerator (if is1.Value then ie2 else ie1)
887+
}
831888

832889

833890
let bufferByCount (bufferSize:int) (source:AsyncSeq<'T>) : AsyncSeq<'T[]> =

0 commit comments

Comments
 (0)