Define explicit StreamingBody API, add httpx read(amt) support #1539

Open
thehesiod wants to merge 7 commits into main from amohr/fix-streaming-body-api

Conversation

@thehesiod
Collaborator

@thehesiod thehesiod commented Apr 7, 2026

Summary

Restructure aiobotocore's StreamingBody classes to subclass botocore.response.StreamingBody, following the repo's established Aio* override pattern. Addresses @jakob-keller's review feedback that the original PR diverged from this convention.

Class hierarchy

  • botocore.response.StreamingBody
    • AioStreamingBody — aiohttp backend, async I/O overrides
      • AioHttpxStreamingBody — httpx backend, inherits all iteration/validation logic, only overrides the I/O layer (buffered read(amt), readinto, async close)

AioStreamingChecksumBody and AioHttpxStreamingChecksumBody mix in _ChecksumMixin over the corresponding base.
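
For readers unfamiliar with the mixin arrangement, here is a minimal sketch of how a checksum mixin can sit over a body class through cooperative `super()` calls. The names `PlainBody`, `ChecksumMixinSketch`, and `ChecksumBody`, the use of SHA-256, and the `ValueError` are all assumptions made for illustration; they are not the PR's `_ChecksumMixin` code.

```python
import asyncio
import hashlib


class PlainBody:
    """Hypothetical in-memory body standing in for a backend-specific base."""

    def __init__(self, data):
        self._data = data

    async def read(self, amt=None):
        size = len(self._data) if amt is None else amt
        chunk, self._data = self._data[:size], self._data[size:]
        return chunk


class ChecksumMixinSketch:
    """Hypothetical mixin: hashes bytes as they are read, validates at EOF."""

    def __init__(self, data, expected_hex):
        super().__init__(data)            # next class in the MRO (the base body)
        self._hasher = hashlib.sha256()   # assumption: SHA-256 for the sketch
        self._expected_hex = expected_hex

    async def read(self, amt=None):
        chunk = await super().read(amt)
        self._hasher.update(chunk)
        # Validate after a full read, or once a sized read hits end of stream.
        if amt is None or (not chunk and amt > 0):
            self._validate()
        return chunk

    def _validate(self):
        if self._hasher.hexdigest() != self._expected_hex:
            raise ValueError("checksum mismatch")  # stand-in error type


class ChecksumBody(ChecksumMixinSketch, PlainBody):
    """Mixin listed first so its read() wraps the base read()."""


async def demo(expected_hex):
    body = ChecksumBody(b"payload", expected_hex)
    return await body.read()
```

Because the mixin only relies on `super().read()`, the same mixin can be layered over either backend's body class without duplicating the validation logic.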

What this delivers

  • Drops wrapt.ObjectProxy so the API no longer leaks backend internals; users access the raw stream via .raw_stream.
  • Adds full API parity for httpx: read(amt), readinto(), readlines(), __aiter__/__anext__, iter_lines(), iter_chunks(), tell(), readable(), close(), content-length validation. Previously read(amt) raised ValueError and most other methods didn't exist.
  • Fixes HttpxStreamingChecksumBody.readinto which called content.read() on an httpx.Response (would have failed at runtime).
  • Eliminates duplication by having AioHttpxStreamingBody inherit __aiter__, __anext__, iter_chunks, iter_lines, readlines, tell, _verify_content_length from AioStreamingBody. Only the buffer machinery and httpx I/O layer live in the subclass.
  • Blocks the inherited sync API (__iter__, __next__, __enter__, __exit__, set_socket_timeout) with explicit errors so users don't accidentally get coroutines from for chunk in body or call urllib3-shaped APIs on an aiohttp/httpx stream.
  • Pre-Aio names preserved as module aliases: StreamingBody = AioStreamingBody, HttpxStreamingBody = AioHttpxStreamingBody, same for the checksum classes. External callers and the existing isinstance checks throughout the codebase continue to work unchanged.
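
To illustrate the last two bullets, here is a minimal sketch of blocking sync entry points on an async-only class and keeping an old name alive as a module-level alias. `SketchAsyncBody` and `SketchBody` are hypothetical names, and the error messages are assumptions; the PR's actual classes subclass botocore's StreamingBody.

```python
import asyncio


class SketchAsyncBody:
    """Hypothetical async-only body; not the PR's actual class."""

    def __init__(self, chunks):
        self._chunks = list(chunks)

    def __aiter__(self):
        return self

    async def __anext__(self):
        if not self._chunks:
            raise StopAsyncIteration
        return self._chunks.pop(0)

    # Sync entry points fail loudly instead of handing back coroutines.
    def __iter__(self):
        raise TypeError("use 'async for' to iterate this body")

    def __enter__(self):
        raise TypeError("use 'async with' for this body")

    def set_socket_timeout(self, timeout):
        raise NotImplementedError("urllib3-specific; not supported here")


# Backward-compatible alias: the old name is the same class object,
# so isinstance checks against either name keep working.
SketchBody = SketchAsyncBody


async def collect(body):
    return [chunk async for chunk in body]
```

With this shape, `for chunk in body` raises immediately, while `async for chunk in body` works as expected.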

Behavior change

AioStreamingBody.__aenter__ now returns self (previously returned the raw aiohttp ClientResponse). This matches the httpx side and is the standard pattern. Use body.raw_stream for the underlying response object. Noted in CHANGES.rst.
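
A minimal sketch of what the new contract means for callers, using hypothetical stand-ins (`FakeResponse`, `SketchBody`) rather than the real aiohttp or aiobotocore classes:

```python
import asyncio


class FakeResponse:
    """Hypothetical stand-in for the backend response object."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


class SketchBody:
    """Hypothetical body whose __aenter__ returns the body itself."""

    def __init__(self, raw_stream):
        self._raw_stream = raw_stream

    @property
    def raw_stream(self):
        return self._raw_stream

    async def close(self):
        self._raw_stream.close()

    async def __aenter__(self):
        return self  # the body, not the raw response

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()


async def demo():
    raw = FakeResponse()
    body = SketchBody(raw)
    async with body as entered:
        same_object = entered is body
        raw_reachable = entered.raw_stream is raw
    return same_object, raw_reachable, raw.closed
```

Code that previously used the object bound by `async with` as the raw response would switch to `entered.raw_stream`.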

Test plan

  • 22 existing + 2 new aiohttp StreamingBody unit tests pass
  • 19 httpx StreamingBody unit tests pass
  • Checksum body tests (test_httpchecksum.py, test_response.py in botocore_tests/) pass
  • Patch hash tests (test_patches.py) pass — botocore class hash unchanged
  • Full unit test suite minus 3 pre-existing failures (test_configprovider::test_defaults_mode, test_eventstreams::test_kinesis_stream_json_parser) — unrelated to this PR
  • CI moto-based integration tests

Closes #1365

🤖 Generated with Claude Code

Remove wrapt.ObjectProxy from StreamingBody and HttpxStreamingBody
to stop leaking backend-specific internals. Both classes now provide
the same well-defined async streaming API (read, readinto, readlines,
iter_lines, iter_chunks, tell, readable, close) with a .raw_stream
property for advanced access.

Key changes:
- StreamingBody: plain class with _raw_stream instead of __wrapped__
- HttpxStreamingBody: buffered read(amt) via aiter_bytes(), full API parity
- _ChecksumMixin: eliminates duplicated checksum validation code
- endpoint.py: passes content-length to HttpxStreamingBody
- 20 new tests covering HttpxStreamingBody thoroughly
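
As a rough illustration of why the content length has to reach the body, here is a sketch of length validation at end of stream. `LengthCheckedBody` and `IncompleteRead` are hypothetical names, and the in-memory data source is an assumption; the real classes read from an HTTP response.

```python
import asyncio


class IncompleteRead(Exception):
    """Hypothetical stand-in for the library's incomplete-read error."""


class LengthCheckedBody:
    """Hypothetical body that compares bytes read against Content-Length."""

    def __init__(self, data, content_length=None):
        self._data = data
        self._content_length = content_length
        self._amount_read = 0

    async def read(self, amt=None):
        size = len(self._data) if amt is None else amt
        chunk, self._data = self._data[:size], self._data[size:]
        self._amount_read += len(chunk)
        # Check only once the stream is known to be finished.
        if amt is None or (not chunk and amt > 0):
            self._verify_content_length()
        return chunk

    def tell(self):
        return self._amount_read

    def _verify_content_length(self):
        # Without a content length there is nothing to validate against.
        if self._content_length is None:
            return
        if self._amount_read != int(self._content_length):
            raise IncompleteRead(
                f"read {self._amount_read} of {self._content_length} bytes"
            )


async def read_all(data, content_length):
    body = LengthCheckedBody(data, content_length)
    payload = await body.read()
    return payload, body.tell()
```

If the length is never passed in, the check is silently skipped, which is the gap the endpoint.py change closes for the httpx backend.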

Closes #1365

Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
thehesiod and others added 4 commits April 7, 2026 01:05
Merge origin/main into feature branch. Add note about default
branch to CLAUDE.md.

Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
Add tests for tell() and readlines() on StreamingBody (aiohttp).
Remove @pytest.mark.moto/@pytest.mark.asyncio from httpx unit tests
since they are not moto integration tests.

Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
@thehesiod thehesiod requested a review from jakob-keller April 7, 2026 08:14
@codecov

codecov Bot commented Apr 7, 2026

Codecov Report

❌ Patch coverage is 98.56631% with 4 lines in your changes missing coverage. Please review.
✅ Project coverage is 94.11%. Comparing base (d0ac8b9) to head (a3f2376).

Files with missing lines Patch % Lines
aiobotocore/response.py 96.46% 4 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1539      +/-   ##
==========================================
+ Coverage   93.86%   94.11%   +0.25%     
==========================================
  Files          77       77              
  Lines        8218     8434     +216     
==========================================
+ Hits         7714     7938     +224     
+ Misses        504      496       -8     
| Flag | Coverage Δ |
| --- | --- |
| no-httpx | 91.24% <97.13%> (+0.52%) ⬆️ |
| os-ubuntu-24.04 | 94.11% <98.56%> (+0.25%) ⬆️ |
| os-ubuntu-24.04-arm | 92.21% <98.56%> (+0.30%) ⬆️ |
| python-3.10 | 92.18% <98.56%> (+0.30%) ⬆️ |
| python-3.11 | 92.18% <98.56%> (+0.30%) ⬆️ |
| python-3.12 | 92.18% <98.56%> (+0.30%) ⬆️ |
| python-3.13 | 92.18% <98.56%> (+0.30%) ⬆️ |
| python-3.14 | 94.09% <98.56%> (+0.25%) ⬆️ |
| python-3.9 | 92.19% <98.56%> (+0.30%) ⬆️ |
| unittests | 94.11% <98.56%> (+0.25%) ⬆️ |
| with-awscrt | 93.77% <98.56%> (+0.26%) ⬆️ |

Tests and user code call .aclose() on response bodies. With
wrapt.ObjectProxy removed, this no longer proxies through to the
underlying stream. Add explicit aclose() methods.
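
A minimal sketch of the idea, assuming `aclose()` simply delegates to the body's own async `close()`; the class name is hypothetical and the PR's actual implementation may differ.

```python
import asyncio


class SketchBody:
    """Hypothetical plain class: nothing is proxied, so aclose() must exist."""

    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True

    async def aclose(self):
        # Explicit method kept for callers that used the proxied aclose().
        await self.close()


async def demo():
    body = SketchBody()
    await body.aclose()
    return body.closed
```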

Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
@jakob-keller
Collaborator

I'm all for dropping wrapt.ObjectProxy, but beyond that this still confuses me a lot. We discussed potential designs in #1365. Did we ever reach consensus? Why do we deviate from our usual pattern of subclassing upstream with asynchronous methods where required?

@thehesiod
Collaborator Author

@jakob-keller great point, let me point the agent at the context, this was a simple v1 agent draft

@thehesiod
Collaborator Author

@claude can you review the PR comments and the related issue to see if there's a better design that follows our repo's design patterns?

@claude
Contributor

claude Bot commented Apr 9, 2026

Design analysis: aligning with repo patterns

Looking at the PR diff, #1365 discussion, and the repo's docs/override-patterns.md, here's the gap and a recommended path forward.


The deviation @jakob-keller identified

Every other aiobotocore override follows Pattern 1 from docs/override-patterns.md:

Subclass with Aio prefix, override the method as async def, await I/O operations.

Examples: AioBaseClient(BaseClient), AioEndpoint(Endpoint), AioSession(Session), AioRefreshableCredentials(RefreshableCredentials).

The PR's StreamingBody and HttpxStreamingBody are standalone classes with no relationship to botocore.StreamingBody. They don't carry the Aio prefix either. This is the core deviation.


Recommended design

AioStreamingBody(botocore.StreamingBody) — aiohttp backend

Subclass upstream directly. botocore.StreamingBody.__init__ stores _raw_stream and _content_length — use the same attribute names (the PR already does this in the new standalone version, so refactoring is minimal). Override the sync methods as async:

from io import IOBase

import aiohttp
import botocore.response


class AioStreamingBody(botocore.response.StreamingBody):
    def __init__(self, raw_stream: aiohttp.ClientResponse, content_length):
        # Don't call super().__init__(): it expects urllib3
        # Call IOBase.__init__() directly
        IOBase.__init__(self)
        self._raw_stream = raw_stream          # same attribute name as botocore
        self._content_length = content_length  # same attribute name as botocore
        self._amount_read = 0                  # same attribute name as botocore

    async def read(self, amt=None): ...        # overrides botocore sync read()
    async def readinto(self, b): ...
    async def readlines(self): ...
    def __aiter__(self): ...
    async def __anext__(self): ...
    async def __aenter__(self): ...
    async def __aexit__(self, exc_type, exc_val, exc_tb): ...
    # chunk_size=1024 mirrors botocore's _DEFAULT_CHUNK_SIZE
    async def iter_lines(self, chunk_size=1024, keepends=False): ...
    async def iter_chunks(self, chunk_size=1024): ...
    async def close(self): ...
    # tell(), _verify_content_length(), _DEFAULT_CHUNK_SIZE: inherit unchanged

This preserves isinstance(body, botocore.response.StreamingBody) for downstream code that type-checks, and keeps the inheritance chain explicit.

AioHttpxStreamingBody(AioStreamingBody) — httpx backend

Subclass AioStreamingBody. Only override the I/O layer (read(), readinto(), __aenter__/__aexit__, close()). All the iteration, line-splitting, chunk logic, tell(), _verify_content_length() etc. are inherited from AioStreamingBody — no duplication. The buffered read(amt) implementation from the PR belongs here.

class AioHttpxStreamingBody(AioStreamingBody):
    def __init__(self, raw_stream: httpx.Response, content_length=None):
        IOBase.__init__(self)
        self._raw_stream = raw_stream
        self._content_length = content_length
        self._amount_read = 0
        self._buffer = b''
        self._stream_iter = None
        self._stream_exhausted = False

    async def read(self, amt=None): ...        # buffered read logic from PR
    async def readinto(self, b): ...
    async def close(self): ...                 # httpx uses aclose()
    async def __aenter__(self): return self
    async def __aexit__(self, *a): await self.close()
    # Everything else inherits from AioStreamingBody
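
For context, a minimal sketch of what a buffered read(amt) over an async chunk iterator can look like. `BufferedAsyncReader` and the `chunks()` generator are hypothetical; in the PR the chunk source is httpx's `aiter_bytes()`, and the real implementation may differ in detail.

```python
import asyncio


class BufferedAsyncReader:
    """Hypothetical sketch: read(amt) on top of an async iterator of bytes."""

    def __init__(self, chunk_iter):
        self._stream_iter = chunk_iter   # any async iterator yielding bytes
        self._buffer = b""
        self._stream_exhausted = False
        self._amount_read = 0

    async def _fill(self, amt):
        # Pull chunks until the buffer holds `amt` bytes or the stream ends.
        while len(self._buffer) < amt and not self._stream_exhausted:
            try:
                self._buffer += await self._stream_iter.__anext__()
            except StopAsyncIteration:
                self._stream_exhausted = True

    async def read(self, amt=None):
        if amt is None or amt < 0:
            # Drain the rest of the stream, one chunk at a time.
            while not self._stream_exhausted:
                await self._fill(len(self._buffer) + 1)
            data, self._buffer = self._buffer, b""
        else:
            await self._fill(amt)
            data, self._buffer = self._buffer[:amt], self._buffer[amt:]
        self._amount_read += len(data)
        return data


async def chunks():
    # Chunk boundaries deliberately do not line up with the read sizes.
    for part in (b"hello ", b"wor", b"ld"):
        yield part


async def demo():
    reader = BufferedAsyncReader(chunks())
    first = await reader.read(4)
    second = await reader.read(4)
    rest = await reader.read()
    return first, second, rest, reader._amount_read
```

The leftover bytes from an oversized chunk stay in the buffer, so each read returns exactly the requested amount until the stream runs out.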

What to keep from the PR vs. what to change

| Area | PR | Recommended |
| --- | --- | --- |
| Remove wrapt.ObjectProxy | ✅ correct | Keep |
| Standalone classes, no botocore inheritance | ❌ deviates | Subclass botocore.StreamingBody |
| Aio prefix naming | ❌ missing | AioStreamingBody, AioHttpxStreamingBody |
| Attribute names _raw_stream, _content_length, _amount_read | ✅ matches botocore | Keep |
| Buffered read(amt) for httpx | ✅ solves real problem | Keep in AioHttpxStreamingBody |
| _ChecksumMixin | ✅ good deduplication | Keep |
| .raw_stream property | ✅ useful | Keep |
| Pass content-length to httpx | ✅ correctness fix | Keep |
| StreamingChecksumBody(_ChecksumMixin, StreamingBody) | Fine | Rename base to AioStreamingBody |

On jakkdl's objection to subclassing

The concern was: "most methods change from sync to async, leaving only a few with the same signature". This is true of every Aio* class in the repo though — AioBaseClient overrides nearly every method. The pattern holds precisely because subclassing lets the sync methods be shadowed by async overrides without breaking the type hierarchy.

The one real concern is IOBase introspection (e.g. io.IOBase.readlines() might call readline() which calls read() and get a coroutine). The fix is to override readlines() explicitly in AioStreamingBody (the PR already does this), and not call super().readlines().
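
A small sketch of that pitfall, using hypothetical `SyncBody` and `AsyncBody` classes rather than io.IOBase itself: any sync code path that calls the overridden `read()` without awaiting gets a coroutine object instead of bytes, so the async subclass needs its own `readlines()`.

```python
import asyncio


class SyncBody:
    """Hypothetical sync base whose helper relies on read() returning bytes."""

    def read(self):
        return b"a\nb\n"

    def readlines(self):
        return self.read().splitlines(keepends=True)


class AsyncBody(SyncBody):
    async def read(self):
        return b"a\nb\n"

    async def readlines(self):
        # Explicit override: await read(), never call super().readlines().
        data = await self.read()
        return data.splitlines(keepends=True)


def sync_call_sees_coroutine():
    body = AsyncBody()
    result = body.read()            # no await, as a sync helper would do
    is_coro = asyncio.iscoroutine(result)
    result.close()                  # avoid a "never awaited" warning
    return is_coro
```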


Summary

The PR's logic is sound; the buffered httpx read(amt) is the right approach. The main change needed is structural: rename with Aio prefix and subclass botocore.StreamingBody, with AioHttpxStreamingBody subclassing AioStreamingBody. This eliminates the code duplication between the two classes (all shared iteration/validation logic lives in AioStreamingBody) and aligns with the repo's established override pattern.

🤖 Generated with Claude Code

@thehesiod
Collaborator Author

I'll have my local Claude do the refactor to save some money once I get some sleep ;)

…amingBody

Addresses @jakob-keller's PR review: align with the repo's standard pattern
of subclassing the botocore upstream with `Aio` prefix, instead of standalone
classes that duplicate logic.

- AioStreamingBody(botocore.response.StreamingBody) — aiohttp backend.
  Async overrides for read/readinto/readlines/iter_*; sync API explicitly
  blocked (raises TypeError) so __iter__/__enter__ can't silently yield
  coroutines on this async-only class. set_socket_timeout raises
  NotImplementedError (urllib3-specific).
- AioHttpxStreamingBody(AioStreamingBody) — httpx backend. Inherits all
  iteration, line-splitting, tell, content-length validation, and the
  __aenter__/__aexit__ contract; overrides only the I/O layer (buffered
  read(amt), readinto, async close).
- _ChecksumMixin unchanged; checksum bodies renamed to AioStreamingChecksumBody
  / AioHttpxStreamingChecksumBody.
- Old names (StreamingBody, HttpxStreamingBody, StreamingChecksumBody,
  HttpxStreamingChecksumBody) preserved as module-level aliases so external
  callers and isinstance checks continue to work.
- AioStreamingBody.__aenter__ now returns self (was: raw aiohttp response),
  matching the httpx backend and mirroring the pattern used elsewhere in the
  repo. Use `body.raw_stream` for the underlying response object.

Closes the design discussion from #1365.
@thehesiod
Collaborator Author

@jakob-keller restructured the design to follow the repo's Aio* subclass pattern (35e68c3). The hierarchy now:

botocore.response.StreamingBody
  └── AioStreamingBody                   (aiohttp)
        └── AioHttpxStreamingBody        (httpx, inherits all iter/tell/validate logic)

AioHttpxStreamingBody now inherits __aiter__/__anext__/iter_chunks/iter_lines/readlines/tell/_verify_content_length from AioStreamingBody — only the buffered httpx I/O layer (read, readinto, close) and the _buffer/_stream_iter machinery live in the subclass. Net result: ~120 lines deleted, ~89 added.
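
To make the sharing concrete, here is a minimal sketch in which the iteration helpers are written once against `read()` and a subclass swaps only the I/O layer. `BaseBody` and `UpperBody` are hypothetical, and the in-memory data source is an assumption made so the example runs without a network.

```python
import asyncio


class BaseBody:
    """Hypothetical base: iteration helpers are built only on read()."""

    _DEFAULT_CHUNK_SIZE = 1024

    def __init__(self, data):
        self._data = data
        self._pos = 0

    async def read(self, amt=None):
        end = len(self._data) if amt is None else self._pos + amt
        chunk = self._data[self._pos:end]
        self._pos += len(chunk)
        return chunk

    async def iter_chunks(self, chunk_size=_DEFAULT_CHUNK_SIZE):
        while True:
            chunk = await self.read(chunk_size)
            if chunk == b"":
                break
            yield chunk

    async def iter_lines(self, chunk_size=_DEFAULT_CHUNK_SIZE, keepends=False):
        pending = b""
        async for chunk in self.iter_chunks(chunk_size):
            lines = (pending + chunk).splitlines(True)
            # The last piece may be an incomplete line; hold it back.
            for line in lines[:-1]:
                yield line.splitlines(keepends)[0]
            pending = lines[-1]
        if pending:
            yield pending.splitlines(keepends)[0]


class UpperBody(BaseBody):
    """Overrides only the I/O layer; iteration helpers are inherited."""

    async def read(self, amt=None):
        return (await super().read(amt)).upper()


async def demo():
    body = UpperBody(b"one\ntwo\nthree")
    return [line async for line in body.iter_lines(chunk_size=4)]
```

The subclass never touches `iter_chunks` or `iter_lines`, yet both pick up its `read()` through normal method resolution.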

Old class names (StreamingBody, HttpxStreamingBody, both checksum variants) remain as module-level aliases so isinstance checks and external imports keep working.

One deliberate behavior change worth flagging: AioStreamingBody.__aenter__ now returns self instead of the raw aiohttp ClientResponse — matches the httpx side and the surrounding Aio* convention. Noted in CHANGES.rst. Use body.raw_stream if you need the underlying response.

Re jakkdl's earlier objection in #1365 about subclassing being low-value: the win turned out to be sharing the iteration/validation layer across both backends (which we couldn't do without a common base), not method-signature reuse. Most methods do go sync→async, but that just means the base provides a stable type identity (isinstance(body, botocore.response.StreamingBody) works) while async overrides shadow each sync method. Sync-API entry points (__iter__, __enter__, set_socket_timeout) now raise explicit TypeError/NotImplementedError rather than being silently inherited.

Comment thread aiobotocore/response.py
try:
    async for chunk in self._stream_iter:
        chunks.append(chunk)
except StopAsyncIteration:

Development

Successfully merging this pull request may close these issues.

Specify API for StreamingBody

3 participants