From 3723ebdae139468c9b75776f28c950923ba3141c Mon Sep 17 00:00:00 2001 From: Rodrigo Mendez Date: Mon, 3 Nov 2025 16:53:27 -0600 Subject: [PATCH 1/3] feat: Migration for legacy library permissions --- CHANGELOG.rst | 8 + openedx_authz/engine/utils.py | 82 ++++++++++ .../0005_migrate_legacy_permissions.py | 61 ++++++++ openedx_authz/tests/stubs/models.py | 45 +++++- openedx_authz/tests/test_migrations.py | 147 ++++++++++++++++++ 5 files changed, 342 insertions(+), 1 deletion(-) create mode 100644 openedx_authz/migrations/0005_migrate_legacy_permissions.py create mode 100644 openedx_authz/tests/test_migrations.py diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 5b473d88..fc9b01d6 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -46,6 +46,14 @@ Added * Register ``CasbinRule`` model in the Django admin. * Register ``ExtendedCasbinRule`` model in the Django admin as an inline model of ``CasbinRule``. +0.16.0 - 2025-11-14 +******************** + +Added +===== + +* Migration to transfer legacy permissions from ContentLibraryPermission to the new Casbin-based authorization model. + 0.15.0 - 2025-11-11 ******************** diff --git a/openedx_authz/engine/utils.py b/openedx_authz/engine/utils.py index b6d4284d..94fc9180 100644 --- a/openedx_authz/engine/utils.py +++ b/openedx_authz/engine/utils.py @@ -8,6 +8,9 @@ from casbin import Enforcer +from openedx_authz.api.users import assign_role_to_user_in_scope, batch_assign_role_to_users_in_scope +from openedx_authz.constants.roles import LIBRARY_ADMIN, LIBRARY_AUTHOR, LIBRARY_USER + logger = logging.getLogger(__name__) GROUPING_POLICY_PTYPES = ["g", "g2", "g3", "g4", "g5", "g6"] @@ -69,3 +72,82 @@ def migrate_policy_between_enforcers( except Exception as e: logger.error(f"Error loading policies from file: {e}") raise + + +def migrate_legacy_permissions(ContentLibraryPermission): + """ + Migrate legacy permission data to the new Casbin-based authorization model. + This function reads legacy permissions from the ContentLibraryPermission model + and assigns equivalent roles in the new authorization system. + + The old Library permissions are stored in the ContentLibraryPermission model, it consists of the following columns: + + - library: FK to ContentLibrary + - user: optional FK to User + - group: optional FK to Group + - access_level: 'admin' | 'author' | 'read' + + In the new Authz model, this would roughly translate to: + + - library: scope + - user: subject + - access_level: role + + Now, we don't have an equivalent concept to "Group", for this we will go through the users in the group and assign + roles independently. + + param ContentLibraryPermission: The ContentLibraryPermission model to use. + """ + + legacy_permissions = ContentLibraryPermission.objects.select_related( + "library", "library__org", "user", "group" + ).all() + + # List to keep track of any permissions that could not be migrated + permissions_with_errors = [] + + for permission in legacy_permissions: + # Migrate the permission to the new model + + # Derive equivalent role based on access level + access_level_to_role = { + "admin": LIBRARY_ADMIN, + "author": LIBRARY_AUTHOR, + "read": LIBRARY_USER, + } + + role = access_level_to_role.get(permission.access_level) + if role is None: + # This should not happen as there are no more access_levels defined + # in ContentLibraryPermission, log and skip + logger.error(f"Unknown access level: {permission.access_level} for User: {permission.user}") + permissions_with_errors.append(permission) + continue + + # Generating scope based on library identifier + scope = f"lib:{permission.library.org.name}:{permission.library.slug}" + + if permission.group: + # Permission applied to a group + users = [user.username for user in permission.group.user_set.all()] + logger.info( + f"Migrating permissions for Users: {users} in Group: {permission.group.name} " + f"to Role: {role.external_key} in Scope: {scope}" + ) + batch_assign_role_to_users_in_scope( + users=users, role_external_key=role.external_key, scope_external_key=scope + ) + else: + # Permission applied to individual user + logger.info( + f"Migrating permission for User: {permission.user.username} " + f"to Role: {role.external_key} in Scope: {scope}" + ) + + assign_role_to_user_in_scope( + user_external_key=permission.user.username, + role_external_key=role.external_key, + scope_external_key=scope, + ) + + return permissions_with_errors diff --git a/openedx_authz/migrations/0005_migrate_legacy_permissions.py b/openedx_authz/migrations/0005_migrate_legacy_permissions.py new file mode 100644 index 00000000..95a7f97d --- /dev/null +++ b/openedx_authz/migrations/0005_migrate_legacy_permissions.py @@ -0,0 +1,61 @@ +# Generated by Django 5.2.7 on 2025-11-03 20:39 + +import logging + +from django.db import migrations + +from openedx_authz.engine.utils import migrate_legacy_permissions + +logger = logging.getLogger(__name__) + + +def _log_migration_errors(permissions_with_errors: list) -> None: + """ + Log the permissions that could not be migrated during the migration process. + Args: + permissions_with_errors (list): List of ContentLibraryPermission instances that failed to migrate. + """ + logger.error( + f"Migration completed with errors for {len(permissions_with_errors)} permissions.\n" + "The following permissions could not be migrated:" + ) + for permission in permissions_with_errors: + logger.error( + "Access level: %s, %sLibrary: %s", + permission.access_level, + f"User: {permission.user.username}, " if permission.user else f"Group: {permission.group.name}, ", + permission.library.slug, + ) + + +def apply_migrate_legacy_permissions(apps, schema_editor): + """ + Wrapper to run the migration using the historical version of the ContentLibraryPermission model. + """ + # ContentLibraryPermission model from the content_libraries app, here is where the legacy permissions are stored + try: + ContentLibraryPermission = apps.get_model("content_libraries", "ContentLibraryPermission") + except LookupError: + # Don't run the migration where the content_libraries app is not installed, like during development. + logger.warning("ContentLibraryPermission model not found. Skipping migration.") + return + + permissions_with_errors = migrate_legacy_permissions(ContentLibraryPermission) + + if permissions_with_errors: + _log_migration_errors(permissions_with_errors) + + +class Migration(migrations.Migration): + """ + Migration to transfer legacy permissions from ContentLibraryPermission + to the new Casbin-based authorization model. + """ + + dependencies = [ + ("openedx_authz", "0004_contentlibraryscope"), + ] + + operations = [ + migrations.RunPython(apply_migrate_legacy_permissions), + ] diff --git a/openedx_authz/tests/stubs/models.py b/openedx_authz/tests/stubs/models.py index 97010e44..def5a7aa 100644 --- a/openedx_authz/tests/stubs/models.py +++ b/openedx_authz/tests/stubs/models.py @@ -4,10 +4,25 @@ referenced in FK relationships without requiring the full application context. """ +from django.conf import settings +from django.contrib.auth.models import Group from django.db import models from opaque_keys.edx.locator import LibraryLocatorV2 +class Organization(models.Model): + """Stub model representing an organization for testing purposes. + + .. no_pii: + """ + + name = models.CharField(max_length=255) + short_name = models.CharField(max_length=100) + + def __str__(self): + return str(self.name) + + class ContentLibraryManager(models.Manager): """Manager for ContentLibrary model with helper methods.""" @@ -38,9 +53,37 @@ class ContentLibrary(models.Model): locator = models.CharField(max_length=255, unique=True, db_index=True) title = models.CharField(max_length=255, blank=True, null=True) + slug = models.SlugField(allow_unicode=True) + org = models.ForeignKey(Organization, on_delete=models.PROTECT, null=True) created_at = models.DateTimeField(auto_now_add=True) objects = ContentLibraryManager() def __str__(self): - return self.locator + return str(self.locator) + + +# Legacy permission models for testing purposes +class ContentLibraryPermission(models.Model): + """Stub model representing legacy content library permissions for testing purposes. + + .. no_pii: + """ + + ADMIN_LEVEL = "admin" + AUTHOR_LEVEL = "author" + READ_LEVEL = "read" + ACCESS_LEVEL_CHOICES = ( + (ADMIN_LEVEL, "Administer users and author content"), + (AUTHOR_LEVEL, "Author content"), + (READ_LEVEL, "Read-only"), + ) + + library = models.ForeignKey(ContentLibrary, on_delete=models.CASCADE) + user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, null=True, blank=True) + group = models.ForeignKey(Group, on_delete=models.CASCADE, null=True, blank=True) + access_level = models.CharField(max_length=30, choices=ACCESS_LEVEL_CHOICES) + + def __str__(self): + who = self.user.username if self.user else self.group.name + return f"ContentLibraryPermission ({self.access_level} for {who})" diff --git a/openedx_authz/tests/test_migrations.py b/openedx_authz/tests/test_migrations.py new file mode 100644 index 00000000..ad8b1148 --- /dev/null +++ b/openedx_authz/tests/test_migrations.py @@ -0,0 +1,147 @@ +"""Unit Tests for openedx_authz migrations.""" + +from django.contrib.auth import get_user_model +from django.contrib.auth.models import Group +from django.test import TestCase + +from openedx_authz.api.users import batch_unassign_role_from_users, get_user_role_assignments_in_scope +from openedx_authz.constants.roles import LIBRARY_ADMIN, LIBRARY_USER +from openedx_authz.engine.enforcer import AuthzEnforcer +from openedx_authz.engine.utils import migrate_legacy_permissions +from openedx_authz.tests.stubs.models import ContentLibrary, ContentLibraryPermission, Organization + +User = get_user_model() + +# Specify a unique prefix to avoid collisions with existing data +OBJECT_PREFIX = "tmlp_" + +org_name = f"{OBJECT_PREFIX}org" +lib_name = f"{OBJECT_PREFIX}library" +group_name = f"{OBJECT_PREFIX}test_group" +user_names = [f"{OBJECT_PREFIX}user{i}" for i in range(3)] +group_user_names = [f"{OBJECT_PREFIX}guser{i}" for i in range(3)] +error_user_name = f"{OBJECT_PREFIX}error_user" +error_group_name = f"{OBJECT_PREFIX}error_group" +empty_group_name = f"{OBJECT_PREFIX}empty_group" + + +class TestLegacyPermissionsMigration(TestCase): + """Test cases for migrating legacy permissions.""" + + def setUp(self): + """ + Set up test data: + + What this does: + 1. Creates an Org and a ContentLibrary + 2. Create Users and Groups + 3. Assign legacy permissions using ContentLibraryPermission + 4. Create invalid permissions for user and group + """ + # Create ContentLibrary + + org = Organization.objects.create(name=org_name, short_name=org_name) + library = ContentLibrary.objects.create(org=org, slug=lib_name) + + # Create Users and Groups + users = [ + User.objects.create_user(username=user_name, email=f"{user_name}@example.com") for user_name in user_names + ] + + group_users = [ + User.objects.create_user(username=user_name, email=f"{user_name}@example.com") + for user_name in group_user_names + ] + group = Group.objects.create(name=group_name) + group.user_set.set(group_users) + + error_user = User.objects.create_user(username=error_user_name, email=f"{error_user_name}@example.com") + error_group = Group.objects.create(name=error_group_name) + error_group.user_set.set([error_user]) + + empty_group = Group.objects.create(name=empty_group_name) + + # Assign legacy permissions for users and group + for user in users: + ContentLibraryPermission.objects.create( + user=user, + library=library, + access_level=ContentLibraryPermission.ADMIN_LEVEL, + ) + + ContentLibraryPermission.objects.create( + group=group, + library=library, + access_level=ContentLibraryPermission.READ_LEVEL, + ) + + # Create invalid permissions for testing error logging + ContentLibraryPermission.objects.create( + user=error_user, + library=library, + access_level="invalid", + ) + ContentLibraryPermission.objects.create( + group=error_group, + library=library, + access_level="invalid", + ) + + # Edge case: empty group with no users + ContentLibraryPermission.objects.create( + group=empty_group, + library=library, + access_level=ContentLibraryPermission.READ_LEVEL, + ) + + def tearDown(self): + """ + Clean up test data created for the migration test. + """ + super().tearDown() + + AuthzEnforcer.get_enforcer().load_policy() + batch_unassign_role_from_users( + users=user_names, + role_external_key=LIBRARY_ADMIN.external_key, + scope_external_key=f"lib:{org_name}:{lib_name}", + ) + batch_unassign_role_from_users( + users=group_user_names, + role_external_key=LIBRARY_USER.external_key, + scope_external_key=f"lib:{org_name}:{lib_name}", + ) + + ContentLibrary.objects.filter(slug=lib_name).delete() + Organization.objects.filter(name=org_name).delete() + Group.objects.filter(name=group_name).delete() + Group.objects.filter(name=error_group_name).delete() + Group.objects.filter(name=empty_group_name).delete() + for user_name in user_names + group_user_names + [error_user_name]: + User.objects.filter(username=user_name).delete() + + def test_migration(self): + """Test the migration of legacy permissions. + 1. Rus the migration to migrate legacy permissions. + 2. Check that each user has the expected role in the new model. + 3. Check that the group users have the expected role in the new model. + 4. Check that invalid permissions were identified correctly as errors. + """ + + permissions_with_errors = migrate_legacy_permissions(ContentLibraryPermission) + + AuthzEnforcer.get_enforcer().load_policy() + for user_name in user_names: + assignments = get_user_role_assignments_in_scope( + user_external_key=user_name, scope_external_key=f"lib:{org_name}:{lib_name}" + ) + self.assertEqual(len(assignments), 1) + self.assertEqual(assignments[0].roles[0], LIBRARY_ADMIN) + for group_user_name in group_user_names: + assignments = get_user_role_assignments_in_scope( + user_external_key=group_user_name, scope_external_key=f"lib:{org_name}:{lib_name}" + ) + self.assertEqual(len(assignments), 1) + self.assertEqual(assignments[0].roles[0], LIBRARY_USER) + + self.assertEqual(len(permissions_with_errors), 2) From e3af38a205d7d71c84aeda06d5ef78998e2c65c1 Mon Sep 17 00:00:00 2001 From: Rodrigo Mendez Date: Fri, 14 Nov 2025 10:03:44 -0600 Subject: [PATCH 2/3] squash!: Fix conflicts --- CHANGELOG.rst | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index fc9b01d6..ea83d158 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -31,6 +31,7 @@ Added ===== * Signal to clear policies associated to a user when they are retired. +* Migration to transfer legacy permissions from ContentLibraryPermission to the new Casbin-based authorization model. 0.16.0 - 2025-11-13 ******************** @@ -46,14 +47,6 @@ Added * Register ``CasbinRule`` model in the Django admin. * Register ``ExtendedCasbinRule`` model in the Django admin as an inline model of ``CasbinRule``. -0.16.0 - 2025-11-14 -******************** - -Added -===== - -* Migration to transfer legacy permissions from ContentLibraryPermission to the new Casbin-based authorization model. - 0.15.0 - 2025-11-11 ******************** From c8d40ab8644fc866316443a7a8568c16be90fa4a Mon Sep 17 00:00:00 2001 From: Rodrigo Mendez Date: Mon, 17 Nov 2025 08:53:18 -0600 Subject: [PATCH 3/3] squash!: Update version --- CHANGELOG.rst | 9 ++++++++- openedx_authz/__init__.py | 2 +- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index ea83d158..f119f409 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -16,6 +16,14 @@ Unreleased * +0.18.0 - 2025-11-17 +******************** + +Added +===== + +* Migration to transfer legacy permissions from ContentLibraryPermission to the new Casbin-based authorization model. + 0.17.1 - 2025-11-14 ******************** @@ -31,7 +39,6 @@ Added ===== * Signal to clear policies associated to a user when they are retired. -* Migration to transfer legacy permissions from ContentLibraryPermission to the new Casbin-based authorization model. 0.16.0 - 2025-11-13 ******************** diff --git a/openedx_authz/__init__.py b/openedx_authz/__init__.py index db0b2199..01796def 100644 --- a/openedx_authz/__init__.py +++ b/openedx_authz/__init__.py @@ -4,6 +4,6 @@ import os -__version__ = "0.17.1" +__version__ = "0.18.0" ROOT_DIRECTORY = os.path.dirname(os.path.abspath(__file__))