Skip to content

Commit c5bff08

Browse files
committed
simplify toBlockingSeq
1 parent fd4c737 commit c5bff08

3 files changed

Lines changed: 14 additions & 115 deletions

File tree

src/FSharp.Control.AsyncSeq/AsyncSeq.fs

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ module AsyncSeq =
338338
asyncSeq {
339339
let cts = new CancellationTokenSource()
340340
try
341-
let agent = MailboxProcessor<_>.Start((fun inbox -> async.Return() ), cancellationToken = cts.Token)
341+
use agent = MailboxProcessor<_>.Start((fun inbox -> async.Return() ), cancellationToken = cts.Token)
342342
use d = input |> Observable.asUpdates |> Observable.subscribe agent.Post
343343
let fin = ref false
344344
while not fin.Value do
@@ -388,29 +388,28 @@ module AsyncSeq =
388388

389389
let toBlockingSeq (input : AsyncSeq<'T>) =
390390
seq {
391-
// Write all elements to a blocking buffer and then add None to denote end
392-
let buf = new BlockingQueueAgent<_>(1)
391+
// Write all elements to a blocking buffer
392+
use buf = new System.Collections.Concurrent.BlockingCollection<_>()
393393

394394
use cts = new System.Threading.CancellationTokenSource()
395395
use _cancel = { new IDisposable with member __.Dispose() = cts.Cancel() }
396396
let iteratorTask =
397397
async {
398-
let! res = iterAsync (Some >> buf.AsyncAdd) input |> Async.Catch
399-
do! buf.AsyncAdd(None)
400-
return res
398+
try
399+
do! iter (Observable.Next >> buf.Add) input
400+
buf.CompleteAdding()
401+
with err ->
402+
buf.Add(Observable.Error err)
403+
buf.CompleteAdding()
401404
}
402405
|> fun p -> Async.StartAsTask(p, cancellationToken = cts.Token)
403406

404407
// Read elements from the blocking buffer & return a sequences
405-
let fin = ref false
406-
while not fin.Value do
407-
match buf.Get() with
408-
| None ->
409-
fin := true
410-
match iteratorTask.Result with
411-
| Choice1Of2() -> ()
412-
| Choice2Of2 exn -> raise exn
413-
| Some v -> yield v
408+
for x in buf.GetConsumingEnumerable() do
409+
match x with
410+
| Observable.Next v -> yield v
411+
| Observable.Error err -> raise err
412+
| Observable.Completed -> failwith "unexpected"
414413
}
415414

416415
let rec cache (input : AsyncSeq<'T>) =

src/FSharp.Control.AsyncSeq/BlockingQueueAgent.fs

Lines changed: 0 additions & 99 deletions
This file was deleted.

src/FSharp.Control.AsyncSeq/FSharp.Control.AsyncSeq.fsproj

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@
6161
<Import Project="..\..\.paket\paket.targets" />
6262
<ItemGroup>
6363
<Compile Include="AutoCancelAgent.fs" />
64-
<Compile Include="BlockingQueueAgent.fs" />
6564
<Compile Include="AsyncSeq.fsi" />
6665
<Compile Include="AsyncSeq.fs" />
6766
<None Include="paket.references" />

0 commit comments

Comments
 (0)