-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathbind.py
More file actions
335 lines (264 loc) · 12.2 KB
/
bind.py
File metadata and controls
335 lines (264 loc) · 12.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
from __future__ import annotations
import os
import typing as t
from contextlib import contextmanager
import sqlalchemy
import sqlalchemy.event
import sqlalchemy.exc
import sqlalchemy.ext
import sqlalchemy.ext.asyncio
import sqlalchemy.orm
import sqlalchemy.util
from . import signals
from .config import BindConfig
from .model import setup_soft_delete_for_session
from .testing import AsyncTestTransaction
from .testing import TestTransaction
sa = sqlalchemy
class BindBase:
    """Shared state and introspection helpers for database binds.

    Holds the bind's configuration and metadata. The ``engine`` and
    ``Session`` attributes are attached later by an initializing subclass
    (see ``Bind.initialize``), so engine-backed properties guard against
    access before initialization.
    """

    config: BindConfig
    metadata: sa.MetaData
    engine: sa.Engine
    Session: sa.orm.sessionmaker

    def __init__(
        self,
        config: BindConfig,
        metadata: sa.MetaData,
    ):
        self.config = config
        self.metadata = metadata

    def _assert_initialized(self) -> None:
        # Shared guard for engine-backed properties so the error message
        # stays consistent in one place.
        if not hasattr(self, "engine"):
            raise RuntimeError("Database not initialized yet. Call initialize() first.")

    @property
    def url(self) -> str:
        """The engine's database URL as a string.

        Raises:
            RuntimeError: if the engine has not been created yet.
        """
        self._assert_initialized()
        return str(self.engine.url)

    @property
    def is_async(self) -> bool:
        """Whether the engine's dialect is an asyncio dialect.

        Raises:
            RuntimeError: if the engine has not been created yet.
        """
        self._assert_initialized()
        return self.engine.url.get_dialect().is_async

    @property
    def is_read_only(self) -> bool:
        """Whether this bind is configured as read-only."""
        return self.config.read_only
class BindContext(BindBase):
    """A lightweight, temporary bind produced by ``Bind.context()``.

    Shares the parent bind's config and metadata but carries its own
    engine (with execution options applied) and session factory. Adds no
    behavior of its own; the distinct type lets signal receivers and
    callers tell a context apart from a long-lived ``Bind``.
    """

    pass
class Bind(BindBase):
    """A database bind owning a synchronous Engine and sessionmaker.

    Constructing a ``Bind`` builds the engine and session factory from
    ``config`` immediately unless ``initialize=False`` is passed, in which
    case ``initialize()`` must be called before use.
    """

    def __init__(
        self,
        config: BindConfig,
        metadata: sa.MetaData,
        initialize: bool = True,
    ):
        self.config = config
        self.metadata = metadata
        if initialize:
            self.initialize()

    def initialize(self):
        """(Re)build the engine and session factory from the current config.

        Any previously-created engine is disposed first so pooled
        connections are not leaked across re-initialization.

        Returns:
            This bind, to allow chaining.
        """
        if hasattr(self, "engine"):
            self.engine.dispose()
        self.engine = self.create_engine(
            self.config.engine.dict(exclude_unset=True, exclude_none=True),
            prefix="",
        )
        self.Session = self.create_session_factory(
            self.config.session.dict(exclude_unset=True, exclude_none=True),
        )
        return self

    @contextmanager
    def context(
        self,
        engine_execution_options: t.Optional[t.Dict[str, t.Any]] = None,
        # NOTE(review): the double underscore in this name looks like a typo,
        # but it is part of the public keyword interface, so it is kept.
        session_execution__options: t.Optional[t.Dict[str, t.Any]] = None,
    ) -> t.Generator[BindContext, None, None]:
        """Yield a ``BindContext`` over this bind's engine with the given
        execution options applied.

        Sends ``bind_context_entered`` before yielding and
        ``bind_context_exited`` when the block finishes.
        """
        context = BindContext(self.config, self.metadata)
        context.engine = self.engine.execution_options(**engine_execution_options or {})
        context.Session = self.create_session_factory(session_execution__options or {})
        context.Session.configure(bind=context.engine)
        signals.bind_context_entered.send(
            self,
            engine_execution_options=engine_execution_options,
            session_execution__options=session_execution__options,
            context=context,
        )
        try:
            yield context
        finally:
            # Fix: previously the exited signal was skipped when the managed
            # block raised, leaving it unpaired with bind_context_entered.
            # Send it unconditionally.
            signals.bind_context_exited.send(
                self,
                engine_execution_options=engine_execution_options,
                session_execution__options=session_execution__options,
                context=context,
            )

    def create_session_factory(
        self, options: dict[str, t.Any]
    ) -> sa.orm.sessionmaker[sa.orm.Session]:
        """Create a sessionmaker bound to this bind's engine.

        Emits the before/after session-factory-created signals around
        creation so listeners can adjust options or register events.
        """
        signals.before_bind_session_factory_created.send(self, options=options)
        session_factory = sa.orm.sessionmaker(bind=self.engine, **options)
        signals.after_bind_session_factory_created.send(
            self, options=options, session_factory=session_factory
        )
        return session_factory

    def create_engine(self, config: t.Dict[str, t.Any], prefix: str = "") -> sa.Engine:
        """Create an Engine from a configuration dict.

        Emits the before/after engine-created signals around creation so
        listeners (e.g. the connection-safety handlers below) can attach
        to every engine this package creates.
        """
        signals.before_bind_engine_created.send(self, config=config, prefix=prefix)
        engine = sa.engine_from_config(config, prefix=prefix)
        signals.after_bind_engine_created.send(self, config=config, prefix=prefix, engine=engine)
        return engine

    def test_transaction(self, savepoint: bool = False):
        """Return a ``TestTransaction`` for this bind (SAVEPOINT-based if requested)."""
        return TestTransaction(self, savepoint=savepoint)

    def _call_metadata(self, method: str):
        # Run a MetaData method (create_all/drop_all/reflect) inside a
        # connection-scoped transaction.
        with self.engine.connect() as conn:
            with conn.begin():
                return getattr(self.metadata, method)(bind=conn)

    def create_all(self):
        """Create all tables registered on this bind's metadata."""
        return self._call_metadata("create_all")

    def drop_all(self):
        """Drop all tables registered on this bind's metadata."""
        return self._call_metadata("drop_all")

    def reflect(self):
        """Reflect database tables into this bind's metadata."""
        return self._call_metadata("reflect")

    def __repr__(self) -> str:
        return f"<{type(self).__name__} {self.engine.url}>"
