@@ -133,33 +133,34 @@ async def multiply(n: int, factor: int = 1) -> int:
133133 assert set (int_results ) == {3 , 6 , 9 }
134134
135135
136- async def test_finally_calls_cancel_on_early_exit (monkeypatch ):
137- cancelled = False
138- original_create_task = asyncio .create_task
139-
140- def patched_create_task (coro , ** kwargs ):
141- task = original_create_task (coro , ** kwargs )
142- original_cancel = task .cancel
143-
144- def tracking_cancel (* args , ** kwargs ):
145- nonlocal cancelled
146- cancelled = True
147- return original_cancel (* args , ** kwargs )
148-
149- task .cancel = tracking_cancel
150- return task
151-
152- monkeypatch .setattr (concurrency_module , "asyncio" , asyncio )
153- monkeypatch .setattr (asyncio , "create_task" , patched_create_task )
154-
155- async def worker (x ):
156- await asyncio .sleep (0 )
157- return x
158-
159- with pytest .raises (RuntimeError ):
160- async for _ in run_parallel (range (100 ), worker , limit = 2 ):
161- raise RuntimeError ("caller crashed" )
162-
163- for _ in range (5 ):
164- await asyncio .sleep (0 )
165- assert cancelled
136+ async def test_finally_calls_cancel_on_early_exit ():
137+ """Verify that if the caller stops iterating, the runner task is cancelled."""
138+ worker_started = asyncio .Event ()
139+ worker_cancelled = False
140+
141+ async def slow_worker (x ):
142+ try :
143+ worker_started .set ()
144+ await asyncio .sleep (10 ) # Wait a long time
145+ return x
146+ except asyncio .CancelledError :
147+ nonlocal worker_cancelled
148+ worker_cancelled = True
149+ raise
150+
151+ # 1. Start the generator
152+ gen = run_parallel (range (10 ), slow_worker , limit = 1 )
153+
154+ # 2. Start iterating and then 'break' or 'raise'
155+ try :
156+ async for _ in gen :
157+ await worker_started .wait ()
158+ raise RuntimeError ("Stop early" )
159+ except RuntimeError :
160+ pass
161+
162+ # 3. Give the event loop a moment to run the finally block in run_parallel
163+ await asyncio .sleep (0.1 )
164+
165+ # 4. Verify cleanup happened
166+ assert worker_cancelled , "Worker was not cancelled after early exit"
0 commit comments