Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ jobs:
export COVERAGE_FILE=".coverage.py${{ matrix.python-version }}.${{ matrix.driver }}"

uv run pytest tests --driver=${{ matrix.driver }} --db-name=${DB_NAME} --cov=pgmq_sqlalchemy.queue --cov-report=xml:coverage-py${{ matrix.python-version }}-${{ matrix.driver }}.xml
continue-on-error: true
- name: Upload coverage artifact
if: always()
uses: actions/upload-artifact@v4
with:
name: coverage-py${{ matrix.python-version }}-${{ matrix.driver }}
Expand Down
112 changes: 51 additions & 61 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,59 +29,47 @@ def pytest_addoption(parser):
)


def pytest_collection_modifyitems(config, items):
"""Modify test collection to skip tests not matching the --driver option."""
driver_from_cli = config.getoption("--driver")

if not driver_from_cli:
# No driver specified, run all tests
return

# Determine if the specified driver is sync or async
is_async_driver = driver_from_cli in ASYNC_DRIVERS
is_sync_driver = driver_from_cli in SYNC_DRIVERS

if not is_async_driver and not is_sync_driver:
# Invalid driver
return

# Filter out tests that don't match the specified driver
skip_marker = pytest.mark.skip(reason=f"Test uses different driver (--driver={driver_from_cli} specified)")

for item in items:
# Parse the test name to extract driver info
# Format is usually: test_name[fixture_name-driver_name]
item_id = item.nodeid

# Check if the test has a specific driver in its ID
# Extract driver name from test ID (e.g., test_name[pgmq_by_dsn-psycopg2])
if '[' in item_id and ']' in item_id:
# Extract the part between brackets
bracket_content = item_id[item_id.find('[')+1:item_id.find(']')]

# Check for async fixtures by name (more precise than string matching)
is_async_test = any(async_fixture in bracket_content for async_fixture in ASYNC_FIXTURE_NAMES)

# Skip async tests if sync driver specified
if is_sync_driver and is_async_test:
item.add_marker(skip_marker)
continue

# Skip sync tests if async driver specified
if is_async_driver and not is_async_test:
item.add_marker(skip_marker)
continue

# Check if any known driver is in the bracket content
# Sort drivers by length (descending) to match longer names first (e.g., psycopg2cffi before psycopg2)
sorted_drivers = sorted(SYNC_DRIVERS + ASYNC_DRIVERS, key=len, reverse=True)
for driver in sorted_drivers:
if f"-{driver}]" in item_id or f"-{driver}-" in bracket_content:
# This test is for a specific driver
if driver != driver_from_cli:
# Skip if it doesn't match the CLI driver
item.add_marker(skip_marker)
break
def pytest_generate_tests(metafunc):
"""
Dynamically generate test parametrization based on CLI options.

This allows us to parametrize fixtures based on the --driver option.
"""
if "pgmq_all_variants" in metafunc.fixturenames:
driver_from_cli = metafunc.config.getoption("--driver")

# Define sync and async fixture variants
sync_fixtures = [
'pgmq_by_dsn',
'pgmq_by_engine',
'pgmq_by_session_maker',
'pgmq_by_dsn_and_engine',
'pgmq_by_dsn_and_session_maker',
]

async_fixtures = [
'pgmq_by_async_dsn',
'pgmq_by_async_engine',
'pgmq_by_async_session_maker',
]

# Determine which fixtures to use
if not driver_from_cli:
# No driver specified, use all fixtures
fixture_params = sync_fixtures + async_fixtures
elif driver_from_cli in ASYNC_DRIVERS:
# Async driver specified
fixture_params = async_fixtures
else:
# Sync driver specified
fixture_params = sync_fixtures

# Parametrize the test
metafunc.parametrize(
"pgmq_all_variants",
fixture_params,
indirect=True
)


@pytest.fixture(scope="module")
Expand Down Expand Up @@ -113,7 +101,7 @@ def get_sa_db(request):
return os.getenv("SQLALCHEMY_DB", "postgres")


