Skip to content

Commit b40d4a1

Browse files
committed
cancellable to BlockingSeq
1 parent ec9904d commit b40d4a1

7 files changed

Lines changed: 66 additions & 35 deletions

File tree

src/FSharp.Control.AsyncSeq/Agent.fs

Lines changed: 0 additions & 8 deletions
This file was deleted.

src/FSharp.Control.AsyncSeq/Async.fs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,4 +66,5 @@ module internal AsyncExtensions =
6666
if (Interlocked.CompareExchange(state, 1, 0) = 0) then cnc ex
6767
else tcs.SetCanceled()
6868
Async.StartWithContinuations(a, ok, err, cnc)
69-
Async.StartWithContinuations(b, ok, err, cnc)
69+
Async.StartWithContinuations(b, ok, err, cnc)
70+

src/FSharp.Control.AsyncSeq/AsyncSeq.fs

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -350,9 +350,9 @@ module AsyncSeq =
350350

351351
/// Converts observable to an asynchronous sequence. Values that are produced
352352
/// by the observable while the asynchronous sequence is blocked are discarded
353-
/// (this function doesn't guarantee that asynchronou ssequence will return
353+
/// (this function doesn't guarantee that the asynchronous sequence will return
354354
/// all values produced by the observable)
355-
let ofObservable (input : System.IObservable<_>) =
355+
let ofObservableDiscarding (input : System.IObservable<_>) =
356356
ofObservableUsingAgent input (fun mbox -> async {
357357
while true do
358358
// Allow timeout (when the observable ends, caller will
@@ -370,6 +370,10 @@ module AsyncSeq =
370370
| Put v -> repl.Reply(v)
371371
| _ -> failwith "Unexpected Get" })
372372

