Skip to content

Commit 6785d77

Browse files
committed
AsyncSeqSrc safety, tests
1 parent 968df4a commit 6785d77

2 files changed

Lines changed: 18 additions & 27 deletions

File tree

src/FSharp.Control.AsyncSeq/AsyncSeq.fs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,11 @@ type IAsyncEnumerable<'T> =
2525
type AsyncSeq<'T> = IAsyncEnumerable<'T>
2626
// abstract GetEnumerator : unit -> IAsyncEnumerator<'T>
2727

28-
type AsyncSeqSrc<'a> = private { mutable tail : AsyncSeqSrcNode<'a> }
28+
type AsyncSeqSrc<'a> = private { tail : AsyncSeqSrcNode<'a> ref }
2929

3030
and private AsyncSeqSrcNode<'a> =
31-
struct
32-
val tcs : TaskCompletionSource<('a * AsyncSeqSrcNode<'a>) option>
33-
new (tcs) = { tcs = tcs }
34-
end
31+
val tcs : TaskCompletionSource<('a * AsyncSeqSrcNode<'a>) option>
32+
new (tcs) = { tcs = tcs }
3533

3634
[<AutoOpen>]
3735
module internal Utils =
@@ -1400,19 +1398,18 @@ module AsyncSeq =
14001398
new AsyncSeqSrcNode<_>(new TaskCompletionSource<_>())
14011399

14021400
let create () : AsyncSeqSrc<'a> =
1403-
{ tail = createNode () }
1401+
{ tail = ref (createNode ()) }
14041402

14051403
let put (a:'a) (s:AsyncSeqSrc<'a>) =
14061404
let newTail = createNode ()
1407-
let tail = s.tail
1408-
s.tail <- newTail
1405+
let tail = Interlocked.Exchange(s.tail, newTail)
14091406
tail.tcs.SetResult(Some(a, newTail))
14101407

14111408
let close (s:AsyncSeqSrc<'a>) : unit =
1412-
s.tail.tcs.SetResult(None)
1409+
s.tail.Value.tcs.SetResult(None)
14131410

14141411
let error (ex:exn) (s:AsyncSeqSrc<'a>) : unit =
1415-
s.tail.tcs.SetException(ex)
1412+
s.tail.Value.tcs.SetException(ex)
14161413

14171414
let rec private toAsyncSeqImpl (s:AsyncSeqSrcNode<'a>) : AsyncSeq<'a> =
14181415
asyncSeq {
@@ -1424,7 +1421,7 @@ module AsyncSeq =
14241421
yield! toAsyncSeqImpl tl }
14251422

14261423
let toAsyncSeq (s:AsyncSeqSrc<'a>) : AsyncSeq<'a> =
1427-
toAsyncSeqImpl s.tail
1424+
toAsyncSeqImpl s.tail.Value
14281425

14291426

14301427

tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1162,22 +1162,16 @@ let ``AsyncSeq.mapParallelAsync should be parallel`` () =
11621162

11631163

11641164
[<Test>]
1165-
let ``AsyncSeqSrc.create should create empty sequence`` () =
1166-
let src = AsyncSeqSrc.create ()
1167-
let s = src |> AsyncSeqSrc.toAsyncSeq
1168-
src |> AsyncSeqSrc.close
1169-
let expected = AsyncSeq.empty
1170-
Assert.True(EQ expected s)
1171-
1172-
[<Test>]
1173-
let ``AsyncSeqSrc.put should yield when tapped before put`` () =
1174-
let item = 1
1175-
let src = AsyncSeqSrc.create ()
1176-
let actual = src |> AsyncSeqSrc.toAsyncSeq
1177-
src |> AsyncSeqSrc.put item
1178-
src |> AsyncSeqSrc.close
1179-
let expected = AsyncSeq.singleton item
1180-
Assert.AreEqual (expected, actual)
1165+
let ``AsyncSeqSrc.should work`` () =
1166+
for n in 0..100 do
1167+
let items = List.init n id
1168+
let src = AsyncSeqSrc.create ()
1169+
let actual = src |> AsyncSeqSrc.toAsyncSeq
1170+
for item in items do
1171+
src |> AsyncSeqSrc.put item
1172+
src |> AsyncSeqSrc.close
1173+
let expected = items |> AsyncSeq.ofSeq
1174+
Assert.AreEqual (expected, actual)
11811175

11821176
[<Test>]
11831177
let ``AsyncSeqSrc.put should yield when tapped after put`` () =

0 commit comments

Comments
 (0)