Skip to content

Commit 732070a

Browse files
Copilotjason810496
andcommitted
Apply ruff linting and formatting fixes
Co-authored-by: jason810496 <[email protected]>
1 parent 4d03e8c commit 732070a

3 files changed

Lines changed: 65 additions & 32 deletions

File tree

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ See [`Development` on Online Document](https://pgmq-sqlalchemy.readthedocs.io/en
119119

120120
## TODO
121121

122-
- [ ] Add **time-based** partition option and validation to `create_partitioned_queue` method.
123-
- [ ] Read(single/batch) Archive Table ( `read_archive` method )
124-
- [ ] Detach Archive Table ( `detach_archive` method )
125-
- [ ] Add `set_vt` utils method.
122+
- [x] Add **time-based** partition option and validation to `create_partitioned_queue` method.
123+
- [x] Read(single/batch) Archive Table ( `read_archive` method )
124+
- [x] Detach Archive Table ( `detach_archive` method )
125+
- [x] Add `set_vt` utils method.

pgmq_sqlalchemy/queue.py

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -242,25 +242,25 @@ async def _create_partitioned_queue_async(
242242

243243
def _validate_partition_interval(self, interval: Union[int, str]) -> str:
244244
"""Validate partition interval format.
245-
245+
246246
Args:
247247
interval: Either an integer for numeric partitioning or a string for time-based partitioning
248248
(e.g., '1 day', '1 hour', '7 days')
249-
249+
250250
Returns:
251251
The validated interval as a string
252-
252+
253253
Raises:
254254
ValueError: If the interval format is invalid
255255
"""
256256
if isinstance(interval, int):
257257
if interval <= 0:
258258
raise ValueError("Numeric partition interval must be positive")
259259
return str(interval)
260-
260+
261261
# Validate time-based interval format
262262
# Valid PostgreSQL interval formats: '1 day', '7 days', '1 hour', '1 month', etc.
263-
time_pattern = r'^\d+\s+(microsecond|millisecond|second|minute|hour|day|week|month|year)s?$'
263+
time_pattern = r"^\d+\s+(microsecond|millisecond|second|minute|hour|day|week|month|year)s?$"
264264
if not re.match(time_pattern, interval.strip(), re.IGNORECASE):
265265
raise ValueError(
266266
f"Invalid time-based partition interval: '{interval}'. "
@@ -284,7 +284,7 @@ def create_partitioned_queue(
284284
285285
# Numeric partitioning (by msg_id)
286286
pgmq_client.create_partitioned_queue('my_partitioned_queue', partition_interval=10000, retention_interval=100000)
287-
287+
288288
# Time-based partitioning (by enqueued_at)
289289
pgmq_client.create_partitioned_queue('my_time_queue', partition_interval='1 day', retention_interval='7 days')
290290
@@ -309,15 +309,21 @@ def create_partitioned_queue(
309309
"""
310310
# check if the pg_partman extension exists before creating a partitioned queue at runtime
311311
self._check_pg_partman_ext()
312-
312+
313313
# Validate partition intervals
314-
validated_partition_interval = self._validate_partition_interval(partition_interval)
315-
validated_retention_interval = self._validate_partition_interval(retention_interval)
314+
validated_partition_interval = self._validate_partition_interval(
315+
partition_interval
316+
)
317+
validated_retention_interval = self._validate_partition_interval(
318+
retention_interval
319+
)
316320

317321
if self.is_async:
318322
return self.loop.run_until_complete(
319323
self._create_partitioned_queue_async(
320-
queue_name, validated_partition_interval, validated_retention_interval
324+
queue_name,
325+
validated_partition_interval,
326+
validated_retention_interval,
321327
)
322328
)
323329
return self._create_partitioned_queue_sync(
@@ -1460,7 +1466,9 @@ def _read_archive_sync(self, queue_name: str) -> Optional[Message]:
14601466
"""Read a single message from the archive table synchronously."""
14611467
with self.session_maker() as session:
14621468
row = session.execute(
1463-
text(f"select msg_id, read_ct, enqueued_at, vt, message from pgmq.a_{queue_name} limit 1;")
1469+
text(
1470+
f"select msg_id, read_ct, enqueued_at, vt, message from pgmq.a_{queue_name} limit 1;"
1471+
)
14641472
).fetchone()
14651473
session.commit()
14661474
if row is None:
@@ -1474,7 +1482,9 @@ async def _read_archive_async(self, queue_name: str) -> Optional[Message]:
14741482
async with self.session_maker() as session:
14751483
row = (
14761484
await session.execute(
1477-
text(f"select msg_id, read_ct, enqueued_at, vt, message from pgmq.a_{queue_name} limit 1;")
1485+
text(
1486+
f"select msg_id, read_ct, enqueued_at, vt, message from pgmq.a_{queue_name} limit 1;"
1487+
)
14781488
)
14791489
).fetchone()
14801490
await session.commit()

tests/test_queue.py

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -459,35 +459,44 @@ def test_detach_archive(pgmq_fixture, db_session):
459459
msg = MSG
460460
msg_id = pgmq.send(queue_name, msg)
461461
pgmq.archive(queue_name, msg_id)
462-
462+
463463
# Detach archive should not raise an error
464464
pgmq.detach_archive(queue_name)
465-
465+
466466
# Read the archive to ensure it still exists after detaching
467467
archived_msg = pgmq.read_archive(queue_name)
468468
assert archived_msg is not None
469469
assert archived_msg.msg_id == msg_id
470-
470+
471471
# Cleanup: Drop the archive and queue tables
472472
# After detaching, the archive is no longer part of the extension
473473
# We need to drop both tables manually by first removing them from the extension
474474
if pgmq.is_async:
475-
import asyncio
475+
476476
async def cleanup():
477477
async with pgmq.session_maker() as session:
478478
# Drop archive table (already detached)
479-
await session.execute(text(f"DROP TABLE IF EXISTS pgmq.a_{queue_name} CASCADE;"))
479+
await session.execute(
480+
text(f"DROP TABLE IF EXISTS pgmq.a_{queue_name} CASCADE;")
481+
)
480482
# Detach and drop queue table
481-
await session.execute(text(f"ALTER EXTENSION pgmq DROP TABLE pgmq.q_{queue_name};"))
482-
await session.execute(text(f"DROP TABLE IF EXISTS pgmq.q_{queue_name} CASCADE;"))
483+
await session.execute(
484+
text(f"ALTER EXTENSION pgmq DROP TABLE pgmq.q_{queue_name};")
485+
)
486+
await session.execute(
487+
text(f"DROP TABLE IF EXISTS pgmq.q_{queue_name} CASCADE;")
488+
)
483489
await session.commit()
490+
484491
pgmq.loop.run_until_complete(cleanup())
485492
else:
486493
with pgmq.session_maker() as session:
487494
# Drop archive table (already detached)
488495
session.execute(text(f"DROP TABLE IF EXISTS pgmq.a_{queue_name} CASCADE;"))
489496
# Detach and drop queue table
490-
session.execute(text(f"ALTER EXTENSION pgmq DROP TABLE pgmq.q_{queue_name};"))
497+
session.execute(
498+
text(f"ALTER EXTENSION pgmq DROP TABLE pgmq.q_{queue_name};")
499+
)
491500
session.execute(text(f"DROP TABLE IF EXISTS pgmq.q_{queue_name} CASCADE;"))
492501
session.commit()
493502

@@ -544,22 +553,30 @@ def test_read_archive_batch_limit(pgmq_setup_teardown: PGMQ_WITH_QUEUE):
544553
def test_create_time_based_partitioned_queue(pgmq_fixture, db_session):
545554
pgmq: PGMQueue = pgmq_fixture
546555
queue_name = f"test_queue_{uuid.uuid4().hex}"
547-
pgmq.create_partitioned_queue(queue_name, partition_interval='1 day', retention_interval='7 days')
556+
pgmq.create_partitioned_queue(
557+
queue_name, partition_interval="1 day", retention_interval="7 days"
558+
)
548559
assert check_queue_exists(db_session, queue_name) is True
549560

550561

551562
@pgmq_deps
552-
def test_create_time_based_partitioned_queue_various_intervals(pgmq_fixture, db_session):
563+
def test_create_time_based_partitioned_queue_various_intervals(
564+
pgmq_fixture, db_session
565+
):
553566
pgmq: PGMQueue = pgmq_fixture
554-
567+
555568
# Test with hour
556569
queue_name_hour = f"test_queue_{uuid.uuid4().hex}"
557-
pgmq.create_partitioned_queue(queue_name_hour, partition_interval='1 hour', retention_interval='24 hours')
570+
pgmq.create_partitioned_queue(
571+
queue_name_hour, partition_interval="1 hour", retention_interval="24 hours"
572+
)
558573
assert check_queue_exists(db_session, queue_name_hour) is True
559-
574+
560575
# Test with week
561576
queue_name_week = f"test_queue_{uuid.uuid4().hex}"
562-
pgmq.create_partitioned_queue(queue_name_week, partition_interval='1 week', retention_interval='4 weeks')
577+
pgmq.create_partitioned_queue(
578+
queue_name_week, partition_interval="1 week", retention_interval="4 weeks"
579+
)
563580
assert check_queue_exists(db_session, queue_name_week) is True
564581

565582

@@ -568,7 +585,11 @@ def test_create_partitioned_queue_invalid_time_interval(pgmq_fixture):
568585
pgmq: PGMQueue = pgmq_fixture
569586
queue_name = f"test_queue_{uuid.uuid4().hex}"
570587
with pytest.raises(ValueError) as e:
571-
pgmq.create_partitioned_queue(queue_name, partition_interval='invalid interval', retention_interval='7 days')
588+
pgmq.create_partitioned_queue(
589+
queue_name,
590+
partition_interval="invalid interval",
591+
retention_interval="7 days",
592+
)
572593
assert "Invalid time-based partition interval" in str(e.value)
573594

574595

@@ -577,5 +598,7 @@ def test_create_partitioned_queue_invalid_numeric_interval(pgmq_fixture):
577598
pgmq: PGMQueue = pgmq_fixture
578599
queue_name = f"test_queue_{uuid.uuid4().hex}"
579600
with pytest.raises(ValueError) as e:
580-
pgmq.create_partitioned_queue(queue_name, partition_interval=-100, retention_interval=100000)
601+
pgmq.create_partitioned_queue(
602+
queue_name, partition_interval=-100, retention_interval=100000
603+
)
581604
assert "Numeric partition interval must be positive" in str(e.value)

0 commit comments

Comments
 (0)