Skip to content

Commit 2ce2a6e

Browse files
committed
copied more async stuff from FSharpx
1 parent d10e81b commit 2ce2a6e

18 files changed

Lines changed: 1383 additions & 23 deletions

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ Please also join the [F# Open Source Group](http://fsharp.github.com)
1515
# Maintainer(s)
1616

1717
- [@forki](https://github.com/forki)
18+
- [@tpetricek](https://github.com/tpetricek)
1819
- [@mausch](https://github.com/mausch)
1920
- [@panesofglass](https://github.com/panesofglass)
2021

RELEASE_NOTES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
### 1.9.6 - 23.02.2015
1+
### 1.9.7 - 23.02.2015
22
* Copied stuff from FSharpx

build.fsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ let summary = "Async extensions for F#"
3939
let description = "Async extensions for F#"
4040

4141
// List of author names (for NuGet package)
42-
let authors = [ "Thomas Petricek, David Thomas, Steffen Forkmann" ]
42+
let authors = [ "Thomas Petricek"; "David Thomas"; "Ryan Riley"; "Steffen Forkmann" ]
4343

4444
// Tags for your project (for NuGet package)
4545
let tags = "F#, async, fsharp"

paket.dependencies

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,5 @@ nuget Nuget.CommandLine
77
nuget FAKE
88
nuget SourceLink.Fake
99

10-
github fsharp/FAKE modules/Octokit/Octokit.fsx
10+
github fsharp/FAKE modules/Octokit/Octokit.fsx
11+
github fsprojects/FSharpx.Collections src/FSharpx.Collections/CircularBuffer.fs

paket.lock

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,7 @@ GITHUB
2424
remote: fsharp/FAKE
2525
specs:
2626
modules/Octokit/Octokit.fsx (575e7ab5050d2ddec80ba8d3a98cfb10ba327965)
27-
Octokit
27+
Octokit
28+
remote: fsprojects/FSharpx.Collections
29+
specs:
30+
src/FSharpx.Collections/CircularBuffer.fs (e8fdc9c6d61618bdcc5c4f143ecbbfbe785b8ec4)

src/FSharpx.Async/Async.IO.fs

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
// ----------------------------------------------------------------------------
2+
// F# async extensions (IO.fs)
3+
// (c) Tomas Petricek and Ryan Riley, 2011-2012, Available under Apache 2.0 license.
4+
// ----------------------------------------------------------------------------
5+
6+
namespace FSharpx.IO
7+
open System.IO
8+
open FSharpx.Control
9+
// ----------------------------------------------------------------------------
10+
// Extensions that simplify working with Stream using async sequences
11+
12+
[<AutoOpen>]
13+
module IOExtensions =
14+
type Stream with
15+
/// Asynchronously reads the stream in chunks of a specified size
16+
/// and returns the result as an asynchronous sequence.
17+
member x.AsyncReadSeq(?bufferSize) =
18+
let bufferSize = defaultArg bufferSize 1024
19+
let buffer = Array.zeroCreate bufferSize
20+
let rec loop () = asyncSeq {
21+
let! count = x.AsyncRead(buffer, 0, bufferSize)
22+
if count > 0 then
23+
yield Array.sub buffer 0 count
24+
yield! loop() }
25+
loop ()
26+
27+
/// Asynchronously writes all data specified by the
28+
/// given asynchronous sequence to the stream.
29+
member x.AsyncWriteSeq(input : AsyncSeq<byte[]>) = async {
30+
for data in input do
31+
do! x.AsyncWrite(data) }
32+
33+
open System
34+
#if NET40
35+
open System.Diagnostics.Contracts
36+
#else
37+
open System.Diagnostics
38+
#endif
39+
40+
// Loosely based on Stephen Toub's Stream Pipelines article in MSDN.
41+
// See http://msdn.microsoft.com/en-us/magazine/cc163290.aspx
42+
type CircularStream(maxLength) =
43+
inherit Stream()
44+
45+
let queue = new CircularQueueAgent<byte>(maxLength)
46+
47+
override x.CanRead = true
48+
override x.CanSeek = false
49+
// We deviate from Toub's implementation in that we
50+
// never prevent writes.
51+
override x.CanWrite = true
52+
53+
override x.Flush() = ()
54+
override x.Length = raise <| new NotSupportedException()
55+
override x.Position
56+
with get() = raise <| new NotSupportedException()
57+
and set(v) = raise <| new NotSupportedException()
58+
override x.Seek(offset, origin) = raise <| new NotSupportedException()
59+
override x.SetLength(value) = raise <| new NotSupportedException()
60+
61+
override x.Read(buffer, offset, count) =
62+
#if NET40
63+
Contract.Requires(buffer <> null, "buffer cannot be null")
64+
Contract.Requires(offset >= 0 && offset < buffer.Length, "offset is out of range")
65+
Contract.Requires(count >= 0 && offset + count <= buffer.Length, "count is out of range")
66+
#else
67+
Debug.Assert(buffer <> null, "buffer cannot be null")
68+
Debug.Assert(offset >= 0 && offset < buffer.Length, "offset is out of range")
69+
Debug.Assert(count >= 0 && offset + count <= buffer.Length, "count is out of range")
70+
#endif
71+
72+
if count = 0 then 0 else
73+
let chunk = queue.Dequeue(count)
74+
Buffer.BlockCopy(chunk, 0, buffer, offset, chunk.Length)
75+
chunk.Length
76+
77+
override x.Write(buffer, offset, count) =
78+
#if NET40
79+
Contract.Requires(buffer <> null, "buffer cannot be null")
80+
Contract.Requires(offset >= 0 && offset < buffer.Length, "offset is out of range")
81+
Contract.Requires(count >= 0 && offset + count <= buffer.Length, "count is out of range")
82+
#else
83+
Debug.Assert(buffer <> null, "buffer cannot be null")
84+
Debug.Assert(offset >= 0 && offset < buffer.Length, "offset is out of range")
85+
Debug.Assert(count >= 0 && offset + count <= buffer.Length, "count is out of range")
86+
#endif
87+
88+
if count = 0 then () else
89+
queue.Enqueue(buffer, offset, count)
90+
91+
member x.AsyncRead(buffer: byte[], offset, count, ?timeout) =
92+
#if NET40
93+
Contract.Requires(buffer <> null, "buffer cannot be null")
94+
Contract.Requires(offset >= 0 && offset < buffer.Length, "offset is out of range")
95+
Contract.Requires(count >= 0 && offset + count <= buffer.Length, "count is out of range")
96+
#else
97+
Debug.Assert(buffer <> null, "buffer cannot be null")
98+
Debug.Assert(offset >= 0 && offset < buffer.Length, "offset is out of range")
99+
Debug.Assert(count >= 0 && offset + count <= buffer.Length, "count is out of range")
100+
#endif
101+
102+
if count = 0 then async.Return(0) else
103+
async {
104+
let! chunk = queue.AsyncDequeue(count, ?timeout = timeout)
105+
Buffer.BlockCopy(chunk, 0, buffer, offset, chunk.Length)
106+
return chunk.Length }
107+
108+
member x.AsyncWrite(buffer: byte[], offset, count, ?timeout) =
109+
#if NET40
110+
Contract.Requires(buffer <> null, "buffer cannot be null")
111+
Contract.Requires(offset >= 0 && offset < buffer.Length, "offset is out of range")
112+
Contract.Requires(count >= 0 && offset + count <= buffer.Length, "count is out of range")
113+
#else
114+
Debug.Assert(buffer <> null, "buffer cannot be null")
115+
Debug.Assert(offset >= 0 && offset < buffer.Length, "offset is out of range")
116+
Debug.Assert(count >= 0 && offset + count <= buffer.Length, "count is out of range")
117+
#endif
118+
119+
if count = 0 then async.Zero() else
120+
async { do! queue.AsyncEnqueue(buffer, offset, count, ?timeout = timeout) }
121+
122+
override x.Close() =
123+
base.Close()
124+
// TODO: Close the queue agent.

0 commit comments

Comments
 (0)