Skip to content

Commit 6ae07ee

Browse files
committed
Fix vt naming for set_vt
1 parent b36dc17 commit 6ae07ee

1 file changed

Lines changed: 11 additions & 13 deletions

File tree

pgmq_sqlalchemy/queue.py

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -812,14 +812,12 @@ def read_with_poll(
812812
queue_name, vt, qty, max_poll_seconds, poll_interval_ms
813813
)
814814

815-
def _set_vt_sync(
816-
self, queue_name: str, msg_id: int, vt_offset: int
817-
) -> Optional[Message]:
815+
def _set_vt_sync(self, queue_name: str, msg_id: int, vt: int) -> Optional[Message]:
818816
"""Set the visibility timeout for a message."""
819817
with self.session_maker() as session:
820818
row = session.execute(
821-
text("select * from pgmq.set_vt(:queue_name,:msg_id,:vt_offset);"),
822-
{"queue_name": queue_name, "msg_id": msg_id, "vt_offset": vt_offset},
819+
text("select * from pgmq.set_vt(:queue_name,:msg_id,:vt);"),
820+
{"queue_name": queue_name, "msg_id": msg_id, "vt": vt},
823821
).fetchone()
824822
session.commit()
825823
if row is None:
@@ -829,17 +827,17 @@ def _set_vt_sync(
829827
)
830828

831829
async def _set_vt_async(
832-
self, queue_name: str, msg_id: int, vt_offset: int
830+
self, queue_name: str, msg_id: int, vt: int
833831
) -> Optional[Message]:
834832
"""Set the visibility timeout for a message."""
835833
async with self.session_maker() as session:
836834
row = (
837835
await session.execute(
838-
text("select * from pgmq.set_vt(:queue_name,:msg_id,:vt_offset);"),
836+
text("select * from pgmq.set_vt(:queue_name,:msg_id,:vt);"),
839837
{
840838
"queue_name": queue_name,
841839
"msg_id": msg_id,
842-
"vt_offset": vt_offset,
840+
"vt": vt,
843841
},
844842
)
845843
).fetchone()
@@ -851,7 +849,7 @@ async def _set_vt_async(
851849
msg_id=row[0], read_ct=row[1], enqueued_at=row[2], vt=row[3], message=row[4]
852850
)
853851

854-
def set_vt(self, queue_name: str, msg_id: int, vt_offset: int) -> Optional[Message]:
852+
def set_vt(self, queue_name: str, msg_id: int, vt: int) -> Optional[Message]:
855853
"""
856854
.. _set_vt_method: ref:`pgmq_sqlalchemy.PGMQueue.set_vt`
857855
.. |set_vt_method| replace:: :py:meth:`~pgmq_sqlalchemy.PGMQueue.set_vt`
@@ -861,7 +859,7 @@ def set_vt(self, queue_name: str, msg_id: int, vt_offset: int) -> Optional[Messa
861859
Args:
862860
queue_name (str): The name of the queue.
863861
msg_id (int): The message id.
864-
vt_offset (int): The visibility timeout in seconds.
862+
vt (int): The visibility timeout in seconds.
865863
866864
Returns:
867865
|schema_message_class|_ or ``None`` if the message does not exist.
@@ -904,15 +902,15 @@ def consumer_with_backoff_retry(pgmq_client: PGMQueue, queue_name: str):
904902
pgmq_client.set_vt(
905903
queue_name=query_name,
906904
msg_id=msg.msg_id,
907-
vt_offset=_exp_backoff_retry(msg)
905+
vt=_exp_backoff_retry(msg)
908906
)
909907
910908
"""
911909
if self.is_async:
912910
return self.loop.run_until_complete(
913-
self._set_vt_async(queue_name, msg_id, vt_offset)
911+
self._set_vt_async(queue_name, msg_id, vt)
914912
)
915-
return self._set_vt_sync(queue_name, msg_id, vt_offset)
913+
return self._set_vt_sync(queue_name, msg_id, vt)
916914

917915
def _pop_sync(self, queue_name: str) -> Optional[Message]:
918916
with self.session_maker() as session:

0 commit comments

Comments
 (0)