From 68ccb8397f9c777795752272dca076f0c1beec11 Mon Sep 17 00:00:00 2001 From: Maria Grimaldi Date: Wed, 22 Oct 2025 14:58:00 +0200 Subject: [PATCH 1/5] chore: implement format command for easier quality management --- Makefile | 4 + openedx_authz/api/users.py | 24 ++- openedx_authz/engine/adapter.py | 8 +- openedx_authz/engine/enforcer.py | 4 +- openedx_authz/engine/utils.py | 4 +- .../management/commands/load_policies.py | 27 ++- openedx_authz/rest_api/decorators.py | 4 +- openedx_authz/rest_api/utils.py | 33 +++- openedx_authz/rest_api/v1/fields.py | 6 +- openedx_authz/rest_api/v1/permissions.py | 8 +- openedx_authz/rest_api/v1/serializers.py | 50 ++++-- openedx_authz/rest_api/v1/urls.py | 6 +- openedx_authz/rest_api/v1/views.py | 81 ++++++--- openedx_authz/tests/api/test_data.py | 19 +- openedx_authz/tests/api/test_roles.py | 67 +++++--- openedx_authz/tests/api/test_users.py | 54 ++++-- openedx_authz/tests/rest_api/test_views.py | 162 ++++++++++++++---- openedx_authz/tests/test_commands.py | 150 ++++++++++------ openedx_authz/tests/test_enforcement.py | 160 ++++++++++++++--- openedx_authz/tests/test_engine_utils.py | 5 +- openedx_authz/tests/test_filter.py | 8 +- 21 files changed, 672 insertions(+), 212 deletions(-) diff --git a/Makefile b/Makefile index 64a9a0aa..540decf5 100644 --- a/Makefile +++ b/Makefile @@ -58,6 +58,10 @@ upgrade: ## update the requirements/*.txt files with the latest packages satisfy quality: ## check coding style with pycodestyle and pylint tox -e quality +format: ## format code with black and isort + black openedx_authz tests + isort openedx_authz tests + pii_check: ## check for PII annotations on all Django models tox -e pii_check diff --git a/openedx_authz/api/users.py b/openedx_authz/api/users.py index 62c20320..f6caec47 100644 --- a/openedx_authz/api/users.py +++ b/openedx_authz/api/users.py @@ -37,7 +37,9 @@ ] -def assign_role_to_user_in_scope(user_external_key: str, role_external_key: str, scope_external_key: str) -> bool: +def assign_role_to_user_in_scope( + user_external_key: str, role_external_key: str, scope_external_key: str +) -> bool: """Assign a role to a user in a specific scope. Args: @@ -55,7 +57,9 @@ def assign_role_to_user_in_scope(user_external_key: str, role_external_key: str, ) -def batch_assign_role_to_users_in_scope(users: list[str], role_external_key: str, scope_external_key: str): +def batch_assign_role_to_users_in_scope( + users: list[str], role_external_key: str, scope_external_key: str +): """Assign a role to multiple users in a specific scope. Args: @@ -71,7 +75,9 @@ def batch_assign_role_to_users_in_scope(users: list[str], role_external_key: str ) -def unassign_role_from_user(user_external_key: str, role_external_key: str, scope_external_key: str): +def unassign_role_from_user( + user_external_key: str, role_external_key: str, scope_external_key: str +): """Unassign a role from a user in a specific scope. Args: @@ -89,7 +95,9 @@ def unassign_role_from_user(user_external_key: str, role_external_key: str, scop ) -def batch_unassign_role_from_users(users: list[str], role_external_key: str, scope_external_key: str): +def batch_unassign_role_from_users( + users: list[str], role_external_key: str, scope_external_key: str +): """Unassign a role from multiple users in a specific scope. Args: @@ -117,7 +125,9 @@ def get_user_role_assignments(user_external_key: str) -> list[RoleAssignmentData return get_subject_role_assignments(UserData(external_key=user_external_key)) -def get_user_role_assignments_in_scope(user_external_key: str, scope_external_key: str) -> list[RoleAssignmentData]: +def get_user_role_assignments_in_scope( + user_external_key: str, scope_external_key: str +) -> list[RoleAssignmentData]: """Get the roles assigned to a user in a specific scope. Args: @@ -162,7 +172,9 @@ def get_all_user_role_assignments_in_scope( Returns: list[RoleAssignmentData]: A list of user role assignments and all their metadata in the specified scope. """ - return get_all_subject_role_assignments_in_scope(ScopeData(external_key=scope_external_key)) + return get_all_subject_role_assignments_in_scope( + ScopeData(external_key=scope_external_key) + ) def is_user_allowed( diff --git a/openedx_authz/engine/adapter.py b/openedx_authz/engine/adapter.py index c68cfe82..0cc0de75 100644 --- a/openedx_authz/engine/adapter.py +++ b/openedx_authz/engine/adapter.py @@ -76,7 +76,9 @@ def is_filtered(self) -> bool: """ return True - def load_filtered_policy(self, model: Model, filter: Filter) -> None: # pylint: disable=redefined-builtin + def load_filtered_policy( + self, model: Model, filter: Filter # pylint: disable=redefined-builtin + ) -> None: """ Load policy rules from storage with filtering applied. @@ -99,7 +101,9 @@ def load_filtered_policy(self, model: Model, filter: Filter) -> None: # pylint: for line in filtered_queryset: persist.load_policy_line(str(line), model) - def filter_query(self, queryset: QuerySet, filter: Filter) -> QuerySet: # pylint: disable=redefined-builtin + def filter_query( + self, queryset: QuerySet, filter: Filter # pylint: disable=redefined-builtin + ) -> QuerySet: """ Apply filter criteria to the policy queryset. diff --git a/openedx_authz/engine/enforcer.py b/openedx_authz/engine/enforcer.py index 0e45ac9c..c8dc5f06 100644 --- a/openedx_authz/engine/enforcer.py +++ b/openedx_authz/engine/enforcer.py @@ -88,7 +88,9 @@ def _initialize_enforcer() -> SyncedEnforcer: # issues when the app is not fully loaded (e.g., while pulling translations, etc.). initialize_enforcer(db_alias) except Exception as e: - logger.error(f"Failed to initialize Casbin enforcer with DB alias '{db_alias}': {e}") + logger.error( + f"Failed to initialize Casbin enforcer with DB alias '{db_alias}': {e}" + ) raise adapter = ExtendedAdapter() diff --git a/openedx_authz/engine/utils.py b/openedx_authz/engine/utils.py index 11895a92..f86a6ac3 100644 --- a/openedx_authz/engine/utils.py +++ b/openedx_authz/engine/utils.py @@ -31,7 +31,9 @@ def migrate_policy_between_enforcers( # Load target enforcer policies to check for duplicates target_enforcer.load_policy() - logger.info(f"Target enforcer has {len(target_enforcer.get_policy())} existing policies before migration.") + logger.info( + f"Target enforcer has {len(target_enforcer.get_policy())} existing policies before migration." + ) # TODO: this operations use the enforcer directly, which may not be ideal # since we have to load the policy after each addition to avoid duplicates. diff --git a/openedx_authz/management/commands/load_policies.py b/openedx_authz/management/commands/load_policies.py index 57ca79de..4a6cbf06 100644 --- a/openedx_authz/management/commands/load_policies.py +++ b/openedx_authz/management/commands/load_policies.py @@ -69,7 +69,10 @@ def handle(self, *args, **options): Raises: CommandError: If the policy file is not found or loading fails. """ - policy_file_path, model_file_path = options["policy_file_path"], options["model_file_path"] + policy_file_path, model_file_path = ( + options["policy_file_path"], + options["model_file_path"], + ) if policy_file_path is None: policy_file_path = os.path.join( ROOT_DIRECTORY, "engine", "config", "authz.policy" @@ -83,13 +86,25 @@ def handle(self, *args, **options): if options.get("clear_existing"): target_enforcer.load_policy() - if click.confirm(click.style('Do you want to delete existing roles? ' - '(This will also delete the assignments related to those roles)', - fg='yellow', bold=True), default=False): + if click.confirm( + click.style( + "Do you want to delete existing roles? " + "(This will also delete the assignments related to those roles)", + fg="yellow", + bold=True, + ), + default=False, + ): self._delete_existing_roles(target_enforcer) - if click.confirm(click.style('Do you want to delete existing permissions inheritance?', - fg='yellow', bold=True), default=False): + if click.confirm( + click.style( + "Do you want to delete existing permissions inheritance?", + fg="yellow", + bold=True, + ), + default=False, + ): self._delete_permissions_inheritance(target_enforcer) source_enforcer = casbin.Enforcer(model_file_path, policy_file_path) diff --git a/openedx_authz/rest_api/decorators.py b/openedx_authz/rest_api/decorators.py index 1aead964..c5fd0ee0 100644 --- a/openedx_authz/rest_api/decorators.py +++ b/openedx_authz/rest_api/decorators.py @@ -39,7 +39,9 @@ def _decorator(func_or_class): SessionAuthenticationAllowInactiveUser, ] if is_authenticated: - func_or_class.permission_classes = [IsAuthenticated] + getattr(func_or_class, "permission_classes", []) + func_or_class.permission_classes = [IsAuthenticated] + getattr( + func_or_class, "permission_classes", [] + ) return func_or_class return _decorator diff --git a/openedx_authz/rest_api/utils.py b/openedx_authz/rest_api/utils.py index 370437a4..240a001d 100644 --- a/openedx_authz/rest_api/utils.py +++ b/openedx_authz/rest_api/utils.py @@ -28,7 +28,9 @@ def get_generic_scope(scope: ScopeData) -> ScopeData: >>> get_generic_scope(scope) ScopeData(namespaced_key="lib^*") """ - return ScopeData(namespaced_key=f"{scope.NAMESPACE}{ScopeData.SEPARATOR}{GENERIC_SCOPE_WILDCARD}") + return ScopeData( + namespaced_key=f"{scope.NAMESPACE}{ScopeData.SEPARATOR}{GENERIC_SCOPE_WILDCARD}" + ) def get_user_map(usernames: list[str]) -> dict[str, User]: @@ -72,7 +74,9 @@ def get_user_by_username_or_email(username_or_email: str) -> User: def sort_users( - users: list[dict], sort_by: SortField = SortField.USERNAME, order: SortOrder = SortOrder.ASC + users: list[dict], + sort_by: SortField = SortField.USERNAME, + order: SortOrder = SortOrder.ASC, ) -> list[dict]: """ Sort users by a given field and order. @@ -90,16 +94,26 @@ def sort_users( list[dict]: The sorted users. """ if sort_by not in SortField.values(): - raise ValueError(f"Invalid field: '{sort_by}'. Must be one of {SortField.values()}") + raise ValueError( + f"Invalid field: '{sort_by}'. Must be one of {SortField.values()}" + ) if order not in SortOrder.values(): - raise ValueError(f"Invalid order: '{order}'. Must be one of {SortOrder.values()}") - - sorted_users = sorted(users, key=lambda user: (user.get(sort_by) or "").lower(), reverse=order == SortOrder.DESC) + raise ValueError( + f"Invalid order: '{order}'. Must be one of {SortOrder.values()}" + ) + + sorted_users = sorted( + users, + key=lambda user: (user.get(sort_by) or "").lower(), + reverse=order == SortOrder.DESC, + ) return sorted_users -def filter_users(users: list[dict], search: str | None, roles: list[str] | None) -> list[dict]: +def filter_users( + users: list[dict], search: str | None, roles: list[str] | None +) -> list[dict]: """ Filter users by a case-insensitive search string and/or by roles. @@ -117,7 +131,10 @@ def filter_users(users: list[dict], search: str | None, roles: list[str] | None) filtered_users = [] for user in users: if search: - matches_search = any(search in (user.get(field) or "").lower() for field in SearchField.values()) + matches_search = any( + search in (user.get(field) or "").lower() + for field in SearchField.values() + ) if not matches_search: continue diff --git a/openedx_authz/rest_api/v1/fields.py b/openedx_authz/rest_api/v1/fields.py index 89800788..5014960b 100644 --- a/openedx_authz/rest_api/v1/fields.py +++ b/openedx_authz/rest_api/v1/fields.py @@ -8,7 +8,11 @@ class CommaSeparatedListField(serializers.CharField): def to_internal_value(self, data): """Convert string separated by commas to list of unique items preserving order""" - return list(dict.fromkeys(item.strip().lower() for item in data.split(",") if item.strip())) + return list( + dict.fromkeys( + item.strip().lower() for item in data.split(",") if item.strip() + ) + ) def to_representation(self, value): """Convert list to string separated by commas""" diff --git a/openedx_authz/rest_api/v1/permissions.py b/openedx_authz/rest_api/v1/permissions.py index 9fb0ad33..0716c2c3 100644 --- a/openedx_authz/rest_api/v1/permissions.py +++ b/openedx_authz/rest_api/v1/permissions.py @@ -198,7 +198,9 @@ def has_object_permission(self, request, view, obj) -> bool: """ if request.user.is_superuser or request.user.is_staff: return True - return self._get_permission_instance(request).has_object_permission(request, view, obj) + return self._get_permission_instance(request).has_object_permission( + request, view, obj + ) class MethodPermissionMixin: @@ -239,7 +241,9 @@ def get_required_permissions(self, request, view) -> list[str]: return handler.required_permissions return [] - def validate_permissions(self, request, permissions: list[str], scope_value: str) -> bool: + def validate_permissions( + self, request, permissions: list[str], scope_value: str + ) -> bool: """Validate that the user has all required permissions for the scope. Args: diff --git a/openedx_authz/rest_api/v1/serializers.py b/openedx_authz/rest_api/v1/serializers.py index 92dee429..3be8c31d 100644 --- a/openedx_authz/rest_api/v1/serializers.py +++ b/openedx_authz/rest_api/v1/serializers.py @@ -29,17 +29,23 @@ class ActionMixin(serializers.Serializer): # pylint: disable=abstract-method action = serializers.CharField(max_length=255) -class PermissionValidationSerializer(ActionMixin, ScopeMixin): # pylint: disable=abstract-method +class PermissionValidationSerializer( + ActionMixin, ScopeMixin +): # pylint: disable=abstract-method """Serializer for permission validation request.""" -class PermissionValidationResponseSerializer(PermissionValidationSerializer): # pylint: disable=abstract-method +class PermissionValidationResponseSerializer( + PermissionValidationSerializer +): # pylint: disable=abstract-method """Serializer for permission validation response.""" allowed = serializers.BooleanField() -class RoleScopeValidationMixin(serializers.Serializer): # pylint: disable=abstract-method +class RoleScopeValidationMixin( + serializers.Serializer +): # pylint: disable=abstract-method """Mixin providing role and scope validation logic.""" def validate(self, attrs) -> dict: @@ -77,7 +83,9 @@ def validate(self, attrs) -> dict: role_definitions = api.get_role_definitions_in_scope(generic_scope) if role not in role_definitions: - raise serializers.ValidationError(f"Role '{role_value}' does not exist in scope '{scope_value}'") + raise serializers.ValidationError( + f"Role '{role_value}' does not exist in scope '{scope_value}'" + ) return validated_data @@ -89,7 +97,9 @@ class AddUsersToRoleWithScopeSerializer( ): # pylint: disable=abstract-method """Serializer for adding users to a role with a scope.""" - users = serializers.ListField(child=serializers.CharField(max_length=255), allow_empty=False) + users = serializers.ListField( + child=serializers.CharField(max_length=255), allow_empty=False + ) def validate_users(self, value) -> list[str]: """Eliminate duplicates preserving order""" @@ -111,15 +121,21 @@ class ListUsersInRoleWithScopeSerializer(ScopeMixin): # pylint: disable=abstrac roles = CommaSeparatedListField(required=False, default=[]) sort_by = serializers.ChoiceField( - required=False, choices=[(e.value, e.name) for e in SortField], default=SortField.USERNAME + required=False, + choices=[(e.value, e.name) for e in SortField], + default=SortField.USERNAME, ) order = serializers.ChoiceField( - required=False, choices=[(e.value, e.name) for e in SortOrder], default=SortOrder.ASC + required=False, + choices=[(e.value, e.name) for e in SortOrder], + default=SortOrder.ASC, ) search = LowercaseCharField(required=False, default=None) -class ListRolesWithScopeSerializer(serializers.Serializer): # pylint: disable=abstract-method +class ListRolesWithScopeSerializer( + serializers.Serializer +): # pylint: disable=abstract-method """Serializer for listing roles within a scope.""" scope = serializers.CharField(max_length=255) @@ -149,7 +165,9 @@ def validate_scope(self, value: str) -> api.ScopeData: raise serializers.ValidationError(exc) from exc -class ListUsersInRoleWithScopeResponseSerializer(serializers.Serializer): # pylint: disable=abstract-method +class ListUsersInRoleWithScopeResponseSerializer( + serializers.Serializer +): # pylint: disable=abstract-method """Serializer for listing users in a role with a scope response.""" username = serializers.CharField(max_length=255) @@ -157,7 +175,9 @@ class ListUsersInRoleWithScopeResponseSerializer(serializers.Serializer): # pyl email = serializers.EmailField() -class ListRolesWithScopeResponseSerializer(serializers.Serializer): # pylint: disable=abstract-method +class ListRolesWithScopeResponseSerializer( + serializers.Serializer +): # pylint: disable=abstract-method """Serializer for listing roles with a scope response.""" role = serializers.CharField(max_length=255) @@ -165,7 +185,9 @@ class ListRolesWithScopeResponseSerializer(serializers.Serializer): # pylint: d user_count = serializers.IntegerField() -class UserRoleAssignmentSerializer(serializers.Serializer): # pylint: disable=abstract-method +class UserRoleAssignmentSerializer( + serializers.Serializer +): # pylint: disable=abstract-method """Serializer for a user role assignment.""" username = serializers.SerializerMethodField() @@ -189,7 +211,11 @@ def get_username(self, obj: api.RoleAssignmentData) -> str: def get_full_name(self, obj) -> str: """Get the full name for the given role assignment.""" user = self._get_user(obj) - return getattr(user.profile, "name", "") if user and hasattr(user, "profile") else "" + return ( + getattr(user.profile, "name", "") + if user and hasattr(user, "profile") + else "" + ) def get_email(self, obj) -> str: """Get the email for the given role assignment.""" diff --git a/openedx_authz/rest_api/v1/urls.py b/openedx_authz/rest_api/v1/urls.py index 83c350f9..b6e7c3fa 100644 --- a/openedx_authz/rest_api/v1/urls.py +++ b/openedx_authz/rest_api/v1/urls.py @@ -5,7 +5,11 @@ from openedx_authz.rest_api.v1 import views urlpatterns = [ - path("permissions/validate/me", views.PermissionValidationMeView.as_view(), name="permission-validation-me"), + path( + "permissions/validate/me", + views.PermissionValidationMeView.as_view(), + name="permission-validation-me", + ), path("roles/", views.RoleListView.as_view(), name="role-list"), path("roles/users/", views.RoleUserAPIView.as_view(), name="role-user-list"), ] diff --git a/openedx_authz/rest_api/v1/views.py b/openedx_authz/rest_api/v1/views.py index 3fb10759..083609ed 100644 --- a/openedx_authz/rest_api/v1/views.py +++ b/openedx_authz/rest_api/v1/views.py @@ -92,7 +92,9 @@ class PermissionValidationMeView(APIView): """ @apidocs.schema( - body=PermissionValidationSerializer(help_text="The permissions to validate", many=True), + body=PermissionValidationSerializer( + help_text="The permissions to validate", many=True + ), responses={ status.HTTP_200_OK: PermissionValidationResponseSerializer, status.HTTP_400_BAD_REQUEST: "The request data is invalid", @@ -114,7 +116,10 @@ def post(self, request: HttpRequest) -> Response: response_data.append({"action": action, "scope": scope, "allowed": allowed}) except ValueError as e: logger.error(f"Error validating permission for user {username}: {e}") - return Response(data={"message": "Invalid scope format"}, status=status.HTTP_400_BAD_REQUEST) + return Response( + data={"message": "Invalid scope format"}, + status=status.HTTP_400_BAD_REQUEST, + ) except Exception as e: # pylint: disable=broad-exception-caught logger.error(f"Error validating permission for user {username}: {e}") return Response( @@ -233,11 +238,21 @@ class RoleUserAPIView(APIView): @apidocs.schema( parameters=[ - apidocs.query_parameter("scope", str, description="The authorization scope for the role"), - apidocs.query_parameter("search", str, description="The search query to filter users by"), - apidocs.query_parameter("roles", str, description="The names of the roles to query"), - apidocs.query_parameter("page", int, description="Page number for pagination"), - apidocs.query_parameter("page_size", int, description="Number of items per page"), + apidocs.query_parameter( + "scope", str, description="The authorization scope for the role" + ), + apidocs.query_parameter( + "search", str, description="The search query to filter users by" + ), + apidocs.query_parameter( + "roles", str, description="The names of the roles to query" + ), + apidocs.query_parameter( + "page", int, description="Page number for pagination" + ), + apidocs.query_parameter( + "page_size", int, description="Number of items per page" + ), apidocs.query_parameter("sort_by", str, description="The field to sort by"), apidocs.query_parameter("order", str, description="The order to sort by"), ], @@ -254,16 +269,28 @@ def get(self, request: HttpRequest) -> Response: serializer.is_valid(raise_exception=True) query_params = serializer.validated_data - user_role_assignments = api.get_all_user_role_assignments_in_scope(query_params["scope"]) - usernames = {assignment.subject.username for assignment in user_role_assignments} + user_role_assignments = api.get_all_user_role_assignments_in_scope( + query_params["scope"] + ) + usernames = { + assignment.subject.username for assignment in user_role_assignments + } context = {"user_map": get_user_map(usernames)} - serialized_data = UserRoleAssignmentSerializer(user_role_assignments, many=True, context=context) + serialized_data = UserRoleAssignmentSerializer( + user_role_assignments, many=True, context=context + ) - filtered_users = filter_users(serialized_data.data, query_params["search"], query_params["roles"]) - user_role_assignments = sort_users(filtered_users, query_params["sort_by"], query_params["order"]) + filtered_users = filter_users( + serialized_data.data, query_params["search"], query_params["roles"] + ) + user_role_assignments = sort_users( + filtered_users, query_params["sort_by"], query_params["order"] + ) paginator = self.pagination_class() - paginated_response_data = paginator.paginate_queryset(user_role_assignments, request) + paginated_response_data = paginator.paginate_queryset( + user_role_assignments, request + ) return paginator.get_paginated_response(paginated_response_data) @apidocs.schema( @@ -308,10 +335,16 @@ def put(self, request: HttpRequest) -> Response: @apidocs.schema( parameters=[ apidocs.query_parameter( - "users", str, description="List of user identifiers (username or email) separated by a comma" + "users", + str, + description="List of user identifiers (username or email) separated by a comma", + ), + apidocs.query_parameter( + "role", str, description="The role to remove the users from" + ), + apidocs.query_parameter( + "scope", str, description="The scope to remove the users from" ), - apidocs.query_parameter("role", str, description="The role to remove the users from"), - apidocs.query_parameter("scope", str, description="The scope to remove the users from"), ], responses={ status.HTTP_207_MULTI_STATUS: "The users were removed from the role", @@ -412,9 +445,15 @@ class RoleListView(APIView): @apidocs.schema( parameters=[ - apidocs.query_parameter("scope", str, description="The scope to query roles for"), - apidocs.query_parameter("page", int, description="Page number for pagination"), - apidocs.query_parameter("page_size", int, description="Number of items per page"), + apidocs.query_parameter( + "scope", str, description="The scope to query roles for" + ), + apidocs.query_parameter( + "page", int, description="Page number for pagination" + ), + apidocs.query_parameter( + "page_size", int, description="Number of items per page" + ), ], responses={ status.HTTP_200_OK: ListRolesWithScopeResponseSerializer(many=True), @@ -443,5 +482,7 @@ def get(self, request: HttpRequest) -> Response: paginator = self.pagination_class() paginated_response_data = paginator.paginate_queryset(response_data, request) - serialized_data = ListRolesWithScopeResponseSerializer(paginated_response_data, many=True) + serialized_data = ListRolesWithScopeResponseSerializer( + paginated_response_data, many=True + ) return paginator.get_paginated_response(serialized_data.data) diff --git a/openedx_authz/tests/api/test_data.py b/openedx_authz/tests/api/test_data.py index e55b5119..7b07ce2d 100644 --- a/openedx_authz/tests/api/test_data.py +++ b/openedx_authz/tests/api/test_data.py @@ -321,7 +321,9 @@ def test_base_scope_data_with_external_key(self): """ scope = ScopeData(external_key="sc:generic_scope") - expected_namespaced = f"{ScopeData.NAMESPACE}{ScopeData.SEPARATOR}sc:generic_scope" + expected_namespaced = ( + f"{ScopeData.NAMESPACE}{ScopeData.SEPARATOR}sc:generic_scope" + ) self.assertIsInstance(scope, ScopeData) self.assertEqual(scope.external_key, "sc:generic_scope") @@ -446,7 +448,9 @@ def test_role_data_str_with_permissions(self): action2 = ActionData(external_key="write") permission1 = PermissionData(action=action1, effect="allow") permission2 = PermissionData(action=action2, effect="deny") - role = RoleData(external_key="instructor", permissions=[permission1, permission2]) + role = RoleData( + external_key="instructor", permissions=[permission1, permission2] + ) actual_str = str(role) @@ -456,7 +460,12 @@ def test_role_data_str_with_permissions(self): @data( ("read", "allow", "Read - allow", "act^read => allow"), ("write", "deny", "Write - deny", "act^write => deny"), - ("delete_library", "allow", "Delete Library - allow", "act^delete_library => allow"), + ( + "delete_library", + "allow", + "Delete Library - allow", + "act^delete_library => allow", + ), ) @unpack def test_permission_data_str_and_repr( @@ -508,7 +517,5 @@ def test_role_assignment_data_repr(self): actual_repr = repr(assignment) - expected_repr = ( - "user^john_doe => [role^instructor, role^library_admin] @ lib^lib:DemoX:CSPROB" - ) + expected_repr = "user^john_doe => [role^instructor, role^library_admin] @ lib^lib:DemoX:CSPROB" self.assertEqual(actual_repr, expected_repr) diff --git a/openedx_authz/tests/api/test_roles.py b/openedx_authz/tests/api/test_roles.py index 80ec90f0..391d47fd 100644 --- a/openedx_authz/tests/api/test_roles.py +++ b/openedx_authz/tests/api/test_roles.py @@ -412,7 +412,9 @@ def test_get_subject_role_assignments_in_scope( SubjectData(external_key=subject_name), ScopeData(external_key=scope_name) ) - role_names = {r.external_key for assignment in role_assignments for r in assignment.roles} + role_names = { + r.external_key for assignment in role_assignments for r in assignment.roles + } self.assertEqual(role_names, expected_roles) @ddt_data( @@ -569,9 +571,14 @@ def test_batch_assign_role_to_subjects_in_scope( ) for subject_name in subject_names: user_roles = get_subject_role_assignments_in_scope( - SubjectData(external_key=subject_name), ScopeData(external_key=scope_name) + SubjectData(external_key=subject_name), + ScopeData(external_key=scope_name), ) - role_names = {r.external_key for assignment in user_roles for r in assignment.roles} + role_names = { + r.external_key + for assignment in user_roles + for r in assignment.roles + } self.assertIn(role, role_names) else: assign_role_to_subject_in_scope( @@ -583,7 +590,9 @@ def test_batch_assign_role_to_subjects_in_scope( SubjectData(external_key=subject_names), ScopeData(external_key=scope_name), ) - role_names = {r.external_key for assignment in user_roles for r in assignment.roles} + role_names = { + r.external_key for assignment in user_roles for r in assignment.roles + } self.assertIn(role, role_names) @ddt_data( @@ -628,7 +637,11 @@ def test_unassign_role_from_subject_in_scope( SubjectData(external_key=subject), ScopeData(external_key=scope_name), ) - role_names = {r.external_key for assignment in user_roles for r in assignment.roles} + role_names = { + r.external_key + for assignment in user_roles + for r in assignment.roles + } self.assertNotIn(role, role_names) else: unassign_role_from_subject_in_scope( @@ -640,7 +653,9 @@ def test_unassign_role_from_subject_in_scope( SubjectData(external_key=subject_names), ScopeData(external_key=scope_name), ) - role_names = {r.external_key for assignment in user_roles for r in assignment.roles} + role_names = { + r.external_key for assignment in user_roles for r in assignment.roles + } self.assertNotIn(role, role_names) @ddt_data( @@ -649,10 +664,12 @@ def test_unassign_role_from_subject_in_scope( [ RoleAssignmentData( subject=SubjectData(external_key="alice"), - roles=[RoleData( - external_key="library_admin", - permissions=LIST_LIBRARY_ADMIN_PERMISSIONS, - )], + roles=[ + RoleData( + external_key="library_admin", + permissions=LIST_LIBRARY_ADMIN_PERMISSIONS, + ) + ], scope=ScopeData(external_key="lib:Org1:math_101"), ) ], @@ -662,10 +679,12 @@ def test_unassign_role_from_subject_in_scope( [ RoleAssignmentData( subject=SubjectData(external_key="bob"), - roles=[RoleData( - external_key="library_author", - permissions=LIST_LIBRARY_AUTHOR_PERMISSIONS, - )], + roles=[ + RoleData( + external_key="library_author", + permissions=LIST_LIBRARY_AUTHOR_PERMISSIONS, + ) + ], scope=ScopeData(external_key="lib:Org1:history_201"), ) ], @@ -675,10 +694,12 @@ def test_unassign_role_from_subject_in_scope( [ RoleAssignmentData( subject=SubjectData(external_key="carol"), - roles=[RoleData( - external_key="library_contributor", - permissions=LIST_LIBRARY_CONTRIBUTOR_PERMISSIONS, - )], + roles=[ + RoleData( + external_key="library_contributor", + permissions=LIST_LIBRARY_CONTRIBUTOR_PERMISSIONS, + ) + ], scope=ScopeData(external_key="lib:Org1:science_301"), ) ], @@ -688,10 +709,12 @@ def test_unassign_role_from_subject_in_scope( [ RoleAssignmentData( subject=SubjectData(external_key="dave"), - roles=[RoleData( - external_key="library_user", - permissions=LIST_LIBRARY_USER_PERMISSIONS, - )], + roles=[ + RoleData( + external_key="library_user", + permissions=LIST_LIBRARY_USER_PERMISSIONS, + ) + ], scope=ScopeData(external_key="lib:Org1:english_101"), ) ], diff --git a/openedx_authz/tests/api/test_users.py b/openedx_authz/tests/api/test_users.py index 5590c1b2..e47cb07e 100644 --- a/openedx_authz/tests/api/test_users.py +++ b/openedx_authz/tests/api/test_users.py @@ -71,7 +71,11 @@ def test_assign_role_to_user_in_scope(self, username, role, scope_name, batch): user_roles = get_user_role_assignments_in_scope( user_external_key=user, scope_external_key=scope_name ) - role_names = {r.external_key for assignment in user_roles for r in assignment.roles} + role_names = { + r.external_key + for assignment in user_roles + for r in assignment.roles + } self.assertIn(role, role_names) else: assign_role_to_user_in_scope( @@ -82,7 +86,9 @@ def test_assign_role_to_user_in_scope(self, username, role, scope_name, batch): user_roles = get_user_role_assignments_in_scope( user_external_key=username, scope_external_key=scope_name ) - role_names = {r.external_key for assignment in user_roles for r in assignment.roles} + role_names = { + r.external_key for assignment in user_roles for r in assignment.roles + } self.assertIn(role, role_names) @data( @@ -107,7 +113,11 @@ def test_unassign_role_from_user(self, username, role, scope_name, batch): user_roles = get_user_role_assignments_in_scope( user_external_key=user, scope_external_key=scope_name ) - role_names = {r.external_key for assignment in user_roles for r in assignment.roles} + role_names = { + r.external_key + for assignment in user_roles + for r in assignment.roles + } self.assertNotIn(role, role_names) else: unassign_role_from_user( @@ -118,7 +128,9 @@ def test_unassign_role_from_user(self, username, role, scope_name, batch): user_roles = get_user_role_assignments_in_scope( user_external_key=username, scope_external_key=scope_name ) - role_names = {r.external_key for assignment in user_roles for r in assignment.roles} + role_names = { + r.external_key for assignment in user_roles for r in assignment.roles + } self.assertNotIn(role, role_names) @data( @@ -161,7 +173,9 @@ def test_get_user_role_assignments_in_scope( user_external_key=username, scope_external_key=scope_name ) - role_names = {r.external_key for assignment in user_roles for r in assignment.roles} + role_names = { + r.external_key for assignment in user_roles for r in assignment.roles + } self.assertEqual(role_names, expected_roles) @data( @@ -195,10 +209,12 @@ def test_get_user_role_assignments_for_role_in_scope( [ RoleAssignmentData( subject=UserData(external_key="alice"), - roles=[RoleData( - external_key="library_admin", - permissions=LIST_LIBRARY_ADMIN_PERMISSIONS, - )], + roles=[ + RoleData( + external_key="library_admin", + permissions=LIST_LIBRARY_ADMIN_PERMISSIONS, + ) + ], scope=ContentLibraryData(external_key="lib:Org1:math_101"), ), ], @@ -208,10 +224,12 @@ def test_get_user_role_assignments_for_role_in_scope( [ RoleAssignmentData( subject=UserData(external_key="bob"), - roles=[RoleData( - external_key="library_author", - permissions=LIST_LIBRARY_AUTHOR_PERMISSIONS, - )], + roles=[ + RoleData( + external_key="library_author", + permissions=LIST_LIBRARY_AUTHOR_PERMISSIONS, + ) + ], scope=ContentLibraryData(external_key="lib:Org1:history_201"), ), ], @@ -221,10 +239,12 @@ def test_get_user_role_assignments_for_role_in_scope( [ RoleAssignmentData( subject=UserData(external_key="eve"), - roles=[RoleData( - external_key="library_admin", - permissions=LIST_LIBRARY_ADMIN_PERMISSIONS, - )], + roles=[ + RoleData( + external_key="library_admin", + permissions=LIST_LIBRARY_ADMIN_PERMISSIONS, + ) + ], scope=ContentLibraryData(external_key="lib:Org2:physics_401"), ), ], diff --git a/openedx_authz/tests/rest_api/test_views.py b/openedx_authz/tests/rest_api/test_views.py index 159e91bc..d6b44c1b 100644 --- a/openedx_authz/tests/rest_api/test_views.py +++ b/openedx_authz/tests/rest_api/test_views.py @@ -129,13 +129,17 @@ def setUpClass(cls): def create_regular_users(cls, quantity: int): """Create regular users.""" for i in range(1, quantity + 1): - User.objects.create_user(username=f"regular_{i}", email=f"regular_{i}@example.com") + User.objects.create_user( + username=f"regular_{i}", email=f"regular_{i}@example.com" + ) @classmethod def create_admin_users(cls, quantity: int): """Create admin users.""" for i in range(1, quantity + 1): - User.objects.create_superuser(username=f"admin_{i}", email=f"admin_{i}@example.com") + User.objects.create_superuser( + username=f"admin_{i}", email=f"admin_{i}@example.com" + ) @classmethod def setUpTestData(cls): @@ -180,7 +184,9 @@ def setUp(self): ), ) @unpack - def test_permission_validation_success(self, request_data: list[dict], permission_map: list[bool]): + def test_permission_validation_success( + self, request_data: list[dict], permission_map: list[bool] + ): """Test successful permission validation requests. Expected result: @@ -209,10 +215,22 @@ def test_permission_validation_success(self, request_data: list[dict], permissio [{}, {}], [{}, {"action": "edit_library", "scope": "lib:Org1:LIB1"}], [{"action": "edit_library", "scope": "lib:Org1:LIB1"}, {}], - [{"action": "edit_library", "scope": "lib:Org1:LIB1"}, {"action": "", "scope": "lib:Org1:LIB1"}], - [{"action": "edit_library", "scope": "lib:Org1:LIB1"}, {"action": "edit_library", "scope": ""}], - [{"action": "edit_library", "scope": "lib:Org1:LIB1"}, {"scope": "lib:Org1:LIB1"}], - [{"action": "edit_library", "scope": "lib:Org1:LIB1"}, {"action": "edit_library"}], + [ + {"action": "edit_library", "scope": "lib:Org1:LIB1"}, + {"action": "", "scope": "lib:Org1:LIB1"}, + ], + [ + {"action": "edit_library", "scope": "lib:Org1:LIB1"}, + {"action": "edit_library", "scope": ""}, + ], + [ + {"action": "edit_library", "scope": "lib:Org1:LIB1"}, + {"scope": "lib:Org1:LIB1"}, + ], + [ + {"action": "edit_library", "scope": "lib:Org1:LIB1"}, + {"action": "edit_library"}, + ], ) def test_permission_validation_invalid_data(self, invalid_data: list[dict]): """Test permission validation with invalid request data. @@ -234,16 +252,24 @@ def test_permission_validation_unauthenticated(self): scope = "lib:Org1:LIB1" self.client.force_authenticate(user=None) - response = self.client.post(self.url, data=[{"action": action, "scope": scope}], format="json") + response = self.client.post( + self.url, data=[{"action": action, "scope": scope}], format="json" + ) self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) @data( - (Exception(), status.HTTP_500_INTERNAL_SERVER_ERROR, "An error occurred while validating permissions"), + ( + Exception(), + status.HTTP_500_INTERNAL_SERVER_ERROR, + "An error occurred while validating permissions", + ), (ValueError(), status.HTTP_400_BAD_REQUEST, "Invalid scope format"), ) @unpack - def test_permission_validation_exception_handling(self, exception: Exception, status_code: int, message: str): + def test_permission_validation_exception_handling( + self, exception: Exception, status_code: int, message: str + ): """Test permission validation exception handling for different error types. Expected result: @@ -252,7 +278,9 @@ def test_permission_validation_exception_handling(self, exception: Exception, st """ with patch.object(api, "is_user_allowed", side_effect=exception): response = self.client.post( - self.url, data=[{"action": "edit_library", "scope": "lib:Org1:LIB1"}], format="json" + self.url, + data=[{"action": "edit_library", "scope": "lib:Org1:LIB1"}], + format="json", ) self.assertEqual(response.status_code, status_code) @@ -388,15 +416,30 @@ def test_get_users_by_scope_permissions(self, username: str, status_code: int): # Single user - success (regular user) (["regular_1@example.com"], 1, 0), # Multiple users - admin and regular users - (["admin_1@example.com", "regular_1@example.com", "regular_2@example.com"], 3, 0), + ( + ["admin_1@example.com", "regular_1@example.com", "regular_2@example.com"], + 3, + 0, + ), # With username and email -------------------- # All success (["admin_1", "regular_1@example.com", "regular_2@example.com"], 3, 0), # Mixed results (user not found) - (["admin_1", "regular_1@example.com", "nonexistent", "notexistent@example.com"], 2, 2), + ( + [ + "admin_1", + "regular_1@example.com", + "nonexistent", + "notexistent@example.com", + ], + 2, + 2, + ), ) @unpack - def test_add_users_to_role_success(self, users: list[str], expected_completed: int, expected_errors: int): + def test_add_users_to_role_success( + self, users: list[str], expected_completed: int, expected_errors: int + ): """Test adding users to a role within a scope. Expected result: @@ -424,7 +467,9 @@ def test_add_users_to_role_success(self, users: list[str], expected_completed: i (["admin_2", "regular_3", "regular_4"], 0, 3), ) @unpack - def test_add_users_to_role_already_has_role(self, users: list[str], expected_completed: int, expected_errors: int): + def test_add_users_to_role_already_has_role( + self, users: list[str], expected_completed: int, expected_errors: int + ): """Test adding users to a role that already has the role.""" role = "library_user" scope = "lib:Org2:LIB2" @@ -438,9 +483,15 @@ def test_add_users_to_role_already_has_role(self, users: list[str], expected_com self.assertEqual(len(response.data["errors"]), expected_errors) @patch.object(api, "assign_role_to_user_in_scope") - def test_add_users_to_role_exception_handling(self, mock_assign_role_to_user_in_scope): + def test_add_users_to_role_exception_handling( + self, mock_assign_role_to_user_in_scope + ): """Test adding users to a role with exception handling.""" - request_data = {"role": "library_admin", "scope": "lib:Org1:LIB1", "users": ["regular_1"]} + request_data = { + "role": "library_admin", + "scope": "lib:Org1:LIB1", + "users": ["regular_1"], + } mock_assign_role_to_user_in_scope.side_effect = Exception() with patch.object(api.ContentLibraryData, "exists", return_value=True): @@ -450,7 +501,10 @@ def test_add_users_to_role_exception_handling(self, mock_assign_role_to_user_in_ self.assertEqual(len(response.data["completed"]), 0) self.assertEqual(len(response.data["errors"]), 1) self.assertEqual(response.data["errors"][0]["user_identifier"], "regular_1") - self.assertEqual(response.data["errors"][0]["error"], RoleOperationError.ROLE_ASSIGNMENT_ERROR) + self.assertEqual( + response.data["errors"][0]["error"], + RoleOperationError.ROLE_ASSIGNMENT_ERROR, + ) @data( {}, @@ -492,7 +546,11 @@ def test_add_users_to_role_permissions(self, username: str, status_code: int): Expected result: - Returns appropriate status code based on permissions """ - request_data = {"role": "library_admin", "scope": "lib:Org3:LIB3", "users": ["regular_2"]} + request_data = { + "role": "library_admin", + "scope": "lib:Org3:LIB3", + "users": ["regular_2"], + } user = User.objects.filter(username=username).first() self.client.force_authenticate(user=user) @@ -515,22 +573,41 @@ def test_add_users_to_role_permissions(self, username: str, status_code: int): # Single user - success (regular user) (["regular_3@example.com"], 1, 0), # Multiple users - all success (admin and regular users) - (["admin_2@example.com", "regular_3@example.com", "regular_4@example.com"], 3, 0), + ( + ["admin_2@example.com", "regular_3@example.com", "regular_4@example.com"], + 3, + 0, + ), # With username and email ------------------- # All success (["admin_2", "regular_3@example.com", "regular_4@example.com"], 3, 0), # Mixed results (user not found) - (["admin_2", "regular_3@example.com", "nonexistent", "notexistent@example.com"], 2, 2), + ( + [ + "admin_2", + "regular_3@example.com", + "nonexistent", + "notexistent@example.com", + ], + 2, + 2, + ), ) @unpack - def test_remove_users_from_role_success(self, users: list[str], expected_completed: int, expected_errors: int): + def test_remove_users_from_role_success( + self, users: list[str], expected_completed: int, expected_errors: int + ): """Test removing users from a role within a scope. Expected result: - Returns 207 MULTI-STATUS status - Returns appropriate completed and error counts """ - query_params = {"role": "library_user", "scope": "lib:Org2:LIB2", "users": ",".join(users)} + query_params = { + "role": "library_user", + "scope": "lib:Org2:LIB2", + "users": ",".join(users), + } with patch.object(api.ContentLibraryData, "exists", return_value=True): response = self.client.delete(f"{self.url}?{urlencode(query_params)}") @@ -540,9 +617,15 @@ def test_remove_users_from_role_success(self, users: list[str], expected_complet self.assertEqual(len(response.data["errors"]), expected_errors) @patch.object(api, "unassign_role_from_user") - def test_remove_users_from_role_exception_handling(self, mock_unassign_role_from_user): + def test_remove_users_from_role_exception_handling( + self, mock_unassign_role_from_user + ): """Test removing users from a role with exception handling.""" - query_params = {"role": "library_admin", "scope": "lib:Org1:LIB1", "users": "regular_1,regular_2,regular_3"} + query_params = { + "role": "library_admin", + "scope": "lib:Org1:LIB1", + "users": "regular_1,regular_2,regular_3", + } mock_unassign_role_from_user.side_effect = [True, False, Exception()] with patch.object(api.ContentLibraryData, "exists", return_value=True): @@ -550,12 +633,23 @@ def test_remove_users_from_role_exception_handling(self, mock_unassign_role_from self.assertEqual(response.status_code, status.HTTP_207_MULTI_STATUS) self.assertEqual(len(response.data["completed"]), 1) self.assertEqual(len(response.data["errors"]), 2) - self.assertEqual(response.data["completed"][0]["user_identifier"], "regular_1") - self.assertEqual(response.data["completed"][0]["status"], RoleOperationStatus.ROLE_REMOVED) + self.assertEqual( + response.data["completed"][0]["user_identifier"], "regular_1" + ) + self.assertEqual( + response.data["completed"][0]["status"], + RoleOperationStatus.ROLE_REMOVED, + ) self.assertEqual(response.data["errors"][0]["user_identifier"], "regular_2") - self.assertEqual(response.data["errors"][0]["error"], RoleOperationError.USER_DOES_NOT_HAVE_ROLE) + self.assertEqual( + response.data["errors"][0]["error"], + RoleOperationError.USER_DOES_NOT_HAVE_ROLE, + ) self.assertEqual(response.data["errors"][1]["user_identifier"], "regular_3") - self.assertEqual(response.data["errors"][1]["error"], RoleOperationError.ROLE_REMOVAL_ERROR) + self.assertEqual( + response.data["errors"][1]["error"], + RoleOperationError.ROLE_REMOVAL_ERROR, + ) @data( {}, @@ -596,7 +690,11 @@ def test_remove_users_from_role_permissions(self, username: str, status_code: in Expected result: - Returns appropriate status code based on permissions """ - query_params = {"role": "library_admin", "scope": "lib:Org3:LIB3", "users": "user1,user2"} + query_params = { + "role": "library_admin", + "scope": "lib:Org3:LIB3", + "users": "user1,user2", + } user = User.objects.filter(username=username).first() self.client.force_authenticate(user=user) @@ -689,7 +787,9 @@ def test_get_roles_scope_is_invalid(self, query_params: dict, error_code: str): ({"page": 1, "page_size": 4}, 4, False), ) @unpack - def test_get_roles_pagination(self, query_params: dict, expected_count: int, has_next: bool): + def test_get_roles_pagination( + self, query_params: dict, expected_count: int, has_next: bool + ): """Test retrieving roles with pagination. Expected result: diff --git a/openedx_authz/tests/test_commands.py b/openedx_authz/tests/test_commands.py index 16ae986e..acf5a5f2 100644 --- a/openedx_authz/tests/test_commands.py +++ b/openedx_authz/tests/test_commands.py @@ -44,7 +44,10 @@ def test_requires_policy_file_argument(self): with self.assertRaises(CommandError) as ctx: call_command("enforcement") - self.assertEqual("Error: the following arguments are required: --policy-file-path", str(ctx.exception)) + self.assertEqual( + "Error: the following arguments are required: --policy-file-path", + str(ctx.exception), + ) def test_policy_file_not_found_raises(self): """Test that command errors when the provided policy file does not exist.""" @@ -55,13 +58,18 @@ def test_policy_file_not_found_raises(self): self.assertEqual(f"Policy file not found: {non_existent}", str(ctx.exception)) - @patch.object(EnforcementCommand, "_get_file_path", return_value="invalid/path/model.conf") + @patch.object( + EnforcementCommand, "_get_file_path", return_value="invalid/path/model.conf" + ) def test_model_file_not_found_raises(self, mock_get_file_path: Mock): """Test that command errors when the provided model file does not exist.""" with self.assertRaises(CommandError) as ctx: call_command("enforcement", policy_file_path=self.policy_file_path.name) - self.assertEqual(f"Model file not found: {mock_get_file_path.return_value}", str(ctx.exception)) + self.assertEqual( + f"Model file not found: {mock_get_file_path.return_value}", + str(ctx.exception), + ) @patch("openedx_authz.management.commands.enforcement.casbin.Enforcer") def test_error_creating_enforcer_raises(self, mock_enforcer_cls: Mock): @@ -71,11 +79,16 @@ def test_error_creating_enforcer_raises(self, mock_enforcer_cls: Mock): with self.assertRaises(CommandError) as ctx: call_command("enforcement", policy_file_path=self.policy_file_path.name) - self.assertEqual("Error creating Casbin enforcer: Enforcer creation error", str(ctx.exception)) + self.assertEqual( + "Error creating Casbin enforcer: Enforcer creation error", + str(ctx.exception), + ) @patch("openedx_authz.management.commands.enforcement.casbin.Enforcer") @patch.object(EnforcementCommand, "_run_interactive_mode") - def test_successful_run_prints_summary(self, mock_run_interactive: Mock, mock_enforcer_cls: Mock): + def test_successful_run_prints_summary( + self, mock_run_interactive: Mock, mock_enforcer_cls: Mock + ): """ Test successful command execution with policy file and interactive mode. When files exist, command should create enforcer, print counts, and call interactive loop. @@ -92,7 +105,11 @@ def test_successful_run_prints_summary(self, mock_run_interactive: Mock, mock_en mock_enforcer.get_named_grouping_policy.return_value = action_grouping mock_enforcer_cls.return_value = mock_enforcer - call_command("enforcement", policy_file_path=self.policy_file_path.name, stdout=self.buffer) + call_command( + "enforcement", + policy_file_path=self.policy_file_path.name, + stdout=self.buffer, + ) output = self.buffer.getvalue() self.assertIn("Casbin Interactive Enforcement", output) @@ -109,8 +126,13 @@ def test_run_interactive_mode_displays_help(self): example_text = f"Example: {make_user_key('alice')} {make_action_key('read')} {make_scope_key('org', 'OpenedX')}" self.assertIn("Interactive Mode", self.buffer.getvalue()) - self.assertIn("Test custom enforcement requests interactively.", self.buffer.getvalue()) - self.assertIn("Enter 'quit', 'exit', or 'q' to exit the interactive mode.", self.buffer.getvalue()) + self.assertIn( + "Test custom enforcement requests interactively.", self.buffer.getvalue() + ) + self.assertIn( + "Enter 'quit', 'exit', or 'q' to exit the interactive mode.", + self.buffer.getvalue(), + ) self.assertIn("Format: subject action scope", self.buffer.getvalue()) self.assertIn(example_text, self.buffer.getvalue()) @@ -124,9 +146,17 @@ def test_run_interactive_mode_maintains_interactive_loop(self): self.assertEqual(mock_input.call_count, len(input_values)) @data( - [f"{make_user_key('alice')} {make_action_key('read')} {make_scope_key('org', 'OpenedX')}"], - [f"{make_user_key('bob')} {make_action_key('read')} {make_scope_key('org', 'OpenedX')}"] * 5, - [f"{make_user_key('john')} {make_action_key('read')} {make_scope_key('org', 'OpenedX')}"] * 10, + [ + f"{make_user_key('alice')} {make_action_key('read')} {make_scope_key('org', 'OpenedX')}" + ], + [ + f"{make_user_key('bob')} {make_action_key('read')} {make_scope_key('org', 'OpenedX')}" + ] + * 5, + [ + f"{make_user_key('john')} {make_action_key('read')} {make_scope_key('org', 'OpenedX')}" + ] + * 10, ) def test_run_interactive_mode_processes_request(self, user_input: list[str]): """Test that the interactive mode processes the request.""" @@ -184,7 +214,9 @@ def test_interactive_request_invalid_format(self): invalid_output = self.buffer.getvalue() self.assertIn("✗ Invalid format. Expected 3 parts, got 2", invalid_output) self.assertIn("Format: subject action scope", invalid_output) - self.assertIn(f"Example: {user_input} {make_scope_key('org', 'OpenedX')}", invalid_output) + self.assertIn( + f"Example: {user_input} {make_scope_key('org', 'OpenedX')}", invalid_output + ) @data(ValueError(), IndexError(), TypeError()) def test_interactive_request_error(self, exception: Exception): @@ -211,11 +243,13 @@ def setUp(self): super().setUp() self.buffer = io.StringIO() - @patch('openedx_authz.engine.enforcer.AuthzEnforcer.get_enforcer') - @patch('casbin.Enforcer') - @patch('os.path.join') - @patch('click.confirm') - def test_handle_with_default_paths(self, mock_confirm, mock_join, mock_casbin_enforcer, mock_get_enforcer): + @patch("openedx_authz.engine.enforcer.AuthzEnforcer.get_enforcer") + @patch("casbin.Enforcer") + @patch("os.path.join") + @patch("click.confirm") + def test_handle_with_default_paths( + self, mock_confirm, mock_join, mock_casbin_enforcer, mock_get_enforcer + ): """Test handle method with default policy and model paths.""" # Setup mocks mock_target_enforcer = Mock() @@ -238,26 +272,28 @@ def test_handle_with_default_paths(self, mock_confirm, mock_join, mock_casbin_en command.migrate_policies = Mock() # Call handle method - command.handle(policy_file_path=None, model_file_path=None, clear_existing=False) + command.handle( + policy_file_path=None, model_file_path=None, clear_existing=False + ) # Assertions mock_casbin_enforcer.assert_called_once_with( model_path, policy_path, ) - mock_join.assert_any_call( - ROOT_DIRECTORY, "engine", "config", "authz.policy" - ) - mock_join.assert_any_call( - ROOT_DIRECTORY, "engine", "config", "model.conf" - ) + mock_join.assert_any_call(ROOT_DIRECTORY, "engine", "config", "authz.policy") + mock_join.assert_any_call(ROOT_DIRECTORY, "engine", "config", "model.conf") mock_confirm.assert_not_called() - command.migrate_policies.assert_called_once_with(mock_source_enforcer, mock_target_enforcer) + command.migrate_policies.assert_called_once_with( + mock_source_enforcer, mock_target_enforcer + ) - @patch('openedx_authz.engine.enforcer.AuthzEnforcer.get_enforcer') - @patch('casbin.Enforcer') - @patch('click.confirm') - def test_handle_with_custom_paths(self, mock_confirm, mock_casbin_enforcer, mock_get_enforcer): + @patch("openedx_authz.engine.enforcer.AuthzEnforcer.get_enforcer") + @patch("casbin.Enforcer") + @patch("click.confirm") + def test_handle_with_custom_paths( + self, mock_confirm, mock_casbin_enforcer, mock_get_enforcer + ): """Test handle method with custom policy and model paths.""" # Setup mocks mock_target_enforcer = Mock() @@ -271,21 +307,27 @@ def test_handle_with_custom_paths(self, mock_confirm, mock_casbin_enforcer, mock command.migrate_policies = Mock() # Custom paths - policy_path = '/custom/path/to/policy.csv' - model_path = '/custom/path/to/model.conf' + policy_path = "/custom/path/to/policy.csv" + model_path = "/custom/path/to/model.conf" # Call handle method - command.handle(policy_file_path=policy_path, model_file_path=model_path, clear_existing=False) + command.handle( + policy_file_path=policy_path, + model_file_path=model_path, + clear_existing=False, + ) # Assertions mock_casbin_enforcer.assert_called_once_with(model_path, policy_path) mock_confirm.assert_not_called() - command.migrate_policies.assert_called_once_with(mock_source_enforcer, mock_target_enforcer) + command.migrate_policies.assert_called_once_with( + mock_source_enforcer, mock_target_enforcer + ) - @patch('openedx_authz.engine.enforcer.AuthzEnforcer.get_enforcer') - @patch('casbin.Enforcer') - @patch('click.confirm') - @patch('click.style') + @patch("openedx_authz.engine.enforcer.AuthzEnforcer.get_enforcer") + @patch("casbin.Enforcer") + @patch("click.confirm") + @patch("click.style") def test_handle_clear_existing_roles_confirmed( self, mock_style, mock_confirm, mock_casbin_enforcer, mock_get_enforcer ): @@ -315,12 +357,14 @@ def test_handle_clear_existing_roles_confirmed( mock_confirm.assert_called_with(mock_style.return_value, default=False) command._delete_existing_roles.assert_called_once_with(mock_target_enforcer) command._delete_permissions_inheritance.assert_not_called() - command.migrate_policies.assert_called_once_with(mock_source_enforcer, mock_target_enforcer) + command.migrate_policies.assert_called_once_with( + mock_source_enforcer, mock_target_enforcer + ) - @patch('openedx_authz.engine.enforcer.AuthzEnforcer.get_enforcer') - @patch('casbin.Enforcer') - @patch('click.confirm') - @patch('click.style') + @patch("openedx_authz.engine.enforcer.AuthzEnforcer.get_enforcer") + @patch("casbin.Enforcer") + @patch("click.confirm") + @patch("click.style") def test_handle_clear_existing_permissions_confirmed( self, mock_style, mock_confirm, mock_casbin_enforcer, mock_get_enforcer ): @@ -349,13 +393,19 @@ def test_handle_clear_existing_permissions_confirmed( mock_target_enforcer.load_policy.assert_called_once() mock_confirm.assert_called_with(mock_style.return_value, default=False) command._delete_existing_roles.assert_not_called() - command._delete_permissions_inheritance.assert_called_once_with(mock_target_enforcer) - command.migrate_policies.assert_called_once_with(mock_source_enforcer, mock_target_enforcer) + command._delete_permissions_inheritance.assert_called_once_with( + mock_target_enforcer + ) + command.migrate_policies.assert_called_once_with( + mock_source_enforcer, mock_target_enforcer + ) - @patch('openedx_authz.engine.enforcer.AuthzEnforcer.get_enforcer') - @patch('casbin.Enforcer') - @patch('click.confirm') - def test_handle_clear_existing_both_denied(self, mock_confirm, mock_casbin_enforcer, mock_get_enforcer): + @patch("openedx_authz.engine.enforcer.AuthzEnforcer.get_enforcer") + @patch("casbin.Enforcer") + @patch("click.confirm") + def test_handle_clear_existing_both_denied( + self, mock_confirm, mock_casbin_enforcer, mock_get_enforcer + ): """Test handle method with clear_existing but denied deletions.""" expected_mock_confirm_calls = 2 # Setup mocks @@ -382,4 +432,6 @@ def test_handle_clear_existing_both_denied(self, mock_confirm, mock_casbin_enfor assert mock_confirm.call_count == expected_mock_confirm_calls command._delete_existing_roles.assert_not_called() command._delete_permissions_inheritance.assert_not_called() - command.migrate_policies.assert_called_once_with(mock_source_enforcer, mock_target_enforcer) + command.migrate_policies.assert_called_once_with( + mock_source_enforcer, mock_target_enforcer + ) diff --git a/openedx_authz/tests/test_enforcement.py b/openedx_authz/tests/test_enforcement.py index 63fb9c5b..90771586 100644 --- a/openedx_authz/tests/test_enforcement.py +++ b/openedx_authz/tests/test_enforcement.py @@ -139,7 +139,9 @@ class SystemWideRoleTests(CasbinEnforcementTestCase): { "subject": make_user_key("user-1"), "action": make_action_key("manage"), - "scope": make_scope_key("course", "course-v1:any-org+any-course+any-course-run"), + "scope": make_scope_key( + "course", "course-v1:any-org+any-course+any-course-run" + ), "expected_result": True, }, { @@ -167,8 +169,19 @@ class ActionGroupingTests(CasbinEnforcementTestCase): """ POLICY = [ - ["p", make_role_key("role-1"), make_action_key("manage"), make_scope_key("org", "*"), "allow"], - ["g", make_user_key("user-1"), make_role_key("role-1"), make_scope_key("org", "any-org")], + [ + "p", + make_role_key("role-1"), + make_action_key("manage"), + make_scope_key("org", "*"), + "allow", + ], + [ + "g", + make_user_key("user-1"), + make_role_key("role-1"), + make_scope_key("org", "any-org"), + ], ] + COMMON_ACTION_GROUPING CASES = [ @@ -216,29 +229,112 @@ class RoleAssignmentTests(CasbinEnforcementTestCase): POLICY = [ # Policies ["p", make_role_key("platform_admin"), make_action_key("manage"), "*", "allow"], - ["p", make_role_key("org_admin"), make_action_key("manage"), make_scope_key("org", "*"), "allow"], - ["p", make_role_key("org_editor"), make_action_key("edit"), make_scope_key("org", "*"), "allow"], - ["p", make_role_key("org_author"), make_action_key("write"), make_scope_key("org", "*"), "allow"], - ["p", make_role_key("course_admin"), make_action_key("manage"), make_scope_key("course", "*"), "allow"], - ["p", make_role_key("library_admin"), make_action_key("manage"), make_scope_key("lib", "*"), "allow"], - ["p", make_role_key("library_editor"), make_action_key("edit"), make_scope_key("lib", "*"), "allow"], - ["p", make_role_key("library_reviewer"), make_action_key("read"), make_scope_key("lib", "*"), "allow"], - ["p", make_role_key("library_author"), make_action_key("write"), make_scope_key("lib", "*"), "allow"], + [ + "p", + make_role_key("org_admin"), + make_action_key("manage"), + make_scope_key("org", "*"), + "allow", + ], + [ + "p", + make_role_key("org_editor"), + make_action_key("edit"), + make_scope_key("org", "*"), + "allow", + ], + [ + "p", + make_role_key("org_author"), + make_action_key("write"), + make_scope_key("org", "*"), + "allow", + ], + [ + "p", + make_role_key("course_admin"), + make_action_key("manage"), + make_scope_key("course", "*"), + "allow", + ], + [ + "p", + make_role_key("library_admin"), + make_action_key("manage"), + make_scope_key("lib", "*"), + "allow", + ], + [ + "p", + make_role_key("library_editor"), + make_action_key("edit"), + make_scope_key("lib", "*"), + "allow", + ], + [ + "p", + make_role_key("library_reviewer"), + make_action_key("read"), + make_scope_key("lib", "*"), + "allow", + ], + [ + "p", + make_role_key("library_author"), + make_action_key("write"), + make_scope_key("lib", "*"), + "allow", + ], # Role assignments ["g", make_user_key("user-1"), make_role_key("platform_admin"), "*"], - ["g", make_user_key("user-2"), make_role_key("org_admin"), make_scope_key("org", "any-org")], - ["g", make_user_key("user-3"), make_role_key("org_editor"), make_scope_key("org", "any-org")], - ["g", make_user_key("user-4"), make_role_key("org_author"), make_scope_key("org", "any-org")], + [ + "g", + make_user_key("user-2"), + make_role_key("org_admin"), + make_scope_key("org", "any-org"), + ], + [ + "g", + make_user_key("user-3"), + make_role_key("org_editor"), + make_scope_key("org", "any-org"), + ], + [ + "g", + make_user_key("user-4"), + make_role_key("org_author"), + make_scope_key("org", "any-org"), + ], [ "g", make_user_key("user-5"), make_role_key("course_admin"), make_scope_key("course", "course-v1:any-org+any-course+any-course-run"), ], - ["g", make_user_key("user-6"), make_role_key("library_admin"), make_library_key("lib:DemoX:CSPROB")], - ["g", make_user_key("user-7"), make_role_key("library_editor"), make_library_key("lib:DemoX:CSPROB")], - ["g", make_user_key("user-8"), make_role_key("library_reviewer"), make_library_key("lib:DemoX:CSPROB")], - ["g", make_user_key("user-9"), make_role_key("library_author"), make_library_key("lib:DemoX:CSPROB")], + [ + "g", + make_user_key("user-6"), + make_role_key("library_admin"), + make_library_key("lib:DemoX:CSPROB"), + ], + [ + "g", + make_user_key("user-7"), + make_role_key("library_editor"), + make_library_key("lib:DemoX:CSPROB"), + ], + [ + "g", + make_user_key("user-8"), + make_role_key("library_reviewer"), + make_library_key("lib:DemoX:CSPROB"), + ], + [ + "g", + make_user_key("user-9"), + make_role_key("library_author"), + make_library_key("lib:DemoX:CSPROB"), + ], ] + COMMON_ACTION_GROUPING CASES = [ @@ -269,7 +365,9 @@ class RoleAssignmentTests(CasbinEnforcementTestCase): { "subject": make_user_key("user-5"), "action": make_action_key("manage"), - "scope": make_scope_key("course", "course-v1:any-org+any-course+any-course-run"), + "scope": make_scope_key( + "course", "course-v1:any-org+any-course+any-course-run" + ), "expected_result": True, }, { @@ -383,9 +481,27 @@ class WildcardScopeTests(CasbinEnforcementTestCase): POLICY = [ # Policies ["p", make_role_key("platform_admin"), make_action_key("manage"), "*", "allow"], - ["p", make_role_key("org_admin"), make_action_key("manage"), make_scope_key("org", "*"), "allow"], - ["p", make_role_key("course_admin"), make_action_key("manage"), make_scope_key("course", "*"), "allow"], - ["p", make_role_key("library_admin"), make_action_key("manage"), make_scope_key("lib", "*"), "allow"], + [ + "p", + make_role_key("org_admin"), + make_action_key("manage"), + make_scope_key("org", "*"), + "allow", + ], + [ + "p", + make_role_key("course_admin"), + make_action_key("manage"), + make_scope_key("course", "*"), + "allow", + ], + [ + "p", + make_role_key("library_admin"), + make_action_key("manage"), + make_scope_key("lib", "*"), + "allow", + ], # Role assignments ["g", make_user_key("user-1"), make_role_key("platform_admin"), "*"], ["g", make_user_key("user-2"), make_role_key("org_admin"), "*"], diff --git a/openedx_authz/tests/test_engine_utils.py b/openedx_authz/tests/test_engine_utils.py index 89d3b225..ffac6009 100644 --- a/openedx_authz/tests/test_engine_utils.py +++ b/openedx_authz/tests/test_engine_utils.py @@ -150,7 +150,10 @@ def test_migrate_action_inheritance_from_file(self): # Verify a sample of expected g2 rules from the file self.assertIn( - [make_action_key("delete_library"), make_action_key("edit_library_content")], + [ + make_action_key("delete_library"), + make_action_key("edit_library_content"), + ], target_g2, ) self.assertIn( diff --git a/openedx_authz/tests/test_filter.py b/openedx_authz/tests/test_filter.py index 666cb163..468b937b 100644 --- a/openedx_authz/tests/test_filter.py +++ b/openedx_authz/tests/test_filter.py @@ -40,7 +40,7 @@ def test_initialization_with_multiple_attributes(self): ptype=["p"], v0=[make_user_key("alice")], v1=[make_action_key("read")], - v2=[make_scope_key("org", "MIT")] + v2=[make_scope_key("org", "MIT")], ) self.assertEqual(f.ptype, ["p"]) self.assertEqual(f.v0, [make_user_key("alice")]) @@ -132,7 +132,7 @@ def test_filter_role_assignments(self): ptype=["g"], v0=[make_user_key("alice")], v1=[make_role_key("admin")], - v2=[make_scope_key("org", "MIT")] + v2=[make_scope_key("org", "MIT")], ) self.assertEqual(f.ptype, ["g"]) self.assertEqual(f.v0, [make_user_key("alice")]) @@ -165,7 +165,9 @@ def test_filter_deny_policies(self): def test_filter_wildcard_resources(self): """Test filter for wildcard resource patterns.""" - f = Filter(ptype=["p"], v2=[make_scope_key("lib", "*"), make_scope_key("course", "*")]) + f = Filter( + ptype=["p"], v2=[make_scope_key("lib", "*"), make_scope_key("course", "*")] + ) self.assertEqual(f.ptype, ["p"]) self.assertIn(make_scope_key("lib", "*"), f.v2) self.assertIn(make_scope_key("course", "*"), f.v2) From 1d995d9174128a591077d94d28a2e3f609978b2e Mon Sep 17 00:00:00 2001 From: Maria Grimaldi Date: Wed, 22 Oct 2025 17:59:45 +0200 Subject: [PATCH 2/5] refactor: run ruff with compliant config --- Makefile | 4 +- openedx_authz/api/data.py | 8 +- openedx_authz/api/permissions.py | 8 +- openedx_authz/api/roles.py | 53 ++++--------- openedx_authz/api/users.py | 24 ++---- openedx_authz/engine/adapter.py | 8 +- openedx_authz/engine/enforcer.py | 4 +- openedx_authz/engine/utils.py | 24 ++---- .../management/commands/enforcement.py | 26 ++----- .../management/commands/load_policies.py | 8 +- openedx_authz/rest_api/decorators.py | 4 +- openedx_authz/rest_api/utils.py | 21 ++--- openedx_authz/rest_api/v1/fields.py | 6 +- openedx_authz/rest_api/v1/permissions.py | 8 +- openedx_authz/rest_api/v1/serializers.py | 42 +++------- openedx_authz/rest_api/v1/views.py | 72 +++++------------- openedx_authz/tests/api/test_data.py | 24 ++---- openedx_authz/tests/api/test_roles.py | 60 ++++----------- openedx_authz/tests/api/test_users.py | 76 +++++-------------- openedx_authz/tests/rest_api/test_views.py | 48 +++--------- openedx_authz/tests/test_commands.py | 70 +++++------------ openedx_authz/tests/test_enforcement.py | 8 +- openedx_authz/tests/test_engine_utils.py | 28 ++----- openedx_authz/tests/test_filter.py | 4 +- requirements/dev.txt | 2 + requirements/quality.in | 1 + requirements/quality.txt | 2 + tests/test_models.py | 4 +- 28 files changed, 165 insertions(+), 482 deletions(-) diff --git a/Makefile b/Makefile index 540decf5..6fc15157 100644 --- a/Makefile +++ b/Makefile @@ -59,8 +59,8 @@ quality: ## check coding style with pycodestyle and pylint tox -e quality format: ## format code with black and isort - black openedx_authz tests - isort openedx_authz tests + ruff format openedx_authz tests --line-length 120 + ruff check --select I --fix --line-length 120 pii_check: ## check for PII annotations on all Django models tox -e pii_check diff --git a/openedx_authz/api/data.py b/openedx_authz/api/data.py index a4c1b359..effc6ad2 100644 --- a/openedx_authz/api/data.py +++ b/openedx_authz/api/data.py @@ -248,9 +248,7 @@ def get_subclass_by_external_key(mcs, external_key: str) -> Type["ScopeData"]: scope_subclass = mcs.scope_registry.get(namespace) if not scope_subclass: - raise ValueError( - f"Unknown scope: {namespace} for external_key: {external_key}" - ) + raise ValueError(f"Unknown scope: {namespace} for external_key: {external_key}") if not scope_subclass.validate_external_key(external_key): raise ValueError(f"Invalid external_key format: {external_key}") @@ -281,9 +279,7 @@ def validate_external_key(mcs, external_key: str) -> bool: Returns: bool: True if valid, False otherwise. """ - raise NotImplementedError( - "Subclasses must implement validate_external_key method." - ) + raise NotImplementedError("Subclasses must implement validate_external_key method.") @define diff --git a/openedx_authz/api/permissions.py b/openedx_authz/api/permissions.py index 18cb93bc..6448866a 100644 --- a/openedx_authz/api/permissions.py +++ b/openedx_authz/api/permissions.py @@ -43,9 +43,7 @@ def get_all_permissions_in_scope(scope: ScopeData) -> list[PermissionData]: list of PermissionData: A list of PermissionData objects associated with the given scope. """ enforcer = AuthzEnforcer.get_enforcer() - actions = enforcer.get_filtered_policy( - PolicyIndex.SCOPE.value, scope.namespaced_key - ) + actions = enforcer.get_filtered_policy(PolicyIndex.SCOPE.value, scope.namespaced_key) return [get_permission_from_policy(action) for action in actions] @@ -65,6 +63,4 @@ def is_subject_allowed( bool: True if the subject has the specified permission in the scope, False otherwise. """ enforcer = AuthzEnforcer.get_enforcer() - return enforcer.enforce( - subject.namespaced_key, action.namespaced_key, scope.namespaced_key - ) + return enforcer.enforce(subject.namespaced_key, action.namespaced_key, scope.namespaced_key) diff --git a/openedx_authz/api/roles.py b/openedx_authz/api/roles.py index 937a0502..15eae88d 100644 --- a/openedx_authz/api/roles.py +++ b/openedx_authz/api/roles.py @@ -78,9 +78,7 @@ def get_permissions_for_roles( permissions_by_role = {} for role in roles: - permissions_by_role[role.external_key] = { - "permissions": get_permissions_for_single_role(role) - } + permissions_by_role[role.external_key] = {"permissions": get_permissions_for_single_role(role)} return permissions_by_role @@ -122,16 +120,11 @@ def get_permissions_for_active_roles_in_scope( if role: filtered_policy = [ - policy - for policy in filtered_policy - if policy[GroupingPolicyIndex.ROLE.value] == role.namespaced_key + policy for policy in filtered_policy if policy[GroupingPolicyIndex.ROLE.value] == role.namespaced_key ] return get_permissions_for_roles( - [ - RoleData(namespaced_key=policy[GroupingPolicyIndex.ROLE.value]) - for policy in filtered_policy - ] + [RoleData(namespaced_key=policy[GroupingPolicyIndex.ROLE.value]) for policy in filtered_policy] ) @@ -162,9 +155,7 @@ def get_role_definitions_in_scope(scope: ScopeData) -> list[RoleData]: permissions_per_role[policy[PolicyIndex.ROLE.value]]["scopes"].append( ScopeData(namespaced_key=policy[PolicyIndex.SCOPE.value]) ) # TODO: I don't think this actually gets used anywhere - permissions_per_role[policy[PolicyIndex.ROLE.value]]["permissions"].append( - get_permission_from_policy(policy) - ) + permissions_per_role[policy[PolicyIndex.ROLE.value]]["permissions"].append(get_permission_from_policy(policy)) return [ RoleData( @@ -199,9 +190,7 @@ def get_all_roles_in_scope(scope: ScopeData) -> list[list[str]]: ) -def assign_role_to_subject_in_scope( - subject: SubjectData, role: RoleData, scope: ScopeData -) -> bool: +def assign_role_to_subject_in_scope(subject: SubjectData, role: RoleData, scope: ScopeData) -> bool: """Assign a role to a subject. Args: @@ -220,9 +209,7 @@ def assign_role_to_subject_in_scope( ) -def batch_assign_role_to_subjects_in_scope( - subjects: list[SubjectData], role: RoleData, scope: ScopeData -) -> None: +def batch_assign_role_to_subjects_in_scope(subjects: list[SubjectData], role: RoleData, scope: ScopeData) -> None: """Assign a role to a list of subjects. Args: @@ -233,9 +220,7 @@ def batch_assign_role_to_subjects_in_scope( assign_role_to_subject_in_scope(subject, role, scope) -def unassign_role_from_subject_in_scope( - subject: SubjectData, role: RoleData, scope: ScopeData -) -> bool: +def unassign_role_from_subject_in_scope(subject: SubjectData, role: RoleData, scope: ScopeData) -> bool: """Unassign a role from a subject. Args: @@ -252,9 +237,7 @@ def unassign_role_from_subject_in_scope( ) -def batch_unassign_role_from_subjects_in_scope( - subjects: list[SubjectData], role: RoleData, scope: ScopeData -) -> None: +def batch_unassign_role_from_subjects_in_scope(subjects: list[SubjectData], role: RoleData, scope: ScopeData) -> None: """Unassign a role from a list of subjects. Args: @@ -277,9 +260,7 @@ def get_subject_role_assignments(subject: SubjectData) -> list[RoleAssignmentDat """ enforcer = AuthzEnforcer.get_enforcer() role_assignments = [] - for policy in enforcer.get_filtered_grouping_policy( - GroupingPolicyIndex.SUBJECT.value, subject.namespaced_key - ): + for policy in enforcer.get_filtered_grouping_policy(GroupingPolicyIndex.SUBJECT.value, subject.namespaced_key): role = RoleData(namespaced_key=policy[GroupingPolicyIndex.ROLE.value]) role.permissions = get_permissions_for_single_role(role) @@ -293,9 +274,7 @@ def get_subject_role_assignments(subject: SubjectData) -> list[RoleAssignmentDat return role_assignments -def get_subject_role_assignments_in_scope( - subject: SubjectData, scope: ScopeData -) -> list[RoleAssignmentData]: +def get_subject_role_assignments_in_scope(subject: SubjectData, scope: ScopeData) -> list[RoleAssignmentData]: """Get the roles for a subject in a specific scope. Args: @@ -308,9 +287,7 @@ def get_subject_role_assignments_in_scope( enforcer = AuthzEnforcer.get_enforcer() # TODO: we still need to get the remaining data for the role like email, etc role_assignments = [] - for namespaced_key in enforcer.get_roles_for_user_in_domain( - subject.namespaced_key, scope.namespaced_key - ): + for namespaced_key in enforcer.get_roles_for_user_in_domain(subject.namespaced_key, scope.namespaced_key): role = RoleData(namespaced_key=namespaced_key) role_assignments.append( RoleAssignmentData( @@ -327,9 +304,7 @@ def get_subject_role_assignments_in_scope( return role_assignments -def get_subject_role_assignments_for_role_in_scope( - role: RoleData, scope: ScopeData -) -> list[RoleAssignmentData]: +def get_subject_role_assignments_for_role_in_scope(role: RoleData, scope: ScopeData) -> list[RoleAssignmentData]: """Get the subjects assigned to a specific role in a specific scope. Args: @@ -341,9 +316,7 @@ def get_subject_role_assignments_for_role_in_scope( """ enforcer = AuthzEnforcer.get_enforcer() role_assignments = [] - for subject in enforcer.get_users_for_role_in_domain( - role.namespaced_key, scope.namespaced_key - ): + for subject in enforcer.get_users_for_role_in_domain(role.namespaced_key, scope.namespaced_key): if subject.startswith(f"{RoleData.NAMESPACE}{RoleData.SEPARATOR}"): # Skip roles that are also subjects continue diff --git a/openedx_authz/api/users.py b/openedx_authz/api/users.py index f6caec47..62c20320 100644 --- a/openedx_authz/api/users.py +++ b/openedx_authz/api/users.py @@ -37,9 +37,7 @@ ] -def assign_role_to_user_in_scope( - user_external_key: str, role_external_key: str, scope_external_key: str -) -> bool: +def assign_role_to_user_in_scope(user_external_key: str, role_external_key: str, scope_external_key: str) -> bool: """Assign a role to a user in a specific scope. Args: @@ -57,9 +55,7 @@ def assign_role_to_user_in_scope( ) -def batch_assign_role_to_users_in_scope( - users: list[str], role_external_key: str, scope_external_key: str -): +def batch_assign_role_to_users_in_scope(users: list[str], role_external_key: str, scope_external_key: str): """Assign a role to multiple users in a specific scope. Args: @@ -75,9 +71,7 @@ def batch_assign_role_to_users_in_scope( ) -def unassign_role_from_user( - user_external_key: str, role_external_key: str, scope_external_key: str -): +def unassign_role_from_user(user_external_key: str, role_external_key: str, scope_external_key: str): """Unassign a role from a user in a specific scope. Args: @@ -95,9 +89,7 @@ def unassign_role_from_user( ) -def batch_unassign_role_from_users( - users: list[str], role_external_key: str, scope_external_key: str -): +def batch_unassign_role_from_users(users: list[str], role_external_key: str, scope_external_key: str): """Unassign a role from multiple users in a specific scope. Args: @@ -125,9 +117,7 @@ def get_user_role_assignments(user_external_key: str) -> list[RoleAssignmentData return get_subject_role_assignments(UserData(external_key=user_external_key)) -def get_user_role_assignments_in_scope( - user_external_key: str, scope_external_key: str -) -> list[RoleAssignmentData]: +def get_user_role_assignments_in_scope(user_external_key: str, scope_external_key: str) -> list[RoleAssignmentData]: """Get the roles assigned to a user in a specific scope. Args: @@ -172,9 +162,7 @@ def get_all_user_role_assignments_in_scope( Returns: list[RoleAssignmentData]: A list of user role assignments and all their metadata in the specified scope. """ - return get_all_subject_role_assignments_in_scope( - ScopeData(external_key=scope_external_key) - ) + return get_all_subject_role_assignments_in_scope(ScopeData(external_key=scope_external_key)) def is_user_allowed( diff --git a/openedx_authz/engine/adapter.py b/openedx_authz/engine/adapter.py index 0cc0de75..679735e9 100644 --- a/openedx_authz/engine/adapter.py +++ b/openedx_authz/engine/adapter.py @@ -77,7 +77,9 @@ def is_filtered(self) -> bool: return True def load_filtered_policy( - self, model: Model, filter: Filter # pylint: disable=redefined-builtin + self, + model: Model, + filter: Filter, # pylint: disable=redefined-builtin ) -> None: """ Load policy rules from storage with filtering applied. @@ -102,7 +104,9 @@ def load_filtered_policy( persist.load_policy_line(str(line), model) def filter_query( - self, queryset: QuerySet, filter: Filter # pylint: disable=redefined-builtin + self, + queryset: QuerySet, + filter: Filter, # pylint: disable=redefined-builtin ) -> QuerySet: """ Apply filter criteria to the policy queryset. diff --git a/openedx_authz/engine/enforcer.py b/openedx_authz/engine/enforcer.py index c8dc5f06..0e45ac9c 100644 --- a/openedx_authz/engine/enforcer.py +++ b/openedx_authz/engine/enforcer.py @@ -88,9 +88,7 @@ def _initialize_enforcer() -> SyncedEnforcer: # issues when the app is not fully loaded (e.g., while pulling translations, etc.). initialize_enforcer(db_alias) except Exception as e: - logger.error( - f"Failed to initialize Casbin enforcer with DB alias '{db_alias}': {e}" - ) + logger.error(f"Failed to initialize Casbin enforcer with DB alias '{db_alias}': {e}") raise adapter = ExtendedAdapter() diff --git a/openedx_authz/engine/utils.py b/openedx_authz/engine/utils.py index f86a6ac3..b6d4284d 100644 --- a/openedx_authz/engine/utils.py +++ b/openedx_authz/engine/utils.py @@ -31,9 +31,7 @@ def migrate_policy_between_enforcers( # Load target enforcer policies to check for duplicates target_enforcer.load_policy() - logger.info( - f"Target enforcer has {len(target_enforcer.get_policy())} existing policies before migration." - ) + logger.info(f"Target enforcer has {len(target_enforcer.get_policy())} existing policies before migration.") # TODO: this operations use the enforcer directly, which may not be ideal # since we have to load the policy after each addition to avoid duplicates. @@ -53,31 +51,21 @@ def migrate_policy_between_enforcers( for grouping_policy_ptype in GROUPING_POLICY_PTYPES: try: - grouping_policies = source_enforcer.get_named_grouping_policy( - grouping_policy_ptype - ) + grouping_policies = source_enforcer.get_named_grouping_policy(grouping_policy_ptype) for grouping in grouping_policies: - if target_enforcer.has_named_grouping_policy( - grouping_policy_ptype, *grouping - ): + if target_enforcer.has_named_grouping_policy(grouping_policy_ptype, *grouping): logger.info( f"Grouping policy {grouping_policy_ptype}, {grouping} already exists in target, skipping." ) continue - target_enforcer.add_named_grouping_policy( - grouping_policy_ptype, *grouping - ) + target_enforcer.add_named_grouping_policy(grouping_policy_ptype, *grouping) # Ensure latest policies are loaded in the target enforcer after each addition # to avoid duplicates target_enforcer.load_policy() except KeyError as e: - logger.info( - f"Skipping {grouping_policy_ptype} policies: {e} not found in source enforcer." - ) - logger.info( - f"Successfully loaded policies from {source_enforcer.get_model()} into the database." - ) + logger.info(f"Skipping {grouping_policy_ptype} policies: {e} not found in source enforcer.") + logger.info(f"Successfully loaded policies from {source_enforcer.get_model()} into the database.") except Exception as e: logger.error(f"Error loading policies from file: {e}") raise diff --git a/openedx_authz/management/commands/enforcement.py b/openedx_authz/management/commands/enforcement.py index 31f7f902..55dd4717 100644 --- a/openedx_authz/management/commands/enforcement.py +++ b/openedx_authz/management/commands/enforcement.py @@ -78,9 +78,7 @@ def handle(self, *args, **options): Raises: CommandError: If model or policy files are not found or enforcer creation fails. """ - model_file_path = ( - self._get_file_path("model.conf") or options["model_file_path"] - ) + model_file_path = self._get_file_path("model.conf") or options["model_file_path"] policy_file_path = options["policy_file_path"] if not os.path.isfile(model_file_path): @@ -95,9 +93,7 @@ def handle(self, *args, **options): try: enforcer = casbin.Enforcer(model_file_path, policy_file_path) - self.stdout.write( - self.style.SUCCESS("Casbin enforcer created successfully") - ) + self.stdout.write(self.style.SUCCESS("Casbin enforcer created successfully")) policies = enforcer.get_policy() roles = enforcer.get_grouping_policy() @@ -160,9 +156,7 @@ def _run_interactive_mode(self, enforcer: casbin.Enforcer) -> None: self.stdout.write(self.style.ERROR("Exiting interactive mode...")) break - def _test_interactive_request( - self, enforcer: casbin.Enforcer, user_input: str - ) -> None: + def _test_interactive_request(self, enforcer: casbin.Enforcer, user_input: str) -> None: """Process and test a single enforcement request from user input. Parses the input string, validates the format, executes the enforcement @@ -180,11 +174,7 @@ def _test_interactive_request( try: parts = [part.strip() for part in user_input.split()] if len(parts) != 3: - self.stdout.write( - self.style.ERROR( - f"✗ Invalid format. Expected 3 parts, got {len(parts)}" - ) - ) + self.stdout.write(self.style.ERROR(f"✗ Invalid format. Expected 3 parts, got {len(parts)}")) self.stdout.write("Format: subject action scope") self.stdout.write("Example: user^alice act^read org^OpenedX") return @@ -193,13 +183,9 @@ def _test_interactive_request( result = enforcer.enforce(subject, action, scope) if result: - self.stdout.write( - self.style.SUCCESS(f"✓ ALLOWED: {subject} {action} {scope}") - ) + self.stdout.write(self.style.SUCCESS(f"✓ ALLOWED: {subject} {action} {scope}")) else: - self.stdout.write( - self.style.ERROR(f"✗ DENIED: {subject} {action} {scope}") - ) + self.stdout.write(self.style.ERROR(f"✗ DENIED: {subject} {action} {scope}")) except (ValueError, IndexError, TypeError) as e: self.stdout.write(self.style.ERROR(f"✗ Error processing request: {str(e)}")) diff --git a/openedx_authz/management/commands/load_policies.py b/openedx_authz/management/commands/load_policies.py index 4a6cbf06..9b2d2232 100644 --- a/openedx_authz/management/commands/load_policies.py +++ b/openedx_authz/management/commands/load_policies.py @@ -74,13 +74,9 @@ def handle(self, *args, **options): options["model_file_path"], ) if policy_file_path is None: - policy_file_path = os.path.join( - ROOT_DIRECTORY, "engine", "config", "authz.policy" - ) + policy_file_path = os.path.join(ROOT_DIRECTORY, "engine", "config", "authz.policy") if model_file_path is None: - model_file_path = os.path.join( - ROOT_DIRECTORY, "engine", "config", "model.conf" - ) + model_file_path = os.path.join(ROOT_DIRECTORY, "engine", "config", "model.conf") target_enforcer = AuthzEnforcer.get_enforcer() diff --git a/openedx_authz/rest_api/decorators.py b/openedx_authz/rest_api/decorators.py index c5fd0ee0..1aead964 100644 --- a/openedx_authz/rest_api/decorators.py +++ b/openedx_authz/rest_api/decorators.py @@ -39,9 +39,7 @@ def _decorator(func_or_class): SessionAuthenticationAllowInactiveUser, ] if is_authenticated: - func_or_class.permission_classes = [IsAuthenticated] + getattr( - func_or_class, "permission_classes", [] - ) + func_or_class.permission_classes = [IsAuthenticated] + getattr(func_or_class, "permission_classes", []) return func_or_class return _decorator diff --git a/openedx_authz/rest_api/utils.py b/openedx_authz/rest_api/utils.py index 240a001d..4af6aaa1 100644 --- a/openedx_authz/rest_api/utils.py +++ b/openedx_authz/rest_api/utils.py @@ -28,9 +28,7 @@ def get_generic_scope(scope: ScopeData) -> ScopeData: >>> get_generic_scope(scope) ScopeData(namespaced_key="lib^*") """ - return ScopeData( - namespaced_key=f"{scope.NAMESPACE}{ScopeData.SEPARATOR}{GENERIC_SCOPE_WILDCARD}" - ) + return ScopeData(namespaced_key=f"{scope.NAMESPACE}{ScopeData.SEPARATOR}{GENERIC_SCOPE_WILDCARD}") def get_user_map(usernames: list[str]) -> dict[str, User]: @@ -94,14 +92,10 @@ def sort_users( list[dict]: The sorted users. """ if sort_by not in SortField.values(): - raise ValueError( - f"Invalid field: '{sort_by}'. Must be one of {SortField.values()}" - ) + raise ValueError(f"Invalid field: '{sort_by}'. Must be one of {SortField.values()}") if order not in SortOrder.values(): - raise ValueError( - f"Invalid order: '{order}'. Must be one of {SortOrder.values()}" - ) + raise ValueError(f"Invalid order: '{order}'. Must be one of {SortOrder.values()}") sorted_users = sorted( users, @@ -111,9 +105,7 @@ def sort_users( return sorted_users -def filter_users( - users: list[dict], search: str | None, roles: list[str] | None -) -> list[dict]: +def filter_users(users: list[dict], search: str | None, roles: list[str] | None) -> list[dict]: """ Filter users by a case-insensitive search string and/or by roles. @@ -131,10 +123,7 @@ def filter_users( filtered_users = [] for user in users: if search: - matches_search = any( - search in (user.get(field) or "").lower() - for field in SearchField.values() - ) + matches_search = any(search in (user.get(field) or "").lower() for field in SearchField.values()) if not matches_search: continue diff --git a/openedx_authz/rest_api/v1/fields.py b/openedx_authz/rest_api/v1/fields.py index 5014960b..89800788 100644 --- a/openedx_authz/rest_api/v1/fields.py +++ b/openedx_authz/rest_api/v1/fields.py @@ -8,11 +8,7 @@ class CommaSeparatedListField(serializers.CharField): def to_internal_value(self, data): """Convert string separated by commas to list of unique items preserving order""" - return list( - dict.fromkeys( - item.strip().lower() for item in data.split(",") if item.strip() - ) - ) + return list(dict.fromkeys(item.strip().lower() for item in data.split(",") if item.strip())) def to_representation(self, value): """Convert list to string separated by commas""" diff --git a/openedx_authz/rest_api/v1/permissions.py b/openedx_authz/rest_api/v1/permissions.py index 0716c2c3..9fb0ad33 100644 --- a/openedx_authz/rest_api/v1/permissions.py +++ b/openedx_authz/rest_api/v1/permissions.py @@ -198,9 +198,7 @@ def has_object_permission(self, request, view, obj) -> bool: """ if request.user.is_superuser or request.user.is_staff: return True - return self._get_permission_instance(request).has_object_permission( - request, view, obj - ) + return self._get_permission_instance(request).has_object_permission(request, view, obj) class MethodPermissionMixin: @@ -241,9 +239,7 @@ def get_required_permissions(self, request, view) -> list[str]: return handler.required_permissions return [] - def validate_permissions( - self, request, permissions: list[str], scope_value: str - ) -> bool: + def validate_permissions(self, request, permissions: list[str], scope_value: str) -> bool: """Validate that the user has all required permissions for the scope. Args: diff --git a/openedx_authz/rest_api/v1/serializers.py b/openedx_authz/rest_api/v1/serializers.py index 3be8c31d..5c1d3e88 100644 --- a/openedx_authz/rest_api/v1/serializers.py +++ b/openedx_authz/rest_api/v1/serializers.py @@ -29,23 +29,17 @@ class ActionMixin(serializers.Serializer): # pylint: disable=abstract-method action = serializers.CharField(max_length=255) -class PermissionValidationSerializer( - ActionMixin, ScopeMixin -): # pylint: disable=abstract-method +class PermissionValidationSerializer(ActionMixin, ScopeMixin): # pylint: disable=abstract-method """Serializer for permission validation request.""" -class PermissionValidationResponseSerializer( - PermissionValidationSerializer -): # pylint: disable=abstract-method +class PermissionValidationResponseSerializer(PermissionValidationSerializer): # pylint: disable=abstract-method """Serializer for permission validation response.""" allowed = serializers.BooleanField() -class RoleScopeValidationMixin( - serializers.Serializer -): # pylint: disable=abstract-method +class RoleScopeValidationMixin(serializers.Serializer): # pylint: disable=abstract-method """Mixin providing role and scope validation logic.""" def validate(self, attrs) -> dict: @@ -83,9 +77,7 @@ def validate(self, attrs) -> dict: role_definitions = api.get_role_definitions_in_scope(generic_scope) if role not in role_definitions: - raise serializers.ValidationError( - f"Role '{role_value}' does not exist in scope '{scope_value}'" - ) + raise serializers.ValidationError(f"Role '{role_value}' does not exist in scope '{scope_value}'") return validated_data @@ -97,9 +89,7 @@ class AddUsersToRoleWithScopeSerializer( ): # pylint: disable=abstract-method """Serializer for adding users to a role with a scope.""" - users = serializers.ListField( - child=serializers.CharField(max_length=255), allow_empty=False - ) + users = serializers.ListField(child=serializers.CharField(max_length=255), allow_empty=False) def validate_users(self, value) -> list[str]: """Eliminate duplicates preserving order""" @@ -133,9 +123,7 @@ class ListUsersInRoleWithScopeSerializer(ScopeMixin): # pylint: disable=abstrac search = LowercaseCharField(required=False, default=None) -class ListRolesWithScopeSerializer( - serializers.Serializer -): # pylint: disable=abstract-method +class ListRolesWithScopeSerializer(serializers.Serializer): # pylint: disable=abstract-method """Serializer for listing roles within a scope.""" scope = serializers.CharField(max_length=255) @@ -165,9 +153,7 @@ def validate_scope(self, value: str) -> api.ScopeData: raise serializers.ValidationError(exc) from exc -class ListUsersInRoleWithScopeResponseSerializer( - serializers.Serializer -): # pylint: disable=abstract-method +class ListUsersInRoleWithScopeResponseSerializer(serializers.Serializer): # pylint: disable=abstract-method """Serializer for listing users in a role with a scope response.""" username = serializers.CharField(max_length=255) @@ -175,9 +161,7 @@ class ListUsersInRoleWithScopeResponseSerializer( email = serializers.EmailField() -class ListRolesWithScopeResponseSerializer( - serializers.Serializer -): # pylint: disable=abstract-method +class ListRolesWithScopeResponseSerializer(serializers.Serializer): # pylint: disable=abstract-method """Serializer for listing roles with a scope response.""" role = serializers.CharField(max_length=255) @@ -185,9 +169,7 @@ class ListRolesWithScopeResponseSerializer( user_count = serializers.IntegerField() -class UserRoleAssignmentSerializer( - serializers.Serializer -): # pylint: disable=abstract-method +class UserRoleAssignmentSerializer(serializers.Serializer): # pylint: disable=abstract-method """Serializer for a user role assignment.""" username = serializers.SerializerMethodField() @@ -211,11 +193,7 @@ def get_username(self, obj: api.RoleAssignmentData) -> str: def get_full_name(self, obj) -> str: """Get the full name for the given role assignment.""" user = self._get_user(obj) - return ( - getattr(user.profile, "name", "") - if user and hasattr(user, "profile") - else "" - ) + return getattr(user.profile, "name", "") if user and hasattr(user, "profile") else "" def get_email(self, obj) -> str: """Get the email for the given role assignment.""" diff --git a/openedx_authz/rest_api/v1/views.py b/openedx_authz/rest_api/v1/views.py index 083609ed..b21180da 100644 --- a/openedx_authz/rest_api/v1/views.py +++ b/openedx_authz/rest_api/v1/views.py @@ -92,9 +92,7 @@ class PermissionValidationMeView(APIView): """ @apidocs.schema( - body=PermissionValidationSerializer( - help_text="The permissions to validate", many=True - ), + body=PermissionValidationSerializer(help_text="The permissions to validate", many=True), responses={ status.HTTP_200_OK: PermissionValidationResponseSerializer, status.HTTP_400_BAD_REQUEST: "The request data is invalid", @@ -238,21 +236,11 @@ class RoleUserAPIView(APIView): @apidocs.schema( parameters=[ - apidocs.query_parameter( - "scope", str, description="The authorization scope for the role" - ), - apidocs.query_parameter( - "search", str, description="The search query to filter users by" - ), - apidocs.query_parameter( - "roles", str, description="The names of the roles to query" - ), - apidocs.query_parameter( - "page", int, description="Page number for pagination" - ), - apidocs.query_parameter( - "page_size", int, description="Number of items per page" - ), + apidocs.query_parameter("scope", str, description="The authorization scope for the role"), + apidocs.query_parameter("search", str, description="The search query to filter users by"), + apidocs.query_parameter("roles", str, description="The names of the roles to query"), + apidocs.query_parameter("page", int, description="Page number for pagination"), + apidocs.query_parameter("page_size", int, description="Number of items per page"), apidocs.query_parameter("sort_by", str, description="The field to sort by"), apidocs.query_parameter("order", str, description="The order to sort by"), ], @@ -269,28 +257,16 @@ def get(self, request: HttpRequest) -> Response: serializer.is_valid(raise_exception=True) query_params = serializer.validated_data - user_role_assignments = api.get_all_user_role_assignments_in_scope( - query_params["scope"] - ) - usernames = { - assignment.subject.username for assignment in user_role_assignments - } + user_role_assignments = api.get_all_user_role_assignments_in_scope(query_params["scope"]) + usernames = {assignment.subject.username for assignment in user_role_assignments} context = {"user_map": get_user_map(usernames)} - serialized_data = UserRoleAssignmentSerializer( - user_role_assignments, many=True, context=context - ) + serialized_data = UserRoleAssignmentSerializer(user_role_assignments, many=True, context=context) - filtered_users = filter_users( - serialized_data.data, query_params["search"], query_params["roles"] - ) - user_role_assignments = sort_users( - filtered_users, query_params["sort_by"], query_params["order"] - ) + filtered_users = filter_users(serialized_data.data, query_params["search"], query_params["roles"]) + user_role_assignments = sort_users(filtered_users, query_params["sort_by"], query_params["order"]) paginator = self.pagination_class() - paginated_response_data = paginator.paginate_queryset( - user_role_assignments, request - ) + paginated_response_data = paginator.paginate_queryset(user_role_assignments, request) return paginator.get_paginated_response(paginated_response_data) @apidocs.schema( @@ -339,12 +315,8 @@ def put(self, request: HttpRequest) -> Response: str, description="List of user identifiers (username or email) separated by a comma", ), - apidocs.query_parameter( - "role", str, description="The role to remove the users from" - ), - apidocs.query_parameter( - "scope", str, description="The scope to remove the users from" - ), + apidocs.query_parameter("role", str, description="The role to remove the users from"), + apidocs.query_parameter("scope", str, description="The scope to remove the users from"), ], responses={ status.HTTP_207_MULTI_STATUS: "The users were removed from the role", @@ -445,15 +417,9 @@ class RoleListView(APIView): @apidocs.schema( parameters=[ - apidocs.query_parameter( - "scope", str, description="The scope to query roles for" - ), - apidocs.query_parameter( - "page", int, description="Page number for pagination" - ), - apidocs.query_parameter( - "page_size", int, description="Number of items per page" - ), + apidocs.query_parameter("scope", str, description="The scope to query roles for"), + apidocs.query_parameter("page", int, description="Page number for pagination"), + apidocs.query_parameter("page_size", int, description="Number of items per page"), ], responses={ status.HTTP_200_OK: ListRolesWithScopeResponseSerializer(many=True), @@ -482,7 +448,5 @@ def get(self, request: HttpRequest) -> Response: paginator = self.pagination_class() paginated_response_data = paginator.paginate_queryset(response_data, request) - serialized_data = ListRolesWithScopeResponseSerializer( - paginated_response_data, many=True - ) + serialized_data = ListRolesWithScopeResponseSerializer(paginated_response_data, many=True) return paginator.get_paginated_response(serialized_data.data) diff --git a/openedx_authz/tests/api/test_data.py b/openedx_authz/tests/api/test_data.py index 7b07ce2d..8a9ca928 100644 --- a/openedx_authz/tests/api/test_data.py +++ b/openedx_authz/tests/api/test_data.py @@ -203,9 +203,7 @@ def test_content_library_data_with_external_key(self, external_key): """ library = ContentLibraryData(external_key=external_key) - expected_namespaced_key = ( - f"{library.NAMESPACE}{library.SEPARATOR}{external_key}" - ) + expected_namespaced_key = f"{library.NAMESPACE}{library.SEPARATOR}{external_key}" self.assertIsInstance(library, ContentLibraryData) self.assertEqual(library.external_key, external_key) @@ -233,9 +231,7 @@ def test_scope_data_registration(self): ("sc^generic_scope", ScopeData), ) @unpack - def test_dynamic_instantiation_via_namespaced_key( - self, namespaced_key, expected_class - ): + def test_dynamic_instantiation_via_namespaced_key(self, namespaced_key, expected_class): """Test that ScopeData dynamically instantiates the correct subclass. Expected Result: @@ -321,9 +317,7 @@ def test_base_scope_data_with_external_key(self): """ scope = ScopeData(external_key="sc:generic_scope") - expected_namespaced = ( - f"{ScopeData.NAMESPACE}{ScopeData.SEPARATOR}sc:generic_scope" - ) + expected_namespaced = f"{ScopeData.NAMESPACE}{ScopeData.SEPARATOR}sc:generic_scope" self.assertIsInstance(scope, ScopeData) self.assertEqual(scope.external_key, "sc:generic_scope") @@ -420,9 +414,7 @@ def test_scope_data_str_and_repr(self, external_key, expected_str, expected_repr ("course_staff", "Course Staff", "role^course_staff"), ) @unpack - def test_role_data_str_without_permissions( - self, external_key, expected_name, expected_repr - ): + def test_role_data_str_without_permissions(self, external_key, expected_name, expected_repr): """Test RoleData __str__ and __repr__ methods without permissions. Expected Result: @@ -448,9 +440,7 @@ def test_role_data_str_with_permissions(self): action2 = ActionData(external_key="write") permission1 = PermissionData(action=action1, effect="allow") permission2 = PermissionData(action=action2, effect="deny") - role = RoleData( - external_key="instructor", permissions=[permission1, permission2] - ) + role = RoleData(external_key="instructor", permissions=[permission1, permission2]) actual_str = str(role) @@ -468,9 +458,7 @@ def test_role_data_str_with_permissions(self): ), ) @unpack - def test_permission_data_str_and_repr( - self, action_key, effect, expected_str, expected_repr - ): + def test_permission_data_str_and_repr(self, action_key, effect, expected_str, expected_repr): """Test PermissionData __str__ and __repr__ methods. Expected Result: diff --git a/openedx_authz/tests/api/test_roles.py b/openedx_authz/tests/api/test_roles.py index 391d47fd..ffac5f9c 100644 --- a/openedx_authz/tests/api/test_roles.py +++ b/openedx_authz/tests/api/test_roles.py @@ -297,9 +297,7 @@ def test_get_permissions_for_roles(self, role_name, expected_permissions): - Permissions are correctly retrieved for the given roles and scope. - The permissions match the expected permissions. """ - assigned_permissions = get_permissions_for_single_role( - RoleData(external_key=role_name) - ) + assigned_permissions = get_permissions_for_single_role(RoleData(external_key=role_name)) self.assertEqual(assigned_permissions, expected_permissions) @@ -324,9 +322,7 @@ def test_get_permissions_for_roles(self, role_name, expected_permissions): ), ) @unpack - def test_get_permissions_for_active_role_in_specific_scope( - self, role_name, scope_name, expected_permissions - ): + def test_get_permissions_for_active_role_in_specific_scope(self, role_name, scope_name, expected_permissions): """Test retrieving permissions for a specific role after role assignments. Expected result: @@ -400,9 +396,7 @@ def test_get_roles_in_scope(self, scope_name, expected_roles): ("non_existent_user", "lib:Org999:non_existent_scope", set()), ) @unpack - def test_get_subject_role_assignments_in_scope( - self, subject_name, scope_name, expected_roles - ): + def test_get_subject_role_assignments_in_scope(self, subject_name, scope_name, expected_roles): """Test retrieving roles assigned to a subject in a specific scope. Expected result: @@ -412,9 +406,7 @@ def test_get_subject_role_assignments_in_scope( SubjectData(external_key=subject_name), ScopeData(external_key=scope_name) ) - role_names = { - r.external_key for assignment in role_assignments for r in assignment.roles - } + role_names = {r.external_key for assignment in role_assignments for r in assignment.roles} self.assertEqual(role_names, expected_roles) @ddt_data( @@ -463,19 +455,13 @@ def test_get_all_role_assignments_scopes(self, subject_name, expected_roles): - All roles assigned to the subject across all scopes are correctly retrieved. - Each role includes its associated permissions. """ - role_assignments = get_subject_role_assignments( - SubjectData(external_key=subject_name) - ) + role_assignments = get_subject_role_assignments(SubjectData(external_key=subject_name)) self.assertEqual(len(role_assignments), len(expected_roles)) for expected_role in expected_roles: # Compare the role part of the assignment - found = any( - expected_role in assignment.roles for assignment in role_assignments - ) - self.assertTrue( - found, f"Expected role {expected_role} not found in assignments" - ) + found = any(expected_role in assignment.roles for assignment in role_assignments) + self.assertTrue(found, f"Expected role {expected_role} not found in assignments") @ddt_data( ("library_admin", "lib:Org1:math_101", 1), @@ -550,9 +536,7 @@ class TestRoleAssignmentAPI(RolesTestSetupMixin): ("oliver", "library_admin", "lib:Org1:math_101", False), ) @unpack - def test_batch_assign_role_to_subjects_in_scope( - self, subject_names, role, scope_name, batch - ): + def test_batch_assign_role_to_subjects_in_scope(self, subject_names, role, scope_name, batch): """Test assigning a role to a single or multiple subjects in a specific scope. Expected result: @@ -574,11 +558,7 @@ def test_batch_assign_role_to_subjects_in_scope( SubjectData(external_key=subject_name), ScopeData(external_key=scope_name), ) - role_names = { - r.external_key - for assignment in user_roles - for r in assignment.roles - } + role_names = {r.external_key for assignment in user_roles for r in assignment.roles} self.assertIn(role, role_names) else: assign_role_to_subject_in_scope( @@ -590,9 +570,7 @@ def test_batch_assign_role_to_subjects_in_scope( SubjectData(external_key=subject_names), ScopeData(external_key=scope_name), ) - role_names = { - r.external_key for assignment in user_roles for r in assignment.roles - } + role_names = {r.external_key for assignment in user_roles for r in assignment.roles} self.assertIn(role, role_names) @ddt_data( @@ -616,9 +594,7 @@ def test_batch_assign_role_to_subjects_in_scope( ("oliver", "library_admin", "lib:Org1:math_101", False), ) @unpack - def test_unassign_role_from_subject_in_scope( - self, subject_names, role, scope_name, batch - ): + def test_unassign_role_from_subject_in_scope(self, subject_names, role, scope_name, batch): """Test unassigning a role from a subject or multiple subjects in a specific scope. Expected result: @@ -637,11 +613,7 @@ def test_unassign_role_from_subject_in_scope( SubjectData(external_key=subject), ScopeData(external_key=scope_name), ) - role_names = { - r.external_key - for assignment in user_roles - for r in assignment.roles - } + role_names = {r.external_key for assignment in user_roles for r in assignment.roles} self.assertNotIn(role, role_names) else: unassign_role_from_subject_in_scope( @@ -653,9 +625,7 @@ def test_unassign_role_from_subject_in_scope( SubjectData(external_key=subject_names), ScopeData(external_key=scope_name), ) - role_names = { - r.external_key for assignment in user_roles for r in assignment.roles - } + role_names = {r.external_key for assignment in user_roles for r in assignment.roles} self.assertNotIn(role, role_names) @ddt_data( @@ -729,9 +699,7 @@ def test_get_all_role_assignments_in_scope(self, scope_name, expected_assignment - All role assignments in the specified scope are correctly retrieved. - Each assignment includes the subject, role, and scope information with permissions. """ - role_assignments = get_all_subject_role_assignments_in_scope( - ScopeData(external_key=scope_name) - ) + role_assignments = get_all_subject_role_assignments_in_scope(ScopeData(external_key=scope_name)) self.assertEqual(len(role_assignments), len(expected_assignments)) for assignment in role_assignments: diff --git a/openedx_authz/tests/api/test_users.py b/openedx_authz/tests/api/test_users.py index e47cb07e..7f11fc1c 100644 --- a/openedx_authz/tests/api/test_users.py +++ b/openedx_authz/tests/api/test_users.py @@ -64,18 +64,10 @@ def test_assign_role_to_user_in_scope(self, username, role, scope_name, batch): - The role is successfully assigned to the user in the specified scope. """ if batch: - batch_assign_role_to_users_in_scope( - users=username, role_external_key=role, scope_external_key=scope_name - ) + batch_assign_role_to_users_in_scope(users=username, role_external_key=role, scope_external_key=scope_name) for user in username: - user_roles = get_user_role_assignments_in_scope( - user_external_key=user, scope_external_key=scope_name - ) - role_names = { - r.external_key - for assignment in user_roles - for r in assignment.roles - } + user_roles = get_user_role_assignments_in_scope(user_external_key=user, scope_external_key=scope_name) + role_names = {r.external_key for assignment in user_roles for r in assignment.roles} self.assertIn(role, role_names) else: assign_role_to_user_in_scope( @@ -83,12 +75,8 @@ def test_assign_role_to_user_in_scope(self, username, role, scope_name, batch): role_external_key=role, scope_external_key=scope_name, ) - user_roles = get_user_role_assignments_in_scope( - user_external_key=username, scope_external_key=scope_name - ) - role_names = { - r.external_key for assignment in user_roles for r in assignment.roles - } + user_roles = get_user_role_assignments_in_scope(user_external_key=username, scope_external_key=scope_name) + role_names = {r.external_key for assignment in user_roles for r in assignment.roles} self.assertIn(role, role_names) @data( @@ -106,18 +94,10 @@ def test_unassign_role_from_user(self, username, role, scope_name, batch): - The user no longer has the role in the specified scope. """ if batch: - batch_unassign_role_from_users( - users=username, role_external_key=role, scope_external_key=scope_name - ) + batch_unassign_role_from_users(users=username, role_external_key=role, scope_external_key=scope_name) for user in username: - user_roles = get_user_role_assignments_in_scope( - user_external_key=user, scope_external_key=scope_name - ) - role_names = { - r.external_key - for assignment in user_roles - for r in assignment.roles - } + user_roles = get_user_role_assignments_in_scope(user_external_key=user, scope_external_key=scope_name) + role_names = {r.external_key for assignment in user_roles for r in assignment.roles} self.assertNotIn(role, role_names) else: unassign_role_from_user( @@ -125,12 +105,8 @@ def test_unassign_role_from_user(self, username, role, scope_name, batch): role_external_key=role, scope_external_key=scope_name, ) - user_roles = get_user_role_assignments_in_scope( - user_external_key=username, scope_external_key=scope_name - ) - role_names = { - r.external_key for assignment in user_roles for r in assignment.roles - } + user_roles = get_user_role_assignments_in_scope(user_external_key=username, scope_external_key=scope_name) + role_names = {r.external_key for assignment in user_roles for r in assignment.roles} self.assertNotIn(role, role_names) @data( @@ -148,9 +124,7 @@ def test_get_user_role_assignments(self, username, expected_roles): """ role_assignments = get_user_role_assignments(user_external_key=username) - assigned_role_names = { - r.external_key for assignment in role_assignments for r in assignment.roles - } + assigned_role_names = {r.external_key for assignment in role_assignments for r in assignment.roles} self.assertEqual(assigned_role_names, expected_roles) @data( @@ -160,22 +134,16 @@ def test_get_user_role_assignments(self, username, expected_roles): ("grace", "lib:Org1:math_advanced", {"library_contributor"}), ) @unpack - def test_get_user_role_assignments_in_scope( - self, username, scope_name, expected_roles - ): + def test_get_user_role_assignments_in_scope(self, username, scope_name, expected_roles): """Test retrieving role assignments for a user within a specific scope. Expected result: - The role assigned to the user in the specified scope is correctly retrieved. - The returned role assignments contain the assigned role. """ - user_roles = get_user_role_assignments_in_scope( - user_external_key=username, scope_external_key=scope_name - ) + user_roles = get_user_role_assignments_in_scope(user_external_key=username, scope_external_key=scope_name) - role_names = { - r.external_key for assignment in user_roles for r in assignment.roles - } + role_names = {r.external_key for assignment in user_roles for r in assignment.roles} self.assertEqual(role_names, expected_roles) @data( @@ -184,9 +152,7 @@ def test_get_user_role_assignments_in_scope( ("library_contributor", "lib:Org1:math_advanced", {"grace", "heidi"}), ) @unpack - def test_get_user_role_assignments_for_role_in_scope( - self, role_name, scope_name, expected_users - ): + def test_get_user_role_assignments_for_role_in_scope(self, role_name, scope_name, expected_users): """Test retrieving all users assigned to a specific role within a specific scope. Expected result: @@ -197,9 +163,7 @@ def test_get_user_role_assignments_for_role_in_scope( role_external_key=role_name, scope_external_key=scope_name ) - assigned_usernames = { - assignment.subject.username for assignment in user_assignments - } + assigned_usernames = {assignment.subject.username for assignment in user_assignments} self.assertEqual(assigned_usernames, expected_users) @@ -251,18 +215,14 @@ def test_get_user_role_assignments_for_role_in_scope( ), ) @unpack - def test_get_all_user_role_assignments_in_scope( - self, scope_name, expected_assignments - ): + def test_get_all_user_role_assignments_in_scope(self, scope_name, expected_assignments): """Test retrieving all user role assignments within a specific scope. Expected result: - All user role assignments in the specified scope are correctly retrieved. - Each assignment includes the subject, role, and scope information. """ - role_assignments = get_all_user_role_assignments_in_scope( - scope_external_key=scope_name - ) + role_assignments = get_all_user_role_assignments_in_scope(scope_external_key=scope_name) self.assertEqual(len(role_assignments), len(expected_assignments)) for assignment in role_assignments: diff --git a/openedx_authz/tests/rest_api/test_views.py b/openedx_authz/tests/rest_api/test_views.py index d6b44c1b..390a4ffc 100644 --- a/openedx_authz/tests/rest_api/test_views.py +++ b/openedx_authz/tests/rest_api/test_views.py @@ -129,17 +129,13 @@ def setUpClass(cls): def create_regular_users(cls, quantity: int): """Create regular users.""" for i in range(1, quantity + 1): - User.objects.create_user( - username=f"regular_{i}", email=f"regular_{i}@example.com" - ) + User.objects.create_user(username=f"regular_{i}", email=f"regular_{i}@example.com") @classmethod def create_admin_users(cls, quantity: int): """Create admin users.""" for i in range(1, quantity + 1): - User.objects.create_superuser( - username=f"admin_{i}", email=f"admin_{i}@example.com" - ) + User.objects.create_superuser(username=f"admin_{i}", email=f"admin_{i}@example.com") @classmethod def setUpTestData(cls): @@ -184,9 +180,7 @@ def setUp(self): ), ) @unpack - def test_permission_validation_success( - self, request_data: list[dict], permission_map: list[bool] - ): + def test_permission_validation_success(self, request_data: list[dict], permission_map: list[bool]): """Test successful permission validation requests. Expected result: @@ -252,9 +246,7 @@ def test_permission_validation_unauthenticated(self): scope = "lib:Org1:LIB1" self.client.force_authenticate(user=None) - response = self.client.post( - self.url, data=[{"action": action, "scope": scope}], format="json" - ) + response = self.client.post(self.url, data=[{"action": action, "scope": scope}], format="json") self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) @@ -267,9 +259,7 @@ def test_permission_validation_unauthenticated(self): (ValueError(), status.HTTP_400_BAD_REQUEST, "Invalid scope format"), ) @unpack - def test_permission_validation_exception_handling( - self, exception: Exception, status_code: int, message: str - ): + def test_permission_validation_exception_handling(self, exception: Exception, status_code: int, message: str): """Test permission validation exception handling for different error types. Expected result: @@ -437,9 +427,7 @@ def test_get_users_by_scope_permissions(self, username: str, status_code: int): ), ) @unpack - def test_add_users_to_role_success( - self, users: list[str], expected_completed: int, expected_errors: int - ): + def test_add_users_to_role_success(self, users: list[str], expected_completed: int, expected_errors: int): """Test adding users to a role within a scope. Expected result: @@ -467,9 +455,7 @@ def test_add_users_to_role_success( (["admin_2", "regular_3", "regular_4"], 0, 3), ) @unpack - def test_add_users_to_role_already_has_role( - self, users: list[str], expected_completed: int, expected_errors: int - ): + def test_add_users_to_role_already_has_role(self, users: list[str], expected_completed: int, expected_errors: int): """Test adding users to a role that already has the role.""" role = "library_user" scope = "lib:Org2:LIB2" @@ -483,9 +469,7 @@ def test_add_users_to_role_already_has_role( self.assertEqual(len(response.data["errors"]), expected_errors) @patch.object(api, "assign_role_to_user_in_scope") - def test_add_users_to_role_exception_handling( - self, mock_assign_role_to_user_in_scope - ): + def test_add_users_to_role_exception_handling(self, mock_assign_role_to_user_in_scope): """Test adding users to a role with exception handling.""" request_data = { "role": "library_admin", @@ -594,9 +578,7 @@ def test_add_users_to_role_permissions(self, username: str, status_code: int): ), ) @unpack - def test_remove_users_from_role_success( - self, users: list[str], expected_completed: int, expected_errors: int - ): + def test_remove_users_from_role_success(self, users: list[str], expected_completed: int, expected_errors: int): """Test removing users from a role within a scope. Expected result: @@ -617,9 +599,7 @@ def test_remove_users_from_role_success( self.assertEqual(len(response.data["errors"]), expected_errors) @patch.object(api, "unassign_role_from_user") - def test_remove_users_from_role_exception_handling( - self, mock_unassign_role_from_user - ): + def test_remove_users_from_role_exception_handling(self, mock_unassign_role_from_user): """Test removing users from a role with exception handling.""" query_params = { "role": "library_admin", @@ -633,9 +613,7 @@ def test_remove_users_from_role_exception_handling( self.assertEqual(response.status_code, status.HTTP_207_MULTI_STATUS) self.assertEqual(len(response.data["completed"]), 1) self.assertEqual(len(response.data["errors"]), 2) - self.assertEqual( - response.data["completed"][0]["user_identifier"], "regular_1" - ) + self.assertEqual(response.data["completed"][0]["user_identifier"], "regular_1") self.assertEqual( response.data["completed"][0]["status"], RoleOperationStatus.ROLE_REMOVED, @@ -787,9 +765,7 @@ def test_get_roles_scope_is_invalid(self, query_params: dict, error_code: str): ({"page": 1, "page_size": 4}, 4, False), ) @unpack - def test_get_roles_pagination( - self, query_params: dict, expected_count: int, has_next: bool - ): + def test_get_roles_pagination(self, query_params: dict, expected_count: int, has_next: bool): """Test retrieving roles with pagination. Expected result: diff --git a/openedx_authz/tests/test_commands.py b/openedx_authz/tests/test_commands.py index acf5a5f2..6fa90679 100644 --- a/openedx_authz/tests/test_commands.py +++ b/openedx_authz/tests/test_commands.py @@ -58,9 +58,7 @@ def test_policy_file_not_found_raises(self): self.assertEqual(f"Policy file not found: {non_existent}", str(ctx.exception)) - @patch.object( - EnforcementCommand, "_get_file_path", return_value="invalid/path/model.conf" - ) + @patch.object(EnforcementCommand, "_get_file_path", return_value="invalid/path/model.conf") def test_model_file_not_found_raises(self, mock_get_file_path: Mock): """Test that command errors when the provided model file does not exist.""" with self.assertRaises(CommandError) as ctx: @@ -86,9 +84,7 @@ def test_error_creating_enforcer_raises(self, mock_enforcer_cls: Mock): @patch("openedx_authz.management.commands.enforcement.casbin.Enforcer") @patch.object(EnforcementCommand, "_run_interactive_mode") - def test_successful_run_prints_summary( - self, mock_run_interactive: Mock, mock_enforcer_cls: Mock - ): + def test_successful_run_prints_summary(self, mock_run_interactive: Mock, mock_enforcer_cls: Mock): """ Test successful command execution with policy file and interactive mode. When files exist, command should create enforcer, print counts, and call interactive loop. @@ -126,9 +122,7 @@ def test_run_interactive_mode_displays_help(self): example_text = f"Example: {make_user_key('alice')} {make_action_key('read')} {make_scope_key('org', 'OpenedX')}" self.assertIn("Interactive Mode", self.buffer.getvalue()) - self.assertIn( - "Test custom enforcement requests interactively.", self.buffer.getvalue() - ) + self.assertIn("Test custom enforcement requests interactively.", self.buffer.getvalue()) self.assertIn( "Enter 'quit', 'exit', or 'q' to exit the interactive mode.", self.buffer.getvalue(), @@ -146,17 +140,9 @@ def test_run_interactive_mode_maintains_interactive_loop(self): self.assertEqual(mock_input.call_count, len(input_values)) @data( - [ - f"{make_user_key('alice')} {make_action_key('read')} {make_scope_key('org', 'OpenedX')}" - ], - [ - f"{make_user_key('bob')} {make_action_key('read')} {make_scope_key('org', 'OpenedX')}" - ] - * 5, - [ - f"{make_user_key('john')} {make_action_key('read')} {make_scope_key('org', 'OpenedX')}" - ] - * 10, + [f"{make_user_key('alice')} {make_action_key('read')} {make_scope_key('org', 'OpenedX')}"], + [f"{make_user_key('bob')} {make_action_key('read')} {make_scope_key('org', 'OpenedX')}"] * 5, + [f"{make_user_key('john')} {make_action_key('read')} {make_scope_key('org', 'OpenedX')}"] * 10, ) def test_run_interactive_mode_processes_request(self, user_input: list[str]): """Test that the interactive mode processes the request.""" @@ -214,9 +200,7 @@ def test_interactive_request_invalid_format(self): invalid_output = self.buffer.getvalue() self.assertIn("✗ Invalid format. Expected 3 parts, got 2", invalid_output) self.assertIn("Format: subject action scope", invalid_output) - self.assertIn( - f"Example: {user_input} {make_scope_key('org', 'OpenedX')}", invalid_output - ) + self.assertIn(f"Example: {user_input} {make_scope_key('org', 'OpenedX')}", invalid_output) @data(ValueError(), IndexError(), TypeError()) def test_interactive_request_error(self, exception: Exception): @@ -247,9 +231,7 @@ def setUp(self): @patch("casbin.Enforcer") @patch("os.path.join") @patch("click.confirm") - def test_handle_with_default_paths( - self, mock_confirm, mock_join, mock_casbin_enforcer, mock_get_enforcer - ): + def test_handle_with_default_paths(self, mock_confirm, mock_join, mock_casbin_enforcer, mock_get_enforcer): """Test handle method with default policy and model paths.""" # Setup mocks mock_target_enforcer = Mock() @@ -272,9 +254,7 @@ def test_handle_with_default_paths( command.migrate_policies = Mock() # Call handle method - command.handle( - policy_file_path=None, model_file_path=None, clear_existing=False - ) + command.handle(policy_file_path=None, model_file_path=None, clear_existing=False) # Assertions mock_casbin_enforcer.assert_called_once_with( @@ -284,16 +264,12 @@ def test_handle_with_default_paths( mock_join.assert_any_call(ROOT_DIRECTORY, "engine", "config", "authz.policy") mock_join.assert_any_call(ROOT_DIRECTORY, "engine", "config", "model.conf") mock_confirm.assert_not_called() - command.migrate_policies.assert_called_once_with( - mock_source_enforcer, mock_target_enforcer - ) + command.migrate_policies.assert_called_once_with(mock_source_enforcer, mock_target_enforcer) @patch("openedx_authz.engine.enforcer.AuthzEnforcer.get_enforcer") @patch("casbin.Enforcer") @patch("click.confirm") - def test_handle_with_custom_paths( - self, mock_confirm, mock_casbin_enforcer, mock_get_enforcer - ): + def test_handle_with_custom_paths(self, mock_confirm, mock_casbin_enforcer, mock_get_enforcer): """Test handle method with custom policy and model paths.""" # Setup mocks mock_target_enforcer = Mock() @@ -320,9 +296,7 @@ def test_handle_with_custom_paths( # Assertions mock_casbin_enforcer.assert_called_once_with(model_path, policy_path) mock_confirm.assert_not_called() - command.migrate_policies.assert_called_once_with( - mock_source_enforcer, mock_target_enforcer - ) + command.migrate_policies.assert_called_once_with(mock_source_enforcer, mock_target_enforcer) @patch("openedx_authz.engine.enforcer.AuthzEnforcer.get_enforcer") @patch("casbin.Enforcer") @@ -357,9 +331,7 @@ def test_handle_clear_existing_roles_confirmed( mock_confirm.assert_called_with(mock_style.return_value, default=False) command._delete_existing_roles.assert_called_once_with(mock_target_enforcer) command._delete_permissions_inheritance.assert_not_called() - command.migrate_policies.assert_called_once_with( - mock_source_enforcer, mock_target_enforcer - ) + command.migrate_policies.assert_called_once_with(mock_source_enforcer, mock_target_enforcer) @patch("openedx_authz.engine.enforcer.AuthzEnforcer.get_enforcer") @patch("casbin.Enforcer") @@ -393,19 +365,13 @@ def test_handle_clear_existing_permissions_confirmed( mock_target_enforcer.load_policy.assert_called_once() mock_confirm.assert_called_with(mock_style.return_value, default=False) command._delete_existing_roles.assert_not_called() - command._delete_permissions_inheritance.assert_called_once_with( - mock_target_enforcer - ) - command.migrate_policies.assert_called_once_with( - mock_source_enforcer, mock_target_enforcer - ) + command._delete_permissions_inheritance.assert_called_once_with(mock_target_enforcer) + command.migrate_policies.assert_called_once_with(mock_source_enforcer, mock_target_enforcer) @patch("openedx_authz.engine.enforcer.AuthzEnforcer.get_enforcer") @patch("casbin.Enforcer") @patch("click.confirm") - def test_handle_clear_existing_both_denied( - self, mock_confirm, mock_casbin_enforcer, mock_get_enforcer - ): + def test_handle_clear_existing_both_denied(self, mock_confirm, mock_casbin_enforcer, mock_get_enforcer): """Test handle method with clear_existing but denied deletions.""" expected_mock_confirm_calls = 2 # Setup mocks @@ -432,6 +398,4 @@ def test_handle_clear_existing_both_denied( assert mock_confirm.call_count == expected_mock_confirm_calls command._delete_existing_roles.assert_not_called() command._delete_permissions_inheritance.assert_not_called() - command.migrate_policies.assert_called_once_with( - mock_source_enforcer, mock_target_enforcer - ) + command.migrate_policies.assert_called_once_with(mock_source_enforcer, mock_target_enforcer) diff --git a/openedx_authz/tests/test_enforcement.py b/openedx_authz/tests/test_enforcement.py index 90771586..4938be1d 100644 --- a/openedx_authz/tests/test_enforcement.py +++ b/openedx_authz/tests/test_enforcement.py @@ -139,9 +139,7 @@ class SystemWideRoleTests(CasbinEnforcementTestCase): { "subject": make_user_key("user-1"), "action": make_action_key("manage"), - "scope": make_scope_key( - "course", "course-v1:any-org+any-course+any-course-run" - ), + "scope": make_scope_key("course", "course-v1:any-org+any-course+any-course-run"), "expected_result": True, }, { @@ -365,9 +363,7 @@ class RoleAssignmentTests(CasbinEnforcementTestCase): { "subject": make_user_key("user-5"), "action": make_action_key("manage"), - "scope": make_scope_key( - "course", "course-v1:any-org+any-course+any-course-run" - ), + "scope": make_scope_key("course", "course-v1:any-org+any-course+any-course-run"), "expected_result": True, }, { diff --git a/openedx_authz/tests/test_engine_utils.py b/openedx_authz/tests/test_engine_utils.py index ffac6009..74f33321 100644 --- a/openedx_authz/tests/test_engine_utils.py +++ b/openedx_authz/tests/test_engine_utils.py @@ -195,11 +195,7 @@ def test_migrate_idempotent(self): "Running migration twice should not duplicate g2 rules", ) - duplicates = ( - CasbinRule.objects.values("v0", "v1", "v2") - .annotate(total=Count("*")) - .filter(total__gt=1) - ) + duplicates = CasbinRule.objects.values("v0", "v1", "v2").annotate(total=Count("*")).filter(total__gt=1) duplicate_list = list(duplicates) self.assertEqual( len(duplicate_list), @@ -257,11 +253,7 @@ def test_migrate_partial_duplicates(self): "Should have 31 policies total, with no duplicates", ) - duplicates = ( - CasbinRule.objects.values("v0", "v1", "v2") - .annotate(total=Count("*")) - .filter(total__gt=1) - ) + duplicates = CasbinRule.objects.values("v0", "v1", "v2").annotate(total=Count("*")).filter(total__gt=1) duplicate_list = list(duplicates) self.assertEqual( len(duplicate_list), @@ -344,12 +336,8 @@ def test_migrate_preserves_existing_db_policies(self): migrate_policy_between_enforcers(self.source_enforcer, self.target_enforcer) target_policies = self.target_enforcer.get_policy() - self.assertEqual( - len(target_policies), 32, "Should have 31 file policies + 1 custom policy" - ) - self.assertIn( - custom_policy, target_policies, "Custom database policy should be preserved" - ) + self.assertEqual(len(target_policies), 32, "Should have 31 file policies + 1 custom policy") + self.assertIn(custom_policy, target_policies, "Custom database policy should be preserved") def test_migrate_preserves_user_role_assignments_in_db(self): """Test that migration preserves user role assignments (g rules) in the database. @@ -373,9 +361,7 @@ def test_migrate_preserves_user_role_assignments_in_db(self): migrate_policy_between_enforcers(self.source_enforcer, self.target_enforcer) target_grouping = self.target_enforcer.get_grouping_policy() - self.assertEqual( - len(target_grouping), 2, "User role assignments should be preserved" - ) + self.assertEqual(len(target_grouping), 2, "User role assignments should be preserved") self.assertIn( [ make_user_key("user-1"), @@ -386,6 +372,4 @@ def test_migrate_preserves_user_role_assignments_in_db(self): ) target_policies = self.target_enforcer.get_policy() - self.assertEqual( - len(target_policies), 31, "All 31 policies from file should be loaded" - ) + self.assertEqual(len(target_policies), 31, "All 31 policies from file should be loaded") diff --git a/openedx_authz/tests/test_filter.py b/openedx_authz/tests/test_filter.py index 468b937b..475aa533 100644 --- a/openedx_authz/tests/test_filter.py +++ b/openedx_authz/tests/test_filter.py @@ -165,9 +165,7 @@ def test_filter_deny_policies(self): def test_filter_wildcard_resources(self): """Test filter for wildcard resource patterns.""" - f = Filter( - ptype=["p"], v2=[make_scope_key("lib", "*"), make_scope_key("course", "*")] - ) + f = Filter(ptype=["p"], v2=[make_scope_key("lib", "*"), make_scope_key("course", "*")]) self.assertEqual(f.ptype, ["p"]) self.assertIn(make_scope_key("lib", "*"), f.v2) self.assertIn(make_scope_key("course", "*"), f.v2) diff --git a/requirements/dev.txt b/requirements/dev.txt index 2fa26b57..c62bb952 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -305,6 +305,8 @@ requests==2.32.5 # via # -r requirements/quality.txt # edx-drf-extensions +ruff==0.14.1 + # via -r requirements/quality.txt semantic-version==2.10.0 # via # -r requirements/quality.txt diff --git a/requirements/quality.in b/requirements/quality.in index 93661d98..26604ce9 100644 --- a/requirements/quality.in +++ b/requirements/quality.in @@ -8,3 +8,4 @@ edx-lint # edX pylint rules and plugins isort # to standardize order of imports pycodestyle # PEP 8 compliance validation pydocstyle # PEP 257 compliance validation +ruff # Fast Python linter and formatter diff --git a/requirements/quality.txt b/requirements/quality.txt index cd19e5ae..b9d98d30 100644 --- a/requirements/quality.txt +++ b/requirements/quality.txt @@ -222,6 +222,8 @@ requests==2.32.5 # via # -r requirements/test.txt # edx-drf-extensions +ruff==0.14.1 + # via -r requirements/quality.in semantic-version==2.10.0 # via # -r requirements/test.txt diff --git a/tests/test_models.py b/tests/test_models.py index 9f808bb1..38c29ce8 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -6,9 +6,7 @@ import pytest -@pytest.mark.skip( - reason="Placeholder to allow pytest to succeed before real tests are in place." -) +@pytest.mark.skip(reason="Placeholder to allow pytest to succeed before real tests are in place.") def test_placeholder(): """ TODO: Delete this test once there are real tests. From c837bcc71e96aa7211adfd05eb3510aef2f2cb7c Mon Sep 17 00:00:00 2001 From: Maria Grimaldi Date: Thu, 23 Oct 2025 11:35:39 +0200 Subject: [PATCH 3/5] chore: migrate to ruff instead of pycodestyle and isort --- Makefile | 4 ++-- openedx_authz/api/roles.py | 16 ++++------------ openedx_authz/settings/common.py | 4 +--- requirements/dev.txt | 2 -- requirements/quality.in | 2 -- requirements/quality.txt | 6 +----- ruff.toml | 17 +++++++++++++++++ tox.ini | 10 +++++----- 8 files changed, 30 insertions(+), 31 deletions(-) create mode 100644 ruff.toml diff --git a/Makefile b/Makefile index 6fc15157..3f77808e 100644 --- a/Makefile +++ b/Makefile @@ -58,9 +58,9 @@ upgrade: ## update the requirements/*.txt files with the latest packages satisfy quality: ## check coding style with pycodestyle and pylint tox -e quality -format: ## format code with black and isort +format: ## format code with black and isort. Enable ruff to fix E (pycodestyle) and I (isort) issues ruff format openedx_authz tests --line-length 120 - ruff check --select I --fix --line-length 120 + ruff check --select E,I --fix --line-length 120 pii_check: ## check for PII annotations on all Django models tox -e pii_check diff --git a/openedx_authz/api/roles.py b/openedx_authz/api/roles.py index 15eae88d..ca0e67b9 100644 --- a/openedx_authz/api/roles.py +++ b/openedx_authz/api/roles.py @@ -114,9 +114,7 @@ def get_permissions_for_active_roles_in_scope( permissions and scopes. """ enforcer = AuthzEnforcer.get_enforcer() - filtered_policy = enforcer.get_filtered_grouping_policy( - GroupingPolicyIndex.SCOPE.value, scope.namespaced_key - ) + filtered_policy = enforcer.get_filtered_grouping_policy(GroupingPolicyIndex.SCOPE.value, scope.namespaced_key) if role: filtered_policy = [ @@ -141,9 +139,7 @@ def get_role_definitions_in_scope(scope: ScopeData) -> list[RoleData]: list[Role]: A list of roles. """ enforcer = AuthzEnforcer.get_enforcer() - policy_filtered = enforcer.get_filtered_policy( - PolicyIndex.SCOPE.value, scope.namespaced_key - ) + policy_filtered = enforcer.get_filtered_policy(PolicyIndex.SCOPE.value, scope.namespaced_key) permissions_per_role = defaultdict( lambda: { @@ -185,9 +181,7 @@ def get_all_roles_in_scope(scope: ScopeData) -> list[list[str]]: list[list[str]]: A list of policies in the specified scope. """ enforcer = AuthzEnforcer.get_enforcer() - return enforcer.get_filtered_grouping_policy( - GroupingPolicyIndex.SCOPE.value, scope.namespaced_key - ) + return enforcer.get_filtered_grouping_policy(GroupingPolicyIndex.SCOPE.value, scope.namespaced_key) def assign_role_to_subject_in_scope(subject: SubjectData, role: RoleData, scope: ScopeData) -> bool: @@ -232,9 +226,7 @@ def unassign_role_from_subject_in_scope(subject: SubjectData, role: RoleData, sc bool: True if the role was unassigned successfully, False otherwise. """ enforcer = AuthzEnforcer.get_enforcer() - return enforcer.delete_roles_for_user_in_domain( - subject.namespaced_key, role.namespaced_key, scope.namespaced_key - ) + return enforcer.delete_roles_for_user_in_domain(subject.namespaced_key, role.namespaced_key, scope.namespaced_key) def batch_unassign_role_from_subjects_in_scope(subjects: list[SubjectData], role: RoleData, scope: ScopeData) -> None: diff --git a/openedx_authz/settings/common.py b/openedx_authz/settings/common.py index 7c468cae..0de26897 100644 --- a/openedx_authz/settings/common.py +++ b/openedx_authz/settings/common.py @@ -21,8 +21,6 @@ def plugin_settings(settings): if casbin_adapter_app not in settings.INSTALLED_APPS: settings.INSTALLED_APPS.append(casbin_adapter_app) # Add Casbin configuration - settings.CASBIN_MODEL = os.path.join( - ROOT_DIRECTORY, "engine", "config", "model.conf" - ) + settings.CASBIN_MODEL = os.path.join(ROOT_DIRECTORY, "engine", "config", "model.conf") if not hasattr(settings, "CASBIN_AUTO_LOAD_POLICY_INTERVAL"): settings.CASBIN_AUTO_LOAD_POLICY_INTERVAL = 5 diff --git a/requirements/dev.txt b/requirements/dev.txt index c62bb952..33040c84 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -223,8 +223,6 @@ pycasbin==2.2.0 # via # -r requirements/quality.txt # casbin-django-orm-adapter -pycodestyle==2.14.0 - # via -r requirements/quality.txt pycparser==2.23 # via # -r requirements/quality.txt diff --git a/requirements/quality.in b/requirements/quality.in index 26604ce9..b370720d 100644 --- a/requirements/quality.in +++ b/requirements/quality.in @@ -5,7 +5,5 @@ -r test.txt # Core and testing dependencies for this package edx-lint # edX pylint rules and plugins -isort # to standardize order of imports -pycodestyle # PEP 8 compliance validation pydocstyle # PEP 257 compliance validation ruff # Fast Python linter and formatter diff --git a/requirements/quality.txt b/requirements/quality.txt index b9d98d30..9f1325e3 100644 --- a/requirements/quality.txt +++ b/requirements/quality.txt @@ -122,9 +122,7 @@ iniconfig==2.1.0 # -r requirements/test.txt # pytest isort==6.0.1 - # via - # -r requirements/quality.in - # pylint + # via pylint jinja2==3.1.6 # via # -r requirements/test.txt @@ -157,8 +155,6 @@ pycasbin==2.2.0 # via # -r requirements/test.txt # casbin-django-orm-adapter -pycodestyle==2.14.0 - # via -r requirements/quality.in pycparser==2.23 # via # -r requirements/test.txt diff --git a/ruff.toml b/ruff.toml new file mode 100644 index 00000000..3aae9db8 --- /dev/null +++ b/ruff.toml @@ -0,0 +1,17 @@ +# Ruff configuration for openedx-authz +# See https://docs.astral.sh/ruff/configuration/ + +line-length = 120 + +[lint] +# E: pycodestyle errors +# I: isort import sorting +select = ["E", "I"] + +exclude = [ + ".git", + ".tox", + "migrations", + "__pycache__", + "*.pyc", +] diff --git a/tox.ini b/tox.ini index 0c2ec5cb..6326bb9d 100644 --- a/tox.ini +++ b/tox.ini @@ -5,9 +5,10 @@ envlist = py{311,312}-django{42,52} ; D001 = Line too long ignore = D001 -[pycodestyle] -exclude = .git,.tox,migrations -max-line-length = 120 +[ruff] +# Configuration moved to ruff.toml +# select = E (pycodestyle errors), I (isort) +# max-line-length = 120 [pydocstyle] ; D101 = Missing docstring in public class @@ -75,9 +76,8 @@ commands = touch tests/__init__.py pylint openedx_authz tests test_utils manage.py setup.py rm tests/__init__.py - pycodestyle openedx_authz tests manage.py setup.py + ruff check openedx_authz tests test_utils manage.py setup.py pydocstyle openedx_authz tests manage.py setup.py - isort --check-only --diff tests test_utils openedx_authz manage.py setup.py make selfcheck [testenv:pii_check] From 8b42329f69621d600c2049598cf62afbcf7fdf8e Mon Sep 17 00:00:00 2001 From: Maria Grimaldi Date: Thu, 23 Oct 2025 11:43:00 +0200 Subject: [PATCH 4/5] refactor: drop inline configurations in favor of ruff.toml --- Makefile | 4 ++-- setup.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index 3f77808e..1e4724fe 100644 --- a/Makefile +++ b/Makefile @@ -59,8 +59,8 @@ quality: ## check coding style with pycodestyle and pylint tox -e quality format: ## format code with black and isort. Enable ruff to fix E (pycodestyle) and I (isort) issues - ruff format openedx_authz tests --line-length 120 - ruff check --select E,I --fix --line-length 120 + ruff format openedx_authz tests test_utils manage.py setup.py + ruff check --fix openedx_authz tests test_utils manage.py setup.py pii_check: ## check for PII annotations on all Django models tox -e pii_check diff --git a/setup.py b/setup.py index 91aa3307..294146fa 100755 --- a/setup.py +++ b/setup.py @@ -103,7 +103,7 @@ def add_version_constraint_or_raise(current_line, current_requirements, add_if_n add_version_constraint_or_raise(line, requirements, False) # process back into list of pkg><=constraints strings - constrained_requirements = [f'{pkg}{version or ""}' for (pkg, version) in sorted(requirements.items())] + constrained_requirements = [f"{pkg}{version or ''}" for (pkg, version) in sorted(requirements.items())] return constrained_requirements From 78625f9b0f641f6b7810ea3aba460b8d147fb816 Mon Sep 17 00:00:00 2001 From: Maria Grimaldi Date: Thu, 23 Oct 2025 16:10:21 +0200 Subject: [PATCH 5/5] docs: update changelog for next release --- CHANGELOG.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 67640718..3b049197 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -14,7 +14,7 @@ Change Log Unreleased ********** -* +* Migrate from using pycodestyle and isort to ruff for code quality checks and formatting. 0.6.0 - 2025-10-22 ******************