Skip to content

Commit b999f2e

Browse files
committed
Merge pull request #10 from fsprojects/fix-4
Fix #9 - toBlockingSeq doesn't cancel the task
2 parents d59128e + 14dbb89 commit b999f2e

2 files changed

Lines changed: 93 additions & 19 deletions

File tree

src/FSharp.Control.AsyncSeq/AsyncSeq.fs

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,7 @@ module AsyncSeq =
351351
repls.Dequeue().Reply(buffer.Dequeue()) })
352352

353353

354-
let ofObservable (input : System.IObservable<_>) =
354+
let ofObservableDiscarding (input : System.IObservable<_>) =
355355
ofObservableUsingAgent input (fun mbox -> async {
356356
while true do
357357
// Allow timeout (when the observable ends, caller will
@@ -369,6 +369,10 @@ module AsyncSeq =
369369
| Put v -> repl.Reply(v)
370370
| _ -> failwith "Unexpected Get" })
371371

372+
[<System.Obsolete("Use AsyncSeq.ofObservableDiscarding. This function doesn't guarantee that the asynchronous sequence will return all values produced by the observable")>]
373+
let ofObservable (input : System.IObservable<_>) =
374+
ofObservableDiscarding input
375+
372376
let toObservable (aseq:AsyncSeq<_>) =
373377
let start (obs:IObserver<_>) =
374378
async {
@@ -382,23 +386,31 @@ module AsyncSeq =
382386
member x.Subscribe(obs) = start obs }
383387

384388
let toBlockingSeq (input : AsyncSeq<'T>) =
385-
// Write all elements to a blocking buffer and then add None to denote end
386-
let buf = new BlockingQueueAgent<_>(1)
387-
let iterator =
388-
async {
389-
let! res = iterAsync (Some >> buf.AsyncAdd) input |> Async.Catch
390-
do! buf.AsyncAdd(None)
391-
return match res with Choice2Of2 e -> raise e | _ -> ()
392-
} |> Async.StartAsTask
393-
394-
// Read elements from the blocking buffer & return a sequences
395-
let rec loop () = seq {
396-
match buf.Get() with
397-
| None -> iterator.Result
398-
| Some v ->
399-
yield v
400-
yield! loop() }
401-
loop ()
389+
seq {
390+
// Write all elements to a blocking buffer and then add None to denote end
391+
let buf = new BlockingQueueAgent<_>(1)
392+
393+
use cts = new System.Threading.CancellationTokenSource()
394+
use _cancel = { new IDisposable with member __.Dispose() = cts.Cancel() }
395+
let iteratorTask =
396+
async {
397+
let! res = iterAsync (Some >> buf.AsyncAdd) input |> Async.Catch
398+
do! buf.AsyncAdd(None)
399+
return res
400+
}
401+
|> fun p -> Async.StartAsTask(p, cancellationToken = cts.Token)
402+
403+
// Read elements from the blocking buffer & return a sequences
404+
let fin = ref false
405+
while not fin.Value do
406+
match buf.Get() with
407+
| None ->
408+
fin := true
409+
match iteratorTask.Result with
410+
| Choice1Of2() -> ()
411+
| Choice2Of2 exn -> raise exn
412+
| Some v -> yield v
413+
}
402414

403415
let rec cache (input : AsyncSeq<'T>) =
404416
let agent = MailboxProcessor<AsyncReplyChannel<_>>.Start(fun agent -> async {

tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,12 @@ let ``AsyncSeq.toBlockingSeq does not hung forever and rethrows exception``() =
250250
yield 1
251251
failwith "error"
252252
}
253-
Assert.Throws<AggregateException>(fun _ -> s |> AsyncSeq.toBlockingSeq |> Seq.toList |> ignore) |> ignore
253+
Assert.Throws<Exception>(fun _ -> s |> AsyncSeq.toBlockingSeq |> Seq.toList |> ignore) |> ignore
254+
try
255+
let _ = s |> AsyncSeq.toBlockingSeq |> Seq.toList
256+
()
257+
with e ->
258+
Assert.AreEqual(e.Message, "error")
254259

255260

256261
[<Test>]
@@ -295,6 +300,63 @@ let ``AsyncSeq.skipUntil should skip everything with never signal``() =
295300
let actual = [1;2;3;4] |> AsyncSeq.ofSeq |> AsyncSeq.skipUntil AsyncOps.never
296301
Assert.True(EQ AsyncSeq.empty actual)
297302

303+
[<Test>]
304+
let ``AsyncSeq.toBlockingSeq should work length 1``() =
305+
let s = asyncSeq { yield 1 } |> AsyncSeq.toBlockingSeq |> Seq.toList
306+
Assert.True((s = [1]))
307+
308+
[<Test>]
309+
let ``AsyncSeq.toBlockingSeq should work length 0``() =
310+
let s = asyncSeq { () } |> AsyncSeq.toBlockingSeq |> Seq.toList
311+
Assert.True((s = []))
312+
313+
[<Test>]
314+
let ``AsyncSeq.toBlockingSeq should work length 2 with sleep``() =
315+
let s = asyncSeq { yield 1
316+
do! Async.Sleep 10
317+
yield 2 } |> AsyncSeq.toBlockingSeq |> Seq.toList
318+
Assert.True((s = [1; 2]))
319+
320+
[<Test>]
321+
let ``AsyncSeq.toBlockingSeq should work length 1 with fail``() =
322+
let s =
323+
asyncSeq { yield 1
324+
failwith "fail" }
325+
|> AsyncSeq.toBlockingSeq
326+
|> Seq.truncate 1
327+
|> Seq.toList
328+
Assert.True((s = [1]))
329+
330+
[<Test>]
331+
let ``AsyncSeq.toBlockingSeq should work length 0 with fail``() =
332+
let s =
333+
asyncSeq { failwith "fail" }
334+
|> AsyncSeq.toBlockingSeq
335+
|> Seq.truncate 0
336+
|> Seq.toList
337+
Assert.True((s = []))
338+
339+
[<Test>]
340+
let ``AsyncSeq.toBlockingSeq should be cancellable``() =
341+
let cancelCount = ref 0
342+
let aseq =
343+
asyncSeq {
344+
use! a = Async.OnCancel(fun x -> incr cancelCount)
345+
while true do
346+
yield 1
347+
do! Async.Sleep 10
348+
}
349+
350+
let asSeq = aseq |> AsyncSeq.toBlockingSeq
351+
let enum = asSeq.GetEnumerator()
352+
Assert.AreEqual(cancelCount.Value, 0)
353+
let canMoveNext = enum.MoveNext()
354+
Assert.AreEqual(canMoveNext, true)
355+
Assert.AreEqual(cancelCount.Value, 0)
356+
enum.Dispose()
357+
System.Threading.Thread.Sleep(1000) // wait for task cancellation to be effective
358+
Assert.AreEqual(cancelCount.Value, 1)
359+
298360
[<Test>]
299361
let ``AsyncSeq.while should allow do at end``() =
300362
let s1 = asyncSeq {

0 commit comments

Comments
 (0)