Skip to content

Commit 32e4030

Browse files
Copilotjason810496
andauthored
Fix vt=0 being ignored in read() method (#11)
* feat: add set_vt_method and usage docs * test: add test for set_vt method * Initial plan * Fix bug: vt=0 should be valid visibility timeout value Added check for `vt is None` in the read() method to properly handle vt=0. Previously, vt=0 would be treated as falsy and would default to self.vt (30 seconds). Now vt=0 correctly sets the visibility timeout to 0 seconds as expected. Also added test case to verify vt=0 works correctly. Co-authored-by: jason810496 <[email protected]> --------- Co-authored-by: jason810496 <[email protected]> Co-authored-by: LIU ZHE YOU <[email protected]> Co-authored-by: copilot-swe-agent[bot] <[email protected]>
1 parent 0a45829 commit 32e4030

2 files changed

Lines changed: 148 additions & 0 deletions

File tree

pgmq_sqlalchemy/queue.py

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -573,6 +573,8 @@ def read(self, queue_name: str, vt: Optional[int] = None) -> Optional[Message]:
573573
574574
575575
"""
576+
if vt is None:
577+
vt = self.vt
576578
if self.is_async:
577579
return self.loop.run_until_complete(self._read_async(queue_name, vt))
578580
return self._read_sync(queue_name, vt)
@@ -812,6 +814,108 @@ def read_with_poll(
812814
queue_name, vt, qty, max_poll_seconds, poll_interval_ms
813815
)
814816

817+
def _set_vt_sync(
818+
self, queue_name: str, msg_id: int, vt_offset: int
819+
) -> Optional[Message]:
820+
"""Set the visibility timeout for a message."""
821+
with self.session_maker() as session:
822+
row = session.execute(
823+
text("select * from pgmq.set_vt(:queue_name,:msg_id,:vt_offset);"),
824+
{"queue_name": queue_name, "msg_id": msg_id, "vt_offset": vt_offset},
825+
).fetchone()
826+
session.commit()
827+
if row is None:
828+
return None
829+
return Message(
830+
msg_id=row[0], read_ct=row[1], enqueued_at=row[2], vt=row[3], message=row[4]
831+
)
832+
833+
async def _set_vt_async(
834+
self, queue_name: str, msg_id: int, vt_offset: int
835+
) -> Optional[Message]:
836+
"""Set the visibility timeout for a message."""
837+
async with self.session_maker() as session:
838+
row = (
839+
await session.execute(
840+
text("select * from pgmq.set_vt(:queue_name,:msg_id,:vt_offset);"),
841+
{
842+
"queue_name": queue_name,
843+
"msg_id": msg_id,
844+
"vt_offset": vt_offset,
845+
},
846+
)
847+
).fetchone()
848+
await session.commit()
849+
print("row", row)
850+
if row is None:
851+
return None
852+
return Message(
853+
msg_id=row[0], read_ct=row[1], enqueued_at=row[2], vt=row[3], message=row[4]
854+
)
855+
856+
def set_vt(self, queue_name: str, msg_id: int, vt_offset: int) -> Optional[Message]:
857+
"""
858+
.. _set_vt_method: ref:`pgmq_sqlalchemy.PGMQueue.set_vt`
859+
.. |set_vt_method| replace:: :py:meth:`~pgmq_sqlalchemy.PGMQueue.set_vt`
860+
861+
Set the visibility timeout for a message.
862+
863+
Args:
864+
queue_name (str): The name of the queue.
865+
msg_id (int): The message id.
866+
vt_offset (int): The visibility timeout in seconds.
867+
868+
Returns:
869+
|schema_message_class|_ or ``None`` if the message does not exist.
870+
871+
Usage:
872+
873+
.. code-block:: python
874+
875+
msg_id = pgmq_client.send('my_queue', {'key': 'value'}, delay=10)
876+
msg = pgmq_client.read('my_queue')
877+
assert msg is not None
878+
msg = pgmq_client.set_vt('my_queue', msg.msg_id, 10)
879+
assert msg is not None
880+
881+
.. tip::
882+
| |read_method|_ and |set_vt_method|_ can be used together to implement **exponential backoff** mechanism.
883+
| `ref: Exponential Backoff And Jitter <https://aws.amazon.com/tw/blogs/architecture/exponential-backoff-and-jitter/>`_.
884+
| **For example:**
885+
886+
.. code-block:: python
887+
888+
from pgmq_sqlalchemy import PGMQueue
889+
from pgmq_sqlalchemy.schema import Message
890+
891+
def _exp_backoff_retry(msg: Message)->int:
892+
# exponential backoff retry
893+
if msg.read_ct < 5:
894+
return 2 ** msg.read_ct
895+
return 2 ** 5
896+
897+
def consumer_with_backoff_retry(pgmq_client: PGMQueue, queue_name: str):
898+
msg = pgmq_client.read(
899+
queue_name=queue_name,
900+
vt=1000, # set vt to 1000 seconds temporarily
901+
)
902+
if msg is None:
903+
return
904+
905+
# set exponential backoff retry
906+
pgmq_client.set_vt(
907+
queue_name=query_name,
908+
msg_id=msg.msg_id,
909+
vt_offset=_exp_backoff_retry(msg)
910+
)
911+
912+
"""
913+
if self.is_async:
914+
return self.loop.run_until_complete(
915+
self._set_vt_async(queue_name, msg_id, vt_offset)
916+
)
917+
return self._set_vt_sync(queue_name, msg_id, vt_offset)
918+
815919
def _pop_sync(self, queue_name: str) -> Optional[Message]:
816920
with self.session_maker() as session:
817921
row = session.execute(

tests/test_queue.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,21 @@ def test_send_and_read_msg_with_vt_and_delay(pgmq_setup_teardown: PGMQ_WITH_QUEU
162162
assert msg_read.msg_id == msg_id
163163

164164

165+
def test_send_and_read_msg_with_vt_zero(pgmq_setup_teardown: PGMQ_WITH_QUEUE):
166+
"""Test that vt=0 works correctly and message becomes visible immediately."""
167+
pgmq, queue_name = pgmq_setup_teardown
168+
msg = MSG
169+
msg_id: int = pgmq.send(queue_name, msg)
170+
# Read with vt=0 means message should be immediately visible again
171+
msg_read = pgmq.read(queue_name, vt=0)
172+
assert msg_read.message == msg
173+
assert msg_read.msg_id == msg_id
174+
# Message should be visible immediately (no waiting)
175+
msg_read = pgmq.read(queue_name)
176+
assert msg_read.message == msg
177+
assert msg_read.msg_id == msg_id
178+
179+
165180
def test_read_empty_queue(pgmq_setup_teardown: PGMQ_WITH_QUEUE):
166181
pgmq, queue_name = pgmq_setup_teardown
167182
msg_read = pgmq.read(queue_name)
@@ -242,6 +257,35 @@ def test_read_with_poll_with_empty_queue(pgmq_setup_teardown: PGMQ_WITH_QUEUE):
242257
assert duration > 1.9
243258

244259

260+
def test_set_vt(pgmq_setup_teardown: PGMQ_WITH_QUEUE):
261+
pgmq, queue_name = pgmq_setup_teardown
262+
msg = MSG
263+
msg_id = pgmq.send(queue_name, msg)
264+
msg_read = pgmq.set_vt(queue_name, msg_id, 2)
265+
assert msg is not None
266+
assert pgmq.read(queue_name) is None
267+
time.sleep(1.5)
268+
assert pgmq.read(queue_name) is None
269+
time.sleep(0.6)
270+
msg_read = pgmq.read(queue_name)
271+
assert msg_read.message == msg
272+
273+
274+
def test_set_vt_to_smaller_value(pgmq_setup_teardown: PGMQ_WITH_QUEUE):
275+
pgmq, queue_name = pgmq_setup_teardown
276+
msg = MSG
277+
msg_id = pgmq.send(queue_name, msg)
278+
_ = pgmq.read(queue_name, vt=5) # set vt to 5 seconds
279+
assert msg is not None
280+
assert pgmq.read(queue_name) is None
281+
time.sleep(0.5)
282+
assert pgmq.set_vt(queue_name, msg_id, 1) is not None
283+
time.sleep(0.3)
284+
assert pgmq.read(queue_name) is None
285+
time.sleep(0.8)
286+
assert pgmq.read(queue_name) is not None
287+
288+
245289
def test_pop(pgmq_setup_teardown: PGMQ_WITH_QUEUE):
246290
pgmq, queue_name = pgmq_setup_teardown
247291
msg = MSG

0 commit comments

Comments
 (0)