Skip to content

Commit f373b79

Browse files
authored
Fix wrong event loop usage from PGMQueue (#42)
1 parent 55460fc commit f373b79

7 files changed

Lines changed: 530 additions & 417 deletions

File tree

pgmq_sqlalchemy/queue.py

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import asyncio
21
from typing import List, Optional
32

43
from sqlalchemy import create_engine
@@ -23,14 +22,12 @@ class PGMQueue:
2322

2423
is_async: bool = False
2524
is_pg_partman_ext_checked: bool = False
26-
loop: asyncio.AbstractEventLoop = None
2725

2826
def __init__(
2927
self,
3028
dsn: Optional[str] = None,
3129
engine: Optional[ENGINE_TYPE] = None,
3230
session_maker: Optional[sessionmaker] = None,
33-
loop: Optional[asyncio.AbstractEventLoop] = None,
3431
) -> None:
3532
"""
3633
@@ -79,8 +76,6 @@ def __init__(
7976
dsn (Optional[str]): Database connection string.
8077
engine (Optional[ENGINE_TYPE]): SQLAlchemy engine (sync or async).
8178
session_maker (Optional[sessionmaker]): SQLAlchemy session maker.
82-
loop (Optional[asyncio.AbstractEventLoop]): Event loop for async operations.
83-
If not provided, a new event loop will be created for async engines.
8479
8580
.. note::
8681
| ``PGMQueue`` will **auto create** the ``pgmq`` extension ( and ``pg_partman`` extension if the method is related with **partitioned_queue** ) if it does not exist in the Postgres.
@@ -107,27 +102,6 @@ def __init__(
107102
bind=self.engine, class_=get_session_type(self.engine)
108103
)
109104

110-
if self.is_async:
111-
if loop is not None:
112-
# Use the provided event loop
113-
self.loop = loop
114-
else:
115-
# Create a new event loop
116-
self.loop = asyncio.new_event_loop()
117-
118-
# create pgmq extension if not exists
119-
self._check_pgmq_ext()
120-
121-
async def _check_pgmq_ext_async(self) -> None:
122-
"""Check if the pgmq extension exists."""
123-
async with self.session_maker() as session:
124-
await PGMQOperation.check_pgmq_ext_async(session=session, commit=True)
125-
126-
def _check_pgmq_ext_sync(self) -> None:
127-
"""Check if the pgmq extension exists."""
128-
with self.session_maker() as session:
129-
PGMQOperation.check_pgmq_ext(session=session, commit=True)
130-
131105
def _check_pgmq_ext(self) -> None:
132106
"""Check if the pgmq extension exists."""
133107
if self.is_async:

tests/conftest.py

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,14 @@
1010
from tests.constant import ASYNC_DRIVERS, SYNC_DRIVERS
1111

1212
# Async fixture names for test filtering
13-
ASYNC_FIXTURE_NAMES = ['pgmq_by_async_dsn', 'pgmq_by_async_engine', 'pgmq_by_async_session_maker']
13+
ASYNC_FIXTURE_NAMES = [
14+
"pgmq_by_async_dsn",
15+
"pgmq_by_async_engine",
16+
"pgmq_by_async_session_maker",
17+
]
1418

1519

16-
def pytest_addoption(parser):
20+
def pytest_addoption(parser: pytest.Parser):
1721
"""Add custom command-line options for pytest."""
1822
parser.addoption(
1923
"--driver",
@@ -29,30 +33,30 @@ def pytest_addoption(parser):
2933
)
3034

3135

32-
def pytest_generate_tests(metafunc):
36+
def pytest_generate_tests(metafunc: pytest.Metafunc):
3337
"""
3438
Dynamically generate test parametrization based on CLI options.
35-
39+
3640
This allows us to parametrize fixtures based on the --driver option.
3741
"""
3842
if "pgmq_all_variants" in metafunc.fixturenames:
3943
driver_from_cli = metafunc.config.getoption("--driver")
40-
44+
4145
# Define sync and async fixture variants
4246
sync_fixtures = [
43-
'pgmq_by_dsn',
44-
'pgmq_by_engine',
45-
'pgmq_by_session_maker',
46-
'pgmq_by_dsn_and_engine',
47-
'pgmq_by_dsn_and_session_maker',
47+
"pgmq_by_dsn",
48+
"pgmq_by_engine",
49+
"pgmq_by_session_maker",
50+
"pgmq_by_dsn_and_engine",
51+
"pgmq_by_dsn_and_session_maker",
4852
]
49-
53+
5054
async_fixtures = [
51-
'pgmq_by_async_dsn',
52-
'pgmq_by_async_engine',
53-
'pgmq_by_async_session_maker',
55+
"pgmq_by_async_dsn",
56+
"pgmq_by_async_engine",
57+
"pgmq_by_async_session_maker",
5458
]
55-
59+
5660
# Determine which fixtures to use
5761
if not driver_from_cli:
5862
# No driver specified, use all fixtures
@@ -63,13 +67,9 @@ def pytest_generate_tests(metafunc):
6367
else:
6468
# Sync driver specified
6569
fixture_params = sync_fixtures
66-
70+
6771
# Parametrize the test
68-
metafunc.parametrize(
69-
"pgmq_all_variants",
70-
fixture_params,
71-
indirect=True
72-
)
72+
metafunc.parametrize("pgmq_all_variants", fixture_params, indirect=True)
7373

7474

7575
@pytest.fixture(scope="module")
@@ -93,7 +93,7 @@ def get_sa_password():
9393

9494

9595
@pytest.fixture(scope="module")
96-
def get_sa_db(request):
96+
def get_sa_db(request: pytest.FixtureRequest):
9797
"""Get database name from CLI argument or environment variable."""
9898
db_name_from_cli = request.config.getoption("--db-name")
9999
if db_name_from_cli:
@@ -112,14 +112,14 @@ def get_dsn(
112112
):
113113
"""Get DSN for sync drivers based on CLI option."""
114114
driver_from_cli = request.config.getoption("--driver")
115-
115+
116116
# Use CLI driver if specified and it's a sync driver
117117
if driver_from_cli and driver_from_cli in SYNC_DRIVERS:
118118
driver = driver_from_cli
119119
else:
120120
# Default to first sync driver if no CLI option or invalid
121121
driver = SYNC_DRIVERS[0]
122-
122+
123123
return f"postgresql+{driver}://{get_sa_user}:{get_sa_password}@{get_sa_host}:{get_sa_port}/{get_sa_db}"
124124

125125

@@ -134,14 +134,14 @@ def get_async_dsn(
134134
):
135135
"""Get DSN for async drivers based on CLI option."""
136136
driver_from_cli = request.config.getoption("--driver")
137-
137+
138138
# Use CLI driver if specified and it's an async driver
139139
if driver_from_cli and driver_from_cli in ASYNC_DRIVERS:
140140
driver = driver_from_cli
141141
else:
142142
# Default to first async driver if no CLI option or invalid
143143
driver = ASYNC_DRIVERS[0]
144-
144+
145145
return f"postgresql+{driver}://{get_sa_user}:{get_sa_password}@{get_sa_host}:{get_sa_port}/{get_sa_db}"
146146

147147

tests/fixture_deps.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import uuid
22
from typing import Tuple
3+
from inspect import iscoroutinefunction
34

45
import pytest
56

67
from pgmq_sqlalchemy import PGMQueue
8+
from tests.constant import ASYNC_DRIVERS
79
from tests._utils import check_queue_exists
810

911
PGMQ_WITH_QUEUE = Tuple[PGMQueue, str]
@@ -13,18 +15,24 @@
1315
def pgmq_all_variants(request: pytest.FixtureRequest) -> PGMQueue:
1416
"""
1517
Fixture that parametrizes tests across all appropriate PGMQueue initialization methods.
16-
18+
1719
When --driver is specified, only fixtures matching that driver type (sync/async) are used.
1820
Without --driver, all fixtures are used.
19-
21+
2022
The parametrization is handled by pytest_generate_tests in conftest.py.
21-
23+
2224
Usage:
2325
def test_something(pgmq_all_variants):
2426
pgmq: PGMQueue = pgmq_all_variants
2527
# test code here
2628
"""
2729
# The param is set by pytest_generate_tests via indirect parametrization
30+
is_async_test = iscoroutinefunction(request.function)
31+
driver_from_cli = request.config.getoption("--driver")
32+
if driver_from_cli and (driver_from_cli in ASYNC_DRIVERS and not is_async_test):
33+
pytest.skip(
34+
reason=f"Skip sync test: {request.function.__name__}, as driver: {driver_from_cli} is async"
35+
)
2836
return request.getfixturevalue(request.param)
2937

3038

tests/test_construct_pgmq.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
from tests.fixture_deps import pgmq_all_variants
55

6+
use_fixtures = [pgmq_all_variants]
7+
68

79
def test_construct_pgmq(pgmq_all_variants):
810
pgmq: PGMQueue = pgmq_all_variants

tests/test_event_loop.py

Lines changed: 0 additions & 58 deletions
This file was deleted.

0 commit comments

Comments
 (0)