From 74f7fd464e297b9cb6f67d4ae2d9933eed27c3d9 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Tue, 6 Jan 2026 17:10:07 +0800 Subject: [PATCH] Fix wrong event loop usage from PGMQueue --- pgmq_sqlalchemy/queue.py | 26 -- tests/conftest.py | 52 +-- tests/fixture_deps.py | 14 +- tests/test_construct_pgmq.py | 2 + tests/test_event_loop.py | 58 --- tests/test_operation.py | 738 ++++++++++++++++++++++------------- tests/test_queue.py | 57 +-- 7 files changed, 530 insertions(+), 417 deletions(-) delete mode 100644 tests/test_event_loop.py diff --git a/pgmq_sqlalchemy/queue.py b/pgmq_sqlalchemy/queue.py index 7be8fc1..b55c51a 100644 --- a/pgmq_sqlalchemy/queue.py +++ b/pgmq_sqlalchemy/queue.py @@ -1,4 +1,3 @@ -import asyncio from typing import List, Optional from sqlalchemy import create_engine @@ -23,14 +22,12 @@ class PGMQueue: is_async: bool = False is_pg_partman_ext_checked: bool = False - loop: asyncio.AbstractEventLoop = None def __init__( self, dsn: Optional[str] = None, engine: Optional[ENGINE_TYPE] = None, session_maker: Optional[sessionmaker] = None, - loop: Optional[asyncio.AbstractEventLoop] = None, ) -> None: """ @@ -79,8 +76,6 @@ def __init__( dsn (Optional[str]): Database connection string. engine (Optional[ENGINE_TYPE]): SQLAlchemy engine (sync or async). session_maker (Optional[sessionmaker]): SQLAlchemy session maker. - loop (Optional[asyncio.AbstractEventLoop]): Event loop for async operations. - If not provided, a new event loop will be created for async engines. .. note:: | ``PGMQueue`` will **auto create** the ``pgmq`` extension ( and ``pg_partman`` extension if the method is related with **partitioned_queue** ) if it does not exist in the Postgres. @@ -107,27 +102,6 @@ def __init__( bind=self.engine, class_=get_session_type(self.engine) ) - if self.is_async: - if loop is not None: - # Use the provided event loop - self.loop = loop - else: - # Create a new event loop - self.loop = asyncio.new_event_loop() - - # create pgmq extension if not exists - self._check_pgmq_ext() - - async def _check_pgmq_ext_async(self) -> None: - """Check if the pgmq extension exists.""" - async with self.session_maker() as session: - await PGMQOperation.check_pgmq_ext_async(session=session, commit=True) - - def _check_pgmq_ext_sync(self) -> None: - """Check if the pgmq extension exists.""" - with self.session_maker() as session: - PGMQOperation.check_pgmq_ext(session=session, commit=True) - def _check_pgmq_ext(self) -> None: """Check if the pgmq extension exists.""" if self.is_async: diff --git a/tests/conftest.py b/tests/conftest.py index 5a65cb7..9bf6393 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,10 +10,14 @@ from tests.constant import ASYNC_DRIVERS, SYNC_DRIVERS # Async fixture names for test filtering -ASYNC_FIXTURE_NAMES = ['pgmq_by_async_dsn', 'pgmq_by_async_engine', 'pgmq_by_async_session_maker'] +ASYNC_FIXTURE_NAMES = [ + "pgmq_by_async_dsn", + "pgmq_by_async_engine", + "pgmq_by_async_session_maker", +] -def pytest_addoption(parser): +def pytest_addoption(parser: pytest.Parser): """Add custom command-line options for pytest.""" parser.addoption( "--driver", @@ -29,30 +33,30 @@ def pytest_addoption(parser): ) -def pytest_generate_tests(metafunc): +def pytest_generate_tests(metafunc: pytest.Metafunc): """ Dynamically generate test parametrization based on CLI options. - + This allows us to parametrize fixtures based on the --driver option. """ if "pgmq_all_variants" in metafunc.fixturenames: driver_from_cli = metafunc.config.getoption("--driver") - + # Define sync and async fixture variants sync_fixtures = [ - 'pgmq_by_dsn', - 'pgmq_by_engine', - 'pgmq_by_session_maker', - 'pgmq_by_dsn_and_engine', - 'pgmq_by_dsn_and_session_maker', + "pgmq_by_dsn", + "pgmq_by_engine", + "pgmq_by_session_maker", + "pgmq_by_dsn_and_engine", + "pgmq_by_dsn_and_session_maker", ] - + async_fixtures = [ - 'pgmq_by_async_dsn', - 'pgmq_by_async_engine', - 'pgmq_by_async_session_maker', + "pgmq_by_async_dsn", + "pgmq_by_async_engine", + "pgmq_by_async_session_maker", ] - + # Determine which fixtures to use if not driver_from_cli: # No driver specified, use all fixtures @@ -63,13 +67,9 @@ def pytest_generate_tests(metafunc): else: # Sync driver specified fixture_params = sync_fixtures - + # Parametrize the test - metafunc.parametrize( - "pgmq_all_variants", - fixture_params, - indirect=True - ) + metafunc.parametrize("pgmq_all_variants", fixture_params, indirect=True) @pytest.fixture(scope="module") @@ -93,7 +93,7 @@ def get_sa_password(): @pytest.fixture(scope="module") -def get_sa_db(request): +def get_sa_db(request: pytest.FixtureRequest): """Get database name from CLI argument or environment variable.""" db_name_from_cli = request.config.getoption("--db-name") if db_name_from_cli: @@ -112,14 +112,14 @@ def get_dsn( ): """Get DSN for sync drivers based on CLI option.""" driver_from_cli = request.config.getoption("--driver") - + # Use CLI driver if specified and it's a sync driver if driver_from_cli and driver_from_cli in SYNC_DRIVERS: driver = driver_from_cli else: # Default to first sync driver if no CLI option or invalid driver = SYNC_DRIVERS[0] - + return f"postgresql+{driver}://{get_sa_user}:{get_sa_password}@{get_sa_host}:{get_sa_port}/{get_sa_db}" @@ -134,14 +134,14 @@ def get_async_dsn( ): """Get DSN for async drivers based on CLI option.""" driver_from_cli = request.config.getoption("--driver") - + # Use CLI driver if specified and it's an async driver if driver_from_cli and driver_from_cli in ASYNC_DRIVERS: driver = driver_from_cli else: # Default to first async driver if no CLI option or invalid driver = ASYNC_DRIVERS[0] - + return f"postgresql+{driver}://{get_sa_user}:{get_sa_password}@{get_sa_host}:{get_sa_port}/{get_sa_db}" diff --git a/tests/fixture_deps.py b/tests/fixture_deps.py index b1bc6e8..d691f8c 100644 --- a/tests/fixture_deps.py +++ b/tests/fixture_deps.py @@ -1,9 +1,11 @@ import uuid from typing import Tuple +from inspect import iscoroutinefunction import pytest from pgmq_sqlalchemy import PGMQueue +from tests.constant import ASYNC_DRIVERS from tests._utils import check_queue_exists PGMQ_WITH_QUEUE = Tuple[PGMQueue, str] @@ -13,18 +15,24 @@ def pgmq_all_variants(request: pytest.FixtureRequest) -> PGMQueue: """ Fixture that parametrizes tests across all appropriate PGMQueue initialization methods. - + When --driver is specified, only fixtures matching that driver type (sync/async) are used. Without --driver, all fixtures are used. - + The parametrization is handled by pytest_generate_tests in conftest.py. - + Usage: def test_something(pgmq_all_variants): pgmq: PGMQueue = pgmq_all_variants # test code here """ # The param is set by pytest_generate_tests via indirect parametrization + is_async_test = iscoroutinefunction(request.function) + driver_from_cli = request.config.getoption("--driver") + if driver_from_cli and (driver_from_cli in ASYNC_DRIVERS and not is_async_test): + pytest.skip( + reason=f"Skip sync test: {request.function.__name__}, as driver: {driver_from_cli} is async" + ) return request.getfixturevalue(request.param) diff --git a/tests/test_construct_pgmq.py b/tests/test_construct_pgmq.py index 4082028..3800d38 100644 --- a/tests/test_construct_pgmq.py +++ b/tests/test_construct_pgmq.py @@ -3,6 +3,8 @@ from tests.fixture_deps import pgmq_all_variants +use_fixtures = [pgmq_all_variants] + def test_construct_pgmq(pgmq_all_variants): pgmq: PGMQueue = pgmq_all_variants diff --git a/tests/test_event_loop.py b/tests/test_event_loop.py deleted file mode 100644 index 4930660..0000000 --- a/tests/test_event_loop.py +++ /dev/null @@ -1,58 +0,0 @@ -import asyncio -import pytest -from sqlalchemy.ext.asyncio import create_async_engine - -from pgmq_sqlalchemy import PGMQueue - - -def test_event_loop_with_provided_loop(get_async_dsn): - """Test that PGMQueue uses the provided event loop.""" - custom_loop = asyncio.new_event_loop() - pgmq = PGMQueue(dsn=get_async_dsn, loop=custom_loop) - - assert pgmq.loop is custom_loop - assert pgmq.is_async is True - - # Clean up - custom_loop.close() - - -def test_event_loop_creates_new_when_not_provided(get_async_dsn): - """Test that PGMQueue creates a new event loop when none is provided.""" - pgmq = PGMQueue(dsn=get_async_dsn) - - assert pgmq.loop is not None - assert pgmq.is_async is True - assert isinstance(pgmq.loop, asyncio.AbstractEventLoop) - - -def test_event_loop_with_sync_dsn_has_no_loop(get_dsn): - """Test that sync PGMQueue does not have an event loop.""" - pgmq = PGMQueue(dsn=get_dsn) - - assert pgmq.loop is None - assert pgmq.is_async is False - - -def test_event_loop_with_provided_engine(get_async_engine): - """Test that PGMQueue uses provided loop with async engine.""" - custom_loop = asyncio.new_event_loop() - pgmq = PGMQueue(engine=get_async_engine, loop=custom_loop) - - assert pgmq.loop is custom_loop - assert pgmq.is_async is True - - # Clean up - custom_loop.close() - - -def test_event_loop_different_instances_have_different_loops(get_async_dsn): - """Test that different PGMQueue instances create separate event loops when not provided.""" - pgmq1 = PGMQueue(dsn=get_async_dsn) - pgmq2 = PGMQueue(dsn=get_async_dsn) - - assert pgmq1.loop is not None - assert pgmq2.loop is not None - # Each instance should have its own event loop - assert pgmq1.loop is not pgmq2.loop - diff --git a/tests/test_operation.py b/tests/test_operation.py index f6762da..ce5bd62 100644 --- a/tests/test_operation.py +++ b/tests/test_operation.py @@ -10,7 +10,7 @@ from sqlalchemy.exc import ProgrammingError, InternalError from pgmq_sqlalchemy.operation import PGMQOperation -from pgmq_sqlalchemy.schema import Message, QueueMetrics +from pgmq_sqlalchemy.schema import QueueMetrics from tests._utils import check_queue_exists from tests.constant import MSG @@ -28,277 +28,347 @@ def test_check_pgmq_ext_sync(get_session_maker): def test_create_queue_sync(get_session_maker, db_session): """Test creating a queue using PGMQOperation.""" queue_name = f"test_queue_{uuid.uuid4().hex}" - + with get_session_maker() as session: - PGMQOperation.create_queue(queue_name, unlogged=False, session=session, commit=True) - + PGMQOperation.create_queue( + queue_name, unlogged=False, session=session, commit=True + ) + assert check_queue_exists(db_session, queue_name) is True - + # Clean up with get_session_maker() as session: - PGMQOperation.drop_queue(queue_name, partitioned=False, session=session, commit=True) + PGMQOperation.drop_queue( + queue_name, partitioned=False, session=session, commit=True + ) def test_create_unlogged_queue_sync(get_session_maker, db_session): """Test creating an unlogged queue using PGMQOperation.""" queue_name = f"test_queue_{uuid.uuid4().hex}" - + with get_session_maker() as session: - PGMQOperation.create_queue(queue_name, unlogged=True, session=session, commit=True) - + PGMQOperation.create_queue( + queue_name, unlogged=True, session=session, commit=True + ) + assert check_queue_exists(db_session, queue_name) is True - + # Clean up with get_session_maker() as session: - PGMQOperation.drop_queue(queue_name, partitioned=False, session=session, commit=True) + PGMQOperation.drop_queue( + queue_name, partitioned=False, session=session, commit=True + ) def test_validate_queue_name_sync(get_session_maker): """Test queue name validation.""" queue_name = f"test_queue_{uuid.uuid4().hex}" - + with get_session_maker() as session: # Should not raise for valid name PGMQOperation.validate_queue_name(queue_name, session=session, commit=True) - + # Should raise for name that's too long (either ProgrammingError or InternalError depending on driver) with pytest.raises((ProgrammingError, InternalError)) as e: PGMQOperation.validate_queue_name("a" * 49, session=session, commit=True) - error_msg = str(e.value.orig) if hasattr(e.value, 'orig') else str(e.value) + error_msg = str(e.value.orig) if hasattr(e.value, "orig") else str(e.value) assert "queue name is too long" in error_msg def test_list_queues_sync(get_session_maker, db_session): """Test listing queues.""" queue_name = f"test_queue_{uuid.uuid4().hex}" - + # Create a queue with get_session_maker() as session: - PGMQOperation.create_queue(queue_name, unlogged=False, session=session, commit=True) - + PGMQOperation.create_queue( + queue_name, unlogged=False, session=session, commit=True + ) + # List queues with get_session_maker() as session: queues = PGMQOperation.list_queues(session=session, commit=True) - + assert queue_name in queues - + # Clean up with get_session_maker() as session: - PGMQOperation.drop_queue(queue_name, partitioned=False, session=session, commit=True) + PGMQOperation.drop_queue( + queue_name, partitioned=False, session=session, commit=True + ) def test_send_and_read_sync(get_session_maker, db_session): """Test sending and reading messages.""" queue_name = f"test_queue_{uuid.uuid4().hex}" - + # Create queue with get_session_maker() as session: - PGMQOperation.create_queue(queue_name, unlogged=False, session=session, commit=True) - + PGMQOperation.create_queue( + queue_name, unlogged=False, session=session, commit=True + ) + # Send a message with get_session_maker() as session: - msg_id = PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) - + msg_id = PGMQOperation.send( + queue_name, MSG, delay=0, session=session, commit=True + ) + assert msg_id > 0 - + # Read the message with get_session_maker() as session: msg = PGMQOperation.read(queue_name, vt=30, session=session, commit=True) - + assert msg is not None assert msg.msg_id == msg_id assert msg.message == MSG - + # Clean up with get_session_maker() as session: - PGMQOperation.drop_queue(queue_name, partitioned=False, session=session, commit=True) + PGMQOperation.drop_queue( + queue_name, partitioned=False, session=session, commit=True + ) def test_send_batch_sync(get_session_maker, db_session): """Test sending a batch of messages.""" queue_name = f"test_queue_{uuid.uuid4().hex}" messages = [{"key": f"value{i}"} for i in range(5)] - + # Create queue with get_session_maker() as session: - PGMQOperation.create_queue(queue_name, unlogged=False, session=session, commit=True) - + PGMQOperation.create_queue( + queue_name, unlogged=False, session=session, commit=True + ) + # Send batch with get_session_maker() as session: - msg_ids = PGMQOperation.send_batch(queue_name, messages, delay=0, session=session, commit=True) - + msg_ids = PGMQOperation.send_batch( + queue_name, messages, delay=0, session=session, commit=True + ) + assert len(msg_ids) == 5 - + # Read batch with get_session_maker() as session: - msgs = PGMQOperation.read_batch(queue_name, vt=30, batch_size=5, session=session, commit=True) - + msgs = PGMQOperation.read_batch( + queue_name, vt=30, batch_size=5, session=session, commit=True + ) + assert len(msgs) == 5 - + # Clean up with get_session_maker() as session: - PGMQOperation.drop_queue(queue_name, partitioned=False, session=session, commit=True) + PGMQOperation.drop_queue( + queue_name, partitioned=False, session=session, commit=True + ) def test_pop_sync(get_session_maker, db_session): """Test popping a message from the queue.""" queue_name = f"test_queue_{uuid.uuid4().hex}" - + # Create queue and send message with get_session_maker() as session: - PGMQOperation.create_queue(queue_name, unlogged=False, session=session, commit=True) - msg_id = PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) - + PGMQOperation.create_queue( + queue_name, unlogged=False, session=session, commit=True + ) + msg_id = PGMQOperation.send( + queue_name, MSG, delay=0, session=session, commit=True + ) + # Pop message with get_session_maker() as session: msg = PGMQOperation.pop(queue_name, session=session, commit=True) - + assert msg is not None assert msg.msg_id == msg_id - + # Verify queue is empty with get_session_maker() as session: msg2 = PGMQOperation.pop(queue_name, session=session, commit=True) - + assert msg2 is None - + # Clean up with get_session_maker() as session: - PGMQOperation.drop_queue(queue_name, partitioned=False, session=session, commit=True) + PGMQOperation.drop_queue( + queue_name, partitioned=False, session=session, commit=True + ) def test_delete_sync(get_session_maker, db_session): """Test deleting a message.""" queue_name = f"test_queue_{uuid.uuid4().hex}" - + # Create queue and send message with get_session_maker() as session: - PGMQOperation.create_queue(queue_name, unlogged=False, session=session, commit=True) - msg_id = PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) - + PGMQOperation.create_queue( + queue_name, unlogged=False, session=session, commit=True + ) + msg_id = PGMQOperation.send( + queue_name, MSG, delay=0, session=session, commit=True + ) + # Delete message with get_session_maker() as session: deleted = PGMQOperation.delete(queue_name, msg_id, session=session, commit=True) - + assert deleted is True - + # Clean up with get_session_maker() as session: - PGMQOperation.drop_queue(queue_name, partitioned=False, session=session, commit=True) + PGMQOperation.drop_queue( + queue_name, partitioned=False, session=session, commit=True + ) def test_set_vt_sync(get_session_maker, db_session): """Test setting visibility timeout.""" queue_name = f"test_queue_{uuid.uuid4().hex}" - + # Create queue and send message with get_session_maker() as session: - PGMQOperation.create_queue(queue_name, unlogged=False, session=session, commit=True) - msg_id = PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) + PGMQOperation.create_queue( + queue_name, unlogged=False, session=session, commit=True + ) + msg_id = PGMQOperation.send( + queue_name, MSG, delay=0, session=session, commit=True + ) # Read message to set initial vt PGMQOperation.read(queue_name, vt=5, session=session, commit=True) - + # Set new vt with get_session_maker() as session: - msg = PGMQOperation.set_vt(queue_name, msg_id, vt=60, session=session, commit=True) - + msg = PGMQOperation.set_vt( + queue_name, msg_id, vt=60, session=session, commit=True + ) + assert msg is not None - + # Clean up with get_session_maker() as session: - PGMQOperation.drop_queue(queue_name, partitioned=False, session=session, commit=True) + PGMQOperation.drop_queue( + queue_name, partitioned=False, session=session, commit=True + ) def test_archive_sync(get_session_maker, db_session): """Test archiving a message.""" queue_name = f"test_queue_{uuid.uuid4().hex}" - + # Create queue and send message with get_session_maker() as session: - PGMQOperation.create_queue(queue_name, unlogged=False, session=session, commit=True) - msg_id = PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) - + PGMQOperation.create_queue( + queue_name, unlogged=False, session=session, commit=True + ) + msg_id = PGMQOperation.send( + queue_name, MSG, delay=0, session=session, commit=True + ) + # Archive message with get_session_maker() as session: - archived = PGMQOperation.archive(queue_name, msg_id, session=session, commit=True) - + archived = PGMQOperation.archive( + queue_name, msg_id, session=session, commit=True + ) + assert archived is True - + # Clean up with get_session_maker() as session: - PGMQOperation.drop_queue(queue_name, partitioned=False, session=session, commit=True) + PGMQOperation.drop_queue( + queue_name, partitioned=False, session=session, commit=True + ) def test_metrics_sync(get_session_maker, db_session): """Test getting queue metrics.""" queue_name = f"test_queue_{uuid.uuid4().hex}" - + # Create queue with get_session_maker() as session: - PGMQOperation.create_queue(queue_name, unlogged=False, session=session, commit=True) - + PGMQOperation.create_queue( + queue_name, unlogged=False, session=session, commit=True + ) + # Get metrics for empty queue with get_session_maker() as session: metrics = PGMQOperation.metrics(queue_name, session=session, commit=True) - + assert metrics is not None assert isinstance(metrics, QueueMetrics) assert metrics.queue_name == queue_name assert metrics.queue_length == 0 assert metrics.total_messages == 0 - + # Send some messages with get_session_maker() as session: for i in range(3): - PGMQOperation.send(queue_name, {"index": i}, delay=0, session=session, commit=True) - + PGMQOperation.send( + queue_name, {"index": i}, delay=0, session=session, commit=True + ) + # Get metrics after adding messages with get_session_maker() as session: metrics = PGMQOperation.metrics(queue_name, session=session, commit=True) - + assert metrics.queue_length == 3 assert metrics.total_messages == 3 - + # Clean up with get_session_maker() as session: - PGMQOperation.drop_queue(queue_name, partitioned=False, session=session, commit=True) + PGMQOperation.drop_queue( + queue_name, partitioned=False, session=session, commit=True + ) def test_metrics_all_sync(get_session_maker, db_session): """Test getting metrics for all queues.""" queue_name1 = f"test_queue_{uuid.uuid4().hex}" queue_name2 = f"test_queue_{uuid.uuid4().hex}" - + # Create two queues with get_session_maker() as session: - PGMQOperation.create_queue(queue_name1, unlogged=False, session=session, commit=True) - PGMQOperation.create_queue(queue_name2, unlogged=False, session=session, commit=True) - + PGMQOperation.create_queue( + queue_name1, unlogged=False, session=session, commit=True + ) + PGMQOperation.create_queue( + queue_name2, unlogged=False, session=session, commit=True + ) + # Get metrics for all queues with get_session_maker() as session: all_metrics = PGMQOperation.metrics_all(session=session, commit=True) - + assert all_metrics is not None assert len(all_metrics) >= 2 queue_names = [m.queue_name for m in all_metrics] assert queue_name1 in queue_names assert queue_name2 in queue_names - + # Clean up with get_session_maker() as session: - PGMQOperation.drop_queue(queue_name1, partitioned=False, session=session, commit=True) - PGMQOperation.drop_queue(queue_name2, partitioned=False, session=session, commit=True) + PGMQOperation.drop_queue( + queue_name1, partitioned=False, session=session, commit=True + ) + PGMQOperation.drop_queue( + queue_name2, partitioned=False, session=session, commit=True + ) def test_transaction_rollback_sync(get_session_maker, db_session): """Test that operations can be rolled back when commit=False.""" queue_name = f"test_queue_{uuid.uuid4().hex}" - + # Create queue with commit=False, then rollback with get_session_maker() as session: - PGMQOperation.create_queue(queue_name, unlogged=False, session=session, commit=False) + PGMQOperation.create_queue( + queue_name, unlogged=False, session=session, commit=False + ) session.rollback() - + # Queue should not exist assert check_queue_exists(db_session, queue_name) is False @@ -306,17 +376,21 @@ def test_transaction_rollback_sync(get_session_maker, db_session): def test_transaction_commit_sync(get_session_maker, db_session): """Test that operations are committed when commit=True.""" queue_name = f"test_queue_{uuid.uuid4().hex}" - + # Create queue with commit=True with get_session_maker() as session: - PGMQOperation.create_queue(queue_name, unlogged=False, session=session, commit=True) - + PGMQOperation.create_queue( + queue_name, unlogged=False, session=session, commit=True + ) + # Queue should exist assert check_queue_exists(db_session, queue_name) is True - + # Clean up with get_session_maker() as session: - PGMQOperation.drop_queue(queue_name, partitioned=False, session=session, commit=True) + PGMQOperation.drop_queue( + queue_name, partitioned=False, session=session, commit=True + ) # Async tests @@ -334,214 +408,280 @@ async def test_check_pgmq_ext_async(get_async_session_maker): async def test_create_queue_async(get_async_session_maker, db_session): """Test creating a queue using PGMQOperation asynchronously.""" queue_name = f"test_queue_{uuid.uuid4().hex}" - + async with get_async_session_maker() as session: - await PGMQOperation.create_queue_async(queue_name, unlogged=False, session=session, commit=True) - + await PGMQOperation.create_queue_async( + queue_name, unlogged=False, session=session, commit=True + ) + assert check_queue_exists(db_session, queue_name) is True - + # Clean up async with get_async_session_maker() as session: - await PGMQOperation.drop_queue_async(queue_name, partitioned=False, session=session, commit=True) + await PGMQOperation.drop_queue_async( + queue_name, partitioned=False, session=session, commit=True + ) @pytest.mark.asyncio async def test_send_and_read_async(get_async_session_maker, db_session): """Test sending and reading messages asynchronously.""" queue_name = f"test_queue_{uuid.uuid4().hex}" - + # Create queue async with get_async_session_maker() as session: - await PGMQOperation.create_queue_async(queue_name, unlogged=False, session=session, commit=True) - + await PGMQOperation.create_queue_async( + queue_name, unlogged=False, session=session, commit=True + ) + # Send a message async with get_async_session_maker() as session: - msg_id = await PGMQOperation.send_async(queue_name, MSG, delay=0, session=session, commit=True) - + msg_id = await PGMQOperation.send_async( + queue_name, MSG, delay=0, session=session, commit=True + ) + assert msg_id > 0 - + # Read the message async with get_async_session_maker() as session: - msg = await PGMQOperation.read_async(queue_name, vt=30, session=session, commit=True) - + msg = await PGMQOperation.read_async( + queue_name, vt=30, session=session, commit=True + ) + assert msg is not None assert msg.msg_id == msg_id assert msg.message == MSG - + # Clean up async with get_async_session_maker() as session: - await PGMQOperation.drop_queue_async(queue_name, partitioned=False, session=session, commit=True) + await PGMQOperation.drop_queue_async( + queue_name, partitioned=False, session=session, commit=True + ) @pytest.mark.asyncio async def test_metrics_async(get_async_session_maker, db_session): """Test getting queue metrics asynchronously.""" queue_name = f"test_queue_{uuid.uuid4().hex}" - + # Create queue async with get_async_session_maker() as session: - await PGMQOperation.create_queue_async(queue_name, unlogged=False, session=session, commit=True) - + await PGMQOperation.create_queue_async( + queue_name, unlogged=False, session=session, commit=True + ) + # Get metrics async with get_async_session_maker() as session: - metrics = await PGMQOperation.metrics_async(queue_name, session=session, commit=True) - + metrics = await PGMQOperation.metrics_async( + queue_name, session=session, commit=True + ) + assert metrics is not None assert isinstance(metrics, QueueMetrics) assert metrics.queue_name == queue_name assert metrics.queue_length == 0 - + # Clean up async with get_async_session_maker() as session: - await PGMQOperation.drop_queue_async(queue_name, partitioned=False, session=session, commit=True) + await PGMQOperation.drop_queue_async( + queue_name, partitioned=False, session=session, commit=True + ) def test_delete_batch_sync(get_session_maker, db_session): """Test deleting a batch of messages.""" queue_name = f"test_queue_{uuid.uuid4().hex}" - + # Create queue and send messages with get_session_maker() as session: - PGMQOperation.create_queue(queue_name, unlogged=False, session=session, commit=True) - msg_id1 = PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) - msg_id2 = PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) - msg_id3 = PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) + PGMQOperation.create_queue( + queue_name, unlogged=False, session=session, commit=True + ) + msg_id1 = PGMQOperation.send( + queue_name, MSG, delay=0, session=session, commit=True + ) + msg_id2 = PGMQOperation.send( + queue_name, MSG, delay=0, session=session, commit=True + ) + msg_id3 = PGMQOperation.send( + queue_name, MSG, delay=0, session=session, commit=True + ) msg_ids = [msg_id1, msg_id2, msg_id3] - + # Delete batch with get_session_maker() as session: - deleted_ids = PGMQOperation.delete_batch(queue_name, msg_ids, session=session, commit=True) - + deleted_ids = PGMQOperation.delete_batch( + queue_name, msg_ids, session=session, commit=True + ) + assert len(deleted_ids) == 3 assert set(deleted_ids) == set(msg_ids) - + # Verify messages are deleted with get_session_maker() as session: msg = PGMQOperation.read(queue_name, vt=30, session=session, commit=True) - + assert msg is None - + # Clean up with get_session_maker() as session: - PGMQOperation.drop_queue(queue_name, partitioned=False, session=session, commit=True) + PGMQOperation.drop_queue( + queue_name, partitioned=False, session=session, commit=True + ) def test_archive_batch_sync(get_session_maker, db_session): """Test archiving a batch of messages.""" queue_name = f"test_queue_{uuid.uuid4().hex}" - + # Create queue and send messages with get_session_maker() as session: - PGMQOperation.create_queue(queue_name, unlogged=False, session=session, commit=True) - msg_id1 = PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) - msg_id2 = PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) - msg_id3 = PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) + PGMQOperation.create_queue( + queue_name, unlogged=False, session=session, commit=True + ) + msg_id1 = PGMQOperation.send( + queue_name, MSG, delay=0, session=session, commit=True + ) + msg_id2 = PGMQOperation.send( + queue_name, MSG, delay=0, session=session, commit=True + ) + msg_id3 = PGMQOperation.send( + queue_name, MSG, delay=0, session=session, commit=True + ) msg_ids = [msg_id1, msg_id2, msg_id3] - + # Archive batch with get_session_maker() as session: - archived_ids = PGMQOperation.archive_batch(queue_name, msg_ids, session=session, commit=True) - + archived_ids = PGMQOperation.archive_batch( + queue_name, msg_ids, session=session, commit=True + ) + assert len(archived_ids) == 3 assert set(archived_ids) == set(msg_ids) - + # Verify messages are archived (queue should be empty) with get_session_maker() as session: msg = PGMQOperation.read(queue_name, vt=30, session=session, commit=True) - + assert msg is None - + # Clean up with get_session_maker() as session: - PGMQOperation.drop_queue(queue_name, partitioned=False, session=session, commit=True) + PGMQOperation.drop_queue( + queue_name, partitioned=False, session=session, commit=True + ) def test_purge_sync(get_session_maker, db_session): """Test purging all messages from a queue.""" queue_name = f"test_queue_{uuid.uuid4().hex}" - + # Create queue and send messages with get_session_maker() as session: - PGMQOperation.create_queue(queue_name, unlogged=False, session=session, commit=True) + PGMQOperation.create_queue( + queue_name, unlogged=False, session=session, commit=True + ) PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) - + # Purge queue with get_session_maker() as session: purged_count = PGMQOperation.purge(queue_name, session=session, commit=True) - + assert purged_count == 5 - + # Verify queue is empty with get_session_maker() as session: msg = PGMQOperation.read(queue_name, vt=30, session=session, commit=True) - + assert msg is None - + # Clean up with get_session_maker() as session: - PGMQOperation.drop_queue(queue_name, partitioned=False, session=session, commit=True) + PGMQOperation.drop_queue( + queue_name, partitioned=False, session=session, commit=True + ) def test_read_with_poll_sync(get_session_maker, db_session): """Test reading messages with polling.""" queue_name = f"test_queue_{uuid.uuid4().hex}" - + # Create queue with get_session_maker() as session: - PGMQOperation.create_queue(queue_name, unlogged=False, session=session, commit=True) - + PGMQOperation.create_queue( + queue_name, unlogged=False, session=session, commit=True + ) + # Test with empty queue - should return None after polling start = time.time() with get_session_maker() as session: msgs = PGMQOperation.read_with_poll( - queue_name, vt=30, qty=1, max_poll_seconds=2, poll_interval_ms=100, - session=session, commit=True + queue_name, + vt=30, + qty=1, + max_poll_seconds=2, + poll_interval_ms=100, + session=session, + commit=True, ) elapsed = time.time() - start - + assert msgs is None assert elapsed >= 2 # Should have polled for at least 2 seconds - + # Send a message and test immediate read with get_session_maker() as session: - msg_id = PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) - + msg_id = PGMQOperation.send( + queue_name, MSG, delay=0, session=session, commit=True + ) + with get_session_maker() as session: msgs = PGMQOperation.read_with_poll( - queue_name, vt=30, qty=1, max_poll_seconds=5, poll_interval_ms=100, - session=session, commit=True + queue_name, + vt=30, + qty=1, + max_poll_seconds=5, + poll_interval_ms=100, + session=session, + commit=True, ) - + assert msgs is not None assert len(msgs) == 1 assert msgs[0].msg_id == msg_id - + # Clean up with get_session_maker() as session: - PGMQOperation.drop_queue(queue_name, partitioned=False, session=session, commit=True) + PGMQOperation.drop_queue( + queue_name, partitioned=False, session=session, commit=True + ) def test_drop_queue_sync(get_session_maker, db_session): """Test dropping a queue.""" queue_name = f"test_queue_{uuid.uuid4().hex}" - + # Create queue with get_session_maker() as session: - PGMQOperation.create_queue(queue_name, unlogged=False, session=session, commit=True) - + PGMQOperation.create_queue( + queue_name, unlogged=False, session=session, commit=True + ) + # Verify queue exists assert check_queue_exists(db_session, queue_name) is True - + # Drop queue with get_session_maker() as session: - dropped = PGMQOperation.drop_queue(queue_name, partitioned=False, session=session, commit=True) - + dropped = PGMQOperation.drop_queue( + queue_name, partitioned=False, session=session, commit=True + ) + assert dropped is True - + # Verify queue is dropped assert check_queue_exists(db_session, queue_name) is False @@ -562,73 +702,81 @@ def test_check_pg_partman_ext_sync(get_session_maker): def test_create_partitioned_queue_sync(get_session_maker, db_session): """Test creating a partitioned queue.""" queue_name = f"part_{uuid.uuid4().hex[:20]}" - + # First ensure pg_partman extension is available try: with get_session_maker() as session: PGMQOperation.check_pg_partman_ext(session=session, commit=True) except Exception as e: pytest.skip(f"pg_partman extension not available: {e}") - + # Create partitioned queue with numeric partitioning with get_session_maker() as session: PGMQOperation.create_partitioned_queue( - queue_name, - partition_interval="10000", + queue_name, + partition_interval="10000", retention_interval="100000", - session=session, - commit=True + session=session, + commit=True, ) - + assert check_queue_exists(db_session, queue_name) is True - + # Test sending and reading from partitioned queue with get_session_maker() as session: - msg_id = PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) + msg_id = PGMQOperation.send( + queue_name, MSG, delay=0, session=session, commit=True + ) msg = PGMQOperation.read(queue_name, vt=30, session=session, commit=True) - + assert msg is not None assert msg.msg_id == msg_id - + # Clean up with get_session_maker() as session: - PGMQOperation.drop_queue(queue_name, partitioned=True, session=session, commit=True) + PGMQOperation.drop_queue( + queue_name, partitioned=True, session=session, commit=True + ) def test_create_time_based_partitioned_queue_sync(get_session_maker, db_session): """Test creating a time-based partitioned queue.""" queue_name = f"time_{uuid.uuid4().hex[:20]}" - + # First ensure pg_partman extension is available try: with get_session_maker() as session: PGMQOperation.check_pg_partman_ext(session=session, commit=True) except Exception as e: pytest.skip(f"pg_partman extension not available: {e}") - + # Create partitioned queue with time-based partitioning with get_session_maker() as session: PGMQOperation.create_partitioned_queue( - queue_name, - partition_interval="1 day", + queue_name, + partition_interval="1 day", retention_interval="7 days", - session=session, - commit=True + session=session, + commit=True, ) - + assert check_queue_exists(db_session, queue_name) is True - + # Test sending and reading from time-based partitioned queue with get_session_maker() as session: - msg_id = PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) + msg_id = PGMQOperation.send( + queue_name, MSG, delay=0, session=session, commit=True + ) msg = PGMQOperation.read(queue_name, vt=30, session=session, commit=True) - + assert msg is not None assert msg.msg_id == msg_id - + # Clean up with get_session_maker() as session: - PGMQOperation.drop_queue(queue_name, partitioned=True, session=session, commit=True) + PGMQOperation.drop_queue( + queue_name, partitioned=True, session=session, commit=True + ) # Async tests for newly added coverage @@ -638,135 +786,195 @@ def test_create_time_based_partitioned_queue_sync(get_session_maker, db_session) async def test_delete_batch_async(get_async_session_maker, db_session): """Test deleting a batch of messages asynchronously.""" queue_name = f"test_queue_{uuid.uuid4().hex}" - + # Create queue and send messages async with get_async_session_maker() as session: - await PGMQOperation.create_queue_async(queue_name, unlogged=False, session=session, commit=True) - msg_id1 = await PGMQOperation.send_async(queue_name, MSG, delay=0, session=session, commit=True) - msg_id2 = await PGMQOperation.send_async(queue_name, MSG, delay=0, session=session, commit=True) - msg_id3 = await PGMQOperation.send_async(queue_name, MSG, delay=0, session=session, commit=True) + await PGMQOperation.create_queue_async( + queue_name, unlogged=False, session=session, commit=True + ) + msg_id1 = await PGMQOperation.send_async( + queue_name, MSG, delay=0, session=session, commit=True + ) + msg_id2 = await PGMQOperation.send_async( + queue_name, MSG, delay=0, session=session, commit=True + ) + msg_id3 = await PGMQOperation.send_async( + queue_name, MSG, delay=0, session=session, commit=True + ) msg_ids = [msg_id1, msg_id2, msg_id3] - + # Delete batch async with get_async_session_maker() as session: - deleted_ids = await PGMQOperation.delete_batch_async(queue_name, msg_ids, session=session, commit=True) - + deleted_ids = await PGMQOperation.delete_batch_async( + queue_name, msg_ids, session=session, commit=True + ) + assert len(deleted_ids) == 3 assert set(deleted_ids) == set(msg_ids) - + # Clean up async with get_async_session_maker() as session: - await PGMQOperation.drop_queue_async(queue_name, partitioned=False, session=session, commit=True) + await PGMQOperation.drop_queue_async( + queue_name, partitioned=False, session=session, commit=True + ) @pytest.mark.asyncio async def test_archive_batch_async(get_async_session_maker, db_session): """Test archiving a batch of messages asynchronously.""" queue_name = f"test_queue_{uuid.uuid4().hex}" - + # Create queue and send messages async with get_async_session_maker() as session: - await PGMQOperation.create_queue_async(queue_name, unlogged=False, session=session, commit=True) - msg_id1 = await PGMQOperation.send_async(queue_name, MSG, delay=0, session=session, commit=True) - msg_id2 = await PGMQOperation.send_async(queue_name, MSG, delay=0, session=session, commit=True) - msg_id3 = await PGMQOperation.send_async(queue_name, MSG, delay=0, session=session, commit=True) + await PGMQOperation.create_queue_async( + queue_name, unlogged=False, session=session, commit=True + ) + msg_id1 = await PGMQOperation.send_async( + queue_name, MSG, delay=0, session=session, commit=True + ) + msg_id2 = await PGMQOperation.send_async( + queue_name, MSG, delay=0, session=session, commit=True + ) + msg_id3 = await PGMQOperation.send_async( + queue_name, MSG, delay=0, session=session, commit=True + ) msg_ids = [msg_id1, msg_id2, msg_id3] - + # Archive batch async with get_async_session_maker() as session: - archived_ids = await PGMQOperation.archive_batch_async(queue_name, msg_ids, session=session, commit=True) - + archived_ids = await PGMQOperation.archive_batch_async( + queue_name, msg_ids, session=session, commit=True + ) + assert len(archived_ids) == 3 assert set(archived_ids) == set(msg_ids) - + # Clean up async with get_async_session_maker() as session: - await PGMQOperation.drop_queue_async(queue_name, partitioned=False, session=session, commit=True) + await PGMQOperation.drop_queue_async( + queue_name, partitioned=False, session=session, commit=True + ) @pytest.mark.asyncio async def test_purge_async(get_async_session_maker, db_session): """Test purging all messages from a queue asynchronously.""" queue_name = f"test_queue_{uuid.uuid4().hex}" - + # Create queue and send messages async with get_async_session_maker() as session: - await PGMQOperation.create_queue_async(queue_name, unlogged=False, session=session, commit=True) - await PGMQOperation.send_async(queue_name, MSG, delay=0, session=session, commit=True) - await PGMQOperation.send_async(queue_name, MSG, delay=0, session=session, commit=True) - await PGMQOperation.send_async(queue_name, MSG, delay=0, session=session, commit=True) - await PGMQOperation.send_async(queue_name, MSG, delay=0, session=session, commit=True) - await PGMQOperation.send_async(queue_name, MSG, delay=0, session=session, commit=True) - + await PGMQOperation.create_queue_async( + queue_name, unlogged=False, session=session, commit=True + ) + await PGMQOperation.send_async( + queue_name, MSG, delay=0, session=session, commit=True + ) + await PGMQOperation.send_async( + queue_name, MSG, delay=0, session=session, commit=True + ) + await PGMQOperation.send_async( + queue_name, MSG, delay=0, session=session, commit=True + ) + await PGMQOperation.send_async( + queue_name, MSG, delay=0, session=session, commit=True + ) + await PGMQOperation.send_async( + queue_name, MSG, delay=0, session=session, commit=True + ) + # Purge queue async with get_async_session_maker() as session: - purged_count = await PGMQOperation.purge_async(queue_name, session=session, commit=True) - + purged_count = await PGMQOperation.purge_async( + queue_name, session=session, commit=True + ) + assert purged_count == 5 - + # Clean up async with get_async_session_maker() as session: - await PGMQOperation.drop_queue_async(queue_name, partitioned=False, session=session, commit=True) + await PGMQOperation.drop_queue_async( + queue_name, partitioned=False, session=session, commit=True + ) @pytest.mark.asyncio async def test_read_with_poll_async(get_async_session_maker, db_session): """Test reading messages with polling asynchronously.""" queue_name = f"test_queue_{uuid.uuid4().hex}" - + # Create queue async with get_async_session_maker() as session: - await PGMQOperation.create_queue_async(queue_name, unlogged=False, session=session, commit=True) - + await PGMQOperation.create_queue_async( + queue_name, unlogged=False, session=session, commit=True + ) + # Test with empty queue - should return None after polling start = time.time() async with get_async_session_maker() as session: msgs = await PGMQOperation.read_with_poll_async( - queue_name, vt=30, qty=1, max_poll_seconds=2, poll_interval_ms=100, - session=session, commit=True + queue_name, + vt=30, + qty=1, + max_poll_seconds=2, + poll_interval_ms=100, + session=session, + commit=True, ) elapsed = time.time() - start - + assert msgs is None assert elapsed >= 2 # Should have polled for at least 2 seconds - + # Send a message and test immediate read async with get_async_session_maker() as session: - msg_id = await PGMQOperation.send_async(queue_name, MSG, delay=0, session=session, commit=True) - + msg_id = await PGMQOperation.send_async( + queue_name, MSG, delay=0, session=session, commit=True + ) + async with get_async_session_maker() as session: msgs = await PGMQOperation.read_with_poll_async( - queue_name, vt=30, qty=1, max_poll_seconds=5, poll_interval_ms=100, - session=session, commit=True + queue_name, + vt=30, + qty=1, + max_poll_seconds=5, + poll_interval_ms=100, + session=session, + commit=True, ) - + assert msgs is not None assert len(msgs) == 1 assert msgs[0].msg_id == msg_id - + # Clean up async with get_async_session_maker() as session: - await PGMQOperation.drop_queue_async(queue_name, partitioned=False, session=session, commit=True) + await PGMQOperation.drop_queue_async( + queue_name, partitioned=False, session=session, commit=True + ) @pytest.mark.asyncio async def test_drop_queue_async(get_async_session_maker, db_session): """Test dropping a queue asynchronously.""" queue_name = f"test_queue_{uuid.uuid4().hex}" - + # Create queue async with get_async_session_maker() as session: - await PGMQOperation.create_queue_async(queue_name, unlogged=False, session=session, commit=True) - + await PGMQOperation.create_queue_async( + queue_name, unlogged=False, session=session, commit=True + ) + # Verify queue exists assert check_queue_exists(db_session, queue_name) is True - + # Drop queue async with get_async_session_maker() as session: - dropped = await PGMQOperation.drop_queue_async(queue_name, partitioned=False, session=session, commit=True) - + dropped = await PGMQOperation.drop_queue_async( + queue_name, partitioned=False, session=session, commit=True + ) + assert dropped is True - + # Verify queue is dropped assert check_queue_exists(db_session, queue_name) is False @@ -789,34 +997,40 @@ async def test_check_pg_partman_ext_async(get_async_session_maker): async def test_create_partitioned_queue_async(get_async_session_maker, db_session): """Test creating a partitioned queue asynchronously.""" queue_name = f"part_{uuid.uuid4().hex[:20]}" - + # First ensure pg_partman extension is available try: async with get_async_session_maker() as session: await PGMQOperation.check_pg_partman_ext_async(session=session, commit=True) except Exception as e: pytest.skip(f"pg_partman extension not available: {e}") - + # Create partitioned queue with numeric partitioning async with get_async_session_maker() as session: await PGMQOperation.create_partitioned_queue_async( - queue_name, - partition_interval="10000", + queue_name, + partition_interval="10000", retention_interval="100000", - session=session, - commit=True + session=session, + commit=True, ) - + assert check_queue_exists(db_session, queue_name) is True - + # Test sending and reading from partitioned queue async with get_async_session_maker() as session: - msg_id = await PGMQOperation.send_async(queue_name, MSG, delay=0, session=session, commit=True) - msg = await PGMQOperation.read_async(queue_name, vt=30, session=session, commit=True) - + msg_id = await PGMQOperation.send_async( + queue_name, MSG, delay=0, session=session, commit=True + ) + msg = await PGMQOperation.read_async( + queue_name, vt=30, session=session, commit=True + ) + assert msg is not None assert msg.msg_id == msg_id - + # Clean up async with get_async_session_maker() as session: - await PGMQOperation.drop_queue_async(queue_name, partitioned=True, session=session, commit=True) + await PGMQOperation.drop_queue_async( + queue_name, partitioned=True, session=session, commit=True + ) diff --git a/tests/test_queue.py b/tests/test_queue.py index 9062b36..e4956f7 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -8,18 +8,15 @@ from tests.fixture_deps import ( PGMQ_WITH_QUEUE, - pgmq_all_variants, pgmq_setup_teardown, pgmq_partitioned_setup_teardown, + pgmq_all_variants, ) from tests._utils import check_queue_exists from tests.constant import MSG, LOCK_FILE_NAME -use_fixtures = [ - pgmq_setup_teardown, - pgmq_partitioned_setup_teardown, -] +use_fixtures = [pgmq_setup_teardown, pgmq_partitioned_setup_teardown, pgmq_all_variants] def test_create_queue(pgmq_all_variants, db_session): @@ -486,13 +483,13 @@ def test_read_with_poll_without_vt(pgmq_setup_teardown: PGMQ_WITH_QUEUE): """Test read_with_poll when vt parameter is not provided (None).""" pgmq, queue_name = pgmq_setup_teardown - + # Set a custom default vt for the pgmq instance pgmq.vt = 100 - + # Send a message msg_id = pgmq.send(queue_name, MSG) - + # Call read_with_poll with vt=None to test the fallback logic # When vt is None, it should fall back to using pgmq.vt value (100) msgs = pgmq.read_with_poll( @@ -502,67 +499,43 @@ def test_read_with_poll_without_vt(pgmq_setup_teardown: PGMQ_WITH_QUEUE): max_poll_seconds=2, poll_interval_ms=100, ) - + assert msgs is not None assert len(msgs) == 1 assert msgs[0].msg_id == msg_id assert msgs[0].message == MSG -def test_execute_operation_with_provided_sync_session(pgmq_by_session_maker, get_session_maker, db_session): +def test_execute_operation_with_provided_sync_session( + pgmq_by_session_maker, get_session_maker, db_session +): """Test _execute_operation sync path when session is provided.""" pgmq: PGMQueue = pgmq_by_session_maker queue_name = f"test_queue_{uuid.uuid4().hex}" - + # Create a session to pass to the operations # Using the same session across multiple operations demonstrates # that the sync path with provided session works correctly with get_session_maker() as session: # Create queue with provided session pgmq.create_queue(queue_name, session=session) - + # Verify queue was created assert check_queue_exists(db_session, queue_name) is True - + # Send a message with the same provided session msg_id = pgmq.send(queue_name, MSG, session=session) - + # Read message with the same provided session msg = pgmq.read(queue_name, vt=30, session=session) - + assert msg is not None assert msg.msg_id == msg_id assert msg.message == MSG - + # Clean up with the same provided session pgmq.drop_queue(queue_name, session=session) - - # Verify queue was dropped - assert check_queue_exists(db_session, queue_name) is False - -def test_execute_operation_async_with_session_none(pgmq_by_async_dsn, db_session): - """Test _execute_operation async path when session is None.""" - - pgmq: PGMQueue = pgmq_by_async_dsn - queue_name = f"test_queue_{uuid.uuid4().hex}" - - # Verify this is an async PGMQueue - assert pgmq.is_async is True - - # When session is None, _execute_operation creates a new async session - # and uses loop.run_until_complete to execute the operation - pgmq.create_queue(queue_name) - msg_id = pgmq.send(queue_name, MSG) - msg = pgmq.read(queue_name, vt=30) - - assert msg is not None - assert msg.msg_id == msg_id - assert msg.message == MSG - - # Clean up - pgmq.drop_queue(queue_name) - # Verify queue was dropped assert check_queue_exists(db_session, queue_name) is False