Skip to content

Commit e40fbf6

Browse files
authored
Merge pull request #143 from channaj/sort
Sort functions
2 parents 4474df1 + 3c60d5f commit e40fbf6

3 files changed

Lines changed: 86 additions & 17 deletions

File tree

src/FSharp.Control.AsyncSeq/AsyncSeq.fs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ module internal Utils =
5050
{ new IDisposable with member __.Dispose () = () }
5151

5252
// ----------------------------------------------------------------------------
53-
53+
5454
#if FABLE_COMPILER
5555
type ExceptionDispatchInfo private (err: exn) =
5656
member _.SourceException = err
@@ -214,7 +214,7 @@ module AsyncGenerator =
214214
member x.Disposer =
215215
g.Disposer
216216

217-
217+
218218
static member Bind (g:AsyncGenerator<'a>, cont:unit -> AsyncGenerator<'a>) : AsyncGenerator<'a> =
219219
#if !FABLE_COMPILER
220220
match g with
@@ -1062,7 +1062,7 @@ module AsyncSeq =
10621062
mb.Post (Some b) })
10631063
|> Async.map (fun _ -> mb.Post None)
10641064
|> Async.StartChildAsTask
1065-
1065+
10661066
return!
10671067
replicateUntilNoneAsync (Task.chooseTask (err |> Task.taskFault) (async.Delay mb.Receive))
10681068
|> iterAsync id }
@@ -1483,6 +1483,21 @@ module AsyncSeq =
14831483
if (buffer.Count > 0) then
14841484
yield buffer.ToArray() }
14851485

