55namespace FSharp.Control
66
77open System
8- open System.Threading
98open System.IO
10- open FSharp.Control .Utils
9+ open System.Threading
10+ open System.Threading .Tasks
1111
1212// ----------------------------------------------------------------------------
1313
@@ -24,6 +24,94 @@ and AsyncSeqInner<'T> =
2424 | Nil
2525 | Cons of 'T * AsyncSeq < 'T >
2626
27+ [<AutoOpen>]
28+ module internal Utils =
29+ module internal Choice =
30+
31+ /// Maps over the left result type.
32+ let mapl ( f : 'a -> 'b ) = function
33+ | Choice1Of2 a -> f a |> Choice1Of2
34+ | Choice2Of2 e -> Choice2Of2 e
35+
36+ /// Maps over the right result type.
37+ let mapr ( f : 'b -> 'c ) = function
38+ | Choice1Of2 a -> Choice1Of2 a
39+ | Choice2Of2 e -> f e |> Choice2Of2
40+
41+ // ----------------------------------------------------------------------------
42+
43+ module internal Observable =
44+
45+ /// Union type that represents different messages that can be sent to the
46+ /// IObserver interface. The IObserver type is equivalent to a type that has
47+ /// just OnNext method that gets 'ObservableUpdate' as an argument.
48+ type internal ObservableUpdate < 'T > =
49+ | Next of 'T
50+ | Error of exn
51+ | Completed
52+
53+
54+ /// Turns observable into an observable that only calls OnNext method of the
55+ /// observer, but gives it a discriminated union that represents different
56+ /// kinds of events (error, next, completed)
57+ let asUpdates ( input : IObservable < 'T >) =
58+ { new IObservable<_> with
59+ member x.Subscribe ( observer ) =
60+ input.Subscribe
61+ ({ new IObserver<_> with
62+ member x.OnNext ( v ) = observer.OnNext( Next v)
63+ member x.OnCompleted () = observer.OnNext( Completed)
64+ member x.OnError ( e ) = observer.OnNext( Error e) }) }
65+
66+ type Microsoft.FSharp.Control.Async with
67+ /// Starts the specified operation using a new CancellationToken and returns
68+ /// IDisposable object that cancels the computation. This method can be used
69+ /// when implementing the Subscribe method of IObservable interface.
70+ static member StartDisposable ( op : Async < unit >) =
71+ let ct = new System.Threading.CancellationTokenSource()
72+ Async.Start( op, ct.Token)
73+ { new IDisposable with
74+ member x.Dispose () = ct.Cancel() }
75+
76+ /// Creates an async computations which runs the specified computations
77+ /// in parallel and returns their results.
78+ static member Parallel ( a : Async < 'a >, b : Async < 'b >) : Async < 'a * 'b > = async {
79+ let! a = a |> Async.StartChild
80+ let! b = b |> Async.StartChild
81+ let! a = a
82+ let! b = b
83+ return a, b }
84+
85+
86+ /// Creates an async computation which maps a function f over the
87+ /// value produced by the specified asynchronous computation.
88+ static member map f a = async.Bind( a, f >> async.Return)
89+
90+ /// Creates an async computation which binds the result of the specified
91+ /// async computation to the specified function. The computation produced
92+ /// by the specified function is returned.
93+ static member bind f a = async.Bind( a, f)
94+
95+ /// Creates a computation which produces a tuple consiting of the value produces by the first
96+ /// argument computation to complete and a handle to the other computation. The second computation
97+ /// to complete is memoized.
98+ static member internal chooseBoth ( a : Async < 'a >) ( b : Async < 'a >) : Async < 'a * Async < 'a >> =
99+ Async.FromContinuations <| fun ( ok , err , cnc ) ->
100+ let state = ref 0
101+ let tcs = TaskCompletionSource< 'a>()
102+ let inline ok a =
103+ if ( Interlocked.CompareExchange( state, 1 , 0 ) = 0 ) then
104+ ok ( a, tcs.Task |> Async.AwaitTask)
105+ else
106+ tcs.SetResult a
107+ let inline err ( ex : exn ) =
108+ if ( Interlocked.CompareExchange( state, 1 , 0 ) = 0 ) then err ex
109+ else tcs.SetException ex
110+ let inline cnc ex =
111+ if ( Interlocked.CompareExchange( state, 1 , 0 ) = 0 ) then cnc ex
112+ else tcs.SetCanceled()
113+ Async.StartWithContinuations( a, ok, err, cnc)
114+ Async.StartWithContinuations( b, ok, err, cnc)
27115
28116/// Module with helper functions for working with asynchronous sequences
29117module AsyncSeq =
@@ -321,9 +409,9 @@ module AsyncSeq =
321409 let rec loop () = asyncSeq {
322410 let! msg = agent.PostAndAsyncReply( Get)
323411 match msg with
324- | ObservableUpdate .Error e -> raise e
325- | ObservableUpdate .Completed -> ()
326- | ObservableUpdate .Next v ->
412+ | Observable .Error e -> raise e
413+ | Observable .Completed -> ()
414+ | Observable .Next v ->
327415 yield v
328416 yield ! loop() }
329417 yield ! loop() }
0 commit comments