@@ -1366,67 +1366,6 @@ def metrics_all(self) -> Optional[List[QueueMetrics]]:
13661366 return self .loop .run_until_complete (self ._metrics_all_async ())
13671367 return self ._metrics_all_sync ()
13681368
1369- def _set_vt_sync (self , queue_name : str , msg_id : int , vt : int ) -> Optional [Message ]:
1370- """Set the visibility timeout of a message synchronously."""
1371- with self .session_maker () as session :
1372- row = session .execute (
1373- text ("select * from pgmq.set_vt(:queue_name, :msg_id, :vt);" ),
1374- {"queue_name" : queue_name , "msg_id" : msg_id , "vt" : vt },
1375- ).fetchone ()
1376- session .commit ()
1377- if row is None :
1378- return None
1379- return Message (
1380- msg_id = row [0 ], read_ct = row [1 ], enqueued_at = row [2 ], vt = row [3 ], message = row [4 ]
1381- )
1382-
1383- async def _set_vt_async (
1384- self , queue_name : str , msg_id : int , vt : int
1385- ) -> Optional [Message ]:
1386- """Set the visibility timeout of a message asynchronously."""
1387- async with self .session_maker () as session :
1388- row = (
1389- await session .execute (
1390- text ("select * from pgmq.set_vt(:queue_name, :msg_id, :vt);" ),
1391- {"queue_name" : queue_name , "msg_id" : msg_id , "vt" : vt },
1392- )
1393- ).fetchone ()
1394- await session .commit ()
1395- if row is None :
1396- return None
1397- return Message (
1398- msg_id = row [0 ], read_ct = row [1 ], enqueued_at = row [2 ], vt = row [3 ], message = row [4 ]
1399- )
1400-
1401- def set_vt (self , queue_name : str , msg_id : int , vt : int ) -> Optional [Message ]:
1402- """
1403- Set the visibility timeout of a message.
1404-
1405- Args:
1406- queue_name (str): The name of the queue.
1407- msg_id (int): The message ID.
1408- vt (int): The new visibility timeout in seconds.
1409-
1410- Returns:
1411- |schema_message_class|_ or ``None`` if the message does not exist.
1412-
1413- Usage:
1414-
1415- .. code-block:: python
1416-
1417- msg_id = pgmq_client.send('my_queue', {'key': 'value'})
1418- msg = pgmq_client.read('my_queue', vt=10)
1419- # extend the visibility timeout
1420- msg = pgmq_client.set_vt('my_queue', msg_id, 20)
1421- assert msg is not None
1422-
1423- """
1424- if self .is_async :
1425- return self .loop .run_until_complete (
1426- self ._set_vt_async (queue_name , msg_id , vt )
1427- )
1428- return self ._set_vt_sync (queue_name , msg_id , vt )
1429-
14301369 def _detach_archive_sync (self , queue_name : str ) -> None :
14311370 """Detach the archive table for a queue synchronously."""
14321371 with self .session_maker () as session :
0 commit comments