Skip to content

fix(observe): preserve asyncio.timeout awareness in async generator wrapper#1690

Open
goingforstudying-ctrl wants to merge 2 commits into
langfuse:mainfrom
goingforstudying-ctrl:fix/async-generator-timeout-awareness
Open

fix(observe): preserve asyncio.timeout awareness in async generator wrapper#1690
goingforstudying-ctrl wants to merge 2 commits into
langfuse:mainfrom
goingforstudying-ctrl:fix/async-generator-timeout-awareness

Conversation

@goingforstudying-ctrl
Copy link
Copy Markdown

@goingforstudying-ctrl goingforstudying-ctrl commented Jun 7, 2026

Ran into an issue where asyncio.timeout becomes a no-op inside an async generator decorated with @observe. Turns out the _ContextPreservedAsyncGeneratorWrapper wraps every __anext__ call in a fresh asyncio.Task to preserve contextvars — but that changes the task identity between iterations. asyncio.timeout binds to asyncio.current_task() at enter time, so it ends up tracking a task that's already completed by the time the deadline fires.

The fix replaces asyncio.create_task(coro, context=self.context) with a token-based approach: set the preserved contextvars directly on the current task before calling __anext__, reset them after the call returns. Context vars are task-local, so they survive generator suspension points without changing the task identity. Same treatment for aclose().

Added two tests:

  • One verifies that asyncio.timeout works normally when the generator finishes before the deadline.
  • One verifies that a short timeout actually raises TimeoutError (without the fix it would hang forever).

Let me know if there's a different approach you'd prefer — I considered a few alternatives (persistent task with a queue, re-entering the timeout each iteration) but the token approach seemed cleanest since it doesn't introduce any new task machinery.

Closes #13349

Greptile Summary

This PR fixes asyncio.timeout becoming a no-op inside async generators wrapped by _ContextPreservedAsyncGeneratorWrapper. The previous implementation created a fresh asyncio.Task per __anext__ call (to run the generator in a preserved context), which broke asyncio.timeout because the context manager binds to asyncio.current_task() at entry and the freshly created task is already gone by the time the deadline fires.

  • Core fix: Both __anext__ and aclose now apply the preserved context vars directly to the current task using var.set(val) / var.reset(token) token pairs, and then restore the originals in a finally block — no new task is created, so task identity is preserved and asyncio.timeout works correctly.
  • Behavioural trade-off: Context var mutations made inside the generator body during one iteration are no longer carried forward to the next (the snapshot in self.context is re-applied fresh each call and then reset). This is benign for Langfuse's tracing use case but differs from the old semantics.
  • Tests: Two new tests verify that a deadline is both harmless when not exceeded and actually fires when the generator is slow, replacing the now-unneeded Python 3.10 fallback test.

Confidence Score: 4/5

The change is safe to merge. The token-reset approach is correct for Python 3.10+ and fixes a real observable hang. The only gap is that the timeout test omits a span-finalization assertion, leaving that path unverified.

The implementation correctly resolves the task-identity problem and handles all exception paths (StopAsyncIteration, CancelledError, general exceptions) with proper token cleanup. One test is missing a span.ended assertion after timeout, and the changed context-var mutation semantics across iterations could surprise future callers — neither rises to a blocking issue, but both are worth addressing before merge.

tests/unit/test_observe.py — the second new test should assert span.ended to confirm finalization on timeout.

Sequence Diagram

sequenceDiagram
    participant Caller as Caller task
    participant Wrapper as _ContextPreservedAsyncGeneratorWrapper
    participant Gen as Async Generator
    participant EL as Event Loop

    Note over Caller: asyncio.timeout(T) binds to Caller task

    Caller->>Wrapper: async for item in wrapper (→ __anext__)
    Wrapper->>Wrapper: var.set(val) for each var in self.context snapshot
    Wrapper->>Gen: await generator.__anext__()
    Gen-->>EL: suspend at await asyncio.sleep(...)
    EL-->>Gen: resume after sleep
    Gen-->>Wrapper: yield item
    Wrapper->>Wrapper: var.reset(token) [restore previous values]
    Wrapper-->>Caller: return item

    Note over Caller,Wrapper: On timeout — CancelledError raised into Caller task
    Caller->>Wrapper: __anext__() [CancelledError injected]
    Wrapper->>Wrapper: var.set(val) for each var in self.context snapshot
    Wrapper->>Gen: await generator.__anext__() → CancelledError raised
    Wrapper->>Wrapper: finally: var.reset(token)
    Wrapper->>Wrapper: _finalize_with_error(CancelledError)
    Wrapper-->>Caller: re-raise CancelledError → asyncio.timeout converts to TimeoutError
Loading
Prompt To Fix All With AI
Fix the following 2 code review issues. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 2
tests/unit/test_observe.py:310-317
**Missing span-finalization assertion after timeout**

The first new test asserts `span.ended == 1` after normal completion, but this test doesn't verify the same after a `TimeoutError`. The `CancelledError` raised by `asyncio.timeout` should be caught by the `except (Exception, asyncio.CancelledError)` handler in `__anext__`, which calls `_finalize_with_error` — but there's no assertion here to confirm that path actually fires. A dangling unfinished span would silently corrupt traces.

### Issue 2 of 2
langfuse/_client/observe.py:727-735
**Context-var mutations inside the generator no longer persist across iterations**

In the old `create_task(coro, context=self.context)` approach the generator ran in the same `self.context` object on every iteration, so any `var.set()` the generator body issued before a `yield` was visible in `self.context` and therefore in all subsequent calls to `__anext__`. The new approach snapshots `self.context` once (at construction time) and re-applies those values before each `__anext__`, then resets them afterward via tokens, so intra-generator mutations are discarded between iterations.

