Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion api/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ features/workflows/logic/
/tests/ldap_integration_tests/
/tests/saml_unit_tests/

# Unit test coverage
# Code maintenance
.coverage
.mypy_cache
1 change: 1 addition & 0 deletions api/api/urls/v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
name="environment-document",
),
re_path("", include("features.versioning.urls", namespace="versioning")),
path("", include("features.feature_lifecycle.urls", namespace="feature-lifecycle")),
# API documentation
path(
"swagger.json",
Expand Down
72 changes: 44 additions & 28 deletions api/app_analytics/influxdb_wrapper.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import functools
import json
import logging
import typing
Expand All @@ -21,21 +22,10 @@
map_flux_tables_to_usage_data,
map_labels_to_influx_record_values,
)
from app_analytics.types import Labels
from app_analytics.types import DownsampleSize, Labels

logger = logging.getLogger(__name__)

url = settings.INFLUXDB_URL
token = settings.INFLUXDB_TOKEN
influx_org = settings.INFLUXDB_ORG
read_bucket = settings.INFLUXDB_BUCKET + "_downsampled_15m"

retries = Retry(connect=3, read=3, redirect=3)
# Set a timeout to prevent threads being potentially stuck open due to network weirdness
influxdb_client = InfluxDBClient(
url=url, token=token, org=influx_org, retries=retries, timeout=30000
)

DEFAULT_DROP_COLUMNS = (
"organisation",
"organisation_id",
Expand All @@ -52,18 +42,35 @@
)


def get_range_bucket_mappings(date_start: datetime) -> str:
now = timezone.now()
if (now - date_start).days > 10:
return settings.INFLUXDB_BUCKET + "_downsampled_1h"
return settings.INFLUXDB_BUCKET + "_downsampled_15m"


class InfluxDBWrapper:
client = None

def __init__(self, name): # type: ignore[no-untyped-def]
self.name = name
self.records = []
self.write_api = influxdb_client.write_api(write_options=SYNCHRONOUS)

@classmethod
@functools.cache
def get_client(cls) -> InfluxDBClient:
"""A singleton InfluxDB client instance"""
retries = Retry(connect=3, read=3, redirect=3)
return InfluxDBClient(
url=settings.INFLUXDB_URL,
token=settings.INFLUXDB_TOKEN,
org=settings.INFLUXDB_ORG,
retries=retries,
timeout=30000, # Hard stop to prevent hanging requests
)

@classmethod
def get_downsampled_bucket(cls, size: DownsampleSize) -> str:
return f"{settings.INFLUXDB_BUCKET}_downsampled_{size}"

@classmethod
def select_downsampled_bucket(cls, date_start: datetime) -> str:
if (timezone.now() - date_start).days > 10:
return cls.get_downsampled_bucket(DownsampleSize.ONE_HOUR)
return cls.get_downsampled_bucket(DownsampleSize.FIFTEEN_MINUTES)