@pytest.fixture(scope="function", params=SYNC_DRIVERS)
@pytest.fixture(scope="function")
def get_dsn(
request: FixtureRequest,
get_sa_host,
Expand All @@ -122,19 +110,20 @@ def get_dsn(
get_sa_password,
get_sa_db,
):
"""Get DSN for sync drivers."""
"""Get DSN for sync drivers based on CLI option."""
driver_from_cli = request.config.getoption("--driver")

# Use CLI driver if specified, otherwise use parametrized driver
# Use CLI driver if specified and it's a sync driver
if driver_from_cli and driver_from_cli in SYNC_DRIVERS:
driver = driver_from_cli
else:
driver = request.param
# Default to first sync driver if no CLI option or invalid
driver = SYNC_DRIVERS[0]

return f"postgresql+{driver}://{get_sa_user}:{get_sa_password}@{get_sa_host}:{get_sa_port}/{get_sa_db}"


@pytest.fixture(scope="function", params=ASYNC_DRIVERS)
@pytest.fixture(scope="function")
def get_async_dsn(
request: FixtureRequest,
get_sa_host,
Expand All @@ -143,14 +132,15 @@ def get_async_dsn(
get_sa_password,
get_sa_db,
):
"""Get DSN for async drivers."""
"""Get DSN for async drivers based on CLI option."""
driver_from_cli = request.config.getoption("--driver")

# Use CLI driver if specified, otherwise use parametrized driver
# Use CLI driver if specified and it's an async driver
if driver_from_cli and driver_from_cli in ASYNC_DRIVERS:
driver = driver_from_cli
else:
driver = request.param
# Default to first async driver if no CLI option or invalid
driver = ASYNC_DRIVERS[0]

return f"postgresql+{driver}://{get_sa_user}:{get_sa_password}@{get_sa_host}:{get_sa_port}/{get_sa_db}"

Expand Down
74 changes: 28 additions & 46 deletions tests/fixture_deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,64 +6,47 @@
from pgmq_sqlalchemy import PGMQueue
from tests._utils import check_queue_exists

LAZY_FIXTURES = [
pytest.lazy_fixture("pgmq_by_dsn"),
pytest.lazy_fixture("pgmq_by_async_dsn"),
pytest.lazy_fixture("pgmq_by_engine"),
pytest.lazy_fixture("pgmq_by_async_engine"),
pytest.lazy_fixture("pgmq_by_session_maker"),
pytest.lazy_fixture("pgmq_by_async_session_maker"),
pytest.lazy_fixture("pgmq_by_dsn_and_engine"),
pytest.lazy_fixture("pgmq_by_dsn_and_session_maker"),
]

pgmq_deps = pytest.mark.parametrize(
"pgmq_fixture",
LAZY_FIXTURES,
)
"""
Decorator that allows a test function to receive a PGMQueue instance as a parameter.

Usage:

```
from tests.fixture_deps import pgmq_deps

@pgmq_deps
def test_create_queue(pgmq_fixture,db_session):
pgmq:PGMQueue = pgmq_fixture
# test code here
```
PGMQ_WITH_QUEUE = Tuple[PGMQueue, str]

Note:
`pytest` version should < 8.0.0,
or `pytest-lazy-fixture` will not work
ref: https://github.com/TvoroG/pytest-lazy-fixture/issues/65
"""

PGMQ_WITH_QUEUE = Tuple[PGMQueue, str]
@pytest.fixture(scope="function")
def pgmq_all_variants(request: pytest.FixtureRequest) -> PGMQueue:
"""
Fixture that parametrizes tests across all appropriate PGMQueue initialization methods.

When --driver is specified, only fixtures matching that driver type (sync/async) are used.
Without --driver, all fixtures are used.

The parametrization is handled by pytest_generate_tests in conftest.py.

Usage:
def test_something(pgmq_all_variants):
pgmq: PGMQueue = pgmq_all_variants
# test code here
"""
# The param is set by pytest_generate_tests via indirect parametrization
return request.getfixturevalue(request.param)


@pytest.fixture(scope="function", params=LAZY_FIXTURES)
def pgmq_setup_teardown(request: pytest.FixtureRequest, db_session) -> PGMQ_WITH_QUEUE:
@pytest.fixture(scope="function")
def pgmq_setup_teardown(pgmq_all_variants: PGMQueue, db_session) -> PGMQ_WITH_QUEUE:
"""
Fixture that provides a PGMQueue instance with a unique temporary queue with setup and teardown.

Args:
request (pytest.FixtureRequest): The pytest fixture request object.
pgmq_all_variants (PGMQueue): The PGMQueue instance (parametrized across all variants).
db_session (sqlalchemy.orm.Session): The SQLAlchemy session object.

Yields:
tuple[PGMQueue,str]: A tuple containing the PGMQueue instance and the name of the temporary queue.

Usage:
@pgmq_setup_teardown
def test_something(pgmq_setup_teardown):
pgmq, queue_name = pgmq_setup_teardown
# test code here

"""
pgmq = request.param
pgmq = pgmq_all_variants
queue_name = f"test_queue_{uuid.uuid4().hex}"
assert check_queue_exists(db_session, queue_name) is False
pgmq.create_queue(queue_name)
Expand All @@ -73,28 +56,27 @@ def test_something(pgmq_setup_teardown):
assert check_queue_exists(db_session, queue_name) is False


@pytest.fixture(scope="function", params=LAZY_FIXTURES)
@pytest.fixture(scope="function")
def pgmq_partitioned_setup_teardown(
request: pytest.FixtureRequest, db_session
pgmq_all_variants: PGMQueue, db_session
) -> PGMQ_WITH_QUEUE:
"""
Fixture that provides a PGMQueue instance with a unique temporary partitioned queue with setup and teardown.

Args:
request (pytest.FixtureRequest): The pytest fixture request object.
pgmq_all_variants (PGMQueue): The PGMQueue instance (parametrized across all variants).
db_session (sqlalchemy.orm.Session): The SQLAlchemy session object.

Yields:
tuple[PGMQueue,str]: A tuple containing the PGMQueue instance and the name of the temporary queue.

Usage:
@pgmq_setup_teardown
def test_something(pgmq_setup_teardown):
pgmq, queue_name = pgmq_setup_teardown
def test_something(pgmq_partitioned_setup_teardown):
pgmq, queue_name = pgmq_partitioned_setup_teardown
# test code here

"""
pgmq: PGMQueue = request.param
pgmq: PGMQueue = pgmq_all_variants
queue_name = f"test_queue_{uuid.uuid4().hex}"
assert check_queue_exists(db_session, queue_name) is False
pgmq.create_partitioned_queue(queue_name)
Expand Down
7 changes: 3 additions & 4 deletions tests/test_construct_pgmq.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import pytest
from pgmq_sqlalchemy import PGMQueue

from tests.fixture_deps import pgmq_deps
from tests.fixture_deps import pgmq_all_variants


@pgmq_deps
def test_construct_pgmq(pgmq_fixture):
pgmq: PGMQueue = pgmq_fixture
def test_construct_pgmq(pgmq_all_variants):
pgmq: PGMQueue = pgmq_all_variants
assert pgmq is not None


Expand Down
Loading