Skip to content

Commit 8e463b0

Browse files
Fix #191: Add producer-consumer model (#194)
Introduce run_parallel Synopsis: def run_parallel[T, R, **P](items: Iterable[T] | AsyncIterableABC[T], worker_fn: Callable[Concatenate[T, P], Awaitable[R]], limit: int, *worker_args: P.args, **worker_kwargs: P.kwargs, ) -> AsyncIterator[R | TaskFailedError[T]]: - A single `producer` task feeds items into a bounded input queue. - `limit` `worker` tasks pull from the input queue, call ``worker_fn``, and push results into a bounded result queue. - The `caller` consumes results by iterating this async generator. All three stages run concurrently. Backpressure propagates naturally: a slow consumer stalls workers; stalled workers stall the producer. Order of results is NOT guaranteed. If `worker_fn` raises, the exception is wrapped in :class:`TaskFailedError` and yielded rather than re-raised, so one failing item does not abort the pipeline. --------- Signed-off-by: sushant-suse <[email protected]> Co-authored-by: sushant-suse <[email protected]>
1 parent d5320da commit 8e463b0

5 files changed

Lines changed: 566 additions & 1 deletion

File tree

.github/workflows/ci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ jobs:
4646
container:
4747
# image: registry.opensuse.org/documentation/containers/15.6/opensuse-daps-toolchain:latest
4848
image: ghcr.io/opensuse/doc-container:latest
49+
options: --user 0:0
4950
steps:
5051
- name: Checkout repository
5152
uses: actions/checkout@v6

LINKS.md

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,24 @@ This document lists some links that may be helpful for this project.
1111

