Skip to content

Commit 3c56715

Browse files
Optimise decompression size (#12357) (#12391)
(cherry picked from commit 53f6e91)
1 parent 5c17b64 commit 3c56715

5 files changed

Lines changed: 53 additions & 35 deletions

File tree

aiohttp/compression_utils.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -326,9 +326,15 @@ def decompress_sync(
326326
) -> bytes:
327327
"""Decompress the given data."""
328328
if hasattr(self._obj, "decompress"):
329-
result = cast(bytes, self._obj.decompress(data, max_length))
329+
if max_length == ZLIB_MAX_LENGTH_UNLIMITED:
330+
result = cast(bytes, self._obj.decompress(data))
331+
else:
332+
result = cast(bytes, self._obj.decompress(data, max_length))
330333
else:
331-
result = cast(bytes, self._obj.process(data, max_length))
334+
if max_length == ZLIB_MAX_LENGTH_UNLIMITED:
335+
result = cast(bytes, self._obj.process(data))
336+
else:
337+
result = cast(bytes, self._obj.process(data, max_length))
332338
# Only way to know that brotli has no further data is checking we get no output
333339
self._last_empty = result == b""
334340
return result

aiohttp/http_parser.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import asyncio
33
import re
44
import string
5+
import sys
56
from contextlib import suppress
67
from enum import IntEnum
78
from re import Pattern
@@ -1130,10 +1131,12 @@ def feed_data(self, chunk: bytes, size: int) -> bool:
11301131
encoding=self.encoding, suppress_deflate_header=True
11311132
)
11321133

1134+
low_water = self.out._low_water
1135+
max_length = (
1136+
0 if low_water >= sys.maxsize else max(self._max_decompress_size, low_water)
1137+
)
11331138
try:
1134-
chunk = self.decompressor.decompress_sync(
1135-
chunk, max_length=self._max_decompress_size
1136-
)
1139+
chunk = self.decompressor.decompress_sync(chunk, max_length=max_length)
11371140
except Exception:
11381141
raise ContentEncodingError(
11391142
"Can not decode content-encoding: %s" % self.encoding

aiohttp/streams.py

Lines changed: 34 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import collections
3+
import sys
34
import warnings
45
from collections.abc import Awaitable, Callable
56
from typing import Final, Generic, TypeVar
@@ -67,31 +68,7 @@ async def __anext__(self) -> tuple[bytes, bool]:
6768
return rv
6869

6970

70-
class AsyncStreamReaderMixin:
71-
72-
__slots__ = ()
73-
74-
def __aiter__(self) -> AsyncStreamIterator[bytes]:
75-
return AsyncStreamIterator(self.readline) # type: ignore[attr-defined]
76-
77-
def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
78-
"""Returns an asynchronous iterator that yields chunks of size n."""
79-
return AsyncStreamIterator(lambda: self.read(n)) # type: ignore[attr-defined]
80-
81-
def iter_any(self) -> AsyncStreamIterator[bytes]:
82-
"""Yield all available data as soon as it is received."""
83-
return AsyncStreamIterator(self.readany) # type: ignore[attr-defined]
84-
85-
def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
86-
"""Yield chunks of data as they are received by the server.
87-
88-
The yielded objects are tuples
89-
of (bytes, bool) as returned by the StreamReader.readchunk method.
90-
"""
91-
return ChunkTupleAsyncStreamIterator(self) # type: ignore[arg-type]
92-
93-
94-
class StreamReader(AsyncStreamReaderMixin):
71+
class StreamReader:
9572
"""An enhancement of asyncio.StreamReader.
9673
9774
Supports asynchronous iteration by line, chunk or as available::
@@ -176,9 +153,35 @@ def __repr__(self) -> str:
176153
info.append("e=%r" % self._exception)
177154
return "<%s>" % " ".join(info)
178155

156+
def __aiter__(self) -> AsyncStreamIterator[bytes]:
157+
return AsyncStreamIterator(self.readline)
158+
159+
def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
160+
"""Returns an asynchronous iterator that yields chunks of size n."""
161+
self.set_read_chunk_size(n)
162+
return AsyncStreamIterator(lambda: self.read(n))
163+
164+
def iter_any(self) -> AsyncStreamIterator[bytes]:
165+
"""Yield all available data as soon as it is received."""
166+
return AsyncStreamIterator(self.readany)
167+
168+
def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
169+
"""Yield chunks of data as they are received by the server.
170+
171+
The yielded objects are tuples
172+
of (bytes, bool) as returned by the StreamReader.readchunk method.
173+
"""
174+
return ChunkTupleAsyncStreamIterator(self)
175+
179176
def get_read_buffer_limits(self) -> tuple[int, int]:
180177
return (self._low_water, self._high_water)
181178

179+
def set_read_chunk_size(self, n: int) -> None:
180+
"""Raise buffer limits to match the consumer's chunk size."""
181+
if n > self._low_water:
182+
self._low_water = n
183+
self._high_water = n * 2
184+
182185
def exception(self) -> BaseException | None:
183186
return self._exception
184187

@@ -427,10 +430,8 @@ async def read(self, n: int = -1) -> bytes:
427430
return b""
428431

429432
if n < 0:
430-
# This used to just loop creating a new waiter hoping to
431-
# collect everything in self._buffer, but that would
432-
# deadlock if the subprocess sends more than self.limit
433-
# bytes. So just call self.readany() until EOF.
433+
# Reading everything — remove decompression chunk limit.
434+
self.set_read_chunk_size(sys.maxsize)
434435
blocks = []
435436
while True:
436437
block = await self.readany()
@@ -439,6 +440,7 @@ async def read(self, n: int = -1) -> bytes:
439440
blocks.append(block)
440441
return b"".join(blocks)
441442

443+
self.set_read_chunk_size(n)
442444
# TODO: should be `if` instead of `while`
443445
# because waiter maybe triggered on chunk end,
444446
# without feeding any data
@@ -612,6 +614,9 @@ async def wait_eof(self) -> None:
612614
def feed_data(self, data: bytes, n: int = 0) -> bool:
613615
return False
614616

617+
def set_read_chunk_size(self, n: int) -> None:
618+
return
619+
615620
async def readline(self, *, max_line_length: int | None = None) -> bytes:
616621
return b""
617622

aiohttp/web_request.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -678,6 +678,10 @@ async def read(self) -> bytes:
678678
Returns bytes object with full request content.
679679
"""
680680
if self._read_bytes is None:
681+
# Raise the buffer limits so compressed payloads decompress in
682+
# larger chunks instead of many small pause/resume cycles.
683+
if self._client_max_size:
684+
self._payload.set_read_chunk_size(self._client_max_size)
681685
body = bytearray()
682686
while True:
683687
chunk = await self._payload.readany()

tests/test_flowcontrol_streams.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ async def test_readexactly(self, stream) -> None:
8282
stream.feed_data(b"data", 4)
8383
res = await stream.readexactly(3)
8484
assert res == b"dat"
85-
assert not stream._protocol.resume_reading.called
85+
assert stream._protocol.resume_reading.called
8686

8787
async def test_feed_data(self, stream) -> None:
8888
stream._protocol._reading_paused = False

0 commit comments

Comments (0)