Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .dialyzer_ignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
lib/mint/tunnel_proxy.ex:49
lib/mint/http1.ex:915
lib/mint/http1.ex:927
lib/mint/unsafe_proxy.ex:173
lib/mint/unsafe_proxy.ex:198
test/support
Expand Down
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## Unreleased

### New features

* Add `Mint.HTTP.request_body_window/2` to inspect the request body flow-control window for a streaming request. Returns `min(connection_window, stream_window)` for HTTP/2 and `:infinity` for HTTP/1, which has no application-level flow control.

## v1.7.1

### Bug Fixes and Improvements
Expand Down
2 changes: 2 additions & 0 deletions lib/mint/core/conn.ex
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,6 @@ defmodule Mint.Core.Conn do
@callback put_proxy_headers(conn(), Mint.Types.headers()) :: conn()

@callback put_log(conn(), boolean()) :: conn()

@callback request_body_window(conn(), Types.request_ref()) :: non_neg_integer() | :infinity
end
73 changes: 73 additions & 0 deletions lib/mint/http.ex
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,10 @@ defmodule Mint.HTTP do

This function always returns an updated connection to be stored over the old connection.

When streaming a body of arbitrary size, use `request_body_window/2` to learn
how many bytes you can send right now without violating HTTP/2 flow control,
then split your body accordingly before passing each chunk to this function.

For information about transfer encoding and content length in HTTP/1, see
`Mint.HTTP1.stream_request_body/3`.

Expand Down Expand Up @@ -1065,6 +1069,75 @@ defmodule Mint.HTTP do
@impl true
def put_proxy_headers(conn, headers), do: conn_apply(conn, :put_proxy_headers, [conn, headers])

@doc """
Returns the request body flow-control window for the streaming request
identified by `request_ref`.

The semantics differ by protocol:

* In HTTP/2, returns `min(connection_window, stream_window)` — the maximum
number of body bytes that can be sent right now without violating flow
control. Exceeding this value in a single `DATA` frame would close the
connection with a `FLOW_CONTROL_ERROR`. See `Mint.HTTP2.get_window_size/2`
for the underlying primitives.

* In HTTP/1, returns `:infinity`. HTTP/1 has no application-level
flow-control mechanism: any amount of body data is protocol-valid.

The value returned reflects only the protocol-level flow-control
constraint. It does not account for the operating-system socket send
buffer: under either protocol, `stream_request_body/3` can still block
when that buffer fills up. To bound this behavior, configure
`send_timeout` on the socket via `:transport_opts` when establishing the
connection (see `Mint.HTTP.connect/4`).

Raises `ArgumentError` if `request_ref` is not associated with an active
streaming request.

## Examples

Streaming a binary body in chunks that respect the protocol window:

defp stream_body(conn, ref, "") do
Mint.HTTP.stream_request_body(conn, ref, :eof)
end

defp stream_body(conn, ref, body) do
conn
|> Mint.HTTP.request_body_window(ref)
|> send_body_chunk(conn, ref, body)
end

defp send_body_chunk(0, conn, ref, body) do
with {:ok, conn} <- wait(conn, ref) do
stream_body(conn, ref, body)
end
end

defp send_body_chunk(window, conn, ref, body) do
chunk_size = min(window, byte_size(body))
<<chunk::binary-size(chunk_size), rest::binary>> = body

with {:ok, conn} <- Mint.HTTP.stream_request_body(conn, ref, chunk) do
stream_body(conn, ref, rest)
end
end
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two issues with this code example.

  • We can return 0 from request_body_window/2 so you can attempt to send an empty binary.
  • There is no backpressure, if there is no window available we will be busy looping, consuming CPU resources.

The latter can be hard to express in a code example because there are many different solutions to it depending on context, so maybe we should just call it out?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great catch! Fixed


defp wait(conn, ref) do
# Wait for the server to refill the request body window with a
# WINDOW_UPDATE frame. The concrete implementation depends on the
# socket mode and other context.
end

Note that `min(:infinity, n) == n` thanks to Erlang term ordering, so the
same loop works on HTTP/1 (each iteration sends the entire remaining body in
a single chunk) and on HTTP/2 (each iteration sends at most the current
flow-control window).
"""
@doc since: "1.8.0"
@impl true
def request_body_window(conn, ref), do: conn_apply(conn, :request_body_window, [conn, ref])

## Helpers

defp conn_apply(%UnsafeProxy{}, fun, args), do: apply(UnsafeProxy, fun, args)
Expand Down
12 changes: 12 additions & 0 deletions lib/mint/http1.ex
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,18 @@ defmodule Mint.HTTP1 do
%{conn | proxy_headers: headers}
end

@doc """
See `Mint.HTTP.request_body_window/2`.
"""
@doc since: "1.8.0"
@impl true
def request_body_window(%__MODULE__{streaming_request: %{ref: ref}}, ref), do: :infinity

def request_body_window(%__MODULE__{}, ref) do
raise ArgumentError,
"request with request reference #{inspect(ref)} was not found or is not streaming a body"
end

## Helpers

defp decode(:status, %{request: request} = conn, data, responses) do
Expand Down
9 changes: 9 additions & 0 deletions lib/mint/http2.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1039,6 +1039,15 @@ defmodule Mint.HTTP2 do
%{conn | proxy_headers: headers}
end

@doc """
See `Mint.HTTP.request_body_window/2`.
"""
@doc since: "1.8.0"
@impl true
def request_body_window(%__MODULE__{} = conn, ref) do
min(get_window_size(conn, :connection), get_window_size(conn, {:request, ref}))
end

