Skip to content

Commit f01f52c

Browse files
committed
Move _validate_partition_interval to operations
1 parent b6e7c64 commit f01f52c

2 files changed

Lines changed: 41 additions & 36 deletions

File tree

pgmq_sqlalchemy/operation.py

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
from typing import List, Optional, Tuple, Dict, Any
1+
from typing import List, Optional, Tuple, Dict, Any, Union
2+
import re
23

34
from sqlalchemy import text
45
from sqlalchemy.orm import Session
@@ -28,6 +29,36 @@ def _get_check_pg_partman_ext_statement() -> Tuple[str, Dict[str, Any]]:
2829
"""Get statement and params for checking/creating pg_partman extension."""
2930
return "create extension if not exists pg_partman cascade;", {}
3031

32+
@staticmethod
33+
def _validate_partition_interval(interval: Union[int, str]) -> str:
34+
"""Validate partition interval format.
35+
36+
Args:
37+
interval: Either an integer for numeric partitioning or a string for time-based partitioning
38+
(e.g., '1 day', '1 hour', '7 days')
39+
40+
Returns:
41+
The validated interval as a string
42+
43+
Raises:
44+
ValueError: If the interval format is invalid
45+
"""
46+
if isinstance(interval, int):
47+
if interval <= 0:
48+
raise ValueError("Numeric partition interval must be positive")
49+
return str(interval)
50+
51+
# Validate time-based interval format
52+
# Valid PostgreSQL interval formats: '1 day', '7 days', '1 hour', '1 month', etc.
53+
time_pattern = r"^\d+\s+(microsecond|millisecond|second|minute|hour|day|week|month|year)s?$"
54+
if not re.match(time_pattern, interval.strip(), re.IGNORECASE):
55+
raise ValueError(
56+
f"Invalid time-based partition interval: '{interval}'. "
57+
"Expected format: '<number> <unit>' where unit is one of: "
58+
"microsecond, millisecond, second, minute, hour, day, week, month, year"
59+
)
60+
return interval.strip()
61+
3162
@staticmethod
3263
def _get_create_queue_statement(
3364
queue_name: str, unlogged: bool
@@ -349,6 +380,14 @@ async def create_partitioned_queue_async(
349380
session: Async SQLAlchemy session.
350381
commit: Whether to commit the transaction.
351382
"""
383+
# Validate partition intervals
384+
partition_interval = PGMQOperation._validate_partition_interval(
385+
partition_interval
386+
)
387+
retention_interval = PGMQOperation._validate_partition_interval(
388+
retention_interval
389+
)
390+
352391
stmt, params = PGMQOperation._get_create_partitioned_queue_statement(
353392
queue_name, partition_interval, retention_interval
354393
)

pgmq_sqlalchemy/queue.py

Lines changed: 1 addition & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import asyncio
2-
import re
3-
from typing import List, Optional, Union
2+
from typing import List, Optional
43

54
from sqlalchemy import create_engine
65
from sqlalchemy.orm import sessionmaker
@@ -191,35 +190,6 @@ async def _create():
191190
queue_name, unlogged, session=session, commit=commit
192191
)
193192

194-
def _validate_partition_interval(self, interval: Union[int, str]) -> str:
195-
"""Validate partition interval format.
196-
197-
Args:
198-
interval: Either an integer for numeric partitioning or a string for time-based partitioning
199-
(e.g., '1 day', '1 hour', '7 days')
200-
201-
Returns:
202-
The validated interval as a string
203-
204-
Raises:
205-
ValueError: If the interval format is invalid
206-
"""
207-
if isinstance(interval, int):
208-
if interval <= 0:
209-
raise ValueError("Numeric partition interval must be positive")
210-
return str(interval)
211-
212-
# Validate time-based interval format
213-
# Valid PostgreSQL interval formats: '1 day', '7 days', '1 hour', '1 month', etc.
214-
time_pattern = r"^\d+\s+(microsecond|millisecond|second|minute|hour|day|week|month|year)s?$"
215-
if not re.match(time_pattern, interval.strip(), re.IGNORECASE):
216-
raise ValueError(
217-
f"Invalid time-based partition interval: '{interval}'. "
218-
"Expected format: '<number> <unit>' where unit is one of: "
219-
"microsecond, millisecond, second, minute, hour, day, week, month, year"
220-
)
221-
return interval.strip()
222-
223193
def create_partitioned_queue(
224194
self,
225195
queue_name: str,
@@ -264,10 +234,6 @@ def create_partitioned_queue(
264234
# check if the pg_partman extension exists before creating a partitioned queue at runtime
265235
self._check_pg_partman_ext()
266236

267-
# Validate partition intervals
268-
partition_interval = self._validate_partition_interval(partition_interval)
269-
retention_interval = self._validate_partition_interval(retention_interval)
270-
271237
if self.is_async:
272238
if session is None:
273239

0 commit comments

Comments
 (0)