Skip to content

Commit 63b6f8a

Browse files
committed
Merge pull request #19 from dsyme/fix-14
simplify ofObservableBuffered and toBlockingSeq
2 parents d2e3dc9 + 75045b1 commit 63b6f8a

8 files changed

Lines changed: 90 additions & 253 deletions

File tree

RELEASE_NOTES.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
### 1.16.0 - 13.05.2015
2+
* Simplify ofObservableBuffered and toBlockingSeq
3+
14
### 1.15.0 - 30.03.2015
25
* Add AsyncSeq.getIterator (unblocks use of AsyncSeq in FSharpx.Async)
36

docs/content/index.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ FSharp.Control.AsyncSeq is a collection of asynchronous programming utilities fo
77
<div class="span1"></div>
88
<div class="span6">
99
<div class="well well-small" id="nuget">
10-
The FSharp.Control.AsyncSeq library can be <a href="https://nuget.org/packages/AsyncSeq">installed from NuGet</a>:
10+
The FSharp.Control.AsyncSeq library can be <a href="http://www.nuget.org/packages/FSharp.Control.AsyncSeq">installed from NuGet</a>:
1111
<pre>PM> Install-Package FSharp.Control.AsyncSeq</pre>
1212
</div>
1313
</div>

src/FSharp.Control.AsyncSeq/AsyncSeq.fs

Lines changed: 36 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -308,70 +308,30 @@ module AsyncSeq =
308308
for el in input do
309309
yield el }
310310

