|
8 | 8 |
|
9 | 9 | from casbin import Enforcer |
10 | 10 |
|
| 11 | +from openedx_authz.api.users import assign_role_to_user_in_scope, batch_assign_role_to_users_in_scope |
| 12 | +from openedx_authz.constants.roles import LIBRARY_ADMIN, LIBRARY_AUTHOR, LIBRARY_USER |
| 13 | + |
11 | 14 | logger = logging.getLogger(__name__) |
12 | 15 |
|
13 | 16 | GROUPING_POLICY_PTYPES = ["g", "g2", "g3", "g4", "g5", "g6"] |
@@ -69,3 +72,82 @@ def migrate_policy_between_enforcers( |
69 | 72 | except Exception as e: |
70 | 73 | logger.error(f"Error loading policies from file: {e}") |
71 | 74 | raise |
| 75 | + |
| 76 | + |
| 77 | +def migrate_legacy_permissions(ContentLibraryPermission): |
| 78 | + """ |
| 79 | + Migrate legacy permission data to the new Casbin-based authorization model. |
| 80 | + This function reads legacy permissions from the ContentLibraryPermission model |
| 81 | + and assigns equivalent roles in the new authorization system. |
| 82 | +
|
| 83 | + The old Library permissions are stored in the ContentLibraryPermission model, it consists of the following columns: |
| 84 | +
|
| 85 | + - library: FK to ContentLibrary |
| 86 | + - user: optional FK to User |
| 87 | + - group: optional FK to Group |
| 88 | + - access_level: 'admin' | 'author' | 'read' |
| 89 | +
|
| 90 | + In the new Authz model, this would roughly translate to: |
| 91 | +
|
| 92 | + - library: scope |
| 93 | + - user: subject |
| 94 | + - access_level: role |
| 95 | +
|
| 96 | + Now, we don't have an equivalent concept to "Group", for this we will go through the users in the group and assign |
| 97 | + roles independently. |
| 98 | +
|
| 99 | + param ContentLibraryPermission: The ContentLibraryPermission model to use. |
| 100 | + """ |
| 101 | + |
| 102 | + legacy_permissions = ContentLibraryPermission.objects.select_related( |
| 103 | + "library", "library__org", "user", "group" |
| 104 | + ).all() |
| 105 | + |
| 106 | + # List to keep track of any permissions that could not be migrated |
| 107 | + permissions_with_errors = [] |
| 108 | + |
| 109 | + for permission in legacy_permissions: |
| 110 | + # Migrate the permission to the new model |
| 111 | + |
| 112 | + # Derive equivalent role based on access level |
| 113 | + access_level_to_role = { |
| 114 | + "admin": LIBRARY_ADMIN, |
| 115 | + "author": LIBRARY_AUTHOR, |
| 116 | + "read": LIBRARY_USER, |
| 117 | + } |
| 118 | + |
| 119 | + role = access_level_to_role.get(permission.access_level) |
| 120 | + if role is None: |
| 121 | + # This should not happen as there are no more access_levels defined |
| 122 | + # in ContentLibraryPermission, log and skip |
| 123 | + logger.error(f"Unknown access level: {permission.access_level} for User: {permission.user}") |
| 124 | + permissions_with_errors.append(permission) |
| 125 | + continue |
| 126 | + |
| 127 | + # Generating scope based on library identifier |
| 128 | + scope = f"lib:{permission.library.org.name}:{permission.library.slug}" |
| 129 | + |
| 130 | + if permission.group: |
| 131 | + # Permission applied to a group |
| 132 | + users = [user.username for user in permission.group.user_set.all()] |
| 133 | + logger.info( |
| 134 | + f"Migrating permissions for Users: {users} in Group: {permission.group.name} " |
| 135 | + f"to Role: {role.external_key} in Scope: {scope}" |
| 136 | + ) |
| 137 | + batch_assign_role_to_users_in_scope( |
| 138 | + users=users, role_external_key=role.external_key, scope_external_key=scope |
| 139 | + ) |
| 140 | + else: |
| 141 | + # Permission applied to individual user |
| 142 | + logger.info( |
| 143 | + f"Migrating permission for User: {permission.user.username} " |
| 144 | + f"to Role: {role.external_key} in Scope: {scope}" |
| 145 | + ) |
| 146 | + |
| 147 | + assign_role_to_user_in_scope( |
| 148 | + user_external_key=permission.user.username, |
| 149 | + role_external_key=role.external_key, |
| 150 | + scope_external_key=scope, |
| 151 | + ) |
| 152 | + |
| 153 | + return permissions_with_errors |
0 commit comments