1486+
let toSortedSeq fn source =
1487+
toArrayAsync source |> Async.map fn |> Async.RunSynchronously
1488+
1489+
let sort (source:AsyncSeq<'T>) : array<'T> when 'T : comparison =
1490+
toSortedSeq Array.sort source
1491+
1492+
let sortBy (projection:'T -> 'Key) (source:AsyncSeq<'T>) : array<'T> when 'Key : comparison =
1493+
toSortedSeq (Array.sortBy projection) source
1494+
1495+
let sortDescending (source:AsyncSeq<'T>) : array<'T> when 'T : comparison =
1496+
toSortedSeq Array.sortDescending source
1497+
1498+
let sortByDescending (projection:'T -> 'Key) (source:AsyncSeq<'T>) : array<'T> when 'Key : comparison =
1499+
toSortedSeq (Array.sortByDescending projection) source
1500+
14861501
#if !FABLE_COMPILER
14871502
let bufferByCountAndTime (bufferSize:int) (timeoutMs:int) (source:AsyncSeq<'T>) : AsyncSeq<'T[]> =
14881503
if (bufferSize < 1) then invalidArg "bufferSize" "must be positive"

src/FSharp.Control.AsyncSeq/AsyncSeq.fsi

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -437,6 +437,30 @@ module AsyncSeq =
437437
/// Flattens an AsyncSeq of asynchronous sequences.
438438
val concat : AsyncSeq<AsyncSeq<'T>> -> AsyncSeq<'T>
439439

440+
/// Yields a sequence ordered by keys.
441+
/// This function returns a sequence that digests the whole initial sequence as soon as
442+
/// that sequence is iterated. As a result this function should not be used with
443+
/// large or infinite sequences.
444+
val sort : source:AsyncSeq<'T> -> array<'T> when 'T : comparison
445+
446+
/// Applies a key-generating function to each element of an AsyncSeq and yield an array ordered by keys.
447+
/// This function returns an array that digests the whole initial sequence as soon as
448+
/// that sequence is iterated. As a result this function should not be used with
449+
/// large or infinite sequences.
450+
val sortBy : projection:('T -> 'Key) -> source:AsyncSeq<'T> -> array<'T> when 'Key : comparison
451+
452+
/// Yields an array ordered descending by keys.
453+
/// This function returns an array that digests the whole initial sequence as soon as
454+
/// that sequence is iterated. As a result this function should not be used with
455+
/// large or infinite sequences.
456+
val sortDescending : source:AsyncSeq<'T> -> array<'T> when 'T : comparison
457+
458+
/// Applies a key-generating function to each element of an AsyncSeq and yield an array ordered descending by keys.
459+
/// This function returns an array that digests the whole initial sequence as soon as
460+
/// that sequence is iterated. As a result this function should not be used with
461+
/// large or infinite sequences.
462+
val sortByDescending : projection:('T -> 'Key) -> source:AsyncSeq<'T> -> array<'T> when 'Key : comparison
463+
440464
/// Interleaves two async sequences of the same type into a resulting sequence. The provided
441465
/// sequences are consumed in lock-step.
442466
val interleave : source1:AsyncSeq<'T> -> source2:AsyncSeq<'T> -> AsyncSeq<'T>
@@ -452,7 +476,7 @@ module AsyncSeq =
452476
#if !FABLE_COMPILER
453477
/// Buffer items from the async sequence until a specified buffer size is reached or a specified amount of time is elapsed.
454478
val bufferByCountAndTime : bufferSize:int -> timeoutMs:int -> source:AsyncSeq<'T> -> AsyncSeq<'T []>
455-
479+
456480
/// Buffers items from the async sequence by the specified time interval.
457481
/// If no items are received in an intervel and empty array is emitted.
458482
val bufferByTime : timeMs:int -> source:AsyncSeq<'T> -> AsyncSeq<'T[]>
@@ -490,7 +514,7 @@ module AsyncSeq =
490514
/// but in parallel, without waiting for a prior mapping operation to complete.
491515
/// Parallelism is bound by the ThreadPool.
492516
val mapAsyncParallel : mapping:('T -> Async<'U>) -> s:AsyncSeq<'T> -> AsyncSeq<'U>
493-
517+
494518
/// Applies a key-generating function to each element and returns an async sequence containing unique keys
495519
/// and async sequences containing elements corresponding to the key.
496520
///
@@ -504,7 +528,7 @@ module AsyncSeq =
504528
/// Note that the resulting async sequence has to be processed in parallel (e.g AsyncSeq.mapAsyncParallel) becaused
505529
/// completion of sub-sequences depends on completion of other sub-sequences.
506530
val groupBy<'T, 'Key when 'Key : equality> : projection:('T -> 'Key) -> source:AsyncSeq<'T> -> AsyncSeq<'Key * AsyncSeq<'T>>
507-
531+
508532
#if (NETSTANDARD2_1 || NETCOREAPP3_0)
509533

510534
/// Creates an asynchronous computation that asynchronously yields results from the provided .NET IAsyncEnumerable.

tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1512,18 +1512,18 @@ let ``AsyncSeq.iterAsyncParallel should propagate exception`` () =
15121512

15131513
[<Test>]
15141514
let ``AsyncSeq.iterAsyncParallel should cancel and not block forever when run in parallel with another exception-throwing Async`` () =
1515-
1515+
15161516
let handle x = async {
1517-
do! Async.Sleep 50
1517+
do! Async.Sleep 50
15181518
}
15191519

1520-
let fakeAsync = async {
1520+
let fakeAsync = async {
15211521
do! Async.Sleep 500
15221522
return "fakeAsync"
15231523
}
15241524

15251525
let makeAsyncSeqBatch () =
1526-
let rec loop() = asyncSeq {
1526+
let rec loop() = asyncSeq {
15271527
let! batch = fakeAsync |> Async.Catch
15281528
match batch with
15291529
| Choice1Of2 batch ->
@@ -1532,11 +1532,11 @@ let ``AsyncSeq.iterAsyncParallel should cancel and not block forever when run in
15321532
yield! loop()
15331533
else
15341534
yield batch
1535-
yield! loop()
1535+
yield! loop()
15361536
| Choice2Of2 err ->
15371537
printfn "Problem getting batch: %A" err
15381538
}
1539-
1539+
15401540
loop()
15411541

15421542
let x = makeAsyncSeqBatch () |> AsyncSeq.concatSeq |> AsyncSeq.iterAsyncParallel handle
@@ -1733,6 +1733,36 @@ let ``Async.concat should work``() =
17331733

17341734
Assert.True(EQ expected actual)
17351735

1736+
[<Test>]
1737+
let ``AsyncSeq.sort should work for``() =
1738+
let input = [1; 3; 2; 5; 7; 4; 6] |> AsyncSeq.ofSeq
1739+
let expected = [|1..7|]
1740+
let actual = input |> AsyncSeq.sort
1741+
Assert.AreEqual(expected, actual)
1742+
1743+
[<Test>]
1744+
let ``AsyncSeq.sortDescending should work``() =
1745+
let input = [1; 3; 2; Int32.MaxValue; 4; 6; Int32.MinValue; 5; 7; 0] |> AsyncSeq.ofSeq
1746+
let expected = seq { yield Int32.MaxValue; yield! seq{ 7..-1..0 }; yield Int32.MinValue } |> Array.ofSeq
1747+
let actual = input |> AsyncSeq.sortDescending
1748+
Assert.AreEqual(expected, actual)
1749+
1750+
[<Test>]
1751+
let ``AsyncSeq.sortBy should work``() =
1752+
let fn x = Math.Abs(x-5)
1753+
let input = [1; 2; 4; 5; 7] |> AsyncSeq.ofSeq
1754+
let expected = [|5; 4; 7; 2; 1|]
1755+
let actual = input |> AsyncSeq.sortBy fn
1756+
Assert.AreEqual(expected, actual)
1757+
1758+
[<Test>]
1759+
let ``AsyncSeq.sortByDescending should work``() =
1760+
let fn x = Math.Abs(x-5)
1761+
let input = [1; 2; 4; 5; 6; 7;] |> AsyncSeq.ofSeq
1762+
let expected = [|1; 2; 7; 4; 6; 5;|]
1763+
let actual = input |> AsyncSeq.sortByDescending fn
1764+
Assert.AreEqual(expected, actual)
1765+
17361766
#if (NETSTANDARD2_1 || NETCOREAPP3_0)
17371767
[<Test>]
17381768
let ``AsyncSeq.ofAsyncEnum should roundtrip successfully``() =
@@ -1744,8 +1774,8 @@ let ``AsyncSeq.ofAsyncEnum should roundtrip successfully``() =
17441774
let ``AsyncSeq.toAsyncEnum raises exception``() : unit =
17451775
async {
17461776
let exceptionMessage = "Raised inside AsyncSeq"
1747-
let exceptionSequence =
1748-
asyncSeq { yield failwith exceptionMessage; yield 1 }
1777+
let exceptionSequence =
1778+
asyncSeq { yield failwith exceptionMessage; yield 1 }
17491779
|> AsyncSeq.toAsyncEnum
17501780
let mutable exceptionRaised = false
17511781
try
@@ -1765,9 +1795,9 @@ let ``AsyncSeq.toAsyncEnum raises exception``() : unit =
17651795
let ``AsyncSeq.ofAsyncEnum raises exception``() : unit =
17661796
async {
17671797
let exceptionMessage = "Raised inside AsyncSeq"
1768-
let exceptionSequence =
1769-
asyncSeq { yield failwith exceptionMessage; yield 1 }
1770-
|> AsyncSeq.toAsyncEnum
1798+
let exceptionSequence =
1799+
asyncSeq { yield failwith exceptionMessage; yield 1 }
1800+
|> AsyncSeq.toAsyncEnum
17711801
|> AsyncSeq.ofAsyncEnum
17721802
let mutable exceptionRaised = false
17731803
try

0 commit comments

Comments
 (0)