11import asyncio
22import collections
3+ import sys
34import warnings
45from collections .abc import Awaitable , Callable
56from typing import Final , Generic , TypeVar
@@ -67,31 +68,7 @@ async def __anext__(self) -> tuple[bytes, bool]:
6768 return rv
6869
6970
class AsyncStreamReaderMixin:
    """Mixin adding async-iteration helpers to a stream reader.

    Host classes are expected to provide ``readline``, ``read`` and
    ``readany`` coroutines (hence the ``attr-defined`` ignores below).
    """

    __slots__ = ()

    def __aiter__(self) -> AsyncStreamIterator[bytes]:
        """Iterate the stream line by line via ``readline``."""
        return AsyncStreamIterator(self.readline)  # type: ignore[attr-defined]

    def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
        """Returns an asynchronous iterator that yields chunks of size n."""
        return AsyncStreamIterator(lambda: self.read(n))  # type: ignore[attr-defined]

    def iter_any(self) -> AsyncStreamIterator[bytes]:
        """Yield all available data as soon as it is received."""
        return AsyncStreamIterator(self.readany)  # type: ignore[attr-defined]

    def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
        """Yield chunks of data as they are received by the server.

        The yielded objects are tuples
        of (bytes, bool) as returned by the StreamReader.readchunk method.
        """
        return ChunkTupleAsyncStreamIterator(self)  # type: ignore[arg-type]
94- class StreamReader (AsyncStreamReaderMixin ):
71+ class StreamReader :
9572 """An enhancement of asyncio.StreamReader.
9673
9774 Supports asynchronous iteration by line, chunk or as available::
@@ -174,9 +151,35 @@ def __repr__(self) -> str:
174151 info .append ("e=%r" % self ._exception )
175152 return "<%s>" % " " .join (info )
176153
def __aiter__(self) -> AsyncStreamIterator[bytes]:
    """Support ``async for line in reader``: iterate line by line."""
    return AsyncStreamIterator(self.readline)
156+
def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
    """Returns an asynchronous iterator that yields chunks of size n."""
    # Grow the flow-control watermarks so a consumer reading in large
    # chunks is not throttled by the default buffer limits.
    self.set_read_chunk_size(n)
    return AsyncStreamIterator(lambda: self.read(n))
161+
def iter_any(self) -> AsyncStreamIterator[bytes]:
    """Yield all available data as soon as it is received."""
    return AsyncStreamIterator(self.readany)
165+
def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
    """Yield chunks of data as they are received by the server.

    The yielded objects are tuples
    of (bytes, bool) as returned by the StreamReader.readchunk method.
    """
    return ChunkTupleAsyncStreamIterator(self)
173+
def get_read_buffer_limits(self) -> tuple[int, int]:
    """Return the current flow-control watermarks as (low, high)."""
    return self._low_water, self._high_water
179176
def set_read_chunk_size(self, n: int) -> None:
    """Raise buffer limits to match the consumer's chunk size."""
    # Guard clause: only ever grow the watermarks; a smaller chunk
    # size must not shrink limits a previous consumer relies on.
    if n <= self._low_water:
        return
    self._low_water = n
    self._high_water = n * 2
182+
180183 def exception (self ) -> type [BaseException ] | BaseException | None :
181184 return self ._exception
182185
@@ -410,10 +413,8 @@ async def read(self, n: int = -1) -> bytes:
410413 return b""
411414
412415 if n < 0 :
413- # This used to just loop creating a new waiter hoping to
414- # collect everything in self._buffer, but that would
415- # deadlock if the subprocess sends more than self.limit
416- # bytes. So just call self.readany() until EOF.
416+ # Reading everything — remove decompression chunk limit.
417+ self .set_read_chunk_size (sys .maxsize )
417418 blocks = []
418419 while True :
419420 block = await self .readany ()
@@ -422,6 +423,7 @@ async def read(self, n: int = -1) -> bytes:
422423 blocks .append (block )
423424 return b"" .join (blocks )
424425
426+ self .set_read_chunk_size (n )
425427 # TODO: should be `if` instead of `while`
426428 # because waiter maybe triggered on chunk end,
427429 # without feeding any data
@@ -595,6 +597,9 @@ async def wait_eof(self) -> None:
def feed_data(self, data: bytes) -> bool:
    """Discard incoming data; an empty reader never buffers anything."""
    return False
597599
def set_read_chunk_size(self, n: int) -> None:
    """No-op: an empty reader has no buffer limits to adjust."""
    return None
602+
598603 async def readline (self , * , max_line_length : int | None = None ) -> bytes :
599604 return b""
600605
0 commit comments