|
26 | 26 | from openedx_authz.models import ExtendedCasbinRule |
27 | 27 |
|
28 | 28 | __all__ = [ |
29 | | - "get_permissions_for_single_role", |
30 | | - "get_permissions_for_roles", |
31 | | - "get_all_roles_names", |
32 | | - "get_all_roles_in_scope", |
33 | | - "get_permissions_for_active_roles_in_scope", |
34 | | - "get_role_definitions_in_scope", |
35 | 29 | "assign_role_to_subject_in_scope", |
36 | 30 | "batch_assign_role_to_subjects_in_scope", |
37 | | - "unassign_role_from_subject_in_scope", |
38 | 31 | "batch_unassign_role_from_subjects_in_scope", |
39 | | - "get_subject_role_assignments_in_scope", |
40 | | - "get_subject_role_assignments_for_role", |
41 | | - "get_subject_role_assignments_for_role_in_scope", |
| 32 | + "get_all_roles_in_scope", |
| 33 | + "get_all_roles_names", |
42 | 34 | "get_all_subject_role_assignments_in_scope", |
43 | | - "get_subject_role_assignments", |
| 35 | + "get_permissions_for_active_roles_in_scope", |
| 36 | + "get_permissions_for_roles", |
| 37 | + "get_permissions_for_single_role", |
| 38 | + "get_role_assignments", |
| 39 | + "get_role_definitions_in_scope", |
44 | 40 | "get_scopes_for_subject_and_permission", |
| 41 | + "get_subject_role_assignments", |
| 42 | + "get_subject_role_assignments_for_role_in_scope", |
| 43 | + "get_subject_role_assignments_in_scope", |
| 44 | + "unassign_role_from_subject_in_scope", |
45 | 45 | "unassign_subject_from_all_roles", |
46 | 46 | ] |
47 | 47 |
|
@@ -294,6 +294,92 @@ def get_subject_role_assignments(subject: SubjectData) -> list[RoleAssignmentDat |
294 | 294 | return role_assignments |
295 | 295 |
|
296 | 296 |
|
| 297 | +def get_field_index_and_values( |
| 298 | + subject: SubjectData | None, |
| 299 | + role: RoleData | None, |
| 300 | + scope: ScopeData | None, |
| 301 | +) -> tuple[int, list[str]]: |
| 302 | + """Build field index and values for Casbin's get_filtered_grouping_policy. |
| 303 | +
|
| 304 | + Returns the leftmost non-None field as field_index and a list of consecutive |
| 305 | + values starting from that index. Empty strings serve as wildcards for positions |
| 306 | + between specified values. |
| 307 | +
|
| 308 | + Examples: |
| 309 | + >>> get_field_index_and_values(user, None, None) |
| 310 | + (0, ['user^steve']) |
| 311 | + >>> get_field_index_and_values(user, role, None) |
| 312 | + (0, ['user^steve', 'role^course_admin']) |
| 313 | + >>> get_field_index_and_values(None, role, scope) |
| 314 | + (1, ['role^course_admin', 'course-v1^course-v1:OpenedX+Demo+Course']) |
| 315 | + >>> get_field_index_and_values(user, None, scope) |
| 316 | + (0, ['user^steve', '', 'course-v1^course-v1:OpenedX+Demo+Course']) |
| 317 | +
|
| 318 | + Args: |
| 319 | + subject: Optional subject to filter by. |
| 320 | + role: Optional role to filter by. |
| 321 | + scope: Optional scope to filter by. |
| 322 | +
|
| 323 | + Returns: |
| 324 | + tuple: (field_index, field_values) where field_index is the starting position |
| 325 | + and field_values are the consecutive filter values from that position. |
| 326 | + """ |
| 327 | + values = [ |
| 328 | + subject.namespaced_key if subject else "", |
| 329 | + role.namespaced_key if role else "", |
| 330 | + scope.namespaced_key if scope else "", |
| 331 | + ] |
| 332 | + |
| 333 | + # Find first non-empty value (leftmost defined field) |
| 334 | + try: |
| 335 | + field_index = next(idx for idx, value in enumerate(values) if value) |
| 336 | + except StopIteration: |
| 337 | + return 0, [] |
| 338 | + |
| 339 | + # Take slice from first defined field |
| 340 | + field_values = values[field_index:] |
| 341 | + |
| 342 | + # Remove trailing wildcards |
| 343 | + while field_values and field_values[-1] == "": |
| 344 | + field_values.pop() |
| 345 | + |
| 346 | + return field_index, field_values |
| 347 | + |
| 348 | + |
| 349 | +def get_role_assignments( |
| 350 | + *, |
| 351 | + subject: SubjectData | None = None, |
| 352 | + role: RoleData | None = None, |
| 353 | + scope: ScopeData | None = None, |
| 354 | +) -> list[RoleAssignmentData]: |
| 355 | + """Get all the roles for a subject across all scopes filtered by the given filters. |
| 356 | +
|
| 357 | + Args: |
| 358 | + subject: Optional SubjectData object to filter by. |
| 359 | + role: Optional RoleData object to filter by. |
| 360 | + scope: Optional ScopeData object to filter by. |
| 361 | +
|
| 362 | + Returns: |
| 363 | + list[RoleAssignmentData]: A list of RoleAssignmentData objects filtered by the given filters. |
| 364 | + """ |
| 365 | + enforcer = AuthzEnforcer.get_enforcer() |
| 366 | + role_assignments = [] |
| 367 | + field_index, field_values = get_field_index_and_values(subject, role, scope) |
| 368 | + policies = enforcer.get_filtered_grouping_policy(field_index, *field_values) |
| 369 | + |
| 370 | + for policy in policies: |
| 371 | + role = RoleData(namespaced_key=policy[GroupingPolicyIndex.ROLE.value]) |
| 372 | + role.permissions = get_permissions_for_single_role(role) |
| 373 | + role_assignments.append( |
| 374 | + RoleAssignmentData( |
| 375 | + subject=SubjectData(namespaced_key=policy[GroupingPolicyIndex.SUBJECT.value]), |
| 376 | + roles=[role], |
| 377 | + scope=ScopeData(namespaced_key=policy[GroupingPolicyIndex.SCOPE.value]), |
| 378 | + ) |
| 379 | + ) |
| 380 | + return role_assignments |
| 381 | + |
| 382 | + |
297 | 383 | def get_subject_role_assignments_in_scope(subject: SubjectData, scope: ScopeData) -> list[RoleAssignmentData]: |
298 | 384 | """Get the roles for a subject in a specific scope. |
299 | 385 |
|
|
0 commit comments