Skip to content

Commit fbf77c9

Browse files
committed
resolve with master
2 parents 7d67e39 + e734f12 commit fbf77c9

2 files changed

Lines changed: 96 additions & 20 deletions

File tree

src/FSharp.Control.AsyncSeq/AsyncSeq.fs

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,7 @@ module AsyncSeq =
349349
repls.Dequeue().Reply(buffer.Dequeue()) })
350350

351351

352-
let ofObservable (input : System.IObservable<_>) =
352+
let ofObservableDiscarding (input : System.IObservable<_>) =
353353
ofObservableUsingAgent input (fun mbox -> async {
354354
while true do
355355
// Allow timeout (when the observable ends, caller will
@@ -367,6 +367,10 @@ module AsyncSeq =
367367
| Put v -> repl.Reply(v)
368368
| _ -> failwith "Unexpected Get" })
369369

370+
[<System.Obsolete("Use AsyncSeq.ofObservableDiscarding. This function doesn't guarantee that the asynchronous sequence will return all values produced by the observable")>]
371+
let ofObservable (input : System.IObservable<_>) =
372+
ofObservableDiscarding input
373+
370374
let toObservable (aseq:AsyncSeq<_>) =
371375
let start (obs:IObserver<_>) =
372376
async {
@@ -380,23 +384,31 @@ module AsyncSeq =
380384
member x.Subscribe(obs) = start obs }
381385

382386
let toBlockingSeq (input : AsyncSeq<'T>) =
383-
// Write all elements to a blocking buffer and then add None to denote end
384-
let buf = new BlockingQueueAgent<_>(1)
385-
let iterator =
386-
async {
387-
let! res = iterAsync (Some >> buf.AsyncAdd) input |> Async.Catch
388-
do! buf.AsyncAdd(None)
389-
return match res with Choice2Of2 e -> raise e | _ -> ()
390-
} |> Async.StartAsTask
391-
392-
// Read elements from the blocking buffer & return a sequences
393-
let rec loop () = seq {
394-
match buf.Get() with
395-
| None -> iterator.Result
396-
| Some v ->
397-
yield v
398-
yield! loop() }
399-
loop ()
387+
seq {
388+
// Write all elements to a blocking buffer and then add None to denote end
389+
let buf = new BlockingQueueAgent<_>(1)
390+
391+
use cts = new System.Threading.CancellationTokenSource()
392+
use _cancel = { new IDisposable with member __.Dispose() = cts.Cancel() }
393+
let iteratorTask =
394+
async {
395+
let! res = iterAsync (Some >> buf.AsyncAdd) input |> Async.Catch
396+
do! buf.AsyncAdd(None)
397+
return res
398+
}
399+
|> fun p -> Async.StartAsTask(p, cancellationToken = cts.Token)
400+
401+
// Read elements from the blocking buffer & return a sequences
402+
let fin = ref false
403+
while not fin.Value do
404+
match buf.Get() with
405+
| None ->
406+
fin := true
407+
match iteratorTask.Result with
408+
| Choice1Of2() -> ()
409+
| Choice2Of2 exn -> raise exn
410+
| Some v -> yield v
411+
}
400412

401413
let rec cache (input : AsyncSeq<'T>) =
402414
let agent = MailboxProcessor<AsyncReplyChannel<_>>.Start(fun agent -> async {

tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,12 @@ let ``AsyncSeq.toBlockingSeq does not hung forever and rethrows exception``() =
241241
yield 1
242242
failwith "error"
243243
}
244-
Assert.Throws<AggregateException>(fun _ -> s |> AsyncSeq.toBlockingSeq |> Seq.toList |> ignore) |> ignore
244+
Assert.Throws<Exception>(fun _ -> s |> AsyncSeq.toBlockingSeq |> Seq.toList |> ignore) |> ignore
245+
try
246+
let _ = s |> AsyncSeq.toBlockingSeq |> Seq.toList
247+
()
248+
with e ->
249+
Assert.AreEqual(e.Message, "error")
245250

246251

247252
[<Test>]
@@ -284,4 +289,63 @@ let ``AsyncSeq.skipUntil should not skip with completed signal``() =
284289
[<Test>]
285290
let ``AsyncSeq.skipUntil should skip everything with never signal``() =
286291
let actual = [1;2;3;4] |> AsyncSeq.ofSeq |> AsyncSeq.skipUntil AsyncOps.never
287-
Assert.True(EQ AsyncSeq.empty actual)
292+
Assert.True(EQ AsyncSeq.empty actual)
293+
294+
295+
[<Test>]
296+
let ``AsyncSeq.toBlockingSeq should work length 1``() =
297+
let s = asyncSeq { yield 1 } |> AsyncSeq.toBlockingSeq |> Seq.toList
298+
Assert.True((s = [1]))
299+
300+
[<Test>]
301+
let ``AsyncSeq.toBlockingSeq should work length 0``() =
302+
let s = asyncSeq { () } |> AsyncSeq.toBlockingSeq |> Seq.toList
303+
Assert.True((s = []))
304+
305+
[<Test>]
306+
let ``AsyncSeq.toBlockingSeq should work length 2 with sleep``() =
307+
let s = asyncSeq { yield 1
308+
do! Async.Sleep 10
309+
yield 2 } |> AsyncSeq.toBlockingSeq |> Seq.toList
310+
Assert.True((s = [1; 2]))
311+
312+
[<Test>]
313+
let ``AsyncSeq.toBlockingSeq should work length 1 with fail``() =
314+
let s =
315+
asyncSeq { yield 1
316+
failwith "fail" }
317+
|> AsyncSeq.toBlockingSeq
318+
|> Seq.truncate 1
319+
|> Seq.toList
320+
Assert.True((s = [1]))
321+
322+
[<Test>]
323+
let ``AsyncSeq.toBlockingSeq should work length 0 with fail``() =
324+
let s =
325+
asyncSeq { failwith "fail" }
326+
|> AsyncSeq.toBlockingSeq
327+
|> Seq.truncate 0
328+
|> Seq.toList
329+
Assert.True((s = []))
330+
331+
[<Test>]
332+
let ``AsyncSeq.toBlockingSeq should be cancellable``() =
333+
let cancelCount = ref 0
334+
let aseq =
335+
asyncSeq {
336+
use! a = Async.OnCancel(fun x -> incr cancelCount)
337+
while true do
338+
yield 1
339+
do! Async.Sleep 10
340+
}
341+
342+
let asSeq = aseq |> AsyncSeq.toBlockingSeq
343+
let enum = asSeq.GetEnumerator()
344+
Assert.AreEqual(cancelCount.Value, 0)
345+
let canMoveNext = enum.MoveNext()
346+
Assert.AreEqual(canMoveNext, true)
347+
Assert.AreEqual(cancelCount.Value, 0)
348+
enum.Dispose()
349+
System.Threading.Thread.Sleep(1000) // wait for task cancellation to be effective
350+
Assert.AreEqual(cancelCount.Value, 1)
351+

0 commit comments

Comments
 (0)