## Helpers

defp handle_closed(conn) do
Expand Down
5 changes: 5 additions & 0 deletions lib/mint/unsafe_proxy.ex
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,9 @@ defmodule Mint.UnsafeProxy do
def put_proxy_headers(%__MODULE__{}, _headers) do
raise "invalid function for proxy unsafe proxy connections"
end

@impl true
def request_body_window(%__MODULE__{module: module, state: state}, ref) do
module.request_body_window(state, ref)
end
end
23 changes: 23 additions & 0 deletions test/http_test.exs
Original file line number Diff line number Diff line change
@@ -1,4 +1,27 @@
defmodule Mint.HTTPTest do
use ExUnit.Case, async: true
doctest Mint.HTTP

alias Mint.{HTTP, HTTP1.TestServer}

setup do
{:ok, port, server_ref} = TestServer.start()
assert {:ok, conn} = HTTP.connect(:http, "localhost", port)
assert_receive {^server_ref, server_socket}

[conn: conn, server_socket: server_socket]
end

describe "request_body_window/2" do
test "returns :infinity for an HTTP/1 streaming request", %{conn: conn} do
{:ok, conn, ref} = HTTP.request(conn, "GET", "/", [], :stream)
assert HTTP.request_body_window(conn, ref) == :infinity
end

test "raises ArgumentError for an unknown request ref", %{conn: conn} do
assert_raise ArgumentError, fn ->
HTTP.request_body_window(conn, make_ref())
end
end
end
end
13 changes: 13 additions & 0 deletions test/mint/http1/conn_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -1151,6 +1151,19 @@ defmodule Mint.HTTP1Test do
{:ok, conn, responses}
end

describe "request_body_window/2" do
test "returns :infinity for an active streaming request", %{conn: conn} do
{:ok, conn, ref} = HTTP1.request(conn, "GET", "/", [], :stream)
assert HTTP1.request_body_window(conn, ref) == :infinity
end

test "raises if no request is currently streaming a body", %{conn: conn} do
assert_raise ArgumentError, ~r/was not found or is not streaming a body/, fn ->
HTTP1.request_body_window(conn, make_ref())
end
end
end

@mint_user_agent "mint/#{Mix.Project.config()[:version]}"
defp mint_user_agent, do: @mint_user_agent
end
76 changes: 76 additions & 0 deletions test/mint/http2/conn_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -1772,6 +1772,82 @@ defmodule Mint.HTTP2Test do
HTTP2.get_window_size(conn, {:request, make_ref()})
end
end

test "request_body_window/2 returns the minimum of connection and request window sizes",
%{conn: conn} do
{conn, ref} = open_request(conn, :stream)

send_window = HTTP2.request_body_window(conn, ref)
conn_window = HTTP2.get_window_size(conn, :connection)
request_window = HTTP2.get_window_size(conn, {:request, ref})

assert send_window == min(conn_window, request_window)
end

test "request_body_window/2 decreases after streaming body data", %{conn: conn} do
{conn, ref} = open_request(conn, :stream)

initial_send_window = HTTP2.request_body_window(conn, ref)
assert initial_send_window > 0

body_chunk = "hello"
{:ok, conn} = HTTP2.stream_request_body(conn, ref, body_chunk)

assert HTTP2.request_body_window(conn, ref) == initial_send_window - byte_size(body_chunk)
end

test "request_body_window/2 raises if the request is not found", %{conn: conn} do
assert_raise ArgumentError, ~r/request with request reference .+ was not found/, fn ->
HTTP2.request_body_window(conn, make_ref())
end
end

@tag server_settings: [initial_window_size: 5]
test "streaming a body larger than the window using request_body_window/2 in a loop",
%{conn: conn} do
{conn, ref} = open_request(conn, :stream)

assert_recv_frames [headers(stream_id: stream_id)]

body = "0123456789ABCDE"

# First chunk: window is 5, so we send 5 bytes.
assert HTTP2.request_body_window(conn, ref) == 5
<<chunk1::binary-size(5), rest1::binary>> = body
{:ok, conn} = HTTP2.stream_request_body(conn, ref, chunk1)

assert HTTP2.request_body_window(conn, ref) == 0

assert_recv_frames [data(stream_id: ^stream_id, data: ^chunk1, flags: flags1)]
assert flags1 == set_flags(:data, [])

# Server replenishes the stream window so we can send more.
{:ok, conn, []} =
stream_frames(conn, [window_update(stream_id: stream_id, window_size_increment: 5)])

assert HTTP2.request_body_window(conn, ref) == 5
<<chunk2::binary-size(5), rest2::binary>> = rest1
{:ok, conn} = HTTP2.stream_request_body(conn, ref, chunk2)

assert_recv_frames [data(stream_id: ^stream_id, data: ^chunk2)]

# Final replenishment for the remaining bytes plus :eof.
{:ok, conn, []} =
stream_frames(conn, [
window_update(stream_id: stream_id, window_size_increment: byte_size(rest2))
])

assert HTTP2.request_body_window(conn, ref) == byte_size(rest2)
{:ok, conn} = HTTP2.stream_request_body(conn, ref, rest2)
{:ok, _conn} = HTTP2.stream_request_body(conn, ref, :eof)

assert_recv_frames [
data(stream_id: ^stream_id, data: ^rest2),
data(stream_id: ^stream_id, data: "", flags: end_flags)
]

assert end_flags == set_flags(:data, [:end_stream])
end
end

describe "settings" do
Expand Down
Loading