Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions api/.env-ci
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
DATABASE_URL=postgresql://postgres:password@localhost:5432/flagsmith
ANALYTICS_DATABASE_URL=postgresql://postgres:password@localhost:5433/analytics
CLICKHOUSE_HOST=localhost
CLICKHOUSE_PORT=9000
CLICKHOUSE_USER=flagsmith
CLICKHOUSE_PASSWORD=password
CLICKHOUSE_DATABASE=default
PYTEST_ADDOPTS=--cov . --cov-report xml -n auto --ci
COVERAGE_CORE=sysmon
5 changes: 5 additions & 0 deletions api/.env-local
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
DATABASE_URL=postgresql://postgres:password@localhost:5432/flagsmith
ANALYTICS_DATABASE_URL=postgresql://postgres:password@localhost:5433/analytics
CLICKHOUSE_HOST=localhost
CLICKHOUSE_PORT=9000
CLICKHOUSE_USER=flagsmith
CLICKHOUSE_PASSWORD=password
CLICKHOUSE_DATABASE=default
DJANGO_SETTINGS_MODULE=app.settings.local
PYTEST_ADDOPTS=--cov . --cov-report html -n auto
1 change: 1 addition & 0 deletions api/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ docker-build:
wait-for-db:
uv run python manage.py waitfordb
uv run python manage.py waitfordb --database analytics
uv run python manage.py waitfordb --database clickhouse

.PHONY: test
test: docker-up wait-for-db
Expand Down
2 changes: 1 addition & 1 deletion api/app/settings/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -1483,7 +1483,7 @@