For Langfuse's own tracing (where child spans are opened and closed within a single `__anext__` call), this should be fine. If user generators carry state across `yield` points through context vars, those mutations will silently disappear in each new iteration. It's worth a note in the class-level docstring so future maintainers understand the trade-off.

Reviews (1): Last reviewed commit: "fix(observe): preserve asyncio.timeout a..." | Re-trigger Greptile

Greptile also left 2 inline comments on this PR.

…rapper

The _ContextPreservedAsyncGeneratorWrapper was creating a fresh
asyncio.Task for every __anext__ call (via asyncio.create_task(
context=...)).  This changed the task identity between generator
iterations, which silently broke asyncio.timeout / asyncio.timeout_at
because those context managers bind to asyncio.current_task() at
enter time and the bound task is already done by the next iteration.

Replace the asyncio.create_task wrapping with a token-based approach
that sets preserved contextvars directly on the current task.  Context
vars are task-local so they survive generator suspension points without
changing the task identity.  The same pattern is applied to aclose()
for consistency.

Closes langfuse/langfuse#13349
Copy link
Copy Markdown

@claude claude Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Claude Code Review

This pull request is from a fork — automated review is disabled. A repository maintainer can comment @claude review to run a one-time review.

@CLAassistant
Copy link
Copy Markdown

CLAassistant commented Jun 7, 2026

CLA assistant check
All committers have signed the CLA.

Comment on lines +310 to +317
with pytest.raises(asyncio.TimeoutError):
async with asyncio.timeout(0.05):
async for _item in wrapper:
pass

await wrapper.aclose()

assert seen == ["preserved"]
assert span.ended == 1
# Should have yielded at least once before the timeout fired
assert len(seen) >= 1
assert all(v == "preserved" for v in seen)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Missing span-finalization assertion after timeout

The first new test asserts span.ended == 1 after normal completion, but this test doesn't verify the same after a TimeoutError. The CancelledError raised by asyncio.timeout should be caught by the except (Exception, asyncio.CancelledError) handler in __anext__, which calls _finalize_with_error — but there's no assertion here to confirm that path actually fires. A dangling unfinished span would silently corrupt traces.

Prompt To Fix With AI
This is a comment left during a code review.
Path: tests/unit/test_observe.py
Line: 310-317

Comment:
**Missing span-finalization assertion after timeout**

The first new test asserts `span.ended == 1` after normal completion, but this test doesn't verify the same after a `TimeoutError`. The `CancelledError` raised by `asyncio.timeout` should be caught by the `except (Exception, asyncio.CancelledError)` handler in `__anext__`, which calls `_finalize_with_error` — but there's no assertion here to confirm that path actually fires. A dangling unfinished span would silently corrupt traces.

How can I resolve this? If you propose a fix, please make it concise.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added assert span.ended == 1 to confirm the error-finalization path fires after the timeout.

Comment on lines +727 to +735
tokens: list[tuple[contextvars.ContextVar[Any], contextvars.Token[Any]]] = []
for var in list(self.context.keys()):
val = self.context.get(var)
tokens.append((var, var.set(val)))
try:
# Python 3.11+ approach with explicit task context
item = await asyncio.create_task(
self.generator.__anext__(), # type: ignore
context=self.context,
) # type: ignore
except TypeError:
# Python 3.10 fallback - create the task inside the preserved context.
item = await self.context.run(
asyncio.create_task,
self.generator.__anext__(), # type: ignore
)
item = await self.generator.__anext__() # type: ignore
finally:
for var, token in reversed(tokens):
var.reset(token)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Context-var mutations inside the generator no longer persist across iterations

In the old create_task(coro, context=self.context) approach the generator ran in the same self.context object on every iteration, so any var.set() the generator body issued before a yield was visible in self.context and therefore in all subsequent calls to __anext__. The new approach snapshots self.context once (at construction time) and re-applies those values before each __anext__, then resets them afterward via tokens, so intra-generator mutations are discarded between iterations.

For Langfuse's own tracing (where child spans are opened and closed within a single __anext__ call), this should be fine. If user generators carry state across yield points through context vars, those mutations will silently disappear in each new iteration. It's worth a note in the class-level docstring so future maintainers understand the trade-off.

Prompt To Fix With AI
This is a comment left during a code review.
Path: langfuse/_client/observe.py
Line: 727-735

Comment:
**Context-var mutations inside the generator no longer persist across iterations**

In the old `create_task(coro, context=self.context)` approach the generator ran in the same `self.context` object on every iteration, so any `var.set()` the generator body issued before a `yield` was visible in `self.context` and therefore in all subsequent calls to `__anext__`. The new approach snapshots `self.context` once (at construction time) and re-applies those values before each `__anext__`, then resets them afterward via tokens, so intra-generator mutations are discarded between iterations.

For Langfuse's own tracing (where child spans are opened and closed within a single `__anext__` call), this should be fine. If user generators carry state across `yield` points through context vars, those mutations will silently disappear in each new iteration. It's worth a note in the class-level docstring so future maintainers understand the trade-off.

How can I resolve this? If you propose a fix, please make it concise.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a note about the context-var mutation trade-off to the class-level docstring.

…t-var trade-off

- Add span.ended assertion to the timeout test so we verify
  _finalize_with_error fires when asyncio.timeout cancels the generator
- Expand the class docstring to note that context-var mutations
  inside the generator are discarded between iterations because
  the wrapper snapshots context once and re-applies it per __anext__
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants