Skip to content

Commit 0e1661b

Browse files
committed
Merge pull request #20 from eulerfx/master
Async.bindChoice, AsyncSeq.zipWithAsync, AsyncSeq.merge, etc
2 parents fa32ff0 + f22e96b commit 0e1661b

6 files changed

Lines changed: 428 additions & 67 deletions

File tree

RELEASE_NOTES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
### 1.11.1 - 26.03.2015
22
* BUGFIX: AsyncSeq.toBlockingSeq does not hung forever if an exception is thrown and reraise it outside - https://github.com/fsprojects/FSharpx.Async/pull/21
3+
* Added Async.bindChoice, Async.ParallelIgnore, AsyncSeq.zipWithAsync, AsyncSeq.zappAsync, AsyncSeq.threadStateAsync, AsyncSeq.merge, AsyncSeq.traverseOptionAsync, AsyncSeq.traverseChoiceAsync
34

45
### 1.11.0 - 27.02.2015
56
* Added Async.map, Async.bind, Async.unit

src/FSharpx.Async/Async.fs

Lines changed: 89 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
namespace FSharpx.Control
66
open System
77
open System.Threading
8+
open System.Threading.Tasks
89

910
// ----------------------------------------------------------------------------
1011

@@ -60,6 +61,50 @@ module AsyncExtensions =
6061
let! c = c
6162
return a,b,c }
6263

64+
/// Creates an async computation which runs the provided sequence of computations and completes
65+
/// when all computations in the sequence complete. Up to parallelism computations will
66+
/// be in-flight at any given point in time. Error or cancellation of any computation in
67+
/// the sequence causes the resulting computation to error or cancel, respectively.
68+
static member ParallelIgnore (parallelism:int) (xs:seq<Async<_>>) = async {
69+
70+
let sm = new SemaphoreSlim(parallelism)
71+
let cde = new CountdownEvent(1)
72+
let tcs = new TaskCompletionSource<unit>()
73+
74+
let inline ok _ =
75+
sm.Release() |> ignore
76+
if (cde.Signal()) then
77+
tcs.SetResult(())
78+
79+
let inline err (ex:exn) =
80+
tcs.SetException ex
81+
sm.Release() |> ignore
82+
83+
let inline cnc (ex:OperationCanceledException) =
84+
tcs.SetCanceled()
85+
sm.Release() |> ignore
86+
87+
try
88+
89+
for computation in xs do
90+
sm.Wait()
91+
cde.AddCount(1)
92+
// the following decreases throughput 3x but avoids blocking
93+
// do! sm.WaitAsync() |> Async.AwaitTask
94+
Async.StartWithContinuations(computation, ok, err, cnc)
95+
96+
if (cde.Signal()) then
97+
tcs.SetResult(())
98+
99+
do! tcs.Task |> Async.AwaitTask
100+
101+
finally
102+
103+
cde.Dispose()
104+
sm.Dispose()
105+
106+
}
107+
63108
/// An async computation which does nothing.
64109
static member inline unit = AsyncOps.unit
65110

@@ -70,4 +115,47 @@ module AsyncExtensions =
70115
/// Creates an async computation which binds the result of the specified
71116
/// async computation to the specified function. The computation produced
72117
/// by the specified function is returned.
73-
static member inline bind f a = async.Bind(a, f)
118+
static member inline bind f a = async.Bind(a, f)
119+
120+
/// Maps over an async computation which produces a choice value
121+
/// using a function which maps over Choice1Of2 and itself returns a choice.
122+
/// A value of Choice2Of2 is treated like an error and passed through.
123+
static member mapChoice (f:'a -> Choice<'b, 'e>) (a:Async<Choice<'a, 'e>>) : Async<Choice<'b, 'e>> =
124+
a |> Async.map (function
125+
| Choice1Of2 a' -> f a'
126+
| Choice2Of2 e -> Choice2Of2 e)
127+
128+
/// Binds an async computation producing a choice value to another async
129+
/// computation producing a choice such that a Choice2Of2 value is passed through.
130+
static member bindChoice (f:'a -> Async<Choice<'b, 'e>>) (a:Async<Choice<'a, 'e>>) : Async<Choice<'b, 'e>> =
131+
a |> Async.bind (function
132+
| Choice1Of2 a' -> f a'
133+
| Choice2Of2 e -> Choice2Of2 e |> async.Return)
134+
135+
/// Binds an async computation producing a choice value to another async
136+
/// computation producing a choice such that a Choice2Of2 value is passed through.
137+
static member bindChoices (f:'a -> Async<Choice<'b, 'e2>>) (a:Async<Choice<'a, 'e1>>) : Async<Choice<'b, Choice<'e1, 'e2>>> =
138+
a |> Async.bind (function
139+
| Choice1Of2 a' -> f a' |> Async.map (function Choice1Of2 b -> Choice1Of2 b | Choice2Of2 e2 -> Choice2Of2 (Choice2Of2 e2))
140+
| Choice2Of2 e1 -> Choice2Of2 (Choice1Of2 e1) |> async.Return)
141+
142+
/// Creates a computation which produces a tuple consiting of the value produces by the first
143+
/// argument computation to complete and a handle to the other computation. The second computation
144+
/// to complete is memoized.
145+
static member internal chooseBoth (a:Async<'a>) (b:Async<'a>) : Async<'a * Async<'a>> =
146+
Async.FromContinuations <| fun (ok,err,cnc) ->
147+
let state = ref 0
148+
let tcs = TaskCompletionSource<'a>()
149+
let inline ok a =
150+
if (Interlocked.CompareExchange(state, 1, 0) = 0) then
151+
ok (a, tcs.Task |> Async.AwaitTask)
152+
else
153+
tcs.SetResult a
154+
let inline err (ex:exn) =
155+
if (Interlocked.CompareExchange(state, 1, 0) = 0) then err ex
156+
else tcs.SetException ex
157+
let inline cnc ex =
158+
if (Interlocked.CompareExchange(state, 1, 0) = 0) then cnc ex
159+
else tcs.SetCanceled()
160+
Async.StartWithContinuations(a, ok, err, cnc)
161+
Async.StartWithContinuations(b, ok, err, cnc)

src/FSharpx.Async/AsyncSeq.fs

Lines changed: 100 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ namespace FSharpx.Control
77
open System
88
open System.Threading
99
open System.IO
10+
open FSharpx.Control.Utils
1011

1112
// ----------------------------------------------------------------------------
1213

@@ -34,6 +35,17 @@ module AsyncSeq =
3435
/// Creates an asynchronous sequence that generates a single element and then ends
3536
let singleton (v:'T) : AsyncSeq<'T> =
3637
async { return Cons(v, empty) }
38+
39+
/// Generates an async sequence using the specified generator function.
40+
let rec unfoldAsync (f:'State -> Async<('T * 'State) option>) (s:'State) : AsyncSeq<'T> =
41+
f s
42+
|> Async.map (function
43+
| Some (a,s) -> Cons(a, unfoldAsync f s)
44+
| None -> Nil)
45+
46+
/// Creates an async sequence which repeats the specified value indefinitely.
47+
let rec replicate (v:'T) : AsyncSeq<'T> =
48+
Cons(v, async.Delay <| fun() -> replicate v) |> async.Return
3749

3850
/// Yields all elements of the first asynchronous sequence and then
3951
/// all elements of the second asynchronous sequence.
@@ -413,8 +425,19 @@ module AsyncSeq =
413425

414426
// --------------------------------------------------------------------------
415427

428+
/// Threads a state through the mapping over an async sequence using an async function.
429+
let rec threadStateAsync (f:'s -> 'a -> Async<'b * 's>) (st:'s) (s:AsyncSeq<'a>) : AsyncSeq<'b> = asyncSeq {
430+
let! s = s
431+
match s with
432+
| Nil -> ()
433+
| Cons(a,tl) ->
434+
let! b,st' = f st a
435+
yield b
436+
yield! threadStateAsync f st' tl }
437+
416438
/// Combines two asynchronous sequences into a sequence of pairs.
417439
/// The values from sequences are retrieved in parallel.
440+
/// The resulting sequence stops when either of the argument sequences stop.
418441
let rec zip (input1 : AsyncSeq<'T1>) (input2 : AsyncSeq<'T2>) : AsyncSeq<_> = async {
419442
let! ft = input1 |> Async.StartChild
420443
let! s = input2
@@ -424,6 +447,65 @@ module AsyncSeq =
424447
return Cons( (hf, hs), zip tf ts)
425448
| _ -> return Nil }
426449

450+
/// Combines two asynchronous sequences using the specified function.
451+
/// The values from sequences are retrieved in parallel.
452+
/// The resulting sequence stops when either of the argument sequences stop.
453+
let rec zipWithAsync (z:'a -> 'b -> Async<'c>) (a:AsyncSeq<'a>) (b:AsyncSeq<'b>) : AsyncSeq<'c> = async {
454+
let! a,b = Async.Parallel(a, b)
455+
match a,b with
456+
| Cons(a, atl), Cons(b, btl) ->
457+
let! c = z a b
458+
return Cons(c, zipWithAsync z atl btl)
459+
| _ -> return Nil }
460+
461+
/// Combines two asynchronous sequences using the specified function.
462+
/// The values from sequences are retrieved in parallel.
463+
/// The resulting sequence stops when either of the argument sequences stop.
464+
let inline zipWith (z:'a -> 'b -> 'c) (a:AsyncSeq<'a>) (b:AsyncSeq<'b>) : AsyncSeq<'c> =
465+
zipWithAsync (fun a b -> z a b |> async.Return) a b
466+
467+
/// Combines two asynchronous sequences using the specified function to which it also passes the index.
468+
/// The values from sequences are retrieved in parallel.
469+
/// The resulting sequence stops when either of the argument sequences stop.
470+
let inline zipWithIndexAsync (f:int -> 'a -> Async<'b>) (s:AsyncSeq<'a>) : AsyncSeq<'b> =
471+
threadStateAsync (fun i a -> f i a |> Async.map (fun b -> b,i + 1)) 0 s
472+
473+
/// Feeds an async sequence of values into an async sequence of async functions.
474+
let inline zappAsync (fs:AsyncSeq<'a -> Async<'b>>) (s:AsyncSeq<'a>) : AsyncSeq<'b> =
475+
zipWithAsync ((|>)) s fs
476+
477+
/// Feeds an async sequence of values into an async sequence of functions.
478+
let inline zapp (fs:AsyncSeq<'a -> 'b>) (s:AsyncSeq<'a>) : AsyncSeq<'b> =
479+
zipWith ((|>)) s fs
480+
481+
/// Traverses an async sequence an applies to specified function such that if None is returned the traversal short-circuits
482+
/// and None is returned as the result. Otherwise, the entire sequence is traversed and the result returned as Some.
483+
let rec traverseOptionAsync (f:'a -> Async<'b option>) (s:AsyncSeq<'a>) : Async<AsyncSeq<'b> option> = async {
484+
let! s = s
485+
match s with
486+
| Nil -> return Some (Nil |> async.Return)
487+
| Cons(a,tl) ->
488+
let! b = f a
489+
match b with
490+
| Some b ->
491+
return! traverseOptionAsync f tl |> Async.map (Option.map (fun tl -> Cons(b, tl) |> async.Return))
492+
| None ->
493+
return None }
494+
495+
/// Traverses an async sequence an applies to specified function such that if Choice2Of2 is returned the traversal short-circuits
496+
/// and Choice2Of2 is returned as the result. Otherwise, the entire sequence is traversed and the result returned as Choice1Of2.
497+
let rec traverseChoiceAsync (f:'a -> Async<Choice<'b, 'e>>) (s:AsyncSeq<'a>) : Async<Choice<AsyncSeq<'b>, 'e>> = async {
498+
let! s = s
499+
match s with
500+
| Nil -> return Choice1Of2 (Nil |> async.Return)
501+
| Cons(a,tl) ->
502+
let! b = f a
503+
match b with
504+
| Choice1Of2 b ->
505+
return! traverseChoiceAsync f tl |> Async.map (Choice.mapl (fun tl -> Cons(b, tl) |> async.Return))
506+
| Choice2Of2 e ->
507+
return Choice2Of2 e }
508+
427509
/// Returns elements from an asynchronous sequence while the specified
428510
/// predicate holds. The predicate is evaluated asynchronously.
429511
let rec takeWhileAsync p (input : AsyncSeq<'T>) : AsyncSeq<_> = async {
@@ -492,15 +574,6 @@ module AsyncSeq =
492574
|> fold (fun arr a -> a::arr) []
493575
|> Async.map List.rev
494576

495-
/// Generates an async sequence using the specified generator function.
496-
let rec unfoldAsync (f:'State -> Async<('T * 'State) option>) (s:'State) : AsyncSeq<'T> = asyncSeq {
497-
let! r = f s
498-
match r with
499-
| Some (a,s) ->
500-
yield a
501-
yield! unfoldAsync f s
502-
| None -> () }
503-
504577
/// Flattens an AsyncSeq of sequences.
505578
let rec concatSeq (input:AsyncSeq<#seq<'T>>) : AsyncSeq<'T> = asyncSeq {
506579
let! v = input
@@ -529,7 +602,6 @@ module AsyncSeq =
529602

530603
left
531604

532-
533605
/// Buffer items from the async sequence into buffers of a specified size.
534606
/// The last buffer returned may be less than the specified buffer size.
535607
let rec bufferByCount (bufferSize:int) (s:AsyncSeq<'T>) : AsyncSeq<'T[]> =
@@ -552,7 +624,24 @@ module AsyncSeq =
552624
return! loop tl
553625
}
554626
return! loop s
555-
}
627+
}
628+
629+
/// Merges two async sequences into an async sequence non-deterministically.
630+
let rec merge (a:AsyncSeq<'a>) (b:AsyncSeq<'a>) : AsyncSeq<'a> = async {
631+
let! one,other = Async.chooseBoth a b
632+
match one with
633+
| Nil -> return! other
634+
| Cons(hd,tl) ->
635+
return Cons(hd, merge tl other) }
636+
637+
/// Merges all specified async sequences into an async sequence non-deterministically.
638+
let rec mergeAll (ss:AsyncSeq<'a> list) : AsyncSeq<'a> =
639+
match ss with
640+
| [] -> empty
641+
| [s] -> s
642+
| [a;b] -> merge a b
643+
| hd::tl -> merge hd (mergeAll tl)
644+
556645

557646

558647
[<AutoOpen>]

src/FSharpx.Async/Utils.fs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,15 @@
1-
module FSharpx.Control.Utils
1+
module internal FSharpx.Control.Utils
22

3-
let inline konst a _ = a
3+
let inline konst a _ = a
4+
5+
module Choice =
6+
7+
/// Maps over the left result type.
8+
let mapl (f:'a -> 'b) = function
9+
| Choice1Of2 a -> f a |> Choice1Of2
10+
| Choice2Of2 e -> Choice2Of2 e
11+
12+
/// Maps over the right result type.
13+
let mapr (f:'b -> 'c) = function
14+
| Choice1Of2 a -> Choice1Of2 a
15+
| Choice2Of2 e -> f e |> Choice2Of2

0 commit comments

Comments
 (0)