Skip to content

Commit 09757c9

Browse files
committed
remove ofObservableDiscarding
1 parent c5bff08 commit 09757c9

4 files changed

Lines changed: 8 additions & 126 deletions

File tree

src/FSharp.Control.AsyncSeq/AsyncSeq.fs

Lines changed: 7 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -308,36 +308,15 @@ module AsyncSeq =
308308
for el in input do
309309
yield el }
310310

311-
/// A helper type for implementation of buffering when converting
312-
/// observable to an asynchronous sequence
313-
type internal BufferMessage<'T> =
314-
| Get of AsyncReplyChannel<'T>
315-
| Put of 'T
316-
317-
/// Converts observable to an asynchronous sequence using an agent with
318-
/// a body specified as the argument. The returnd async sequence repeatedly
319-
/// sends 'Get' message to the agent to get the next element. The observable
320-
/// sends 'Put' message to the agent (as new inputs are generated).
321-
let internal ofObservableUsingAgent (input : System.IObservable<_>) f =
322-
asyncSeq {
323-
use agent = AutoCancelAgent.Start(f)
324-
use d = input |> Observable.asUpdates
325-
|> Observable.subscribe (Put >> agent.Post)
326-
327-
let rec loop() = asyncSeq {
328-
let! msg = agent.PostAndAsyncReply(Get)
329-
match msg with
330-
| Observable.Error e -> raise e
331-
| Observable.Completed -> ()
332-
| Observable.Next v ->
333-
yield v
334-
yield! loop() }
335-
yield! loop() }
336-
337311
let ofObservableBuffered (input : System.IObservable<_>) =
338312
asyncSeq {
339313
let cts = new CancellationTokenSource()
340314
try
315+
// The body of this agent returns immediately. It turns out this is a valid use of an F# agent, and it
316+
// leaves the agent available as a queue that supports an asynchronous receive.
317+
//
318+
// This makes the cancellation token is somewhat meaningless since the body has already returned. However
319+
// if we don't pass it in then the default cancellation token will be used, so we pass one in for completeness.
341320
use agent = MailboxProcessor<_>.Start((fun inbox -> async.Return() ), cancellationToken = cts.Token)
342321
use d = input |> Observable.asUpdates |> Observable.subscribe agent.Post
343322
let fin = ref false
@@ -351,28 +330,8 @@ module AsyncSeq =
351330
// Cancel on early exit
352331
cts.Cancel() }
353332

354-
355-
let ofObservableDiscarding (input : System.IObservable<_>) =
356-
ofObservableUsingAgent input (fun mbox -> async {
357-
while true do
358-
// Allow timeout (when the observable ends, caller will
359-
// cancel the agent, so we need timeout to allow cancellation)
360-
let! msg = mbox.TryReceive(200)
361-
match msg with
362-
| Some(Put _) | None ->
363-
() // Ignore put or no message
364-
| Some(Get repl) ->
365-
// Reader is blocked, so next will be Put
366-
// (caller will not stop the agent at this point,
367-
// so timeout is not necessary)
368-
let! v = mbox.Receive()
369-
match v with
370-
| Put v -> repl.Reply(v)
371-
| _ -> failwith "Unexpected Get" })
372-
373-
[<System.Obsolete("Use AsyncSeq.ofObservableDiscarding or AsyncSeq.ofObservableBuffered. This function doesn't guarantee that the asynchronous sequence will return all values produced by the observable")>]
374-
let ofObservable (input : System.IObservable<_>) =
375-
ofObservableDiscarding input
333+
[<System.Obsolete("Please use AsyncSeq.ofObservableBuffered. The original AsyncSeq.ofObservable doesn't guarantee that the asynchronous sequence will return all values produced by the observable",true) >]
334+
let ofObservable (input : System.IObservable<'T>) : AsyncSeq<'T> = failwith "no longer supported"
376335

377336
let toObservable (aseq:AsyncSeq<_>) =
378337
let start (obs:IObserver<_>) =
@@ -665,7 +624,6 @@ module AsyncSeqExtensions =
665624
| Cons(h, t) -> async.Combine(action h, x.For(t, action)))
666625

667626
module Seq =
668-
open FSharp.Control
669627

670628
let ofAsyncSeq (input : AsyncSeq<'T>) =
671629
AsyncSeq.toBlockingSeq input

src/FSharp.Control.AsyncSeq/AsyncSeq.fsi

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -181,13 +181,7 @@ module AsyncSeq =
181181
/// an unbounded buffer and are returned as next elements of the async sequence.
182182
val ofObservableBuffered : input:System.IObservable<'T> -> AsyncSeq<'T>
183183

184-
/// Converts observable to an asynchronous sequence. Values that are produced
185-
/// by the observable while the asynchronous sequence is blocked are discarded
186-
/// (this function doesn't guarantee that asynchronou ssequence will return
187-
/// all values produced by the observable)
188-
val ofObservableDiscarding : input:System.IObservable<'T> -> AsyncSeq<'T>
189-
190-
[<System.Obsolete("Use AsyncSeq.ofObservableDiscarding or AsyncSeq.ofObservableBuffered. This function doesn't guarantee that the asynchronous sequence will return all values produced by the observable")>]
184+
[<System.Obsolete("Please use AsyncSeq.ofObservableBuffered. The original AsyncSeq.ofObservable doesn't guarantee that the asynchronous sequence will return all values produced by the observable",true) >]
191185
val ofObservable : input:System.IObservable<'T> -> AsyncSeq<'T>
192186

193187
/// Converts asynchronous sequence to an IObservable<_>. When the client subscribes

src/FSharp.Control.AsyncSeq/AutoCancelAgent.fs

Lines changed: 0 additions & 69 deletions
This file was deleted.

src/FSharp.Control.AsyncSeq/FSharp.Control.AsyncSeq.fsproj

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@
6060
-->
6161
<Import Project="..\..\.paket\paket.targets" />
6262
<ItemGroup>
63-
<Compile Include="AutoCancelAgent.fs" />
6463
<Compile Include="AsyncSeq.fsi" />
6564
<Compile Include="AsyncSeq.fs" />
6665
<None Include="paket.references" />

0 commit comments

Comments
 (0)