From 282df124e3c818d97a2bb2b74fcc1dae2c9034fb Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 2 Jan 2026 09:16:44 +0000 Subject: [PATCH 1/6] Initial plan From acba17dd99d176179e0dad6708a7dbc0358d86e6 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 2 Jan 2026 09:21:52 +0000 Subject: [PATCH 2/6] Fix SQL injection vulnerability and QueueMetrics bug in operation.py Co-authored-by: jason810496 <68415893+jason810496@users.noreply.github.com> --- pgmq_sqlalchemy/operation.py | 51 +++++++++++++++++++++--------------- 1 file changed, 30 insertions(+), 21 deletions(-) diff --git a/pgmq_sqlalchemy/operation.py b/pgmq_sqlalchemy/operation.py index 5f55626..3c6911c 100644 --- a/pgmq_sqlalchemy/operation.py +++ b/pgmq_sqlalchemy/operation.py @@ -6,7 +6,6 @@ from sqlalchemy.ext.asyncio import AsyncSession from .schema import Message, QueueMetrics -from ._utils import encode_dict_to_psql, encode_list_to_psql class PGMQOperation: @@ -106,19 +105,33 @@ def _get_list_queues_statement() -> Tuple[str, Dict[str, Any]]: return "select queue_name from pgmq.list_queues();", {} @staticmethod - def _get_send_statement(queue_name: str, message: dict, delay: int) -> str: - """Get statement for send (no params, using f-string).""" - encoded_message = encode_dict_to_psql(message) - return f"select * from pgmq.send('{queue_name}',{encoded_message},{delay});" + def _get_send_statement( + queue_name: str, message: dict, delay: int + ) -> Tuple[str, Dict[str, Any]]: + """Get statement and params for send.""" + import json + return ( + "select * from pgmq.send(:queue_name, :message::jsonb, :delay);", + { + "queue_name": queue_name, + "message": json.dumps(message), + "delay": delay, + }, + ) @staticmethod def _get_send_batch_statement( queue_name: str, messages: List[dict], delay: int - ) -> str: - """Get statement for send_batch (no params, using f-string).""" - encoded_messages = encode_list_to_psql(messages) + ) -> Tuple[str, Dict[str, Any]]: + """Get statement and params for send_batch.""" + import json return ( - f"select * from pgmq.send_batch('{queue_name}',{encoded_messages},{delay});" + "select * from pgmq.send_batch(:queue_name, :messages, :delay);", + { + "queue_name": queue_name, + "messages": [json.dumps(m) for m in messages], + "delay": delay, + }, ) @staticmethod @@ -546,8 +559,8 @@ def send( Returns: The message ID. """ - stmt = PGMQOperation._get_send_statement(queue_name, message, delay) - row = session.execute(text(stmt)).fetchone() + stmt, params = PGMQOperation._get_send_statement(queue_name, message, delay) + row = session.execute(text(stmt), params).fetchone() if commit: session.commit() return row[0] @@ -573,8 +586,8 @@ async def send_async( Returns: The message ID. """ - stmt = PGMQOperation._get_send_statement(queue_name, message, delay) - row = (await session.execute(text(stmt))).fetchone() + stmt, params = PGMQOperation._get_send_statement(queue_name, message, delay) + row = (await session.execute(text(stmt), params)).fetchone() if commit: await session.commit() return row[0] @@ -600,8 +613,8 @@ def send_batch( Returns: List of message IDs. """ - stmt = PGMQOperation._get_send_batch_statement(queue_name, messages, delay) - rows = session.execute(text(stmt)).fetchall() + stmt, params = PGMQOperation._get_send_batch_statement(queue_name, messages, delay) + rows = session.execute(text(stmt), params).fetchall() if commit: session.commit() return [row[0] for row in rows] @@ -627,8 +640,8 @@ async def send_batch_async( Returns: List of message IDs. """ - stmt = PGMQOperation._get_send_batch_statement(queue_name, messages, delay) - rows = (await session.execute(text(stmt))).fetchall() + stmt, params = PGMQOperation._get_send_batch_statement(queue_name, messages, delay) + rows = (await session.execute(text(stmt), params)).fetchall() if commit: await session.commit() return [row[0] for row in rows] @@ -1250,7 +1263,6 @@ def metrics( newest_msg_age_sec=row[2], oldest_msg_age_sec=row[3], total_messages=row[4], - scrape_time=row[5], ) @staticmethod @@ -1282,7 +1294,6 @@ async def metrics_async( newest_msg_age_sec=row[2], oldest_msg_age_sec=row[3], total_messages=row[4], - scrape_time=row[5], ) @staticmethod @@ -1313,7 +1324,6 @@ def metrics_all( newest_msg_age_sec=row[2], oldest_msg_age_sec=row[3], total_messages=row[4], - scrape_time=row[5], ) for row in rows ] @@ -1346,7 +1356,6 @@ async def metrics_all_async( newest_msg_age_sec=row[2], oldest_msg_age_sec=row[3], total_messages=row[4], - scrape_time=row[5], ) for row in rows ] From afd8e34faaacf4829ce64d544c866cfa33f9d9e5 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 2 Jan 2026 09:24:11 +0000 Subject: [PATCH 3/6] Refactor queue.py to reduce boilerplate with _execute_operation helper Co-authored-by: jason810496 <68415893+jason810496@users.noreply.github.com> --- pgmq_sqlalchemy/queue.py | 253 +++++++++++++-------------------------- 1 file changed, 84 insertions(+), 169 deletions(-) diff --git a/pgmq_sqlalchemy/queue.py b/pgmq_sqlalchemy/queue.py index 07ba5a8..79ea5c7 100644 --- a/pgmq_sqlalchemy/queue.py +++ b/pgmq_sqlalchemy/queue.py @@ -141,6 +141,43 @@ def _check_pg_partman_ext(self) -> None: return self.loop.run_until_complete(self._check_pg_partman_ext_async()) return self._check_pg_partman_ext_sync() + def _execute_operation( + self, + op_sync, + op_async, + session: Optional[SESSION_TYPE], + commit: bool, + *args, + **kwargs + ): + """Helper method to execute sync or async operations with session management. + + Args: + op_sync: The synchronous operation function from PGMQOperation + op_async: The asynchronous operation function from PGMQOperation + session: Optional session to use (if None, creates a new one) + commit: Whether to commit the transaction + *args: Positional arguments to pass to the operation + **kwargs: Keyword arguments to pass to the operation + + Returns: + The result from the operation + """ + if self.is_async: + if session is None: + async def _run(): + async with self.session_maker() as s: + return await op_async(*args, session=s, commit=commit, **kwargs) + return self.loop.run_until_complete(_run()) + return self.loop.run_until_complete( + op_async(*args, session=session, commit=commit, **kwargs) + ) + + if session is None: + with self.session_maker() as s: + return op_sync(*args, session=s, commit=commit, **kwargs) + return op_sync(*args, session=session, commit=commit, **kwargs) + def create_queue( self, queue_name: str, @@ -165,29 +202,13 @@ def create_queue( pgmq_client.create_queue('my_queue', unlogged=True) """ - if self.is_async: - if session is None: - - async def _create(): - async with self.session_maker() as s: - await PGMQOperation.create_queue_async( - queue_name, unlogged, session=s, commit=commit - ) - - return self.loop.run_until_complete(_create()) - return self.loop.run_until_complete( - PGMQOperation.create_queue_async( - queue_name, unlogged, session=session, commit=commit - ) - ) - - if session is None: - with self.session_maker() as s: - return PGMQOperation.create_queue( - queue_name, unlogged, session=s, commit=commit - ) - return PGMQOperation.create_queue( - queue_name, unlogged, session=session, commit=commit + return self._execute_operation( + PGMQOperation.create_queue, + PGMQOperation.create_queue_async, + session, + commit, + queue_name, + unlogged, ) def create_partitioned_queue( @@ -234,45 +255,14 @@ def create_partitioned_queue( # check if the pg_partman extension exists before creating a partitioned queue at runtime self._check_pg_partman_ext() - if self.is_async: - if session is None: - - async def _create(): - async with self.session_maker() as s: - await PGMQOperation.create_partitioned_queue_async( - queue_name, - str(partition_interval), - str(retention_interval), - session=s, - commit=commit, - ) - - return self.loop.run_until_complete(_create()) - return self.loop.run_until_complete( - PGMQOperation.create_partitioned_queue_async( - queue_name, - str(partition_interval), - str(retention_interval), - session=session, - commit=commit, - ) - ) - - if session is None: - with self.session_maker() as s: - return PGMQOperation.create_partitioned_queue( - queue_name, - str(partition_interval), - str(retention_interval), - session=s, - commit=commit, - ) - return PGMQOperation.create_partitioned_queue( + return self._execute_operation( + PGMQOperation.create_partitioned_queue, + PGMQOperation.create_partitioned_queue_async, + session, + commit, queue_name, str(partition_interval), str(retention_interval), - session=session, - commit=commit, ) def validate_queue_name( @@ -285,29 +275,12 @@ def validate_queue_name( """ * Will raise an error if the ``queue_name`` is more than 48 characters. """ - if self.is_async: - if session is None: - - async def _validate(): - async with self.session_maker() as s: - await PGMQOperation.validate_queue_name_async( - queue_name, session=s, commit=commit - ) - - return self.loop.run_until_complete(_validate()) - return self.loop.run_until_complete( - PGMQOperation.validate_queue_name_async( - queue_name, session=session, commit=commit - ) - ) - - if session is None: - with self.session_maker() as s: - return PGMQOperation.validate_queue_name( - queue_name, session=s, commit=commit - ) - return PGMQOperation.validate_queue_name( - queue_name, session=session, commit=commit + return self._execute_operation( + PGMQOperation.validate_queue_name, + PGMQOperation.validate_queue_name_async, + session, + commit, + queue_name, ) def drop_queue( @@ -339,29 +312,13 @@ def drop_queue( if partitioned: self._check_pg_partman_ext() - if self.is_async: - if session is None: - - async def _drop(): - async with self.session_maker() as s: - return await PGMQOperation.drop_queue_async( - queue, partitioned, session=s, commit=commit - ) - - return self.loop.run_until_complete(_drop()) - return self.loop.run_until_complete( - PGMQOperation.drop_queue_async( - queue, partitioned, session=session, commit=commit - ) - ) - - if session is None: - with self.session_maker() as s: - return PGMQOperation.drop_queue( - queue, partitioned, session=s, commit=commit - ) - return PGMQOperation.drop_queue( - queue, partitioned, session=session, commit=commit + return self._execute_operation( + PGMQOperation.drop_queue, + PGMQOperation.drop_queue_async, + session, + commit, + queue, + partitioned, ) def list_queues( @@ -377,24 +334,12 @@ def list_queues( queue_list = pgmq_client.list_queues() print(queue_list) """ - if self.is_async: - if session is None: - - async def _list(): - async with self.session_maker() as s: - return await PGMQOperation.list_queues_async( - session=s, commit=commit - ) - - return self.loop.run_until_complete(_list()) - return self.loop.run_until_complete( - PGMQOperation.list_queues_async(session=session, commit=commit) - ) - - if session is None: - with self.session_maker() as s: - return PGMQOperation.list_queues(session=s, commit=commit) - return PGMQOperation.list_queues(session=session, commit=commit) + return self._execute_operation( + PGMQOperation.list_queues, + PGMQOperation.list_queues_async, + session, + commit, + ) def send( self, @@ -423,29 +368,14 @@ def send( msg = pgmq_client.read('my_queue') assert msg is not None """ - if self.is_async: - if session is None: - - async def _send(): - async with self.session_maker() as s: - return await PGMQOperation.send_async( - queue_name, message, delay, session=s, commit=commit - ) - - return self.loop.run_until_complete(_send()) - return self.loop.run_until_complete( - PGMQOperation.send_async( - queue_name, message, delay, session=session, commit=commit - ) - ) - - if session is None: - with self.session_maker() as s: - return PGMQOperation.send( - queue_name, message, delay, session=s, commit=commit - ) - return PGMQOperation.send( - queue_name, message, delay, session=session, commit=commit + return self._execute_operation( + PGMQOperation.send, + PGMQOperation.send_async, + session, + commit, + queue_name, + message, + delay, ) def send_batch( @@ -469,29 +399,14 @@ def send_batch( msg_ids = pgmq_client.send_batch('my_queue', msgs, delay=10) """ - if self.is_async: - if session is None: - - async def _send_batch(): - async with self.session_maker() as s: - return await PGMQOperation.send_batch_async( - queue_name, messages, delay, session=s, commit=commit - ) - - return self.loop.run_until_complete(_send_batch()) - return self.loop.run_until_complete( - PGMQOperation.send_batch_async( - queue_name, messages, delay, session=session, commit=commit - ) - ) - - if session is None: - with self.session_maker() as s: - return PGMQOperation.send_batch( - queue_name, messages, delay, session=s, commit=commit - ) - return PGMQOperation.send_batch( - queue_name, messages, delay, session=session, commit=commit + return self._execute_operation( + PGMQOperation.send_batch, + PGMQOperation.send_batch_async, + session, + commit, + queue_name, + messages, + delay, ) def read( From adbd5afd05ba5daef6125f6475c52056df7deb1b Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 2 Jan 2026 09:26:39 +0000 Subject: [PATCH 4/6] Add comprehensive tests for PGMQOperation class Co-authored-by: jason810496 <68415893+jason810496@users.noreply.github.com> --- tests/test_operation.py | 395 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 395 insertions(+) diff --git a/tests/test_operation.py b/tests/test_operation.py index e69de29..76b8df5 100644 --- a/tests/test_operation.py +++ b/tests/test_operation.py @@ -0,0 +1,395 @@ +"""Tests for PGMQOperation class. + +This test suite tests the PGMQOperation class methods directly, +which are transaction-friendly static methods that accept sessions. +""" +import uuid +import pytest + +from sqlalchemy.exc import ProgrammingError + +from pgmq_sqlalchemy.operation import PGMQOperation +from pgmq_sqlalchemy.schema import Message, QueueMetrics +from tests._utils import check_queue_exists +from tests.constant import MSG + + +# Sync tests + + +def test_check_pgmq_ext_sync(get_session_maker): + """Test that pgmq extension check works.""" + with get_session_maker() as session: + # Should not raise any exception + PGMQOperation.check_pgmq_ext(session=session, commit=True) + + +def test_create_queue_sync(get_session_maker, db_session): + """Test creating a queue using PGMQOperation.""" + queue_name = f"test_queue_{uuid.uuid4().hex}" + + with get_session_maker() as session: + PGMQOperation.create_queue(queue_name, unlogged=False, session=session, commit=True) + + assert check_queue_exists(db_session, queue_name) is True + + # Clean up + with get_session_maker() as session: + PGMQOperation.drop_queue(queue_name, partitioned=False, session=session, commit=True) + + +def test_create_unlogged_queue_sync(get_session_maker, db_session): + """Test creating an unlogged queue using PGMQOperation.""" + queue_name = f"test_queue_{uuid.uuid4().hex}" + + with get_session_maker() as session: + PGMQOperation.create_queue(queue_name, unlogged=True, session=session, commit=True) + + assert check_queue_exists(db_session, queue_name) is True + + # Clean up + with get_session_maker() as session: + PGMQOperation.drop_queue(queue_name, partitioned=False, session=session, commit=True) + + +def test_validate_queue_name_sync(get_session_maker): + """Test queue name validation.""" + queue_name = f"test_queue_{uuid.uuid4().hex}" + + with get_session_maker() as session: + # Should not raise for valid name + PGMQOperation.validate_queue_name(queue_name, session=session, commit=True) + + # Should raise for name that's too long + with pytest.raises(Exception) as e: + PGMQOperation.validate_queue_name("a" * 49, session=session, commit=True) + error_msg = str(e.value.orig) + assert "queue name is too long" in error_msg + + +def test_list_queues_sync(get_session_maker, db_session): + """Test listing queues.""" + queue_name = f"test_queue_{uuid.uuid4().hex}" + + # Create a queue + with get_session_maker() as session: + PGMQOperation.create_queue(queue_name, unlogged=False, session=session, commit=True) + + # List queues + with get_session_maker() as session: + queues = PGMQOperation.list_queues(session=session, commit=True) + + assert queue_name in queues + + # Clean up + with get_session_maker() as session: + PGMQOperation.drop_queue(queue_name, partitioned=False, session=session, commit=True) + + +def test_send_and_read_sync(get_session_maker, db_session): + """Test sending and reading messages.""" + queue_name = f"test_queue_{uuid.uuid4().hex}" + + # Create queue + with get_session_maker() as session: + PGMQOperation.create_queue(queue_name, unlogged=False, session=session, commit=True) + + # Send a message + with get_session_maker() as session: + msg_id = PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) + + assert msg_id > 0 + + # Read the message + with get_session_maker() as session: + msg = PGMQOperation.read(queue_name, vt=30, session=session, commit=True) + + assert msg is not None + assert msg.msg_id == msg_id + assert msg.message == MSG + + # Clean up + with get_session_maker() as session: + PGMQOperation.drop_queue(queue_name, partitioned=False, session=session, commit=True) + + +def test_send_batch_sync(get_session_maker, db_session): + """Test sending a batch of messages.""" + queue_name = f"test_queue_{uuid.uuid4().hex}" + messages = [{"key": f"value{i}"} for i in range(5)] + + # Create queue + with get_session_maker() as session: + PGMQOperation.create_queue(queue_name, unlogged=False, session=session, commit=True) + + # Send batch + with get_session_maker() as session: + msg_ids = PGMQOperation.send_batch(queue_name, messages, delay=0, session=session, commit=True) + + assert len(msg_ids) == 5 + + # Read batch + with get_session_maker() as session: + msgs = PGMQOperation.read_batch(queue_name, vt=30, batch_size=5, session=session, commit=True) + + assert len(msgs) == 5 + + # Clean up + with get_session_maker() as session: + PGMQOperation.drop_queue(queue_name, partitioned=False, session=session, commit=True) + + +def test_pop_sync(get_session_maker, db_session): + """Test popping a message from the queue.""" + queue_name = f"test_queue_{uuid.uuid4().hex}" + + # Create queue and send message + with get_session_maker() as session: + PGMQOperation.create_queue(queue_name, unlogged=False, session=session, commit=True) + msg_id = PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) + + # Pop message + with get_session_maker() as session: + msg = PGMQOperation.pop(queue_name, session=session, commit=True) + + assert msg is not None + assert msg.msg_id == msg_id + + # Verify queue is empty + with get_session_maker() as session: + msg2 = PGMQOperation.pop(queue_name, session=session, commit=True) + + assert msg2 is None + + # Clean up + with get_session_maker() as session: + PGMQOperation.drop_queue(queue_name, partitioned=False, session=session, commit=True) + + +def test_delete_sync(get_session_maker, db_session): + """Test deleting a message.""" + queue_name = f"test_queue_{uuid.uuid4().hex}" + + # Create queue and send message + with get_session_maker() as session: + PGMQOperation.create_queue(queue_name, unlogged=False, session=session, commit=True) + msg_id = PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) + + # Delete message + with get_session_maker() as session: + deleted = PGMQOperation.delete(queue_name, msg_id, session=session, commit=True) + + assert deleted is True + + # Clean up + with get_session_maker() as session: + PGMQOperation.drop_queue(queue_name, partitioned=False, session=session, commit=True) + + +def test_set_vt_sync(get_session_maker, db_session): + """Test setting visibility timeout.""" + queue_name = f"test_queue_{uuid.uuid4().hex}" + + # Create queue and send message + with get_session_maker() as session: + PGMQOperation.create_queue(queue_name, unlogged=False, session=session, commit=True) + msg_id = PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) + # Read message to set initial vt + PGMQOperation.read(queue_name, vt=5, session=session, commit=True) + + # Set new vt + with get_session_maker() as session: + msg = PGMQOperation.set_vt(queue_name, msg_id, vt=60, session=session, commit=True) + + assert msg is not None + + # Clean up + with get_session_maker() as session: + PGMQOperation.drop_queue(queue_name, partitioned=False, session=session, commit=True) + + +def test_archive_sync(get_session_maker, db_session): + """Test archiving a message.""" + queue_name = f"test_queue_{uuid.uuid4().hex}" + + # Create queue and send message + with get_session_maker() as session: + PGMQOperation.create_queue(queue_name, unlogged=False, session=session, commit=True) + msg_id = PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) + + # Archive message + with get_session_maker() as session: + archived = PGMQOperation.archive(queue_name, msg_id, session=session, commit=True) + + assert archived is True + + # Clean up + with get_session_maker() as session: + PGMQOperation.drop_queue(queue_name, partitioned=False, session=session, commit=True) + + +def test_metrics_sync(get_session_maker, db_session): + """Test getting queue metrics.""" + queue_name = f"test_queue_{uuid.uuid4().hex}" + + # Create queue + with get_session_maker() as session: + PGMQOperation.create_queue(queue_name, unlogged=False, session=session, commit=True) + + # Get metrics for empty queue + with get_session_maker() as session: + metrics = PGMQOperation.metrics(queue_name, session=session, commit=True) + + assert metrics is not None + assert isinstance(metrics, QueueMetrics) + assert metrics.queue_name == queue_name + assert metrics.queue_length == 0 + assert metrics.total_messages == 0 + + # Send some messages + with get_session_maker() as session: + for i in range(3): + PGMQOperation.send(queue_name, {"index": i}, delay=0, session=session, commit=True) + + # Get metrics after adding messages + with get_session_maker() as session: + metrics = PGMQOperation.metrics(queue_name, session=session, commit=True) + + assert metrics.queue_length == 3 + assert metrics.total_messages == 3 + + # Clean up + with get_session_maker() as session: + PGMQOperation.drop_queue(queue_name, partitioned=False, session=session, commit=True) + + +def test_metrics_all_sync(get_session_maker, db_session): + """Test getting metrics for all queues.""" + queue_name1 = f"test_queue_{uuid.uuid4().hex}" + queue_name2 = f"test_queue_{uuid.uuid4().hex}" + + # Create two queues + with get_session_maker() as session: + PGMQOperation.create_queue(queue_name1, unlogged=False, session=session, commit=True) + PGMQOperation.create_queue(queue_name2, unlogged=False, session=session, commit=True) + + # Get metrics for all queues + with get_session_maker() as session: + all_metrics = PGMQOperation.metrics_all(session=session, commit=True) + + assert all_metrics is not None + assert len(all_metrics) >= 2 + queue_names = [m.queue_name for m in all_metrics] + assert queue_name1 in queue_names + assert queue_name2 in queue_names + + # Clean up + with get_session_maker() as session: + PGMQOperation.drop_queue(queue_name1, partitioned=False, session=session, commit=True) + PGMQOperation.drop_queue(queue_name2, partitioned=False, session=session, commit=True) + + +def test_transaction_rollback_sync(get_session_maker, db_session): + """Test that operations can be rolled back when commit=False.""" + queue_name = f"test_queue_{uuid.uuid4().hex}" + + # Create queue with commit=False, then rollback + with get_session_maker() as session: + PGMQOperation.create_queue(queue_name, unlogged=False, session=session, commit=False) + session.rollback() + + # Queue should not exist + assert check_queue_exists(db_session, queue_name) is False + + +def test_transaction_commit_sync(get_session_maker, db_session): + """Test that operations are committed when commit=True.""" + queue_name = f"test_queue_{uuid.uuid4().hex}" + + # Create queue with commit=True + with get_session_maker() as session: + PGMQOperation.create_queue(queue_name, unlogged=False, session=session, commit=True) + + # Queue should exist + assert check_queue_exists(db_session, queue_name) is True + + # Clean up + with get_session_maker() as session: + PGMQOperation.drop_queue(queue_name, partitioned=False, session=session, commit=True) + + +# Async tests + + +@pytest.mark.asyncio +async def test_check_pgmq_ext_async(get_async_session_maker): + """Test that pgmq extension check works asynchronously.""" + async with get_async_session_maker() as session: + # Should not raise any exception + await PGMQOperation.check_pgmq_ext_async(session=session, commit=True) + + +@pytest.mark.asyncio +async def test_create_queue_async(get_async_session_maker, db_session): + """Test creating a queue using PGMQOperation asynchronously.""" + queue_name = f"test_queue_{uuid.uuid4().hex}" + + async with get_async_session_maker() as session: + await PGMQOperation.create_queue_async(queue_name, unlogged=False, session=session, commit=True) + + assert check_queue_exists(db_session, queue_name) is True + + # Clean up + async with get_async_session_maker() as session: + await PGMQOperation.drop_queue_async(queue_name, partitioned=False, session=session, commit=True) + + +@pytest.mark.asyncio +async def test_send_and_read_async(get_async_session_maker, db_session): + """Test sending and reading messages asynchronously.""" + queue_name = f"test_queue_{uuid.uuid4().hex}" + + # Create queue + async with get_async_session_maker() as session: + await PGMQOperation.create_queue_async(queue_name, unlogged=False, session=session, commit=True) + + # Send a message + async with get_async_session_maker() as session: + msg_id = await PGMQOperation.send_async(queue_name, MSG, delay=0, session=session, commit=True) + + assert msg_id > 0 + + # Read the message + async with get_async_session_maker() as session: + msg = await PGMQOperation.read_async(queue_name, vt=30, session=session, commit=True) + + assert msg is not None + assert msg.msg_id == msg_id + assert msg.message == MSG + + # Clean up + async with get_async_session_maker() as session: + await PGMQOperation.drop_queue_async(queue_name, partitioned=False, session=session, commit=True) + + +@pytest.mark.asyncio +async def test_metrics_async(get_async_session_maker, db_session): + """Test getting queue metrics asynchronously.""" + queue_name = f"test_queue_{uuid.uuid4().hex}" + + # Create queue + async with get_async_session_maker() as session: + await PGMQOperation.create_queue_async(queue_name, unlogged=False, session=session, commit=True) + + # Get metrics + async with get_async_session_maker() as session: + metrics = await PGMQOperation.metrics_async(queue_name, session=session, commit=True) + + assert metrics is not None + assert isinstance(metrics, QueueMetrics) + assert metrics.queue_name == queue_name + assert metrics.queue_length == 0 + + # Clean up + async with get_async_session_maker() as session: + await PGMQOperation.drop_queue_async(queue_name, partitioned=False, session=session, commit=True) From 9ef221a7c9ef6ba499073dfbd5819e2cfbecb4b7 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 2 Jan 2026 09:27:50 +0000 Subject: [PATCH 5/6] Fix json import and send_batch parameter handling Co-authored-by: jason810496 <68415893+jason810496@users.noreply.github.com> --- pgmq_sqlalchemy/operation.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pgmq_sqlalchemy/operation.py b/pgmq_sqlalchemy/operation.py index 3c6911c..9803b75 100644 --- a/pgmq_sqlalchemy/operation.py +++ b/pgmq_sqlalchemy/operation.py @@ -1,5 +1,6 @@ from typing import List, Optional, Tuple, Dict, Any, Union import re +import json from sqlalchemy import text from sqlalchemy.orm import Session @@ -109,7 +110,6 @@ def _get_send_statement( queue_name: str, message: dict, delay: int ) -> Tuple[str, Dict[str, Any]]: """Get statement and params for send.""" - import json return ( "select * from pgmq.send(:queue_name, :message::jsonb, :delay);", { @@ -124,12 +124,11 @@ def _get_send_batch_statement( queue_name: str, messages: List[dict], delay: int ) -> Tuple[str, Dict[str, Any]]: """Get statement and params for send_batch.""" - import json return ( - "select * from pgmq.send_batch(:queue_name, :messages, :delay);", + "select * from pgmq.send_batch(:queue_name, :messages::jsonb, :delay);", { "queue_name": queue_name, - "messages": [json.dumps(m) for m in messages], + "messages": json.dumps(messages), "delay": delay, }, ) From ddabdaf27d0cd7241a4c1066a2dbb5b9e030f1c4 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 2 Jan 2026 09:29:02 +0000 Subject: [PATCH 6/6] Fix test exception handling to be more specific Co-authored-by: jason810496 <68415893+jason810496@users.noreply.github.com> --- tests/test_operation.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_operation.py b/tests/test_operation.py index 76b8df5..e884a5d 100644 --- a/tests/test_operation.py +++ b/tests/test_operation.py @@ -61,9 +61,9 @@ def test_validate_queue_name_sync(get_session_maker): PGMQOperation.validate_queue_name(queue_name, session=session, commit=True) # Should raise for name that's too long - with pytest.raises(Exception) as e: + with pytest.raises(ProgrammingError) as e: PGMQOperation.validate_queue_name("a" * 49, session=session, commit=True) - error_msg = str(e.value.orig) + error_msg = str(e.value.orig) if hasattr(e.value, 'orig') else str(e.value) assert "queue name is too long" in error_msg