Skip to content

Commit e1f1630

Browse files
committed
simplify ofObservableBuffered
1 parent b999f2e commit e1f1630

3 files changed

Lines changed: 53 additions & 15 deletions

File tree

src/FSharp.Control.AsyncSeq/AsyncSeq.fs

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -335,20 +335,21 @@ module AsyncSeq =
335335
yield! loop() }
336336

337337
let ofObservableBuffered (input : System.IObservable<_>) =
338-
ofObservableUsingAgent input (fun mbox -> async {
339-
let buffer = new System.Collections.Generic.Queue<_>()
340-
let repls = new System.Collections.Generic.Queue<_>()
341-
while true do
342-
// Receive next message (when observable ends, caller will
343-
// cancel the agent, so we need timeout to allow cancleation)
344-
let! msg = mbox.TryReceive(200)
345-
match msg with
346-
| Some(Put(v)) -> buffer.Enqueue(v)
347-
| Some(Get(repl)) -> repls.Enqueue(repl)
348-
| _ -> ()
349-
// Process matching calls from buffers
350-
while buffer.Count > 0 && repls.Count > 0 do
351-
repls.Dequeue().Reply(buffer.Dequeue()) })
338+
asyncSeq {
339+
let cts = new CancellationTokenSource()
340+
try
341+
let agent = MailboxProcessor<_>.Start((fun inbox -> async.Return() ), cancellationToken = cts.Token)
342+
use d = input |> Observable.asUpdates |> Observable.subscribe agent.Post
343+
let fin = ref false
344+
while not fin.Value do
345+
let! msg = agent.Receive()
346+
match msg with
347+
| Observable.ObservableUpdate.Error e -> raise e
348+
| Observable.Completed -> fin := true
349+
| Observable.Next v -> yield v
350+
finally
351+
// Cancel on early exit
352+
cts.Cancel() }
352353

353354

354355
let ofObservableDiscarding (input : System.IObservable<_>) =
@@ -369,7 +370,7 @@ module AsyncSeq =
369370
| Put v -> repl.Reply(v)
370371
| _ -> failwith "Unexpected Get" })
371372

372-
[<System.Obsolete("Use AsyncSeq.ofObservableDiscarding. This function doesn't guarantee that the asynchronous sequence will return all values produced by the observable")>]
373+
[<System.Obsolete("Use AsyncSeq.ofObservableDiscarding or AsyncSeq.ofObservableBuffered. This function doesn't guarantee that the asynchronous sequence will return all values produced by the observable")>]
373374
let ofObservable (input : System.IObservable<_>) =
374375
ofObservableDiscarding input
375376

src/FSharp.Control.AsyncSeq/AsyncSeq.fsi

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,9 @@ module AsyncSeq =
185185
/// by the observable while the asynchronous sequence is blocked are discarded
186186
/// (this function doesn't guarantee that asynchronou ssequence will return
187187
/// all values produced by the observable)
188+
val ofObservableDiscarding : input:System.IObservable<'T> -> AsyncSeq<'T>
189+
190+
[<System.Obsolete("Use AsyncSeq.ofObservableDiscarding or AsyncSeq.ofObservableBuffered. This function doesn't guarantee that the asynchronous sequence will return all values produced by the observable")>]
188191
val ofObservable : input:System.IObservable<'T> -> AsyncSeq<'T>
189192

190193
/// Converts asynchronous sequence to an IObservable<_>. When the client subscribes

tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,3 +365,37 @@ let ``AsyncSeq.while should allow do at end``() =
365365
do! Async.Sleep 10
366366
}
367367
Assert.True(true)
368+
369+
let observe vs err =
370+
{ new IObservable<'U> with
371+
member x.Subscribe(observer) =
372+
for v in vs do
373+
observer.OnNext v
374+
if err then
375+
observer.OnError (Failure "fail")
376+
observer.OnCompleted()
377+
{ new IDisposable with member __.Dispose() = () } }
378+
379+
[<Test>]
380+
let ``AsyncSeq.ofObservable should work (empty)``() =
381+
Assert.True(observe [] false |> AsyncSeq.ofObservableBuffered |> AsyncSeq.toList |> Async.RunSynchronously = [])
382+
383+
[<Test>]
384+
let ``AsyncSeq.ofObservable should work (sinngleton)``() =
385+
Assert.True(observe [1] false |> AsyncSeq.ofObservableBuffered |> AsyncSeq.toList |> Async.RunSynchronously = [1])
386+
387+
[<Test>]
388+
let ``AsyncSeq.ofObservable should work (ten)``() =
389+
Assert.True(observe [1..10] false |> AsyncSeq.ofObservableBuffered |> AsyncSeq.toList |> Async.RunSynchronously = [1..10])
390+
391+
[<Test>]
392+
let ``AsyncSeq.ofObservable should work (empty, fail)``() =
393+
Assert.True(try (observe [] true |> AsyncSeq.ofObservableBuffered |> AsyncSeq.toList |> Async.RunSynchronously |> ignore); false with _ -> true)
394+
395+
[<Test>]
396+
let ``AsyncSeq.ofObservable should work (one, fail)``() =
397+
Assert.True(try (observe [1] true |> AsyncSeq.ofObservableBuffered |> AsyncSeq.toList |> Async.RunSynchronously |> ignore); false with _ -> true)
398+
399+
[<Test>]
400+
let ``AsyncSeq.ofObservable should work (one, take)``() =
401+
Assert.True(observe [1] false |> AsyncSeq.ofObservableBuffered |> AsyncSeq.take 1 |> AsyncSeq.toList |> Async.RunSynchronously = [1])

0 commit comments

Comments
 (0)