diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs index efa553fb..c24253b9 100644 --- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs +++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs @@ -1060,6 +1060,28 @@ module AsyncSeq = | None -> return def | Some v -> return v } + let exactlyOne (source : AsyncSeq<'T>) = async { + use ie = source.GetEnumerator() + let! first = ie.MoveNext() + match first with + | None -> return raise (System.InvalidOperationException("The input sequence was empty.")) + | Some v -> + let! second = ie.MoveNext() + match second with + | None -> return v + | Some _ -> return raise (System.InvalidOperationException("The input sequence contains more than one element.")) } + + let tryExactlyOne (source : AsyncSeq<'T>) = async { + use ie = source.GetEnumerator() + let! first = ie.MoveNext() + match first with + | None -> return None + | Some v -> + let! second = ie.MoveNext() + match second with + | None -> return Some v + | Some _ -> return None } + let scanAsync f (state:'TState) (source : AsyncSeq<'T>) = asyncSeq { yield state let z = ref state @@ -1252,6 +1274,22 @@ module AsyncSeq = return LanguagePrimitives.DivideByInt sum count } + let countByAsync (projection : 'T -> Async<'Key>) (source : AsyncSeq<'T>) : Async<('Key * int) array> = + async { + let! dict = + source |> foldAsync (fun (d : System.Collections.Generic.Dictionary<'Key,int>) v -> + async { + let! k = projection v + let mutable cnt = 0 + if d.TryGetValue(k, &cnt) then d.[k] <- cnt + 1 + else d.[k] <- 1 + return d + }) (System.Collections.Generic.Dictionary<'Key,int>()) + return dict |> Seq.map (fun kv -> kv.Key, kv.Value) |> Seq.toArray } + + let countBy (projection : 'T -> 'Key) (source : AsyncSeq<'T>) : Async<('Key * int) array> = + countByAsync (projection >> async.Return) source + let scan f (state:'State) (source : AsyncSeq<'T>) = scanAsync (fun st v -> f st v |> async.Return) state source @@ -1997,6 +2035,19 @@ module AsyncSeq = let distinctUntilChanged (s:AsyncSeq<'T>) : AsyncSeq<'T> = distinctUntilChangedWith ((=)) s + let distinctByAsync (projection : 'T -> Async<'Key>) (source : AsyncSeq<'T>) : AsyncSeq<'T> = asyncSeq { + let seen = System.Collections.Generic.HashSet<'Key>() + for v in source do + let! k = projection v + if seen.Add(k) then + yield v } + + let distinctBy (projection : 'T -> 'Key) (source : AsyncSeq<'T>) : AsyncSeq<'T> = + distinctByAsync (projection >> async.Return) source + + let distinct (source : AsyncSeq<'T>) : AsyncSeq<'T> = + distinctBy id source + let getIterator (s:AsyncSeq<'T>) : (unit -> Async<'T option>) = let curr = s.GetEnumerator() fun () -> curr.MoveNext() diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi b/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi index 2c6f0220..2bc95dca 100644 --- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi +++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi @@ -163,6 +163,14 @@ module AsyncSeq = /// given asynchronous sequence (or None if the sequence is empty). val tryFirst : source:AsyncSeq<'T> -> Async<'T option> + /// Asynchronously returns the only element of the asynchronous sequence. + /// Raises InvalidOperationException if the sequence is empty or contains more than one element. + val exactlyOne : source:AsyncSeq<'T> -> Async<'T> + + /// Asynchronously returns the only element of the asynchronous sequence, or None if the + /// sequence is empty or contains more than one element. + val tryExactlyOne : source:AsyncSeq<'T> -> Async<'T option> + /// Aggregates the elements of the input asynchronous sequence using the /// specified 'aggregation' function. The result is an asynchronous /// sequence of intermediate aggregation result. @@ -276,6 +284,14 @@ module AsyncSeq = and ^U : (static member DivideByInt : ^U * int -> ^U) and ^U : (static member Zero : ^U) + /// Asynchronously count the elements of the input asynchronous sequence grouped by the result of the given asynchronous key projection. + /// Returns an array of (key, count) pairs. + val countByAsync : projection:('T -> Async<'Key>) -> source:AsyncSeq<'T> -> Async<('Key * int) array> when 'Key : equality + + /// Asynchronously count the elements of the input asynchronous sequence grouped by the result of the given key projection. + /// Returns an array of (key, count) pairs. + val countBy : projection:('T -> 'Key) -> source:AsyncSeq<'T> -> Async<('Key * int) array> when 'Key : equality + /// Asynchronously determine if the sequence contains the given value val contains : value:'T -> source:AsyncSeq<'T> -> Async when 'T : equality @@ -594,6 +610,18 @@ module AsyncSeq = /// Returns an async sequence which contains no contiguous duplicate elements. val distinctUntilChanged : source:AsyncSeq<'T> -> AsyncSeq<'T> when 'T : equality + /// Returns an async sequence containing only distinct elements, determined by the given asynchronous key projection. + /// Elements are compared using structural equality on the projected key. + val distinctByAsync : projection:('T -> Async<'Key>) -> source:AsyncSeq<'T> -> AsyncSeq<'T> when 'Key : equality + + /// Returns an async sequence containing only distinct elements, determined by the given key projection. + /// Elements are compared using structural equality on the projected key. + val distinctBy : projection:('T -> 'Key) -> source:AsyncSeq<'T> -> AsyncSeq<'T> when 'Key : equality + + /// Returns an async sequence containing only distinct elements. + /// Elements are compared using structural equality. + val distinct : source:AsyncSeq<'T> -> AsyncSeq<'T> when 'T : equality + #if FABLE_COMPILER [] #endif diff --git a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs index f26b0dd6..597ea33e 100644 --- a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs +++ b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs @@ -2907,3 +2907,106 @@ let ``AsyncSeq.chunkByAsync groups consecutive equal keys`` () = } |> Async.RunSynchronously + +// ===== distinct / distinctBy / distinctByAsync ===== + +[] +let ``AsyncSeq.distinct removes all duplicates`` () = + let source = asyncSeq { yield 1; yield 2; yield 1; yield 3; yield 2 } + let result = AsyncSeq.distinct source |> AsyncSeq.toListSynchronously + Assert.AreEqual([1; 2; 3], result) + +[] +let ``AsyncSeq.distinct empty sequence returns empty`` () = + let result = AsyncSeq.distinct AsyncSeq.empty |> AsyncSeq.toListSynchronously + Assert.AreEqual([], result) + +[] +let ``AsyncSeq.distinct all unique elements returns all`` () = + let source = asyncSeq { yield 1; yield 2; yield 3 } + let result = AsyncSeq.distinct source |> AsyncSeq.toListSynchronously + Assert.AreEqual([1; 2; 3], result) + +[] +let ``AsyncSeq.distinctBy removes duplicates by key`` () = + let source = asyncSeq { yield (1, "a"); yield (2, "b"); yield (1, "c") } + let result = AsyncSeq.distinctBy fst source |> AsyncSeq.toListSynchronously + Assert.AreEqual([(1, "a"); (2, "b")], result) + +[] +let ``AsyncSeq.distinctByAsync removes duplicates by async key`` () = + async { + let source = asyncSeq { yield 1; yield 2; yield 1; yield 3 } + let result = AsyncSeq.distinctByAsync (fun x -> async { return x % 2 }) source |> AsyncSeq.toListSynchronously + Assert.AreEqual([1; 2], result) + } |> Async.RunSynchronously + +// ===== countBy / countByAsync ===== + +[] +let ``AsyncSeq.countBy counts elements by key`` () = + async { + let source = asyncSeq { yield 1; yield 2; yield 1; yield 3; yield 2; yield 2 } + let result = AsyncSeq.countBy id source |> Async.RunSynchronously + let sorted = result |> Array.sortBy fst + Assert.AreEqual([| (1, 2); (2, 3); (3, 1) |], sorted) + } |> Async.RunSynchronously + +[] +let ``AsyncSeq.countBy empty sequence returns empty array`` () = + async { + let result = AsyncSeq.countBy id AsyncSeq.empty |> Async.RunSynchronously + Assert.AreEqual([||], result) + } |> Async.RunSynchronously + +[] +let ``AsyncSeq.countByAsync counts elements by async key`` () = + async { + let source = asyncSeq { yield 1; yield 2; yield 3; yield 4 } + let result = AsyncSeq.countByAsync (fun x -> async { return x % 2 }) source |> Async.RunSynchronously + let sorted = result |> Array.sortBy fst + Assert.AreEqual([| (0, 2); (1, 2) |], sorted) + } |> Async.RunSynchronously + +// ===== exactlyOne / tryExactlyOne ===== + +[] +let ``AsyncSeq.exactlyOne returns single element`` () = + async { + let source = asyncSeq { yield 42 } + let result = AsyncSeq.exactlyOne source |> Async.RunSynchronously + Assert.AreEqual(42, result) + } |> Async.RunSynchronously + +[] +let ``AsyncSeq.exactlyOne raises on empty sequence`` () = + Assert.Throws(fun () -> + AsyncSeq.exactlyOne AsyncSeq.empty |> Async.RunSynchronously |> ignore) |> ignore + +[] +let ``AsyncSeq.exactlyOne raises on sequence with more than one element`` () = + Assert.Throws(fun () -> + asyncSeq { yield 1; yield 2 } |> AsyncSeq.exactlyOne |> Async.RunSynchronously |> ignore) |> ignore + +[] +let ``AsyncSeq.tryExactlyOne returns Some for single element`` () = + async { + let source = asyncSeq { yield 42 } + let result = AsyncSeq.tryExactlyOne source |> Async.RunSynchronously + Assert.AreEqual(Some 42, result) + } |> Async.RunSynchronously + +[] +let ``AsyncSeq.tryExactlyOne returns None for empty sequence`` () = + async { + let result = AsyncSeq.tryExactlyOne AsyncSeq.empty |> Async.RunSynchronously + Assert.AreEqual(None, result) + } |> Async.RunSynchronously + +[] +let ``AsyncSeq.tryExactlyOne returns None for sequence with more than one element`` () = + async { + let source = asyncSeq { yield 1; yield 2 } + let result = AsyncSeq.tryExactlyOne source |> Async.RunSynchronously + Assert.AreEqual(None, result) + } |> Async.RunSynchronously