From df1e196dd66fbac1c47f2895e9daf1019bc4ac75 Mon Sep 17 00:00:00 2001 From: Maria Grimaldi Date: Tue, 14 Oct 2025 10:47:13 +0200 Subject: [PATCH 01/20] feat: add initial extended model for consistency and additional storage --- conftest.py | 24 +++++++++++ openedx_authz/api/data.py | 3 ++ openedx_authz/migrations/0001_initial.py | 51 ++++++++++++++++++++++++ 3 files changed, 78 insertions(+) create mode 100644 conftest.py create mode 100644 openedx_authz/migrations/0001_initial.py diff --git a/conftest.py b/conftest.py new file mode 100644 index 00000000..18f8c44b --- /dev/null +++ b/conftest.py @@ -0,0 +1,24 @@ +"""Pytest configuration for openedx-authz tests.""" + +import pytest + + +@pytest.fixture(scope='session') +def django_db_setup(): + """Override django_db_setup to use existing database instead of creating a new one. + + This is necessary when running tests in an edx-platform environment where: + 1. The database already exists + 2. The database user doesn't have CREATE DATABASE permissions + + By providing this fixture, we tell pytest-django to skip database creation + and use the existing database directly. + """ + # Do nothing - use the existing database + pass + + +@pytest.fixture(scope='session') +def django_db_modify_db_settings(): + """Configure database settings to use existing database for tests.""" + pass diff --git a/openedx_authz/api/data.py b/openedx_authz/api/data.py index 3eb6e6c5..7f9b6ea6 100644 --- a/openedx_authz/api/data.py +++ b/openedx_authz/api/data.py @@ -318,6 +318,8 @@ class ScopeData(AuthZData, metaclass=ScopeMeta): # Subclasses like ContentLibraryData ('lib') represent concrete resource types with their own namespaces. NAMESPACE: ClassVar[str] = "global" + scope_id: int = None # Optional field to link to actual scope instance + @classmethod def validate_external_key(cls, _: str) -> bool: """Validate the external_key format for ScopeData. @@ -545,6 +547,7 @@ class SubjectData(AuthZData, metaclass=SubjectMeta): NAMESPACE: ClassVar[str] = "sub" + subject_id: int = None # Optional field to link to actual subject instance @define class UserData(SubjectData): diff --git a/openedx_authz/migrations/0001_initial.py b/openedx_authz/migrations/0001_initial.py new file mode 100644 index 00000000..836165a0 --- /dev/null +++ b/openedx_authz/migrations/0001_initial.py @@ -0,0 +1,51 @@ +# Generated by Django 5.2.7 on 2025-10-16 09:15 + +import django.db.models.deletion +from django.conf import settings +from django.db import migrations, models + + +class Migration(migrations.Migration): + + initial = True + + dependencies = [ + ('casbin_adapter', '0001_initial'), + ('content_libraries', '0011_remove_contentlibrary_bundle_uuid_and_more'), + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ] + + operations = [ + migrations.CreateModel( + name='Scope', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('content_library', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='authz_scopes', to='content_libraries.contentlibrary')), + ], + ), + migrations.CreateModel( + name='Subject', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('user', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='authz_subjects', to=settings.AUTH_USER_MODEL)), + ], + ), + migrations.CreateModel( + name='ExtendedCasbinRule', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('casbin_rule_key', models.CharField(max_length=255, unique=True)), + ('description', models.TextField(blank=True, null=True)), + ('created_at', models.DateTimeField(auto_now_add=True)), + ('updated_at', models.DateTimeField(auto_now=True)), + ('metadata', models.JSONField(blank=True, null=True)), + ('casbin_rule', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='extended_rule', to='casbin_adapter.casbinrule')), + ('scope', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='casbin_rules', to='openedx_authz.scope')), + ('subject', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='casbin_rules', to='openedx_authz.subject')), + ], + options={ + 'verbose_name': 'Extended Casbin Rule', + 'verbose_name_plural': 'Extended Casbin Rules', + }, + ), + ] From 9d3b39538f770ea7fe2c9003eef120e938d2778d Mon Sep 17 00:00:00 2001 From: Maria Grimaldi Date: Thu, 16 Oct 2025 22:37:32 +0200 Subject: [PATCH 02/20] feat: add model to be used as backreference to maintain rules up to date --- conftest.py | 24 ---- openedx_authz/migrations/0001_initial.py | 107 ++++++++++++++---- openedx_authz/tests/test_models.py | 138 ----------------------- 3 files changed, 87 insertions(+), 182 deletions(-) delete mode 100644 conftest.py delete mode 100644 openedx_authz/tests/test_models.py diff --git a/conftest.py b/conftest.py deleted file mode 100644 index 18f8c44b..00000000 --- a/conftest.py +++ /dev/null @@ -1,24 +0,0 @@ -"""Pytest configuration for openedx-authz tests.""" - -import pytest - - -@pytest.fixture(scope='session') -def django_db_setup(): - """Override django_db_setup to use existing database instead of creating a new one. - - This is necessary when running tests in an edx-platform environment where: - 1. The database already exists - 2. The database user doesn't have CREATE DATABASE permissions - - By providing this fixture, we tell pytest-django to skip database creation - and use the existing database directly. - """ - # Do nothing - use the existing database - pass - - -@pytest.fixture(scope='session') -def django_db_modify_db_settings(): - """Configure database settings to use existing database for tests.""" - pass diff --git a/openedx_authz/migrations/0001_initial.py b/openedx_authz/migrations/0001_initial.py index 836165a0..679cad26 100644 --- a/openedx_authz/migrations/0001_initial.py +++ b/openedx_authz/migrations/0001_initial.py @@ -10,42 +10,109 @@ class Migration(migrations.Migration): initial = True dependencies = [ - ('casbin_adapter', '0001_initial'), - ('content_libraries', '0011_remove_contentlibrary_bundle_uuid_and_more'), + ("casbin_adapter", "0001_initial"), + ("content_libraries", "0011_remove_contentlibrary_bundle_uuid_and_more"), migrations.swappable_dependency(settings.AUTH_USER_MODEL), ] operations = [ migrations.CreateModel( - name='Scope', + name="Scope", fields=[ - ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), - ('content_library', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='authz_scopes', to='content_libraries.contentlibrary')), + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ( + "content_library", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.CASCADE, + related_name="authz_scopes", + to="content_libraries.contentlibrary", + ), + ), ], ), migrations.CreateModel( - name='Subject', + name="Subject", fields=[ - ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), - ('user', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='authz_subjects', to=settings.AUTH_USER_MODEL)), + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ( + "user", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.CASCADE, + related_name="authz_subjects", + to=settings.AUTH_USER_MODEL, + ), + ), ], ), migrations.CreateModel( - name='ExtendedCasbinRule', + name="ExtendedCasbinRule", fields=[ - ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), - ('casbin_rule_key', models.CharField(max_length=255, unique=True)), - ('description', models.TextField(blank=True, null=True)), - ('created_at', models.DateTimeField(auto_now_add=True)), - ('updated_at', models.DateTimeField(auto_now=True)), - ('metadata', models.JSONField(blank=True, null=True)), - ('casbin_rule', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='extended_rule', to='casbin_adapter.casbinrule')), - ('scope', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='casbin_rules', to='openedx_authz.scope')), - ('subject', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='casbin_rules', to='openedx_authz.subject')), + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("casbin_rule_key", models.CharField(max_length=255, unique=True)), + ("description", models.TextField(blank=True, null=True)), + ("created_at", models.DateTimeField(auto_now_add=True)), + ("updated_at", models.DateTimeField(auto_now=True)), + ("metadata", models.JSONField(blank=True, null=True)), + ( + "casbin_rule", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="extended_rule", + to="casbin_adapter.casbinrule", + ), + ), + ( + "scope", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.CASCADE, + related_name="casbin_rules", + to="openedx_authz.scope", + ), + ), + ( + "subject", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.CASCADE, + related_name="casbin_rules", + to="openedx_authz.subject", + ), + ), ], options={ - 'verbose_name': 'Extended Casbin Rule', - 'verbose_name_plural': 'Extended Casbin Rules', + "verbose_name": "Extended Casbin Rule", + "verbose_name_plural": "Extended Casbin Rules", }, ), ] diff --git a/openedx_authz/tests/test_models.py b/openedx_authz/tests/test_models.py deleted file mode 100644 index 4bdf77ca..00000000 --- a/openedx_authz/tests/test_models.py +++ /dev/null @@ -1,138 +0,0 @@ -"""Unit tests for authorization models using stub ContentLibrary. - -This test suite verifies the functionality of the authorization models including: -- ExtendedCasbinRule model with metadata and relationships -- Cascade deletion behavior across model hierarchies - -These tests use the stub ContentLibrary model from openedx_authz.tests.stubs.models -instead of the real ContentLibrary model, allowing them to run without the full -edx-platform context. - -Note: This is a simplified unit test suite. For comprehensive tests of Scope/Subject -polymorphism and registry patterns, see the integration tests in test_integration/test_models.py -which run against the real ContentLibrary model. -""" - -from casbin_adapter.models import CasbinRule -from django.contrib.auth import get_user_model -from django.test import TestCase -from opaque_keys.edx.locator import LibraryLocatorV2 - -from openedx_authz.api.data import ContentLibraryData, UserData -from openedx_authz.models import ExtendedCasbinRule, Scope, Subject -from openedx_authz.tests.stubs.models import ContentLibrary - -User = get_user_model() - - -class TestExtendedCasbinRuleModelWithStub(TestCase): - """Test cases for the ExtendedCasbinRule model using stub setup.""" - - def setUp(self): - """Set up test fixtures.""" - self.test_username = "test_user" - self.test_user = User.objects.create_user(username=self.test_username, email="test@example.com") - - self.library_key = LibraryLocatorV2.from_string("lib:TestOrg:TestLib") - self.content_library = ContentLibrary.objects.get_by_key(self.library_key) - - self.casbin_rule = CasbinRule.objects.create( - ptype="p", - v0="user^test_user", - v1="role^instructor", - v2="lib^lib:TestOrg:TestLib", - v3="allow", - ) - - subject_data = UserData(external_key=self.test_username) - self.subject = Subject.objects.get_or_create_for_external_key(subject_data) - - scope_data = ContentLibraryData(external_key=str(self.library_key)) - self.scope = Scope.objects.get_or_create_for_external_key(scope_data) - - def test_extended_casbin_rule_creation_with_all_fields(self): - """Test creating ExtendedCasbinRule with all fields populated. - - Expected Result: - - ExtendedCasbinRule is created successfully. - - All fields are populated correctly. - - Timestamps are set automatically. - """ - casbin_rule_key = ( - f"{self.casbin_rule.ptype},{self.casbin_rule.v0},{self.casbin_rule.v1}," - f"{self.casbin_rule.v2},{self.casbin_rule.v3}" - ) - - extended_rule = ExtendedCasbinRule.objects.create( - casbin_rule_key=casbin_rule_key, - casbin_rule=self.casbin_rule, - description="Test rule for instructor role", - metadata={"created_by": "test_system", "priority": 1}, - scope=self.scope, - subject=self.subject, - ) - - self.assertIsNotNone(extended_rule) - self.assertEqual(extended_rule.casbin_rule_key, casbin_rule_key) - self.assertEqual(extended_rule.casbin_rule, self.casbin_rule) - self.assertEqual(extended_rule.description, "Test rule for instructor role") - self.assertEqual(extended_rule.metadata["created_by"], "test_system") - self.assertEqual(extended_rule.metadata["priority"], 1) - self.assertEqual(extended_rule.scope, self.scope) - self.assertEqual(extended_rule.subject, self.subject) - self.assertIsNotNone(extended_rule.created_at) - self.assertIsNotNone(extended_rule.updated_at) - - def test_extended_casbin_rule_cascade_deletion_when_scope_deleted(self): - """Deleting a Scope should cascade to ExtendedCasbinRule and trigger the handler cleanup. - - Expected Result: - - ExtendedCasbinRule baseline row links the Scope to the CasbinRule. - - Removing the Scope deletes the ExtendedCasbinRule via database cascade. - - CasbinRule disappears because the post_delete handler mirrors the cascade. - """ - casbin_rule_key = ( - f"{self.casbin_rule.ptype},{self.casbin_rule.v0},{self.casbin_rule.v1}," - f"{self.casbin_rule.v2},{self.casbin_rule.v3}" - ) - extended_rule = ExtendedCasbinRule.objects.create( - casbin_rule_key=casbin_rule_key, - casbin_rule=self.casbin_rule, - scope=self.scope, - ) - extended_rule_id = extended_rule.id - casbin_rule_id = self.casbin_rule.id - scope_id = self.scope.id - - self.scope.delete() - - self.assertFalse(ExtendedCasbinRule.objects.filter(id=extended_rule_id).exists()) - self.assertFalse(CasbinRule.objects.filter(id=casbin_rule_id).exists()) - self.assertFalse(Scope.objects.filter(id=scope_id).exists()) - - def test_extended_casbin_rule_cascade_deletion_when_subject_deleted(self): - """Deleting a Subject should cascade to ExtendedCasbinRule and invoke the handler cleanup. - - Expected Result: - - ExtendedCasbinRule baseline row links the Subject to the CasbinRule. - - Removing the Subject deletes the ExtendedCasbinRule via database cascade. - - CasbinRule disappears because the post_delete handler mirrors the cascade. - """ - casbin_rule_key = ( - f"{self.casbin_rule.ptype},{self.casbin_rule.v0},{self.casbin_rule.v1}," - f"{self.casbin_rule.v2},{self.casbin_rule.v3}" - ) - extended_rule = ExtendedCasbinRule.objects.create( - casbin_rule_key=casbin_rule_key, - casbin_rule=self.casbin_rule, - subject=self.subject, - ) - extended_rule_id = extended_rule.id - casbin_rule_id = self.casbin_rule.id - subject_id = self.subject.id - - self.subject.delete() - - self.assertFalse(ExtendedCasbinRule.objects.filter(id=extended_rule_id).exists()) - self.assertFalse(CasbinRule.objects.filter(id=casbin_rule_id).exists()) - self.assertFalse(Subject.objects.filter(id=subject_id).exists()) From 833cc2cb9ed3a9005261a9025483afa0266cf9fe Mon Sep 17 00:00:00 2001 From: Maria Grimaldi Date: Mon, 20 Oct 2025 14:00:34 +0200 Subject: [PATCH 03/20] refactor!: use registry pattern to extend base models --- openedx_authz/migrations/0001_initial.py | 143 +++++++----------- ...002_alter_contentlibraryscope_scope_ptr.py | 19 +++ openedx_authz/tests/api/test_data.py | 4 +- openedx_authz/tests/api/test_roles.py | 24 ++- openedx_authz/tests/api/test_users.py | 4 +- openedx_authz/tests/rest_api/test_views.py | 36 +++-- openedx_authz/tests/test_commands.py | 49 +++++- openedx_authz/tests/test_enforcement.py | 8 +- openedx_authz/tests/test_filter.py | 11 +- 9 files changed, 185 insertions(+), 113 deletions(-) create mode 100644 openedx_authz/migrations/0002_alter_contentlibraryscope_scope_ptr.py diff --git a/openedx_authz/migrations/0001_initial.py b/openedx_authz/migrations/0001_initial.py index 679cad26..98ce87f3 100644 --- a/openedx_authz/migrations/0001_initial.py +++ b/openedx_authz/migrations/0001_initial.py @@ -1,4 +1,4 @@ -# Generated by Django 5.2.7 on 2025-10-16 09:15 +# Generated by Django 5.2.7 on 2025-10-20 13:18 import django.db.models.deletion from django.conf import settings @@ -10,109 +10,82 @@ class Migration(migrations.Migration): initial = True dependencies = [ - ("casbin_adapter", "0001_initial"), - ("content_libraries", "0011_remove_contentlibrary_bundle_uuid_and_more"), - migrations.swappable_dependency(settings.AUTH_USER_MODEL), - ] + ('casbin_adapter', '0001_initial'), + migrations.swappable_dependency( + getattr( + settings, + "OPENEDX_AUTHZ_CONTENT_LIBRARY_MODEL", + "content_libraries.ContentLibrary", + ) + ), + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ] operations = [ migrations.CreateModel( - name="Scope", + name='Scope', fields=[ - ( - "id", - models.BigAutoField( - auto_created=True, - primary_key=True, - serialize=False, - verbose_name="ID", - ), - ), - ( - "content_library", - models.ForeignKey( - blank=True, - null=True, - on_delete=django.db.models.deletion.CASCADE, - related_name="authz_scopes", - to="content_libraries.contentlibrary", - ), - ), + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), ], + options={ + 'abstract': False, + }, ), migrations.CreateModel( - name="Subject", + name='Subject', fields=[ - ( - "id", - models.BigAutoField( - auto_created=True, - primary_key=True, - serialize=False, - verbose_name="ID", - ), - ), - ( - "user", - models.ForeignKey( - blank=True, - null=True, - on_delete=django.db.models.deletion.CASCADE, - related_name="authz_subjects", - to=settings.AUTH_USER_MODEL, - ), - ), + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), ], + options={ + 'abstract': False, + }, ), migrations.CreateModel( - name="ExtendedCasbinRule", + name='ExtendedCasbinRule', fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('casbin_rule_key', models.CharField(max_length=255, unique=True)), + ('description', models.TextField(blank=True, null=True)), + ('created_at', models.DateTimeField(auto_now_add=True)), + ('updated_at', models.DateTimeField(auto_now=True)), + ('metadata', models.JSONField(blank=True, null=True)), + ('casbin_rule', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='extended_rule', to='casbin_adapter.casbinrule')), + ('scope', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='casbin_rules', to='openedx_authz.scope')), + ('subject', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='casbin_rules', to='openedx_authz.subject')), + ], + options={ + 'verbose_name': 'Extended Casbin Rule', + 'verbose_name_plural': 'Extended Casbin Rules', + }, + ), + migrations.CreateModel( + name='ContentLibraryScope', + fields=[ + ('scope_ptr', models.OneToOneField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID', to='openedx_authz.scope', parent_link=True, on_delete=django.db.models.deletion.CASCADE)), ( - "id", - models.BigAutoField( - auto_created=True, - primary_key=True, - serialize=False, - verbose_name="ID", - ), - ), - ("casbin_rule_key", models.CharField(max_length=255, unique=True)), - ("description", models.TextField(blank=True, null=True)), - ("created_at", models.DateTimeField(auto_now_add=True)), - ("updated_at", models.DateTimeField(auto_now=True)), - ("metadata", models.JSONField(blank=True, null=True)), - ( - "casbin_rule", - models.ForeignKey( - on_delete=django.db.models.deletion.CASCADE, - related_name="extended_rule", - to="casbin_adapter.casbinrule", - ), - ), - ( - "scope", - models.ForeignKey( - blank=True, - null=True, - on_delete=django.db.models.deletion.CASCADE, - related_name="casbin_rules", - to="openedx_authz.scope", - ), - ), - ( - "subject", + 'content_library', models.ForeignKey( blank=True, + db_constraint=False, null=True, on_delete=django.db.models.deletion.CASCADE, - related_name="casbin_rules", - to="openedx_authz.subject", + related_name='authz_scopes', + to=getattr( + settings, + 'OPENEDX_AUTHZ_CONTENT_LIBRARY_MODEL', + 'content_libraries.ContentLibrary', + ), ), ), ], - options={ - "verbose_name": "Extended Casbin Rule", - "verbose_name_plural": "Extended Casbin Rules", - }, + bases=('openedx_authz.scope',), + ), + migrations.CreateModel( + name='UserSubject', + fields=[ + ('subject_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='openedx_authz.subject')), + ('user', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='authz_subjects', to=settings.AUTH_USER_MODEL)), + ], + bases=('openedx_authz.subject',), ), ] diff --git a/openedx_authz/migrations/0002_alter_contentlibraryscope_scope_ptr.py b/openedx_authz/migrations/0002_alter_contentlibraryscope_scope_ptr.py new file mode 100644 index 00000000..dba55dab --- /dev/null +++ b/openedx_authz/migrations/0002_alter_contentlibraryscope_scope_ptr.py @@ -0,0 +1,19 @@ +# Generated by Django 5.2.7 on 2025-10-20 17:42 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('openedx_authz', '0001_initial'), + ] + + operations = [ + migrations.AlterField( + model_name='contentlibraryscope', + name='scope_ptr', + field=models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='openedx_authz.scope'), + ), + ] diff --git a/openedx_authz/tests/api/test_data.py b/openedx_authz/tests/api/test_data.py index a1ac6227..6faad31f 100644 --- a/openedx_authz/tests/api/test_data.py +++ b/openedx_authz/tests/api/test_data.py @@ -469,7 +469,9 @@ def test_role_data_str_with_permissions(self): action2 = ActionData(external_key="write") permission1 = PermissionData(action=action1, effect="allow") permission2 = PermissionData(action=action2, effect="deny") - role = RoleData(external_key="instructor", permissions=[permission1, permission2]) + role = RoleData( + external_key="instructor", permissions=[permission1, permission2] + ) actual_str = str(role) diff --git a/openedx_authz/tests/api/test_roles.py b/openedx_authz/tests/api/test_roles.py index 431ca9ab..71a4455c 100644 --- a/openedx_authz/tests/api/test_roles.py +++ b/openedx_authz/tests/api/test_roles.py @@ -493,7 +493,9 @@ def test_get_subject_role_assignments_in_scope(self, subject_name, scope_name, e SubjectData(external_key=subject_name), ScopeData(external_key=scope_name) ) - role_names = {r.external_key for assignment in role_assignments for r in assignment.roles} + role_names = { + r.external_key for assignment in role_assignments for r in assignment.roles + } self.assertEqual(role_names, expected_roles) @ddt_data( @@ -803,7 +805,11 @@ def test_batch_assign_role_to_subjects_in_scope(self, subject_names, role, scope SubjectData(external_key=subject_name), ScopeData(external_key=scope_name), ) - role_names = {r.external_key for assignment in user_roles for r in assignment.roles} + role_names = { + r.external_key + for assignment in user_roles + for r in assignment.roles + } self.assertIn(role, role_names) else: assign_role_to_subject_in_scope( @@ -815,7 +821,9 @@ def test_batch_assign_role_to_subjects_in_scope(self, subject_names, role, scope SubjectData(external_key=subject_names), ScopeData(external_key=scope_name), ) - role_names = {r.external_key for assignment in user_roles for r in assignment.roles} + role_names = { + r.external_key for assignment in user_roles for r in assignment.roles + } self.assertIn(role, role_names) @ddt_data( @@ -858,7 +866,11 @@ def test_unassign_role_from_subject_in_scope(self, subject_names, role, scope_na SubjectData(external_key=subject), ScopeData(external_key=scope_name), ) - role_names = {r.external_key for assignment in user_roles for r in assignment.roles} + role_names = { + r.external_key + for assignment in user_roles + for r in assignment.roles + } self.assertNotIn(role, role_names) else: unassign_role_from_subject_in_scope( @@ -870,7 +882,9 @@ def test_unassign_role_from_subject_in_scope(self, subject_names, role, scope_na SubjectData(external_key=subject_names), ScopeData(external_key=scope_name), ) - role_names = {r.external_key for assignment in user_roles for r in assignment.roles} + role_names = { + r.external_key for assignment in user_roles for r in assignment.roles + } self.assertNotIn(role, role_names) @ddt_data( diff --git a/openedx_authz/tests/api/test_users.py b/openedx_authz/tests/api/test_users.py index 369d740b..ec842ac3 100644 --- a/openedx_authz/tests/api/test_users.py +++ b/openedx_authz/tests/api/test_users.py @@ -144,7 +144,9 @@ def test_get_user_role_assignments_in_scope(self, username, scope_name, expected """ user_roles = get_user_role_assignments_in_scope(user_external_key=username, scope_external_key=scope_name) - role_names = {r.external_key for assignment in user_roles for r in assignment.roles} + role_names = { + r.external_key for assignment in user_roles for r in assignment.roles + } self.assertEqual(role_names, expected_roles) @data( diff --git a/openedx_authz/tests/rest_api/test_views.py b/openedx_authz/tests/rest_api/test_views.py index 87dde6b7..2a6a3587 100644 --- a/openedx_authz/tests/rest_api/test_views.py +++ b/openedx_authz/tests/rest_api/test_views.py @@ -184,7 +184,9 @@ def setUp(self): ), ) @unpack - def test_permission_validation_success(self, request_data: list[dict], permission_map: list[bool]): + def test_permission_validation_success( + self, request_data: list[dict], permission_map: list[bool] + ): """Test successful permission validation requests. Expected result: @@ -280,7 +282,9 @@ def test_permission_validation_unauthenticated(self): scope = "lib:Org1:LIB1" self.client.force_authenticate(user=None) - response = self.client.post(self.url, data=[{"action": action, "scope": scope}], format="json") + response = self.client.post( + self.url, data=[{"action": action, "scope": scope}], format="json" + ) self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) @@ -293,7 +297,9 @@ def test_permission_validation_unauthenticated(self): (ValueError(), status.HTTP_400_BAD_REQUEST, "Invalid scope format"), ) @unpack - def test_permission_validation_exception_handling(self, exception: Exception, status_code: int, message: str): + def test_permission_validation_exception_handling( + self, exception: Exception, status_code: int, message: str + ): """Test permission validation exception handling for different error types. Expected result: @@ -461,7 +467,9 @@ def test_get_users_by_scope_permissions(self, username: str, status_code: int): ), ) @unpack - def test_add_users_to_role_success(self, users: list[str], expected_completed: int, expected_errors: int): + def test_add_users_to_role_success( + self, users: list[str], expected_completed: int, expected_errors: int + ): """Test adding users to a role within a scope. Expected result: @@ -489,7 +497,9 @@ def test_add_users_to_role_success(self, users: list[str], expected_completed: i (["admin_2", "regular_3", "regular_4"], 0, 3), ) @unpack - def test_add_users_to_role_already_has_role(self, users: list[str], expected_completed: int, expected_errors: int): + def test_add_users_to_role_already_has_role( + self, users: list[str], expected_completed: int, expected_errors: int + ): """Test adding users to a role that already has the role.""" role = roles.LIBRARY_USER.external_key scope = "lib:Org2:LIB2" @@ -503,7 +513,9 @@ def test_add_users_to_role_already_has_role(self, users: list[str], expected_com self.assertEqual(len(response.data["errors"]), expected_errors) @patch.object(api, "assign_role_to_user_in_scope") - def test_add_users_to_role_exception_handling(self, mock_assign_role_to_user_in_scope): + def test_add_users_to_role_exception_handling( + self, mock_assign_role_to_user_in_scope + ): """Test adding users to a role with exception handling.""" request_data = { "role": roles.LIBRARY_ADMIN.external_key, @@ -612,7 +624,9 @@ def test_add_users_to_role_permissions(self, username: str, status_code: int): ), ) @unpack - def test_remove_users_from_role_success(self, users: list[str], expected_completed: int, expected_errors: int): + def test_remove_users_from_role_success( + self, users: list[str], expected_completed: int, expected_errors: int + ): """Test removing users from a role within a scope. Expected result: @@ -633,7 +647,9 @@ def test_remove_users_from_role_success(self, users: list[str], expected_complet self.assertEqual(len(response.data["errors"]), expected_errors) @patch.object(api, "unassign_role_from_user") - def test_remove_users_from_role_exception_handling(self, mock_unassign_role_from_user): + def test_remove_users_from_role_exception_handling( + self, mock_unassign_role_from_user + ): """Test removing users from a role with exception handling.""" query_params = { "role": roles.LIBRARY_ADMIN.external_key, @@ -799,7 +815,9 @@ def test_get_roles_scope_is_invalid(self, query_params: dict, error_code: str): ({"page": 1, "page_size": 4}, 4, False), ) @unpack - def test_get_roles_pagination(self, query_params: dict, expected_count: int, has_next: bool): + def test_get_roles_pagination( + self, query_params: dict, expected_count: int, has_next: bool + ): """Test retrieving roles with pagination. Expected result: diff --git a/openedx_authz/tests/test_commands.py b/openedx_authz/tests/test_commands.py index 8d270571..fe1463c5 100644 --- a/openedx_authz/tests/test_commands.py +++ b/openedx_authz/tests/test_commands.py @@ -146,7 +146,10 @@ def test_policy_file_not_found_raises(self): self.assertEqual(f"Policy file not found: {non_existent_policy}", str(ctx.exception)) - def test_model_file_not_found_raises(self): + @patch.object( + EnforcementCommand, "_get_file_path", return_value="invalid/path/model.conf" + ) + def test_model_file_not_found_raises(self, mock_get_file_path: Mock): """Test that command errors when the provided model file does not exist.""" non_existent_model = "invalid/path/model.conf" @@ -159,10 +162,26 @@ def test_model_file_not_found_raises(self): self.assertEqual(f"Model file not found: {non_existent_model}", str(ctx.exception)) - @patch.object(AuthzEnforcer, "get_enforcer") - def test_display_loaded_policies(self, mock_get_enforcer: Mock): - """Test that policy statistics are displayed correctly.""" - mock_get_enforcer.return_value = self.enforcer + @patch("openedx_authz.management.commands.enforcement.casbin.Enforcer") + @patch.object(EnforcementCommand, "_run_interactive_mode") + def test_successful_run_prints_summary( + self, mock_run_interactive: Mock, mock_enforcer_cls: Mock + ): + """ + Test successful command execution with policy file and interactive mode. + When files exist, command should create enforcer, print counts, and call interactive loop. + """ + mock_enforcer = Mock() + policies = [["p", "role:platform_admin", "act:manage", "*", "allow"]] + roles = [["g", "user:user-1", "role:platform_admin", "*"]] + action_grouping = [ + ["g2", "act:edit", "act:read"], + ["g2", "act:edit", "act:write"], + ] + mock_enforcer.get_policy.return_value = policies + mock_enforcer.get_grouping_policy.return_value = roles + mock_enforcer.get_named_grouping_policy.return_value = action_grouping + mock_enforcer_cls.return_value = mock_enforcer with patch("builtins.input", side_effect=["quit"]): call_command(self.command_name, stdout=self.buffer) @@ -318,9 +337,23 @@ def test_interactive_request_error(self, exception: Exception, mock_is_allowed: ): call_command(self.command_name, stdout=self.buffer) - output = self.buffer.getvalue() - self.assertIn("✗ Error processing request:", output) - self.assertIn(str(exception), output) + invalid_output = self.buffer.getvalue() + self.assertIn("✗ Invalid format. Expected 3 parts, got 2", invalid_output) + self.assertIn("Format: subject action scope", invalid_output) + self.assertIn( + f"Example: {user_input} {make_scope_key('org', 'OpenedX')}", invalid_output + ) + + @data(ValueError(), IndexError(), TypeError()) + def test_interactive_request_error(self, exception: Exception): + """Test that `_test_interactive_request` handles processing errors.""" + self.enforcer.enforce.side_effect = exception + user_input = f"{make_user_key('alice')} {make_action_key('read')} {make_scope_key('org', 'OpenedX')}" + + self.command._test_interactive_request(self.enforcer, user_input) + + error_output = self.buffer.getvalue() + self.assertIn(f"✗ Error processing request: {str(exception)}", error_output) # pylint: disable=protected-access diff --git a/openedx_authz/tests/test_enforcement.py b/openedx_authz/tests/test_enforcement.py index 9afbb415..6dd6d825 100644 --- a/openedx_authz/tests/test_enforcement.py +++ b/openedx_authz/tests/test_enforcement.py @@ -147,7 +147,9 @@ class SystemWideRoleTests(CasbinEnforcementTestCase): { "subject": make_user_key("user-1"), "action": make_action_key("manage"), - "scope": make_scope_key("course", "course-v1:any-org+any-course+any-course-run"), + "scope": make_scope_key( + "course", "course-v1:any-org+any-course+any-course-run" + ), "expected_result": True, }, { @@ -371,7 +373,9 @@ class RoleAssignmentTests(CasbinEnforcementTestCase): { "subject": make_user_key("user-5"), "action": make_action_key("manage"), - "scope": make_scope_key("course", "course-v1:any-org+any-course+any-course-run"), + "scope": make_scope_key( + "course", "course-v1:any-org+any-course+any-course-run" + ), "expected_result": True, }, { diff --git a/openedx_authz/tests/test_filter.py b/openedx_authz/tests/test_filter.py index 475aa533..11497041 100644 --- a/openedx_authz/tests/test_filter.py +++ b/openedx_authz/tests/test_filter.py @@ -10,7 +10,12 @@ import unittest from openedx_authz.engine.filter import Filter -from openedx_authz.tests.test_utils import make_action_key, make_role_key, make_scope_key, make_user_key +from openedx_authz.tests.test_utils import ( + make_action_key, + make_role_key, + make_scope_key, + make_user_key, +) class TestFilter(unittest.TestCase): @@ -165,7 +170,9 @@ def test_filter_deny_policies(self): def test_filter_wildcard_resources(self): """Test filter for wildcard resource patterns.""" - f = Filter(ptype=["p"], v2=[make_scope_key("lib", "*"), make_scope_key("course", "*")]) + f = Filter( + ptype=["p"], v2=[make_scope_key("lib", "*"), make_scope_key("course", "*")] + ) self.assertEqual(f.ptype, ["p"]) self.assertIn(make_scope_key("lib", "*"), f.v2) self.assertIn(make_scope_key("course", "*"), f.v2) From b4f7555401995b31bc68c4f863a9dbee524e2eed Mon Sep 17 00:00:00 2001 From: Maria Grimaldi Date: Tue, 21 Oct 2025 18:21:29 +0200 Subject: [PATCH 04/20] refactor: consider when deleting extended model so the casbin rule is also deleted --- openedx_authz/migrations/0001_initial.py | 3 +-- ...03_alter_extendedcasbinrule_casbin_rule.py | 20 +++++++++++++++++++ 2 files changed, 21 insertions(+), 2 deletions(-) create mode 100644 openedx_authz/migrations/0003_alter_extendedcasbinrule_casbin_rule.py diff --git a/openedx_authz/migrations/0001_initial.py b/openedx_authz/migrations/0001_initial.py index 98ce87f3..976ace8b 100644 --- a/openedx_authz/migrations/0001_initial.py +++ b/openedx_authz/migrations/0001_initial.py @@ -49,7 +49,7 @@ class Migration(migrations.Migration): ('created_at', models.DateTimeField(auto_now_add=True)), ('updated_at', models.DateTimeField(auto_now=True)), ('metadata', models.JSONField(blank=True, null=True)), - ('casbin_rule', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='extended_rule', to='casbin_adapter.casbinrule')), + ('casbin_rule', models.OneToOneField(on_delete=django.db.models.deletion.CASCADE, related_name='extended_rule', to='casbin_adapter.casbinrule')), ('scope', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='casbin_rules', to='openedx_authz.scope')), ('subject', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='casbin_rules', to='openedx_authz.subject')), ], @@ -66,7 +66,6 @@ class Migration(migrations.Migration): 'content_library', models.ForeignKey( blank=True, - db_constraint=False, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='authz_scopes', diff --git a/openedx_authz/migrations/0003_alter_extendedcasbinrule_casbin_rule.py b/openedx_authz/migrations/0003_alter_extendedcasbinrule_casbin_rule.py new file mode 100644 index 00000000..eec70c7f --- /dev/null +++ b/openedx_authz/migrations/0003_alter_extendedcasbinrule_casbin_rule.py @@ -0,0 +1,20 @@ +# Generated by Django 5.2.7 on 2025-10-21 16:41 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('casbin_adapter', '0001_initial'), + ('openedx_authz', '0002_alter_contentlibraryscope_scope_ptr'), + ] + + operations = [ + migrations.AlterField( + model_name='extendedcasbinrule', + name='casbin_rule', + field=models.OneToOneField(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='extended_rule', to='casbin_adapter.casbinrule'), + ), + ] From 338ab8146769d57b86b5b1eac4a7fa2676f1e1ce Mon Sep 17 00:00:00 2001 From: Maria Grimaldi Date: Thu, 23 Oct 2025 16:21:32 +0200 Subject: [PATCH 05/20] refactor: run make format --- openedx_authz/api/data.py | 1 + openedx_authz/migrations/0001_initial.py | 134 ++++++++++++------ ...002_alter_contentlibraryscope_scope_ptr.py | 16 ++- ...03_alter_extendedcasbinrule_casbin_rule.py | 17 ++- openedx_authz/tests/api/test_data.py | 4 +- openedx_authz/tests/api/test_roles.py | 24 +--- openedx_authz/tests/api/test_users.py | 4 +- openedx_authz/tests/rest_api/test_views.py | 36 ++--- openedx_authz/tests/test_commands.py | 12 +- openedx_authz/tests/test_enforcement.py | 8 +- openedx_authz/tests/test_filter.py | 4 +- 11 files changed, 139 insertions(+), 121 deletions(-) diff --git a/openedx_authz/api/data.py b/openedx_authz/api/data.py index 7f9b6ea6..e361583a 100644 --- a/openedx_authz/api/data.py +++ b/openedx_authz/api/data.py @@ -549,6 +549,7 @@ class SubjectData(AuthZData, metaclass=SubjectMeta): subject_id: int = None # Optional field to link to actual subject instance + @define class UserData(SubjectData): """A user subject for authorization in the Open edX platform. diff --git a/openedx_authz/migrations/0001_initial.py b/openedx_authz/migrations/0001_initial.py index 976ace8b..35151ea9 100644 --- a/openedx_authz/migrations/0001_initial.py +++ b/openedx_authz/migrations/0001_initial.py @@ -6,85 +6,139 @@ class Migration(migrations.Migration): - initial = True dependencies = [ - ('casbin_adapter', '0001_initial'), - migrations.swappable_dependency( - getattr( - settings, - "OPENEDX_AUTHZ_CONTENT_LIBRARY_MODEL", - "content_libraries.ContentLibrary", - ) - ), - migrations.swappable_dependency(settings.AUTH_USER_MODEL), - ] + ("casbin_adapter", "0001_initial"), + migrations.swappable_dependency( + getattr( + settings, + "OPENEDX_AUTHZ_CONTENT_LIBRARY_MODEL", + "content_libraries.ContentLibrary", + ) + ), + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ] operations = [ migrations.CreateModel( - name='Scope', + name="Scope", fields=[ - ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), ], options={ - 'abstract': False, + "abstract": False, }, ), migrations.CreateModel( - name='Subject', + name="Subject", fields=[ - ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), ], options={ - 'abstract': False, + "abstract": False, }, ), migrations.CreateModel( - name='ExtendedCasbinRule', + name="ExtendedCasbinRule", fields=[ - ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), - ('casbin_rule_key', models.CharField(max_length=255, unique=True)), - ('description', models.TextField(blank=True, null=True)), - ('created_at', models.DateTimeField(auto_now_add=True)), - ('updated_at', models.DateTimeField(auto_now=True)), - ('metadata', models.JSONField(blank=True, null=True)), - ('casbin_rule', models.OneToOneField(on_delete=django.db.models.deletion.CASCADE, related_name='extended_rule', to='casbin_adapter.casbinrule')), - ('scope', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='casbin_rules', to='openedx_authz.scope')), - ('subject', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='casbin_rules', to='openedx_authz.subject')), + ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("casbin_rule_key", models.CharField(max_length=255, unique=True)), + ("description", models.TextField(blank=True, null=True)), + ("created_at", models.DateTimeField(auto_now_add=True)), + ("updated_at", models.DateTimeField(auto_now=True)), + ("metadata", models.JSONField(blank=True, null=True)), + ( + "casbin_rule", + models.OneToOneField( + on_delete=django.db.models.deletion.CASCADE, + related_name="extended_rule", + to="casbin_adapter.casbinrule", + ), + ), + ( + "scope", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.CASCADE, + related_name="casbin_rules", + to="openedx_authz.scope", + ), + ), + ( + "subject", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.CASCADE, + related_name="casbin_rules", + to="openedx_authz.subject", + ), + ), ], options={ - 'verbose_name': 'Extended Casbin Rule', - 'verbose_name_plural': 'Extended Casbin Rules', + "verbose_name": "Extended Casbin Rule", + "verbose_name_plural": "Extended Casbin Rules", }, ), migrations.CreateModel( - name='ContentLibraryScope', + name="ContentLibraryScope", fields=[ - ('scope_ptr', models.OneToOneField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID', to='openedx_authz.scope', parent_link=True, on_delete=django.db.models.deletion.CASCADE)), ( - 'content_library', + "scope_ptr", + models.OneToOneField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + to="openedx_authz.scope", + parent_link=True, + on_delete=django.db.models.deletion.CASCADE, + ), + ), + ( + "content_library", models.ForeignKey( blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, - related_name='authz_scopes', + related_name="authz_scopes", to=getattr( settings, - 'OPENEDX_AUTHZ_CONTENT_LIBRARY_MODEL', - 'content_libraries.ContentLibrary', + "OPENEDX_AUTHZ_CONTENT_LIBRARY_MODEL", + "content_libraries.ContentLibrary", ), ), ), ], - bases=('openedx_authz.scope',), + bases=("openedx_authz.scope",), ), migrations.CreateModel( - name='UserSubject', + name="UserSubject", fields=[ - ('subject_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='openedx_authz.subject')), - ('user', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='authz_subjects', to=settings.AUTH_USER_MODEL)), + ( + "subject_ptr", + models.OneToOneField( + auto_created=True, + on_delete=django.db.models.deletion.CASCADE, + parent_link=True, + primary_key=True, + serialize=False, + to="openedx_authz.subject", + ), + ), + ( + "user", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.CASCADE, + related_name="authz_subjects", + to=settings.AUTH_USER_MODEL, + ), + ), ], - bases=('openedx_authz.subject',), + bases=("openedx_authz.subject",), ), ] diff --git a/openedx_authz/migrations/0002_alter_contentlibraryscope_scope_ptr.py b/openedx_authz/migrations/0002_alter_contentlibraryscope_scope_ptr.py index dba55dab..2f7fe853 100644 --- a/openedx_authz/migrations/0002_alter_contentlibraryscope_scope_ptr.py +++ b/openedx_authz/migrations/0002_alter_contentlibraryscope_scope_ptr.py @@ -5,15 +5,21 @@ class Migration(migrations.Migration): - dependencies = [ - ('openedx_authz', '0001_initial'), + ("openedx_authz", "0001_initial"), ] operations = [ migrations.AlterField( - model_name='contentlibraryscope', - name='scope_ptr', - field=models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='openedx_authz.scope'), + model_name="contentlibraryscope", + name="scope_ptr", + field=models.OneToOneField( + auto_created=True, + on_delete=django.db.models.deletion.CASCADE, + parent_link=True, + primary_key=True, + serialize=False, + to="openedx_authz.scope", + ), ), ] diff --git a/openedx_authz/migrations/0003_alter_extendedcasbinrule_casbin_rule.py b/openedx_authz/migrations/0003_alter_extendedcasbinrule_casbin_rule.py index eec70c7f..d73139d5 100644 --- a/openedx_authz/migrations/0003_alter_extendedcasbinrule_casbin_rule.py +++ b/openedx_authz/migrations/0003_alter_extendedcasbinrule_casbin_rule.py @@ -5,16 +5,21 @@ class Migration(migrations.Migration): - dependencies = [ - ('casbin_adapter', '0001_initial'), - ('openedx_authz', '0002_alter_contentlibraryscope_scope_ptr'), + ("casbin_adapter", "0001_initial"), + ("openedx_authz", "0002_alter_contentlibraryscope_scope_ptr"), ] operations = [ migrations.AlterField( - model_name='extendedcasbinrule', - name='casbin_rule', - field=models.OneToOneField(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='extended_rule', to='casbin_adapter.casbinrule'), + model_name="extendedcasbinrule", + name="casbin_rule", + field=models.OneToOneField( + blank=True, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="extended_rule", + to="casbin_adapter.casbinrule", + ), ), ] diff --git a/openedx_authz/tests/api/test_data.py b/openedx_authz/tests/api/test_data.py index 6faad31f..a1ac6227 100644 --- a/openedx_authz/tests/api/test_data.py +++ b/openedx_authz/tests/api/test_data.py @@ -469,9 +469,7 @@ def test_role_data_str_with_permissions(self): action2 = ActionData(external_key="write") permission1 = PermissionData(action=action1, effect="allow") permission2 = PermissionData(action=action2, effect="deny") - role = RoleData( - external_key="instructor", permissions=[permission1, permission2] - ) + role = RoleData(external_key="instructor", permissions=[permission1, permission2]) actual_str = str(role) diff --git a/openedx_authz/tests/api/test_roles.py b/openedx_authz/tests/api/test_roles.py index 71a4455c..431ca9ab 100644 --- a/openedx_authz/tests/api/test_roles.py +++ b/openedx_authz/tests/api/test_roles.py @@ -493,9 +493,7 @@ def test_get_subject_role_assignments_in_scope(self, subject_name, scope_name, e SubjectData(external_key=subject_name), ScopeData(external_key=scope_name) ) - role_names = { - r.external_key for assignment in role_assignments for r in assignment.roles - } + role_names = {r.external_key for assignment in role_assignments for r in assignment.roles} self.assertEqual(role_names, expected_roles) @ddt_data( @@ -805,11 +803,7 @@ def test_batch_assign_role_to_subjects_in_scope(self, subject_names, role, scope SubjectData(external_key=subject_name), ScopeData(external_key=scope_name), ) - role_names = { - r.external_key - for assignment in user_roles - for r in assignment.roles - } + role_names = {r.external_key for assignment in user_roles for r in assignment.roles} self.assertIn(role, role_names) else: assign_role_to_subject_in_scope( @@ -821,9 +815,7 @@ def test_batch_assign_role_to_subjects_in_scope(self, subject_names, role, scope SubjectData(external_key=subject_names), ScopeData(external_key=scope_name), ) - role_names = { - r.external_key for assignment in user_roles for r in assignment.roles - } + role_names = {r.external_key for assignment in user_roles for r in assignment.roles} self.assertIn(role, role_names) @ddt_data( @@ -866,11 +858,7 @@ def test_unassign_role_from_subject_in_scope(self, subject_names, role, scope_na SubjectData(external_key=subject), ScopeData(external_key=scope_name), ) - role_names = { - r.external_key - for assignment in user_roles - for r in assignment.roles - } + role_names = {r.external_key for assignment in user_roles for r in assignment.roles} self.assertNotIn(role, role_names) else: unassign_role_from_subject_in_scope( @@ -882,9 +870,7 @@ def test_unassign_role_from_subject_in_scope(self, subject_names, role, scope_na SubjectData(external_key=subject_names), ScopeData(external_key=scope_name), ) - role_names = { - r.external_key for assignment in user_roles for r in assignment.roles - } + role_names = {r.external_key for assignment in user_roles for r in assignment.roles} self.assertNotIn(role, role_names) @ddt_data( diff --git a/openedx_authz/tests/api/test_users.py b/openedx_authz/tests/api/test_users.py index ec842ac3..369d740b 100644 --- a/openedx_authz/tests/api/test_users.py +++ b/openedx_authz/tests/api/test_users.py @@ -144,9 +144,7 @@ def test_get_user_role_assignments_in_scope(self, username, scope_name, expected """ user_roles = get_user_role_assignments_in_scope(user_external_key=username, scope_external_key=scope_name) - role_names = { - r.external_key for assignment in user_roles for r in assignment.roles - } + role_names = {r.external_key for assignment in user_roles for r in assignment.roles} self.assertEqual(role_names, expected_roles) @data( diff --git a/openedx_authz/tests/rest_api/test_views.py b/openedx_authz/tests/rest_api/test_views.py index 2a6a3587..87dde6b7 100644 --- a/openedx_authz/tests/rest_api/test_views.py +++ b/openedx_authz/tests/rest_api/test_views.py @@ -184,9 +184,7 @@ def setUp(self): ), ) @unpack - def test_permission_validation_success( - self, request_data: list[dict], permission_map: list[bool] - ): + def test_permission_validation_success(self, request_data: list[dict], permission_map: list[bool]): """Test successful permission validation requests. Expected result: @@ -282,9 +280,7 @@ def test_permission_validation_unauthenticated(self): scope = "lib:Org1:LIB1" self.client.force_authenticate(user=None) - response = self.client.post( - self.url, data=[{"action": action, "scope": scope}], format="json" - ) + response = self.client.post(self.url, data=[{"action": action, "scope": scope}], format="json") self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) @@ -297,9 +293,7 @@ def test_permission_validation_unauthenticated(self): (ValueError(), status.HTTP_400_BAD_REQUEST, "Invalid scope format"), ) @unpack - def test_permission_validation_exception_handling( - self, exception: Exception, status_code: int, message: str - ): + def test_permission_validation_exception_handling(self, exception: Exception, status_code: int, message: str): """Test permission validation exception handling for different error types. Expected result: @@ -467,9 +461,7 @@ def test_get_users_by_scope_permissions(self, username: str, status_code: int): ), ) @unpack - def test_add_users_to_role_success( - self, users: list[str], expected_completed: int, expected_errors: int - ): + def test_add_users_to_role_success(self, users: list[str], expected_completed: int, expected_errors: int): """Test adding users to a role within a scope. Expected result: @@ -497,9 +489,7 @@ def test_add_users_to_role_success( (["admin_2", "regular_3", "regular_4"], 0, 3), ) @unpack - def test_add_users_to_role_already_has_role( - self, users: list[str], expected_completed: int, expected_errors: int - ): + def test_add_users_to_role_already_has_role(self, users: list[str], expected_completed: int, expected_errors: int): """Test adding users to a role that already has the role.""" role = roles.LIBRARY_USER.external_key scope = "lib:Org2:LIB2" @@ -513,9 +503,7 @@ def test_add_users_to_role_already_has_role( self.assertEqual(len(response.data["errors"]), expected_errors) @patch.object(api, "assign_role_to_user_in_scope") - def test_add_users_to_role_exception_handling( - self, mock_assign_role_to_user_in_scope - ): + def test_add_users_to_role_exception_handling(self, mock_assign_role_to_user_in_scope): """Test adding users to a role with exception handling.""" request_data = { "role": roles.LIBRARY_ADMIN.external_key, @@ -624,9 +612,7 @@ def test_add_users_to_role_permissions(self, username: str, status_code: int): ), ) @unpack - def test_remove_users_from_role_success( - self, users: list[str], expected_completed: int, expected_errors: int - ): + def test_remove_users_from_role_success(self, users: list[str], expected_completed: int, expected_errors: int): """Test removing users from a role within a scope. Expected result: @@ -647,9 +633,7 @@ def test_remove_users_from_role_success( self.assertEqual(len(response.data["errors"]), expected_errors) @patch.object(api, "unassign_role_from_user") - def test_remove_users_from_role_exception_handling( - self, mock_unassign_role_from_user - ): + def test_remove_users_from_role_exception_handling(self, mock_unassign_role_from_user): """Test removing users from a role with exception handling.""" query_params = { "role": roles.LIBRARY_ADMIN.external_key, @@ -815,9 +799,7 @@ def test_get_roles_scope_is_invalid(self, query_params: dict, error_code: str): ({"page": 1, "page_size": 4}, 4, False), ) @unpack - def test_get_roles_pagination( - self, query_params: dict, expected_count: int, has_next: bool - ): + def test_get_roles_pagination(self, query_params: dict, expected_count: int, has_next: bool): """Test retrieving roles with pagination. Expected result: diff --git a/openedx_authz/tests/test_commands.py b/openedx_authz/tests/test_commands.py index fe1463c5..2d6d25a9 100644 --- a/openedx_authz/tests/test_commands.py +++ b/openedx_authz/tests/test_commands.py @@ -146,9 +146,7 @@ def test_policy_file_not_found_raises(self): self.assertEqual(f"Policy file not found: {non_existent_policy}", str(ctx.exception)) - @patch.object( - EnforcementCommand, "_get_file_path", return_value="invalid/path/model.conf" - ) + @patch.object(EnforcementCommand, "_get_file_path", return_value="invalid/path/model.conf") def test_model_file_not_found_raises(self, mock_get_file_path: Mock): """Test that command errors when the provided model file does not exist.""" non_existent_model = "invalid/path/model.conf" @@ -164,9 +162,7 @@ def test_model_file_not_found_raises(self, mock_get_file_path: Mock): @patch("openedx_authz.management.commands.enforcement.casbin.Enforcer") @patch.object(EnforcementCommand, "_run_interactive_mode") - def test_successful_run_prints_summary( - self, mock_run_interactive: Mock, mock_enforcer_cls: Mock - ): + def test_successful_run_prints_summary(self, mock_run_interactive: Mock, mock_enforcer_cls: Mock): """ Test successful command execution with policy file and interactive mode. When files exist, command should create enforcer, print counts, and call interactive loop. @@ -340,9 +336,7 @@ def test_interactive_request_error(self, exception: Exception, mock_is_allowed: invalid_output = self.buffer.getvalue() self.assertIn("✗ Invalid format. Expected 3 parts, got 2", invalid_output) self.assertIn("Format: subject action scope", invalid_output) - self.assertIn( - f"Example: {user_input} {make_scope_key('org', 'OpenedX')}", invalid_output - ) + self.assertIn(f"Example: {user_input} {make_scope_key('org', 'OpenedX')}", invalid_output) @data(ValueError(), IndexError(), TypeError()) def test_interactive_request_error(self, exception: Exception): diff --git a/openedx_authz/tests/test_enforcement.py b/openedx_authz/tests/test_enforcement.py index 6dd6d825..9afbb415 100644 --- a/openedx_authz/tests/test_enforcement.py +++ b/openedx_authz/tests/test_enforcement.py @@ -147,9 +147,7 @@ class SystemWideRoleTests(CasbinEnforcementTestCase): { "subject": make_user_key("user-1"), "action": make_action_key("manage"), - "scope": make_scope_key( - "course", "course-v1:any-org+any-course+any-course-run" - ), + "scope": make_scope_key("course", "course-v1:any-org+any-course+any-course-run"), "expected_result": True, }, { @@ -373,9 +371,7 @@ class RoleAssignmentTests(CasbinEnforcementTestCase): { "subject": make_user_key("user-5"), "action": make_action_key("manage"), - "scope": make_scope_key( - "course", "course-v1:any-org+any-course+any-course-run" - ), + "scope": make_scope_key("course", "course-v1:any-org+any-course+any-course-run"), "expected_result": True, }, { diff --git a/openedx_authz/tests/test_filter.py b/openedx_authz/tests/test_filter.py index 11497041..41e60547 100644 --- a/openedx_authz/tests/test_filter.py +++ b/openedx_authz/tests/test_filter.py @@ -170,9 +170,7 @@ def test_filter_deny_policies(self): def test_filter_wildcard_resources(self): """Test filter for wildcard resource patterns.""" - f = Filter( - ptype=["p"], v2=[make_scope_key("lib", "*"), make_scope_key("course", "*")] - ) + f = Filter(ptype=["p"], v2=[make_scope_key("lib", "*"), make_scope_key("course", "*")]) self.assertEqual(f.ptype, ["p"]) self.assertIn(make_scope_key("lib", "*"), f.v2) self.assertIn(make_scope_key("course", "*"), f.v2) From 1ec58feb5a8068791a1ea3ac54dbe0f7f9f1fd0a Mon Sep 17 00:00:00 2001 From: Maria Grimaldi Date: Fri, 24 Oct 2025 13:20:06 +0200 Subject: [PATCH 06/20] refactor: regenerate migrations --- openedx_authz/migrations/0001_initial.py | 144 ------------------ ...002_alter_contentlibraryscope_scope_ptr.py | 25 --- ...03_alter_extendedcasbinrule_casbin_rule.py | 25 --- 3 files changed, 194 deletions(-) delete mode 100644 openedx_authz/migrations/0001_initial.py delete mode 100644 openedx_authz/migrations/0002_alter_contentlibraryscope_scope_ptr.py delete mode 100644 openedx_authz/migrations/0003_alter_extendedcasbinrule_casbin_rule.py diff --git a/openedx_authz/migrations/0001_initial.py b/openedx_authz/migrations/0001_initial.py deleted file mode 100644 index 35151ea9..00000000 --- a/openedx_authz/migrations/0001_initial.py +++ /dev/null @@ -1,144 +0,0 @@ -# Generated by Django 5.2.7 on 2025-10-20 13:18 - -import django.db.models.deletion -from django.conf import settings -from django.db import migrations, models - - -class Migration(migrations.Migration): - initial = True - - dependencies = [ - ("casbin_adapter", "0001_initial"), - migrations.swappable_dependency( - getattr( - settings, - "OPENEDX_AUTHZ_CONTENT_LIBRARY_MODEL", - "content_libraries.ContentLibrary", - ) - ), - migrations.swappable_dependency(settings.AUTH_USER_MODEL), - ] - - operations = [ - migrations.CreateModel( - name="Scope", - fields=[ - ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), - ], - options={ - "abstract": False, - }, - ), - migrations.CreateModel( - name="Subject", - fields=[ - ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), - ], - options={ - "abstract": False, - }, - ), - migrations.CreateModel( - name="ExtendedCasbinRule", - fields=[ - ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), - ("casbin_rule_key", models.CharField(max_length=255, unique=True)), - ("description", models.TextField(blank=True, null=True)), - ("created_at", models.DateTimeField(auto_now_add=True)), - ("updated_at", models.DateTimeField(auto_now=True)), - ("metadata", models.JSONField(blank=True, null=True)), - ( - "casbin_rule", - models.OneToOneField( - on_delete=django.db.models.deletion.CASCADE, - related_name="extended_rule", - to="casbin_adapter.casbinrule", - ), - ), - ( - "scope", - models.ForeignKey( - blank=True, - null=True, - on_delete=django.db.models.deletion.CASCADE, - related_name="casbin_rules", - to="openedx_authz.scope", - ), - ), - ( - "subject", - models.ForeignKey( - blank=True, - null=True, - on_delete=django.db.models.deletion.CASCADE, - related_name="casbin_rules", - to="openedx_authz.subject", - ), - ), - ], - options={ - "verbose_name": "Extended Casbin Rule", - "verbose_name_plural": "Extended Casbin Rules", - }, - ), - migrations.CreateModel( - name="ContentLibraryScope", - fields=[ - ( - "scope_ptr", - models.OneToOneField( - auto_created=True, - primary_key=True, - serialize=False, - verbose_name="ID", - to="openedx_authz.scope", - parent_link=True, - on_delete=django.db.models.deletion.CASCADE, - ), - ), - ( - "content_library", - models.ForeignKey( - blank=True, - null=True, - on_delete=django.db.models.deletion.CASCADE, - related_name="authz_scopes", - to=getattr( - settings, - "OPENEDX_AUTHZ_CONTENT_LIBRARY_MODEL", - "content_libraries.ContentLibrary", - ), - ), - ), - ], - bases=("openedx_authz.scope",), - ), - migrations.CreateModel( - name="UserSubject", - fields=[ - ( - "subject_ptr", - models.OneToOneField( - auto_created=True, - on_delete=django.db.models.deletion.CASCADE, - parent_link=True, - primary_key=True, - serialize=False, - to="openedx_authz.subject", - ), - ), - ( - "user", - models.ForeignKey( - blank=True, - null=True, - on_delete=django.db.models.deletion.CASCADE, - related_name="authz_subjects", - to=settings.AUTH_USER_MODEL, - ), - ), - ], - bases=("openedx_authz.subject",), - ), - ] diff --git a/openedx_authz/migrations/0002_alter_contentlibraryscope_scope_ptr.py b/openedx_authz/migrations/0002_alter_contentlibraryscope_scope_ptr.py deleted file mode 100644 index 2f7fe853..00000000 --- a/openedx_authz/migrations/0002_alter_contentlibraryscope_scope_ptr.py +++ /dev/null @@ -1,25 +0,0 @@ -# Generated by Django 5.2.7 on 2025-10-20 17:42 - -import django.db.models.deletion -from django.db import migrations, models - - -class Migration(migrations.Migration): - dependencies = [ - ("openedx_authz", "0001_initial"), - ] - - operations = [ - migrations.AlterField( - model_name="contentlibraryscope", - name="scope_ptr", - field=models.OneToOneField( - auto_created=True, - on_delete=django.db.models.deletion.CASCADE, - parent_link=True, - primary_key=True, - serialize=False, - to="openedx_authz.scope", - ), - ), - ] diff --git a/openedx_authz/migrations/0003_alter_extendedcasbinrule_casbin_rule.py b/openedx_authz/migrations/0003_alter_extendedcasbinrule_casbin_rule.py deleted file mode 100644 index d73139d5..00000000 --- a/openedx_authz/migrations/0003_alter_extendedcasbinrule_casbin_rule.py +++ /dev/null @@ -1,25 +0,0 @@ -# Generated by Django 5.2.7 on 2025-10-21 16:41 - -import django.db.models.deletion -from django.db import migrations, models - - -class Migration(migrations.Migration): - dependencies = [ - ("casbin_adapter", "0001_initial"), - ("openedx_authz", "0002_alter_contentlibraryscope_scope_ptr"), - ] - - operations = [ - migrations.AlterField( - model_name="extendedcasbinrule", - name="casbin_rule", - field=models.OneToOneField( - blank=True, - null=True, - on_delete=django.db.models.deletion.SET_NULL, - related_name="extended_rule", - to="casbin_adapter.casbinrule", - ), - ), - ] From d3bf9c123c5a970837a05a428fad61233f4050fc Mon Sep 17 00:00:00 2001 From: Maria Grimaldi Date: Fri, 24 Oct 2025 13:22:19 +0200 Subject: [PATCH 07/20] refactor: fix issues when rebasing --- openedx_authz/tests/test_commands.py | 43 ++++++---------------------- openedx_authz/tests/test_filter.py | 7 +---- 2 files changed, 9 insertions(+), 41 deletions(-) diff --git a/openedx_authz/tests/test_commands.py b/openedx_authz/tests/test_commands.py index 2d6d25a9..8d270571 100644 --- a/openedx_authz/tests/test_commands.py +++ b/openedx_authz/tests/test_commands.py @@ -146,8 +146,7 @@ def test_policy_file_not_found_raises(self): self.assertEqual(f"Policy file not found: {non_existent_policy}", str(ctx.exception)) - @patch.object(EnforcementCommand, "_get_file_path", return_value="invalid/path/model.conf") - def test_model_file_not_found_raises(self, mock_get_file_path: Mock): + def test_model_file_not_found_raises(self): """Test that command errors when the provided model file does not exist.""" non_existent_model = "invalid/path/model.conf" @@ -160,24 +159,10 @@ def test_model_file_not_found_raises(self, mock_get_file_path: Mock): self.assertEqual(f"Model file not found: {non_existent_model}", str(ctx.exception)) - @patch("openedx_authz.management.commands.enforcement.casbin.Enforcer") - @patch.object(EnforcementCommand, "_run_interactive_mode") - def test_successful_run_prints_summary(self, mock_run_interactive: Mock, mock_enforcer_cls: Mock): - """ - Test successful command execution with policy file and interactive mode. - When files exist, command should create enforcer, print counts, and call interactive loop. - """ - mock_enforcer = Mock() - policies = [["p", "role:platform_admin", "act:manage", "*", "allow"]] - roles = [["g", "user:user-1", "role:platform_admin", "*"]] - action_grouping = [ - ["g2", "act:edit", "act:read"], - ["g2", "act:edit", "act:write"], - ] - mock_enforcer.get_policy.return_value = policies - mock_enforcer.get_grouping_policy.return_value = roles - mock_enforcer.get_named_grouping_policy.return_value = action_grouping - mock_enforcer_cls.return_value = mock_enforcer + @patch.object(AuthzEnforcer, "get_enforcer") + def test_display_loaded_policies(self, mock_get_enforcer: Mock): + """Test that policy statistics are displayed correctly.""" + mock_get_enforcer.return_value = self.enforcer with patch("builtins.input", side_effect=["quit"]): call_command(self.command_name, stdout=self.buffer) @@ -333,21 +318,9 @@ def test_interactive_request_error(self, exception: Exception, mock_is_allowed: ): call_command(self.command_name, stdout=self.buffer) - invalid_output = self.buffer.getvalue() - self.assertIn("✗ Invalid format. Expected 3 parts, got 2", invalid_output) - self.assertIn("Format: subject action scope", invalid_output) - self.assertIn(f"Example: {user_input} {make_scope_key('org', 'OpenedX')}", invalid_output) - - @data(ValueError(), IndexError(), TypeError()) - def test_interactive_request_error(self, exception: Exception): - """Test that `_test_interactive_request` handles processing errors.""" - self.enforcer.enforce.side_effect = exception - user_input = f"{make_user_key('alice')} {make_action_key('read')} {make_scope_key('org', 'OpenedX')}" - - self.command._test_interactive_request(self.enforcer, user_input) - - error_output = self.buffer.getvalue() - self.assertIn(f"✗ Error processing request: {str(exception)}", error_output) + output = self.buffer.getvalue() + self.assertIn("✗ Error processing request:", output) + self.assertIn(str(exception), output) # pylint: disable=protected-access diff --git a/openedx_authz/tests/test_filter.py b/openedx_authz/tests/test_filter.py index 41e60547..475aa533 100644 --- a/openedx_authz/tests/test_filter.py +++ b/openedx_authz/tests/test_filter.py @@ -10,12 +10,7 @@ import unittest from openedx_authz.engine.filter import Filter -from openedx_authz.tests.test_utils import ( - make_action_key, - make_role_key, - make_scope_key, - make_user_key, -) +from openedx_authz.tests.test_utils import make_action_key, make_role_key, make_scope_key, make_user_key class TestFilter(unittest.TestCase): From 3ea8a840715c7ac0349a1384fda870cb66e08255 Mon Sep 17 00:00:00 2001 From: Maria Grimaldi Date: Fri, 24 Oct 2025 13:26:07 +0200 Subject: [PATCH 08/20] refactor: drop unused variables from data classes --- openedx_authz/api/data.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/openedx_authz/api/data.py b/openedx_authz/api/data.py index e361583a..3eb6e6c5 100644 --- a/openedx_authz/api/data.py +++ b/openedx_authz/api/data.py @@ -318,8 +318,6 @@ class ScopeData(AuthZData, metaclass=ScopeMeta): # Subclasses like ContentLibraryData ('lib') represent concrete resource types with their own namespaces. NAMESPACE: ClassVar[str] = "global" - scope_id: int = None # Optional field to link to actual scope instance - @classmethod def validate_external_key(cls, _: str) -> bool: """Validate the external_key format for ScopeData. @@ -547,8 +545,6 @@ class SubjectData(AuthZData, metaclass=SubjectMeta): NAMESPACE: ClassVar[str] = "sub" - subject_id: int = None # Optional field to link to actual subject instance - @define class UserData(SubjectData): From b2b873bc01a74cd0f4939c089a7a085a8277e470 Mon Sep 17 00:00:00 2001 From: Maria Grimaldi Date: Tue, 4 Nov 2025 16:59:47 +0100 Subject: [PATCH 09/20] refactor: address integration test failure --- openedx_authz/urls.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openedx_authz/urls.py b/openedx_authz/urls.py index 1114ba2f..e600af88 100644 --- a/openedx_authz/urls.py +++ b/openedx_authz/urls.py @@ -7,5 +7,5 @@ app_name = "openedx_authz" urlpatterns = [ - path("authz/", include((urls, "openedx_authz"))), + path("authz/", include(urls)), ] From 27b6e5ccdac4380e0dc0bed7990cc31471624ef2 Mon Sep 17 00:00:00 2001 From: Maria Grimaldi Date: Tue, 4 Nov 2025 17:19:22 +0100 Subject: [PATCH 10/20] refactor: consider double namespace instead --- openedx_authz/urls.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openedx_authz/urls.py b/openedx_authz/urls.py index e600af88..1114ba2f 100644 --- a/openedx_authz/urls.py +++ b/openedx_authz/urls.py @@ -7,5 +7,5 @@ app_name = "openedx_authz" urlpatterns = [ - path("authz/", include(urls)), + path("authz/", include((urls, "openedx_authz"))), ] From d3f1632a11636f57482e4b2275de23dccdc891ff Mon Sep 17 00:00:00 2001 From: Maria Grimaldi Date: Tue, 4 Nov 2025 18:39:29 +0100 Subject: [PATCH 11/20] test: add minimal unittest suite for extended casbin model --- openedx_authz/tests/test_models.py | 139 +++++++++++++++++++++++++++++ 1 file changed, 139 insertions(+) create mode 100644 openedx_authz/tests/test_models.py diff --git a/openedx_authz/tests/test_models.py b/openedx_authz/tests/test_models.py new file mode 100644 index 00000000..7b54a1e8 --- /dev/null +++ b/openedx_authz/tests/test_models.py @@ -0,0 +1,139 @@ +"""Unit tests for authorization models using stub ContentLibrary. + +This test suite verifies the functionality of the authorization models including: +- ExtendedCasbinRule model with metadata and relationships +- Cascade deletion behavior across model hierarchies + +These tests use the stub ContentLibrary model from openedx_authz.tests.stubs.models +instead of the real ContentLibrary model, allowing them to run without the full +edx-platform context. + +Note: This is a simplified unit test suite. For comprehensive tests of Scope/Subject +polymorphism and registry patterns, see the integration tests in test_integration/test_models.py +which run against the real ContentLibrary model. +""" + +from casbin_adapter.models import CasbinRule +from django.contrib.auth import get_user_model +from django.db import IntegrityError +from django.test import TestCase +from opaque_keys.edx.locator import LibraryLocatorV2 + +from openedx_authz.api.data import ContentLibraryData, UserData +from openedx_authz.models import ContentLibraryScope, ExtendedCasbinRule, Scope, Subject, UserSubject +from openedx_authz.tests.stubs.models import ContentLibrary + +User = get_user_model() + + +class TestExtendedCasbinRuleModelWithStub(TestCase): + """Test cases for the ExtendedCasbinRule model using stub setup.""" + + def setUp(self): + """Set up test fixtures.""" + self.test_username = "test_user" + self.test_user = User.objects.create_user(username=self.test_username, email="test@example.com") + + self.library_key = LibraryLocatorV2.from_string("lib:TestOrg:TestLib") + self.content_library = ContentLibrary.objects.get_by_key(self.library_key) + + self.casbin_rule = CasbinRule.objects.create( + ptype="p", + v0="user^test_user", + v1="role^instructor", + v2="lib^lib:TestOrg:TestLib", + v3="allow", + ) + + subject_data = UserData(external_key=self.test_username) + self.subject = Subject.objects.get_or_create_for_external_key(subject_data) + + scope_data = ContentLibraryData(external_key=str(self.library_key)) + self.scope = Scope.objects.get_or_create_for_external_key(scope_data) + + def test_extended_casbin_rule_creation_with_all_fields(self): + """Test creating ExtendedCasbinRule with all fields populated. + + Expected Result: + - ExtendedCasbinRule is created successfully. + - All fields are populated correctly. + - Timestamps are set automatically. + """ + casbin_rule_key = ( + f"{self.casbin_rule.ptype},{self.casbin_rule.v0},{self.casbin_rule.v1}," + f"{self.casbin_rule.v2},{self.casbin_rule.v3}" + ) + + extended_rule = ExtendedCasbinRule.objects.create( + casbin_rule_key=casbin_rule_key, + casbin_rule=self.casbin_rule, + description="Test rule for instructor role", + metadata={"created_by": "test_system", "priority": 1}, + scope=self.scope, + subject=self.subject, + ) + + self.assertIsNotNone(extended_rule) + self.assertEqual(extended_rule.casbin_rule_key, casbin_rule_key) + self.assertEqual(extended_rule.casbin_rule, self.casbin_rule) + self.assertEqual(extended_rule.description, "Test rule for instructor role") + self.assertEqual(extended_rule.metadata["created_by"], "test_system") + self.assertEqual(extended_rule.metadata["priority"], 1) + self.assertEqual(extended_rule.scope, self.scope) + self.assertEqual(extended_rule.subject, self.subject) + self.assertIsNotNone(extended_rule.created_at) + self.assertIsNotNone(extended_rule.updated_at) + + def test_extended_casbin_rule_cascade_deletion_when_scope_deleted(self): + """Deleting a Scope should cascade to ExtendedCasbinRule and trigger the handler cleanup. + + Expected Result: + - ExtendedCasbinRule baseline row links the Scope to the CasbinRule. + - Removing the Scope deletes the ExtendedCasbinRule via database cascade. + - CasbinRule disappears because the post_delete handler mirrors the cascade. + """ + casbin_rule_key = ( + f"{self.casbin_rule.ptype},{self.casbin_rule.v0},{self.casbin_rule.v1}," + f"{self.casbin_rule.v2},{self.casbin_rule.v3}" + ) + extended_rule = ExtendedCasbinRule.objects.create( + casbin_rule_key=casbin_rule_key, + casbin_rule=self.casbin_rule, + scope=self.scope, + ) + extended_rule_id = extended_rule.id + casbin_rule_id = self.casbin_rule.id + scope_id = self.scope.id + + self.scope.delete() + + self.assertFalse(ExtendedCasbinRule.objects.filter(id=extended_rule_id).exists()) + self.assertFalse(CasbinRule.objects.filter(id=casbin_rule_id).exists()) + self.assertFalse(Scope.objects.filter(id=scope_id).exists()) + + def test_extended_casbin_rule_cascade_deletion_when_subject_deleted(self): + """Deleting a Subject should cascade to ExtendedCasbinRule and invoke the handler cleanup. + + Expected Result: + - ExtendedCasbinRule baseline row links the Subject to the CasbinRule. + - Removing the Subject deletes the ExtendedCasbinRule via database cascade. + - CasbinRule disappears because the post_delete handler mirrors the cascade. + """ + casbin_rule_key = ( + f"{self.casbin_rule.ptype},{self.casbin_rule.v0},{self.casbin_rule.v1}," + f"{self.casbin_rule.v2},{self.casbin_rule.v3}" + ) + extended_rule = ExtendedCasbinRule.objects.create( + casbin_rule_key=casbin_rule_key, + casbin_rule=self.casbin_rule, + subject=self.subject, + ) + extended_rule_id = extended_rule.id + casbin_rule_id = self.casbin_rule.id + subject_id = self.subject.id + + self.subject.delete() + + self.assertFalse(ExtendedCasbinRule.objects.filter(id=extended_rule_id).exists()) + self.assertFalse(CasbinRule.objects.filter(id=casbin_rule_id).exists()) + self.assertFalse(Subject.objects.filter(id=subject_id).exists()) From 5996a49f4015c3d358fb7b9f4baa1ece8f02035a Mon Sep 17 00:00:00 2001 From: Maria Grimaldi Date: Wed, 5 Nov 2025 13:55:43 +0100 Subject: [PATCH 12/20] refactor: address quality issues --- openedx_authz/tests/test_models.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/openedx_authz/tests/test_models.py b/openedx_authz/tests/test_models.py index 7b54a1e8..4bdf77ca 100644 --- a/openedx_authz/tests/test_models.py +++ b/openedx_authz/tests/test_models.py @@ -15,12 +15,11 @@ from casbin_adapter.models import CasbinRule from django.contrib.auth import get_user_model -from django.db import IntegrityError from django.test import TestCase from opaque_keys.edx.locator import LibraryLocatorV2 from openedx_authz.api.data import ContentLibraryData, UserData -from openedx_authz.models import ContentLibraryScope, ExtendedCasbinRule, Scope, Subject, UserSubject +from openedx_authz.models import ExtendedCasbinRule, Scope, Subject from openedx_authz.tests.stubs.models import ContentLibrary User = get_user_model() From 59053d70d766d1af11ae75f051a4b52bb7019503 Mon Sep 17 00:00:00 2001 From: Maria Grimaldi Date: Wed, 22 Oct 2025 17:51:06 +0200 Subject: [PATCH 13/20] feat: add handler to remove roles when user is retired --- openedx_authz/api/roles.py | 16 + openedx_authz/api/users.py | 14 + openedx_authz/handlers.py | 33 ++ openedx_authz/tests/api/test_roles.py | 169 +++++++++ openedx_authz/tests/api/test_users.py | 203 +++++++++++ .../tests/integration/test_handlers.py | 336 ++++++++++++++++++ 6 files changed, 771 insertions(+) create mode 100644 openedx_authz/tests/integration/test_handlers.py diff --git a/openedx_authz/api/roles.py b/openedx_authz/api/roles.py index fa9385b1..15b6f0f6 100644 --- a/openedx_authz/api/roles.py +++ b/openedx_authz/api/roles.py @@ -41,6 +41,7 @@ "get_all_subject_role_assignments_in_scope", "get_subject_role_assignments", "get_scopes_for_subject_and_permission", + "unassign_subjects_from_all_roles", ] # TODO: these are the concerns we still have to address: @@ -418,3 +419,18 @@ def get_scopes_for_subject_and_permission( if permission in role.permissions and role_assignment.scope not in scopes: scopes.append(role_assignment.scope) return scopes + return [SubjectData(namespaced_key=policy[GroupingPolicyIndex.SUBJECT.value]) for policy in policies] + + +def unassign_subjects_from_all_roles(subject: SubjectData) -> bool: + """Unassign a subject from all roles across all scopes. + + Args: + subject: The SubjectData object representing the subject to unassign. + + Returns: + bool: True if any roles were removed, False otherwise. + """ + enforcer = AuthzEnforcer.get_enforcer() + enforcer.load_policy() + return enforcer.remove_filtered_grouping_policy(GroupingPolicyIndex.SUBJECT.value, subject.namespaced_key) diff --git a/openedx_authz/api/users.py b/openedx_authz/api/users.py index 7b1f58a7..30cd3900 100644 --- a/openedx_authz/api/users.py +++ b/openedx_authz/api/users.py @@ -22,6 +22,7 @@ get_subject_role_assignments_in_scope, get_subjects_for_role_in_scope, unassign_role_from_subject_in_scope, + unassign_subjects_from_all_roles, ) __all__ = [ @@ -36,6 +37,7 @@ "is_user_allowed", "get_scopes_for_user_and_permission", "get_users_for_role_in_scope", + "unassign_all_roles_from_user", ] @@ -223,3 +225,15 @@ def get_scopes_for_user_and_permission( UserData(external_key=user_external_key), PermissionData(action=ActionData(external_key=action_external_key)), ) + + +def unassign_all_roles_from_user(user_external_key: str) -> bool: + """Unassign all roles from a user across all scopes. + + Args: + user_external_key (str): ID of the user (e.g., 'john_doe'). + + Returns: + bool: True if any roles were removed, False otherwise. + """ + return unassign_subjects_from_all_roles(UserData(external_key=user_external_key)) diff --git a/openedx_authz/handlers.py b/openedx_authz/handlers.py index 45541b83..f0129499 100644 --- a/openedx_authz/handlers.py +++ b/openedx_authz/handlers.py @@ -10,8 +10,14 @@ from django.db.models.signals import post_delete from django.dispatch import receiver +from openedx_authz.api.users import unassign_all_roles_from_user from openedx_authz.models.core import ExtendedCasbinRule +try: + from openedx.core.djangoapps.user_api.accounts.signals import USER_RETIRE_LMS_CRITICAL +except ImportError: + USER_RETIRE_LMS_CRITICAL = None + logger = logging.getLogger(__name__) @@ -48,3 +54,30 @@ def delete_casbin_rule_on_extended_rule_deletion(sender, instance, **kwargs): # instance.casbin_rule_id, exc_info=exc, ) + + +def unassign_roles_on_user_retirement(sender, user, **kwargs): + """Unassign roles from a user when they are retired. + + This handler is triggered when a user is retired in the LMS. It ensures that + any roles assigned to the user are removed, maintaining the integrity of the + authorization system. + + Args: + sender: The model class (User). + user: The user instance being retired. + **kwargs: Additional keyword arguments from the signal. + """ + try: + unassign_all_roles_from_user(user.username) + except Exception as exc: + logger.exception( + "Error unassigning roles from user %s during retirement", + user.id, + exc_info=exc, + ) + + +# Only register the handler if the signal is available (i.e., running in Open edX) +if USER_RETIRE_LMS_CRITICAL is not None: + USER_RETIRE_LMS_CRITICAL.connect(unassign_roles_on_user_retirement) diff --git a/openedx_authz/tests/api/test_roles.py b/openedx_authz/tests/api/test_roles.py index 431ca9ab..e1eeaf42 100644 --- a/openedx_authz/tests/api/test_roles.py +++ b/openedx_authz/tests/api/test_roles.py @@ -35,6 +35,7 @@ get_subject_role_assignments_in_scope, get_subjects_for_role_in_scope, unassign_role_from_subject_in_scope, + unassign_subjects_from_all_roles, ) from openedx_authz.constants import permissions, roles from openedx_authz.constants.roles import ( @@ -976,3 +977,171 @@ def test_assign_role_creates_extended_casbin_rule(self): self.assertIn(role_data.namespaced_key, extended_rule.casbin_rule_key) self.assertIn(subject_data.namespaced_key, extended_rule.casbin_rule_key) self.assertIn(scope_data.namespaced_key, extended_rule.casbin_rule_key) + + + @ddt_data( + # Test user with single role in single scope + ("alice", ["lib:Org1:math_101"], {"library_admin"}), + # Test user with multiple roles in different scopes + ("eve", ["lib:Org2:physics_401", "lib:Org2:chemistry_501", "lib:Org2:biology_601"], + {"library_admin", "library_author", "library_user"}), + # Test user with same role in multiple scopes + ("liam", ["lib:Org4:art_101", "lib:Org4:art_201", "lib:Org4:art_301"], {"library_author"}), + # Test user with multiple different roles in multiple scopes + ("peter", ["lib:Org6:project_alpha", "lib:Org6:project_beta", "lib:Org6:project_gamma", "lib:Org6:project_delta"], + {"library_admin", "library_author", "library_contributor", "library_user"}), + ) + @unpack + def test_unassign_subject_from_all_roles_removes_all_assignments( + self, subject_name, scopes, expected_roles_before + ): + """Test that unassign_subjects_from_all_roles removes all role assignments. + + Expected result: + - Before unassignment: Subject has roles in specified scopes + - Function returns True indicating roles were removed + - After unassignment: Subject has no role assignments in any scope + - Querying role assignments returns empty list + """ + subject = SubjectData(external_key=subject_name) + + # Verify the subject has roles before unassignment + assignments_before = get_subject_role_assignments(subject) + self.assertGreater(len(assignments_before), 0) + + # Verify roles are what we expect before removal + roles_before = { + r.external_key for assignment in assignments_before for r in assignment.roles + } + self.assertEqual(roles_before, expected_roles_before) + + # Verify assignments exist in each expected scope + for scope_name in scopes: + scope_assignments = get_subject_role_assignments_in_scope( + subject, ScopeData(external_key=scope_name) + ) + self.assertGreater(len(scope_assignments), 0) + + # Unassign all roles from the subject + result = unassign_subjects_from_all_roles(subject) + + # Verify the function returns True (indicating roles were removed) + self.assertTrue(result) + + # Verify the subject has no role assignments after unassignment + assignments_after = get_subject_role_assignments(subject) + self.assertEqual(len(assignments_after), 0) + + # Verify no assignments in any of the previous scopes + for scope_name in scopes: + scope_assignments = get_subject_role_assignments_in_scope( + subject, ScopeData(external_key=scope_name) + ) + self.assertEqual(len(scope_assignments), 0) + + def test_unassign_subject_with_no_roles_returns_false(self): + """Test that unassigning a subject with no roles returns False. + + Expected result: + - Function returns False when subject has no role assignments + - No errors occur when trying to unassign from non-existent subject + """ + non_existent_subject = SubjectData(external_key="user_with_no_roles") + + # Verify the subject has no roles + assignments_before = get_subject_role_assignments(non_existent_subject) + self.assertEqual(len(assignments_before), 0) + + # Unassign all roles (should return False since there are none) + result = unassign_subjects_from_all_roles(non_existent_subject) + + # Verify the function returns False (no roles to remove) + self.assertFalse(result) + + # Verify still no assignments after the operation + assignments_after = get_subject_role_assignments(non_existent_subject) + self.assertEqual(len(assignments_after), 0) + + def test_unassign_subject_does_not_affect_other_subjects(self): + """Test that unassigning one subject does not affect other subjects. + + Expected result: + - When unassigning roles from one subject, other subjects retain their roles + - Other subjects with the same roles in the same scopes are unaffected + """ + # Use subjects that share the same scope + subject_to_unassign = SubjectData(external_key="grace") + other_subject = SubjectData(external_key="heidi") + shared_scope = ScopeData(external_key="lib:Org1:math_advanced") + + # Verify both subjects have roles in the shared scope before + grace_assignments_before = get_subject_role_assignments_in_scope( + subject_to_unassign, shared_scope + ) + heidi_assignments_before = get_subject_role_assignments_in_scope( + other_subject, shared_scope + ) + + self.assertGreater(len(grace_assignments_before), 0) + self.assertGreater(len(heidi_assignments_before), 0) + + # Unassign all roles from grace + result = unassign_subjects_from_all_roles(subject_to_unassign) + self.assertTrue(result) + + # Verify grace has no assignments after unassignment + grace_assignments_after = get_subject_role_assignments(subject_to_unassign) + self.assertEqual(len(grace_assignments_after), 0) + + # Verify heidi still has her assignments + heidi_assignments_after = get_subject_role_assignments_in_scope( + other_subject, shared_scope + ) + self.assertEqual(len(heidi_assignments_after), len(heidi_assignments_before)) + + # Verify heidi still has the library_contributor role + heidi_roles = { + r.external_key + for assignment in heidi_assignments_after + for r in assignment.roles + } + self.assertIn("library_contributor", heidi_roles) + + def test_unassign_and_reassign_subject(self): + """Test that a subject can be reassigned roles after being unassigned. + + Expected result: + - Subject has roles initially + - After unassignment, subject has no roles + - Subject can be assigned new roles + - Newly assigned roles work correctly + """ + subject = SubjectData(external_key="bob") + original_scope = ScopeData(external_key="lib:Org1:history_201") + new_scope = ScopeData(external_key="lib:Org1:new_library") + new_role = RoleData(external_key="library_admin") + + # Verify bob has roles initially + assignments_before = get_subject_role_assignments(subject) + self.assertGreater(len(assignments_before), 0) + + # Unassign all roles + result = unassign_subjects_from_all_roles(subject) + self.assertTrue(result) + + # Verify no roles after unassignment + assignments_after_unassign = get_subject_role_assignments(subject) + self.assertEqual(len(assignments_after_unassign), 0) + + # Assign a new role in a new scope + assign_result = assign_role_to_subject_in_scope(subject, new_role, new_scope) + self.assertTrue(assign_result) + + # Verify the new assignment works + new_assignments = get_subject_role_assignments_in_scope(subject, new_scope) + self.assertEqual(len(new_assignments), 1) + + new_roles = { + r.external_key for assignment in new_assignments for r in assignment.roles + } + self.assertIn("library_admin", new_roles) diff --git a/openedx_authz/tests/api/test_users.py b/openedx_authz/tests/api/test_users.py index 369d740b..7c82639b 100644 --- a/openedx_authz/tests/api/test_users.py +++ b/openedx_authz/tests/api/test_users.py @@ -12,6 +12,7 @@ get_user_role_assignments_for_role_in_scope, get_user_role_assignments_in_scope, is_user_allowed, + unassign_all_roles_from_user, unassign_role_from_user, ) from openedx_authz.constants import permissions, roles @@ -229,6 +230,208 @@ def test_get_all_user_role_assignments_in_scope(self, scope_name, expected_assig for assignment in role_assignments: self.assertIn(assignment, expected_assignments) + @data( + # Test user with single role in single scope + ("alice", ["lib:Org1:math_101"], {"library_admin"}), + # Test user with multiple roles in different scopes + ("eve", ["lib:Org2:physics_401", "lib:Org2:chemistry_501", "lib:Org2:biology_601"], + {"library_admin", "library_author", "library_user"}), + # Test user with same role in multiple scopes + ("liam", ["lib:Org4:art_101", "lib:Org4:art_201", "lib:Org4:art_301"], {"library_author"}), + # Test user with multiple different roles in multiple scopes + ("peter", ["lib:Org6:project_alpha", "lib:Org6:project_beta", "lib:Org6:project_gamma", "lib:Org6:project_delta"], + {"library_admin", "library_author", "library_contributor", "library_user"}), + ) + @unpack + def test_unassign_all_roles_from_user_removes_all_assignments( + self, username, scopes, expected_roles_before + ): + """Test that unassign_all_roles_from_user removes all role assignments. + + Expected result: + - Before unassignment: User has roles in specified scopes + - Function returns True indicating roles were removed + - After unassignment: User has no role assignments in any scope + - Querying role assignments returns empty list + """ + # Verify the user has roles before unassignment + assignments_before = get_user_role_assignments(user_external_key=username) + self.assertGreater(len(assignments_before), 0) + + # Verify roles are what we expect before removal + roles_before = { + r.external_key for assignment in assignments_before for r in assignment.roles + } + self.assertEqual(roles_before, expected_roles_before) + + # Verify assignments exist in each expected scope + for scope_name in scopes: + scope_assignments = get_user_role_assignments_in_scope( + user_external_key=username, scope_external_key=scope_name + ) + self.assertGreater(len(scope_assignments), 0) + + # Unassign all roles from the user + result = unassign_all_roles_from_user(user_external_key=username) + + # Verify the function returns True (indicating roles were removed) + self.assertTrue(result) + + # Verify the user has no role assignments after unassignment + assignments_after = get_user_role_assignments(user_external_key=username) + self.assertEqual(len(assignments_after), 0) + + # Verify no assignments in any of the previous scopes + for scope_name in scopes: + scope_assignments = get_user_role_assignments_in_scope( + user_external_key=username, scope_external_key=scope_name + ) + self.assertEqual(len(scope_assignments), 0) + + def test_unassign_all_roles_from_user_with_no_roles_returns_false(self): + """Test that unassigning a user with no roles returns False. + + Expected result: + - Function returns False when user has no role assignments + - No errors occur when trying to unassign from non-existent user + """ + non_existent_user = "user_with_no_roles" + + # Verify the user has no roles + assignments_before = get_user_role_assignments(user_external_key=non_existent_user) + self.assertEqual(len(assignments_before), 0) + + # Unassign all roles (should return False since there are none) + result = unassign_all_roles_from_user(user_external_key=non_existent_user) + + # Verify the function returns False (no roles to remove) + self.assertFalse(result) + + # Verify still no assignments after the operation + assignments_after = get_user_role_assignments(user_external_key=non_existent_user) + self.assertEqual(len(assignments_after), 0) + + def test_unassign_all_roles_does_not_affect_other_users(self): + """Test that unassigning one user does not affect other users. + + Expected result: + - When unassigning roles from one user, other users retain their roles + - Other users with the same roles in the same scopes are unaffected + """ + # Use users that share the same scope + user_to_unassign = "grace" + other_user = "heidi" + shared_scope = "lib:Org1:math_advanced" + + # Verify both users have roles in the shared scope before + grace_assignments_before = get_user_role_assignments_in_scope( + user_external_key=user_to_unassign, scope_external_key=shared_scope + ) + heidi_assignments_before = get_user_role_assignments_in_scope( + user_external_key=other_user, scope_external_key=shared_scope + ) + + self.assertGreater(len(grace_assignments_before), 0) + self.assertGreater(len(heidi_assignments_before), 0) + + # Unassign all roles from grace + result = unassign_all_roles_from_user(user_external_key=user_to_unassign) + self.assertTrue(result) + + # Verify grace has no assignments after unassignment + grace_assignments_after = get_user_role_assignments(user_external_key=user_to_unassign) + self.assertEqual(len(grace_assignments_after), 0) + + # Verify heidi still has her assignments + heidi_assignments_after = get_user_role_assignments_in_scope( + user_external_key=other_user, scope_external_key=shared_scope + ) + self.assertEqual(len(heidi_assignments_after), len(heidi_assignments_before)) + + # Verify heidi still has the library_contributor role + heidi_roles = { + r.external_key + for assignment in heidi_assignments_after + for r in assignment.roles + } + self.assertIn("library_contributor", heidi_roles) + + def test_unassign_and_reassign_user(self): + """Test that a user can be reassigned roles after being unassigned. + + Expected result: + - User has roles initially + - After unassignment, user has no roles + - User can be assigned new roles + - Newly assigned roles work correctly + """ + username = "bob" + new_scope = "lib:Org1:new_library" + new_role = "library_admin" + + # Verify bob has roles initially + assignments_before = get_user_role_assignments(user_external_key=username) + self.assertGreater(len(assignments_before), 0) + + # Unassign all roles + result = unassign_all_roles_from_user(user_external_key=username) + self.assertTrue(result) + + # Verify no roles after unassignment + assignments_after_unassign = get_user_role_assignments(user_external_key=username) + self.assertEqual(len(assignments_after_unassign), 0) + + # Assign a new role in a new scope + assign_result = assign_role_to_user_in_scope( + user_external_key=username, + role_external_key=new_role, + scope_external_key=new_scope + ) + self.assertTrue(assign_result) + + # Verify the new assignment works + new_assignments = get_user_role_assignments_in_scope( + user_external_key=username, scope_external_key=new_scope + ) + self.assertEqual(len(new_assignments), 1) + + new_roles = { + r.external_key for assignment in new_assignments for r in assignment.roles + } + self.assertIn(new_role, new_roles) + + def test_unassign_all_roles_impacts_permissions(self): + """Test that unassigning all roles removes the user's permissions. + + Expected result: + - User has permissions before unassignment + - After unassignment, user no longer has those permissions + - Permission checks return False after unassignment + """ + username = "alice" + scope = "lib:Org1:math_101" + action = "delete_library" # library_admin permission + + # Verify alice has the permission before unassignment + has_permission_before = is_user_allowed( + user_external_key=username, + action_external_key=action, + scope_external_key=scope, + ) + self.assertTrue(has_permission_before) + + # Unassign all roles + result = unassign_all_roles_from_user(user_external_key=username) + self.assertTrue(result) + + # Verify alice no longer has the permission + has_permission_after = is_user_allowed( + user_external_key=username, + action_external_key=action, + scope_external_key=scope, + ) + self.assertFalse(has_permission_after) + @ddt class TestUserPermissions(UserAssignmentsSetupMixin): diff --git a/openedx_authz/tests/integration/test_handlers.py b/openedx_authz/tests/integration/test_handlers.py new file mode 100644 index 00000000..ae679251 --- /dev/null +++ b/openedx_authz/tests/integration/test_handlers.py @@ -0,0 +1,336 @@ +"""Integration tests for signal handlers in Open edX environment. + +These tests verify that signal handlers work correctly when integrated with +the real Open edX platform, particularly testing the user retirement flow. + +Run these tests in an edx-platform environment where the USER_RETIRE_LMS_CRITICAL +signal is available. +""" + +from django.contrib.auth import get_user_model +from django.test import TestCase + +from openedx.core.djangoapps.user_api.accounts.signals import USER_RETIRE_LMS_CRITICAL +from openedx_authz.api.users import ( + assign_role_to_user_in_scope, + get_user_role_assignments, + get_user_role_assignments_in_scope, +) +from openedx_authz.api.data import UserData +from openedx_authz.engine.enforcer import AuthzEnforcer +from openedx_authz.models import ExtendedCasbinRule, Subject + +User = get_user_model() + + +class TestUserRetirementSignalIntegration(TestCase): + """Integration tests for the USER_RETIRE_LMS_CRITICAL signal handler. + + These tests verify that when a user retirement signal is sent in a real Open edX + environment, all role assignments for the user are properly cleaned up across all scopes. + """ + + def setUp(self): + """Set up test data with real users and role assignments.""" + # Create real Django users for testing + self.retiring_user = User.objects.create_user( + username='retiring_user', + email='retiring@example.com', + password='testpass123' + ) + self.other_user = User.objects.create_user( + username='other_user', + email='other@example.com', + password='testpass123' + ) + + # Load enforcer policy + enforcer = AuthzEnforcer.get_enforcer() + enforcer.load_policy() + + def tearDown(self): + """Clean up test data.""" + # Clean up users + User.objects.filter(username__in=['retiring_user', 'other_user']).delete() + + # Clear enforcer policy + enforcer = AuthzEnforcer.get_enforcer() + enforcer.clear_policy() + + def test_user_retirement_signal_removes_all_role_assignments(self): + """Test that sending USER_RETIRE_LMS_CRITICAL removes all roles for a user. + + Expected Result: + - User has multiple role assignments before signal is sent + - After signal is sent, user has no role assignments + - Other users' role assignments are unaffected + """ + # Assign roles to retiring user in multiple scopes + assign_role_to_user_in_scope( + self.retiring_user.username, + "library_admin", + "lib:TestOrg:lib1" + ) + assign_role_to_user_in_scope( + self.retiring_user.username, + "library_author", + "lib:TestOrg:lib2" + ) + assign_role_to_user_in_scope( + self.retiring_user.username, + "library_user", + "lib:TestOrg:lib3" + ) + + # Assign role to other user + assign_role_to_user_in_scope( + self.other_user.username, + "library_admin", + "lib:TestOrg:lib4" + ) + + # Verify users have roles before retirement + retiring_user_roles_before = get_user_role_assignments(self.retiring_user.username) + other_user_roles_before = get_user_role_assignments(self.other_user.username) + + self.assertEqual(len(retiring_user_roles_before), 3) + self.assertEqual(len(other_user_roles_before), 1) + + # Send the retirement signal + USER_RETIRE_LMS_CRITICAL.send( + sender=User, + user=self.retiring_user + ) + + # Verify roles are removed for retiring user but not other user + retiring_user_roles_after = get_user_role_assignments(self.retiring_user.username) + other_user_roles_after = get_user_role_assignments(self.other_user.username) + + self.assertEqual(len(retiring_user_roles_after), 0) + self.assertEqual(len(other_user_roles_after), 1) + + def test_user_retirement_signal_with_no_roles(self): + """Test that retirement signal handles users with no roles gracefully. + + Expected Result: + - User has no roles before signal + - Signal completes without error + - User still has no roles after signal + """ + # Create user with no role assignments + user_no_roles = User.objects.create_user( + username='user_no_roles', + email='noroles@example.com', + password='testpass123' + ) + + # Verify user has no roles + user_roles_before = get_user_role_assignments(user_no_roles.username) + self.assertEqual(len(user_roles_before), 0) + + # Send retirement signal - should not raise error + USER_RETIRE_LMS_CRITICAL.send( + sender=User, + user=user_no_roles + ) + + # Verify still no roles + user_roles_after = get_user_role_assignments(user_no_roles.username) + self.assertEqual(len(user_roles_after), 0) + + # Cleanup + user_no_roles.delete() + + def test_user_retirement_removes_extended_casbin_rules(self): + """Test that user retirement also cleans up ExtendedCasbinRule records. + + Expected Result: + - User has ExtendedCasbinRule records linked to their assignments + - After retirement signal, ExtendedCasbinRule records are removed + - This ensures complete cleanup including database integrity + """ + # Assign roles which should create ExtendedCasbinRule records + assign_role_to_user_in_scope( + self.retiring_user.username, + "library_admin", + "lib:TestOrg:cleanup1" + ) + assign_role_to_user_in_scope( + self.retiring_user.username, + "library_author", + "lib:TestOrg:cleanup2" + ) + + # Get the subject to check ExtendedCasbinRule records + user_data = UserData(external_key=self.retiring_user.username) + subject = Subject.objects.get_or_create_for_external_key(user_data) + + # Verify ExtendedCasbinRule records exist + extended_rules_before = ExtendedCasbinRule.objects.filter(subject=subject) + self.assertGreater(extended_rules_before.count(), 0) + + # Send retirement signal + USER_RETIRE_LMS_CRITICAL.send( + sender=User, + user=self.retiring_user + ) + + # Verify ExtendedCasbinRule records are cleaned up + extended_rules_after = ExtendedCasbinRule.objects.filter(subject=subject) + self.assertEqual(extended_rules_after.count(), 0) + + def test_user_retirement_with_multiple_scopes_same_role(self): + """Test retirement for user with same role in multiple scopes. + + Expected Result: + - User has same role assigned in multiple different scopes + - After retirement, all assignments across all scopes are removed + - Role assignments are completely cleared + """ + scopes = [ + "lib:Org1:scope1", + "lib:Org2:scope2", + "lib:Org3:scope3" + ] + + # Assign same role in multiple scopes + for scope in scopes: + assign_role_to_user_in_scope( + self.retiring_user.username, + "library_admin", + scope + ) + + # Verify assignments in each scope before retirement + for scope in scopes: + assignments = get_user_role_assignments_in_scope( + self.retiring_user.username, + scope + ) + self.assertEqual(len(assignments), 1) + + total_assignments_before = get_user_role_assignments(self.retiring_user.username) + self.assertEqual(len(total_assignments_before), 3) + + # Send retirement signal + USER_RETIRE_LMS_CRITICAL.send( + sender=User, + user=self.retiring_user + ) + + # Verify all assignments removed from all scopes + for scope in scopes: + assignments = get_user_role_assignments_in_scope( + self.retiring_user.username, + scope + ) + self.assertEqual(len(assignments), 0) + + total_assignments_after = get_user_role_assignments(self.retiring_user.username) + self.assertEqual(len(total_assignments_after), 0) + + def test_user_retirement_with_mixed_role_types(self): + """Test retirement for user with different roles across scopes. + + Expected Result: + - User has different roles (admin, author, contributor, user) in different scopes + - After retirement, all roles are removed regardless of type + - Comprehensive cleanup across all role types + """ + role_scope_pairs = [ + ("library_admin", "lib:TestOrg:admin_scope"), + ("library_author", "lib:TestOrg:author_scope"), + ("library_contributor", "lib:TestOrg:contrib_scope"), + ("library_user", "lib:TestOrg:user_scope"), + ] + + # Assign different roles in different scopes + for role, scope in role_scope_pairs: + assign_role_to_user_in_scope( + self.retiring_user.username, + role, + scope + ) + + # Verify all assignments exist + total_assignments_before = get_user_role_assignments(self.retiring_user.username) + self.assertEqual(len(total_assignments_before), 4) + + # Extract role types to verify diversity + roles_before = { + r.external_key + for assignment in total_assignments_before + for r in assignment.roles + } + self.assertEqual( + roles_before, + {"library_admin", "library_author", "library_contributor", "library_user"} + ) + + # Send retirement signal + USER_RETIRE_LMS_CRITICAL.send( + sender=User, + user=self.retiring_user + ) + + # Verify all roles removed + total_assignments_after = get_user_role_assignments(self.retiring_user.username) + self.assertEqual(len(total_assignments_after), 0) + + def test_multiple_user_retirements_do_not_interfere(self): + """Test that retiring multiple users doesn't affect each other. + + Expected Result: + - Multiple users each have their own role assignments + - Retiring one user removes only their assignments + - Retiring another user removes only their assignments + - No cross-contamination between user retirements + """ + # Create additional users + user1 = User.objects.create_user( + username='retire_test_1', + email='retire1@example.com', + password='testpass123' + ) + user2 = User.objects.create_user( + username='retire_test_2', + email='retire2@example.com', + password='testpass123' + ) + user3 = User.objects.create_user( + username='retire_test_3', + email='retire3@example.com', + password='testpass123' + ) + + # Assign roles to each user + for user in [user1, user2, user3]: + assign_role_to_user_in_scope( + user.username, + "library_admin", + f"lib:TestOrg:{user.username}_scope" + ) + + # Verify all users have assignments + for user in [user1, user2, user3]: + assignments = get_user_role_assignments(user.username) + self.assertEqual(len(assignments), 1) + + # Retire user1 + USER_RETIRE_LMS_CRITICAL.send(sender=User, user=user1) + + # Verify user1 has no assignments, but user2 and user3 still do + self.assertEqual(len(get_user_role_assignments(user1.username)), 0) + self.assertEqual(len(get_user_role_assignments(user2.username)), 1) + self.assertEqual(len(get_user_role_assignments(user3.username)), 1) + + # Retire user2 + USER_RETIRE_LMS_CRITICAL.send(sender=User, user=user2) + + # Verify user2 has no assignments, but user3 still does + self.assertEqual(len(get_user_role_assignments(user2.username)), 0) + self.assertEqual(len(get_user_role_assignments(user3.username)), 1) + + # Cleanup + for user in [user1, user2, user3]: + user.delete() From 2925a21d9d5dd3a8b95597d5a5a013df5f18b9b5 Mon Sep 17 00:00:00 2001 From: Maria Grimaldi Date: Thu, 23 Oct 2025 12:31:34 +0200 Subject: [PATCH 14/20] refactor: address PR review --- openedx_authz/api/roles.py | 4 ++-- openedx_authz/api/users.py | 4 ++-- openedx_authz/tests/api/test_roles.py | 12 ++++++------ 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/openedx_authz/api/roles.py b/openedx_authz/api/roles.py index 15b6f0f6..be5758f5 100644 --- a/openedx_authz/api/roles.py +++ b/openedx_authz/api/roles.py @@ -41,7 +41,7 @@ "get_all_subject_role_assignments_in_scope", "get_subject_role_assignments", "get_scopes_for_subject_and_permission", - "unassign_subjects_from_all_roles", + "unassign_subject_from_all_roles", ] # TODO: these are the concerns we still have to address: @@ -422,7 +422,7 @@ def get_scopes_for_subject_and_permission( return [SubjectData(namespaced_key=policy[GroupingPolicyIndex.SUBJECT.value]) for policy in policies] -def unassign_subjects_from_all_roles(subject: SubjectData) -> bool: +def unassign_subject_from_all_roles(subject: SubjectData) -> bool: """Unassign a subject from all roles across all scopes. Args: diff --git a/openedx_authz/api/users.py b/openedx_authz/api/users.py index 30cd3900..e07df678 100644 --- a/openedx_authz/api/users.py +++ b/openedx_authz/api/users.py @@ -22,7 +22,7 @@ get_subject_role_assignments_in_scope, get_subjects_for_role_in_scope, unassign_role_from_subject_in_scope, - unassign_subjects_from_all_roles, + unassign_subject_from_all_roles, ) __all__ = [ @@ -236,4 +236,4 @@ def unassign_all_roles_from_user(user_external_key: str) -> bool: Returns: bool: True if any roles were removed, False otherwise. """ - return unassign_subjects_from_all_roles(UserData(external_key=user_external_key)) + return unassign_subject_from_all_roles(UserData(external_key=user_external_key)) diff --git a/openedx_authz/tests/api/test_roles.py b/openedx_authz/tests/api/test_roles.py index e1eeaf42..67588cd5 100644 --- a/openedx_authz/tests/api/test_roles.py +++ b/openedx_authz/tests/api/test_roles.py @@ -35,7 +35,7 @@ get_subject_role_assignments_in_scope, get_subjects_for_role_in_scope, unassign_role_from_subject_in_scope, - unassign_subjects_from_all_roles, + unassign_subject_from_all_roles, ) from openedx_authz.constants import permissions, roles from openedx_authz.constants.roles import ( @@ -995,7 +995,7 @@ def test_assign_role_creates_extended_casbin_rule(self): def test_unassign_subject_from_all_roles_removes_all_assignments( self, subject_name, scopes, expected_roles_before ): - """Test that unassign_subjects_from_all_roles removes all role assignments. + """Test that unassign_subject_from_all_roles removes all role assignments. Expected result: - Before unassignment: Subject has roles in specified scopes @@ -1023,7 +1023,7 @@ def test_unassign_subject_from_all_roles_removes_all_assignments( self.assertGreater(len(scope_assignments), 0) # Unassign all roles from the subject - result = unassign_subjects_from_all_roles(subject) + result = unassign_subject_from_all_roles(subject) # Verify the function returns True (indicating roles were removed) self.assertTrue(result) @@ -1053,7 +1053,7 @@ def test_unassign_subject_with_no_roles_returns_false(self): self.assertEqual(len(assignments_before), 0) # Unassign all roles (should return False since there are none) - result = unassign_subjects_from_all_roles(non_existent_subject) + result = unassign_subject_from_all_roles(non_existent_subject) # Verify the function returns False (no roles to remove) self.assertFalse(result) @@ -1086,7 +1086,7 @@ def test_unassign_subject_does_not_affect_other_subjects(self): self.assertGreater(len(heidi_assignments_before), 0) # Unassign all roles from grace - result = unassign_subjects_from_all_roles(subject_to_unassign) + result = unassign_subject_from_all_roles(subject_to_unassign) self.assertTrue(result) # Verify grace has no assignments after unassignment @@ -1126,7 +1126,7 @@ def test_unassign_and_reassign_subject(self): self.assertGreater(len(assignments_before), 0) # Unassign all roles - result = unassign_subjects_from_all_roles(subject) + result = unassign_subject_from_all_roles(subject) self.assertTrue(result) # Verify no roles after unassignment From e6096e228e52d168f67911370ffaa60cb67af618 Mon Sep 17 00:00:00 2001 From: Maria Grimaldi Date: Fri, 24 Oct 2025 14:39:19 +0200 Subject: [PATCH 15/20] refactor: address issues after rebase --- openedx_authz/tests/api/test_roles.py | 52 +++---- openedx_authz/tests/api/test_users.py | 40 ++--- .../tests/integration/test_handlers.py | 142 ++++-------------- 3 files changed, 63 insertions(+), 171 deletions(-) diff --git a/openedx_authz/tests/api/test_roles.py b/openedx_authz/tests/api/test_roles.py index 67588cd5..5da438c1 100644 --- a/openedx_authz/tests/api/test_roles.py +++ b/openedx_authz/tests/api/test_roles.py @@ -983,18 +983,22 @@ def test_assign_role_creates_extended_casbin_rule(self): # Test user with single role in single scope ("alice", ["lib:Org1:math_101"], {"library_admin"}), # Test user with multiple roles in different scopes - ("eve", ["lib:Org2:physics_401", "lib:Org2:chemistry_501", "lib:Org2:biology_601"], - {"library_admin", "library_author", "library_user"}), + ( + "eve", + ["lib:Org2:physics_401", "lib:Org2:chemistry_501", "lib:Org2:biology_601"], + {"library_admin", "library_author", "library_user"}, + ), # Test user with same role in multiple scopes ("liam", ["lib:Org4:art_101", "lib:Org4:art_201", "lib:Org4:art_301"], {"library_author"}), # Test user with multiple different roles in multiple scopes - ("peter", ["lib:Org6:project_alpha", "lib:Org6:project_beta", "lib:Org6:project_gamma", "lib:Org6:project_delta"], - {"library_admin", "library_author", "library_contributor", "library_user"}), + ( + "peter", + ["lib:Org6:project_alpha", "lib:Org6:project_beta", "lib:Org6:project_gamma", "lib:Org6:project_delta"], + {"library_admin", "library_author", "library_contributor", "library_user"}, + ), ) @unpack - def test_unassign_subject_from_all_roles_removes_all_assignments( - self, subject_name, scopes, expected_roles_before - ): + def test_unassign_subject_from_all_roles_removes_all_assignments(self, subject_name, scopes, expected_roles_before): """Test that unassign_subject_from_all_roles removes all role assignments. Expected result: @@ -1010,16 +1014,12 @@ def test_unassign_subject_from_all_roles_removes_all_assignments( self.assertGreater(len(assignments_before), 0) # Verify roles are what we expect before removal - roles_before = { - r.external_key for assignment in assignments_before for r in assignment.roles - } + roles_before = {r.external_key for assignment in assignments_before for r in assignment.roles} self.assertEqual(roles_before, expected_roles_before) # Verify assignments exist in each expected scope for scope_name in scopes: - scope_assignments = get_subject_role_assignments_in_scope( - subject, ScopeData(external_key=scope_name) - ) + scope_assignments = get_subject_role_assignments_in_scope(subject, ScopeData(external_key=scope_name)) self.assertGreater(len(scope_assignments), 0) # Unassign all roles from the subject @@ -1034,9 +1034,7 @@ def test_unassign_subject_from_all_roles_removes_all_assignments( # Verify no assignments in any of the previous scopes for scope_name in scopes: - scope_assignments = get_subject_role_assignments_in_scope( - subject, ScopeData(external_key=scope_name) - ) + scope_assignments = get_subject_role_assignments_in_scope(subject, ScopeData(external_key=scope_name)) self.assertEqual(len(scope_assignments), 0) def test_unassign_subject_with_no_roles_returns_false(self): @@ -1075,12 +1073,8 @@ def test_unassign_subject_does_not_affect_other_subjects(self): shared_scope = ScopeData(external_key="lib:Org1:math_advanced") # Verify both subjects have roles in the shared scope before - grace_assignments_before = get_subject_role_assignments_in_scope( - subject_to_unassign, shared_scope - ) - heidi_assignments_before = get_subject_role_assignments_in_scope( - other_subject, shared_scope - ) + grace_assignments_before = get_subject_role_assignments_in_scope(subject_to_unassign, shared_scope) + heidi_assignments_before = get_subject_role_assignments_in_scope(other_subject, shared_scope) self.assertGreater(len(grace_assignments_before), 0) self.assertGreater(len(heidi_assignments_before), 0) @@ -1094,17 +1088,11 @@ def test_unassign_subject_does_not_affect_other_subjects(self): self.assertEqual(len(grace_assignments_after), 0) # Verify heidi still has her assignments - heidi_assignments_after = get_subject_role_assignments_in_scope( - other_subject, shared_scope - ) + heidi_assignments_after = get_subject_role_assignments_in_scope(other_subject, shared_scope) self.assertEqual(len(heidi_assignments_after), len(heidi_assignments_before)) # Verify heidi still has the library_contributor role - heidi_roles = { - r.external_key - for assignment in heidi_assignments_after - for r in assignment.roles - } + heidi_roles = {r.external_key for assignment in heidi_assignments_after for r in assignment.roles} self.assertIn("library_contributor", heidi_roles) def test_unassign_and_reassign_subject(self): @@ -1141,7 +1129,5 @@ def test_unassign_and_reassign_subject(self): new_assignments = get_subject_role_assignments_in_scope(subject, new_scope) self.assertEqual(len(new_assignments), 1) - new_roles = { - r.external_key for assignment in new_assignments for r in assignment.roles - } + new_roles = {r.external_key for assignment in new_assignments for r in assignment.roles} self.assertIn("library_admin", new_roles) diff --git a/openedx_authz/tests/api/test_users.py b/openedx_authz/tests/api/test_users.py index 7c82639b..d470e8bb 100644 --- a/openedx_authz/tests/api/test_users.py +++ b/openedx_authz/tests/api/test_users.py @@ -234,18 +234,22 @@ def test_get_all_user_role_assignments_in_scope(self, scope_name, expected_assig # Test user with single role in single scope ("alice", ["lib:Org1:math_101"], {"library_admin"}), # Test user with multiple roles in different scopes - ("eve", ["lib:Org2:physics_401", "lib:Org2:chemistry_501", "lib:Org2:biology_601"], - {"library_admin", "library_author", "library_user"}), + ( + "eve", + ["lib:Org2:physics_401", "lib:Org2:chemistry_501", "lib:Org2:biology_601"], + {"library_admin", "library_author", "library_user"}, + ), # Test user with same role in multiple scopes ("liam", ["lib:Org4:art_101", "lib:Org4:art_201", "lib:Org4:art_301"], {"library_author"}), # Test user with multiple different roles in multiple scopes - ("peter", ["lib:Org6:project_alpha", "lib:Org6:project_beta", "lib:Org6:project_gamma", "lib:Org6:project_delta"], - {"library_admin", "library_author", "library_contributor", "library_user"}), + ( + "peter", + ["lib:Org6:project_alpha", "lib:Org6:project_beta", "lib:Org6:project_gamma", "lib:Org6:project_delta"], + {"library_admin", "library_author", "library_contributor", "library_user"}, + ), ) @unpack - def test_unassign_all_roles_from_user_removes_all_assignments( - self, username, scopes, expected_roles_before - ): + def test_unassign_all_roles_from_user_removes_all_assignments(self, username, scopes, expected_roles_before): """Test that unassign_all_roles_from_user removes all role assignments. Expected result: @@ -259,9 +263,7 @@ def test_unassign_all_roles_from_user_removes_all_assignments( self.assertGreater(len(assignments_before), 0) # Verify roles are what we expect before removal - roles_before = { - r.external_key for assignment in assignments_before for r in assignment.roles - } + roles_before = {r.external_key for assignment in assignments_before for r in assignment.roles} self.assertEqual(roles_before, expected_roles_before) # Verify assignments exist in each expected scope @@ -349,11 +351,7 @@ def test_unassign_all_roles_does_not_affect_other_users(self): self.assertEqual(len(heidi_assignments_after), len(heidi_assignments_before)) # Verify heidi still has the library_contributor role - heidi_roles = { - r.external_key - for assignment in heidi_assignments_after - for r in assignment.roles - } + heidi_roles = {r.external_key for assignment in heidi_assignments_after for r in assignment.roles} self.assertIn("library_contributor", heidi_roles) def test_unassign_and_reassign_user(self): @@ -383,21 +381,15 @@ def test_unassign_and_reassign_user(self): # Assign a new role in a new scope assign_result = assign_role_to_user_in_scope( - user_external_key=username, - role_external_key=new_role, - scope_external_key=new_scope + user_external_key=username, role_external_key=new_role, scope_external_key=new_scope ) self.assertTrue(assign_result) # Verify the new assignment works - new_assignments = get_user_role_assignments_in_scope( - user_external_key=username, scope_external_key=new_scope - ) + new_assignments = get_user_role_assignments_in_scope(user_external_key=username, scope_external_key=new_scope) self.assertEqual(len(new_assignments), 1) - new_roles = { - r.external_key for assignment in new_assignments for r in assignment.roles - } + new_roles = {r.external_key for assignment in new_assignments for r in assignment.roles} self.assertIn(new_role, new_roles) def test_unassign_all_roles_impacts_permissions(self): diff --git a/openedx_authz/tests/integration/test_handlers.py b/openedx_authz/tests/integration/test_handlers.py index ae679251..5d85b4ee 100644 --- a/openedx_authz/tests/integration/test_handlers.py +++ b/openedx_authz/tests/integration/test_handlers.py @@ -9,14 +9,14 @@ from django.contrib.auth import get_user_model from django.test import TestCase - from openedx.core.djangoapps.user_api.accounts.signals import USER_RETIRE_LMS_CRITICAL + +from openedx_authz.api.data import UserData from openedx_authz.api.users import ( assign_role_to_user_in_scope, get_user_role_assignments, get_user_role_assignments_in_scope, ) -from openedx_authz.api.data import UserData from openedx_authz.engine.enforcer import AuthzEnforcer from openedx_authz.models import ExtendedCasbinRule, Subject @@ -34,14 +34,10 @@ def setUp(self): """Set up test data with real users and role assignments.""" # Create real Django users for testing self.retiring_user = User.objects.create_user( - username='retiring_user', - email='retiring@example.com', - password='testpass123' + username="retiring_user", email="retiring@example.com", password="testpass123" ) self.other_user = User.objects.create_user( - username='other_user', - email='other@example.com', - password='testpass123' + username="other_user", email="other@example.com", password="testpass123" ) # Load enforcer policy @@ -51,7 +47,7 @@ def setUp(self): def tearDown(self): """Clean up test data.""" # Clean up users - User.objects.filter(username__in=['retiring_user', 'other_user']).delete() + User.objects.filter(username__in=["retiring_user", "other_user"]).delete() # Clear enforcer policy enforcer = AuthzEnforcer.get_enforcer() @@ -66,28 +62,12 @@ def test_user_retirement_signal_removes_all_role_assignments(self): - Other users' role assignments are unaffected """ # Assign roles to retiring user in multiple scopes - assign_role_to_user_in_scope( - self.retiring_user.username, - "library_admin", - "lib:TestOrg:lib1" - ) - assign_role_to_user_in_scope( - self.retiring_user.username, - "library_author", - "lib:TestOrg:lib2" - ) - assign_role_to_user_in_scope( - self.retiring_user.username, - "library_user", - "lib:TestOrg:lib3" - ) + assign_role_to_user_in_scope(self.retiring_user.username, "library_admin", "lib:TestOrg:lib1") + assign_role_to_user_in_scope(self.retiring_user.username, "library_author", "lib:TestOrg:lib2") + assign_role_to_user_in_scope(self.retiring_user.username, "library_user", "lib:TestOrg:lib3") # Assign role to other user - assign_role_to_user_in_scope( - self.other_user.username, - "library_admin", - "lib:TestOrg:lib4" - ) + assign_role_to_user_in_scope(self.other_user.username, "library_admin", "lib:TestOrg:lib4") # Verify users have roles before retirement retiring_user_roles_before = get_user_role_assignments(self.retiring_user.username) @@ -97,10 +77,7 @@ def test_user_retirement_signal_removes_all_role_assignments(self): self.assertEqual(len(other_user_roles_before), 1) # Send the retirement signal - USER_RETIRE_LMS_CRITICAL.send( - sender=User, - user=self.retiring_user - ) + USER_RETIRE_LMS_CRITICAL.send(sender=User, user=self.retiring_user) # Verify roles are removed for retiring user but not other user retiring_user_roles_after = get_user_role_assignments(self.retiring_user.username) @@ -119,9 +96,7 @@ def test_user_retirement_signal_with_no_roles(self): """ # Create user with no role assignments user_no_roles = User.objects.create_user( - username='user_no_roles', - email='noroles@example.com', - password='testpass123' + username="user_no_roles", email="noroles@example.com", password="testpass123" ) # Verify user has no roles @@ -129,10 +104,7 @@ def test_user_retirement_signal_with_no_roles(self): self.assertEqual(len(user_roles_before), 0) # Send retirement signal - should not raise error - USER_RETIRE_LMS_CRITICAL.send( - sender=User, - user=user_no_roles - ) + USER_RETIRE_LMS_CRITICAL.send(sender=User, user=user_no_roles) # Verify still no roles user_roles_after = get_user_role_assignments(user_no_roles.username) @@ -150,16 +122,8 @@ def test_user_retirement_removes_extended_casbin_rules(self): - This ensures complete cleanup including database integrity """ # Assign roles which should create ExtendedCasbinRule records - assign_role_to_user_in_scope( - self.retiring_user.username, - "library_admin", - "lib:TestOrg:cleanup1" - ) - assign_role_to_user_in_scope( - self.retiring_user.username, - "library_author", - "lib:TestOrg:cleanup2" - ) + assign_role_to_user_in_scope(self.retiring_user.username, "library_admin", "lib:TestOrg:cleanup1") + assign_role_to_user_in_scope(self.retiring_user.username, "library_author", "lib:TestOrg:cleanup2") # Get the subject to check ExtendedCasbinRule records user_data = UserData(external_key=self.retiring_user.username) @@ -170,10 +134,7 @@ def test_user_retirement_removes_extended_casbin_rules(self): self.assertGreater(extended_rules_before.count(), 0) # Send retirement signal - USER_RETIRE_LMS_CRITICAL.send( - sender=User, - user=self.retiring_user - ) + USER_RETIRE_LMS_CRITICAL.send(sender=User, user=self.retiring_user) # Verify ExtendedCasbinRule records are cleaned up extended_rules_after = ExtendedCasbinRule.objects.filter(subject=subject) @@ -187,43 +148,26 @@ def test_user_retirement_with_multiple_scopes_same_role(self): - After retirement, all assignments across all scopes are removed - Role assignments are completely cleared """ - scopes = [ - "lib:Org1:scope1", - "lib:Org2:scope2", - "lib:Org3:scope3" - ] + scopes = ["lib:Org1:scope1", "lib:Org2:scope2", "lib:Org3:scope3"] # Assign same role in multiple scopes for scope in scopes: - assign_role_to_user_in_scope( - self.retiring_user.username, - "library_admin", - scope - ) + assign_role_to_user_in_scope(self.retiring_user.username, "library_admin", scope) # Verify assignments in each scope before retirement for scope in scopes: - assignments = get_user_role_assignments_in_scope( - self.retiring_user.username, - scope - ) + assignments = get_user_role_assignments_in_scope(self.retiring_user.username, scope) self.assertEqual(len(assignments), 1) total_assignments_before = get_user_role_assignments(self.retiring_user.username) self.assertEqual(len(total_assignments_before), 3) # Send retirement signal - USER_RETIRE_LMS_CRITICAL.send( - sender=User, - user=self.retiring_user - ) + USER_RETIRE_LMS_CRITICAL.send(sender=User, user=self.retiring_user) # Verify all assignments removed from all scopes for scope in scopes: - assignments = get_user_role_assignments_in_scope( - self.retiring_user.username, - scope - ) + assignments = get_user_role_assignments_in_scope(self.retiring_user.username, scope) self.assertEqual(len(assignments), 0) total_assignments_after = get_user_role_assignments(self.retiring_user.username) @@ -246,32 +190,18 @@ def test_user_retirement_with_mixed_role_types(self): # Assign different roles in different scopes for role, scope in role_scope_pairs: - assign_role_to_user_in_scope( - self.retiring_user.username, - role, - scope - ) + assign_role_to_user_in_scope(self.retiring_user.username, role, scope) # Verify all assignments exist total_assignments_before = get_user_role_assignments(self.retiring_user.username) self.assertEqual(len(total_assignments_before), 4) # Extract role types to verify diversity - roles_before = { - r.external_key - for assignment in total_assignments_before - for r in assignment.roles - } - self.assertEqual( - roles_before, - {"library_admin", "library_author", "library_contributor", "library_user"} - ) + roles_before = {r.external_key for assignment in total_assignments_before for r in assignment.roles} + self.assertEqual(roles_before, {"library_admin", "library_author", "library_contributor", "library_user"}) # Send retirement signal - USER_RETIRE_LMS_CRITICAL.send( - sender=User, - user=self.retiring_user - ) + USER_RETIRE_LMS_CRITICAL.send(sender=User, user=self.retiring_user) # Verify all roles removed total_assignments_after = get_user_role_assignments(self.retiring_user.username) @@ -287,29 +217,13 @@ def test_multiple_user_retirements_do_not_interfere(self): - No cross-contamination between user retirements """ # Create additional users - user1 = User.objects.create_user( - username='retire_test_1', - email='retire1@example.com', - password='testpass123' - ) - user2 = User.objects.create_user( - username='retire_test_2', - email='retire2@example.com', - password='testpass123' - ) - user3 = User.objects.create_user( - username='retire_test_3', - email='retire3@example.com', - password='testpass123' - ) + user1 = User.objects.create_user(username="retire_test_1", email="retire1@example.com", password="testpass123") + user2 = User.objects.create_user(username="retire_test_2", email="retire2@example.com", password="testpass123") + user3 = User.objects.create_user(username="retire_test_3", email="retire3@example.com", password="testpass123") # Assign roles to each user for user in [user1, user2, user3]: - assign_role_to_user_in_scope( - user.username, - "library_admin", - f"lib:TestOrg:{user.username}_scope" - ) + assign_role_to_user_in_scope(user.username, "library_admin", f"lib:TestOrg:{user.username}_scope") # Verify all users have assignments for user in [user1, user2, user3]: From 59ab46b25fc6f8e074541c241637adfff913ea72 Mon Sep 17 00:00:00 2001 From: Maria Grimaldi Date: Thu, 6 Nov 2025 12:39:49 +0100 Subject: [PATCH 16/20] refactor: address quality issues after rebase --- openedx_authz/api/roles.py | 1 - openedx_authz/handlers.py | 7 ++++--- openedx_authz/tests/api/test_roles.py | 1 - openedx_authz/tests/integration/test_handlers.py | 2 +- 4 files changed, 5 insertions(+), 6 deletions(-) diff --git a/openedx_authz/api/roles.py b/openedx_authz/api/roles.py index be5758f5..60e281b5 100644 --- a/openedx_authz/api/roles.py +++ b/openedx_authz/api/roles.py @@ -419,7 +419,6 @@ def get_scopes_for_subject_and_permission( if permission in role.permissions and role_assignment.scope not in scopes: scopes.append(role_assignment.scope) return scopes - return [SubjectData(namespaced_key=policy[GroupingPolicyIndex.SUBJECT.value]) for policy in policies] def unassign_subject_from_all_roles(subject: SubjectData) -> bool: diff --git a/openedx_authz/handlers.py b/openedx_authz/handlers.py index f0129499..7701123e 100644 --- a/openedx_authz/handlers.py +++ b/openedx_authz/handlers.py @@ -56,8 +56,9 @@ def delete_casbin_rule_on_extended_rule_deletion(sender, instance, **kwargs): # ) -def unassign_roles_on_user_retirement(sender, user, **kwargs): - """Unassign roles from a user when they are retired. +def unassign_roles_on_user_retirement(sender, user, **kwargs): # pylint: disable=unused-argument + """ + Unassign roles from a user when they are retired. This handler is triggered when a user is retired in the LMS. It ensures that any roles assigned to the user are removed, maintaining the integrity of the @@ -70,7 +71,7 @@ def unassign_roles_on_user_retirement(sender, user, **kwargs): """ try: unassign_all_roles_from_user(user.username) - except Exception as exc: + except Exception as exc: # pylint: disable=broad-exception-caught logger.exception( "Error unassigning roles from user %s during retirement", user.id, diff --git a/openedx_authz/tests/api/test_roles.py b/openedx_authz/tests/api/test_roles.py index 5da438c1..83163c51 100644 --- a/openedx_authz/tests/api/test_roles.py +++ b/openedx_authz/tests/api/test_roles.py @@ -1105,7 +1105,6 @@ def test_unassign_and_reassign_subject(self): - Newly assigned roles work correctly """ subject = SubjectData(external_key="bob") - original_scope = ScopeData(external_key="lib:Org1:history_201") new_scope = ScopeData(external_key="lib:Org1:new_library") new_role = RoleData(external_key="library_admin") diff --git a/openedx_authz/tests/integration/test_handlers.py b/openedx_authz/tests/integration/test_handlers.py index 5d85b4ee..31642c90 100644 --- a/openedx_authz/tests/integration/test_handlers.py +++ b/openedx_authz/tests/integration/test_handlers.py @@ -9,7 +9,7 @@ from django.contrib.auth import get_user_model from django.test import TestCase -from openedx.core.djangoapps.user_api.accounts.signals import USER_RETIRE_LMS_CRITICAL +from openedx.core.djangoapps.user_api.accounts.signals import USER_RETIRE_LMS_CRITICAL # pylint: disable=import-error from openedx_authz.api.data import UserData from openedx_authz.api.users import ( From 282c6b678c074d84a705ec812daff6b5e339dce2 Mon Sep 17 00:00:00 2001 From: Maria Grimaldi Date: Thu, 13 Nov 2025 14:19:11 +0100 Subject: [PATCH 17/20] refactor: drop load policy in favor of cache invalidation --- openedx_authz/api/roles.py | 1 - 1 file changed, 1 deletion(-) diff --git a/openedx_authz/api/roles.py b/openedx_authz/api/roles.py index 60e281b5..e5da5abc 100644 --- a/openedx_authz/api/roles.py +++ b/openedx_authz/api/roles.py @@ -431,5 +431,4 @@ def unassign_subject_from_all_roles(subject: SubjectData) -> bool: bool: True if any roles were removed, False otherwise. """ enforcer = AuthzEnforcer.get_enforcer() - enforcer.load_policy() return enforcer.remove_filtered_grouping_policy(GroupingPolicyIndex.SUBJECT.value, subject.namespaced_key) From cbec3e6dc93f8da9dac660d4367cbd461309f153 Mon Sep 17 00:00:00 2001 From: Maria Grimaldi Date: Fri, 14 Nov 2025 14:10:50 +0100 Subject: [PATCH 18/20] refactor: (temp) use no-index by default to avoid duplicate warnings --- docs/conf.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/docs/conf.py b/docs/conf.py index c64d714c..75e02d1a 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -557,6 +557,12 @@ def on_init(app): # pylint: disable=unused-argument # If we are, assemble the path manually bin_path = os.path.abspath(os.path.join(sys.prefix, "bin")) apidoc_path = os.path.join(bin_path, apidoc_path) + + # Set SPHINX_APIDOC_OPTIONS to add :no-index: to generated automodule directives + # This prevents duplicate object warnings for re-exported API members + env = os.environ.copy() + env['SPHINX_APIDOC_OPTIONS'] = 'members,show-inheritance,undoc-members,no-index' + check_call( [ apidoc_path, @@ -565,7 +571,8 @@ def on_init(app): # pylint: disable=unused-argument os.path.join(root_path, "openedx_authz"), os.path.join(root_path, "openedx_authz/migrations"), os.path.join(root_path, "openedx_authz/tests"), - ] + ], + env=env ) From 41da28b7388d7c4a6a08571d972ea350ed581464 Mon Sep 17 00:00:00 2001 From: Maria Grimaldi Date: Fri, 14 Nov 2025 17:44:48 +0100 Subject: [PATCH 19/20] docs: update changelog --- CHANGELOG.rst | 8 ++++++++ openedx_authz/__init__.py | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 3d2045ad..a37f898b 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -16,6 +16,14 @@ Unreleased * +0.17.0 - 2025-11-14 +******************** + +Added +===== + +* Signal to clear policies associated to a user when they are retired. + 0.16.0 - 2025-11-13 ******************** diff --git a/openedx_authz/__init__.py b/openedx_authz/__init__.py index 5c417147..4e10cf73 100644 --- a/openedx_authz/__init__.py +++ b/openedx_authz/__init__.py @@ -4,6 +4,6 @@ import os -__version__ = "0.16.0" +__version__ = "0.17.0" ROOT_DIRECTORY = os.path.dirname(os.path.abspath(__file__)) From 737c07aec3482297943887e62fd9c15e073a7199 Mon Sep 17 00:00:00 2001 From: Maria Grimaldi Date: Fri, 14 Nov 2025 17:52:33 +0100 Subject: [PATCH 20/20] refactor: consider later changes in namespaces --- openedx_authz/tests/api/test_users.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openedx_authz/tests/api/test_users.py b/openedx_authz/tests/api/test_users.py index d470e8bb..64f3848c 100644 --- a/openedx_authz/tests/api/test_users.py +++ b/openedx_authz/tests/api/test_users.py @@ -402,7 +402,7 @@ def test_unassign_all_roles_impacts_permissions(self): """ username = "alice" scope = "lib:Org1:math_101" - action = "delete_library" # library_admin permission + action = permissions.DELETE_LIBRARY.identifier # Verify alice has the permission before unassignment has_permission_before = is_user_allowed(