def add_data_point(
self,
Expand All @@ -85,8 +92,12 @@ def add_data_point(
self.records.append(point)

def write(self) -> None:
"""Persist collected data points to InfluxDB"""
try:
self.write_api.write(bucket=settings.INFLUXDB_BUCKET, record=self.records)
self.get_client().write_api(write_options=SYNCHRONOUS).write(
bucket=settings.INFLUXDB_BUCKET,
record=self.records,
)
except (HTTPError, InfluxDBError) as e:
logger.warning(
"Failed to write records to Influx: %s",
Expand All @@ -99,15 +110,20 @@ def write(self) -> None:
settings.INFLUXDB_BUCKET,
)

@staticmethod
@classmethod
def influx_query_manager(
cls,
date_start: datetime | None = None,
date_stop: datetime | None = None,
drop_columns: tuple[str, ...] = DEFAULT_DROP_COLUMNS,
filters: str = "|> filter(fn:(r) => r._measurement == 'api_call')",
extra: str = "",
bucket: str = read_bucket,
bucket: str | None = None,
) -> list[FluxTable]:
if bucket is None:
# NOTE: Legacy default
bucket = cls.get_downsampled_bucket(DownsampleSize.FIFTEEN_MINUTES)

now = timezone.now()
if date_start is None:
date_start = now - timedelta(days=30)
Expand All @@ -119,7 +135,7 @@ def influx_query_manager(
if date_start == date_stop:
return []

query_api = influxdb_client.query_api()
query_api = cls.get_client().query_api()
drop_columns_input = str(list(drop_columns)).replace("'", '"')

query = (
Expand All @@ -132,7 +148,7 @@ def influx_query_manager(
logger.debug("Running query in influx: \n\n %s", query)

try:
return query_api.query(org=influx_org, query=query)
return query_api.query(org=settings.INFLUXDB_ORG, query=query)
except HTTPError as e:
capture_exception(e)
return []
Expand Down Expand Up @@ -390,7 +406,7 @@ def get_top_organisations(
if limit:
limit = f"|> limit(n:{limit})"

bucket = get_range_bucket_mappings(date_start)
bucket = InfluxDBWrapper.select_downsampled_bucket(date_start)
results = InfluxDBWrapper.influx_query_manager(
date_start=date_start,
bucket=bucket,
Expand Down Expand Up @@ -432,7 +448,7 @@ def get_current_api_usage(
:return: number of current api calls
"""
bucket = read_bucket
bucket = InfluxDBWrapper.get_downsampled_bucket(DownsampleSize.FIFTEEN_MINUTES)
results = InfluxDBWrapper.influx_query_manager(
date_start=date_start,
bucket=bucket,
Expand Down Expand Up @@ -474,7 +490,7 @@ def get_platform_usage_trends(

org_id_set = ", ".join(f'"{oid}"' for oid in organisation_ids)

bucket = get_range_bucket_mappings(date_start)
bucket = InfluxDBWrapper.select_downsampled_bucket(date_start)
results = InfluxDBWrapper.influx_query_manager(
date_start=date_start,
date_stop=date_stop,
Expand Down
6 changes: 4 additions & 2 deletions api/app_analytics/migrate_to_pg.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from app_analytics.constants import ANALYTICS_READ_BUCKET_SIZE
from app_analytics.influxdb_wrapper import influxdb_client, read_bucket
from app_analytics.influxdb_wrapper import InfluxDBWrapper
from app_analytics.models import FeatureEvaluationBucket
from app_analytics.types import DownsampleSize


def migrate_feature_evaluations(migrate_till: int = 30) -> None:
query_api = influxdb_client.query_api()
query_api = InfluxDBWrapper.get_client().query_api()
read_bucket = InfluxDBWrapper.get_downsampled_bucket(DownsampleSize.FIFTEEN_MINUTES)

for i in range(migrate_till):
range_start = f"-{i + 1}d"
Expand Down
23 changes: 23 additions & 0 deletions api/app_analytics/migrations/0009_analytics_buckets_index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Generated by Django 5.2.15 on 2026-06-24 14:52

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("app_analytics", "0008_labels_jsonb"),
]

operations = [
migrations.AlterField(
model_name="apiusagebucket",
name="created_at",
field=models.DateTimeField(db_index=True),
),
migrations.AlterField(
model_name="featureevaluationbucket",
name="created_at",
field=models.DateTimeField(db_index=True),
),
]
2 changes: 1 addition & 1 deletion api/app_analytics/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class Meta:

class AbstractBucket(LifecycleModelMixin, models.Model): # type: ignore[misc]
bucket_size = models.PositiveIntegerField(help_text="Bucket size in minutes")
created_at = models.DateTimeField()
created_at = models.DateTimeField(db_index=True)
total_count = models.PositiveIntegerField()
environment_id = models.PositiveIntegerField()
labels = models.JSONField(default=dict)
Expand Down
69 changes: 68 additions & 1 deletion api/app_analytics/services.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
from datetime import datetime

from django.conf import settings
from django.db.models import QuerySet

from app_analytics import constants
from app_analytics.cache import APIUsageCache
from app_analytics.models import Resource
from app_analytics.influxdb_wrapper import InfluxDBWrapper, build_filter_string
from app_analytics.models import FeatureEvaluationBucket, Resource
from app_analytics.tasks import track_request
from app_analytics.types import Labels
from environments.models import Environment
from features.models import Feature

api_usage_cache = APIUsageCache()

Expand Down Expand Up @@ -31,3 +38,63 @@ def track_usage_by_resource_host_and_environment(
"labels": labels,
}
)


def get_features_in_use(
environment: Environment,
since: datetime | None = None,
) -> QuerySet[Feature] | None:
"""Obtain features found in recent analytics data, i.e. in use"""
if settings.USE_POSTGRES_FOR_ANALYTICS:
feature_names = _get_feature_names_in_use_from_analytics_db(environment, since)
elif settings.INFLUXDB_TOKEN:
feature_names = _get_feature_names_in_use_from_influxdb(environment, since)
else:
return None
features_in_use: QuerySet[Feature] = Feature.objects.filter(
name__in=feature_names,
project__environments=environment,
)
return features_in_use


def _get_feature_names_in_use_from_analytics_db(
environment: Environment,
since: datetime | None = None,
) -> list[str]:
# NOTE: Neighbour buckets may bleed depending on `since`
buckets = FeatureEvaluationBucket.objects.filter(
environment_id=environment.pk,
bucket_size=constants.ANALYTICS_READ_BUCKET_SIZE,
created_at__gte=since,
total_count__gt=0,
)
Comment on lines +66 to +71

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is currently a seq scan. Do we need to add an index to FeatureEvaluationBucket?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe... A bit of scope creep but I added 2d75b64 since it's low-cost in effort — both evaluation and usage buckets are often queried for by created_at.

I wonder though if adding either index here may cause a sudden spike in db storage for certain self-hosted customers? 🤔

Please cast a vote.

feature_names = buckets.values_list("feature_name", flat=True).distinct()
return list(feature_names)


def _get_feature_names_in_use_from_influxdb(
environment: Environment,
since: datetime | None = None,
) -> list[str]:
results = InfluxDBWrapper.influx_query_manager(
date_start=since,
filters=build_filter_string(
[
'r._measurement == "feature_evaluation"',
'r["_field"] == "request_count"',
f'r["environment_id"] == "{environment.pk}"',
]
),
extra=(
'|> keep(columns: ["feature_id"]) '
'|> distinct(column: "feature_id") '
'|> yield(name: "distinct")'
),
)
return [
feature_name
for table in results
for record in table.records
if (feature_name := record.get_value()) is not None
]
6 changes: 6 additions & 0 deletions api/app_analytics/types.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import date
from enum import StrEnum
from typing import TYPE_CHECKING, Literal, NamedTuple, TypeAlias, TypedDict

if TYPE_CHECKING:
Expand Down Expand Up @@ -27,6 +28,11 @@
]


class DownsampleSize(StrEnum):
FIFTEEN_MINUTES = "15m"
ONE_HOUR = "1h"


class APIUsageCacheKey(NamedTuple):
resource: "Resource"
host: str
Expand Down
12 changes: 12 additions & 0 deletions api/features/feature_lifecycle/serializers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from rest_framework import serializers


class FeatureLifecycleCountsSerializer(serializers.Serializer[dict[str, int]]):
"""Number of features in each lifecycle stage for an environment"""

new = serializers.IntegerField()
live = serializers.IntegerField()
stale = serializers.IntegerField()
permanent = serializers.IntegerField()
needs_monitoring = serializers.IntegerField()
to_remove = serializers.IntegerField()
Loading
Loading