if CLICKHOUSE_ENABLED:
_clickhouse_db: dict[str, Any] = {
"ENGINE": "clickhouse_backend.backend",
"ENGINE": "core.db_backends.clickhouse",
"HOST": CLICKHOUSE_HOST,
"PORT": CLICKHOUSE_PORT,
"USER": CLICKHOUSE_USER,
Expand Down
11 changes: 10 additions & 1 deletion api/app/settings/test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
from app.settings.common import * # noqa
from app.settings.common import INSTALLED_APPS, LDAP_INSTALLED, REST_FRAMEWORK
from app.settings.common import (
DATABASES,
INSTALLED_APPS,
LDAP_INSTALLED,
REST_FRAMEWORK,
)

# TODO: remove once permissions are an enum --
# https://github.com/Flagsmith/flagsmith/issues/7850
DATABASES["default"]["ENGINE"] = "core.db_backends.postgresql"

if LDAP_INSTALLED:
INSTALLED_APPS = INSTALLED_APPS + ["flagsmith_ldap"]
Expand Down
Empty file.
Empty file.
9 changes: 9 additions & 0 deletions api/core/db_backends/clickhouse/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from clickhouse_backend.backend.base import (
DatabaseWrapper as ClickHouseDatabaseWrapper,
)

from core.db_backends.clickhouse.creation import DatabaseCreation


class DatabaseWrapper(ClickHouseDatabaseWrapper): # type: ignore[misc]
creation_class = DatabaseCreation
43 changes: 43 additions & 0 deletions api/core/db_backends/clickhouse/creation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from clickhouse_backend.backend.creation import (
DatabaseCreation as ClickHouseDatabaseCreation,
)


class DatabaseCreation(ClickHouseDatabaseCreation): # type: ignore[misc]
"""ClickHouse test-database creation with parallel-clone support.

ClickHouse has no transactional rollback, so each xdist worker needs its own physical
database for isolation, mirroring how the Postgres backend clones the
primary test database per worker.

TODO Remove this subclass once https://github.com/jayvynl/django-clickhouse-backend/issues/167
ships.
"""

def _clone_test_db(
self,
suffix: str,
verbosity: int,
keepdb: bool = False,
) -> None:
source_database_name: str = self.connection.settings_dict["NAME"]
target_database_name: str = self.get_test_db_clone_settings(suffix)["NAME"]
quote_name = self.connection.ops.quote_name

with self._nodb_cursor() as cursor:
cursor.execute(
f"DROP DATABASE IF EXISTS {quote_name(target_database_name)} SYNC"
)
cursor.execute(f"CREATE DATABASE {quote_name(target_database_name)}")
# Recreate every source table as an empty copy; `CREATE TABLE ... AS`
# copies the engine and schema without any rows.
cursor.execute(
"SELECT name FROM system.tables WHERE database = %s",
[source_database_name],
)
for (table_name,) in cursor.fetchall():
cursor.execute(
f"CREATE TABLE "
f"{quote_name(target_database_name)}.{quote_name(table_name)} "
f"AS {quote_name(source_database_name)}.{quote_name(table_name)}"
)
Empty file.
9 changes: 9 additions & 0 deletions api/core/db_backends/postgresql/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from django.db.backends.postgresql.base import (
DatabaseWrapper as PostgresDatabaseWrapper,
)

from core.db_backends.postgresql.operations import DatabaseOperations


class DatabaseWrapper(PostgresDatabaseWrapper):
ops_class = DatabaseOperations
29 changes: 29 additions & 0 deletions api/core/db_backends/postgresql/operations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from collections.abc import Sequence
from typing import Any

from django.db.backends.postgresql.operations import (
DatabaseOperations as PostgresDatabaseOperations,
)

# Tables holding migration-seeded reference data that must survive `flush`.
#
# TODO: remove this backend once https://github.com/Flagsmith/flagsmith/issues/7850 is closed
PRESERVED_TABLES = frozenset({"permissions_permissionmodel"})


class DatabaseOperations(PostgresDatabaseOperations):
def sql_flush(
self,
style: Any,
tables: Sequence[str],
*,
reset_sequences: bool = False,
allow_cascade: bool = False,
) -> list[str]:
retained = [table for table in tables if table not in PRESERVED_TABLES]
return super().sql_flush(
style,
retained,
reset_sequences=reset_sequences,
allow_cascade=allow_cascade,
)
3 changes: 3 additions & 0 deletions api/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,9 @@ addopts = [
]
console_output_style = 'count'
log_level = 'INFO'
markers = [
"clickhouse: test requires a live ClickHouse database (see the clickhouse_db fixture)",
]

[tool.mypy]
plugins = ["mypy_django_plugin.main"]
Expand Down
1 change: 0 additions & 1 deletion api/segment_membership/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ def open_clickhouse_cursor(
"""
with connections["clickhouse"].cursor() as cursor:
if log_comment:
# Underlying clickhouse-driver cursor exposes set_settings(...).
cursor.cursor.set_settings({"log_comment": log_comment})
yield cursor

Expand Down
25 changes: 25 additions & 0 deletions api/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from django.conf import settings
from django.contrib.contenttypes.models import ContentType
from django.core.cache import caches
from django.db import connections
from django.db.backends.base.creation import TEST_DATABASE_PREFIX
from django.test.utils import setup_databases
from django_test_migrations.migrator import Migrator
Expand Down Expand Up @@ -1344,6 +1345,30 @@ def clear_content_type_cache() -> typing.Generator[None, None, None]:
ContentType.objects.clear_cache()


@pytest.fixture
def clickhouse_db(
request: pytest.FixtureRequest, settings: SettingsWrapper
) -> typing.Generator[None, None, None]:
"""
Opt a test into a live ClickHouse database.

Skips when no `clickhouse` alias is configured (i.e. ClickHouse isn't
running). ClickHouse has no transactional rollback, so -- unlike the
Postgres-backed `db` fixture -- we can't rely on Django wrapping the test
in a transaction. We truncate every table on teardown instead to isolate
tests from one another.
"""
if "clickhouse" not in settings.DATABASES: # pragma: no cover
pytest.skip("No ClickHouse database configured, skipping")
request.applymarker(pytest.mark.django_db(databases=["default", "clickhouse"]))
request.getfixturevalue("db") # Resolve `db` only after injecting the clickhouse db
yield
connection = connections["clickhouse"]
with connection.cursor() as cursor:
for table_name in connection.introspection.table_names(cursor):
cursor.execute(f"TRUNCATE TABLE {connection.ops.quote_name(table_name)}")


@pytest.fixture
def use_analytics_db(request: pytest.FixtureRequest, settings: SettingsWrapper) -> None:
"""
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
import uuid

import pytest
from django.db import connections
from pytest_django.fixtures import SettingsWrapper
from pytest_mock import MockerFixture
from pytest_structlog import StructuredLogCapture

from projects.models import Project
from segment_membership.models import SegmentMembershipCount
from segment_membership.services import (
compute_segment_counts_for_project,
open_clickhouse_cursor,
)
from segment_membership.tasks import (
backfill_identities_to_clickhouse,
refresh_project_segment_counts,
)
from tests.types import EnableFeaturesFixture


@pytest.fixture
def seeded_identities(clickhouse_db: None, environment_api_key: str) -> None:
"""Seed three IDENTITIES rows for the environment: two match the `segment`
fixture's `foo EQUAL bar` condition, one does not."""
rows = [
(environment_api_key, "alice", "alice_key", {"foo": "bar"}),
(environment_api_key, "bob", "bob_key", {"foo": "bar"}),
(environment_api_key, "carol", "carol_key", {"foo": "baz"}),
]
with connections["clickhouse"].cursor() as cursor:
# Django's CursorWrapper stub forbids dicts in the params sequence;
# clickhouse-driver accepts them as JSON-column payloads.
cursor.executemany(
"INSERT INTO IDENTITIES (environment_id, identifier, identity_key, traits) VALUES",
rows, # type: ignore[arg-type]
)


@pytest.mark.clickhouse
def test_compute_segment_counts_for_project__matching_identities__counts_real_rows(
seeded_identities: None,
project: int,
environment: int,
segment: int,
) -> None:
# Given the `segment` fixture (matches `foo=bar`) and the seeded identities

# When
with open_clickhouse_cursor() as cursor:
result = compute_segment_counts_for_project(
Project.objects.get(pk=project), cursor
)

# Then only the two matching identities are counted, for the right env
[membership] = result
assert membership.segment_id == segment
assert membership.environment_id == environment
assert membership.count == 2


@pytest.mark.clickhouse
def test_refresh_project_segment_counts__matching_identities__upserts_real_counts(
seeded_identities: None,
settings: SettingsWrapper,
project: int,
environment: int,
segment: int,
enable_features: EnableFeaturesFixture,
) -> None:
# Given the org has segment-membership inspection on and ClickHouse enabled
enable_features("segment_membership_inspection")
settings.CLICKHOUSE_ENABLED = True

# When the refresh task runs end-to-end against real ClickHouse
refresh_project_segment_counts(project)

# Then the (segment, environment) count row reflects the two matches
membership = SegmentMembershipCount.objects.get(
segment_id=segment, environment_id=environment
)
assert membership.count == 2
assert membership.last_synced_at is not None


@pytest.mark.clickhouse
def test_backfill_identities_to_clickhouse__happy_path__rows_land_in_clickhouse(
clickhouse_db: None,
settings: SettingsWrapper,
mocker: MockerFixture,
project: int,
environment: int,
environment_api_key: str,
segment: int,
enable_features: EnableFeaturesFixture,
log: StructuredLogCapture,
) -> None:
# Given segment-membership inspection is on, ClickHouse is enabled, and
# Dynamo yields two identities for the environment
enable_features("segment_membership_inspection")
settings.CLICKHOUSE_ENABLED = True
refresh_dispatch = mocker.patch(
"segment_membership.tasks.refresh_project_segment_counts"
)
wrapper = mocker.MagicMock(is_enabled=True)
wrapper.iter_all_items_paginated.return_value = iter(
[
{
"identity_uuid": "f47ac10b-58cc-4372-a567-0e02b2c3d479",
"identifier": "a",
"composite_key": "k1",
"environment_api_key": environment_api_key,
"created_date": "2026-05-08T00:00:00Z",
"identity_traits": [{"trait_key": "foo", "trait_value": "bar"}],
},
{
"identity_uuid": "550e8400-e29b-41d4-a716-446655440000",
"identifier": "b",
"composite_key": "k2",
"environment_api_key": environment_api_key,
"created_date": "2026-05-08T00:00:00Z",
"identity_traits": [],
},
]
)
mocker.patch("segment_membership.tasks.DynamoIdentityWrapper", return_value=wrapper)

# When the backfill task runs end-to-end against real ClickHouse
backfill_identities_to_clickhouse()

# Then both identities actually land in IDENTITIES, keyed by env api key
with open_clickhouse_cursor() as cursor:
cursor.execute(
"SELECT identifier, identity_key FROM IDENTITIES FINAL "
"WHERE environment_id = %(env)s ORDER BY identifier",
{"env": environment_api_key},
)
rows = cursor.fetchall()
assert [(row[0], row[1]) for row in rows] == [("a", "k1"), ("b", "k2")]
# and the project's count refresh is dispatched
refresh_dispatch.delay.assert_called_once_with(args=(project,))
assert any(
e["event"] == "backfill.environment.completed" and e["rows__count"] == 2
for e in log.events
)


@pytest.mark.clickhouse
def test_open_clickhouse_cursor__with_log_comment__lands_in_query_log(
clickhouse_db: None,
) -> None:
# Given a unique log_comment
log_comment = f"flagsmith:test:{uuid.uuid4()}"

# When a query runs on a cursor opened with that log_comment
with open_clickhouse_cursor(log_comment=log_comment) as cursor:
cursor.execute("SELECT 1")

# Then the query is attributable in CH's query_log by that comment. The
# query_log flushes asynchronously, so flush before reading.
with open_clickhouse_cursor() as cursor:
cursor.execute("SYSTEM FLUSH LOGS")
cursor.execute(
"SELECT count() FROM system.query_log WHERE log_comment = %(lc)s",
{"lc": log_comment},
)
[(count,)] = cursor.fetchall()
assert count >= 1
Loading
Loading