diff --git a/CHANGELOG.rst b/CHANGELOG.rst index d45e9cf2..91716d0b 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -14,7 +14,16 @@ Change Log Unreleased ********** -0.14.0 - 2025-11-10 +0.15.0 - 2025-11-11 +******************** + +Added +===== + +* `ExtendedCasbinRule` model to extend the base CasbinRule model for additional metadata, and cascade delete + support. + +0.14.0 - 2025-11-11 ******************** Added @@ -22,7 +31,7 @@ Added * Implement custom matcher to check for staff and superuser status. -0.13.1 - 2025-11-10 +0.13.1 - 2025-11-11 ******************** Fixed diff --git a/Makefile b/Makefile index 1e4724fe..c48d6e0b 100644 --- a/Makefile +++ b/Makefile @@ -59,8 +59,8 @@ quality: ## check coding style with pycodestyle and pylint tox -e quality format: ## format code with black and isort. Enable ruff to fix E (pycodestyle) and I (isort) issues - ruff format openedx_authz tests test_utils manage.py setup.py - ruff check --fix openedx_authz tests test_utils manage.py setup.py + ruff format openedx_authz tests manage.py setup.py + ruff check --fix openedx_authz tests manage.py setup.py pii_check: ## check for PII annotations on all Django models tox -e pii_check diff --git a/openedx_authz/__init__.py b/openedx_authz/__init__.py index 7a509bf1..2b9ed8f0 100644 --- a/openedx_authz/__init__.py +++ b/openedx_authz/__init__.py @@ -4,6 +4,6 @@ import os -__version__ = "0.14.0" +__version__ = "0.15.0" ROOT_DIRECTORY = os.path.dirname(os.path.abspath(__file__)) diff --git a/openedx_authz/api/roles.py b/openedx_authz/api/roles.py index c4e27e6c..fa9385b1 100644 --- a/openedx_authz/api/roles.py +++ b/openedx_authz/api/roles.py @@ -10,6 +10,8 @@ from collections import defaultdict +from django.db import transaction + from openedx_authz.api.data import ( GroupingPolicyIndex, PermissionData, @@ -21,6 +23,7 @@ ) from openedx_authz.api.permissions import get_permission_from_policy from openedx_authz.engine.enforcer import AuthzEnforcer +from openedx_authz.models import ExtendedCasbinRule __all__ = [ "get_permissions_for_single_role", @@ -197,11 +200,25 @@ def assign_role_to_subject_in_scope(subject: SubjectData, role: RoleData, scope: bool: True if the role was assigned successfully, False otherwise. """ enforcer = AuthzEnforcer.get_enforcer() - return enforcer.add_role_for_user_in_domain( - subject.namespaced_key, - role.namespaced_key, - scope.namespaced_key, - ) + adapter = AuthzEnforcer.get_adapter() + + with transaction.atomic(): + role_assignment = enforcer.add_role_for_user_in_domain( + subject.namespaced_key, + role.namespaced_key, + scope.namespaced_key, + ) + if not role_assignment: + return False + extended_rule = ExtendedCasbinRule.create_based_on_policy( + subject, + role, + scope, + adapter, + ) + if not extended_rule: + raise Exception("Failed to create ExtendedCasbinRule for the assignment") + return True def batch_assign_role_to_subjects_in_scope(subjects: list[SubjectData], role: RoleData, scope: ScopeData) -> None: diff --git a/openedx_authz/apps.py b/openedx_authz/apps.py index 7de0d16b..23574826 100644 --- a/openedx_authz/apps.py +++ b/openedx_authz/apps.py @@ -13,6 +13,7 @@ class OpenedxAuthzConfig(AppConfig): name = "openedx_authz" verbose_name = "Open edX AuthZ" default_auto_field = "django.db.models.BigAutoField" + plugin_app = { "url_config": { "lms.djangoapp": { @@ -39,3 +40,7 @@ class OpenedxAuthzConfig(AppConfig): }, }, } + + def ready(self): + """Import signal handlers when Django starts.""" + import openedx_authz.handlers # pylint: disable=import-outside-toplevel,unused-import diff --git a/openedx_authz/engine/adapter.py b/openedx_authz/engine/adapter.py index 679735e9..190117e7 100644 --- a/openedx_authz/engine/adapter.py +++ b/openedx_authz/engine/adapter.py @@ -129,3 +129,21 @@ def filter_query( filter_kwargs = {f"{attr.value}__in": filter_values} queryset = queryset.filter(**filter_kwargs) return queryset.order_by("id") + + def query_policy(self, filter: Filter) -> QuerySet: # pylint: disable=redefined-builtin + """ + Retrieve policy rules from the database based on filter criteria. + + This method constructs a Django queryset to fetch CasbinRule objects + that match the specified filter attributes. It supports filtering by + policy type (ptype) and policy values (v0-v5). + + Args: + filter (Filter): Filter object with attributes (ptype, v0, v1, v2, v3, v4, v5) + containing lists of values to filter by. Empty lists are ignored. + + Returns: + QuerySet: Queryset of CasbinRule objects matching the filter criteria. + """ + queryset = CasbinRule.objects.using(self.db_alias) + return self.filter_query(queryset, filter) diff --git a/openedx_authz/engine/enforcer.py b/openedx_authz/engine/enforcer.py index ee58b81f..48358622 100644 --- a/openedx_authz/engine/enforcer.py +++ b/openedx_authz/engine/enforcer.py @@ -61,9 +61,14 @@ class AuthzEnforcer: allowed = enforcer.get_enforcer().enforce(user, resource, action) Any of the two approaches will yield the same singleton enforcer instance. + + Attributes: + _enforcer (SyncedEnforcer): The singleton enforcer instance. + _adapter (ExtendedAdapter): The singleton adapter instance. """ _enforcer = None + _adapter = None def __new__(cls): """Singleton pattern to ensure a single enforcer instance.""" @@ -171,6 +176,19 @@ def get_enforcer(cls) -> SyncedEnforcer: return cls._enforcer + @classmethod + def get_adapter(cls) -> ExtendedAdapter: + """Get the adapter instance, getting it from the enforcer if needed. + + Returns: + ExtendedAdapter: The singleton adapter instance. + """ + if cls._adapter is None: + # We need to access the protected member _e to get the adapter from the base enforcer + # which the SyncedEnforcer wraps. + cls._adapter = cls.get_enforcer()._e.adapter # pylint: disable=protected-access + return cls._adapter + @classmethod def _initialize_enforcer(cls) -> SyncedEnforcer: """ diff --git a/openedx_authz/handlers.py b/openedx_authz/handlers.py new file mode 100644 index 00000000..45541b83 --- /dev/null +++ b/openedx_authz/handlers.py @@ -0,0 +1,50 @@ +""" +Signal handlers for the authorization framework. + +These handlers ensure proper cleanup and consistency when models are deleted. +""" + +import logging + +from casbin_adapter.models import CasbinRule +from django.db.models.signals import post_delete +from django.dispatch import receiver + +from openedx_authz.models.core import ExtendedCasbinRule + +logger = logging.getLogger(__name__) + + +@receiver(post_delete, sender=ExtendedCasbinRule) +def delete_casbin_rule_on_extended_rule_deletion(sender, instance, **kwargs): # pylint: disable=unused-argument + """ + Delete the companion CasbinRule after its ExtendedCasbinRule disappears. + + The handler keeps authorization data symmetric with three common flows: + + - Direct ExtendedCasbinRule deletes (API/UI) trigger removal of the linked CasbinRule. + - Cascades from `Scope` or `Subject` deletions clear their ExtendedCasbinRule rows and, + via this handler, the matching CasbinRule entries. + - Cascades initiated from the CasbinRule side (enforcer cleanups) leave the query as a + no-op because the row is already gone. + + Running on ``post_delete`` ensures database cascades complete before the cleanup runs, so + enforcer-driven deletions no longer raise false errors. + + Args: + sender: The model class (ExtendedCasbinRule). + instance: The ExtendedCasbinRule instance being deleted. + **kwargs: Additional keyword arguments from the signal. + """ + try: + # Rely on delete() being idempotent; returns 0 rows if the CasbinRule was + # already removed (for example, because it triggered this signal). + CasbinRule.objects.filter(id=instance.casbin_rule_id).delete() + except Exception as exc: # pylint: disable=broad-exception-caught + # Log but don't raise - we don't want to break the deletion of + # ExtendedCasbinRule if something goes wrong while deleting the CasbinRule. + logger.exception( + "Error deleting CasbinRule %s during ExtendedCasbinRule cleanup", + instance.casbin_rule_id, + exc_info=exc, + ) diff --git a/openedx_authz/migrations/0002_initial.py b/openedx_authz/migrations/0002_initial.py new file mode 100644 index 00000000..9f43cc73 --- /dev/null +++ b/openedx_authz/migrations/0002_initial.py @@ -0,0 +1,78 @@ +# Generated by Django 4.2.24 on 2025-10-24 11:19 + +import django.db.models.deletion +from django.conf import settings +from django.db import migrations, models + + +class Migration(migrations.Migration): + initial = True + + dependencies = [ + ("casbin_adapter", "0001_initial"), + ("openedx_authz", "0001_add_casbin_dependency"), + ] + + operations = [ + migrations.CreateModel( + name="Scope", + fields=[ + ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ], + options={ + "abstract": False, + }, + ), + migrations.CreateModel( + name="Subject", + fields=[ + ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ], + options={ + "abstract": False, + }, + ), + migrations.CreateModel( + name="ExtendedCasbinRule", + fields=[ + ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("casbin_rule_key", models.CharField(max_length=255, unique=True)), + ("description", models.TextField(blank=True, null=True)), + ("created_at", models.DateTimeField(auto_now_add=True)), + ("updated_at", models.DateTimeField(auto_now=True)), + ("metadata", models.JSONField(blank=True, null=True)), + ( + "casbin_rule", + models.OneToOneField( + on_delete=django.db.models.deletion.CASCADE, + related_name="extended_rule", + to="casbin_adapter.casbinrule", + ), + ), + ( + "scope", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.CASCADE, + related_name="casbin_rules", + to="openedx_authz.scope", + ), + ), + ( + "subject", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.CASCADE, + related_name="casbin_rules", + to="openedx_authz.subject", + ), + ), + ], + options={ + "verbose_name": "Extended Casbin Rule", + "verbose_name_plural": "Extended Casbin Rules", + }, + ), + ] diff --git a/openedx_authz/migrations/0003_usersubject.py b/openedx_authz/migrations/0003_usersubject.py new file mode 100644 index 00000000..439f25f5 --- /dev/null +++ b/openedx_authz/migrations/0003_usersubject.py @@ -0,0 +1,42 @@ +# Generated by Django 4.2.24 on 2025-10-24 11:19 + +import django.db.models.deletion +from django.conf import settings +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ("openedx_authz", "0002_initial"), + ] + + operations = [ + migrations.CreateModel( + name="UserSubject", + fields=[ + ( + "subject_ptr", + models.OneToOneField( + auto_created=True, + on_delete=django.db.models.deletion.CASCADE, + parent_link=True, + primary_key=True, + serialize=False, + to="openedx_authz.subject", + ), + ), + ( + "user", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.CASCADE, + related_name="authz_subjects", + to=settings.AUTH_USER_MODEL, + ), + ), + ], + bases=("openedx_authz.subject",), + ), + ] diff --git a/openedx_authz/migrations/0004_contentlibraryscope.py b/openedx_authz/migrations/0004_contentlibraryscope.py new file mode 100644 index 00000000..999ed025 --- /dev/null +++ b/openedx_authz/migrations/0004_contentlibraryscope.py @@ -0,0 +1,43 @@ +# Generated by Django 4.2.24 on 2025-10-24 11:19 +# Custom migration - DO NOT REGENERATE +# This migration conditionally depends on the content library model based on settings + +import django.db.models.deletion +from django.conf import settings +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("openedx_authz", "0003_usersubject"), + ] + + operations = [ + migrations.CreateModel( + name="ContentLibraryScope", + fields=[ + ( + "scope_ptr", + models.OneToOneField( + auto_created=True, + on_delete=django.db.models.deletion.CASCADE, + parent_link=True, + primary_key=True, + serialize=False, + to="openedx_authz.scope", + ), + ), + ( + "content_library", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.CASCADE, + related_name="authz_scopes", + to=settings.OPENEDX_AUTHZ_CONTENT_LIBRARY_MODEL, + ), + ), + ], + bases=("openedx_authz.scope",), + ), + ] diff --git a/openedx_authz/models.py b/openedx_authz/models.py deleted file mode 100644 index a25e4017..00000000 --- a/openedx_authz/models.py +++ /dev/null @@ -1,10 +0,0 @@ -""" -Database models for the authorization framework. - -These models will be used to store additional data about roles and permissions -that are not natively supported by Casbin, so as to avoid modifying the Casbin -schema that focuses on the core authorization logic. - -For example, we may want to store metadata about roles, such as a description -or the date it was created. -""" diff --git a/openedx_authz/models/__init__.py b/openedx_authz/models/__init__.py new file mode 100644 index 00000000..4f0318a2 --- /dev/null +++ b/openedx_authz/models/__init__.py @@ -0,0 +1,20 @@ +"""Database models for the authorization framework. + +These models will be used to store additional data about roles and permissions +that are not natively supported by Casbin, so as to avoid modifying the Casbin +schema that focuses on the core authorization logic. + +For example, we may want to store metadata about roles, such as a description +or the date it was created. + +This model is transversal to the implementation of the public API for +authorization, which is defined in openedx_authz.api. So it can be used by +various functions in the API to store and retrieve additional data about +roles and permissions. That's why we avoid coupling this model to too +specific concepts and also importing too specific classes from the API to +avoid circular dependencies. +""" + +from openedx_authz.models.core import * +from openedx_authz.models.scopes import * +from openedx_authz.models.subjects import * diff --git a/openedx_authz/models/core.py b/openedx_authz/models/core.py new file mode 100644 index 00000000..064eb4c8 --- /dev/null +++ b/openedx_authz/models/core.py @@ -0,0 +1,227 @@ +"""Core models for the authorization framework. + +These models will be used to store additional data about roles and permissions +that are not natively supported by Casbin, so as to avoid modifying the Casbin +schema that focuses on the core authorization logic. +""" + +from typing import ClassVar + +from django.db import models, transaction + +from openedx_authz.engine.filter import Filter + + +class BaseRegistryModel(models.Model): + """Base model that supports automatic subclass registration. + + This model provides a registry mechanism for its subclasses, allowing + dynamic retrieval of subclasses based on a namespace identifier. + + Subclasses should define a NAMESPACE class attribute (e.g., 'user' for users) + and implement get_or_create_for_external_key() classmethod. + """ + + _registry: ClassVar[dict[str, type["BaseRegistryModel"]]] = {} + NAMESPACE: ClassVar[str] = None + + class Meta: + abstract = True + + @classmethod + def __init_subclass__(cls, **kwargs): + """Automatically register subclasses when they're defined.""" + super().__init_subclass__(**kwargs) + if cls.NAMESPACE: + cls.get_registry()[cls.NAMESPACE] = cls + + @classmethod + def get_registry(cls) -> dict[str, type["BaseRegistryModel"]]: + """Get the registry of subclasses. + + Returns: + dict: A dictionary mapping namespace strings to subclass types. + """ + return cls._registry + +class ScopeManager(models.Manager): + """Custom manager for Scope model that handles polymorphic behavior.""" + + def get_or_create_for_external_key(self, scope_data): + """Get or create a Scope instance for the given scope data. + + This method determines the appropriate subclass based on the namespace + in the scope_data and delegates to that subclass's get_or_create_for_external_key. + + Args: + scope_data: The scope (ScopeData) object with NAMESPACE class attribute + + Returns: + Scope: The Scope instance + + Raises: + ValueError: If the namespace is not registered + """ + namespace = scope_data.NAMESPACE + if namespace not in (scope_registry := Scope.get_registry()): + raise ValueError(f"No Scope subclass registered for namespace '{namespace}'") + + scope_class = scope_registry[namespace] + return scope_class.get_or_create_for_external_key(scope_data) + + +class SubjectManager(models.Manager): + """Custom manager for Subject model that handles polymorphic behavior.""" + + def get_or_create_for_external_key(self, subject_data): + """Get or create a Subject instance for the given subject data. + + This method determines the appropriate subclass based on the namespace + in the subject_data and delegates to that subclass's get_or_create_for_external_key. + + Args: + subject_data: The subject (SubjectData) object with NAMESPACE class attribute + + Returns: + Subject: The Subject instance + + Raises: + ValueError: If the namespace is not registered + """ + namespace = subject_data.NAMESPACE + if namespace not in (subject_registry := Subject.get_registry()): + raise ValueError(f"No Subject subclass registered for namespace '{namespace}'") + + subject_class = subject_registry[namespace] + return subject_class.get_or_create_for_external_key(subject_data) + + +class Scope(BaseRegistryModel): + """Model representing a scope in the authorization system. + + .. no_pii: + + This model can be extended to represent different types of scopes, + such as courses or content libraries. + + Subclasses should define a NAMESPACE class attribute (e.g., 'lib' for content libraries) + and implement get_or_create_for_external_key() classmethod. + """ + + objects = ScopeManager() + + class Meta: + abstract = False + + +class Subject(BaseRegistryModel): + """Model representing a subject in the authorization system. + + .. no_pii: + + This model can be extended to represent different types of subjects, + such as users or groups. + + Subclasses should define a NAMESPACE class attribute (e.g., 'user' for users) + and implement get_or_create_for_external_key() classmethod. + """ + + objects = SubjectManager() + + class Meta: + abstract = False + + +class ExtendedCasbinRule(models.Model): + """Extended model for Casbin rules to store additional metadata. + + .. no_pii: + + This model extends the CasbinRule model provided by the casbin_adapter + package to include additional fields for storing metadata about each rule. + """ + + # OneToOne relationship ensures each CasbinRule has at most one ExtendedCasbinRule. + # We also maintain a unique key based on the casbin_rule field components for easy lookup + # based on a policy line (ptype,v0,v1,v2,v3) which should be unique. + # + # Note: We use CASCADE here. When CasbinRule is deleted, ExtendedCasbinRule is also deleted. + # The signal handler in handlers.py ensures the reverse (ExtendedCasbinRule deletion → CasbinRule deletion). + casbin_rule_key = models.CharField(max_length=255, unique=True) + casbin_rule = models.OneToOneField( + "casbin_adapter.CasbinRule", + on_delete=models.CASCADE, + related_name="extended_rule", + ) + + description = models.TextField(blank=True, null=True) + created_at = models.DateTimeField(auto_now_add=True) + updated_at = models.DateTimeField(auto_now=True) + metadata = models.JSONField(blank=True, null=True) + + # Scope of the rule. This could be a course, content library, or any other scope type. See Scope model above. + scope = models.ForeignKey( + Scope, + on_delete=models.CASCADE, + null=True, + blank=True, + related_name="casbin_rules", + ) + + # Subject of the rule. This could be a user, group, or any other subject type. See Subject model above. + subject = models.ForeignKey( + Subject, + on_delete=models.CASCADE, + null=True, + blank=True, + related_name="casbin_rules", + ) + + class Meta: + verbose_name = "Extended Casbin Rule" + verbose_name_plural = "Extended Casbin Rules" + + @classmethod + def create_based_on_policy( + cls, + subject, + role, + scope, + adapter, + ): + """Helper method to create an ExtendedCasbinRule based on policy components. + + Args: + subject: SubjectData object with namespaced_key and external_key + role: RoleData object with namespaced_key and external_key + scope: ScopeData object with namespaced_key and external_key + adapter: The Casbin adapter instance. + + Returns: + ExtendedCasbinRule: The created ExtendedCasbinRule instance. + """ + casbin_rule = adapter.query_policy( + Filter( + ptype=["g"], + v0=[subject.namespaced_key], + v1=[role.namespaced_key], + v2=[scope.namespaced_key], + ) + ).first() + + if not casbin_rule: + return None + + casbin_rule_key = f"{casbin_rule.ptype},{casbin_rule.v0},{casbin_rule.v1},{casbin_rule.v2},{casbin_rule.v3}" + + with transaction.atomic(): + extended_rule, _ = cls.objects.get_or_create( + casbin_rule_key=casbin_rule_key, + defaults={ + "casbin_rule": casbin_rule, + "scope": Scope.objects.get_or_create_for_external_key(scope), + "subject": Subject.objects.get_or_create_for_external_key(subject), + }, + ) + + return extended_rule diff --git a/openedx_authz/models/scopes.py b/openedx_authz/models/scopes.py new file mode 100644 index 00000000..02a82e83 --- /dev/null +++ b/openedx_authz/models/scopes.py @@ -0,0 +1,77 @@ +"""Models for ContentLibrary scopes in the authorization framework. + +These models extend the base Scope model to represent content library scopes, +which are used to define permissions and roles related to content libraries +within the Open edX platform. +""" + +from django.apps import apps +from django.conf import settings +from django.db import models +from opaque_keys.edx.locator import LibraryLocatorV2 + +from openedx_authz.models.core import Scope + + +def get_content_library_model(): + """Return the ContentLibrary model class specified by settings. + + The setting `OPENEDX_AUTHZ_CONTENT_LIBRARY_MODEL` should be an + app_label.ModelName string (e.g. 'content_libraries.ContentLibrary'). + """ + CONTENT_LIBRARY_MODEL = getattr( + settings, + "OPENEDX_AUTHZ_CONTENT_LIBRARY_MODEL", + "content_libraries.ContentLibrary", + ) + try: + app_label, model_name = CONTENT_LIBRARY_MODEL.split(".") + return apps.get_model(app_label, model_name, require_ready=False) + except LookupError: + return None + + +ContentLibrary = get_content_library_model() + + +class ContentLibraryScope(Scope): + """Scope representing a content library in the authorization system. + + .. no_pii: + """ + + NAMESPACE = "lib" + + # Link to the actual course or content library, if applicable. In other cases, this could be null. + # Piggybacking on the existing ContentLibrary model to keep the ExtendedCasbinRule up to date + # by deleting the Scope, and thus the ExtendedCasbinRule, when the ContentLibrary is deleted. + # + # When content_libraries IS available, the on_delete=CASCADE will still work at the + # application level through Django's signal handlers. + # Use a string reference to the external app's model so Django won't try + # to import it at model import time. The migration already records the + # dependency on `content_libraries` when the app is present. + content_library = models.ForeignKey( + settings.OPENEDX_AUTHZ_CONTENT_LIBRARY_MODEL, + on_delete=models.CASCADE, + null=True, + blank=True, + related_name="authz_scopes", + swappable=True, + ) + + @classmethod + def get_or_create_for_external_key(cls, scope): + """Get or create a ContentLibraryScope for the given external key. + + Args: + scope: ScopeData object with an external_key attribute containing + a LibraryLocatorV2-compatible string. + + Returns: + ContentLibraryScope: The Scope instance for the given ContentLibrary + """ + library_key = LibraryLocatorV2.from_string(scope.external_key) + content_library = ContentLibrary.objects.get_by_key(library_key) + scope, _ = cls.objects.get_or_create(content_library=content_library) + return scope diff --git a/openedx_authz/models/subjects.py b/openedx_authz/models/subjects.py new file mode 100644 index 00000000..39f6e66b --- /dev/null +++ b/openedx_authz/models/subjects.py @@ -0,0 +1,47 @@ +"""Models for User subjects in the authorization framework. + +These models extend the base Subject model to represent user subjects, +which are used to define permissions and roles related to users +within the Open edX platform. +""" + +from django.contrib.auth import get_user_model +from django.db import models + +from openedx_authz.models.core import Subject + +User = get_user_model() + + +class UserSubject(Subject): + """Subject representing a user in the authorization system. + + .. no_pii: + """ + + NAMESPACE = "user" + + # Link to the actual user, if the subject is a user. In other cases, this could be null. + # Piggybacking on the existing User model to keep the ExtendedCasbinRule up to date + # by deleting the Subject, and thus the ExtendedCasbinRule, when the User is deleted. + user = models.ForeignKey( + User, + on_delete=models.CASCADE, + null=True, + blank=True, + related_name="authz_subjects", + ) + + @classmethod + def get_or_create_for_external_key(cls, subject): + """Get or create a UserSubject for the given external key. + + Args: + subject_external_key: Username string + + Returns: + UserSubject: The Subject instance for the given User + """ + user = User.objects.get(username=subject.external_key) + subject, _ = cls.objects.get_or_create(user=user) + return subject diff --git a/openedx_authz/settings/common.py b/openedx_authz/settings/common.py index c3c2840e..d2b55922 100644 --- a/openedx_authz/settings/common.py +++ b/openedx_authz/settings/common.py @@ -39,3 +39,7 @@ def plugin_settings(settings): # save policy changes back to the database. if not hasattr(settings, "CASBIN_AUTO_SAVE_POLICY"): settings.CASBIN_AUTO_SAVE_POLICY = True + + # Set default ContentLibrary model for swappable dependency + if not hasattr(settings, "OPENEDX_AUTHZ_CONTENT_LIBRARY_MODEL"): + settings.OPENEDX_AUTHZ_CONTENT_LIBRARY_MODEL = "content_libraries.ContentLibrary" diff --git a/openedx_authz/settings/test.py b/openedx_authz/settings/test.py index 42a79c75..8565ceac 100644 --- a/openedx_authz/settings/test.py +++ b/openedx_authz/settings/test.py @@ -37,6 +37,7 @@ def plugin_settings(settings): # pylint: disable=unused-argument "django.contrib.sessions", "openedx_authz.engine.apps.CasbinAdapterConfig", "openedx_authz.apps.OpenedxAuthzConfig", + "openedx_authz.tests.stubs.apps.StubsConfig", ) MIDDLEWARE = [ @@ -71,3 +72,6 @@ def plugin_settings(settings): # pylint: disable=unused-argument CASBIN_MODEL = os.path.join(ROOT_DIRECTORY, "engine", "config", "model.conf") CASBIN_AUTO_LOAD_POLICY_INTERVAL = 0 CASBIN_AUTO_SAVE_POLICY = True + +# Use stub model for testing instead of the real content_libraries app +OPENEDX_AUTHZ_CONTENT_LIBRARY_MODEL = "stubs.ContentLibrary" diff --git a/openedx_authz/tests/api/test_roles.py b/openedx_authz/tests/api/test_roles.py index 3dec484a..172fa7ec 100644 --- a/openedx_authz/tests/api/test_roles.py +++ b/openedx_authz/tests/api/test_roles.py @@ -5,6 +5,8 @@ roles and permissions within specific scopes. """ +from unittest.mock import patch + import casbin import pkg_resources from ddt import data as ddt_data @@ -43,6 +45,32 @@ ) from openedx_authz.engine.enforcer import AuthzEnforcer from openedx_authz.engine.utils import migrate_policy_between_enforcers +from openedx_authz.models import ExtendedCasbinRule, Scope, Subject + + +def _mock_get_or_create_scope(scope_data): + """Mock implementation that creates actual Scope instances.""" + scope, _ = Scope.objects.get_or_create(id=hash(scope_data.external_key) % 10000) + return scope + + +def _mock_get_or_create_subject(subject_data): + """Mock implementation that creates actual Subject instances.""" + subject, _ = Subject.objects.get_or_create(id=hash(subject_data.external_key) % 10000) + return subject + + +# Apply patches at module level using the new manager method +_scope_patcher = patch( + "openedx_authz.models.ScopeManager.get_or_create_for_external_key", + side_effect=_mock_get_or_create_scope, +) +_subject_patcher = patch( + "openedx_authz.models.SubjectManager.get_or_create_for_external_key", + side_effect=_mock_get_or_create_subject, +) +_scope_patcher.start() +_subject_patcher.start() class BaseRolesTestCase(TestCase): @@ -277,6 +305,50 @@ class TestRolesAPI(RolesTestSetupMixin): environments. """ + def test_assign_role_creates_extended_rule(self): + """Assign a role to a subject and verify an ExtendedCasbinRule is created. + + Expected result: + - The assignment function returns True + - An ExtendedCasbinRule record exists linking the subject and scope + """ + + subject = SubjectData(external_key="unit_test_user_assign_1") + role = RoleData(external_key="library_user") + scope = ScopeData(external_key="lib:UnitTest:assign_lib_1") + + subj_before = Subject.objects.get_or_create_for_external_key(subject) + scope_before = Scope.objects.get_or_create_for_external_key(scope) + self.assertFalse(ExtendedCasbinRule.objects.filter(subject=subj_before, scope=scope_before).exists()) + + result = assign_role_to_subject_in_scope(subject, role, scope) + self.assertTrue(result) + + subj_obj = Subject.objects.get_or_create_for_external_key(subject) + scope_obj = Scope.objects.get_or_create_for_external_key(scope) + self.assertTrue(ExtendedCasbinRule.objects.filter(subject=subj_obj, scope=scope_obj).exists()) + + def test_assign_role_fails_when_extended_rule_not_created(self): + """Test that assign_role raises exception when ExtendedCasbinRule creation fails. + + Expected result: + - Exception is raised when ExtendedCasbinRule.create_based_on_policy returns None + - Transaction is rolled back and no role assignment persists + """ + subject = SubjectData(external_key="unit_test_user_assign_fail") + role = RoleData(external_key="library_user") + scope = ScopeData(external_key="lib:UnitTest:assign_fail_lib") + + with patch("openedx_authz.models.ExtendedCasbinRule.create_based_on_policy", return_value=None): + with self.assertRaises(Exception) as context: + assign_role_to_subject_in_scope(subject, role, scope) + + self.assertEqual(str(context.exception), "Failed to create ExtendedCasbinRule for the assignment") + + subj_obj = Subject.objects.get_or_create_for_external_key(subject) + scope_obj = Scope.objects.get_or_create_for_external_key(scope) + self.assertFalse(ExtendedCasbinRule.objects.filter(subject=subj_obj, scope=scope_obj).exists()) + @ddt_data( # Library Admin role with actual permissions from authz.policy ( @@ -877,3 +949,30 @@ def test_get_all_role_assignments_in_scope(self, scope_name, expected_assignment self.assertEqual(len(role_assignments), len(expected_assignments)) for assignment in role_assignments: self.assertIn(assignment, expected_assignments) + + def test_assign_role_creates_extended_casbin_rule(self): + """Test that assigning a role creates an ExtendedCasbinRule record. + + Expected result: + - After assigning a role, an ExtendedCasbinRule is created + - The ExtendedCasbinRule has the correct subject and scope references + - The ExtendedCasbinRule is linked to a CasbinRule + """ + initial_count = ExtendedCasbinRule.objects.count() + subject_data = SubjectData(external_key="test_user_extended") + role_data = RoleData(external_key=roles.LIBRARY_USER.external_key) + scope_data = ScopeData(external_key="lib:TestOrg:TestLib") + + assign_role_to_subject_in_scope(subject_data, role_data, scope_data) + + new_count = ExtendedCasbinRule.objects.count() + self.assertEqual(new_count, initial_count + 1) + + extended_rule = ExtendedCasbinRule.objects.order_by('-id').first() + self.assertIsNotNone(extended_rule) + self.assertIsNotNone(extended_rule.casbin_rule) + self.assertIsNotNone(extended_rule.subject) + self.assertIsNotNone(extended_rule.scope) + self.assertIn(role_data.namespaced_key, extended_rule.casbin_rule_key) + self.assertIn(subject_data.namespaced_key, extended_rule.casbin_rule_key) + self.assertIn(scope_data.namespaced_key, extended_rule.casbin_rule_key) diff --git a/openedx_authz/tests/integration/__init__.py b/openedx_authz/tests/integration/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/openedx_authz/tests/integration/conftest.py b/openedx_authz/tests/integration/conftest.py new file mode 100644 index 00000000..ecec30de --- /dev/null +++ b/openedx_authz/tests/integration/conftest.py @@ -0,0 +1,22 @@ +"""Pytest configuration for openedx-authz tests.""" + +import pytest + + +@pytest.fixture(scope="session") +def django_db_setup(): + """Override django_db_setup to use existing database instead of creating a new one. + + This is necessary when running tests in an edx-platform environment where: + 1. The database already exists + 2. The database user doesn't have CREATE DATABASE permissions + + By providing this fixture, we tell pytest-django to skip database creation + and use the existing database directly. + """ + # Do nothing - use the existing database + + +@pytest.fixture(scope="session") +def django_db_modify_db_settings(): + """Configure database settings to use existing database for tests.""" diff --git a/openedx_authz/tests/integration/test_models.py b/openedx_authz/tests/integration/test_models.py new file mode 100644 index 00000000..9d879dc5 --- /dev/null +++ b/openedx_authz/tests/integration/test_models.py @@ -0,0 +1,1358 @@ +"""Test cases for authorization models. + +This test suite verifies the functionality of the authorization models including: +- Scope model with ContentLibrary integration +- Subject model with User integration +- Polymorphic behavior and registry pattern for Scope and Subject models +- ExtendedCasbinRule model with metadata and relationships +- Cascade deletion behavior across model hierarchies + +Notes: + - Tests use the parent models (Scope, Subject) with polymorphic dispatch + via manager methods to reflect actual production usage. + - Where enforcer behaviour is required, tests exercise the shared + AuthzEnforcer so the production adapter runs without mocks. + +Run these tests in an environment where openedx.core.djangoapps.content_libraries.models +is accessible (e.g., edx-platform with content libraries installed). +""" + +import uuid + +import openedx.core.djangoapps.content_libraries.api as library_api # pylint: disable=import-error +import pytest +from casbin_adapter.models import CasbinRule +from ddt import ddt +from django.contrib.auth import get_user_model +from django.db import IntegrityError +from django.test import TestCase, override_settings +from organizations.api import ensure_organization # pylint: disable=import-error +from organizations.models import Organization # pylint: disable=import-error + +from openedx_authz.api.data import ContentLibraryData, RoleData, SubjectData, UserData +from openedx_authz.api.roles import assign_role_to_subject_in_scope +from openedx_authz.engine.enforcer import AuthzEnforcer +from openedx_authz.models import ( + ContentLibrary, + ContentLibraryScope, + ExtendedCasbinRule, + Scope, + Subject, + UserSubject, +) + +User = get_user_model() + + +def create_test_library(org_short_name, slug=None, title="Test Library"): + """ + Helper function to create a content library using the proper API. + + This uses library_api.create_library() which: + - Creates the ContentLibrary database record + - Creates the associated LearningPackage + - Fires CONTENT_LIBRARY_CREATED event + - Returns ContentLibraryMetadata + + Args: + org_short_name: Organization short name (e.g., "TestOrg") + slug: Library slug (e.g., "TestLib"). If None, generates a unique slug using uuid4. + title: Library title (default: "Test Library") + + Returns: + tuple: (library_metadata, library_key, content_library) + - library_metadata: ContentLibraryMetadata instance from API + - library_key: LibraryLocatorV2 instance + - content_library: ContentLibrary model instance + """ + if slug is None: + slug = f"testlib-{uuid.uuid4().hex[:8]}" + + ensure_organization(org_short_name) + org = Organization.objects.get(short_name=org_short_name) + + library_metadata = library_api.create_library( + org=org, + slug=slug, + title=title, + description=f"A library for testing authorization: {slug}", + ) + library_key = library_metadata.key + content_library = ContentLibrary.objects.get_by_key(library_key) + return library_metadata, library_key, content_library + + +def build_casbin_rule_key(ptype, v0, v1, v2, v3=""): + """Compose the casbin rule key string consistently across tests.""" + return ",".join(str(component or "") for component in (ptype, v0, v1, v2, v3)) + + +@ddt +@override_settings(OPENEDX_AUTHZ_CONTENT_LIBRARY_MODEL="content_libraries.ContentLibrary") +class TestScopeModel(TestCase): + """Test cases for the Scope model. + + These tests create ContentLibrary instances via the content library API and + exercise the Scope manager helpers using ContentLibraryData objects to test + the polymorphic behavior. + """ + + def setUp(self): + """Set up test fixtures.""" + # Create library using the API helper (auto-generates unique slug) + self.library_metadata, self.library_key, self.content_library = create_test_library( + org_short_name="TestOrg", + ) + + def test_get_or_create_for_external_key_creates_new(self): + """Test that get_or_create_for_external_key creates a new Scope when none exists. + + Expected result: + - Scope is created successfully + - Scope is linked to the ContentLibrary + - Only one Scope exists for the ContentLibrary + """ + scope_data = ContentLibraryData(external_key=str(self.library_key)) + + scope = Scope.objects.get_or_create_for_external_key(scope_data) + + self.assertIsNotNone(scope) + self.assertIsInstance(scope, ContentLibraryScope) + self.assertEqual(scope.content_library, self.content_library) + self.assertEqual(Scope.objects.filter(contentlibraryscope__content_library=self.content_library).count(), 1) + + def test_get_or_create_for_external_key_gets_existing(self): + """Test that get_or_create_for_external_key retrieves existing Scope. + + Expected result: + - First call creates the Scope + - Second call retrieves the same Scope + - Only one Scope exists for the ContentLibrary + """ + scope_data = ContentLibraryData(external_key=str(self.library_key)) + + scope1 = Scope.objects.get_or_create_for_external_key(scope_data) + scope2 = Scope.objects.get_or_create_for_external_key(scope_data) + + self.assertEqual(scope1.id, scope2.id) + self.assertEqual(ContentLibraryScope.objects.filter(content_library=self.content_library).count(), 1) + + def test_scope_can_be_created_without_content_library(self): + """Test that Scope can be created without a content_library. + + Expected result: + - Scope is created successfully + - content_library field is None + """ + scope = Scope.objects.create() + + self.assertIsNotNone(scope) + self.assertIsNone(getattr(scope, "content_library", None)) + + def test_scope_cascade_deletion_when_content_library_deleted(self): + """Test that Scope is deleted when its ContentLibrary is deleted. + + Expected result: + - Scope is created successfully + - Deleting ContentLibrary also deletes the Scope + """ + scope_data = ContentLibraryData(external_key=str(self.library_key)) + scope = Scope.objects.get_or_create_for_external_key(scope_data) + scope_id = scope.id + + self.content_library.delete() + + self.assertFalse(Scope.objects.filter(id=scope_id).exists()) + + +@pytest.mark.integration +@override_settings(OPENEDX_AUTHZ_CONTENT_LIBRARY_MODEL="content_libraries.ContentLibrary") +class TestSubjectModel(TestCase): + """Test cases for the Subject model. + + These tests create User instances and exercise the Subject manager helpers + using UserData objects to test the polymorphic behavior. + """ + + def setUp(self): + """Set up test fixtures.""" + self.test_username = "test_user" + self.test_user = User.objects.create_user(username=self.test_username) + + def test_get_or_create_for_external_key_creates_new(self): + """Test that get_or_create_for_external_key creates a new Subject when none exists. + + Expected result: + - Subject is created successfully + - Subject is linked to the User + - Only one Subject exists for the User + """ + subject_data = UserData(external_key=self.test_username) + + subject = Subject.objects.get_or_create_for_external_key(subject_data) + + self.assertIsNotNone(subject) + self.assertIsInstance(subject, UserSubject) + self.assertEqual(subject.user, self.test_user) + self.assertEqual(Subject.objects.filter(usersubject__user=self.test_user).count(), 1) + + def test_get_or_create_for_external_key_gets_existing(self): + """Test that get_or_create_for_external_key retrieves existing Subject. + + Expected result: + - First call creates the Subject + - Second call retrieves the same Subject + - Only one Subject exists for the User + """ + subject_data = UserData(external_key=self.test_username) + + subject1 = Subject.objects.get_or_create_for_external_key(subject_data) + subject2 = Subject.objects.get_or_create_for_external_key(subject_data) + + self.assertEqual(subject1.id, subject2.id) + self.assertEqual(Subject.objects.filter(usersubject__user=self.test_user).count(), 1) + + def test_subject_can_be_created_without_user(self): + """Test that Subject can be created without a user. + + Expected result: + - Subject is created successfully + - user field is None + """ + subject = Subject.objects.create() + + self.assertIsNotNone(subject) + self.assertIsNone(getattr(subject, "user", None)) + + def test_subject_cascade_deletion_when_user_deleted(self): + """Test that Subject is deleted when its User is deleted. + + Expected result: + - Subject is created successfully + - Deleting User also deletes the Subject + """ + subject_data = UserData(external_key=self.test_username) + subject = Subject.objects.get_or_create_for_external_key(subject_data) + subject_id = subject.id + + self.test_user.delete() + + self.assertFalse(Subject.objects.filter(id=subject_id).exists()) + + +@pytest.mark.integration +@override_settings(OPENEDX_AUTHZ_CONTENT_LIBRARY_MODEL="content_libraries.ContentLibrary") +class TestPolymorphicBehavior(TestCase): + """Test cases for polymorphic behavior of Scope and Subject models. + + These tests verify that: + - The registry pattern correctly maps namespaces to subclasses + - Manager methods dispatch to the correct subclass based on namespace + - Queries return instances of the correct polymorphic type + - Multiple subclass types can coexist in the database + """ + + def setUp(self): + """Set up test fixtures.""" + self.test_username = "test_user" + self.test_user = User.objects.create_user(username=self.test_username) + + self.library_metadata, self.library_key, self.content_library = create_test_library( + org_short_name="TestOrg", + ) + self.scope_data = ContentLibraryData(external_key=str(self.library_key)) + self.subject_data = UserData(external_key=self.test_username) + + def test_scope_registry_contains_content_library_namespace(self): + """Test that ContentLibraryScope is registered in Scope._registry. + + Expected result: + - 'lib' namespace is present in registry + - Registry maps 'lib' to ContentLibraryScope class + """ + self.assertEqual(Scope._registry.get("lib"), ContentLibraryScope) # pylint: disable=protected-access + + def test_subject_registry_contains_user_namespace(self): + """Test that UserSubject is registered in Subject._registry. + + Expected result: + - 'user' namespace is present in registry + - Registry maps 'user' to UserSubject class + """ + self.assertEqual(Subject._registry.get("user"), UserSubject) # pylint: disable=protected-access + + def test_scope_manager_dispatches_to_content_library_scope(self): + """Test that Scope manager dispatches to ContentLibraryScope for 'lib' namespace. + + Expected result: + - Scope.objects.get_or_create_for_external_key returns ContentLibraryScope instance + - Instance has content_library attribute + - Instance is linked to the correct ContentLibrary + """ + scope_data = ContentLibraryData(external_key=str(self.library_key)) + + scope = Scope.objects.get_or_create_for_external_key(scope_data) + + self.assertIsInstance(scope, ContentLibraryScope) + self.assertTrue(hasattr(scope, "content_library")) + self.assertEqual(scope.content_library, self.content_library) + + def test_subject_manager_dispatches_to_user_subject(self): + """Test that Subject manager dispatches to UserSubject for 'user' namespace. + + Expected result: + - Subject.objects.get_or_create_for_external_key returns UserSubject instance + - Instance has user attribute + - Instance is linked to the correct User + """ + subject_data = UserData(external_key=self.test_username) + + subject = Subject.objects.get_or_create_for_external_key(subject_data) + + self.assertIsInstance(subject, UserSubject) + self.assertTrue(hasattr(subject, "user")) + self.assertEqual(subject.user, self.test_user) + + def test_scope_manager_raises_error_for_unregistered_namespace(self): + """Test that Scope manager raises ValueError for unknown namespace. + + Expected result: + - ValueError is raised when namespace not in registry + - Error message indicates the unknown namespace + """ + from openedx_authz.api.data import ScopeData # pylint: disable=import-outside-toplevel + + class UnregisteredScopeData(ScopeData): # pylint: disable=abstract-method + NAMESPACE = "unregistered" + + unregistered_data = UnregisteredScopeData(external_key="some_key") + + with self.assertRaises(ValueError) as context: + Scope.objects.get_or_create_for_external_key(unregistered_data) + + self.assertIn("unregistered", str(context.exception)) + + def test_subject_manager_raises_error_for_unregistered_namespace(self): + """Test that Subject manager raises ValueError for unknown namespace. + + Expected result: + - ValueError is raised when namespace not in registry + - Error message indicates the unknown namespace + """ + + class UnregisteredSubjectData(SubjectData): + NAMESPACE = "unregistered" + + unregistered_data = UnregisteredSubjectData(external_key="some_key") + + with self.assertRaises(ValueError) as context: + Subject.objects.get_or_create_for_external_key(unregistered_data) + + self.assertIn("unregistered", str(context.exception)) + + def test_multiple_scope_types_can_coexist(self): + """Test that different Scope subclasses can coexist in the database. + + Expected result: + - Base Scope table contains both ContentLibraryScope and plain Scope + - Each can be queried independently + - Total Scope count includes all types + """ + scope_data = ContentLibraryData(external_key=str(self.library_key)) + content_library_scope = Scope.objects.get_or_create_for_external_key(scope_data) + + plain_scope = Scope.objects.create() + + all_scopes = Scope.objects.all() + all_scope_ids = set(all_scopes.values_list("id", flat=True)) + + self.assertEqual(all_scopes.count(), 2) + self.assertIn(content_library_scope.id, all_scope_ids) + self.assertIn(plain_scope.id, all_scope_ids) + + content_library_scopes = ContentLibraryScope.objects.all() + self.assertEqual(content_library_scopes.count(), 1) + self.assertEqual(content_library_scopes.first().id, content_library_scope.id) + + def test_multiple_subject_types_can_coexist(self): + """Test that different Subject subclasses can coexist in the database. + + Expected result: + - Base Subject table contains both UserSubject and plain Subject + - Each can be queried independently + - Total Subject count includes all types + """ + subject_data = UserData(external_key=self.test_username) + user_subject = Subject.objects.get_or_create_for_external_key(subject_data) + plain_subject = Subject.objects.create() + all_subjects = Subject.objects.all() + all_subject_ids = set(all_subjects.values_list("id", flat=True)) + + self.assertEqual(all_subjects.count(), 2) + self.assertIn(user_subject.id, all_subject_ids) + self.assertIn(plain_subject.id, all_subject_ids) + + user_subjects = UserSubject.objects.all() + self.assertEqual(user_subjects.count(), 1) + self.assertEqual(user_subjects.first().id, user_subject.id) + + def test_scope_query_returns_polymorphic_instances(self): + """Test that querying Scope returns the correct polymorphic instance type. + + Expected result: + - Querying by ID returns ContentLibraryScope instance, not base Scope + - Instance retains all subclass attributes and methods + """ + scope_data = ContentLibraryData(external_key=str(self.library_key)) + created_scope = Scope.objects.get_or_create_for_external_key(scope_data) + queried_scope = Scope.objects.get(id=created_scope.id) + + self.assertIsInstance(queried_scope, Scope) + + polymorphic_scope = ContentLibraryScope.objects.get(id=created_scope.id) + self.assertIsInstance(polymorphic_scope, ContentLibraryScope) + self.assertEqual(polymorphic_scope.content_library, self.content_library) + + def test_subject_query_returns_polymorphic_instances(self): + """Test that querying Subject returns the correct polymorphic instance type. + + Expected result: + - Querying by ID returns UserSubject instance when queried from subclass + - Instance retains all subclass attributes and methods + """ + subject_data = UserData(external_key=self.test_username) + created_subject = Subject.objects.get_or_create_for_external_key(subject_data) + + queried_subject = Subject.objects.get(id=created_subject.id) + + self.assertIsInstance(queried_subject, Subject) + + polymorphic_subject = UserSubject.objects.get(id=created_subject.id) + self.assertIsInstance(polymorphic_subject, UserSubject) + self.assertEqual(polymorphic_subject.user, self.test_user) + + def test_scope_namespace_class_variable_is_set(self): + """Test that Scope subclasses have NAMESPACE class variable set. + + Expected result: + - ContentLibraryScope.NAMESPACE is 'lib' + - Base Scope.NAMESPACE is None + """ + self.assertEqual(ContentLibraryScope.NAMESPACE, "lib") + self.assertIsNone(Scope.NAMESPACE) + + def test_subject_namespace_class_variable_is_set(self): + """Test that Subject subclasses have NAMESPACE class variable set. + + Expected result: + - UserSubject.NAMESPACE is 'user' + - Base Subject.NAMESPACE is None + """ + self.assertEqual(UserSubject.NAMESPACE, "user") + self.assertIsNone(Subject.NAMESPACE) + + +@pytest.mark.integration +@override_settings(OPENEDX_AUTHZ_CONTENT_LIBRARY_MODEL="content_libraries.ContentLibrary") +class TestExtendedCasbinRuleModel(TestCase): + """Test cases for the ExtendedCasbinRule model.""" + + def setUp(self): + """Set up test fixtures.""" + self.test_username = "test_user" + self.test_user = User.objects.create_user(username=self.test_username) + + self.library_metadata, self.library_key, self.content_library = create_test_library( + org_short_name="TestOrg", + ) + + self.casbin_rule = CasbinRule.objects.create( + ptype="p", + v0="user^test_user", + v1="role^instructor", + v2="lib^lib:TestOrg:TestLib", + v3="allow", + ) + + subject_data = UserData(external_key=self.test_username) + self.subject = Subject.objects.get_or_create_for_external_key(subject_data) + + scope_data = ContentLibraryData(external_key=str(self.library_key)) + self.scope = Scope.objects.get_or_create_for_external_key(scope_data) + + def test_extended_casbin_rule_creation_with_all_fields(self): + """Test creating ExtendedCasbinRule with all fields populated. + + Expected Result: + - ExtendedCasbinRule is created successfully. + - All fields are populated correctly. + - Timestamps are set automatically. + """ + casbin_rule_key = ( + f"{self.casbin_rule.ptype},{self.casbin_rule.v0},{self.casbin_rule.v1}," + f"{self.casbin_rule.v2},{self.casbin_rule.v3}" + ) + + extended_rule = ExtendedCasbinRule.objects.create( + casbin_rule_key=casbin_rule_key, + casbin_rule=self.casbin_rule, + description="Test rule for instructor role", + metadata={"created_by": "test_system", "priority": 1}, + scope=self.scope, + subject=self.subject, + ) + + self.assertIsNotNone(extended_rule) + self.assertEqual(extended_rule.casbin_rule_key, casbin_rule_key) + self.assertEqual(extended_rule.casbin_rule, self.casbin_rule) + self.assertEqual(extended_rule.description, "Test rule for instructor role") + self.assertEqual(extended_rule.metadata["created_by"], "test_system") + self.assertEqual(extended_rule.metadata["priority"], 1) + self.assertEqual(extended_rule.scope, self.scope) + self.assertEqual(extended_rule.subject, self.subject) + self.assertIsNotNone(extended_rule.created_at) + self.assertIsNotNone(extended_rule.updated_at) + + def test_extended_casbin_rule_unique_key_constraint(self): + """Test that casbin_rule_key must be unique. + + Expected Result: + - The first ExtendedCasbinRule is created successfully. + - A second ExtendedCasbinRule with the same key raises IntegrityError. + """ + casbin_rule_key = ( + f"{self.casbin_rule.ptype},{self.casbin_rule.v0},{self.casbin_rule.v1}," + f"{self.casbin_rule.v2},{self.casbin_rule.v3}" + ) + + ExtendedCasbinRule.objects.create(casbin_rule_key=casbin_rule_key, casbin_rule=self.casbin_rule) + + casbin_rule2 = CasbinRule.objects.create( + ptype="p", + v0="user^test_user2", + v1="role^admin", + v2="lib^lib:TestOrg:TestLib2", + v3="allow", + ) + + with self.assertRaises(IntegrityError): + ExtendedCasbinRule.objects.create(casbin_rule_key=casbin_rule_key, casbin_rule=casbin_rule2) + + def test_extended_casbin_rule_cascade_deletion_when_casbin_rule_deleted(self): + """Deleting the CasbinRule should cascade through the one-to-one link to ExtendedCasbinRule. + + Expected Result: + - ExtendedCasbinRule baseline row is created successfully. + - Removing the CasbinRule eliminates the ExtendedCasbinRule via database cascade. + """ + casbin_rule_key = ( + f"{self.casbin_rule.ptype},{self.casbin_rule.v0},{self.casbin_rule.v1}," + f"{self.casbin_rule.v2},{self.casbin_rule.v3}" + ) + extended_rule = ExtendedCasbinRule.objects.create(casbin_rule_key=casbin_rule_key, casbin_rule=self.casbin_rule) + extended_rule_id = extended_rule.id + + self.casbin_rule.delete() + + self.assertFalse(ExtendedCasbinRule.objects.filter(id=extended_rule_id).exists()) + + def test_extended_casbin_rule_cascade_deletion_when_scope_deleted(self): + """Deleting a Scope should cascade to ExtendedCasbinRule and trigger the handler cleanup. + + Expected Result: + - ExtendedCasbinRule baseline row links the Scope to the CasbinRule. + - Removing the Scope deletes the ExtendedCasbinRule via database cascade. + - CasbinRule disappears because the post_delete handler mirrors the cascade. + """ + casbin_rule_key = ( + f"{self.casbin_rule.ptype},{self.casbin_rule.v0},{self.casbin_rule.v1}," + f"{self.casbin_rule.v2},{self.casbin_rule.v3}" + ) + extended_rule = ExtendedCasbinRule.objects.create( + casbin_rule_key=casbin_rule_key, + casbin_rule=self.casbin_rule, + scope=self.scope, + ) + extended_rule_id = extended_rule.id + casbin_rule_id = self.casbin_rule.id + scope_id = self.scope.id + + self.scope.delete() + + self.assertFalse(ExtendedCasbinRule.objects.filter(id=extended_rule_id).exists()) + self.assertFalse(CasbinRule.objects.filter(id=casbin_rule_id).exists()) + self.assertFalse(Scope.objects.filter(id=scope_id).exists()) + + def test_extended_casbin_rule_cascade_deletion_when_subject_deleted(self): + """Deleting a Subject should cascade to ExtendedCasbinRule and invoke the handler cleanup. + + Expected Result: + - ExtendedCasbinRule baseline row links the Subject to the CasbinRule. + - Removing the Subject deletes the ExtendedCasbinRule via database cascade. + - CasbinRule disappears because the post_delete handler mirrors the cascade. + """ + casbin_rule_key = ( + f"{self.casbin_rule.ptype},{self.casbin_rule.v0},{self.casbin_rule.v1}," + f"{self.casbin_rule.v2},{self.casbin_rule.v3}" + ) + extended_rule = ExtendedCasbinRule.objects.create( + casbin_rule_key=casbin_rule_key, + casbin_rule=self.casbin_rule, + subject=self.subject, + ) + extended_rule_id = extended_rule.id + casbin_rule_id = self.casbin_rule.id + subject_id = self.subject.id + + self.subject.delete() + + self.assertFalse(ExtendedCasbinRule.objects.filter(id=extended_rule_id).exists()) + self.assertFalse(CasbinRule.objects.filter(id=casbin_rule_id).exists()) + self.assertFalse(Subject.objects.filter(id=subject_id).exists()) + + def test_extended_casbin_rule_metadata_json_field(self): + """Test that metadata JSONField can store complex data structures. + + Expected result: + - ExtendedCasbinRule stores complex metadata + - Metadata is retrieved correctly from database + - Nested structures are preserved + """ + casbin_rule_key = ( + f"{self.casbin_rule.ptype},{self.casbin_rule.v0},{self.casbin_rule.v1}," + f"{self.casbin_rule.v2},{self.casbin_rule.v3}" + ) + complex_metadata = { + "tags": ["test", "instructor", "library"], + "config": { + "enabled": True, + "priority": 10, + "features": ["read", "write", "delete"], + }, + "audit": {"created_by": "system", "last_modified_by": "admin"}, + } + + extended_rule = ExtendedCasbinRule.objects.create( + casbin_rule_key=casbin_rule_key, + casbin_rule=self.casbin_rule, + metadata=complex_metadata, + ) + + retrieved_rule = ExtendedCasbinRule.objects.get(id=extended_rule.id) + + self.assertEqual(retrieved_rule.metadata["tags"], ["test", "instructor", "library"]) + self.assertEqual(retrieved_rule.metadata["config"]["enabled"], True) + self.assertEqual(retrieved_rule.metadata["config"]["priority"], 10) + self.assertEqual(retrieved_rule.metadata["audit"]["created_by"], "system") + + def test_extended_casbin_rule_verbose_names(self): + """Test that model has correct verbose names. + + Expected result: + - Singular verbose name is correct + - Plural verbose name is correct + """ + self.assertEqual(ExtendedCasbinRule._meta.verbose_name, "Extended Casbin Rule") + self.assertEqual(ExtendedCasbinRule._meta.verbose_name_plural, "Extended Casbin Rules") + + def test_extended_casbin_rule_can_be_created_without_optional_fields(self): + """Test that ExtendedCasbinRule can be created with only required fields. + + Expected result: + - ExtendedCasbinRule is created with required fields only + - Optional fields are None/null + """ + casbin_rule_key = "p,user^test2,role^viewer,lib^lib:Org:Lib2,allow" + casbin_rule2 = CasbinRule.objects.create( + ptype="p", + v0="user^test2", + v1="role^viewer", + v2="lib^lib:Org:Lib2", + v3="allow", + ) + + extended_rule = ExtendedCasbinRule.objects.create(casbin_rule_key=casbin_rule_key, casbin_rule=casbin_rule2) + + self.assertIsNotNone(extended_rule) + self.assertIsNone(extended_rule.description) + self.assertIsNone(extended_rule.metadata) + self.assertIsNone(extended_rule.scope) + self.assertIsNone(extended_rule.subject) + + +@pytest.mark.integration +@override_settings(OPENEDX_AUTHZ_CONTENT_LIBRARY_MODEL="content_libraries.ContentLibrary") +class TestExtendedCasbinRuleCreateBasedOnPolicy(TestCase): + """Test cases for ExtendedCasbinRule.create_based_on_policy method. + + The tests rely on the shared AuthzEnforcer instance so the database-backed + adapter is exercised end to end. + """ + + def setUp(self): + """Set up test fixtures.""" + self.test_username = "test_user" + self.test_user = User.objects.create_user(username=self.test_username) + + # Create library using the API helper (auto-generates unique slug) + self.library_metadata, self.library_key, self.content_library = create_test_library( + org_short_name="TestOrg", + ) + + def test_create_based_on_policy_generates_correct_casbin_rule_key(self): + """Test that create_based_on_policy generates the correct unique casbin_rule_key. + + Expected result: + - ExtendedCasbinRule is created successfully + - casbin_rule_key follows expected format + - Related Scope and Subject are linked correctly + """ + subject_data = UserData(external_key=self.test_username) + role_data = RoleData(external_key="instructor") + scope_data = ContentLibraryData(external_key=str(self.library_key)) + + subject = Subject.objects.get_or_create_for_external_key(subject_data) + scope = Scope.objects.get_or_create_for_external_key(scope_data) + + casbin_rule = CasbinRule.objects.create( + ptype="g", + v0=subject_data.namespaced_key, + v1=role_data.namespaced_key, + v2=scope_data.namespaced_key, + v3="", + ) + + adapter = AuthzEnforcer.get_adapter() + + expected_key = f"g,{subject_data.namespaced_key},{role_data.namespaced_key},{scope_data.namespaced_key}," + + result = ExtendedCasbinRule.create_based_on_policy( + subject=subject_data, + role=role_data, + scope=scope_data, + adapter=adapter, + ) + + self.assertEqual(result.casbin_rule_key, expected_key) + self.assertEqual(result.casbin_rule, casbin_rule) + self.assertEqual(result.scope, scope) + self.assertEqual(result.subject, subject) + + def test_create_based_on_policy_is_idempotent(self): + """Test that calling create_based_on_policy multiple times with same params returns same rule. + + Expected result: + - First call creates the ExtendedCasbinRule + - Second call returns the same ExtendedCasbinRule + - Only one ExtendedCasbinRule exists + """ + subject_data = UserData(external_key=self.test_username) + role_data = RoleData(external_key="instructor") + scope_data = ContentLibraryData(external_key=str(self.library_key)) + + Subject.objects.get_or_create_for_external_key(subject_data) + Scope.objects.get_or_create_for_external_key(scope_data) + + CasbinRule.objects.create( + ptype="g", + v0=subject_data.namespaced_key, + v1=role_data.namespaced_key, + v2=scope_data.namespaced_key, + v3="", + ) + + adapter = AuthzEnforcer.get_adapter() + + result1 = ExtendedCasbinRule.create_based_on_policy( + subject=subject_data, + role=role_data, + scope=scope_data, + adapter=adapter, + ) + + result2 = ExtendedCasbinRule.create_based_on_policy( + subject=subject_data, + role=role_data, + scope=scope_data, + adapter=adapter, + ) + + self.assertEqual(result1.id, result2.id) + self.assertEqual(ExtendedCasbinRule.objects.count(), 1) + + +@pytest.mark.integration +@override_settings(OPENEDX_AUTHZ_CONTENT_LIBRARY_MODEL="content_libraries.ContentLibrary") +class TestModelRelationships(TestCase): + """Test cases for model relationships and related_name attributes.""" + + def setUp(self): + """Set up test fixtures.""" + self.test_username = "test_user" + self.test_user = User.objects.create_user(username=self.test_username) + subject_data = UserData(external_key=self.test_username) + self.subject = Subject.objects.get_or_create_for_external_key(subject_data) + + # Create library using the API helper (auto-generates unique slug) + self.library_metadata, self.library_key, self.content_library = create_test_library( + org_short_name="TestOrg", + ) + + self.casbin_rule = CasbinRule.objects.create( + ptype="p", + v0="user^test_user", + v1="role^instructor", + v2="lib^lib:TestOrg:TestLib", + v3="allow", + ) + + def test_user_can_access_subjects_via_related_name(self): + """Test that User can access related Subject objects via authz_subjects. + + Expected result: + - User has exactly one related Subject + - Related Subject matches the created Subject + """ + self.assertEqual(self.test_user.authz_subjects.count(), 1) + self.assertEqual(self.test_user.authz_subjects.first(), self.subject) + + def test_subject_can_access_casbin_rules_via_related_name(self): + """Test that Subject can access related ExtendedCasbinRule objects via casbin_rules. + + Expected result: + - Subject has exactly one related ExtendedCasbinRule + - Related ExtendedCasbinRule matches the created rule + """ + casbin_rule_key = ( + f"{self.casbin_rule.ptype},{self.casbin_rule.v0},{self.casbin_rule.v1}," + f"{self.casbin_rule.v2},{self.casbin_rule.v3}" + ) + extended_rule = ExtendedCasbinRule.objects.create( + casbin_rule_key=casbin_rule_key, + casbin_rule=self.casbin_rule, + subject=self.subject, + ) + + self.assertEqual(self.subject.casbin_rules.count(), 1) + self.assertEqual(self.subject.casbin_rules.first(), extended_rule) + + def test_scope_can_access_casbin_rules_via_related_name(self): + """Test that Scope can access related ExtendedCasbinRule objects via casbin_rules. + + Expected result: + - Scope has exactly one related ExtendedCasbinRule + - Related ExtendedCasbinRule matches the created rule + """ + scope_data = ContentLibraryData(external_key=str(self.library_key)) + scope = Scope.objects.get_or_create_for_external_key(scope_data) + + casbin_rule_key = ( + f"{self.casbin_rule.ptype},{self.casbin_rule.v0},{self.casbin_rule.v1}," + f"{self.casbin_rule.v2},{self.casbin_rule.v3}" + ) + extended_rule = ExtendedCasbinRule.objects.create( + casbin_rule_key=casbin_rule_key, casbin_rule=self.casbin_rule, scope=scope + ) + + self.assertEqual(scope.casbin_rules.count(), 1) + self.assertEqual(scope.casbin_rules.first(), extended_rule) + + def test_casbin_rule_can_access_extended_rule_via_related_name(self): + """Test that CasbinRule can access related ExtendedCasbinRule via extended_rule. + + Expected result: + - CasbinRule has exactly one related ExtendedCasbinRule (OneToOne relationship) + - Related ExtendedCasbinRule matches the created rule + """ + casbin_rule_key = ( + f"{self.casbin_rule.ptype},{self.casbin_rule.v0},{self.casbin_rule.v1}," + f"{self.casbin_rule.v2},{self.casbin_rule.v3}" + ) + extended_rule = ExtendedCasbinRule.objects.create(casbin_rule_key=casbin_rule_key, casbin_rule=self.casbin_rule) + + self.assertEqual(self.casbin_rule.extended_rule, extended_rule) + + def test_content_library_can_access_scopes_via_related_name(self): + """Test that ContentLibrary can access related Scope objects via authz_scopes. + + Expected result: + - ContentLibrary has exactly one related Scope + - Related Scope matches the created Scope + """ + scope_data = ContentLibraryData(external_key=str(self.library_key)) + scope = Scope.objects.get_or_create_for_external_key(scope_data) + + self.assertEqual(self.content_library.authz_scopes.count(), 1) + self.assertEqual(self.content_library.authz_scopes.first(), scope) + + +@pytest.mark.integration +@override_settings(OPENEDX_AUTHZ_CONTENT_LIBRARY_MODEL="content_libraries.ContentLibrary") +class TestModelCascadeDeletionChain(TestCase): + """Test cases for cascade deletion chains across multiple models.""" + + def setUp(self): + """Set up test fixtures.""" + self.test_username = "test_user" + self.test_user = User.objects.create_user(username=self.test_username) + + self.library_metadata, self.library_key, self.content_library = create_test_library( + org_short_name="TestOrg", + ) + + def test_content_library_deletion_cascades_to_extended_casbin_rules(self): + """Deleting a ContentLibrary should cascade through Scope and allow the signal to clean policies. + + Expected Result: + - Removing the ContentLibrary deletes the associated Scope. + - The Scope cascade removes the ExtendedCasbinRule rows. + - The post_delete handler deletes the matching CasbinRule rows. + """ + scope_data = ContentLibraryData(external_key=str(self.library_key)) + scope = Scope.objects.get_or_create_for_external_key(scope_data) + + casbin_rule = CasbinRule.objects.create( + ptype="p", + v0="user^test_user", + v1="role^instructor", + v2=scope_data.namespaced_key, + v3="allow", + ) + + casbin_rule_key = f"{casbin_rule.ptype},{casbin_rule.v0},{casbin_rule.v1},{casbin_rule.v2},{casbin_rule.v3}" + extended_rule = ExtendedCasbinRule.objects.create( + casbin_rule_key=casbin_rule_key, casbin_rule=casbin_rule, scope=scope + ) + extended_rule_id = extended_rule.id + casbin_rule_id = casbin_rule.id + + self.content_library.delete() + + self.assertFalse(Scope.objects.filter(id=scope.id).exists()) + self.assertFalse(ExtendedCasbinRule.objects.filter(id=extended_rule_id).exists()) + self.assertFalse(CasbinRule.objects.filter(id=casbin_rule_id).exists()) + + def test_user_deletion_cascades_to_extended_casbin_rules(self): + """Deleting a User should cascade through Subject and allow the signal to clean policies. + + Expected Result: + - Removing the User deletes the associated Subject. + - The Subject cascade removes the ExtendedCasbinRule rows. + - The post_delete handler deletes the matching CasbinRule rows. + """ + subject_data = UserData(external_key=self.test_username) + subject = Subject.objects.get_or_create_for_external_key(subject_data) + + casbin_rule = CasbinRule.objects.create( + ptype="p", + v0=subject_data.namespaced_key, + v1="role^instructor", + v2="lib^lib:TestOrg:TestLib", + v3="allow", + ) + + casbin_rule_key = f"{casbin_rule.ptype},{casbin_rule.v0},{casbin_rule.v1},{casbin_rule.v2},{casbin_rule.v3}" + extended_rule = ExtendedCasbinRule.objects.create( + casbin_rule_key=casbin_rule_key, casbin_rule=casbin_rule, subject=subject + ) + extended_rule_id = extended_rule.id + casbin_rule_id = casbin_rule.id + + self.test_user.delete() + + self.assertFalse(Subject.objects.filter(id=subject.id).exists()) + self.assertFalse(ExtendedCasbinRule.objects.filter(id=extended_rule_id).exists()) + self.assertFalse(CasbinRule.objects.filter(id=casbin_rule_id).exists()) + + def test_complete_cascade_deletion_chain(self): + """Deleting the CasbinRule should illustrate the limits of reverse cascades. + + Expected Result: + - The ExtendedCasbinRule row disappears when its CasbinRule is deleted. + - Subject and Scope rows remain because the cascade stops at ExtendedCasbinRule. + - User and ContentLibrary rows remain unaffected. + """ + subject_data = UserData(external_key=self.test_username) + subject = Subject.objects.get_or_create_for_external_key(subject_data) + + scope_data = ContentLibraryData(external_key=str(self.library_key)) + scope = Scope.objects.get_or_create_for_external_key(scope_data) + + casbin_rule = CasbinRule.objects.create( + ptype="p", + v0=subject_data.namespaced_key, + v1="role^instructor", + v2=scope_data.namespaced_key, + v3="allow", + ) + + casbin_rule_key = f"{casbin_rule.ptype},{casbin_rule.v0},{casbin_rule.v1},{casbin_rule.v2},{casbin_rule.v3}" + extended_rule = ExtendedCasbinRule.objects.create( + casbin_rule_key=casbin_rule_key, + casbin_rule=casbin_rule, + subject=subject, + scope=scope, + ) + extended_rule_id = extended_rule.id + + self.assertTrue(ExtendedCasbinRule.objects.filter(id=extended_rule_id).exists()) + + casbin_rule.delete() + + self.assertFalse(ExtendedCasbinRule.objects.filter(id=extended_rule_id).exists()) + self.assertTrue(Subject.objects.filter(id=subject.id).exists()) + self.assertTrue(Scope.objects.filter(id=scope.id).exists()) + self.assertTrue(User.objects.filter(id=self.test_user.id).exists()) + self.assertTrue(ContentLibrary.objects.filter(id=self.content_library.id).exists()) + + def test_library_deletion_via_api_cascades_to_authorization_system(self): + """Test that deleting a library via API cascades through entire authorization chain. + + This tests the proper deletion path through library_api.delete_library() which + triggers the CONTENT_LIBRARY_DELETED event and verifies that all related + authorization data is properly cleaned up. + + This test differs from test_content_library_deletion_cascades_to_extended_casbin_rules + in that it uses the proper API methods (assign_role_to_subject_in_scope and + library_api.delete_library) rather than direct model operations, testing the + full integration path that would occur in production. + + Expected result: + - User has instructor role assigned in library scope + - ExtendedCasbinRule tracks the role assignment + - Deleting library via API removes: + * ContentLibrary itself + * Associated Scope (ContentLibraryScope) + * ExtendedCasbinRule linked to the scope + - CasbinRule and Subject remain (they're not tied to scope lifecycle) + """ + # Create or get a user and assign them the instructor role in this library's scope + test_username = "test_instructor_lib_del" + test_user, _ = User.objects.get_or_create( + username=test_username, defaults={"email": f"{test_username}@example.com"} + ) + + subject_data = UserData(external_key=test_username) + scope_data = ContentLibraryData(external_key=str(self.library_key)) + role_data = RoleData(external_key="instructor") + + assign_role_to_subject_in_scope(subject_data, role_data, scope_data) + + subject = Subject.objects.get_or_create_for_external_key(subject_data) + scope = Scope.objects.get_or_create_for_external_key(scope_data) + + extended_rules = ExtendedCasbinRule.objects.filter(scope=scope, subject=subject) + self.assertEqual(extended_rules.count(), 1) + extended_rule = extended_rules.first() + extended_rule_id = extended_rule.id + + casbin_rule = extended_rule.casbin_rule + casbin_rule_id = casbin_rule.id + + scope_id = scope.id + subject_id = subject.id + user_id = test_user.id + + self.assertTrue(Scope.objects.filter(id=scope_id).exists()) + self.assertTrue(ExtendedCasbinRule.objects.filter(id=extended_rule_id).exists()) + self.assertTrue(CasbinRule.objects.filter(id=casbin_rule_id).exists()) + self.assertTrue(Subject.objects.filter(id=subject_id).exists()) + self.assertTrue(User.objects.filter(id=user_id).exists()) + + library_api.delete_library(self.library_key) + + self.assertFalse(Scope.objects.filter(id=scope_id).exists()) + + self.assertFalse(ExtendedCasbinRule.objects.filter(id=extended_rule_id).exists()) + + self.assertFalse(CasbinRule.objects.filter(id=casbin_rule_id).exists()) + + self.assertTrue(Subject.objects.filter(id=subject_id).exists()) + self.assertTrue(User.objects.filter(id=user_id).exists()) + + def test_user_deletion_via_model_cascades_to_authorization_system(self): + """Test that deleting a user cascades through entire authorization chain. + + This tests that when a User is deleted, all related authorization data + is properly cleaned up through the cascade deletion chain. + + This test differs from test_user_deletion_cascades_to_extended_casbin_rules + in that it uses the proper API method (assign_role_to_subject_in_scope) + rather than direct model operations, testing the full integration path + that would occur in production. + + Expected result: + - User has instructor role assigned in a library scope + - ExtendedCasbinRule tracks the role assignment + - Deleting User removes: + * User itself + * Associated Subject (UserSubject) + * ExtendedCasbinRule linked to the subject + - CasbinRule and Scope remain (they're not tied to user lifecycle) + """ + subject_data = UserData(external_key=self.test_username) + scope_data = ContentLibraryData(external_key=str(self.library_key)) + role_data = RoleData(external_key="instructor") + + assign_role_to_subject_in_scope(subject_data, role_data, scope_data) + + subject = Subject.objects.get_or_create_for_external_key(subject_data) + scope = Scope.objects.get_or_create_for_external_key(scope_data) + + extended_rules = ExtendedCasbinRule.objects.filter(scope=scope, subject=subject) + self.assertEqual(extended_rules.count(), 1) + extended_rule = extended_rules.first() + extended_rule_id = extended_rule.id + + casbin_rule = extended_rule.casbin_rule + casbin_rule_id = casbin_rule.id + + scope_id = scope.id + subject_id = subject.id + user_id = self.test_user.id + + self.assertTrue(Subject.objects.filter(id=subject_id).exists()) + self.assertTrue(ExtendedCasbinRule.objects.filter(id=extended_rule_id).exists()) + self.assertTrue(CasbinRule.objects.filter(id=casbin_rule_id).exists()) + self.assertTrue(Scope.objects.filter(id=scope_id).exists()) + self.assertTrue(User.objects.filter(id=user_id).exists()) + + self.test_user.delete() + + self.assertFalse(User.objects.filter(id=user_id).exists()) + self.assertFalse(Subject.objects.filter(id=subject_id).exists()) + self.assertFalse(ExtendedCasbinRule.objects.filter(id=extended_rule_id).exists()) + self.assertFalse(CasbinRule.objects.filter(id=casbin_rule_id).exists()) + self.assertTrue(Scope.objects.filter(id=scope_id).exists()) + + def test_content_library_scope_direct_deletion_does_not_delete_content_library(self): + """Test that deleting ContentLibraryScope directly does not delete ContentLibrary. + + This test verifies the ForeignKey CASCADE behavior: child deletion doesn't cascade to parent. + + Expected result: + - ContentLibraryScope is deleted + - Scope is deleted (multi-table inheritance) + - ExtendedCasbinRule is deleted (CASCADE from Scope) + - CasbinRule is deleted (via pre_delete signal handler) + - ContentLibrary REMAINS (parent is not cascade-deleted by child) + """ + scope_data = ContentLibraryData(external_key=str(self.library_key)) + scope = Scope.objects.get_or_create_for_external_key(scope_data) + content_library_scope = ContentLibraryScope.objects.get(id=scope.id) + + casbin_rule = CasbinRule.objects.create( + ptype="p", + v0="user^test_user", + v1="role^instructor", + v2=scope_data.namespaced_key, + v3="allow", + ) + + casbin_rule_key = f"{casbin_rule.ptype},{casbin_rule.v0},{casbin_rule.v1},{casbin_rule.v2},{casbin_rule.v3}" + extended_rule = ExtendedCasbinRule.objects.create( + casbin_rule_key=casbin_rule_key, casbin_rule=casbin_rule, scope=scope + ) + extended_rule_id = extended_rule.id + casbin_rule_id = casbin_rule.id + scope_id = scope.id + content_library_id = self.content_library.id + + content_library_scope.delete() + + self.assertFalse(ContentLibraryScope.objects.filter(id=scope_id).exists()) + self.assertFalse(Scope.objects.filter(id=scope_id).exists()) + self.assertFalse(ExtendedCasbinRule.objects.filter(id=extended_rule_id).exists()) + self.assertFalse(CasbinRule.objects.filter(id=casbin_rule_id).exists()) + self.assertTrue(ContentLibrary.objects.filter(id=content_library_id).exists()) + + def test_user_subject_direct_deletion_does_not_delete_user(self): + """Test that deleting UserSubject directly does not delete User. + + This test verifies the ForeignKey CASCADE behavior: child deletion doesn't cascade to parent. + + Expected result: + - UserSubject is deleted + - Subject is deleted (multi-table inheritance) + - ExtendedCasbinRule is deleted (CASCADE from Subject) + - CasbinRule is deleted (via pre_delete signal handler) + - User REMAINS (parent is not cascade-deleted by child) + """ + subject_data = UserData(external_key=self.test_username) + subject = Subject.objects.get_or_create_for_external_key(subject_data) + user_subject = UserSubject.objects.get(id=subject.id) + + casbin_rule = CasbinRule.objects.create( + ptype="p", + v0=subject_data.namespaced_key, + v1="role^instructor", + v2="lib^lib:TestOrg:TestLib", + v3="allow", + ) + + casbin_rule_key = f"{casbin_rule.ptype},{casbin_rule.v0},{casbin_rule.v1},{casbin_rule.v2},{casbin_rule.v3}" + extended_rule = ExtendedCasbinRule.objects.create( + casbin_rule_key=casbin_rule_key, casbin_rule=casbin_rule, subject=subject + ) + extended_rule_id = extended_rule.id + casbin_rule_id = casbin_rule.id + subject_id = subject.id + user_id = self.test_user.id + + user_subject.delete() + + self.assertFalse(UserSubject.objects.filter(id=subject_id).exists()) + self.assertFalse(Subject.objects.filter(id=subject_id).exists()) + self.assertFalse(ExtendedCasbinRule.objects.filter(id=extended_rule_id).exists()) + self.assertFalse(CasbinRule.objects.filter(id=casbin_rule_id).exists()) + self.assertTrue(User.objects.filter(id=user_id).exists()) + + def test_extended_casbin_rule_direct_deletion_deletes_casbin_rule(self): + """Deleting the ExtendedCasbinRule should trigger the signal to remove its CasbinRule. + + Expected Result: + - ExtendedCasbinRule row is deleted successfully. + - Companion CasbinRule row is removed by the post_delete handler. + - Scope and Subject rows remain intact because cascades stop at ExtendedCasbinRule. + """ + subject_data = UserData(external_key=self.test_username) + subject = Subject.objects.get_or_create_for_external_key(subject_data) + + scope_data = ContentLibraryData(external_key=str(self.library_key)) + scope = Scope.objects.get_or_create_for_external_key(scope_data) + + casbin_rule = CasbinRule.objects.create( + ptype="p", + v0=subject_data.namespaced_key, + v1="role^instructor", + v2=scope_data.namespaced_key, + v3="allow", + ) + + casbin_rule_key = f"{casbin_rule.ptype},{casbin_rule.v0},{casbin_rule.v1},{casbin_rule.v2},{casbin_rule.v3}" + extended_rule = ExtendedCasbinRule.objects.create( + casbin_rule_key=casbin_rule_key, + casbin_rule=casbin_rule, + scope=scope, + subject=subject, + ) + extended_rule_id = extended_rule.id + casbin_rule_id = casbin_rule.id + scope_id = scope.id + subject_id = subject.id + + extended_rule.delete() + + self.assertFalse(ExtendedCasbinRule.objects.filter(id=extended_rule_id).exists()) + self.assertFalse(CasbinRule.objects.filter(id=casbin_rule_id).exists()) + self.assertTrue(Scope.objects.filter(id=scope_id).exists()) + self.assertTrue(Subject.objects.filter(id=subject_id).exists()) + + def test_bulk_delete_extended_casbin_rules_deletes_casbin_rules(self): + """Deleting ExtendedCasbinRule rows via a queryset should purge each CasbinRule. + + Expected Result: + - All ExtendedCasbinRule rows in the queryset disappear. + - Each related CasbinRule row is deleted by the post_delete handler. + - Scope row remains available. + """ + scope_data = ContentLibraryData(external_key=str(self.library_key)) + scope = Scope.objects.get_or_create_for_external_key(scope_data) + + casbin_rule_ids = [] + extended_rule_ids = [] + + for i in range(3): + casbin_rule = CasbinRule.objects.create( + ptype="p", + v0=f"user^test_user_{i}", + v1="role^instructor", + v2=scope_data.namespaced_key, + v3="allow", + ) + casbin_rule_ids.append(casbin_rule.id) + + casbin_rule_key = f"{casbin_rule.ptype},{casbin_rule.v0},{casbin_rule.v1},{casbin_rule.v2},{casbin_rule.v3}" + extended_rule = ExtendedCasbinRule.objects.create( + casbin_rule_key=casbin_rule_key, + casbin_rule=casbin_rule, + scope=scope, + ) + extended_rule_ids.append(extended_rule.id) + + ExtendedCasbinRule.objects.filter(scope=scope).delete() + + for extended_rule_id in extended_rule_ids: + self.assertFalse(ExtendedCasbinRule.objects.filter(id=extended_rule_id).exists()) + + for casbin_rule_id in casbin_rule_ids: + self.assertFalse(CasbinRule.objects.filter(id=casbin_rule_id).exists()) + + self.assertTrue(Scope.objects.filter(id=scope.id).exists()) + + def test_extended_casbin_rule_with_null_scope_deletion(self): + """Deleting an ExtendedCasbinRule without a Scope should still purge the CasbinRule. + + Expected Result: + - ExtendedCasbinRule row is deleted successfully. + - CasbinRule row is removed by the post_delete handler even with ``scope`` set to ``None``. + """ + casbin_rule = CasbinRule.objects.create( + ptype="p", + v0="user^test_user", + v1="role^admin", + v2="*", + v3="allow", + ) + + casbin_rule_key = f"{casbin_rule.ptype},{casbin_rule.v0},{casbin_rule.v1},{casbin_rule.v2},{casbin_rule.v3}" + extended_rule = ExtendedCasbinRule.objects.create( + casbin_rule_key=casbin_rule_key, + casbin_rule=casbin_rule, + scope=None, # Null scope + subject=None, + ) + extended_rule_id = extended_rule.id + casbin_rule_id = casbin_rule.id + + extended_rule.delete() + + self.assertFalse(ExtendedCasbinRule.objects.filter(id=extended_rule_id).exists()) + self.assertFalse(CasbinRule.objects.filter(id=casbin_rule_id).exists()) + + def test_extended_casbin_rule_with_null_subject_deletion(self): + """Deleting an ExtendedCasbinRule without a Subject should still purge the CasbinRule. + + Expected Result: + - ExtendedCasbinRule row is deleted successfully. + - CasbinRule row is removed by the post_delete handler even with ``subject`` set to ``None``. + - Scope row remains available. + """ + scope_data = ContentLibraryData(external_key=str(self.library_key)) + scope = Scope.objects.get_or_create_for_external_key(scope_data) + + casbin_rule = CasbinRule.objects.create( + ptype="p", + v0="role^instructor", + v1="read", + v2=scope_data.namespaced_key, + v3="allow", + ) + + casbin_rule_key = f"{casbin_rule.ptype},{casbin_rule.v0},{casbin_rule.v1},{casbin_rule.v2},{casbin_rule.v3}" + extended_rule = ExtendedCasbinRule.objects.create( + casbin_rule_key=casbin_rule_key, + casbin_rule=casbin_rule, + scope=scope, + subject=None, + ) + extended_rule_id = extended_rule.id + casbin_rule_id = casbin_rule.id + scope_id = scope.id + + extended_rule.delete() + + self.assertFalse(ExtendedCasbinRule.objects.filter(id=extended_rule_id).exists()) + self.assertFalse(CasbinRule.objects.filter(id=casbin_rule_id).exists()) + + self.assertTrue(Scope.objects.filter(id=scope_id).exists()) diff --git a/openedx_authz/tests/integration/test_views.py b/openedx_authz/tests/integration/test_views.py new file mode 100644 index 00000000..bd136007 --- /dev/null +++ b/openedx_authz/tests/integration/test_views.py @@ -0,0 +1,124 @@ +"""Integration tests for openedx_authz views.""" + +import os +import uuid +from urllib.parse import urlencode + +import casbin +import pytest +from django.contrib.auth import get_user_model +from django.test import TestCase +from django.urls import reverse +from rest_framework import status +from rest_framework.test import APIClient + +from openedx_authz import ROOT_DIRECTORY +from openedx_authz.api.users import assign_role_to_user_in_scope +from openedx_authz.engine.enforcer import AuthzEnforcer +from openedx_authz.engine.utils import migrate_policy_between_enforcers +from openedx_authz.models.core import ExtendedCasbinRule +from openedx_authz.tests.integration.test_models import create_test_library + +User = get_user_model() + + +@pytest.mark.integration +class TestRoleAssignmentView(TestCase): + """Tests for the role assignment view.""" + + @classmethod + def setUpClass(cls): + """Set up test class - seed database with policies.""" + super().setUpClass() + # Seed the database with policies from the policy file + # This loads the policy definitions (p, g rules) that define what permissions each role has + global_enforcer = AuthzEnforcer.get_enforcer() + global_enforcer.load_policy() + + # Use absolute paths based on the package ROOT_DIRECTORY + model_conf = os.path.join(ROOT_DIRECTORY, "engine", "config", "model.conf") + authz_policy = os.path.join(ROOT_DIRECTORY, "engine", "config", "authz.policy") + + migrate_policy_between_enforcers( + source_enforcer=casbin.Enforcer(model_conf, authz_policy), + target_enforcer=global_enforcer, + ) + + def setUp(self): + """Set up the test client and any required data.""" + self.client = APIClient() + self.url = reverse("openedx_authz:openedx_authz:role-user-list") + self.library_metadata, self.library_key, self.content_library = create_test_library("TestOrg") + self.role_key = "library_admin" + + # Create random users to avoid conflicts in persistent database + unique_id = uuid.uuid4().hex[:8] + self.user = User.objects.create_user(username=f"test_user_{unique_id}", email=f"test_{unique_id}@example.com") + self.admin_user = User.objects.create_user( + username=f"admin_user_{unique_id}", email=f"admin_{unique_id}@example.com", is_staff=True, is_superuser=True + ) + + assign_role_to_user_in_scope( + user_external_key=self.admin_user.username, + role_external_key=self.role_key, + scope_external_key=str(self.library_key), + ) + self.client.force_authenticate(user=self.admin_user) + + def test_role_assignment_with_extended_model(self): + """Test role assignment when ExtendedCasbinRule model is in use. + + Expected Results: + - Role assignment is successful (HTTP 207 Multi-Status). + - An ExtendedCasbinRule is created with the correct scope and subject. + """ + payload = { + "users": [self.user.username], + "role": self.role_key, + "scope": str(self.library_key), + } + + response = self.client.put(self.url, payload, format="json") + + self.assertEqual(response.status_code, status.HTTP_207_MULTI_STATUS) + self.assertEqual(len(response.data["completed"]), 1) + + extended_rule = ExtendedCasbinRule.objects.filter( + subject__usersubject__user=self.user, + scope__contentlibraryscope__content_library=self.content_library, + ).first() + self.assertIsNotNone(extended_rule) + self.assertIn(payload["role"], extended_rule.casbin_rule_key) + + def test_role_unassignment_with_extended_model(self): + """Test role unassignment when ExtendedCasbinRule model is in use. + + Expected Results: + - Role unassignment is successful (HTTP 207 Multi-Status). + - The associated ExtendedCasbinRule is deleted. + - No orphaned ExtendedCasbinRule remains after unassignment. + """ + payload = { + "users": [self.user.username], + "role": self.role_key, + "scope": str(self.library_key), + } + create_response = self.client.put(self.url, payload, format="json") + self.assertEqual(create_response.status_code, status.HTTP_207_MULTI_STATUS) + self.assertEqual(len(create_response.data["completed"]), 1) + + delete_params = { + "role": self.role_key, + "scope": str(self.library_key), + "users": self.user.username, + } + unassign_response = self.client.delete(f"{self.url}?{urlencode(delete_params)}") + + self.assertEqual(unassign_response.status_code, status.HTTP_207_MULTI_STATUS) + self.assertEqual(len(unassign_response.data["completed"]), 1) + + extended_rule = ExtendedCasbinRule.objects.filter( + subject__usersubject__user=self.user, + scope__contentlibraryscope__content_library=self.content_library, + ).first() + self.assertIsNone(extended_rule) diff --git a/openedx_authz/tests/rest_api/test_views.py b/openedx_authz/tests/rest_api/test_views.py index 4a962d3a..71018f0e 100644 --- a/openedx_authz/tests/rest_api/test_views.py +++ b/openedx_authz/tests/rest_api/test_views.py @@ -127,13 +127,19 @@ def setUpClass(cls): def create_regular_users(cls, quantity: int): """Create regular users.""" for i in range(1, quantity + 1): - User.objects.create_user(username=f"regular_{i}", email=f"regular_{i}@example.com") + User.objects.get_or_create(username=f"regular_{i}", defaults={"email": f"regular_{i}@example.com"}) @classmethod def create_admin_users(cls, quantity: int): """Create admin users.""" for i in range(1, quantity + 1): - User.objects.create_superuser(username=f"admin_{i}", email=f"admin_{i}@example.com") + user, created = User.objects.get_or_create( + username=f"admin_{i}", defaults={"email": f"admin_{i}@example.com"} + ) + if created: + user.is_superuser = True + user.is_staff = True + user.save() @classmethod def setUpTestData(cls): diff --git a/openedx_authz/tests/stubs/__init__.py b/openedx_authz/tests/stubs/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/openedx_authz/tests/stubs/apps.py b/openedx_authz/tests/stubs/apps.py new file mode 100644 index 00000000..0bae2b36 --- /dev/null +++ b/openedx_authz/tests/stubs/apps.py @@ -0,0 +1,9 @@ +"""Django app configuration for test stubs.""" + +from django.apps import AppConfig + + +class StubsConfig(AppConfig): + default_auto_field = "django.db.models.AutoField" + name = "openedx_authz.tests.stubs" + verbose_name = "Test stubs app" diff --git a/openedx_authz/tests/stubs/migrations/0001_initial.py b/openedx_authz/tests/stubs/migrations/0001_initial.py new file mode 100644 index 00000000..8e4af2b8 --- /dev/null +++ b/openedx_authz/tests/stubs/migrations/0001_initial.py @@ -0,0 +1,31 @@ +# Generated by Django for test stubs +from django.db import migrations, models + + +class Migration(migrations.Migration): + initial = True + + dependencies = [] + + operations = [ + migrations.CreateModel( + name="ContentLibrary", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ( + "locator", + models.CharField(max_length=255, unique=True, db_index=True), + ), + ("title", models.CharField(max_length=255, blank=True, null=True)), + ("created_at", models.DateTimeField(auto_now_add=True)), + ], + ), + ] diff --git a/openedx_authz/tests/stubs/models.py b/openedx_authz/tests/stubs/models.py new file mode 100644 index 00000000..97010e44 --- /dev/null +++ b/openedx_authz/tests/stubs/models.py @@ -0,0 +1,46 @@ +"""Stub models for testing ContentLibrary-related functionality. + +These models mimic the behavior of the actual models so the models can be +referenced in FK relationships without requiring the full application context. +""" + +from django.db import models +from opaque_keys.edx.locator import LibraryLocatorV2 + + +class ContentLibraryManager(models.Manager): + """Manager for ContentLibrary model with helper methods.""" + + def get_by_key(self, library_key): + """Get or create a ContentLibrary by its library key. + + Args: + library_key: The library key to look up. + + Returns: + ContentLibrary: The library instance. + """ + if library_key is None: + raise ValueError("library_key must not be None") + try: + key = str(LibraryLocatorV2.from_string(str(library_key))) + except Exception: # pylint: disable=broad-exception-caught + key = str(library_key) + obj, _ = self.get_or_create(locator=key) + return obj + + +class ContentLibrary(models.Model): + """Stub model representing a content library for testing purposes. + + .. no_pii: + """ + + locator = models.CharField(max_length=255, unique=True, db_index=True) + title = models.CharField(max_length=255, blank=True, null=True) + created_at = models.DateTimeField(auto_now_add=True) + + objects = ContentLibraryManager() + + def __str__(self): + return self.locator diff --git a/openedx_authz/tests/test_handlers.py b/openedx_authz/tests/test_handlers.py new file mode 100644 index 00000000..1b31e1f9 --- /dev/null +++ b/openedx_authz/tests/test_handlers.py @@ -0,0 +1,222 @@ +"""Behavioral tests for the ExtendedCasbinRule deletion signal. + +Coverage confirms direct deletions, cascades, bulk operations, and resilience when foreign keys +are missing so that the signal stays aligned with the cleanup guarantees in +``openedx_authz.handlers``. +""" + +from unittest.mock import patch + +from casbin_adapter.models import CasbinRule +from django.test import TestCase + +from openedx_authz.models.core import ExtendedCasbinRule, Scope, Subject + + +def create_casbin_rule_with_extended( # pylint: disable=too-many-positional-arguments + ptype="p", + v0="user^test_user", + v1="role^instructor", + v2="lib^test:library", + v3="allow", + scope=None, + subject=None, +): + """ + Helper function to create a CasbinRule with an associated ExtendedCasbinRule. + + Args: + ptype: Policy type (default: "p") + v0: Policy value 0 (default: "user^test_user") + v1: Policy value 1 (default: "role^instructor") + v2: Policy value 2 (default: "lib^test:library") + v3: Policy value 3 (default: "allow") + scope: Optional Scope instance to link + subject: Optional Subject instance to link + + Returns: + tuple: (casbin_rule, extended_rule) + """ + casbin_rule = CasbinRule.objects.create( + ptype=ptype, + v0=v0, + v1=v1, + v2=v2, + v3=v3, + ) + + casbin_rule_key = f"{casbin_rule.ptype},{casbin_rule.v0},{casbin_rule.v1},{casbin_rule.v2},{casbin_rule.v3}" + extended_rule = ExtendedCasbinRule.objects.create( + casbin_rule_key=casbin_rule_key, + casbin_rule=casbin_rule, + scope=scope, + subject=subject, + ) + + return casbin_rule, extended_rule + + +class TestExtendedCasbinRuleDeletionSignalHandlers(TestCase): + """Confirm the post_delete handler keeps ExtendedCasbinRule and CasbinRule in sync.""" + + def setUp(self): + """Create a baseline CasbinRule and ExtendedCasbinRule for each test.""" + self.casbin_rule, self.extended_rule = create_casbin_rule_with_extended() + + def test_deleting_extended_casbin_rule_deletes_casbin_rule(self): + """Deleting an ExtendedCasbinRule directly should trigger the signal that removes the + linked CasbinRule to avoid orphaned policy records. + + Expected Result: + - ExtendedCasbinRule record with the captured id no longer exists. + - Associated CasbinRule row is removed by the signal handler. + """ + extended_rule_id = self.extended_rule.id + casbin_rule_id = self.casbin_rule.id + + self.extended_rule.delete() + + self.assertFalse(ExtendedCasbinRule.objects.filter(id=extended_rule_id).exists()) + self.assertFalse(CasbinRule.objects.filter(id=casbin_rule_id).exists()) + + def test_deleting_casbin_rule_deletes_extended_casbin_rule(self): + """Deleting the CasbinRule should cascade through the one-to-one relationship and allow the + signal handler to exit quietly because the policy row is already gone. + + Expected Result: + - CasbinRule entry with the captured id no longer exists. + - ExtendedCasbinRule row cascades away with the same id. + - Signal completes without raising even though it has nothing left to delete. + """ + extended_rule_id = self.extended_rule.id + casbin_rule_id = self.casbin_rule.id + + self.casbin_rule.delete() + + self.assertFalse(ExtendedCasbinRule.objects.filter(id=extended_rule_id).exists()) + self.assertFalse(CasbinRule.objects.filter(id=casbin_rule_id).exists()) + + def test_signal_logs_exception_when_casbin_delete_fails(self): + """A failure deleting the CasbinRule should be logged without blocking later cleanups. + + Expected Result: + - Logger captures the exception raised by the delete attempt. + - ExtendedCasbinRule row is removed but the CasbinRule row persists. + - A subsequent ExtendedCasbinRule deletion still removes both records. + """ + extended_rule_id = self.extended_rule.id + casbin_rule_id = self.casbin_rule.id + extra_casbin_rule, extra_extended_rule = create_casbin_rule_with_extended( + v0="user^resilient", + v1="role^assistant", + v2="lib^resilient", + ) + + with ( + patch("openedx_authz.handlers.logger") as mock_logger, + patch("openedx_authz.handlers.CasbinRule.objects.filter") as mock_filter, + ): + mock_filter.return_value.delete.side_effect = RuntimeError("delete failed") + + self.extended_rule.delete() + + mock_logger.exception.assert_called_once() + self.assertIn("Error deleting CasbinRule", mock_logger.exception.call_args[0][0]) + + self.assertFalse(ExtendedCasbinRule.objects.filter(id=extended_rule_id).exists()) + self.assertTrue(CasbinRule.objects.filter(id=casbin_rule_id).exists()) + + extra_extended_rule.delete() + + self.assertFalse(ExtendedCasbinRule.objects.filter(id=extra_extended_rule.id).exists()) + self.assertFalse(CasbinRule.objects.filter(id=extra_casbin_rule.id).exists()) + + def test_bulk_delete_extended_casbin_rules_deletes_casbin_rules(self): + """Bulk deleting ExtendedCasbinRule rows should trigger the signal for each record so all + related CasbinRule entries disappear. + + Expected Result: + - All targeted ExtendedCasbinRule ids are absent after the delete call. + - CasbinRule rows backing those ids are also removed. + """ + casbin_rule_2, extended_rule_2 = create_casbin_rule_with_extended( + v0="user^test_user_2", + v1="role^student", + v2="lib^test:library_2", + ) + + casbin_rule_ids = [self.casbin_rule.id, casbin_rule_2.id] + extended_rule_ids = [self.extended_rule.id, extended_rule_2.id] + + ExtendedCasbinRule.objects.filter(id__in=extended_rule_ids).delete() + + self.assertEqual(ExtendedCasbinRule.objects.filter(id__in=extended_rule_ids).count(), 0) + self.assertEqual(CasbinRule.objects.filter(id__in=casbin_rule_ids).count(), 0) + + def test_cascade_deletion_with_scope_and_subject(self): + """Deleting a Subject that participates in an ExtendedCasbinRule should cascade through the + relationship and let the signal clear the CasbinRule while unrelated Scope data stays. + + Expected Result: + - Subject row is removed. + - Related ExtendedCasbinRule and CasbinRule instances no longer exist. + - Scope row referenced in the policy remains in place. + """ + scope = Scope.objects.create() + subject = Subject.objects.create() + + casbin_rule, extended_rule = create_casbin_rule_with_extended( + ptype="g", + v0="user^test_user", + v1="role^instructor", + v2="lib^test:library", + v3="", + scope=scope, + subject=subject, + ) + + casbin_rule_id = casbin_rule.id + extended_rule_id = extended_rule.id + scope_id = scope.id + subject_id = subject.id + + subject.delete() + + self.assertFalse(ExtendedCasbinRule.objects.filter(id=extended_rule_id).exists()) + self.assertFalse(CasbinRule.objects.filter(id=casbin_rule_id).exists()) + self.assertFalse(Subject.objects.filter(id=subject_id).exists()) + self.assertTrue(Scope.objects.filter(id=scope_id).exists()) + + def test_cascade_deletion_with_scope_deletion(self): + """Removing a Scope should cascade through the ExtendedCasbinRule relationship and rely on + the signal to delete the companion CasbinRule while Subjects remain available. + + Expected Result: + - Scope row is removed. + - Related ExtendedCasbinRule and CasbinRule rows no longer exist. + - Subject row referenced in the policy still exists after the cascade. + """ + scope = Scope.objects.create() + subject = Subject.objects.create() + + casbin_rule, extended_rule = create_casbin_rule_with_extended( + ptype="g", + v0="user^test_user", + v1="role^instructor", + v2="lib^test:library", + v3="", + scope=scope, + subject=subject, + ) + + casbin_rule_id = casbin_rule.id + extended_rule_id = extended_rule.id + scope_id = scope.id + subject_id = subject.id + + scope.delete() + + self.assertFalse(ExtendedCasbinRule.objects.filter(id=extended_rule_id).exists()) + self.assertFalse(CasbinRule.objects.filter(id=casbin_rule_id).exists()) + self.assertFalse(Scope.objects.filter(id=scope_id).exists()) + self.assertTrue(Subject.objects.filter(id=subject_id).exists()) diff --git a/openedx_authz/tests/test_models.py b/openedx_authz/tests/test_models.py new file mode 100644 index 00000000..4bdf77ca --- /dev/null +++ b/openedx_authz/tests/test_models.py @@ -0,0 +1,138 @@ +"""Unit tests for authorization models using stub ContentLibrary. + +This test suite verifies the functionality of the authorization models including: +- ExtendedCasbinRule model with metadata and relationships +- Cascade deletion behavior across model hierarchies + +These tests use the stub ContentLibrary model from openedx_authz.tests.stubs.models +instead of the real ContentLibrary model, allowing them to run without the full +edx-platform context. + +Note: This is a simplified unit test suite. For comprehensive tests of Scope/Subject +polymorphism and registry patterns, see the integration tests in test_integration/test_models.py +which run against the real ContentLibrary model. +""" + +from casbin_adapter.models import CasbinRule +from django.contrib.auth import get_user_model +from django.test import TestCase +from opaque_keys.edx.locator import LibraryLocatorV2 + +from openedx_authz.api.data import ContentLibraryData, UserData +from openedx_authz.models import ExtendedCasbinRule, Scope, Subject +from openedx_authz.tests.stubs.models import ContentLibrary + +User = get_user_model() + + +class TestExtendedCasbinRuleModelWithStub(TestCase): + """Test cases for the ExtendedCasbinRule model using stub setup.""" + + def setUp(self): + """Set up test fixtures.""" + self.test_username = "test_user" + self.test_user = User.objects.create_user(username=self.test_username, email="test@example.com") + + self.library_key = LibraryLocatorV2.from_string("lib:TestOrg:TestLib") + self.content_library = ContentLibrary.objects.get_by_key(self.library_key) + + self.casbin_rule = CasbinRule.objects.create( + ptype="p", + v0="user^test_user", + v1="role^instructor", + v2="lib^lib:TestOrg:TestLib", + v3="allow", + ) + + subject_data = UserData(external_key=self.test_username) + self.subject = Subject.objects.get_or_create_for_external_key(subject_data) + + scope_data = ContentLibraryData(external_key=str(self.library_key)) + self.scope = Scope.objects.get_or_create_for_external_key(scope_data) + + def test_extended_casbin_rule_creation_with_all_fields(self): + """Test creating ExtendedCasbinRule with all fields populated. + + Expected Result: + - ExtendedCasbinRule is created successfully. + - All fields are populated correctly. + - Timestamps are set automatically. + """ + casbin_rule_key = ( + f"{self.casbin_rule.ptype},{self.casbin_rule.v0},{self.casbin_rule.v1}," + f"{self.casbin_rule.v2},{self.casbin_rule.v3}" + ) + + extended_rule = ExtendedCasbinRule.objects.create( + casbin_rule_key=casbin_rule_key, + casbin_rule=self.casbin_rule, + description="Test rule for instructor role", + metadata={"created_by": "test_system", "priority": 1}, + scope=self.scope, + subject=self.subject, + ) + + self.assertIsNotNone(extended_rule) + self.assertEqual(extended_rule.casbin_rule_key, casbin_rule_key) + self.assertEqual(extended_rule.casbin_rule, self.casbin_rule) + self.assertEqual(extended_rule.description, "Test rule for instructor role") + self.assertEqual(extended_rule.metadata["created_by"], "test_system") + self.assertEqual(extended_rule.metadata["priority"], 1) + self.assertEqual(extended_rule.scope, self.scope) + self.assertEqual(extended_rule.subject, self.subject) + self.assertIsNotNone(extended_rule.created_at) + self.assertIsNotNone(extended_rule.updated_at) + + def test_extended_casbin_rule_cascade_deletion_when_scope_deleted(self): + """Deleting a Scope should cascade to ExtendedCasbinRule and trigger the handler cleanup. + + Expected Result: + - ExtendedCasbinRule baseline row links the Scope to the CasbinRule. + - Removing the Scope deletes the ExtendedCasbinRule via database cascade. + - CasbinRule disappears because the post_delete handler mirrors the cascade. + """ + casbin_rule_key = ( + f"{self.casbin_rule.ptype},{self.casbin_rule.v0},{self.casbin_rule.v1}," + f"{self.casbin_rule.v2},{self.casbin_rule.v3}" + ) + extended_rule = ExtendedCasbinRule.objects.create( + casbin_rule_key=casbin_rule_key, + casbin_rule=self.casbin_rule, + scope=self.scope, + ) + extended_rule_id = extended_rule.id + casbin_rule_id = self.casbin_rule.id + scope_id = self.scope.id + + self.scope.delete() + + self.assertFalse(ExtendedCasbinRule.objects.filter(id=extended_rule_id).exists()) + self.assertFalse(CasbinRule.objects.filter(id=casbin_rule_id).exists()) + self.assertFalse(Scope.objects.filter(id=scope_id).exists()) + + def test_extended_casbin_rule_cascade_deletion_when_subject_deleted(self): + """Deleting a Subject should cascade to ExtendedCasbinRule and invoke the handler cleanup. + + Expected Result: + - ExtendedCasbinRule baseline row links the Subject to the CasbinRule. + - Removing the Subject deletes the ExtendedCasbinRule via database cascade. + - CasbinRule disappears because the post_delete handler mirrors the cascade. + """ + casbin_rule_key = ( + f"{self.casbin_rule.ptype},{self.casbin_rule.v0},{self.casbin_rule.v1}," + f"{self.casbin_rule.v2},{self.casbin_rule.v3}" + ) + extended_rule = ExtendedCasbinRule.objects.create( + casbin_rule_key=casbin_rule_key, + casbin_rule=self.casbin_rule, + subject=self.subject, + ) + extended_rule_id = extended_rule.id + casbin_rule_id = self.casbin_rule.id + subject_id = self.subject.id + + self.subject.delete() + + self.assertFalse(ExtendedCasbinRule.objects.filter(id=extended_rule_id).exists()) + self.assertFalse(CasbinRule.objects.filter(id=casbin_rule_id).exists()) + self.assertFalse(Subject.objects.filter(id=subject_id).exists()) diff --git a/test_utils/__init__.py b/test_utils/__init__.py deleted file mode 100644 index 94666f1c..00000000 --- a/test_utils/__init__.py +++ /dev/null @@ -1,10 +0,0 @@ -""" -Test utilities. - -Since pytest discourages putting __init__.py into test directory -(i.e. making tests a package) one cannot import from anywhere -under tests folder. However, some utility classes/methods might be useful -in multiple test modules (i.e. factoryboy factories, base test classes). - -So this package is the place to put them. -""" diff --git a/tox.ini b/tox.ini index 6326bb9d..38dbf93e 100644 --- a/tox.ini +++ b/tox.ini @@ -32,7 +32,7 @@ match-dir = (?!migrations) [pytest] DJANGO_SETTINGS_MODULE = openedx_authz.settings.test -addopts = --cov openedx_authz --cov tests --cov-report term-missing --cov-report xml +addopts = --cov openedx_authz --cov tests --cov-report term-missing --cov-report xml --ignore=openedx_authz/tests/integration norecursedirs = .* docs requirements site-packages [testenv] @@ -73,11 +73,9 @@ deps = setuptools -r{toxinidir}/requirements/quality.txt commands = - touch tests/__init__.py - pylint openedx_authz tests test_utils manage.py setup.py - rm tests/__init__.py - ruff check openedx_authz tests test_utils manage.py setup.py - pydocstyle openedx_authz tests manage.py setup.py + pylint openedx_authz manage.py setup.py + ruff check openedx_authz manage.py setup.py + pydocstyle openedx_authz manage.py setup.py make selfcheck [testenv:pii_check]