311-
/// A helper type for implementation of buffering when converting
312-
/// observable to an asynchronous sequence
313-
type internal BufferMessage<'T> =
314-
| Get of AsyncReplyChannel<'T>
315-
| Put of 'T
316-
317-
/// Converts observable to an asynchronous sequence using an agent with
318-
/// a body specified as the argument. The returnd async sequence repeatedly
319-
/// sends 'Get' message to the agent to get the next element. The observable
320-
/// sends 'Put' message to the agent (as new inputs are generated).
321-
let internal ofObservableUsingAgent (input : System.IObservable<_>) f =
322-
asyncSeq {
323-
use agent = AutoCancelAgent.Start(f)
324-
use d = input |> Observable.asUpdates
325-
|> Observable.subscribe (Put >> agent.Post)
326-
327-
let rec loop() = asyncSeq {
328-
let! msg = agent.PostAndAsyncReply(Get)
329-
match msg with
330-
| Observable.Error e -> raise e
331-
| Observable.Completed -> ()
332-
| Observable.Next v ->
333-
yield v
334-
yield! loop() }
335-
yield! loop() }
336-
337311
let ofObservableBuffered (input : System.IObservable<_>) =
338-
ofObservableUsingAgent input (fun mbox -> async {
339-
let buffer = new System.Collections.Generic.Queue<_>()
340-
let repls = new System.Collections.Generic.Queue<_>()
341-
while true do
342-
// Receive next message (when observable ends, caller will
343-
// cancel the agent, so we need timeout to allow cancleation)
344-
let! msg = mbox.TryReceive(200)
345-
match msg with
346-
| Some(Put(v)) -> buffer.Enqueue(v)
347-
| Some(Get(repl)) -> repls.Enqueue(repl)
348-
| _ -> ()
349-
// Process matching calls from buffers
350-
while buffer.Count > 0 && repls.Count > 0 do
351-
repls.Dequeue().Reply(buffer.Dequeue()) })
352-
353-
354-
let ofObservableDiscarding (input : System.IObservable<_>) =
355-
ofObservableUsingAgent input (fun mbox -> async {
356-
while true do
357-
// Allow timeout (when the observable ends, caller will
358-
// cancel the agent, so we need timeout to allow cancellation)
359-
let! msg = mbox.TryReceive(200)
360-
match msg with
361-
| Some(Put _) | None ->
362-
() // Ignore put or no message
363-
| Some(Get repl) ->
364-
// Reader is blocked, so next will be Put
365-
// (caller will not stop the agent at this point,
366-
// so timeout is not necessary)
367-
let! v = mbox.Receive()
368-
match v with
369-
| Put v -> repl.Reply(v)
370-
| _ -> failwith "Unexpected Get" })
371-
372-
[<System.Obsolete("Use AsyncSeq.ofObservableDiscarding. This function doesn't guarantee that the asynchronous sequence will return all values produced by the observable")>]
373-
let ofObservable (input : System.IObservable<_>) =
374-
ofObservableDiscarding input
312+
asyncSeq {
313+
let cts = new CancellationTokenSource()
314+
try
315+
// The body of this agent returns immediately. It turns out this is a valid use of an F# agent, and it
316+
// leaves the agent available as a queue that supports an asynchronous receive.
317+
//
318+
// This makes the cancellation token is somewhat meaningless since the body has already returned. However
319+
// if we don't pass it in then the default cancellation token will be used, so we pass one in for completeness.
320+
use agent = MailboxProcessor<_>.Start((fun inbox -> async.Return() ), cancellationToken = cts.Token)
321+
use d = input |> Observable.asUpdates |> Observable.subscribe agent.Post
322+
let fin = ref false
323+
while not fin.Value do
324+
let! msg = agent.Receive()
325+
match msg with
326+
| Observable.ObservableUpdate.Error e -> raise e
327+
| Observable.Completed -> fin := true
328+
| Observable.Next v -> yield v
329+
finally
330+
// Cancel on early exit
331+
cts.Cancel() }
332+
333+
[<System.Obsolete("Please use AsyncSeq.ofObservableBuffered. The original AsyncSeq.ofObservable doesn't guarantee that the asynchronous sequence will return all values produced by the observable",true) >]
334+
let ofObservable (input : System.IObservable<'T>) : AsyncSeq<'T> = failwith "no longer supported"
375335

376336
let toObservable (aseq:AsyncSeq<_>) =
377337
let start (obs:IObserver<_>) =
@@ -387,29 +347,28 @@ module AsyncSeq =
387347

388348
let toBlockingSeq (input : AsyncSeq<'T>) =
389349
seq {
390-
// Write all elements to a blocking buffer and then add None to denote end
391-
let buf = new BlockingQueueAgent<_>(1)
350+
// Write all elements to a blocking buffer
351+
use buf = new System.Collections.Concurrent.BlockingCollection<_>()
392352

393353
use cts = new System.Threading.CancellationTokenSource()
394354
use _cancel = { new IDisposable with member __.Dispose() = cts.Cancel() }
395355
let iteratorTask =
396356
async {
397-
let! res = iterAsync (Some >> buf.AsyncAdd) input |> Async.Catch
398-
do! buf.AsyncAdd(None)
399-
return res
357+
try
358+
do! iter (Observable.Next >> buf.Add) input
359+
buf.CompleteAdding()
360+
with err ->
361+
buf.Add(Observable.Error err)
362+
buf.CompleteAdding()
400363
}
401364
|> fun p -> Async.StartAsTask(p, cancellationToken = cts.Token)
402365

403366
// Read elements from the blocking buffer & return a sequences
404-
let fin = ref false
405-
while not fin.Value do
406-
match buf.Get() with
407-
| None ->
408-
fin := true
409-
match iteratorTask.Result with
410-
| Choice1Of2() -> ()
411-
| Choice2Of2 exn -> raise exn
412-
| Some v -> yield v
367+
for x in buf.GetConsumingEnumerable() do
368+
match x with
369+
| Observable.Next v -> yield v
370+
| Observable.Error err -> raise err
371+
| Observable.Completed -> failwith "unexpected"
413372
}
414373

415374
let rec cache (input : AsyncSeq<'T>) =
@@ -672,7 +631,6 @@ module AsyncSeqExtensions =
672631
| Cons(h, t) -> async.Combine(action h, x.For(t, action)))
673632

674633
module Seq =
675-
open FSharp.Control
676634

677635
let ofAsyncSeq (input : AsyncSeq<'T>) =
678636
AsyncSeq.toBlockingSeq input

src/FSharp.Control.AsyncSeq/AsyncSeq.fsi

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -181,10 +181,7 @@ module AsyncSeq =
181181
/// an unbounded buffer and are returned as next elements of the async sequence.
182182
val ofObservableBuffered : input:System.IObservable<'T> -> AsyncSeq<'T>
183183

184-
/// Converts observable to an asynchronous sequence. Values that are produced
185-
/// by the observable while the asynchronous sequence is blocked are discarded
186-
/// (this function doesn't guarantee that asynchronou ssequence will return
187-
/// all values produced by the observable)
184+
[<System.Obsolete("Please use AsyncSeq.ofObservableBuffered. The original AsyncSeq.ofObservable doesn't guarantee that the asynchronous sequence will return all values produced by the observable",true) >]
188185
val ofObservable : input:System.IObservable<'T> -> AsyncSeq<'T>
189186

190187
/// Converts asynchronous sequence to an IObservable<_>. When the client subscribes

src/FSharp.Control.AsyncSeq/AutoCancelAgent.fs

Lines changed: 0 additions & 69 deletions
This file was deleted.

src/FSharp.Control.AsyncSeq/BlockingQueueAgent.fs

Lines changed: 0 additions & 99 deletions
This file was deleted.

src/FSharp.Control.AsyncSeq/FSharp.Control.AsyncSeq.fsproj

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,6 @@
6060
-->
6161
<Import Project="..\..\.paket\paket.targets" />
6262
<ItemGroup>
63-
<Compile Include="AutoCancelAgent.fs" />
64-
<Compile Include="BlockingQueueAgent.fs" />
6563
<Compile Include="AsyncSeq.fsi" />
6664
<Compile Include="AsyncSeq.fs" />
6765
<None Include="paket.references" />

0 commit comments

Comments
 (0)