Skip to content

Commit 3bcceb0

Browse files
Authored by Copilot; committed by jason810496
Add read_archive, detach_archive methods and time-based partition support
Co-authored-by: jason810496 <[email protected]>
1 parent 296410d commit 3bcceb0

2 files changed

Lines changed: 417 additions & 9 deletions

File tree

pgmq_sqlalchemy/queue.py

Lines changed: 277 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
2-
from typing import List, Optional
2+
import re
3+
from typing import List, Optional, Union
34

45
from sqlalchemy import create_engine, text
56
from sqlalchemy.orm import sessionmaker
@@ -239,11 +240,40 @@ async def _create_partitioned_queue_async(
239240
)
240241
await session.commit()
241242

243+
def _validate_partition_interval(self, interval: Union[int, str]) -> str:
244+
"""Validate partition interval format.
245+
246+
Args:
247+
interval: Either an integer for numeric partitioning or a string for time-based partitioning
248+
(e.g., '1 day', '1 hour', '7 days')
249+
250+
Returns:
251+
The validated interval as a string
252+
253+
Raises:
254+
ValueError: If the interval format is invalid
255+
"""
256+
if isinstance(interval, int):
257+
if interval <= 0:
258+
raise ValueError("Numeric partition interval must be positive")
259+
return str(interval)
260+
261+
# Validate time-based interval format
262+
# Valid PostgreSQL interval formats: '1 day', '7 days', '1 hour', '1 month', etc.
263+
time_pattern = r'^\d+\s+(microsecond|millisecond|second|minute|hour|day|week|month|year)s?$'
264+
if not re.match(time_pattern, interval.strip(), re.IGNORECASE):
265+
raise ValueError(
266+
f"Invalid time-based partition interval: '{interval}'. "
267+
"Expected format: '<number> <unit>' where unit is one of: "
268+
"microsecond, millisecond, second, minute, hour, day, week, month, year"
269+
)
270+
return interval.strip()
271+
242272
def create_partitioned_queue(
    self,
    queue_name: str,
    partition_interval: Union[int, str] = 10000,
    retention_interval: Union[int, str] = 100000,
) -> None:
    """Create a new **partitioned** queue.

    Usage:

    .. code-block:: python

        # Numeric partitioning (by msg_id)
        pgmq_client.create_partitioned_queue('my_partitioned_queue', partition_interval=10000, retention_interval=100000)

        # Time-based partitioning (by enqueued_at)
        pgmq_client.create_partitioned_queue('my_time_queue', partition_interval='1 day', retention_interval='7 days')

    Args:
        queue_name (str): The name of the queue, should be less than 48 characters.
        partition_interval (Union[int, str]): For numeric partitioning, the number of messages per partition.
            For time-based partitioning, a PostgreSQL interval string (e.g., '1 day', '1 hour').
        retention_interval (Union[int, str]): For numeric partitioning, messages with msg_id less than max(msg_id) - retention_interval will be dropped.
            For time-based partitioning, a PostgreSQL interval string (e.g., '7 days').

    .. note::
        | Supports both **numeric** (by ``msg_id``) and **time-based** (by ``enqueued_at``) partitioning.
        | For time-based partitioning, use interval strings like '1 day', '1 hour', '7 days', etc.
        | For numeric partitioning, use integer values.

    .. important::
        | You must make sure that the ``pg_partman`` extension already **installed** in the Postgres.
    """
    # pg_partman is a hard requirement for partitioned queues; fail fast
    # at runtime before issuing any pgmq calls.
    self._check_pg_partman_ext()

    # Normalize both intervals to validated strings (int -> numeric,
    # str -> PostgreSQL time interval).
    part_interval = self._validate_partition_interval(partition_interval)
    keep_interval = self._validate_partition_interval(retention_interval)

    if not self.is_async:
        return self._create_partitioned_queue_sync(queue_name, part_interval, keep_interval)
    return self.loop.run_until_complete(
        self._create_partitioned_queue_async(queue_name, part_interval, keep_interval)
    )

