Skip to content

Commit b6e7c64

Browse files
committed
Merge branch 'main' into feature/add-transaction-class
2 parents 20c5a51 + e7d93ab commit b6e7c64

6 files changed

Lines changed: 237 additions & 14 deletions

File tree

.readthedocs.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,6 @@ sphinx:
3232
# See https://docs.readthedocs.io/en/stable/guides/reproducible-builds.html
3333
python:
3434
install:
35-
- requirements: doc/requirements.txt
35+
- requirements: doc/requirements.txt
36+
- method: pip
37+
path: .

README.md

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -116,10 +116,3 @@ print(metrics.total_messages)
116116

117117
Welcome to open an issue or pull request ! <br>
118118
See [`Development` on Online Document](https://pgmq-sqlalchemy.readthedocs.io/en/latest/) or [CONTRIBUTING.md](.github/CONTRIBUTING.md) for more information.
119-
120-
## TODO
121-
122-
- [ ] Add **time-based** partition option and validation to `create_partitioned_queue` method.
123-
- [ ] Read(single/batch) Archive Table ( `read_archive` method )
124-
- [ ] Detach Archive Table ( `detach_archive` method )
125-
- [ ] Add `set_vt` utils method.

doc/conf.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,27 @@
66
import os
77
import sys
88

9+
# Handle Python 3.9+ compatibility for tomllib
10+
try:
11+
import tomllib
12+
except ModuleNotFoundError:
13+
import tomli as tomllib
14+
915
# path setup
1016
sys.path.insert(0, os.path.abspath(".."))
1117
sys.path.insert(0, os.path.abspath("../pgmq_sqlalchemy"))
1218

19+
# Read version from pyproject.toml
20+
_pyproject_path = os.path.join(os.path.dirname(__file__), "..", "pyproject.toml")
21+
try:
22+
with open(_pyproject_path, "rb") as f:
23+
_pyproject_data = tomllib.load(f)
24+
_version = _pyproject_data["project"]["version"]
25+
except (FileNotFoundError, KeyError, OSError) as e:
26+
# Fallback to a default version if pyproject.toml is missing or invalid
27+
print(f"Warning: Could not read version from pyproject.toml: {e}")
28+
_version = "0.0.0"
29+
1330
extensions = [
1431
"sphinx_copybutton",
1532
"sphinx.ext.autodoc",
@@ -22,6 +39,13 @@
2239
project = "pgmq-sqlalchemy"
2340
copyright = f'2024-{time.strftime("%Y")}, the pgmq-sqlalchemy developers'
2441

42+
# Version information
43+
# The short X.Y version
44+
_version_parts = _version.split(".")
45+
version = ".".join(_version_parts[:2]) if len(_version_parts) >= 2 else _version
46+
# The full version, including alpha/beta/rc tags
47+
release = _version
48+
2549
source_suffix = {
2650
".rst": "restructuredtext",
2751
}

doc/requirements.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
sphinx
22
sphinx-rtd-theme
33
sphinx-copybutton
4-
SQLAlchemy
4+
SQLAlchemy
5+
tomli; python_version < "3.11"

pgmq_sqlalchemy/queue.py

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
2-
from typing import List, Optional
2+
import re
3+
from typing import List, Optional, Union
34

45
from sqlalchemy import create_engine
56
from sqlalchemy.orm import sessionmaker
@@ -190,6 +191,35 @@ async def _create():
190191
queue_name, unlogged, session=session, commit=commit
191192
)
192193

194+
def _validate_partition_interval(self, interval: Union[int, str]) -> str:
195+
"""Validate partition interval format.
196+
197+
Args:
198+
interval: Either an integer for numeric partitioning or a string for time-based partitioning
199+
(e.g., '1 day', '1 hour', '7 days')
200+
201+
Returns:
202+
The validated interval as a string
203+
204+
Raises:
205+
ValueError: If the interval format is invalid
206+
"""
207+
if isinstance(interval, int):
208+
if interval <= 0:
209+
raise ValueError("Numeric partition interval must be positive")
210+
return str(interval)
211+
212+
# Validate time-based interval format
213+
# Valid PostgreSQL interval formats: '1 day', '7 days', '1 hour', '1 month', etc.
214+
time_pattern = r"^\d+\s+(microsecond|millisecond|second|minute|hour|day|week|month|year)s?$"
215+
if not re.match(time_pattern, interval.strip(), re.IGNORECASE):
216+
raise ValueError(
217+
f"Invalid time-based partition interval: '{interval}'. "
218+
"Expected format: '<number> <unit>' where unit is one of: "
219+
"microsecond, millisecond, second, minute, hour, day, week, month, year"
220+
)
221+
return interval.strip()
222+
193223
def create_partitioned_queue(
194224
self,
195225
queue_name: str,
@@ -206,16 +236,23 @@ def create_partitioned_queue(
206236
207237
.. code-block:: python
208238
239+
# Numeric partitioning (by msg_id)
209240
pgmq_client.create_partitioned_queue('my_partitioned_queue', partition_interval=10000, retention_interval=100000)
210241
242+
# Time-based partitioning (by enqueued_at)
243+
pgmq_client.create_partitioned_queue('my_time_queue', partition_interval='1 day', retention_interval='7 days')
244+
211245
Args:
212246
queue_name (str): The name of the queue, should be less than 48 characters.
213-
partition_interval (int): Will create a new partition every ``partition_interval`` messages.
214-
retention_interval (int): The interval for retaining partitions. Any messages that have a `msg_id` less than ``max(msg_id)`` - ``retention_interval`` will be dropped.
247+
partition_interval (Union[int, str]): For numeric partitioning, the number of messages per partition.
248+
For time-based partitioning, a PostgreSQL interval string (e.g., '1 day', '1 hour').
249+
retention_interval (Union[int, str]): For numeric partitioning, messages with msg_id less than max(msg_id) - retention_interval will be dropped.
250+
For time-based partitioning, a PostgreSQL interval string (e.g., '7 days').
215251
216252
.. note::
217-
| Currently, only support for partitioning by **msg_id**.
218-
| Will add **time-based partitioning** in the future ``pgmq-sqlalchemy`` release.
253+
| Supports both **numeric** (by ``msg_id``) and **time-based** (by ``enqueued_at``) partitioning.
254+
| For time-based partitioning, use interval strings like '1 day', '1 hour', '7 days', etc.
255+
| For numeric partitioning, use integer values.
219256
220257
.. important::
221258
| You must make sure that the ``pg_partman`` extension already **installed** in the Postgres.
@@ -227,6 +264,10 @@ def create_partitioned_queue(
227264
# check if the pg_partman extension exists before creating a partitioned queue at runtime
228265
self._check_pg_partman_ext()
229266

267+
# Validate partition intervals
268+
partition_interval = self._validate_partition_interval(partition_interval)
269+
retention_interval = self._validate_partition_interval(retention_interval)
270+
230271
if self.is_async:
231272
if session is None:
232273

tests/test_queue.py

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import pytest
33
import time
44

5+
from sqlalchemy import text
56
from sqlalchemy.exc import ProgrammingError
67
from filelock import FileLock
78
from pgmq_sqlalchemy import PGMQueue
@@ -286,6 +287,12 @@ def test_set_vt_to_smaller_value(pgmq_setup_teardown: PGMQ_WITH_QUEUE):
286287
assert pgmq.read(queue_name) is not None
287288

288289

290+
def test_set_vt_not_exist(pgmq_setup_teardown: PGMQ_WITH_QUEUE):
291+
pgmq, queue_name = pgmq_setup_teardown
292+
msg_updated = pgmq.set_vt(queue_name, 999, 20)
293+
assert msg_updated is None
294+
295+
289296
def test_pop(pgmq_setup_teardown: PGMQ_WITH_QUEUE):
290297
pgmq, queue_name = pgmq_setup_teardown
291298
msg = MSG
@@ -427,3 +434,158 @@ def test_metrics_all_queues(pgmq_setup_teardown: PGMQ_WITH_QUEUE):
427434
assert queue_2.queue_length == 2
428435
assert queue_1.total_messages == 3
429436
assert queue_2.total_messages == 2
437+
438+
439+
# Tests for detach_archive method
440+
@pgmq_deps
441+
def test_detach_archive(pgmq_fixture, db_session):
442+
"""Test detach_archive method - detaches archive table from queue."""
443+
pgmq: PGMQueue = pgmq_fixture
444+
queue_name = f"test_queue_{uuid.uuid4().hex}"
445+
pgmq.create_queue(queue_name)
446+
msg = MSG
447+
msg_id = pgmq.send(queue_name, msg)
448+
pgmq.archive(queue_name, msg_id)
449+
450+
# Detach archive should not raise an error
451+
pgmq.detach_archive(queue_name)
452+
453+
# Read the archive to ensure it still exists after detaching
454+
archived_msg = pgmq.read_archive(queue_name)
455+
assert archived_msg is not None
456+
assert archived_msg.msg_id == msg_id
457+
458+
# Cleanup: Drop the archive and queue tables
459+
# After detaching, the archive is no longer part of the extension
460+
# We need to drop both tables manually by first removing them from the extension
461+
if pgmq.is_async:
462+
463+
async def cleanup():
464+
async with pgmq.session_maker() as session:
465+
# Drop archive table (already detached)
466+
await session.execute(
467+
text(f"DROP TABLE IF EXISTS pgmq.a_{queue_name} CASCADE;")
468+
)
469+
# Detach and drop queue table
470+
await session.execute(
471+
text(f"ALTER EXTENSION pgmq DROP TABLE pgmq.q_{queue_name};")
472+
)
473+
await session.execute(
474+
text(f"DROP TABLE IF EXISTS pgmq.q_{queue_name} CASCADE;")
475+
)
476+
await session.commit()
477+
478+
pgmq.loop.run_until_complete(cleanup())
479+
else:
480+
with pgmq.session_maker() as session:
481+
# Drop archive table (already detached)
482+
session.execute(text(f"DROP TABLE IF EXISTS pgmq.a_{queue_name} CASCADE;"))
483+
# Detach and drop queue table
484+
session.execute(
485+
text(f"ALTER EXTENSION pgmq DROP TABLE pgmq.q_{queue_name};")
486+
)
487+
session.execute(text(f"DROP TABLE IF EXISTS pgmq.q_{queue_name} CASCADE;"))
488+
session.commit()
489+
490+
491+
# Tests for read_archive methods
492+
def test_read_archive(pgmq_setup_teardown: PGMQ_WITH_QUEUE):
493+
pgmq, queue_name = pgmq_setup_teardown
494+
msg = MSG
495+
msg_ids = pgmq.send_batch(queue_name, [msg, msg, msg])
496+
pgmq.archive(queue_name, msg_ids[0])
497+
archived_msg = pgmq.read_archive(queue_name)
498+
assert archived_msg is not None
499+
assert archived_msg.msg_id == msg_ids[0]
500+
assert archived_msg.message == msg
501+
502+
503+
def test_read_archive_empty(pgmq_setup_teardown: PGMQ_WITH_QUEUE):
504+
pgmq, queue_name = pgmq_setup_teardown
505+
archived_msg = pgmq.read_archive(queue_name)
506+
assert archived_msg is None
507+
508+
509+
def test_read_archive_batch(pgmq_setup_teardown: PGMQ_WITH_QUEUE):
510+
pgmq, queue_name = pgmq_setup_teardown
511+
msg = MSG
512+
msg_ids = pgmq.send_batch(queue_name, [msg, msg, msg])
513+
pgmq.archive_batch(queue_name, msg_ids)
514+
archived_msgs = pgmq.read_archive_batch(queue_name, batch_size=10)
515+
assert archived_msgs is not None
516+
assert len(archived_msgs) == 3
517+
assert [m.msg_id for m in archived_msgs] == msg_ids
518+
for m in archived_msgs:
519+
assert m.message == msg
520+
521+
522+
def test_read_archive_batch_empty(pgmq_setup_teardown: PGMQ_WITH_QUEUE):
523+
pgmq, queue_name = pgmq_setup_teardown
524+
archived_msgs = pgmq.read_archive_batch(queue_name, batch_size=10)
525+
assert archived_msgs is None
526+
527+
528+
def test_read_archive_batch_limit(pgmq_setup_teardown: PGMQ_WITH_QUEUE):
529+
pgmq, queue_name = pgmq_setup_teardown
530+
msg = MSG
531+
msg_ids = pgmq.send_batch(queue_name, [msg, msg, msg, msg, msg])
532+
pgmq.archive_batch(queue_name, msg_ids)
533+
archived_msgs = pgmq.read_archive_batch(queue_name, batch_size=3)
534+
assert archived_msgs is not None
535+
assert len(archived_msgs) == 3
536+
537+
538+
# Tests for time-based partitioned queues
539+
@pgmq_deps
540+
def test_create_time_based_partitioned_queue(pgmq_fixture, db_session):
541+
pgmq: PGMQueue = pgmq_fixture
542+
queue_name = f"test_queue_{uuid.uuid4().hex}"
543+
pgmq.create_partitioned_queue(
544+
queue_name, partition_interval="1 day", retention_interval="7 days"
545+
)
546+
assert check_queue_exists(db_session, queue_name) is True
547+
548+
549+
@pgmq_deps
550+
def test_create_time_based_partitioned_queue_various_intervals(
551+
pgmq_fixture, db_session
552+
):
553+
pgmq: PGMQueue = pgmq_fixture
554+
555+
# Test with hour
556+
queue_name_hour = f"test_queue_{uuid.uuid4().hex}"
557+
pgmq.create_partitioned_queue(
558+
queue_name_hour, partition_interval="1 hour", retention_interval="24 hours"
559+
)
560+
assert check_queue_exists(db_session, queue_name_hour) is True
561+
562+
# Test with week
563+
queue_name_week = f"test_queue_{uuid.uuid4().hex}"
564+
pgmq.create_partitioned_queue(
565+
queue_name_week, partition_interval="1 week", retention_interval="4 weeks"
566+
)
567+
assert check_queue_exists(db_session, queue_name_week) is True
568+
569+
570+
@pgmq_deps
571+
def test_create_partitioned_queue_invalid_time_interval(pgmq_fixture):
572+
pgmq: PGMQueue = pgmq_fixture
573+
queue_name = f"test_queue_{uuid.uuid4().hex}"
574+
with pytest.raises(ValueError) as e:
575+
pgmq.create_partitioned_queue(
576+
queue_name,
577+
partition_interval="invalid interval",
578+
retention_interval="7 days",
579+
)
580+
assert "Invalid time-based partition interval" in str(e.value)
581+
582+
583+
@pgmq_deps
584+
def test_create_partitioned_queue_invalid_numeric_interval(pgmq_fixture):
585+
pgmq: PGMQueue = pgmq_fixture
586+
queue_name = f"test_queue_{uuid.uuid4().hex}"
587+
with pytest.raises(ValueError) as e:
588+
pgmq.create_partitioned_queue(
589+
queue_name, partition_interval=-100, retention_interval=100000
590+
)
591+
assert "Numeric partition interval must be positive" in str(e.value)

0 commit comments

Comments
 (0)