Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions tests/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,3 +480,89 @@ def test_create_partitioned_queue_invalid_numeric_interval(pgmq_all_variants):
queue_name, partition_interval=-100, retention_interval=100000
)
assert "Numeric partition interval must be positive" in str(e.value)


def test_read_with_poll_without_vt(pgmq_setup_teardown: PGMQ_WITH_QUEUE):
"""Test read_with_poll when vt parameter is not provided (None)."""

pgmq, queue_name = pgmq_setup_teardown

# Set a custom default vt for the pgmq instance
pgmq.vt = 100

# Send a message
msg_id = pgmq.send(queue_name, MSG)

# Call read_with_poll with vt=None to test the fallback logic
# When vt is None, it should fall back to using pgmq.vt value (100)
msgs = pgmq.read_with_poll(
queue_name,
vt=None, # Explicitly passing None to test the None fallback logic
qty=1,
max_poll_seconds=2,
poll_interval_ms=100,
)

assert msgs is not None
assert len(msgs) == 1
assert msgs[0].msg_id == msg_id
assert msgs[0].message == MSG


def test_execute_operation_with_provided_sync_session(pgmq_by_session_maker, get_session_maker, db_session):
"""Test _execute_operation sync path when session is provided."""

pgmq: PGMQueue = pgmq_by_session_maker
queue_name = f"test_queue_{uuid.uuid4().hex}"

# Create a session to pass to the operations
# Using the same session across multiple operations demonstrates
# that the sync path with provided session works correctly
with get_session_maker() as session:
# Create queue with provided session
pgmq.create_queue(queue_name, session=session)

# Verify queue was created
assert check_queue_exists(db_session, queue_name) is True

# Send a message with the same provided session
msg_id = pgmq.send(queue_name, MSG, session=session)

# Read message with the same provided session
msg = pgmq.read(queue_name, vt=30, session=session)

assert msg is not None
assert msg.msg_id == msg_id
assert msg.message == MSG

# Clean up with the same provided session
pgmq.drop_queue(queue_name, session=session)

# Verify queue was dropped
assert check_queue_exists(db_session, queue_name) is False


def test_execute_operation_async_with_session_none(pgmq_by_async_dsn, db_session):
"""Test _execute_operation async path when session is None."""

pgmq: PGMQueue = pgmq_by_async_dsn
queue_name = f"test_queue_{uuid.uuid4().hex}"

# Verify this is an async PGMQueue
assert pgmq.is_async is True

# When session is None, _execute_operation creates a new async session
# and uses loop.run_until_complete to execute the operation
pgmq.create_queue(queue_name)
msg_id = pgmq.send(queue_name, MSG)
msg = pgmq.read(queue_name, vt=30)

assert msg is not None
assert msg.msg_id == msg_id
assert msg.message == MSG

# Clean up
pgmq.drop_queue(queue_name)

# Verify queue was dropped
assert check_queue_exists(db_session, queue_name) is False