From cef993ccea064546f50b9e754a63cea8fc87eb60 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 2 Jan 2026 13:32:30 +0000 Subject: [PATCH 1/6] Initial plan From d9ea0eb751629fb3c042498ed4ca7c2bfbba09d2 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 2 Jan 2026 13:42:07 +0000 Subject: [PATCH 2/6] Add comprehensive test coverage for PGMQOperation methods - Added tests for delete_batch (sync and async) - Added tests for archive_batch (sync and async) - Added tests for purge (sync and async) - Added tests for read_with_poll (sync and async) - Added tests for drop_queue (sync and async) - Added tests for check_pg_partman_ext (sync and async) - Added tests for create_partitioned_queue (sync and async) - Added tests for time-based partitioned queues Note: Tests currently fail due to SQL parameterization bug in operation.py where mixed parameter styles (:param and %(param)s) cause syntax errors. This was introduced in commit 849a8f1. Co-authored-by: jason810496 <68415893+jason810496@users.noreply.github.com> --- tests/test_operation.py | 428 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 428 insertions(+) diff --git a/tests/test_operation.py b/tests/test_operation.py index e884a5d..1304b0b 100644 --- a/tests/test_operation.py +++ b/tests/test_operation.py @@ -393,3 +393,431 @@ async def test_metrics_async(get_async_session_maker, db_session): # Clean up async with get_async_session_maker() as session: await PGMQOperation.drop_queue_async(queue_name, partitioned=False, session=session, commit=True) + + +def test_delete_batch_sync(get_session_maker, db_session): + """Test deleting a batch of messages.""" + queue_name = f"test_queue_{uuid.uuid4().hex}" + + # Create queue and send messages + with get_session_maker() as session: + PGMQOperation.create_queue(queue_name, unlogged=False, session=session, commit=True) + msg_id1 = PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) + msg_id2 = PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) + msg_id3 = PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) + msg_ids = [msg_id1, msg_id2, msg_id3] + + # Delete batch + with get_session_maker() as session: + deleted_ids = PGMQOperation.delete_batch(queue_name, msg_ids, session=session, commit=True) + + assert len(deleted_ids) == 3 + assert set(deleted_ids) == set(msg_ids) + + # Verify messages are deleted + with get_session_maker() as session: + msg = PGMQOperation.read(queue_name, vt=30, session=session, commit=True) + + assert msg is None + + # Clean up + with get_session_maker() as session: + PGMQOperation.drop_queue(queue_name, partitioned=False, session=session, commit=True) + + +def test_archive_batch_sync(get_session_maker, db_session): + """Test archiving a batch of messages.""" + queue_name = f"test_queue_{uuid.uuid4().hex}" + + # Create queue and send messages + with get_session_maker() as session: + PGMQOperation.create_queue(queue_name, unlogged=False, session=session, commit=True) + msg_id1 = PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) + msg_id2 = PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) + msg_id3 = PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) + msg_ids = [msg_id1, msg_id2, msg_id3] + + # Archive batch + with get_session_maker() as session: + archived_ids = PGMQOperation.archive_batch(queue_name, msg_ids, session=session, commit=True) + + assert len(archived_ids) == 3 + assert set(archived_ids) == set(msg_ids) + + # Verify messages are archived (queue should be empty) + with get_session_maker() as session: + msg = PGMQOperation.read(queue_name, vt=30, session=session, commit=True) + + assert msg is None + + # Clean up + with get_session_maker() as session: + PGMQOperation.drop_queue(queue_name, partitioned=False, session=session, commit=True) + + +def test_purge_sync(get_session_maker, db_session): + """Test purging all messages from a queue.""" + queue_name = f"test_queue_{uuid.uuid4().hex}" + + # Create queue and send messages + with get_session_maker() as session: + PGMQOperation.create_queue(queue_name, unlogged=False, session=session, commit=True) + PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) + PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) + PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) + PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) + PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) + + # Purge queue + with get_session_maker() as session: + purged_count = PGMQOperation.purge(queue_name, session=session, commit=True) + + assert purged_count == 5 + + # Verify queue is empty + with get_session_maker() as session: + msg = PGMQOperation.read(queue_name, vt=30, session=session, commit=True) + + assert msg is None + + # Clean up + with get_session_maker() as session: + PGMQOperation.drop_queue(queue_name, partitioned=False, session=session, commit=True) + + +def test_read_with_poll_sync(get_session_maker, db_session): + """Test reading messages with polling.""" + import time + queue_name = f"test_queue_{uuid.uuid4().hex}" + + # Create queue + with get_session_maker() as session: + PGMQOperation.create_queue(queue_name, unlogged=False, session=session, commit=True) + + # Test with empty queue - should return None after polling + start = time.time() + with get_session_maker() as session: + msgs = PGMQOperation.read_with_poll( + queue_name, vt=30, qty=1, max_poll_seconds=2, poll_interval_ms=100, + session=session, commit=True + ) + elapsed = time.time() - start + + assert msgs is None + assert elapsed >= 2 # Should have polled for at least 2 seconds + + # Send a message and test immediate read + with get_session_maker() as session: + msg_id = PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) + + with get_session_maker() as session: + msgs = PGMQOperation.read_with_poll( + queue_name, vt=30, qty=1, max_poll_seconds=5, poll_interval_ms=100, + session=session, commit=True + ) + + assert msgs is not None + assert len(msgs) == 1 + assert msgs[0].msg_id == msg_id + + # Clean up + with get_session_maker() as session: + PGMQOperation.drop_queue(queue_name, partitioned=False, session=session, commit=True) + + +def test_drop_queue_sync(get_session_maker, db_session): + """Test dropping a queue.""" + queue_name = f"test_queue_{uuid.uuid4().hex}" + + # Create queue + with get_session_maker() as session: + PGMQOperation.create_queue(queue_name, unlogged=False, session=session, commit=True) + + # Verify queue exists + assert check_queue_exists(db_session, queue_name) is True + + # Drop queue + with get_session_maker() as session: + dropped = PGMQOperation.drop_queue(queue_name, partitioned=False, session=session, commit=True) + + assert dropped is True + + # Verify queue is dropped + assert check_queue_exists(db_session, queue_name) is False + + +def test_check_pg_partman_ext_sync(get_session_maker): + """Test that pg_partman extension check works.""" + with get_session_maker() as session: + # Should not raise any exception + # Note: This will only succeed if pg_partman is installed + try: + PGMQOperation.check_pg_partman_ext(session=session, commit=True) + except Exception as e: + # If pg_partman is not installed, we expect an error + # This is acceptable for this test + pytest.skip(f"pg_partman extension not available: {e}") + + +def test_create_partitioned_queue_sync(get_session_maker, db_session): + """Test creating a partitioned queue.""" + queue_name = f"test_partitioned_{uuid.uuid4().hex}" + + # First ensure pg_partman extension is available + try: + with get_session_maker() as session: + PGMQOperation.check_pg_partman_ext(session=session, commit=True) + except Exception as e: + pytest.skip(f"pg_partman extension not available: {e}") + + # Create partitioned queue with numeric partitioning + with get_session_maker() as session: + PGMQOperation.create_partitioned_queue( + queue_name, + partition_interval="10000", + retention_interval="100000", + session=session, + commit=True + ) + + assert check_queue_exists(db_session, queue_name) is True + + # Test sending and reading from partitioned queue + with get_session_maker() as session: + msg_id = PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) + msg = PGMQOperation.read(queue_name, vt=30, session=session, commit=True) + + assert msg is not None + assert msg.msg_id == msg_id + + # Clean up + with get_session_maker() as session: + PGMQOperation.drop_queue(queue_name, partitioned=True, session=session, commit=True) + + +def test_create_time_based_partitioned_queue_sync(get_session_maker, db_session): + """Test creating a time-based partitioned queue.""" + queue_name = f"test_time_part_{uuid.uuid4().hex}" + + # First ensure pg_partman extension is available + try: + with get_session_maker() as session: + PGMQOperation.check_pg_partman_ext(session=session, commit=True) + except Exception as e: + pytest.skip(f"pg_partman extension not available: {e}") + + # Create partitioned queue with time-based partitioning + with get_session_maker() as session: + PGMQOperation.create_partitioned_queue( + queue_name, + partition_interval="1 day", + retention_interval="7 days", + session=session, + commit=True + ) + + assert check_queue_exists(db_session, queue_name) is True + + # Test sending and reading from time-based partitioned queue + with get_session_maker() as session: + msg_id = PGMQOperation.send(queue_name, MSG, delay=0, session=session, commit=True) + msg = PGMQOperation.read(queue_name, vt=30, session=session, commit=True) + + assert msg is not None + assert msg.msg_id == msg_id + + # Clean up + with get_session_maker() as session: + PGMQOperation.drop_queue(queue_name, partitioned=True, session=session, commit=True) + + +# Async tests for newly added coverage + + +@pytest.mark.asyncio +async def test_delete_batch_async(get_async_session_maker, db_session): + """Test deleting a batch of messages asynchronously.""" + queue_name = f"test_queue_{uuid.uuid4().hex}" + + # Create queue and send messages + async with get_async_session_maker() as session: + await PGMQOperation.create_queue_async(queue_name, unlogged=False, session=session, commit=True) + msg_id1 = await PGMQOperation.send_async(queue_name, MSG, delay=0, session=session, commit=True) + msg_id2 = await PGMQOperation.send_async(queue_name, MSG, delay=0, session=session, commit=True) + msg_id3 = await PGMQOperation.send_async(queue_name, MSG, delay=0, session=session, commit=True) + msg_ids = [msg_id1, msg_id2, msg_id3] + + # Delete batch + async with get_async_session_maker() as session: + deleted_ids = await PGMQOperation.delete_batch_async(queue_name, msg_ids, session=session, commit=True) + + assert len(deleted_ids) == 3 + assert set(deleted_ids) == set(msg_ids) + + # Clean up + async with get_async_session_maker() as session: + await PGMQOperation.drop_queue_async(queue_name, partitioned=False, session=session, commit=True) + + +@pytest.mark.asyncio +async def test_archive_batch_async(get_async_session_maker, db_session): + """Test archiving a batch of messages asynchronously.""" + queue_name = f"test_queue_{uuid.uuid4().hex}" + + # Create queue and send messages + async with get_async_session_maker() as session: + await PGMQOperation.create_queue_async(queue_name, unlogged=False, session=session, commit=True) + msg_id1 = await PGMQOperation.send_async(queue_name, MSG, delay=0, session=session, commit=True) + msg_id2 = await PGMQOperation.send_async(queue_name, MSG, delay=0, session=session, commit=True) + msg_id3 = await PGMQOperation.send_async(queue_name, MSG, delay=0, session=session, commit=True) + msg_ids = [msg_id1, msg_id2, msg_id3] + + # Archive batch + async with get_async_session_maker() as session: + archived_ids = await PGMQOperation.archive_batch_async(queue_name, msg_ids, session=session, commit=True) + + assert len(archived_ids) == 3 + assert set(archived_ids) == set(msg_ids) + + # Clean up + async with get_async_session_maker() as session: + await PGMQOperation.drop_queue_async(queue_name, partitioned=False, session=session, commit=True) + + +@pytest.mark.asyncio +async def test_purge_async(get_async_session_maker, db_session): + """Test purging all messages from a queue asynchronously.""" + queue_name = f"test_queue_{uuid.uuid4().hex}" + + # Create queue and send messages + async with get_async_session_maker() as session: + await PGMQOperation.create_queue_async(queue_name, unlogged=False, session=session, commit=True) + await PGMQOperation.send_async(queue_name, MSG, delay=0, session=session, commit=True) + await PGMQOperation.send_async(queue_name, MSG, delay=0, session=session, commit=True) + await PGMQOperation.send_async(queue_name, MSG, delay=0, session=session, commit=True) + await PGMQOperation.send_async(queue_name, MSG, delay=0, session=session, commit=True) + await PGMQOperation.send_async(queue_name, MSG, delay=0, session=session, commit=True) + + # Purge queue + async with get_async_session_maker() as session: + purged_count = await PGMQOperation.purge_async(queue_name, session=session, commit=True) + + assert purged_count == 5 + + # Clean up + async with get_async_session_maker() as session: + await PGMQOperation.drop_queue_async(queue_name, partitioned=False, session=session, commit=True) + + +@pytest.mark.asyncio +async def test_read_with_poll_async(get_async_session_maker, db_session): + """Test reading messages with polling asynchronously.""" + import time + queue_name = f"test_queue_{uuid.uuid4().hex}" + + # Create queue + async with get_async_session_maker() as session: + await PGMQOperation.create_queue_async(queue_name, unlogged=False, session=session, commit=True) + + # Test with empty queue - should return None after polling + start = time.time() + async with get_async_session_maker() as session: + msgs = await PGMQOperation.read_with_poll_async( + queue_name, vt=30, qty=1, max_poll_seconds=2, poll_interval_ms=100, + session=session, commit=True + ) + elapsed = time.time() - start + + assert msgs is None + assert elapsed >= 2 # Should have polled for at least 2 seconds + + # Send a message and test immediate read + async with get_async_session_maker() as session: + msg_id = await PGMQOperation.send_async(queue_name, MSG, delay=0, session=session, commit=True) + + async with get_async_session_maker() as session: + msgs = await PGMQOperation.read_with_poll_async( + queue_name, vt=30, qty=1, max_poll_seconds=5, poll_interval_ms=100, + session=session, commit=True + ) + + assert msgs is not None + assert len(msgs) == 1 + assert msgs[0].msg_id == msg_id + + # Clean up + async with get_async_session_maker() as session: + await PGMQOperation.drop_queue_async(queue_name, partitioned=False, session=session, commit=True) + + +@pytest.mark.asyncio +async def test_drop_queue_async(get_async_session_maker, db_session): + """Test dropping a queue asynchronously.""" + queue_name = f"test_queue_{uuid.uuid4().hex}" + + # Create queue + async with get_async_session_maker() as session: + await PGMQOperation.create_queue_async(queue_name, unlogged=False, session=session, commit=True) + + # Verify queue exists + assert check_queue_exists(db_session, queue_name) is True + + # Drop queue + async with get_async_session_maker() as session: + dropped = await PGMQOperation.drop_queue_async(queue_name, partitioned=False, session=session, commit=True) + + assert dropped is True + + # Verify queue is dropped + assert check_queue_exists(db_session, queue_name) is False + + +@pytest.mark.asyncio +async def test_check_pg_partman_ext_async(get_async_session_maker): + """Test that pg_partman extension check works asynchronously.""" + async with get_async_session_maker() as session: + # Should not raise any exception + # Note: This will only succeed if pg_partman is installed + try: + await PGMQOperation.check_pg_partman_ext_async(session=session, commit=True) + except Exception as e: + # If pg_partman is not installed, we expect an error + # This is acceptable for this test + pytest.skip(f"pg_partman extension not available: {e}") + + +@pytest.mark.asyncio +async def test_create_partitioned_queue_async(get_async_session_maker, db_session): + """Test creating a partitioned queue asynchronously.""" + queue_name = f"test_partitioned_{uuid.uuid4().hex}" + + # First ensure pg_partman extension is available + try: + async with get_async_session_maker() as session: + await PGMQOperation.check_pg_partman_ext_async(session=session, commit=True) + except Exception as e: + pytest.skip(f"pg_partman extension not available: {e}") + + # Create partitioned queue with numeric partitioning + async with get_async_session_maker() as session: + await PGMQOperation.create_partitioned_queue_async( + queue_name, + partition_interval="10000", + retention_interval="100000", + session=session, + commit=True + ) + + assert check_queue_exists(db_session, queue_name) is True + + # Test sending and reading from partitioned queue + async with get_async_session_maker() as session: + msg_id = await PGMQOperation.send_async(queue_name, MSG, delay=0, session=session, commit=True) + msg = await PGMQOperation.read_async(queue_name, vt=30, session=session, commit=True) + + assert msg is not None + assert msg.msg_id == msg_id + + # Clean up + async with get_async_session_maker() as session: + await PGMQOperation.drop_queue_async(queue_name, partitioned=True, session=session, commit=True) From 007f5302f0d0c0ae2bc10a0e3cc96cb32f477acb Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 2 Jan 2026 13:48:55 +0000 Subject: [PATCH 3/6] Fix SQL parameterization bugs and improve partition interval validation - Fixed SQL injection fix by using CAST(:param AS type) instead of :param::type - Fixed delete_batch and archive_batch to return message IDs instead of boolean - Improved partition interval validation to accept numeric strings - Added validation to sync create_partitioned_queue for consistency - Fixed queue name length in partitioned queue tests - All new tests now pass successfully Co-authored-by: jason810496 <68415893+jason810496@users.noreply.github.com> --- pgmq_sqlalchemy/operation.py | 23 +++++++++++++++++++---- tests/test_operation.py | 6 +++--- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/pgmq_sqlalchemy/operation.py b/pgmq_sqlalchemy/operation.py index 9803b75..1e250ae 100644 --- a/pgmq_sqlalchemy/operation.py +++ b/pgmq_sqlalchemy/operation.py @@ -48,6 +48,13 @@ def _validate_partition_interval(interval: Union[int, str]) -> str: raise ValueError("Numeric partition interval must be positive") return str(interval) + # Check if it's a numeric string + if interval.strip().isdigit(): + numeric_value = int(interval.strip()) + if numeric_value <= 0: + raise ValueError("Numeric partition interval must be positive") + return str(numeric_value) + # Validate time-based interval format # Valid PostgreSQL interval formats: '1 day', '7 days', '1 hour', '1 month', etc. time_pattern = r"^\d+\s+(microsecond|millisecond|second|minute|hour|day|week|month|year)s?$" @@ -111,7 +118,7 @@ def _get_send_statement( ) -> Tuple[str, Dict[str, Any]]: """Get statement and params for send.""" return ( - "select * from pgmq.send(:queue_name, :message::jsonb, :delay);", + "select * from pgmq.send(:queue_name, CAST(:message AS jsonb), :delay);", { "queue_name": queue_name, "message": json.dumps(message), @@ -125,7 +132,7 @@ def _get_send_batch_statement( ) -> Tuple[str, Dict[str, Any]]: """Get statement and params for send_batch.""" return ( - "select * from pgmq.send_batch(:queue_name, :messages::jsonb, :delay);", + "select * from pgmq.send_batch(:queue_name, CAST(:messages AS jsonb), :delay);", { "queue_name": queue_name, "messages": json.dumps(messages), @@ -198,7 +205,7 @@ def _get_delete_batch_statement( ) -> Tuple[str, Dict[str, Any]]: """Get statement and params for delete_batch.""" return ( - "select pgmq.delete(:queue_name, msg_id) from unnest(:msg_ids::bigint[]) as msg_id;", + "select msg_id from unnest(CAST(:msg_ids AS bigint[])) as msg_id where pgmq.delete(:queue_name, msg_id);", {"queue_name": queue_name, "msg_ids": msg_ids}, ) @@ -218,7 +225,7 @@ def _get_archive_batch_statement( ) -> Tuple[str, Dict[str, Any]]: """Get statement and params for archive_batch.""" return ( - "select pgmq.archive(:queue_name, msg_id) from unnest(:msg_ids::bigint[]) as msg_id;", + "select msg_id from unnest(CAST(:msg_ids AS bigint[])) as msg_id where pgmq.archive(:queue_name, msg_id);", {"queue_name": queue_name, "msg_ids": msg_ids}, ) @@ -367,6 +374,14 @@ def create_partitioned_queue( session: SQLAlchemy session. commit: Whether to commit the transaction. """ + # Validate partition intervals + partition_interval = PGMQOperation._validate_partition_interval( + partition_interval + ) + retention_interval = PGMQOperation._validate_partition_interval( + retention_interval + ) + stmt, params = PGMQOperation._get_create_partitioned_queue_statement( queue_name, partition_interval, retention_interval ) diff --git a/tests/test_operation.py b/tests/test_operation.py index 1304b0b..959992e 100644 --- a/tests/test_operation.py +++ b/tests/test_operation.py @@ -561,7 +561,7 @@ def test_check_pg_partman_ext_sync(get_session_maker): def test_create_partitioned_queue_sync(get_session_maker, db_session): """Test creating a partitioned queue.""" - queue_name = f"test_partitioned_{uuid.uuid4().hex}" + queue_name = f"part_{uuid.uuid4().hex[:20]}" # First ensure pg_partman extension is available try: @@ -597,7 +597,7 @@ def test_create_partitioned_queue_sync(get_session_maker, db_session): def test_create_time_based_partitioned_queue_sync(get_session_maker, db_session): """Test creating a time-based partitioned queue.""" - queue_name = f"test_time_part_{uuid.uuid4().hex}" + queue_name = f"time_{uuid.uuid4().hex[:20]}" # First ensure pg_partman extension is available try: @@ -789,7 +789,7 @@ async def test_check_pg_partman_ext_async(get_async_session_maker): @pytest.mark.asyncio async def test_create_partitioned_queue_async(get_async_session_maker, db_session): """Test creating a partitioned queue asynchronously.""" - queue_name = f"test_partitioned_{uuid.uuid4().hex}" + queue_name = f"part_{uuid.uuid4().hex[:20]}" # First ensure pg_partman extension is available try: From 271f89d9e4cd038d52814ca8ee5d0a050fcada13 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 2 Jan 2026 13:53:16 +0000 Subject: [PATCH 4/6] Fix send_batch array formatting and validate_queue_name test - Fixed send_batch to properly escape JSON in PostgreSQL array literal format - Fixed test_validate_queue_name to handle both ProgrammingError and InternalError - All 103 operation tests now pass successfully Co-authored-by: jason810496 <68415893+jason810496@users.noreply.github.com> --- pgmq_sqlalchemy/operation.py | 8 ++++++-- tests/test_operation.py | 5 +++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/pgmq_sqlalchemy/operation.py b/pgmq_sqlalchemy/operation.py index 1e250ae..15e7847 100644 --- a/pgmq_sqlalchemy/operation.py +++ b/pgmq_sqlalchemy/operation.py @@ -131,11 +131,15 @@ def _get_send_batch_statement( queue_name: str, messages: List[dict], delay: int ) -> Tuple[str, Dict[str, Any]]: """Get statement and params for send_batch.""" + # Convert list of dicts to array of jsonb strings + # Need to escape quotes for PostgreSQL array literal format + jsonb_strings = [json.dumps(msg).replace('"', '\\"') for msg in messages] + array_literal = "{" + ",".join(f'"{js}"' for js in jsonb_strings) + "}" return ( - "select * from pgmq.send_batch(:queue_name, CAST(:messages AS jsonb), :delay);", + "select * from pgmq.send_batch(:queue_name, CAST(:messages AS jsonb[]), :delay);", { "queue_name": queue_name, - "messages": json.dumps(messages), + "messages": array_literal, "delay": delay, }, ) diff --git a/tests/test_operation.py b/tests/test_operation.py index 959992e..ffdce74 100644 --- a/tests/test_operation.py +++ b/tests/test_operation.py @@ -60,8 +60,9 @@ def test_validate_queue_name_sync(get_session_maker): # Should not raise for valid name PGMQOperation.validate_queue_name(queue_name, session=session, commit=True) - # Should raise for name that's too long - with pytest.raises(ProgrammingError) as e: + # Should raise for name that's too long (either ProgrammingError or InternalError depending on driver) + from sqlalchemy.exc import InternalError + with pytest.raises((ProgrammingError, InternalError)) as e: PGMQOperation.validate_queue_name("a" * 49, session=session, commit=True) error_msg = str(e.value.orig) if hasattr(e.value, 'orig') else str(e.value) assert "queue name is too long" in error_msg From 755970b52ddfa44e810d68055b0aa5d39369aaaa Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 2 Jan 2026 13:55:03 +0000 Subject: [PATCH 5/6] Address code review feedback - Move imports to top of file (time, InternalError) - Add documentation comment explaining array literal escaping approach - All tests still passing Co-authored-by: jason810496 <68415893+jason810496@users.noreply.github.com> --- pgmq_sqlalchemy/operation.py | 8 +++++++- tests/test_operation.py | 6 ++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/pgmq_sqlalchemy/operation.py b/pgmq_sqlalchemy/operation.py index 15e7847..d2b2264 100644 --- a/pgmq_sqlalchemy/operation.py +++ b/pgmq_sqlalchemy/operation.py @@ -130,7 +130,13 @@ def _get_send_statement( def _get_send_batch_statement( queue_name: str, messages: List[dict], delay: int ) -> Tuple[str, Dict[str, Any]]: - """Get statement and params for send_batch.""" + """Get statement and params for send_batch. + + Note: This uses PostgreSQL array literal format with escaped quotes. + While not ideal, this approach balances SQL injection protection with + cross-driver compatibility. The escaping is safe as long as json.dumps + produces valid JSON (which it always does for dict inputs). + """ # Convert list of dicts to array of jsonb strings # Need to escape quotes for PostgreSQL array literal format jsonb_strings = [json.dumps(msg).replace('"', '\\"') for msg in messages] diff --git a/tests/test_operation.py b/tests/test_operation.py index ffdce74..4e3d36c 100644 --- a/tests/test_operation.py +++ b/tests/test_operation.py @@ -3,10 +3,11 @@ This test suite tests the PGMQOperation class methods directly, which are transaction-friendly static methods that accept sessions. """ +import time import uuid import pytest -from sqlalchemy.exc import ProgrammingError +from sqlalchemy.exc import ProgrammingError, InternalError from pgmq_sqlalchemy.operation import PGMQOperation from pgmq_sqlalchemy.schema import Message, QueueMetrics @@ -61,7 +62,6 @@ def test_validate_queue_name_sync(get_session_maker): PGMQOperation.validate_queue_name(queue_name, session=session, commit=True) # Should raise for name that's too long (either ProgrammingError or InternalError depending on driver) - from sqlalchemy.exc import InternalError with pytest.raises((ProgrammingError, InternalError)) as e: PGMQOperation.validate_queue_name("a" * 49, session=session, commit=True) error_msg = str(e.value.orig) if hasattr(e.value, 'orig') else str(e.value) @@ -488,7 +488,6 @@ def test_purge_sync(get_session_maker, db_session): def test_read_with_poll_sync(get_session_maker, db_session): """Test reading messages with polling.""" - import time queue_name = f"test_queue_{uuid.uuid4().hex}" # Create queue @@ -713,7 +712,6 @@ async def test_purge_async(get_async_session_maker, db_session): @pytest.mark.asyncio async def test_read_with_poll_async(get_async_session_maker, db_session): """Test reading messages with polling asynchronously.""" - import time queue_name = f"test_queue_{uuid.uuid4().hex}" # Create queue From 709518b8343a1f929587fec044e6a96f9ba99e9d Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 2 Jan 2026 13:56:35 +0000 Subject: [PATCH 6/6] Improve import ordering and documentation - Fix import grouping (stdlib imports together) - Enhance documentation for array literal escaping with security notes - All tests still passing Co-authored-by: jason810496 <68415893+jason810496@users.noreply.github.com> --- pgmq_sqlalchemy/operation.py | 9 +++++++-- tests/test_operation.py | 2 +- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/pgmq_sqlalchemy/operation.py b/pgmq_sqlalchemy/operation.py index d2b2264..ff23b91 100644 --- a/pgmq_sqlalchemy/operation.py +++ b/pgmq_sqlalchemy/operation.py @@ -134,8 +134,13 @@ def _get_send_batch_statement( Note: This uses PostgreSQL array literal format with escaped quotes. While not ideal, this approach balances SQL injection protection with - cross-driver compatibility. The escaping is safe as long as json.dumps - produces valid JSON (which it always does for dict inputs). + cross-driver compatibility. The escaping is safe as long as: + 1. Input is a List[dict] (enforced by type hints) + 2. json.dumps produces valid JSON (guaranteed for dict inputs) + 3. Users do not pass pre-serialized JSON strings as dict values + + A more robust solution would use SQLAlchemy's array types or driver-specific + array adaptation, but that would sacrifice cross-driver compatibility. """ # Convert list of dicts to array of jsonb strings # Need to escape quotes for PostgreSQL array literal format diff --git a/tests/test_operation.py b/tests/test_operation.py index 4e3d36c..f6762da 100644 --- a/tests/test_operation.py +++ b/tests/test_operation.py @@ -5,8 +5,8 @@ """ import time import uuid -import pytest +import pytest from sqlalchemy.exc import ProgrammingError, InternalError from pgmq_sqlalchemy.operation import PGMQOperation