286327
def _validate_queue_name_sync(self, queue_name: str) -> None:
@@ -1316,3 +1357,230 @@ def metrics_all(self) -> Optional[List[QueueMetrics]]:
13161357
if self.is_async:
13171358
return self.loop.run_until_complete(self._metrics_all_async())
13181359
return self._metrics_all_sync()
1360+
1361+
def _set_vt_sync(self, queue_name: str, msg_id: int, vt: int) -> Optional[Message]:
    """Update a message's visibility timeout via ``pgmq.set_vt`` (sync path).

    Returns the updated message, or ``None`` if no such message exists.
    """
    stmt = text("select * from pgmq.set_vt(:queue_name, :msg_id, :vt);")
    params = {"queue_name": queue_name, "msg_id": msg_id, "vt": vt}
    with self.session_maker() as session:
        record = session.execute(stmt, params).fetchone()
        session.commit()
    if record is None:
        return None
    # Column order follows pgmq.set_vt: msg_id, read_ct, enqueued_at, vt, message.
    return Message(
        msg_id=record[0],
        read_ct=record[1],
        enqueued_at=record[2],
        vt=record[3],
        message=record[4],
    )
1375+
async def _set_vt_async(
    self, queue_name: str, msg_id: int, vt: int
) -> Optional[Message]:
    """Update a message's visibility timeout via ``pgmq.set_vt`` (async path).

    Returns the updated message, or ``None`` if no such message exists.
    """
    stmt = text("select * from pgmq.set_vt(:queue_name, :msg_id, :vt);")
    params = {"queue_name": queue_name, "msg_id": msg_id, "vt": vt}
    async with self.session_maker() as session:
        result = await session.execute(stmt, params)
        record = result.fetchone()
        await session.commit()
    if record is None:
        return None
    # Column order follows pgmq.set_vt: msg_id, read_ct, enqueued_at, vt, message.
    return Message(
        msg_id=record[0],
        read_ct=record[1],
        enqueued_at=record[2],
        vt=record[3],
        message=record[4],
    )
1393+
def set_vt(self, queue_name: str, msg_id: int, vt: int) -> Optional[Message]:
    """
    Set the visibility timeout of a message.

    Args:
        queue_name (str): The name of the queue.
        msg_id (int): The message ID.
        vt (int): The new visibility timeout in seconds.

    Returns:
        |schema_message_class|_ or ``None`` if the message does not exist.

    Usage:

    .. code-block:: python

        msg_id = pgmq_client.send('my_queue', {'key': 'value'})
        msg = pgmq_client.read('my_queue', vt=10)
        # extend the visibility timeout
        msg = pgmq_client.set_vt('my_queue', msg_id, 20)
        assert msg is not None

    """
    # Dispatch to the sync or async implementation depending on engine mode.
    if not self.is_async:
        return self._set_vt_sync(queue_name, msg_id, vt)
    return self.loop.run_until_complete(self._set_vt_async(queue_name, msg_id, vt))
1422+
def _detach_archive_sync(self, queue_name: str) -> None:
    """Detach the archive table of *queue_name* via ``pgmq.detach_archive`` (sync path)."""
    stmt = text("select pgmq.detach_archive(:queue_name);")
    with self.session_maker() as session:
        session.execute(stmt, {"queue_name": queue_name})
        session.commit()
1431+
async def _detach_archive_async(self, queue_name: str) -> None:
    """Detach the archive table of *queue_name* via ``pgmq.detach_archive`` (async path)."""
    stmt = text("select pgmq.detach_archive(:queue_name);")
    async with self.session_maker() as session:
        await session.execute(stmt, {"queue_name": queue_name})
        await session.commit()