373+
[<System.Obsolete("Use AsyncSeq.ofObservableDiscarding. This function doesn't guarantee that the asynchronous sequence will return all values produced by the observable")>]
374+
let ofObservable (input : System.IObservable<_>) =
375+
ofObservableDiscarding input
376+
373377
/// Converts asynchronous sequence to an IObservable<_>. When the client subscribes
374378
/// to the observable, a new copy of asynchronous sequence is started and is
375379
/// sequentially iterated over (at the maximal possible speed). Disposing of the
@@ -389,29 +393,36 @@ module AsyncSeq =
389393
/// Converts asynchronous sequence to a synchronous blocking sequence.
390394
/// The elements of the asynchronous sequence are consumed lazily.
391395
let toBlockingSeq (input : AsyncSeq<'T>) =
392-
// Write all elements to a blocking buffer and then add None to denote end
393-
let buf = new BlockingQueueAgent<_>(1)
394-
let iterator =
395-
async {
396-
let! res = iterAsync (Some >> buf.AsyncAdd) input |> Async.Catch
397-
do! buf.AsyncAdd(None)
398-
return match res with Choice2Of2 e -> raise e | _ -> ()
399-
} |> Async.StartAsTask
400-
401-
// Read elements from the blocking buffer & return a sequences
402-
let rec loop () = seq {
403-
match buf.Get() with
404-
| None -> iterator.Result
405-
| Some v ->
406-
yield v
407-
yield! loop() }
408-
loop ()
396+
seq {
397+
// Write all elements to a blocking buffer and then add None to denote end
398+
let buf = new BlockingQueueAgent<_>(1)
399+
400+
use cts = new System.Threading.CancellationTokenSource()
401+
use _cancel = { new IDisposable with member __.Dispose() = cts.Cancel() }
402+
let iteratorTask =
403+
async {
404+
let! res = iterAsync (Some >> buf.AsyncAdd) input |> Async.Catch
405+
do! buf.AsyncAdd(None)
406+
return res
407+
}
408+
|> fun p -> Async.StartAsTask(p, cancellationToken = cts.Token)
409+
410+
// Read elements from the blocking buffer & return a sequences
411+
let fin = ref false
412+
while not fin.Value do
413+
match buf.Get() with
414+
| None ->
415+
match iteratorTask.Result with
416+
| Choice1Of2() -> ()
417+
| Choice2Of2 exn -> raise exn
418+
| Some v -> yield v
419+
}
409420

410421
/// Create a new asynchronous sequence that caches all elements of the
411422
/// sequence specified as the input. When accessing the resulting sequence
412423
/// multiple times, the input will still be evaluated only once
413424
let rec cache (input : AsyncSeq<'T>) =
414-
let agent = Agent<AsyncReplyChannel<_>>.Start(fun agent -> async {
425+
let agent = MailboxProcessor<AsyncReplyChannel<_>>.Start(fun agent -> async {
415426
let! (repl:AsyncReplyChannel<AsyncSeqInner<'T>>) = agent.Receive()
416427
let! next = input
417428
let res =

src/FSharp.Control.AsyncSeq/AutoCancelAgent.fs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@ open System.Threading
1313
/// Wrapper for the standard F# agent (MailboxProcessor) that
1414
/// supports stopping of the agent's body using the IDisposable
1515
/// interface (the type automatically creates a cancellation token)
16-
type internal AutoCancelAgent<'T> private (mbox:Agent<'T>, cts:CancellationTokenSource) =
16+
type internal AutoCancelAgent<'T> private (mbox:MailboxProcessor<'T>, cts:CancellationTokenSource) =
1717

1818
/// Start a new disposable agent using the specified body function
1919
/// (the method creates a new cancellation token for the agent)
2020
static member Start(f) =
2121
let cts = new CancellationTokenSource()
22-
new AutoCancelAgent<'T>(Agent<'T>.Start(f, cancellationToken = cts.Token), cts)
22+
new AutoCancelAgent<'T>(MailboxProcessor<'T>.Start(f, cancellationToken = cts.Token), cts)
2323

2424
/// Returns the number of unprocessed messages in the message queue of the agent.
2525
member x.CurrentQueueLength = mbox.CurrentQueueLength

src/FSharp.Control.AsyncSeq/BlockingQueueAgent.fs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ type internal BlockingQueueAgent<'T>(maxLength) =
2323
[<VolatileField>]
2424
let mutable count = 0
2525

26-
let agent = Agent.Start(fun agent ->
26+
let agent = MailboxProcessor.Start(fun agent ->
2727

2828
let queue = new Queue<'T>()
2929

src/FSharp.Control.AsyncSeq/FSharp.Control.AsyncSeq.fsproj

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@
5959
<Import Project="..\..\.paket\paket.targets" />
6060
<ItemGroup>
6161
<Compile Include="Utils.fs" />
62-
<Compile Include="Agent.fs" />
6362
<Compile Include="AutoCancelAgent.fs" />
6463
<Compile Include="BlockingQueueAgent.fs" />
6564
<Compile Include="Observable.fs" />

tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,12 @@ let ``AsyncSeq.toBlockingSeq does not hung forever and rethrows exception``() =
237237
yield 1
238238
failwith "error"
239239
}
240-
Assert.Throws<AggregateException>(fun _ -> s |> AsyncSeq.toBlockingSeq |> Seq.toList |> ignore) |> ignore
240+
Assert.Throws<Exception>(fun _ -> s |> AsyncSeq.toBlockingSeq |> Seq.toList |> ignore) |> ignore
241+
try
242+
let _ = s |> AsyncSeq.toBlockingSeq |> Seq.toList
243+
()
244+
with e ->
245+
Assert.AreEqual(e.Message, "error")
241246

242247

243248
[<Test>]
@@ -280,4 +285,27 @@ let ``AsyncSeq.skipUntil should not skip with completed signal``() =
280285
[<Test>]
281286
let ``AsyncSeq.skipUntil should skip everything with never signal``() =
282287
let actual = [1;2;3;4] |> AsyncSeq.ofSeq |> AsyncSeq.skipUntil AsyncOps.never
283-
Assert.True(EQ AsyncSeq.empty actual)
288+
Assert.True(EQ AsyncSeq.empty actual)
289+
290+
291+
[<Test>]
292+
let ``AsyncSeq.toBlockingSeq should be cancellable``() =
293+
let cancelCount = ref 0
294+
let aseq =
295+
asyncSeq {
296+
use! a = Async.OnCancel(fun x -> incr cancelCount)
297+
while true do
298+
yield 1
299+
do! Async.Sleep 10
300+
}
301+
302+
let asSeq = aseq |> AsyncSeq.toBlockingSeq
303+
let enum = asSeq.GetEnumerator()
304+
Assert.AreEqual(cancelCount.Value, 0)
305+
let canMoveNext = enum.MoveNext()
306+
Assert.AreEqual(canMoveNext, true)
307+
Assert.AreEqual(cancelCount.Value, 0)
308+
enum.Dispose()
309+
System.Threading.Thread.Sleep(1000) // wait for task cancellation to be effective
310+
Assert.AreEqual(cancelCount.Value, 1)
311+

0 commit comments

Comments
 (0)