|
1 | 1 | """Utility functions for the Open edX AuthZ REST API.""" |
2 | 2 |
|
| 3 | +import logging |
| 4 | +from dataclasses import dataclass |
| 5 | + |
3 | 6 | from django.contrib.auth import get_user_model |
4 | | -from django.db.models import Q |
5 | 7 |
|
6 | | -from openedx_authz.api.data import GLOBAL_SCOPE_WILDCARD, ScopeData |
| 8 | +from openedx_authz.api.data import ( |
| 9 | + GLOBAL_SCOPE_WILDCARD, |
| 10 | + ContentLibraryData, |
| 11 | + CourseOverviewData, |
| 12 | + OrgContentLibraryGlobData, |
| 13 | + OrgCourseOverviewGlobData, |
| 14 | + RoleAssignmentData, |
| 15 | + ScopeData, |
| 16 | +) |
| 17 | +from openedx_authz.api.users import is_user_allowed |
| 18 | +from openedx_authz.constants.permissions import COURSES_VIEW_COURSE, VIEW_LIBRARY |
7 | 19 | from openedx_authz.rest_api.data import SearchField, SortField, SortOrder |
8 | 20 |
|
| 21 | +logger = logging.getLogger(__name__) |
| 22 | + |
9 | 23 | User = get_user_model() |
10 | 24 |
|
11 | 25 |
|
@@ -51,26 +65,6 @@ def get_user_map(usernames: list[str]) -> dict[str, User]: |
51 | 65 | return {user.username: user for user in users} |
52 | 66 |
|
53 | 67 |
|
54 | | -def get_user_by_username_or_email(username_or_email: str) -> User: |
55 | | - """ |
56 | | - Retrieve a user by their username or email address. |
57 | | -
|
58 | | - Args: |
59 | | - username_or_email (str): The username or email address to search for. |
60 | | -
|
61 | | - Returns: |
62 | | - User: The User object if found and not retired. |
63 | | -
|
64 | | - Raises: |
65 | | - User.DoesNotExist: If no user matches the provided username or email, |
66 | | - or if the user has an associated retirement request. |
67 | | - """ |
68 | | - user = User.objects.get(Q(email=username_or_email) | Q(username=username_or_email)) |
69 | | - if hasattr(user, "userretirementrequest"): |
70 | | - raise User.DoesNotExist |
71 | | - return user |
72 | | - |
73 | | - |
74 | 68 | def sort_users( |
75 | 69 | users: list[dict], |
76 | 70 | sort_by: SortField = SortField.USERNAME, |
@@ -135,3 +129,49 @@ def filter_users(users: list[dict], search: str | None, roles: list[str] | None) |
135 | 129 | filtered_users.append(user) |
136 | 130 |
|
137 | 131 | return filtered_users |
| 132 | + |
| 133 | + |
| 134 | +@dataclass |
| 135 | +class UserAssignments: |
| 136 | + user: "User" |
| 137 | + assignments: list[RoleAssignmentData] |
| 138 | + |
| 139 | + |
| 140 | +def get_user_assignment_map(role_assignments: list[RoleAssignmentData]) -> list[UserAssignments]: |
| 141 | + """ |
| 142 | + Group role assignments by user |
| 143 | + """ |
| 144 | + usernames = {assignment.subject.username for assignment in role_assignments} |
| 145 | + user_map = get_user_map(usernames) |
| 146 | + |
| 147 | + users_with_assignments: list[UserAssignments] = [] |
| 148 | + |
| 149 | + for username, user in user_map.items(): |
| 150 | + assignments = [a for a in role_assignments if a.subject.username == username] |
| 151 | + users_with_assignments.append(UserAssignments(user=user, assignments=assignments)) |
| 152 | + |
| 153 | + return users_with_assignments |
| 154 | + |
| 155 | + |
| 156 | +def filter_allowed_assignments(user: "User", assignments: list[RoleAssignmentData]) -> list[RoleAssignmentData]: |
| 157 | + """ |
| 158 | + Filter the given role assignments to only include those that the user has permission to view. |
| 159 | + """ |
| 160 | + allowed_assignments: list[RoleAssignmentData] = [] |
| 161 | + for assignment in assignments: |
| 162 | + permission = None |
| 163 | + |
| 164 | + # For CourseOverviewData and ContentLibraryData, check for the view permission |
| 165 | + if isinstance(assignment.scope, (CourseOverviewData, OrgCourseOverviewGlobData)): |
| 166 | + permission = COURSES_VIEW_COURSE.identifier |
| 167 | + elif isinstance(assignment.scope, (ContentLibraryData, OrgContentLibraryGlobData)): |
| 168 | + permission = VIEW_LIBRARY.identifier |
| 169 | + |
| 170 | + if permission and is_user_allowed( |
| 171 | + user_external_key=user.username, |
| 172 | + action_external_key=permission, |
| 173 | + scope_external_key=assignment.scope.external_key, |
| 174 | + ): |
| 175 | + allowed_assignments.append(assignment) |
| 176 | + |
| 177 | + return allowed_assignments |
0 commit comments