1440+
def detach_archive(self, queue_name: str) -> None:
    """
    Detach the archive table for a queue.

    * The archive table (``pgmq.a_<queue_name>``) will be detached from the queue.
    * The archive table will remain in the database but will no longer be associated with the queue.
    * This is useful when you want to keep the archived messages but stop archiving new messages.

    .. code-block:: python

        pgmq_client.detach_archive('my_queue')

    """
    # Dispatch to the sync or async implementation depending on engine mode.
    if not self.is_async:
        return self._detach_archive_sync(queue_name)
    return self.loop.run_until_complete(self._detach_archive_async(queue_name))
1457+
def _read_archive_sync(self, queue_name: str) -> Optional[Message]:
    """Read a single message from the archive table synchronously.

    Returns:
        The oldest archived message (lowest ``msg_id``), or ``None`` if the
        archive is empty.

    Raises:
        ValueError: If ``queue_name`` is not a plain SQL identifier.
    """
    # The table name cannot be bound as a query parameter, so queue_name is
    # interpolated into the SQL string. Restrict it to a plain identifier to
    # prevent SQL injection through the f-string below.
    if not re.match(r"^[A-Za-z_][A-Za-z0-9_]*$", queue_name):
        raise ValueError(f"Invalid queue name: '{queue_name}'")
    with self.session_maker() as session:
        # order by msg_id makes "read one" deterministic (oldest first);
        # a bare LIMIT 1 has no defined row order in PostgreSQL.
        row = session.execute(
            text(
                f"select msg_id, read_ct, enqueued_at, vt, message "
                f"from pgmq.a_{queue_name} order by msg_id limit 1;"
            )
        ).fetchone()
        session.commit()
    if row is None:
        return None
    return Message(
        msg_id=row[0], read_ct=row[1], enqueued_at=row[2], vt=row[3], message=row[4]
    )
1470+
async def _read_archive_async(self, queue_name: str) -> Optional[Message]:
    """Read a single message from the archive table asynchronously.

    Returns:
        The oldest archived message (lowest ``msg_id``), or ``None`` if the
        archive is empty.

    Raises:
        ValueError: If ``queue_name`` is not a plain SQL identifier.
    """
    # The table name cannot be bound as a query parameter, so queue_name is
    # interpolated into the SQL string. Restrict it to a plain identifier to
    # prevent SQL injection through the f-string below.
    if not re.match(r"^[A-Za-z_][A-Za-z0-9_]*$", queue_name):
        raise ValueError(f"Invalid queue name: '{queue_name}'")
    async with self.session_maker() as session:
        # order by msg_id makes "read one" deterministic (oldest first);
        # a bare LIMIT 1 has no defined row order in PostgreSQL.
        result = await session.execute(
            text(
                f"select msg_id, read_ct, enqueued_at, vt, message "
                f"from pgmq.a_{queue_name} order by msg_id limit 1;"
            )
        )
        row = result.fetchone()
        await session.commit()
    if row is None:
        return None
    return Message(
        msg_id=row[0], read_ct=row[1], enqueued_at=row[2], vt=row[3], message=row[4]
    )
1485+
def read_archive(self, queue_name: str) -> Optional[Message]:
    """
    Read a single message from the archive table.

    Returns:
        |schema_message_class|_ or ``None`` if the archive is empty.

    Usage:

    .. code-block:: python

        msg_id = pgmq_client.send('my_queue', {'key': 'value'})
        pgmq_client.archive('my_queue', msg_id)
        archived_msg = pgmq_client.read_archive('my_queue')
        print(archived_msg.message)

    """
    # Dispatch to the sync or async implementation depending on engine mode.
    if not self.is_async:
        return self._read_archive_sync(queue_name)
    return self.loop.run_until_complete(self._read_archive_async(queue_name))
