1- """Concurrency utilities."""
1+ """Concurrency utilities using producer-consumer patterns.
2+
3+ This module provides helpers for managing concurrent asyncio tasks with
4+ strict concurrency limits, backpressure handling, and robust exception tracking.
5+
6+ It is designed to handle both I/O-bound tasks (via native asyncio coroutines) and
7+ CPU-bound tasks (via `loop.run_in_executor`) while keeping resource usage deterministic.
8+ """
29
310import asyncio
411from collections .abc import Awaitable , Callable , Iterable
1118class TaskFailedError [T ](Exception ):
1219 """Exception raised when a task fails during processing.
1320
21+ This wrapper preserves the context of a failure in concurrent processing pipelines.
22+ Since results may be returned out of order or aggregated later, wrapping the
23+ exception allows the caller to link a failure back to the specific input item
24+ that caused it.
25+
1426 :param item: The item that was being processed.
1527 :param original_exception: The exception that caused the failure.
1628 """
@@ -32,14 +44,14 @@ async def process_unordered[T, R, **P](
3244
3345 Uses a producer-consumer model via asyncio.TaskGroup.
3446 Order of results is NOT guaranteed.
35- If an exception occurs, it is wrapped in ` TaskFailedError`.
47+ If an exception occurs, it is wrapped in :class:`~docbuild.utils.concurrency. TaskFailedError`.
3648
3749 :param items: Iterable of items to process.
3850 :param worker_fn: Async function processing a single item.
39- Result signature: `worker_fn(item, *worker_args, **worker_kwargs)`.
51+ Result signature: `` worker_fn(item, *worker_args, **worker_kwargs)` `.
4052 :param limit: Max concurrent workers.
41- :param worker_args: Additional positional arguments passed to `worker_fn`.
42- :param worker_kwargs: Additional keyword arguments passed to `worker_fn`.
53+ :param worker_args: Additional positional arguments passed to `` worker_fn` `.
54+ :param worker_kwargs: Additional keyword arguments passed to `` worker_fn` `.
4355 """
4456 # Limit queue size to prevent memory explosion if producer is faster than consumers
4557 queue : asyncio .Queue [T | None ] = asyncio .Queue (maxsize = limit * 2 )
0 commit comments