class AsyncBind(Bind):
    """Async variant of ``Bind`` owning an AsyncEngine and async_sessionmaker."""

    engine: sa.ext.asyncio.AsyncEngine
    Session: sa.ext.asyncio.async_sessionmaker

    def create_session_factory(
        self, options: dict[str, t.Any]
    ) -> sa.ext.asyncio.async_sessionmaker[sa.ext.asyncio.AsyncSession]:
        """Create an async_sessionmaker bound to this bind's engine.

        It took some research to figure out the following trick which combines
        sync and async sessionmakers to make the async_sessionmaker a valid
        target for sqlalchemy events. Details can be found at:
        https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#examples-of-event-listeners-with-async-engines-sessions-sessionmakers
        """
        signals.before_bind_session_factory_created.send(self, options=options)
        # The plain sessionmaker is the event target; the async factory
        # delegates to it via sync_session_class.
        sync_sessionmaker = sa.orm.sessionmaker()
        session_factory = sa.ext.asyncio.async_sessionmaker(
            bind=self.engine,
            sync_session_class=sync_sessionmaker,
            **options,
        )
        signals.after_bind_session_factory_created.send(
            self, options=options, session_factory=session_factory
        )
        return session_factory

    def create_engine(
        self, config: dict[str, t.Any], prefix: str = ""
    ) -> sa.ext.asyncio.AsyncEngine:
        """Create an AsyncEngine from a configuration dict, emitting the
        same before/after engine-created signals as the sync variant."""
        signals.before_bind_engine_created.send(self, config=config, prefix=prefix)
        engine = sa.ext.asyncio.async_engine_from_config(config, prefix=prefix)
        signals.after_bind_engine_created.send(self, config=config, prefix=prefix, engine=engine)
        return engine

    def test_transaction(self, savepoint: bool = False):
        """Return an ``AsyncTestTransaction`` for this bind."""
        return AsyncTestTransaction(self, savepoint=savepoint)

    async def _call_metadata(self, method: str):
        # Async counterpart of Bind._call_metadata: run the (sync) MetaData
        # method on the connection via run_sync.
        async with self.engine.connect() as conn:
            async with conn.begin():

                def sync_call(conn: sa.Connection, method: str):
                    # Fix: return the result so this mirrors Bind._call_metadata,
                    # which propagates the metadata method's return value.
                    return getattr(self.metadata, method)(bind=conn)

                return await conn.run_sync(sync_call, method)
@signals.after_bind_session_factory_created.connect
def register_soft_delete_support_for_session(
    bind: t.Union[Bind, AsyncBind],
    options: t.Dict[str, t.Any],
    session_factory: t.Union[sa.orm.sessionmaker, sa.ext.asyncio.async_sessionmaker],
) -> None:
    """Register the event handlers that enable soft-delete logic to be applied automatically.

    This functionality is opt-in by nature. Opt-in involves adding the SoftDeleteMixin to the
    ORM models that should support soft-delete. You can learn more by checking out the
    model.mixins module.
    """
    # An async_sessionmaker delegates to a plain sessionmaker through its
    # sync_session_class kwarg; ORM events must be attached to that sync
    # factory, so unwrap it here. (Simplified from an all([...]) construct.)
    if (
        isinstance(session_factory, sa.ext.asyncio.async_sessionmaker)
        and "sync_session_class" in session_factory.kw
    ):
        session_factory = session_factory.kw["sync_session_class"]
    setup_soft_delete_for_session(session_factory)  # type: ignore