1506+
def _read_archive_batch_sync(
    self, queue_name: str, batch_size: int = 1
) -> Optional[List[Message]]:
    """Read up to ``batch_size`` messages from the archive table synchronously.

    Args:
        queue_name: The queue whose archive table (``pgmq.a_<queue_name>``) is read.
        batch_size: Maximum number of messages to return.

    Returns:
        Messages in ascending ``msg_id`` order, or ``None`` if the archive
        is empty.

    Raises:
        ValueError: If ``queue_name`` is not a plain SQL identifier.
    """
    # The table name cannot be bound as a query parameter, so queue_name is
    # interpolated into the SQL string. Restrict it to a plain identifier to
    # prevent SQL injection through the f-string below.
    if not re.match(r"^[A-Za-z_][A-Za-z0-9_]*$", queue_name):
        raise ValueError(f"Invalid queue name: '{queue_name}'")
    with self.session_maker() as session:
        # order by msg_id makes the batch deterministic (oldest first);
        # a bare LIMIT has no defined row order in PostgreSQL.
        rows = session.execute(
            text(
                f"select msg_id, read_ct, enqueued_at, vt, message "
                f"from pgmq.a_{queue_name} order by msg_id limit :batch_size;"
            ),
            {"batch_size": batch_size},
        ).fetchall()
        session.commit()
    if not rows:
        return None
    return [
        Message(
            msg_id=row[0],
            read_ct=row[1],
            enqueued_at=row[2],
            vt=row[3],
            message=row[4],
        )
        for row in rows
    ]
1531+
async def _read_archive_batch_async(
    self, queue_name: str, batch_size: int = 1
) -> Optional[List[Message]]:
    """Read up to ``batch_size`` messages from the archive table asynchronously.

    Args:
        queue_name: The queue whose archive table (``pgmq.a_<queue_name>``) is read.
        batch_size: Maximum number of messages to return.

    Returns:
        Messages in ascending ``msg_id`` order, or ``None`` if the archive
        is empty.

    Raises:
        ValueError: If ``queue_name`` is not a plain SQL identifier.
    """
    # The table name cannot be bound as a query parameter, so queue_name is
    # interpolated into the SQL string. Restrict it to a plain identifier to
    # prevent SQL injection through the f-string below.
    if not re.match(r"^[A-Za-z_][A-Za-z0-9_]*$", queue_name):
        raise ValueError(f"Invalid queue name: '{queue_name}'")
    async with self.session_maker() as session:
        # order by msg_id makes the batch deterministic (oldest first);
        # a bare LIMIT has no defined row order in PostgreSQL.
        result = await session.execute(
            text(
                f"select msg_id, read_ct, enqueued_at, vt, message "
                f"from pgmq.a_{queue_name} order by msg_id limit :batch_size;"
            ),
            {"batch_size": batch_size},
        )
        rows = result.fetchall()
        await session.commit()
    if not rows:
        return None
    return [
        Message(
            msg_id=row[0],
            read_ct=row[1],
            enqueued_at=row[2],
            vt=row[3],
            message=row[4],
        )
        for row in rows
    ]
1558+
def read_archive_batch(
    self, queue_name: str, batch_size: int = 1
) -> Optional[List[Message]]:
    """
    Read multiple messages from the archive table.

    Args:
        queue_name (str): The name of the queue.
        batch_size (int): The number of messages to read.

    Returns:
        List of |schema_message_class|_ or ``None`` if the archive is empty.

    Usage:

    .. code-block:: python

        msg_ids = pgmq_client.send_batch('my_queue', [{'key': 'value'}, {'key': 'value'}])
        pgmq_client.archive_batch('my_queue', msg_ids)
        archived_msgs = pgmq_client.read_archive_batch('my_queue', batch_size=10)
        for msg in archived_msgs:
            print(msg.message)

    """
    # Dispatch to the sync or async implementation depending on engine mode.
    if not self.is_async:
        return self._read_archive_batch_sync(queue_name, batch_size)
    return self.loop.run_until_complete(
        self._read_archive_batch_async(queue_name, batch_size)
    )

0 commit comments

Comments
 (0)