@@ -1364,67 +1364,6 @@ def metrics_all(self) -> Optional[List[QueueMetrics]]:
13641364 return self .loop .run_until_complete (self ._metrics_all_async ())
13651365 return self ._metrics_all_sync ()
13661366
1367- def _set_vt_sync (self , queue_name : str , msg_id : int , vt : int ) -> Optional [Message ]:
1368- """Set the visibility timeout of a message synchronously."""
1369- with self .session_maker () as session :
1370- row = session .execute (
1371- text ("select * from pgmq.set_vt(:queue_name, :msg_id, :vt);" ),
1372- {"queue_name" : queue_name , "msg_id" : msg_id , "vt" : vt },
1373- ).fetchone ()
1374- session .commit ()
1375- if row is None :
1376- return None
1377- return Message (
1378- msg_id = row [0 ], read_ct = row [1 ], enqueued_at = row [2 ], vt = row [3 ], message = row [4 ]
1379- )
1380-
1381- async def _set_vt_async (
1382- self , queue_name : str , msg_id : int , vt : int
1383- ) -> Optional [Message ]:
1384- """Set the visibility timeout of a message asynchronously."""
1385- async with self .session_maker () as session :
1386- row = (
1387- await session .execute (
1388- text ("select * from pgmq.set_vt(:queue_name, :msg_id, :vt);" ),
1389- {"queue_name" : queue_name , "msg_id" : msg_id , "vt" : vt },
1390- )
1391- ).fetchone ()
1392- await session .commit ()
1393- if row is None :
1394- return None
1395- return Message (
1396- msg_id = row [0 ], read_ct = row [1 ], enqueued_at = row [2 ], vt = row [3 ], message = row [4 ]
1397- )
1398-
1399- def set_vt (self , queue_name : str , msg_id : int , vt : int ) -> Optional [Message ]:
1400- """
1401- Set the visibility timeout of a message.
1402-
1403- Args:
1404- queue_name (str): The name of the queue.
1405- msg_id (int): The message ID.
1406- vt (int): The new visibility timeout in seconds.
1407-
1408- Returns:
1409- |schema_message_class|_ or ``None`` if the message does not exist.
1410-
1411- Usage:
1412-
1413- .. code-block:: python
1414-
1415- msg_id = pgmq_client.send('my_queue', {'key': 'value'})
1416- msg = pgmq_client.read('my_queue', vt=10)
1417- # extend the visibility timeout
1418- msg = pgmq_client.set_vt('my_queue', msg_id, 20)
1419- assert msg is not None
1420-
1421- """
1422- if self .is_async :
1423- return self .loop .run_until_complete (
1424- self ._set_vt_async (queue_name , msg_id , vt )
1425- )
1426- return self ._set_vt_sync (queue_name , msg_id , vt )
1427-
14281367 def _detach_archive_sync (self , queue_name : str ) -> None :
14291368 """Detach the archive table for a queue synchronously."""
14301369 with self .session_maker () as session :
0 commit comments