# Beware of Dragons!
#
# The following handlers aren't at all crucial to understanding how this package works, they are
# mostly based on well known sqlalchemy recipes and their impact can be fully understood from
# their docstrings alone.
@signals.after_bind_engine_created.connect
def register_engine_connection_cross_process_safety_handlers(
    sender: Bind,
    config: t.Dict[str, t.Any],
    prefix: str,
    engine: t.Union[sa.Engine, sa.ext.asyncio.AsyncEngine],
) -> None:
    """Register event handlers to invalidate connections shared across process boundaries.

    SQLAlchemy connections aren't safe to share across processes and most sqlalchemy engines
    contain a connection pool full of them. This will cause issues when these connections are
    used concurrently in multiple processes that are bizarre and become difficult to trace the
    origin of. This quart application utilizes multiple processes, one for each API worker,
    typically handled by the ASGI server, in our case hypercorn. Another place where we run into
    multiple processes is during testing. We use the pytest-xdist plugin to split our tests
    across multiple cores and drastically reduce the time required to complete.

    Both of these use cases dictate our application needs to be concerned with what objects
    could possibly be shared across processes and follow any recommendations made concerning
    that library/service around process forking/spawning. Usually the only resources we need
    to worry about are file descriptors (including sockets and network connections).

    SQLAlchemy has a section of their docs dedicated to this exact concern, see that page for
    more details: https://docs.sqlalchemy.org/en/20/core/pooling.html#pooling-multiprocessing
    """
    # Use the sync_engine when AsyncEngine
    if isinstance(engine, sa.ext.asyncio.AsyncEngine):
        engine = engine.sync_engine

    def close_connections_for_forking():
        # dispose(close=False) drops the parent's pool references without
        # closing the underlying connections the child may have inherited.
        engine.dispose(close=False)

    # os.register_at_fork only exists on POSIX; on other platforms fork
    # safety is not applicable.
    if os.name == 'posix':
        os.register_at_fork(before=close_connections_for_forking)

    def connect(dbapi_connection, connection_record):
        # Stamp each pooled connection with the PID that created it.
        connection_record.info["pid"] = os.getpid()

    # NOTE(review): `connect` is a fresh closure on every call of this
    # handler, so sa.event.contains can never find a previous registration;
    # if this signal ever fires twice for the same engine, duplicate
    # listeners would accumulate — confirm whether that can happen.
    if not sa.event.contains(engine, "connect", connect):
        sa.event.listen(engine, "connect", connect)

    def checkout(dbapi_connection, connection_record, connection_proxy):
        # On checkout, reject any connection created in a different process:
        # clear the record so the pool discards it, then signal the pool to
        # retry with a fresh connection via DisconnectionError.
        pid = os.getpid()
        if connection_record.info["pid"] != pid:
            connection_record.dbapi_connection = connection_proxy.dbapi_connection = None
            raise sa.exc.DisconnectionError(
                "Connection record belongs to pid {}, attempting to check out in pid {}".format(
                    connection_record.info["pid"], pid
                )
            )

    if not sa.event.contains(engine, "checkout", checkout):
        sa.event.listen(engine, "checkout", checkout)
@signals.after_bind_engine_created.connect
def register_engine_connection_sqlite_specific_transaction_fix(
    sender: Bind,
    config: t.Dict[str, t.Any],
    prefix: str,
    engine: t.Union[sa.Engine, sa.ext.asyncio.AsyncEngine],
) -> None:
    """Install the pysqlite serializable-transaction workaround on sqlite engines.

    The pysqlite DBAPI driver starts and ends transactions on its own
    schedule in an attempt to minimize SQLite's file locking, which breaks
    SERIALIZABLE isolation, transactional DDL, and SAVEPOINT support.
    SQLAlchemy deliberately leaves that default alone, so full transactional
    support requires two event listeners: one that stops the driver from
    emitting BEGIN/COMMIT implicitly, and one that emits BEGIN ourselves
    when a transaction starts.

    Recipe documented at:
    https://docs.sqlalchemy.org/en/20/dialects/sqlite.html#pysqlite-serializable
    """
    # Event listeners must target the underlying sync engine.
    if isinstance(engine, sa.ext.asyncio.AsyncEngine):
        engine = engine.sync_engine

    # Only the sqlite dialect needs this workaround.
    if engine.dialect.name != "sqlite":
        return

    def do_connect(dbapi_connection, connection_record):
        # Disable pysqlite's implicit BEGIN entirely (this also stops the
        # implicit COMMIT it would emit before any DDL).
        dbapi_connection.isolation_level = None

    def do_begin(conn_):
        # Open the transaction ourselves at the driver level.
        conn_.exec_driver_sql("BEGIN")

    for event_name, listener in (("connect", do_connect), ("begin", do_begin)):
        if not sa.event.contains(engine, event_name, listener):
            sa.event.listen(engine, event_name, listener)