1212
* [PEX](https://docs.pex-tool.org)
1313

14-
## Async I/O
14+
## General Async I/O
1515

1616
* [Python's asyncio: A Hands-On Walkthrough](https://realpython.com/async-io-python/)
17+
* [Awesome asyncio](https://github.com/timofurrer/awesome-asyncio)
18+
19+
## Producer-Consumer-Workers / Channels
20+
21+
* [aiomultiprocess](https://github.com/omnilib/aiomultiprocess)
22+
* [anyio](https://github.com/agronholm/anyio)
23+
* [pychanasync](https://github.com/Gwali-1/PY_CHANNELS_ASYNC)
24+
* [janus](https://github.com/aio-libs/janus)
1725
* [joblib](https://joblib.readthedocs.io)
1826

27+
## Pipelines
28+
29+
* [aiostream](https://github.com/vxgmichel/aiostream)
30+
* [asyncstdlib](https://github.com/maxfischer2781/asyncstdlib)
31+
1932
## Task Queues
2033

2134
* [Taskiq](https://taskiq-python.github.io)

changelog.d/191.feature.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add a producer/consumer model implementation.

src/docbuild/utils/concurrency.py

Lines changed: 314 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,314 @@
1+
"""Concurrency utilities using producer-consumer patterns.
2+
3+
This module provides helpers for managing concurrent asyncio tasks with
4+
strict concurrency limits, backpressure handling, and robust exception tracking.
5+
6+
It is designed to handle both I/O-bound tasks (via native asyncio coroutines) and
7+
CPU-bound tasks (via `loop.run_in_executor`) while keeping resource usage deterministic.
8+
"""
9+
10+
import asyncio
11+
from collections.abc import (
12+
AsyncIterable as AsyncIterableABC,
13+
AsyncIterator,
14+
Awaitable,
15+
Callable,
16+
Iterable,
17+
)
18+
from contextlib import suppress
19+
import functools
20+
import logging
21+
from typing import Concatenate
22+
23+
log = logging.getLogger(__name__)
24+
25+
#: Sentinel value for internal use when needed (e.g., to signal completion).
26+
SENTINEL = object()
27+
28+
29+
class TaskFailedError[T](Exception):
30+
"""Exception raised when a task fails during processing.
31+
32+
This wrapper preserves the context of a failure in concurrent processing pipelines.
33+
Since results may be returned out of order or aggregated later, wrapping the
34+
exception allows the caller to link a failure back to the specific input item
35+
that caused it.
36+
37+
:param item: The item that was being processed.
38+
:param original_exception: The exception that caused the failure.
39+
"""
40+
41+
def __init__(self, item: T, original_exception: Exception) -> None:
42+
super().__init__(f"Task failed for item {item}: {original_exception}")
43+
self.item = item
44+
self.original_exception = original_exception
45+
46+
47+
async def producer[T](
48+
items: Iterable[T] | AsyncIterableABC[T],
49+
input_queue: asyncio.Queue,
50+
num_workers: int,
51+
) -> None:
52+
"""Feed items into the input queue, then send one sentinel per worker.
53+
54+
:param items: An iterable or async iterable of items to be processed.
55+
:param input_queue: The queue for items to be processed by workers.
56+
:param num_workers: The number of workers, used to send the correct number of sentinels.
57+
"""
58+
try:
59+
if isinstance(items, AsyncIterableABC):
60+
async for item in items:
61+
await input_queue.put(item)
62+
else:
63+
for item in items:
64+
await input_queue.put(item)
65+
finally:
66+
# Use put_nowait and we must not block here.
67+
# If the queue is full, skip. Workers don't need more than one
68+
# sentinel to know it's time to quit.
69+
for _ in range(num_workers):
70+
try:
71+
input_queue.put_nowait(SENTINEL)
72+
except (asyncio.QueueFull, Exception):
73+
break
74+
75+
76+
async def worker[T, R](
77+
worker_fn: Callable[[T], Awaitable[R]],
78+
input_queue: asyncio.Queue,
79+
result_queue: asyncio.Queue,
80+
) -> None:
81+
"""Pull items from the input queue, process them, push results out.
82+
83+
:param worker_fn: The asynchronous function that processes a single item.
84+
:param input_queue: The queue for items to be processed by workers.
85+
:param result_queue: The queue for results from the workers.
86+
"""
87+
while True:
88+
# If the loop is closing, get() might raise CancelledError
89+
try:
90+
item = await input_queue.get()
91+
except asyncio.CancelledError:
92+
return
93+
94+
try:
95+
if item is SENTINEL:
96+
return
97+
98+
result = await worker_fn(item)
99+
await result_queue.put(result)
100+
except Exception as exc:
101+
# If putting an error fails (queue full), don't deadlock.
102+
try:
103+
result_queue.put_nowait(TaskFailedError(item, exc))
104+
except (asyncio.QueueFull, Exception):
105+
pass
106+
finally:
107+
input_queue.task_done()
108+
109+
110+
async def run_all[T, R](
111+
items: Iterable[T] | AsyncIterableABC[T],
112+
worker_fn: Callable[[T], Awaitable[R]],
113+
input_queue: asyncio.Queue,
114+
result_queue: asyncio.Queue,
115+
limit: int,
116+
) -> None:
117+
"""Orchestrate producer + workers, then signal the consumer when done.
118+
119+
:param items: An iterable or async iterable of items to be processed.
120+
:param worker_fn: The asynchronous function that processes a single item.
121+
:param input_queue: The queue for items to be processed by workers.
122+
:param result_queue: The queue for results from the workers.
123+
:param limit: The maximum number of concurrent workers.
124+
"""
125+
# Remove the internal .join() and let TaskGroup manage the lifecycle
126+
try:
127+
async with asyncio.TaskGroup() as tg:
128+
tg.create_task(producer(items, input_queue, limit))
129+
for _ in range(limit):
130+
tg.create_task(worker(worker_fn, input_queue, result_queue))
131+
finally:
132+
# We use put_nowait here. If the result_queue is full,
133+
# we do not want to deadlock the entire process.
134+
try:
135+
result_queue.put_nowait(SENTINEL)
136+
except (asyncio.QueueFull, Exception):
137+
pass
138+
139+
140+
async def run_parallel[T, R, **P](
141+
items: Iterable[T] | AsyncIterableABC[T],
142+
worker_fn: Callable[Concatenate[T, P], Awaitable[R]],
143+
limit: int,
144+
*worker_args: P.args,
145+
**worker_kwargs: P.kwargs,
146+
) -> AsyncIterator[R | TaskFailedError[T]]:
147+
"""Process items concurrently with bounded parallelism.
148+
149+
Uses a producer/worker/consumer pipeline:
150+
151+
- A single **producer** task feeds items into a bounded input queue.
152+
- ``limit`` **worker** tasks pull from the input queue, call ``worker_fn``,
153+
and push results into a bounded result queue.
154+
- The **caller** consumes results by iterating this async generator.
155+
156+
All three stages run concurrently. Backpressure propagates naturally:
157+
a slow consumer stalls workers; stalled workers stall the producer.
158+
Order of results is NOT guaranteed.
159+
160+
If ``worker_fn`` raises, the exception is wrapped in
161+
:class:`TaskFailedError` and yielded rather than re-raised, so one
162+
failing item does not abort the pipeline.
163+
164+
Performance characteristics
165+
---------------------------
166+
- **Throughput:** approaches ``limit * per-worker-throughput`` for
167+
I/O-bound workloads where workers spend most time awaiting external
168+
resources. CPU-bound work gains little due to the GIL; use
169+
``ProcessPoolExecutor`` wrapped in ``asyncio.run_in_executor`` instead.
170+
- **Startup cost:** O(limit) — one asyncio task per worker, each cheap
171+
to create (~microseconds).
172+
- **Memory:** O(limit). Both the input queue (``maxsize=limit * 2``)
173+
and the result queue (``maxsize=limit * 2``) are bounded. At most
174+
``limit`` items are in-flight inside workers at any time, giving a
175+
total live-item count of roughly ``5 * limit``.
176+
Note: each item itself may be arbitrarily large; the O(limit) bound
177+
refers to the *number* of items held in memory, not their byte size.
178+
- **Latency:** time-to-first-result equals one worker's latency.
179+
Remaining results stream out as workers complete, with no polling
180+
delay (sentinel-based signalling, zero busy-wait).
181+
- **Cancellation:** if the caller abandons the generator (e.g. ``break``
182+
in an ``async for`` loop), the internal runner task is cancelled and
183+
all worker tasks are cleaned up promptly via ``TaskGroup``.
184+
185+
:param items: Iterable or async iterable of items to process.
186+
:param worker_fn: Async callable invoked as ``worker_fn(item)`` for
187+
each item. Must be safe to call concurrently from ``limit`` tasks.
188+
:param limit: Maximum number of concurrent workers. Must be >= 1.
189+
Higher values increase throughput up to the point where the event
190+
loop, network, or downstream service becomes the bottleneck.
191+
:param worker_args: Positional arguments to pass to ``worker_fn``.
192+
:param worker_kwargs: Keyword arguments to pass to ``worker_fn``.
193+
:raises ValueError: If ``limit`` is less than 1.
194+
:yields: Results in completion order (not input order). Failed items
195+
are yielded as :class:`TaskFailedError` instances rather than
196+
raising, so the caller can handle partial failures inline.
197+
"""
198+
if limit <= 0:
199+
raise ValueError("limit must be >= 1")
200+
201+
bound_fn = (
202+
functools.partial(worker_fn, *worker_args, **worker_kwargs) if worker_kwargs else worker_fn
203+
)
204+
205+
input_queue: asyncio.Queue[T | object] = asyncio.Queue(maxsize=limit * 5)
206+
result_queue: asyncio.Queue[R | TaskFailedError[T] | object] = asyncio.Queue(maxsize=0)
207+
208+
runner = asyncio.create_task(
209+
run_all(items, bound_fn, input_queue, result_queue, limit)
210+
)
211+
212+
try:
213+
while True:
214+
result = await result_queue.get()
215+
if result is SENTINEL:
216+
break
217+
yield result # type: ignore[misc]
218+
219+
finally:
220+
if not runner.done():
221+
runner.cancel()
222+
223+
with suppress(asyncio.CancelledError, Exception):
224+
# Always await runner regardless of whether we cancelled it
225+
# or it finished on its own.
226+
# This ensures the task is fully cleaned up (no "task was
227+
# destroyed but it is pending" warnings) and re-raises any unexpected
228+
# exception from run_all — which we suppress here since we're
229+
# in a cleanup path and cannot meaningfully recover.
230+
await runner
231+
232+
233+
if __name__ == "__main__":
    import time

    logging.basicConfig(
        level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
    )

    async def sample_worker(num: int) -> int:
        """Simulate an I/O-bound task; items 5 and 8 fail on purpose."""
        if num in (5, 8):
            log.warning("Simulating failure for item %d", num)
            # HINT: This is the "alternative" implementation.
            # Instead of having a Failure class, we just raise the exception
            # and add the item into the exception as additional metadata.
            # BUGFIX: message now names the actual failing item instead of
            # hardcoding "Item 5" even when item 8 fails.
            raise ValueError(f"Item {num} is not allowed!", num)

        log.info("Processing item %d", num)
        await asyncio.sleep(0.1)  # Simulate I/O delay
        return num * 2

    # Run CPU-intensive tasks in an executor.
    # 1. Define the heavy-lifting function (must be at module level so it
    #    can be pickled for a ProcessPoolExecutor).
    def heavy_cpu_math(item: int) -> int:
        """Simulate a CPU-bound task."""
        return item * item

    async def main() -> None:
        """Run the example."""

        async def generate_items() -> AsyncIterableABC[int]:
            for i in range(10):
                yield i

        log.info("--- Running process_unordered ---")
        start_time = time.monotonic()

        successful_results = []
        failed_tasks = []
        async for res in run_parallel(generate_items(), sample_worker, limit=3):
            if isinstance(res, TaskFailedError):
                failed_tasks.append((res.item, res.original_exception))
            else:
                successful_results.append(res)

        # BUGFIX: measure AFTER consuming the generator. The async
        # generator is lazy, so timing it right after construction (as
        # before) reported ~0 seconds without doing any work.
        end_time = time.monotonic()
        log.info("Finished in %.2f seconds\n", end_time - start_time)

        log.info("Successful results (unordered): %s", successful_results)
        log.info("Caught exceptions: %s", failed_tasks)

        ## -------------------
        log.info("--- Running process executor ---")
        from concurrent.futures import Executor, ProcessPoolExecutor

        # 2. Create the wrapper that offloads the heavy function.
        async def cpu_worker_wrapper(
            item: int, executor: Executor | None = None
        ) -> int:
            loop = asyncio.get_running_loop()
            # Use the passed executor
            return await loop.run_in_executor(executor, heavy_cpu_math, item)

        # 3. Use the existing utility with the executor passed as a kwarg.
        items = range(10)
        with ProcessPoolExecutor() as process_pool:
            successful_results = []
            failed_tasks = []
            async for res in run_parallel(
                items, cpu_worker_wrapper, limit=4, executor=process_pool
            ):
                if isinstance(res, TaskFailedError):
                    failed_tasks.append((res.item, res.original_exception))
                else:
                    successful_results.append(res)

            log.info("Successful results (unordered): %s", successful_results)
            log.info("Caught exceptions: %s", failed_tasks)

    asyncio.run(main())

0 commit comments

Comments
 (0)