Skip to content

Commit 4232c3f

Browse files
Enforce event loop affinity for cache instances (#732)
1 parent 6b06d62 commit 4232c3f

5 files changed

Lines changed: 324 additions & 67 deletions

File tree

README.rst

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,53 @@ The library supports explicit invalidation for specific function call by
104104
The method returns `True` if corresponding arguments set was cached already, `False`
105105
otherwise.
106106

107+
Limitations
108+
-----------
109+
110+
**Event Loop Affinity**: ``alru_cache`` enforces that a cache instance is used with only
111+
one event loop. If you attempt to use a cached function from a different event loop than
112+
where it was first called, a ``RuntimeError`` will be raised:
113+
114+
.. code-block:: text
115+
116+
RuntimeError: alru_cache is not safe to use across event loops: this cache
117+
instance was first used with a different event loop.
118+
Use separate cache instances per event loop.
119+
120+
For typical asyncio applications using a single event loop, this is automatic and requires
121+
no configuration. If your application uses multiple event loops, create separate cache
122+
instances per loop:
123+
124+
.. code-block:: python
125+
126+
import threading
127+
128+
_local = threading.local()
129+
130+
def get_cached_fetcher():
131+
if not hasattr(_local, 'fetcher'):
132+
@alru_cache(maxsize=100)
133+
async def fetch_data(key):
134+
...
135+
_local.fetcher = fetch_data
136+
return _local.fetcher
137+
138+
You can also reuse the logic of an already decorated function in a new loop by accessing ``__wrapped__``:
139+
140+
.. code-block:: python
141+
142+
@alru_cache(maxsize=32)
143+
async def my_task(x):
144+
...
145+
146+
# In Loop 1:
147+
# my_task() uses the default global cache instance
148+
149+
# In Loop 2 (or a new thread):
150+
# Create a fresh cache instance for the same logic
151+
cached_task_loop2 = alru_cache(maxsize=32)(my_task.__wrapped__)
152+
await cached_task_loop2(x)
153+
107154
Benchmarks
108155
----------
109156

async_lru/__init__.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,10 +112,12 @@ def __init__(
112112
self.__closed = False
113113
self.__hits = 0
114114
self.__misses = 0
115+
self.__first_loop: Optional[asyncio.AbstractEventLoop] = None
115116

116117
@property
117118
def __tasks(self) -> List["asyncio.Task[_R]"]:
118-
# NOTE: I don't think we need to form a set first here but not too sure we want it for guarantees
119+
# NOTE: I don't think we need to form a set first here but not
120+
# too sure we want it for guarantees
119121
return list(
120122
{
121123
cache_item.task
@@ -124,6 +126,16 @@ def __tasks(self) -> List["asyncio.Task[_R]"]:
124126
}
125127
)
126128

129+
def _check_loop(self, loop: asyncio.AbstractEventLoop) -> None:
130+
if self.__first_loop is None:
131+
self.__first_loop = loop
132+
elif self.__first_loop is not loop:
133+
raise RuntimeError(
134+
"alru_cache is not safe to use across event loops: this cache "
135+
"instance was first used with a different event loop. "
136+
"Use separate cache instances per event loop."
137+
)
138+
127139
def cache_invalidate(self, /, *args: Hashable, **kwargs: Any) -> bool:
128140
key = _make_key(args, kwargs, self.__typed)
129141

@@ -144,6 +156,8 @@ def cache_clear(self) -> None:
144156
self.__cache.clear()
145157

146158
async def cache_close(self, *, wait: bool = False) -> None:
159+
loop = asyncio.get_running_loop()
160+
self._check_loop(loop)
147161
self.__closed = True
148162

149163
tasks = self.__tasks
@@ -222,6 +236,7 @@ async def __call__(self, /, *fn_args: Any, **fn_kwargs: Any) -> _R:
222236
raise RuntimeError(f"alru_cache is closed for {self}")
223237

224238
loop = asyncio.get_running_loop()
239+
self._check_loop(loop)
225240

226241
key = _make_key(fn_args, fn_kwargs, self.__typed)
227242

@@ -341,7 +356,6 @@ def wrapper(fn: _CBP[_R]) -> _LRUCacheWrapper[_R]:
341356
if not inspect.iscoroutinefunction(origin):
342357
raise RuntimeError(f"Coroutine function is required, got {fn!r}")
343358

344-
# functools.partialmethod support
345359
if hasattr(fn, "_make_unbound_method"):
346360
fn = fn._make_unbound_method()
347361

0 commit comments

Comments
 (0)