Skip to content

Commit b82812e

Browse files
jason810496 and Copilot authored
Add time-based partitioning and archive operations method (#12) (#19)
* Initial plan * Add read_archive, detach_archive methods and time-based partition support * Fix detach_archive test cleanup to properly handle detached tables * Apply ruff linting and formatting fixes * Add queue name validation to read_archive methods for SQL injection prevention * Remove duplicate set_vt * Remove TODO in README --------- Co-authored-by: Copilot <[email protected]>
1 parent ec3ff94 commit b82812e

3 files changed

Lines changed: 392 additions & 16 deletions

File tree

README.md

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -116,10 +116,3 @@ print(metrics.total_messages)
116116

117117
Welcome to open an issue or pull request ! <br>
118118
See [`Development` on Online Document](https://pgmq-sqlalchemy.readthedocs.io/en/latest/) or [CONTRIBUTING.md](.github/CONTRIBUTING.md) for more information.
119-
120-
## TODO
121-
122-
- [ ] Add **time-based** partition option and validation to `create_partitioned_queue` method.
123-
- [ ] Read(single/batch) Archive Table ( `read_archive` method )
124-
- [ ] Detach Archive Table ( `detach_archive` method )
125-
- [ ] Add `set_vt` utils method.

pgmq_sqlalchemy/queue.py

Lines changed: 230 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
2-
from typing import List, Optional
2+
import re
3+
from typing import List, Optional, Union
34

45
from sqlalchemy import create_engine, text
56
from sqlalchemy.orm import sessionmaker
@@ -239,11 +240,40 @@ async def _create_partitioned_queue_async(
239240
)
240241
await session.commit()
241242

243+
def _validate_partition_interval(self, interval: Union[int, str]) -> str:
244+
"""Validate partition interval format.
245+
246+
Args:
247+
interval: Either an integer for numeric partitioning or a string for time-based partitioning
248+
(e.g., '1 day', '1 hour', '7 days')
249+
250+
Returns:
251+
The validated interval as a string
252+
253+
Raises:
254+
ValueError: If the interval format is invalid
255+
"""
256+
if isinstance(interval, int):
257+
if interval <= 0:
258+
raise ValueError("Numeric partition interval must be positive")
259+
return str(interval)
260+
261+
# Validate time-based interval format
262+
# Valid PostgreSQL interval formats: '1 day', '7 days', '1 hour', '1 month', etc.
263+
time_pattern = r"^\d+\s+(microsecond|millisecond|second|minute|hour|day|week|month|year)s?$"
264+
if not re.match(time_pattern, interval.strip(), re.IGNORECASE):
265+
raise ValueError(
266+
f"Invalid time-based partition interval: '{interval}'. "
267+
"Expected format: '<number> <unit>' where unit is one of: "
268+
"microsecond, millisecond, second, minute, hour, day, week, month, year"
269+
)
270+
return interval.strip()
271+
242272
def create_partitioned_queue(
    self,
    queue_name: str,
    partition_interval: Union[int, str] = 10000,
    retention_interval: Union[int, str] = 100000,
) -> None:
    """Create a new **partitioned** queue.

    .. code-block:: python

        # Numeric partitioning (by msg_id)
        pgmq_client.create_partitioned_queue('my_partitioned_queue', partition_interval=10000, retention_interval=100000)

        # Time-based partitioning (by enqueued_at)
        pgmq_client.create_partitioned_queue('my_time_queue', partition_interval='1 day', retention_interval='7 days')

    Args:
        queue_name (str): The name of the queue, should be less than 48 characters.
        partition_interval (Union[int, str]): For numeric partitioning, the number of
            messages per partition. For time-based partitioning, a PostgreSQL interval
            string (e.g., '1 day', '1 hour').
        retention_interval (Union[int, str]): For numeric partitioning, messages with
            msg_id less than max(msg_id) - retention_interval will be dropped.
            For time-based partitioning, a PostgreSQL interval string (e.g., '7 days').

    .. note::
        | Supports both **numeric** (by ``msg_id``) and **time-based** (by ``enqueued_at``) partitioning.
        | For time-based partitioning, use interval strings like '1 day', '1 hour', '7 days', etc.
        | For numeric partitioning, use integer values.

    .. important::
        | You must make sure that the ``pg_partman`` extension already **installed** in the Postgres.
    """
    # pg_partman is required at runtime for partitioned queues; fail fast if absent.
    self._check_pg_partman_ext()

    # Normalize both intervals to validated strings before touching the DB.
    partition = self._validate_partition_interval(partition_interval)
    retention = self._validate_partition_interval(retention_interval)

    if not self.is_async:
        return self._create_partitioned_queue_sync(queue_name, partition, retention)
    return self.loop.run_until_complete(
        self._create_partitioned_queue_async(queue_name, partition, retention)
    )

286333
def _validate_queue_name_sync(self, queue_name: str) -> None:
@@ -1316,3 +1363,177 @@ def metrics_all(self) -> Optional[List[QueueMetrics]]:
13161363
if self.is_async:
13171364
return self.loop.run_until_complete(self._metrics_all_async())
13181365
return self._metrics_all_sync()
1366+
1367+
def _detach_archive_sync(self, queue_name: str) -> None:
    """Synchronous implementation of :meth:`detach_archive`."""
    # queue_name is passed as a bound parameter, so no injection risk here.
    stmt = text("select pgmq.detach_archive(:queue_name);")
    with self.session_maker() as session:
        session.execute(stmt, {"queue_name": queue_name})
        session.commit()
async def _detach_archive_async(self, queue_name: str) -> None:
    """Asynchronous implementation of :meth:`detach_archive`."""
    # queue_name is passed as a bound parameter, so no injection risk here.
    stmt = text("select pgmq.detach_archive(:queue_name);")
    async with self.session_maker() as session:
        await session.execute(stmt, {"queue_name": queue_name})
        await session.commit()
def detach_archive(self, queue_name: str) -> None:
    """
    Detach the archive table for a queue.

    * The archive table (``pgmq.a_<queue_name>``) will be detached from the queue.
    * The archive table will remain in the database but will no longer be associated with the queue.
    * This is useful when you want to keep the archived messages but stop archiving new messages.

    .. code-block:: python

        pgmq_client.detach_archive('my_queue')

    """
    # Dispatch to the sync or async implementation based on client mode.
    if not self.is_async:
        return self._detach_archive_sync(queue_name)
    return self.loop.run_until_complete(self._detach_archive_async(queue_name))
def _read_archive_sync(self, queue_name: str) -> Optional[Message]:
    """Synchronous implementation of :meth:`read_archive`."""
    # queue_name is interpolated into the archive table name; the public
    # wrapper (read_archive) validates it first to prevent SQL injection.
    query = text(
        f"select msg_id, read_ct, enqueued_at, vt, message from pgmq.a_{queue_name} limit 1;"
    )
    with self.session_maker() as session:
        row = session.execute(query).fetchone()
        session.commit()
    if row is None:
        return None
    msg_id, read_ct, enqueued_at, vt, message = row
    return Message(
        msg_id=msg_id, read_ct=read_ct, enqueued_at=enqueued_at, vt=vt, message=message
    )
async def _read_archive_async(self, queue_name: str) -> Optional[Message]:
    """Asynchronous implementation of :meth:`read_archive`."""
    # queue_name is interpolated into the archive table name; the public
    # wrapper (read_archive) validates it first to prevent SQL injection.
    query = text(
        f"select msg_id, read_ct, enqueued_at, vt, message from pgmq.a_{queue_name} limit 1;"
    )
    async with self.session_maker() as session:
        result = await session.execute(query)
        row = result.fetchone()
        await session.commit()
    if row is None:
        return None
    msg_id, read_ct, enqueued_at, vt, message = row
    return Message(
        msg_id=msg_id, read_ct=read_ct, enqueued_at=enqueued_at, vt=vt, message=message
    )
def read_archive(self, queue_name: str) -> Optional[Message]:
    """
    Read a single message from the archive table.

    Returns:
        |schema_message_class|_ or ``None`` if the archive is empty.

    Usage:

    .. code-block:: python

        msg_id = pgmq_client.send('my_queue', {'key': 'value'})
        pgmq_client.archive('my_queue', msg_id)
        archived_msg = pgmq_client.read_archive('my_queue')
        print(archived_msg.message)

    """
    # The queue name ends up embedded in the archive table name, so it must
    # be validated first to prevent SQL injection.
    self.validate_queue_name(queue_name)
    if not self.is_async:
        return self._read_archive_sync(queue_name)
    return self.loop.run_until_complete(self._read_archive_async(queue_name))
def _read_archive_batch_sync(
    self, queue_name: str, batch_size: int = 1
) -> Optional[List[Message]]:
    """Synchronous implementation of :meth:`read_archive_batch`."""
    # queue_name is interpolated into the archive table name; the public
    # wrapper (read_archive_batch) validates it first. batch_size is bound.
    query = text(
        f"select msg_id, read_ct, enqueued_at, vt, message from pgmq.a_{queue_name} limit :batch_size;"
    )
    with self.session_maker() as session:
        rows = session.execute(query, {"batch_size": batch_size}).fetchall()
        session.commit()
    if not rows:
        return None
    messages: List[Message] = []
    for msg_id, read_ct, enqueued_at, vt, message in rows:
        messages.append(
            Message(
                msg_id=msg_id,
                read_ct=read_ct,
                enqueued_at=enqueued_at,
                vt=vt,
                message=message,
            )
        )
    return messages
async def _read_archive_batch_async(
    self, queue_name: str, batch_size: int = 1
) -> Optional[List[Message]]:
    """Asynchronous implementation of :meth:`read_archive_batch`."""
    # queue_name is interpolated into the archive table name; the public
    # wrapper (read_archive_batch) validates it first. batch_size is bound.
    query = text(
        f"select msg_id, read_ct, enqueued_at, vt, message from pgmq.a_{queue_name} limit :batch_size;"
    )
    async with self.session_maker() as session:
        result = await session.execute(query, {"batch_size": batch_size})
        rows = result.fetchall()
        await session.commit()
    if not rows:
        return None
    messages: List[Message] = []
    for msg_id, read_ct, enqueued_at, vt, message in rows:
        messages.append(
            Message(
                msg_id=msg_id,
                read_ct=read_ct,
                enqueued_at=enqueued_at,
                vt=vt,
                message=message,
            )
        )
    return messages
def read_archive_batch(
    self, queue_name: str, batch_size: int = 1
) -> Optional[List[Message]]:
    """
    Read multiple messages from the archive table.

    Args:
        queue_name (str): The name of the queue.
        batch_size (int): The number of messages to read.

    Returns:
        List of |schema_message_class|_ or ``None`` if the archive is empty.

    Usage:

    .. code-block:: python

        msg_ids = pgmq_client.send_batch('my_queue', [{'key': 'value'}, {'key': 'value'}])
        pgmq_client.archive_batch('my_queue', msg_ids)
        archived_msgs = pgmq_client.read_archive_batch('my_queue', batch_size=10)
        for msg in archived_msgs:
            print(msg.message)

    """
    # The queue name ends up embedded in the archive table name, so it must
    # be validated first to prevent SQL injection.
    self.validate_queue_name(queue_name)
    if not self.is_async:
        return self._read_archive_batch_sync(queue_name, batch_size)
    return self.loop.run_until_complete(
        self._read_archive_batch_async(queue_name, batch_size)
    )

0 commit comments

Comments
 (0)