Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ Unreleased

*

0.18.0 - 2025-11-17
********************

Added
=====

* Migration to transfer legacy permissions from ContentLibraryPermission to the new Casbin-based authorization model.

0.17.1 - 2025-11-14
********************

Expand Down
2 changes: 1 addition & 1 deletion openedx_authz/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@

import os

__version__ = "0.17.1"
__version__ = "0.18.0"

ROOT_DIRECTORY = os.path.dirname(os.path.abspath(__file__))
82 changes: 82 additions & 0 deletions openedx_authz/engine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

from casbin import Enforcer

from openedx_authz.api.users import assign_role_to_user_in_scope, batch_assign_role_to_users_in_scope
from openedx_authz.constants.roles import LIBRARY_ADMIN, LIBRARY_AUTHOR, LIBRARY_USER

logger = logging.getLogger(__name__)

GROUPING_POLICY_PTYPES = ["g", "g2", "g3", "g4", "g5", "g6"]
Expand Down Expand Up @@ -69,3 +72,82 @@ def migrate_policy_between_enforcers(
except Exception as e:
logger.error(f"Error loading policies from file: {e}")
raise


def migrate_legacy_permissions(ContentLibraryPermission):
"""
Migrate legacy permission data to the new Casbin-based authorization model.
This function reads legacy permissions from the ContentLibraryPermission model
and assigns equivalent roles in the new authorization system.

The old Library permissions are stored in the ContentLibraryPermission model, it consists of the following columns:

- library: FK to ContentLibrary
- user: optional FK to User
- group: optional FK to Group
- access_level: 'admin' | 'author' | 'read'

In the new Authz model, this would roughly translate to:

- library: scope
- user: subject
- access_level: role

Now, we don't have an equivalent concept to "Group", for this we will go through the users in the group and assign
roles independently.

param ContentLibraryPermission: The ContentLibraryPermission model to use.
"""

legacy_permissions = ContentLibraryPermission.objects.select_related(
"library", "library__org", "user", "group"
).all()

# List to keep track of any permissions that could not be migrated
permissions_with_errors = []

for permission in legacy_permissions:
# Migrate the permission to the new model

# Derive equivalent role based on access level
access_level_to_role = {
"admin": LIBRARY_ADMIN,
"author": LIBRARY_AUTHOR,
"read": LIBRARY_USER,
}

role = access_level_to_role.get(permission.access_level)
if role is None:
# This should not happen as there are no more access_levels defined
# in ContentLibraryPermission, log and skip
logger.error(f"Unknown access level: {permission.access_level} for User: {permission.user}")
permissions_with_errors.append(permission)
continue

# Generating scope based on library identifier
scope = f"lib:{permission.library.org.name}:{permission.library.slug}"

if permission.group:
# Permission applied to a group
users = [user.username for user in permission.group.user_set.all()]
logger.info(
f"Migrating permissions for Users: {users} in Group: {permission.group.name} "
f"to Role: {role.external_key} in Scope: {scope}"
)
batch_assign_role_to_users_in_scope(
users=users, role_external_key=role.external_key, scope_external_key=scope
)
else:
# Permission applied to individual user
logger.info(
f"Migrating permission for User: {permission.user.username} "
f"to Role: {role.external_key} in Scope: {scope}"
)

assign_role_to_user_in_scope(
user_external_key=permission.user.username,
role_external_key=role.external_key,
scope_external_key=scope,
)

return permissions_with_errors
61 changes: 61 additions & 0 deletions openedx_authz/migrations/0005_migrate_legacy_permissions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Generated by Django 5.2.7 on 2025-11-03 20:39

import logging

from django.db import migrations

from openedx_authz.engine.utils import migrate_legacy_permissions

logger = logging.getLogger(__name__)


def _log_migration_errors(permissions_with_errors: list) -> None:
"""
Log the permissions that could not be migrated during the migration process.
Args:
permissions_with_errors (list): List of ContentLibraryPermission instances that failed to migrate.
"""
logger.error(
f"Migration completed with errors for {len(permissions_with_errors)} permissions.\n"
"The following permissions could not be migrated:"
)
for permission in permissions_with_errors:
logger.error(
"Access level: %s, %sLibrary: %s",
permission.access_level,
f"User: {permission.user.username}, " if permission.user else f"Group: {permission.group.name}, ",
permission.library.slug,
)


def apply_migrate_legacy_permissions(apps, schema_editor):
"""
Wrapper to run the migration using the historical version of the ContentLibraryPermission model.
"""
# ContentLibraryPermission model from the content_libraries app, here is where the legacy permissions are stored
try:
ContentLibraryPermission = apps.get_model("content_libraries", "ContentLibraryPermission")
except LookupError:
# Don't run the migration where the content_libraries app is not installed, like during development.
logger.warning("ContentLibraryPermission model not found. Skipping migration.")
return

permissions_with_errors = migrate_legacy_permissions(ContentLibraryPermission)

if permissions_with_errors:
_log_migration_errors(permissions_with_errors)


class Migration(migrations.Migration):
"""
Migration to transfer legacy permissions from ContentLibraryPermission
to the new Casbin-based authorization model.
"""

dependencies = [
("openedx_authz", "0004_contentlibraryscope"),
]

operations = [
migrations.RunPython(apply_migrate_legacy_permissions),
]
45 changes: 44 additions & 1 deletion openedx_authz/tests/stubs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,25 @@
referenced in FK relationships without requiring the full application context.
"""

from django.conf import settings
from django.contrib.auth.models import Group
from django.db import models
from opaque_keys.edx.locator import LibraryLocatorV2


class Organization(models.Model):
"""Stub model representing an organization for testing purposes.

.. no_pii:
"""

name = models.CharField(max_length=255)
short_name = models.CharField(max_length=100)

def __str__(self):
return str(self.name)


class ContentLibraryManager(models.Manager):
"""Manager for ContentLibrary model with helper methods."""

Expand Down Expand Up @@ -38,9 +53,37 @@ class ContentLibrary(models.Model):

locator = models.CharField(max_length=255, unique=True, db_index=True)
title = models.CharField(max_length=255, blank=True, null=True)
slug = models.SlugField(allow_unicode=True)
org = models.ForeignKey(Organization, on_delete=models.PROTECT, null=True)
created_at = models.DateTimeField(auto_now_add=True)

objects = ContentLibraryManager()

def __str__(self):
return self.locator
return str(self.locator)


# Legacy permission models for testing purposes
class ContentLibraryPermission(models.Model):
"""Stub model representing legacy content library permissions for testing purposes.

.. no_pii:
"""

ADMIN_LEVEL = "admin"
AUTHOR_LEVEL = "author"
READ_LEVEL = "read"
ACCESS_LEVEL_CHOICES = (
(ADMIN_LEVEL, "Administer users and author content"),
(AUTHOR_LEVEL, "Author content"),
(READ_LEVEL, "Read-only"),
)

library = models.ForeignKey(ContentLibrary, on_delete=models.CASCADE)
user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, null=True, blank=True)
group = models.ForeignKey(Group, on_delete=models.CASCADE, null=True, blank=True)
access_level = models.CharField(max_length=30, choices=ACCESS_LEVEL_CHOICES)

def __str__(self):
who = self.user.username if self.user else self.group.name
return f"ContentLibraryPermission ({self.access_level} for {who})"
147 changes: 147 additions & 0 deletions openedx_authz/tests/test_migrations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
"""Unit Tests for openedx_authz migrations."""

from django.contrib.auth import get_user_model
from django.contrib.auth.models import Group
from django.test import TestCase

from openedx_authz.api.users import batch_unassign_role_from_users, get_user_role_assignments_in_scope
from openedx_authz.constants.roles import LIBRARY_ADMIN, LIBRARY_USER
from openedx_authz.engine.enforcer import AuthzEnforcer
from openedx_authz.engine.utils import migrate_legacy_permissions
from openedx_authz.tests.stubs.models import ContentLibrary, ContentLibraryPermission, Organization

User = get_user_model()

# Specify a unique prefix to avoid collisions with existing data
OBJECT_PREFIX = "tmlp_"

org_name = f"{OBJECT_PREFIX}org"
lib_name = f"{OBJECT_PREFIX}library"
group_name = f"{OBJECT_PREFIX}test_group"
user_names = [f"{OBJECT_PREFIX}user{i}" for i in range(3)]
group_user_names = [f"{OBJECT_PREFIX}guser{i}" for i in range(3)]
error_user_name = f"{OBJECT_PREFIX}error_user"
error_group_name = f"{OBJECT_PREFIX}error_group"
empty_group_name = f"{OBJECT_PREFIX}empty_group"


class TestLegacyPermissionsMigration(TestCase):
"""Test cases for migrating legacy permissions."""

def setUp(self):
"""
Set up test data:

What this does:
1. Creates an Org and a ContentLibrary
2. Create Users and Groups
3. Assign legacy permissions using ContentLibraryPermission
4. Create invalid permissions for user and group
"""
# Create ContentLibrary

org = Organization.objects.create(name=org_name, short_name=org_name)
library = ContentLibrary.objects.create(org=org, slug=lib_name)

# Create Users and Groups
users = [
User.objects.create_user(username=user_name, email=f"{user_name}@example.com") for user_name in user_names
]

group_users = [
User.objects.create_user(username=user_name, email=f"{user_name}@example.com")
for user_name in group_user_names
]
group = Group.objects.create(name=group_name)
group.user_set.set(group_users)

error_user = User.objects.create_user(username=error_user_name, email=f"{error_user_name}@example.com")
error_group = Group.objects.create(name=error_group_name)
error_group.user_set.set([error_user])

empty_group = Group.objects.create(name=empty_group_name)

# Assign legacy permissions for users and group
for user in users:
ContentLibraryPermission.objects.create(
user=user,
library=library,
access_level=ContentLibraryPermission.ADMIN_LEVEL,
)

ContentLibraryPermission.objects.create(
group=group,
library=library,
access_level=ContentLibraryPermission.READ_LEVEL,
)

# Create invalid permissions for testing error logging
ContentLibraryPermission.objects.create(
user=error_user,
library=library,
access_level="invalid",
)
ContentLibraryPermission.objects.create(
group=error_group,
library=library,
access_level="invalid",
)

# Edge case: empty group with no users
ContentLibraryPermission.objects.create(
group=empty_group,
library=library,
access_level=ContentLibraryPermission.READ_LEVEL,
)

def tearDown(self):
"""
Clean up test data created for the migration test.
"""
super().tearDown()

AuthzEnforcer.get_enforcer().load_policy()
batch_unassign_role_from_users(
users=user_names,
role_external_key=LIBRARY_ADMIN.external_key,
scope_external_key=f"lib:{org_name}:{lib_name}",
)
batch_unassign_role_from_users(
users=group_user_names,
role_external_key=LIBRARY_USER.external_key,
scope_external_key=f"lib:{org_name}:{lib_name}",
)

ContentLibrary.objects.filter(slug=lib_name).delete()
Organization.objects.filter(name=org_name).delete()
Group.objects.filter(name=group_name).delete()
Group.objects.filter(name=error_group_name).delete()
Group.objects.filter(name=empty_group_name).delete()
for user_name in user_names + group_user_names + [error_user_name]:
User.objects.filter(username=user_name).delete()

def test_migration(self):
"""Test the migration of legacy permissions.
1. Rus the migration to migrate legacy permissions.
2. Check that each user has the expected role in the new model.
3. Check that the group users have the expected role in the new model.
4. Check that invalid permissions were identified correctly as errors.
"""

permissions_with_errors = migrate_legacy_permissions(ContentLibraryPermission)

AuthzEnforcer.get_enforcer().load_policy()
for user_name in user_names:
assignments = get_user_role_assignments_in_scope(
user_external_key=user_name, scope_external_key=f"lib:{org_name}:{lib_name}"
)
self.assertEqual(len(assignments), 1)
self.assertEqual(assignments[0].roles[0], LIBRARY_ADMIN)
for group_user_name in group_user_names:
assignments = get_user_role_assignments_in_scope(
user_external_key=group_user_name, scope_external_key=f"lib:{org_name}:{lib_name}"
)
self.assertEqual(len(assignments), 1)
self.assertEqual(assignments[0].roles[0], LIBRARY_USER)

self.assertEqual(len(permissions_with_errors), 2)