diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 67640718..3b049197 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -14,7 +14,7 @@ Change Log Unreleased ********** -* +* Migrate from using pycodestyle and isort to ruff for code quality checks and formatting. 0.6.0 - 2025-10-22 ****************** diff --git a/Makefile b/Makefile index 64a9a0aa..1e4724fe 100644 --- a/Makefile +++ b/Makefile @@ -58,6 +58,10 @@ upgrade: ## update the requirements/*.txt files with the latest packages satisfy quality: ## check coding style with pycodestyle and pylint tox -e quality +format: ## format code with black and isort. Enable ruff to fix E (pycodestyle) and I (isort) issues + ruff format openedx_authz tests test_utils manage.py setup.py + ruff check --fix openedx_authz tests test_utils manage.py setup.py + pii_check: ## check for PII annotations on all Django models tox -e pii_check diff --git a/openedx_authz/api/data.py b/openedx_authz/api/data.py index a4c1b359..effc6ad2 100644 --- a/openedx_authz/api/data.py +++ b/openedx_authz/api/data.py @@ -248,9 +248,7 @@ def get_subclass_by_external_key(mcs, external_key: str) -> Type["ScopeData"]: scope_subclass = mcs.scope_registry.get(namespace) if not scope_subclass: - raise ValueError( - f"Unknown scope: {namespace} for external_key: {external_key}" - ) + raise ValueError(f"Unknown scope: {namespace} for external_key: {external_key}") if not scope_subclass.validate_external_key(external_key): raise ValueError(f"Invalid external_key format: {external_key}") @@ -281,9 +279,7 @@ def validate_external_key(mcs, external_key: str) -> bool: Returns: bool: True if valid, False otherwise. """ - raise NotImplementedError( - "Subclasses must implement validate_external_key method." - ) + raise NotImplementedError("Subclasses must implement validate_external_key method.") @define diff --git a/openedx_authz/api/permissions.py b/openedx_authz/api/permissions.py index 18cb93bc..6448866a 100644 --- a/openedx_authz/api/permissions.py +++ b/openedx_authz/api/permissions.py @@ -43,9 +43,7 @@ def get_all_permissions_in_scope(scope: ScopeData) -> list[PermissionData]: list of PermissionData: A list of PermissionData objects associated with the given scope. """ enforcer = AuthzEnforcer.get_enforcer() - actions = enforcer.get_filtered_policy( - PolicyIndex.SCOPE.value, scope.namespaced_key - ) + actions = enforcer.get_filtered_policy(PolicyIndex.SCOPE.value, scope.namespaced_key) return [get_permission_from_policy(action) for action in actions] @@ -65,6 +63,4 @@ def is_subject_allowed( bool: True if the subject has the specified permission in the scope, False otherwise. """ enforcer = AuthzEnforcer.get_enforcer() - return enforcer.enforce( - subject.namespaced_key, action.namespaced_key, scope.namespaced_key - ) + return enforcer.enforce(subject.namespaced_key, action.namespaced_key, scope.namespaced_key) diff --git a/openedx_authz/api/roles.py b/openedx_authz/api/roles.py index 937a0502..ca0e67b9 100644 --- a/openedx_authz/api/roles.py +++ b/openedx_authz/api/roles.py @@ -78,9 +78,7 @@ def get_permissions_for_roles( permissions_by_role = {} for role in roles: - permissions_by_role[role.external_key] = { - "permissions": get_permissions_for_single_role(role) - } + permissions_by_role[role.external_key] = {"permissions": get_permissions_for_single_role(role)} return permissions_by_role @@ -116,22 +114,15 @@ def get_permissions_for_active_roles_in_scope( permissions and scopes. """ enforcer = AuthzEnforcer.get_enforcer() - filtered_policy = enforcer.get_filtered_grouping_policy( - GroupingPolicyIndex.SCOPE.value, scope.namespaced_key - ) + filtered_policy = enforcer.get_filtered_grouping_policy(GroupingPolicyIndex.SCOPE.value, scope.namespaced_key) if role: filtered_policy = [ - policy - for policy in filtered_policy - if policy[GroupingPolicyIndex.ROLE.value] == role.namespaced_key + policy for policy in filtered_policy if policy[GroupingPolicyIndex.ROLE.value] == role.namespaced_key ] return get_permissions_for_roles( - [ - RoleData(namespaced_key=policy[GroupingPolicyIndex.ROLE.value]) - for policy in filtered_policy - ] + [RoleData(namespaced_key=policy[GroupingPolicyIndex.ROLE.value]) for policy in filtered_policy] ) @@ -148,9 +139,7 @@ def get_role_definitions_in_scope(scope: ScopeData) -> list[RoleData]: list[Role]: A list of roles. """ enforcer = AuthzEnforcer.get_enforcer() - policy_filtered = enforcer.get_filtered_policy( - PolicyIndex.SCOPE.value, scope.namespaced_key - ) + policy_filtered = enforcer.get_filtered_policy(PolicyIndex.SCOPE.value, scope.namespaced_key) permissions_per_role = defaultdict( lambda: { @@ -162,9 +151,7 @@ def get_role_definitions_in_scope(scope: ScopeData) -> list[RoleData]: permissions_per_role[policy[PolicyIndex.ROLE.value]]["scopes"].append( ScopeData(namespaced_key=policy[PolicyIndex.SCOPE.value]) ) # TODO: I don't think this actually gets used anywhere - permissions_per_role[policy[PolicyIndex.ROLE.value]]["permissions"].append( - get_permission_from_policy(policy) - ) + permissions_per_role[policy[PolicyIndex.ROLE.value]]["permissions"].append(get_permission_from_policy(policy)) return [ RoleData( @@ -194,14 +181,10 @@ def get_all_roles_in_scope(scope: ScopeData) -> list[list[str]]: list[list[str]]: A list of policies in the specified scope. """ enforcer = AuthzEnforcer.get_enforcer() - return enforcer.get_filtered_grouping_policy( - GroupingPolicyIndex.SCOPE.value, scope.namespaced_key - ) + return enforcer.get_filtered_grouping_policy(GroupingPolicyIndex.SCOPE.value, scope.namespaced_key) -def assign_role_to_subject_in_scope( - subject: SubjectData, role: RoleData, scope: ScopeData -) -> bool: +def assign_role_to_subject_in_scope(subject: SubjectData, role: RoleData, scope: ScopeData) -> bool: """Assign a role to a subject. Args: @@ -220,9 +203,7 @@ def assign_role_to_subject_in_scope( ) -def batch_assign_role_to_subjects_in_scope( - subjects: list[SubjectData], role: RoleData, scope: ScopeData -) -> None: +def batch_assign_role_to_subjects_in_scope(subjects: list[SubjectData], role: RoleData, scope: ScopeData) -> None: """Assign a role to a list of subjects. Args: @@ -233,9 +214,7 @@ def batch_assign_role_to_subjects_in_scope( assign_role_to_subject_in_scope(subject, role, scope) -def unassign_role_from_subject_in_scope( - subject: SubjectData, role: RoleData, scope: ScopeData -) -> bool: +def unassign_role_from_subject_in_scope(subject: SubjectData, role: RoleData, scope: ScopeData) -> bool: """Unassign a role from a subject. Args: @@ -247,14 +226,10 @@ def unassign_role_from_subject_in_scope( bool: True if the role was unassigned successfully, False otherwise. """ enforcer = AuthzEnforcer.get_enforcer() - return enforcer.delete_roles_for_user_in_domain( - subject.namespaced_key, role.namespaced_key, scope.namespaced_key - ) + return enforcer.delete_roles_for_user_in_domain(subject.namespaced_key, role.namespaced_key, scope.namespaced_key) -def batch_unassign_role_from_subjects_in_scope( - subjects: list[SubjectData], role: RoleData, scope: ScopeData -) -> None: +def batch_unassign_role_from_subjects_in_scope(subjects: list[SubjectData], role: RoleData, scope: ScopeData) -> None: """Unassign a role from a list of subjects. Args: @@ -277,9 +252,7 @@ def get_subject_role_assignments(subject: SubjectData) -> list[RoleAssignmentDat """ enforcer = AuthzEnforcer.get_enforcer() role_assignments = [] - for policy in enforcer.get_filtered_grouping_policy( - GroupingPolicyIndex.SUBJECT.value, subject.namespaced_key - ): + for policy in enforcer.get_filtered_grouping_policy(GroupingPolicyIndex.SUBJECT.value, subject.namespaced_key): role = RoleData(namespaced_key=policy[GroupingPolicyIndex.ROLE.value]) role.permissions = get_permissions_for_single_role(role) @@ -293,9 +266,7 @@ def get_subject_role_assignments(subject: SubjectData) -> list[RoleAssignmentDat return role_assignments -def get_subject_role_assignments_in_scope( - subject: SubjectData, scope: ScopeData -) -> list[RoleAssignmentData]: +def get_subject_role_assignments_in_scope(subject: SubjectData, scope: ScopeData) -> list[RoleAssignmentData]: """Get the roles for a subject in a specific scope. Args: @@ -308,9 +279,7 @@ def get_subject_role_assignments_in_scope( enforcer = AuthzEnforcer.get_enforcer() # TODO: we still need to get the remaining data for the role like email, etc role_assignments = [] - for namespaced_key in enforcer.get_roles_for_user_in_domain( - subject.namespaced_key, scope.namespaced_key - ): + for namespaced_key in enforcer.get_roles_for_user_in_domain(subject.namespaced_key, scope.namespaced_key): role = RoleData(namespaced_key=namespaced_key) role_assignments.append( RoleAssignmentData( @@ -327,9 +296,7 @@ def get_subject_role_assignments_in_scope( return role_assignments -def get_subject_role_assignments_for_role_in_scope( - role: RoleData, scope: ScopeData -) -> list[RoleAssignmentData]: +def get_subject_role_assignments_for_role_in_scope(role: RoleData, scope: ScopeData) -> list[RoleAssignmentData]: """Get the subjects assigned to a specific role in a specific scope. Args: @@ -341,9 +308,7 @@ def get_subject_role_assignments_for_role_in_scope( """ enforcer = AuthzEnforcer.get_enforcer() role_assignments = [] - for subject in enforcer.get_users_for_role_in_domain( - role.namespaced_key, scope.namespaced_key - ): + for subject in enforcer.get_users_for_role_in_domain(role.namespaced_key, scope.namespaced_key): if subject.startswith(f"{RoleData.NAMESPACE}{RoleData.SEPARATOR}"): # Skip roles that are also subjects continue diff --git a/openedx_authz/engine/adapter.py b/openedx_authz/engine/adapter.py index c68cfe82..679735e9 100644 --- a/openedx_authz/engine/adapter.py +++ b/openedx_authz/engine/adapter.py @@ -76,7 +76,11 @@ def is_filtered(self) -> bool: """ return True - def load_filtered_policy(self, model: Model, filter: Filter) -> None: # pylint: disable=redefined-builtin + def load_filtered_policy( + self, + model: Model, + filter: Filter, # pylint: disable=redefined-builtin + ) -> None: """ Load policy rules from storage with filtering applied. @@ -99,7 +103,11 @@ def load_filtered_policy(self, model: Model, filter: Filter) -> None: # pylint: for line in filtered_queryset: persist.load_policy_line(str(line), model) - def filter_query(self, queryset: QuerySet, filter: Filter) -> QuerySet: # pylint: disable=redefined-builtin + def filter_query( + self, + queryset: QuerySet, + filter: Filter, # pylint: disable=redefined-builtin + ) -> QuerySet: """ Apply filter criteria to the policy queryset. diff --git a/openedx_authz/engine/utils.py b/openedx_authz/engine/utils.py index 11895a92..b6d4284d 100644 --- a/openedx_authz/engine/utils.py +++ b/openedx_authz/engine/utils.py @@ -51,31 +51,21 @@ def migrate_policy_between_enforcers( for grouping_policy_ptype in GROUPING_POLICY_PTYPES: try: - grouping_policies = source_enforcer.get_named_grouping_policy( - grouping_policy_ptype - ) + grouping_policies = source_enforcer.get_named_grouping_policy(grouping_policy_ptype) for grouping in grouping_policies: - if target_enforcer.has_named_grouping_policy( - grouping_policy_ptype, *grouping - ): + if target_enforcer.has_named_grouping_policy(grouping_policy_ptype, *grouping): logger.info( f"Grouping policy {grouping_policy_ptype}, {grouping} already exists in target, skipping." ) continue - target_enforcer.add_named_grouping_policy( - grouping_policy_ptype, *grouping - ) + target_enforcer.add_named_grouping_policy(grouping_policy_ptype, *grouping) # Ensure latest policies are loaded in the target enforcer after each addition # to avoid duplicates target_enforcer.load_policy() except KeyError as e: - logger.info( - f"Skipping {grouping_policy_ptype} policies: {e} not found in source enforcer." - ) - logger.info( - f"Successfully loaded policies from {source_enforcer.get_model()} into the database." - ) + logger.info(f"Skipping {grouping_policy_ptype} policies: {e} not found in source enforcer.") + logger.info(f"Successfully loaded policies from {source_enforcer.get_model()} into the database.") except Exception as e: logger.error(f"Error loading policies from file: {e}") raise diff --git a/openedx_authz/management/commands/enforcement.py b/openedx_authz/management/commands/enforcement.py index 31f7f902..55dd4717 100644 --- a/openedx_authz/management/commands/enforcement.py +++ b/openedx_authz/management/commands/enforcement.py @@ -78,9 +78,7 @@ def handle(self, *args, **options): Raises: CommandError: If model or policy files are not found or enforcer creation fails. """ - model_file_path = ( - self._get_file_path("model.conf") or options["model_file_path"] - ) + model_file_path = self._get_file_path("model.conf") or options["model_file_path"] policy_file_path = options["policy_file_path"] if not os.path.isfile(model_file_path): @@ -95,9 +93,7 @@ def handle(self, *args, **options): try: enforcer = casbin.Enforcer(model_file_path, policy_file_path) - self.stdout.write( - self.style.SUCCESS("Casbin enforcer created successfully") - ) + self.stdout.write(self.style.SUCCESS("Casbin enforcer created successfully")) policies = enforcer.get_policy() roles = enforcer.get_grouping_policy() @@ -160,9 +156,7 @@ def _run_interactive_mode(self, enforcer: casbin.Enforcer) -> None: self.stdout.write(self.style.ERROR("Exiting interactive mode...")) break - def _test_interactive_request( - self, enforcer: casbin.Enforcer, user_input: str - ) -> None: + def _test_interactive_request(self, enforcer: casbin.Enforcer, user_input: str) -> None: """Process and test a single enforcement request from user input. Parses the input string, validates the format, executes the enforcement @@ -180,11 +174,7 @@ def _test_interactive_request( try: parts = [part.strip() for part in user_input.split()] if len(parts) != 3: - self.stdout.write( - self.style.ERROR( - f"✗ Invalid format. Expected 3 parts, got {len(parts)}" - ) - ) + self.stdout.write(self.style.ERROR(f"✗ Invalid format. Expected 3 parts, got {len(parts)}")) self.stdout.write("Format: subject action scope") self.stdout.write("Example: user^alice act^read org^OpenedX") return @@ -193,13 +183,9 @@ def _test_interactive_request( result = enforcer.enforce(subject, action, scope) if result: - self.stdout.write( - self.style.SUCCESS(f"✓ ALLOWED: {subject} {action} {scope}") - ) + self.stdout.write(self.style.SUCCESS(f"✓ ALLOWED: {subject} {action} {scope}")) else: - self.stdout.write( - self.style.ERROR(f"✗ DENIED: {subject} {action} {scope}") - ) + self.stdout.write(self.style.ERROR(f"✗ DENIED: {subject} {action} {scope}")) except (ValueError, IndexError, TypeError) as e: self.stdout.write(self.style.ERROR(f"✗ Error processing request: {str(e)}")) diff --git a/openedx_authz/management/commands/load_policies.py b/openedx_authz/management/commands/load_policies.py index 57ca79de..9b2d2232 100644 --- a/openedx_authz/management/commands/load_policies.py +++ b/openedx_authz/management/commands/load_policies.py @@ -69,27 +69,38 @@ def handle(self, *args, **options): Raises: CommandError: If the policy file is not found or loading fails. """ - policy_file_path, model_file_path = options["policy_file_path"], options["model_file_path"] + policy_file_path, model_file_path = ( + options["policy_file_path"], + options["model_file_path"], + ) if policy_file_path is None: - policy_file_path = os.path.join( - ROOT_DIRECTORY, "engine", "config", "authz.policy" - ) + policy_file_path = os.path.join(ROOT_DIRECTORY, "engine", "config", "authz.policy") if model_file_path is None: - model_file_path = os.path.join( - ROOT_DIRECTORY, "engine", "config", "model.conf" - ) + model_file_path = os.path.join(ROOT_DIRECTORY, "engine", "config", "model.conf") target_enforcer = AuthzEnforcer.get_enforcer() if options.get("clear_existing"): target_enforcer.load_policy() - if click.confirm(click.style('Do you want to delete existing roles? ' - '(This will also delete the assignments related to those roles)', - fg='yellow', bold=True), default=False): + if click.confirm( + click.style( + "Do you want to delete existing roles? " + "(This will also delete the assignments related to those roles)", + fg="yellow", + bold=True, + ), + default=False, + ): self._delete_existing_roles(target_enforcer) - if click.confirm(click.style('Do you want to delete existing permissions inheritance?', - fg='yellow', bold=True), default=False): + if click.confirm( + click.style( + "Do you want to delete existing permissions inheritance?", + fg="yellow", + bold=True, + ), + default=False, + ): self._delete_permissions_inheritance(target_enforcer) source_enforcer = casbin.Enforcer(model_file_path, policy_file_path) diff --git a/openedx_authz/rest_api/utils.py b/openedx_authz/rest_api/utils.py index 370437a4..4af6aaa1 100644 --- a/openedx_authz/rest_api/utils.py +++ b/openedx_authz/rest_api/utils.py @@ -72,7 +72,9 @@ def get_user_by_username_or_email(username_or_email: str) -> User: def sort_users( - users: list[dict], sort_by: SortField = SortField.USERNAME, order: SortOrder = SortOrder.ASC + users: list[dict], + sort_by: SortField = SortField.USERNAME, + order: SortOrder = SortOrder.ASC, ) -> list[dict]: """ Sort users by a given field and order. @@ -95,7 +97,11 @@ def sort_users( if order not in SortOrder.values(): raise ValueError(f"Invalid order: '{order}'. Must be one of {SortOrder.values()}") - sorted_users = sorted(users, key=lambda user: (user.get(sort_by) or "").lower(), reverse=order == SortOrder.DESC) + sorted_users = sorted( + users, + key=lambda user: (user.get(sort_by) or "").lower(), + reverse=order == SortOrder.DESC, + ) return sorted_users diff --git a/openedx_authz/rest_api/v1/serializers.py b/openedx_authz/rest_api/v1/serializers.py index 92dee429..5c1d3e88 100644 --- a/openedx_authz/rest_api/v1/serializers.py +++ b/openedx_authz/rest_api/v1/serializers.py @@ -111,10 +111,14 @@ class ListUsersInRoleWithScopeSerializer(ScopeMixin): # pylint: disable=abstrac roles = CommaSeparatedListField(required=False, default=[]) sort_by = serializers.ChoiceField( - required=False, choices=[(e.value, e.name) for e in SortField], default=SortField.USERNAME + required=False, + choices=[(e.value, e.name) for e in SortField], + default=SortField.USERNAME, ) order = serializers.ChoiceField( - required=False, choices=[(e.value, e.name) for e in SortOrder], default=SortOrder.ASC + required=False, + choices=[(e.value, e.name) for e in SortOrder], + default=SortOrder.ASC, ) search = LowercaseCharField(required=False, default=None) diff --git a/openedx_authz/rest_api/v1/urls.py b/openedx_authz/rest_api/v1/urls.py index 83c350f9..b6e7c3fa 100644 --- a/openedx_authz/rest_api/v1/urls.py +++ b/openedx_authz/rest_api/v1/urls.py @@ -5,7 +5,11 @@ from openedx_authz.rest_api.v1 import views urlpatterns = [ - path("permissions/validate/me", views.PermissionValidationMeView.as_view(), name="permission-validation-me"), + path( + "permissions/validate/me", + views.PermissionValidationMeView.as_view(), + name="permission-validation-me", + ), path("roles/", views.RoleListView.as_view(), name="role-list"), path("roles/users/", views.RoleUserAPIView.as_view(), name="role-user-list"), ] diff --git a/openedx_authz/rest_api/v1/views.py b/openedx_authz/rest_api/v1/views.py index 3fb10759..b21180da 100644 --- a/openedx_authz/rest_api/v1/views.py +++ b/openedx_authz/rest_api/v1/views.py @@ -114,7 +114,10 @@ def post(self, request: HttpRequest) -> Response: response_data.append({"action": action, "scope": scope, "allowed": allowed}) except ValueError as e: logger.error(f"Error validating permission for user {username}: {e}") - return Response(data={"message": "Invalid scope format"}, status=status.HTTP_400_BAD_REQUEST) + return Response( + data={"message": "Invalid scope format"}, + status=status.HTTP_400_BAD_REQUEST, + ) except Exception as e: # pylint: disable=broad-exception-caught logger.error(f"Error validating permission for user {username}: {e}") return Response( @@ -308,7 +311,9 @@ def put(self, request: HttpRequest) -> Response: @apidocs.schema( parameters=[ apidocs.query_parameter( - "users", str, description="List of user identifiers (username or email) separated by a comma" + "users", + str, + description="List of user identifiers (username or email) separated by a comma", ), apidocs.query_parameter("role", str, description="The role to remove the users from"), apidocs.query_parameter("scope", str, description="The scope to remove the users from"), diff --git a/openedx_authz/settings/common.py b/openedx_authz/settings/common.py index 7c468cae..0de26897 100644 --- a/openedx_authz/settings/common.py +++ b/openedx_authz/settings/common.py @@ -21,8 +21,6 @@ def plugin_settings(settings): if casbin_adapter_app not in settings.INSTALLED_APPS: settings.INSTALLED_APPS.append(casbin_adapter_app) # Add Casbin configuration - settings.CASBIN_MODEL = os.path.join( - ROOT_DIRECTORY, "engine", "config", "model.conf" - ) + settings.CASBIN_MODEL = os.path.join(ROOT_DIRECTORY, "engine", "config", "model.conf") if not hasattr(settings, "CASBIN_AUTO_LOAD_POLICY_INTERVAL"): settings.CASBIN_AUTO_LOAD_POLICY_INTERVAL = 5 diff --git a/openedx_authz/tests/api/test_data.py b/openedx_authz/tests/api/test_data.py index e55b5119..8a9ca928 100644 --- a/openedx_authz/tests/api/test_data.py +++ b/openedx_authz/tests/api/test_data.py @@ -203,9 +203,7 @@ def test_content_library_data_with_external_key(self, external_key): """ library = ContentLibraryData(external_key=external_key) - expected_namespaced_key = ( - f"{library.NAMESPACE}{library.SEPARATOR}{external_key}" - ) + expected_namespaced_key = f"{library.NAMESPACE}{library.SEPARATOR}{external_key}" self.assertIsInstance(library, ContentLibraryData) self.assertEqual(library.external_key, external_key) @@ -233,9 +231,7 @@ def test_scope_data_registration(self): ("sc^generic_scope", ScopeData), ) @unpack - def test_dynamic_instantiation_via_namespaced_key( - self, namespaced_key, expected_class - ): + def test_dynamic_instantiation_via_namespaced_key(self, namespaced_key, expected_class): """Test that ScopeData dynamically instantiates the correct subclass. Expected Result: @@ -418,9 +414,7 @@ def test_scope_data_str_and_repr(self, external_key, expected_str, expected_repr ("course_staff", "Course Staff", "role^course_staff"), ) @unpack - def test_role_data_str_without_permissions( - self, external_key, expected_name, expected_repr - ): + def test_role_data_str_without_permissions(self, external_key, expected_name, expected_repr): """Test RoleData __str__ and __repr__ methods without permissions. Expected Result: @@ -456,12 +450,15 @@ def test_role_data_str_with_permissions(self): @data( ("read", "allow", "Read - allow", "act^read => allow"), ("write", "deny", "Write - deny", "act^write => deny"), - ("delete_library", "allow", "Delete Library - allow", "act^delete_library => allow"), + ( + "delete_library", + "allow", + "Delete Library - allow", + "act^delete_library => allow", + ), ) @unpack - def test_permission_data_str_and_repr( - self, action_key, effect, expected_str, expected_repr - ): + def test_permission_data_str_and_repr(self, action_key, effect, expected_str, expected_repr): """Test PermissionData __str__ and __repr__ methods. Expected Result: @@ -508,7 +505,5 @@ def test_role_assignment_data_repr(self): actual_repr = repr(assignment) - expected_repr = ( - "user^john_doe => [role^instructor, role^library_admin] @ lib^lib:DemoX:CSPROB" - ) + expected_repr = "user^john_doe => [role^instructor, role^library_admin] @ lib^lib:DemoX:CSPROB" self.assertEqual(actual_repr, expected_repr) diff --git a/openedx_authz/tests/api/test_roles.py b/openedx_authz/tests/api/test_roles.py index 80ec90f0..ffac5f9c 100644 --- a/openedx_authz/tests/api/test_roles.py +++ b/openedx_authz/tests/api/test_roles.py @@ -297,9 +297,7 @@ def test_get_permissions_for_roles(self, role_name, expected_permissions): - Permissions are correctly retrieved for the given roles and scope. - The permissions match the expected permissions. """ - assigned_permissions = get_permissions_for_single_role( - RoleData(external_key=role_name) - ) + assigned_permissions = get_permissions_for_single_role(RoleData(external_key=role_name)) self.assertEqual(assigned_permissions, expected_permissions) @@ -324,9 +322,7 @@ def test_get_permissions_for_roles(self, role_name, expected_permissions): ), ) @unpack - def test_get_permissions_for_active_role_in_specific_scope( - self, role_name, scope_name, expected_permissions - ): + def test_get_permissions_for_active_role_in_specific_scope(self, role_name, scope_name, expected_permissions): """Test retrieving permissions for a specific role after role assignments. Expected result: @@ -400,9 +396,7 @@ def test_get_roles_in_scope(self, scope_name, expected_roles): ("non_existent_user", "lib:Org999:non_existent_scope", set()), ) @unpack - def test_get_subject_role_assignments_in_scope( - self, subject_name, scope_name, expected_roles - ): + def test_get_subject_role_assignments_in_scope(self, subject_name, scope_name, expected_roles): """Test retrieving roles assigned to a subject in a specific scope. Expected result: @@ -461,19 +455,13 @@ def test_get_all_role_assignments_scopes(self, subject_name, expected_roles): - All roles assigned to the subject across all scopes are correctly retrieved. - Each role includes its associated permissions. """ - role_assignments = get_subject_role_assignments( - SubjectData(external_key=subject_name) - ) + role_assignments = get_subject_role_assignments(SubjectData(external_key=subject_name)) self.assertEqual(len(role_assignments), len(expected_roles)) for expected_role in expected_roles: # Compare the role part of the assignment - found = any( - expected_role in assignment.roles for assignment in role_assignments - ) - self.assertTrue( - found, f"Expected role {expected_role} not found in assignments" - ) + found = any(expected_role in assignment.roles for assignment in role_assignments) + self.assertTrue(found, f"Expected role {expected_role} not found in assignments") @ddt_data( ("library_admin", "lib:Org1:math_101", 1), @@ -548,9 +536,7 @@ class TestRoleAssignmentAPI(RolesTestSetupMixin): ("oliver", "library_admin", "lib:Org1:math_101", False), ) @unpack - def test_batch_assign_role_to_subjects_in_scope( - self, subject_names, role, scope_name, batch - ): + def test_batch_assign_role_to_subjects_in_scope(self, subject_names, role, scope_name, batch): """Test assigning a role to a single or multiple subjects in a specific scope. Expected result: @@ -569,7 +555,8 @@ def test_batch_assign_role_to_subjects_in_scope( ) for subject_name in subject_names: user_roles = get_subject_role_assignments_in_scope( - SubjectData(external_key=subject_name), ScopeData(external_key=scope_name) + SubjectData(external_key=subject_name), + ScopeData(external_key=scope_name), ) role_names = {r.external_key for assignment in user_roles for r in assignment.roles} self.assertIn(role, role_names) @@ -607,9 +594,7 @@ def test_batch_assign_role_to_subjects_in_scope( ("oliver", "library_admin", "lib:Org1:math_101", False), ) @unpack - def test_unassign_role_from_subject_in_scope( - self, subject_names, role, scope_name, batch - ): + def test_unassign_role_from_subject_in_scope(self, subject_names, role, scope_name, batch): """Test unassigning a role from a subject or multiple subjects in a specific scope. Expected result: @@ -649,10 +634,12 @@ def test_unassign_role_from_subject_in_scope( [ RoleAssignmentData( subject=SubjectData(external_key="alice"), - roles=[RoleData( - external_key="library_admin", - permissions=LIST_LIBRARY_ADMIN_PERMISSIONS, - )], + roles=[ + RoleData( + external_key="library_admin", + permissions=LIST_LIBRARY_ADMIN_PERMISSIONS, + ) + ], scope=ScopeData(external_key="lib:Org1:math_101"), ) ], @@ -662,10 +649,12 @@ def test_unassign_role_from_subject_in_scope( [ RoleAssignmentData( subject=SubjectData(external_key="bob"), - roles=[RoleData( - external_key="library_author", - permissions=LIST_LIBRARY_AUTHOR_PERMISSIONS, - )], + roles=[ + RoleData( + external_key="library_author", + permissions=LIST_LIBRARY_AUTHOR_PERMISSIONS, + ) + ], scope=ScopeData(external_key="lib:Org1:history_201"), ) ], @@ -675,10 +664,12 @@ def test_unassign_role_from_subject_in_scope( [ RoleAssignmentData( subject=SubjectData(external_key="carol"), - roles=[RoleData( - external_key="library_contributor", - permissions=LIST_LIBRARY_CONTRIBUTOR_PERMISSIONS, - )], + roles=[ + RoleData( + external_key="library_contributor", + permissions=LIST_LIBRARY_CONTRIBUTOR_PERMISSIONS, + ) + ], scope=ScopeData(external_key="lib:Org1:science_301"), ) ], @@ -688,10 +679,12 @@ def test_unassign_role_from_subject_in_scope( [ RoleAssignmentData( subject=SubjectData(external_key="dave"), - roles=[RoleData( - external_key="library_user", - permissions=LIST_LIBRARY_USER_PERMISSIONS, - )], + roles=[ + RoleData( + external_key="library_user", + permissions=LIST_LIBRARY_USER_PERMISSIONS, + ) + ], scope=ScopeData(external_key="lib:Org1:english_101"), ) ], @@ -706,9 +699,7 @@ def test_get_all_role_assignments_in_scope(self, scope_name, expected_assignment - All role assignments in the specified scope are correctly retrieved. - Each assignment includes the subject, role, and scope information with permissions. """ - role_assignments = get_all_subject_role_assignments_in_scope( - ScopeData(external_key=scope_name) - ) + role_assignments = get_all_subject_role_assignments_in_scope(ScopeData(external_key=scope_name)) self.assertEqual(len(role_assignments), len(expected_assignments)) for assignment in role_assignments: diff --git a/openedx_authz/tests/api/test_users.py b/openedx_authz/tests/api/test_users.py index 5590c1b2..7f11fc1c 100644 --- a/openedx_authz/tests/api/test_users.py +++ b/openedx_authz/tests/api/test_users.py @@ -64,13 +64,9 @@ def test_assign_role_to_user_in_scope(self, username, role, scope_name, batch): - The role is successfully assigned to the user in the specified scope. """ if batch: - batch_assign_role_to_users_in_scope( - users=username, role_external_key=role, scope_external_key=scope_name - ) + batch_assign_role_to_users_in_scope(users=username, role_external_key=role, scope_external_key=scope_name) for user in username: - user_roles = get_user_role_assignments_in_scope( - user_external_key=user, scope_external_key=scope_name - ) + user_roles = get_user_role_assignments_in_scope(user_external_key=user, scope_external_key=scope_name) role_names = {r.external_key for assignment in user_roles for r in assignment.roles} self.assertIn(role, role_names) else: @@ -79,9 +75,7 @@ def test_assign_role_to_user_in_scope(self, username, role, scope_name, batch): role_external_key=role, scope_external_key=scope_name, ) - user_roles = get_user_role_assignments_in_scope( - user_external_key=username, scope_external_key=scope_name - ) + user_roles = get_user_role_assignments_in_scope(user_external_key=username, scope_external_key=scope_name) role_names = {r.external_key for assignment in user_roles for r in assignment.roles} self.assertIn(role, role_names) @@ -100,13 +94,9 @@ def test_unassign_role_from_user(self, username, role, scope_name, batch): - The user no longer has the role in the specified scope. """ if batch: - batch_unassign_role_from_users( - users=username, role_external_key=role, scope_external_key=scope_name - ) + batch_unassign_role_from_users(users=username, role_external_key=role, scope_external_key=scope_name) for user in username: - user_roles = get_user_role_assignments_in_scope( - user_external_key=user, scope_external_key=scope_name - ) + user_roles = get_user_role_assignments_in_scope(user_external_key=user, scope_external_key=scope_name) role_names = {r.external_key for assignment in user_roles for r in assignment.roles} self.assertNotIn(role, role_names) else: @@ -115,9 +105,7 @@ def test_unassign_role_from_user(self, username, role, scope_name, batch): role_external_key=role, scope_external_key=scope_name, ) - user_roles = get_user_role_assignments_in_scope( - user_external_key=username, scope_external_key=scope_name - ) + user_roles = get_user_role_assignments_in_scope(user_external_key=username, scope_external_key=scope_name) role_names = {r.external_key for assignment in user_roles for r in assignment.roles} self.assertNotIn(role, role_names) @@ -136,9 +124,7 @@ def test_get_user_role_assignments(self, username, expected_roles): """ role_assignments = get_user_role_assignments(user_external_key=username) - assigned_role_names = { - r.external_key for assignment in role_assignments for r in assignment.roles - } + assigned_role_names = {r.external_key for assignment in role_assignments for r in assignment.roles} self.assertEqual(assigned_role_names, expected_roles) @data( @@ -148,18 +134,14 @@ def test_get_user_role_assignments(self, username, expected_roles): ("grace", "lib:Org1:math_advanced", {"library_contributor"}), ) @unpack - def test_get_user_role_assignments_in_scope( - self, username, scope_name, expected_roles - ): + def test_get_user_role_assignments_in_scope(self, username, scope_name, expected_roles): """Test retrieving role assignments for a user within a specific scope. Expected result: - The role assigned to the user in the specified scope is correctly retrieved. - The returned role assignments contain the assigned role. """ - user_roles = get_user_role_assignments_in_scope( - user_external_key=username, scope_external_key=scope_name - ) + user_roles = get_user_role_assignments_in_scope(user_external_key=username, scope_external_key=scope_name) role_names = {r.external_key for assignment in user_roles for r in assignment.roles} self.assertEqual(role_names, expected_roles) @@ -170,9 +152,7 @@ def test_get_user_role_assignments_in_scope( ("library_contributor", "lib:Org1:math_advanced", {"grace", "heidi"}), ) @unpack - def test_get_user_role_assignments_for_role_in_scope( - self, role_name, scope_name, expected_users - ): + def test_get_user_role_assignments_for_role_in_scope(self, role_name, scope_name, expected_users): """Test retrieving all users assigned to a specific role within a specific scope. Expected result: @@ -183,9 +163,7 @@ def test_get_user_role_assignments_for_role_in_scope( role_external_key=role_name, scope_external_key=scope_name ) - assigned_usernames = { - assignment.subject.username for assignment in user_assignments - } + assigned_usernames = {assignment.subject.username for assignment in user_assignments} self.assertEqual(assigned_usernames, expected_users) @@ -195,10 +173,12 @@ def test_get_user_role_assignments_for_role_in_scope( [ RoleAssignmentData( subject=UserData(external_key="alice"), - roles=[RoleData( - external_key="library_admin", - permissions=LIST_LIBRARY_ADMIN_PERMISSIONS, - )], + roles=[ + RoleData( + external_key="library_admin", + permissions=LIST_LIBRARY_ADMIN_PERMISSIONS, + ) + ], scope=ContentLibraryData(external_key="lib:Org1:math_101"), ), ], @@ -208,10 +188,12 @@ def test_get_user_role_assignments_for_role_in_scope( [ RoleAssignmentData( subject=UserData(external_key="bob"), - roles=[RoleData( - external_key="library_author", - permissions=LIST_LIBRARY_AUTHOR_PERMISSIONS, - )], + roles=[ + RoleData( + external_key="library_author", + permissions=LIST_LIBRARY_AUTHOR_PERMISSIONS, + ) + ], scope=ContentLibraryData(external_key="lib:Org1:history_201"), ), ], @@ -221,28 +203,26 @@ def test_get_user_role_assignments_for_role_in_scope( [ RoleAssignmentData( subject=UserData(external_key="eve"), - roles=[RoleData( - external_key="library_admin", - permissions=LIST_LIBRARY_ADMIN_PERMISSIONS, - )], + roles=[ + RoleData( + external_key="library_admin", + permissions=LIST_LIBRARY_ADMIN_PERMISSIONS, + ) + ], scope=ContentLibraryData(external_key="lib:Org2:physics_401"), ), ], ), ) @unpack - def test_get_all_user_role_assignments_in_scope( - self, scope_name, expected_assignments - ): + def test_get_all_user_role_assignments_in_scope(self, scope_name, expected_assignments): """Test retrieving all user role assignments within a specific scope. Expected result: - All user role assignments in the specified scope are correctly retrieved. - Each assignment includes the subject, role, and scope information. """ - role_assignments = get_all_user_role_assignments_in_scope( - scope_external_key=scope_name - ) + role_assignments = get_all_user_role_assignments_in_scope(scope_external_key=scope_name) self.assertEqual(len(role_assignments), len(expected_assignments)) for assignment in role_assignments: diff --git a/openedx_authz/tests/rest_api/test_views.py b/openedx_authz/tests/rest_api/test_views.py index 159e91bc..390a4ffc 100644 --- a/openedx_authz/tests/rest_api/test_views.py +++ b/openedx_authz/tests/rest_api/test_views.py @@ -209,10 +209,22 @@ def test_permission_validation_success(self, request_data: list[dict], permissio [{}, {}], [{}, {"action": "edit_library", "scope": "lib:Org1:LIB1"}], [{"action": "edit_library", "scope": "lib:Org1:LIB1"}, {}], - [{"action": "edit_library", "scope": "lib:Org1:LIB1"}, {"action": "", "scope": "lib:Org1:LIB1"}], - [{"action": "edit_library", "scope": "lib:Org1:LIB1"}, {"action": "edit_library", "scope": ""}], - [{"action": "edit_library", "scope": "lib:Org1:LIB1"}, {"scope": "lib:Org1:LIB1"}], - [{"action": "edit_library", "scope": "lib:Org1:LIB1"}, {"action": "edit_library"}], + [ + {"action": "edit_library", "scope": "lib:Org1:LIB1"}, + {"action": "", "scope": "lib:Org1:LIB1"}, + ], + [ + {"action": "edit_library", "scope": "lib:Org1:LIB1"}, + {"action": "edit_library", "scope": ""}, + ], + [ + {"action": "edit_library", "scope": "lib:Org1:LIB1"}, + {"scope": "lib:Org1:LIB1"}, + ], + [ + {"action": "edit_library", "scope": "lib:Org1:LIB1"}, + {"action": "edit_library"}, + ], ) def test_permission_validation_invalid_data(self, invalid_data: list[dict]): """Test permission validation with invalid request data. @@ -239,7 +251,11 @@ def test_permission_validation_unauthenticated(self): self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) @data( - (Exception(), status.HTTP_500_INTERNAL_SERVER_ERROR, "An error occurred while validating permissions"), + ( + Exception(), + status.HTTP_500_INTERNAL_SERVER_ERROR, + "An error occurred while validating permissions", + ), (ValueError(), status.HTTP_400_BAD_REQUEST, "Invalid scope format"), ) @unpack @@ -252,7 +268,9 @@ def test_permission_validation_exception_handling(self, exception: Exception, st """ with patch.object(api, "is_user_allowed", side_effect=exception): response = self.client.post( - self.url, data=[{"action": "edit_library", "scope": "lib:Org1:LIB1"}], format="json" + self.url, + data=[{"action": "edit_library", "scope": "lib:Org1:LIB1"}], + format="json", ) self.assertEqual(response.status_code, status_code) @@ -388,12 +406,25 @@ def test_get_users_by_scope_permissions(self, username: str, status_code: int): # Single user - success (regular user) (["regular_1@example.com"], 1, 0), # Multiple users - admin and regular users - (["admin_1@example.com", "regular_1@example.com", "regular_2@example.com"], 3, 0), + ( + ["admin_1@example.com", "regular_1@example.com", "regular_2@example.com"], + 3, + 0, + ), # With username and email -------------------- # All success (["admin_1", "regular_1@example.com", "regular_2@example.com"], 3, 0), # Mixed results (user not found) - (["admin_1", "regular_1@example.com", "nonexistent", "notexistent@example.com"], 2, 2), + ( + [ + "admin_1", + "regular_1@example.com", + "nonexistent", + "notexistent@example.com", + ], + 2, + 2, + ), ) @unpack def test_add_users_to_role_success(self, users: list[str], expected_completed: int, expected_errors: int): @@ -440,7 +471,11 @@ def test_add_users_to_role_already_has_role(self, users: list[str], expected_com @patch.object(api, "assign_role_to_user_in_scope") def test_add_users_to_role_exception_handling(self, mock_assign_role_to_user_in_scope): """Test adding users to a role with exception handling.""" - request_data = {"role": "library_admin", "scope": "lib:Org1:LIB1", "users": ["regular_1"]} + request_data = { + "role": "library_admin", + "scope": "lib:Org1:LIB1", + "users": ["regular_1"], + } mock_assign_role_to_user_in_scope.side_effect = Exception() with patch.object(api.ContentLibraryData, "exists", return_value=True): @@ -450,7 +485,10 @@ def test_add_users_to_role_exception_handling(self, mock_assign_role_to_user_in_ self.assertEqual(len(response.data["completed"]), 0) self.assertEqual(len(response.data["errors"]), 1) self.assertEqual(response.data["errors"][0]["user_identifier"], "regular_1") - self.assertEqual(response.data["errors"][0]["error"], RoleOperationError.ROLE_ASSIGNMENT_ERROR) + self.assertEqual( + response.data["errors"][0]["error"], + RoleOperationError.ROLE_ASSIGNMENT_ERROR, + ) @data( {}, @@ -492,7 +530,11 @@ def test_add_users_to_role_permissions(self, username: str, status_code: int): Expected result: - Returns appropriate status code based on permissions """ - request_data = {"role": "library_admin", "scope": "lib:Org3:LIB3", "users": ["regular_2"]} + request_data = { + "role": "library_admin", + "scope": "lib:Org3:LIB3", + "users": ["regular_2"], + } user = User.objects.filter(username=username).first() self.client.force_authenticate(user=user) @@ -515,12 +557,25 @@ def test_add_users_to_role_permissions(self, username: str, status_code: int): # Single user - success (regular user) (["regular_3@example.com"], 1, 0), # Multiple users - all success (admin and regular users) - (["admin_2@example.com", "regular_3@example.com", "regular_4@example.com"], 3, 0), + ( + ["admin_2@example.com", "regular_3@example.com", "regular_4@example.com"], + 3, + 0, + ), # With username and email ------------------- # All success (["admin_2", "regular_3@example.com", "regular_4@example.com"], 3, 0), # Mixed results (user not found) - (["admin_2", "regular_3@example.com", "nonexistent", "notexistent@example.com"], 2, 2), + ( + [ + "admin_2", + "regular_3@example.com", + "nonexistent", + "notexistent@example.com", + ], + 2, + 2, + ), ) @unpack def test_remove_users_from_role_success(self, users: list[str], expected_completed: int, expected_errors: int): @@ -530,7 +585,11 @@ def test_remove_users_from_role_success(self, users: list[str], expected_complet - Returns 207 MULTI-STATUS status - Returns appropriate completed and error counts """ - query_params = {"role": "library_user", "scope": "lib:Org2:LIB2", "users": ",".join(users)} + query_params = { + "role": "library_user", + "scope": "lib:Org2:LIB2", + "users": ",".join(users), + } with patch.object(api.ContentLibraryData, "exists", return_value=True): response = self.client.delete(f"{self.url}?{urlencode(query_params)}") @@ -542,7 +601,11 @@ def test_remove_users_from_role_success(self, users: list[str], expected_complet @patch.object(api, "unassign_role_from_user") def test_remove_users_from_role_exception_handling(self, mock_unassign_role_from_user): """Test removing users from a role with exception handling.""" - query_params = {"role": "library_admin", "scope": "lib:Org1:LIB1", "users": "regular_1,regular_2,regular_3"} + query_params = { + "role": "library_admin", + "scope": "lib:Org1:LIB1", + "users": "regular_1,regular_2,regular_3", + } mock_unassign_role_from_user.side_effect = [True, False, Exception()] with patch.object(api.ContentLibraryData, "exists", return_value=True): @@ -551,11 +614,20 @@ def test_remove_users_from_role_exception_handling(self, mock_unassign_role_from self.assertEqual(len(response.data["completed"]), 1) self.assertEqual(len(response.data["errors"]), 2) self.assertEqual(response.data["completed"][0]["user_identifier"], "regular_1") - self.assertEqual(response.data["completed"][0]["status"], RoleOperationStatus.ROLE_REMOVED) + self.assertEqual( + response.data["completed"][0]["status"], + RoleOperationStatus.ROLE_REMOVED, + ) self.assertEqual(response.data["errors"][0]["user_identifier"], "regular_2") - self.assertEqual(response.data["errors"][0]["error"], RoleOperationError.USER_DOES_NOT_HAVE_ROLE) + self.assertEqual( + response.data["errors"][0]["error"], + RoleOperationError.USER_DOES_NOT_HAVE_ROLE, + ) self.assertEqual(response.data["errors"][1]["user_identifier"], "regular_3") - self.assertEqual(response.data["errors"][1]["error"], RoleOperationError.ROLE_REMOVAL_ERROR) + self.assertEqual( + response.data["errors"][1]["error"], + RoleOperationError.ROLE_REMOVAL_ERROR, + ) @data( {}, @@ -596,7 +668,11 @@ def test_remove_users_from_role_permissions(self, username: str, status_code: in Expected result: - Returns appropriate status code based on permissions """ - query_params = {"role": "library_admin", "scope": "lib:Org3:LIB3", "users": "user1,user2"} + query_params = { + "role": "library_admin", + "scope": "lib:Org3:LIB3", + "users": "user1,user2", + } user = User.objects.filter(username=username).first() self.client.force_authenticate(user=user) diff --git a/openedx_authz/tests/test_commands.py b/openedx_authz/tests/test_commands.py index 16ae986e..6fa90679 100644 --- a/openedx_authz/tests/test_commands.py +++ b/openedx_authz/tests/test_commands.py @@ -44,7 +44,10 @@ def test_requires_policy_file_argument(self): with self.assertRaises(CommandError) as ctx: call_command("enforcement") - self.assertEqual("Error: the following arguments are required: --policy-file-path", str(ctx.exception)) + self.assertEqual( + "Error: the following arguments are required: --policy-file-path", + str(ctx.exception), + ) def test_policy_file_not_found_raises(self): """Test that command errors when the provided policy file does not exist.""" @@ -61,7 +64,10 @@ def test_model_file_not_found_raises(self, mock_get_file_path: Mock): with self.assertRaises(CommandError) as ctx: call_command("enforcement", policy_file_path=self.policy_file_path.name) - self.assertEqual(f"Model file not found: {mock_get_file_path.return_value}", str(ctx.exception)) + self.assertEqual( + f"Model file not found: {mock_get_file_path.return_value}", + str(ctx.exception), + ) @patch("openedx_authz.management.commands.enforcement.casbin.Enforcer") def test_error_creating_enforcer_raises(self, mock_enforcer_cls: Mock): @@ -71,7 +77,10 @@ def test_error_creating_enforcer_raises(self, mock_enforcer_cls: Mock): with self.assertRaises(CommandError) as ctx: call_command("enforcement", policy_file_path=self.policy_file_path.name) - self.assertEqual("Error creating Casbin enforcer: Enforcer creation error", str(ctx.exception)) + self.assertEqual( + "Error creating Casbin enforcer: Enforcer creation error", + str(ctx.exception), + ) @patch("openedx_authz.management.commands.enforcement.casbin.Enforcer") @patch.object(EnforcementCommand, "_run_interactive_mode") @@ -92,7 +101,11 @@ def test_successful_run_prints_summary(self, mock_run_interactive: Mock, mock_en mock_enforcer.get_named_grouping_policy.return_value = action_grouping mock_enforcer_cls.return_value = mock_enforcer - call_command("enforcement", policy_file_path=self.policy_file_path.name, stdout=self.buffer) + call_command( + "enforcement", + policy_file_path=self.policy_file_path.name, + stdout=self.buffer, + ) output = self.buffer.getvalue() self.assertIn("Casbin Interactive Enforcement", output) @@ -110,7 +123,10 @@ def test_run_interactive_mode_displays_help(self): example_text = f"Example: {make_user_key('alice')} {make_action_key('read')} {make_scope_key('org', 'OpenedX')}" self.assertIn("Interactive Mode", self.buffer.getvalue()) self.assertIn("Test custom enforcement requests interactively.", self.buffer.getvalue()) - self.assertIn("Enter 'quit', 'exit', or 'q' to exit the interactive mode.", self.buffer.getvalue()) + self.assertIn( + "Enter 'quit', 'exit', or 'q' to exit the interactive mode.", + self.buffer.getvalue(), + ) self.assertIn("Format: subject action scope", self.buffer.getvalue()) self.assertIn(example_text, self.buffer.getvalue()) @@ -211,10 +227,10 @@ def setUp(self): super().setUp() self.buffer = io.StringIO() - @patch('openedx_authz.engine.enforcer.AuthzEnforcer.get_enforcer') - @patch('casbin.Enforcer') - @patch('os.path.join') - @patch('click.confirm') + @patch("openedx_authz.engine.enforcer.AuthzEnforcer.get_enforcer") + @patch("casbin.Enforcer") + @patch("os.path.join") + @patch("click.confirm") def test_handle_with_default_paths(self, mock_confirm, mock_join, mock_casbin_enforcer, mock_get_enforcer): """Test handle method with default policy and model paths.""" # Setup mocks @@ -245,18 +261,14 @@ def test_handle_with_default_paths(self, mock_confirm, mock_join, mock_casbin_en model_path, policy_path, ) - mock_join.assert_any_call( - ROOT_DIRECTORY, "engine", "config", "authz.policy" - ) - mock_join.assert_any_call( - ROOT_DIRECTORY, "engine", "config", "model.conf" - ) + mock_join.assert_any_call(ROOT_DIRECTORY, "engine", "config", "authz.policy") + mock_join.assert_any_call(ROOT_DIRECTORY, "engine", "config", "model.conf") mock_confirm.assert_not_called() command.migrate_policies.assert_called_once_with(mock_source_enforcer, mock_target_enforcer) - @patch('openedx_authz.engine.enforcer.AuthzEnforcer.get_enforcer') - @patch('casbin.Enforcer') - @patch('click.confirm') + @patch("openedx_authz.engine.enforcer.AuthzEnforcer.get_enforcer") + @patch("casbin.Enforcer") + @patch("click.confirm") def test_handle_with_custom_paths(self, mock_confirm, mock_casbin_enforcer, mock_get_enforcer): """Test handle method with custom policy and model paths.""" # Setup mocks @@ -271,21 +283,25 @@ def test_handle_with_custom_paths(self, mock_confirm, mock_casbin_enforcer, mock command.migrate_policies = Mock() # Custom paths - policy_path = '/custom/path/to/policy.csv' - model_path = '/custom/path/to/model.conf' + policy_path = "/custom/path/to/policy.csv" + model_path = "/custom/path/to/model.conf" # Call handle method - command.handle(policy_file_path=policy_path, model_file_path=model_path, clear_existing=False) + command.handle( + policy_file_path=policy_path, + model_file_path=model_path, + clear_existing=False, + ) # Assertions mock_casbin_enforcer.assert_called_once_with(model_path, policy_path) mock_confirm.assert_not_called() command.migrate_policies.assert_called_once_with(mock_source_enforcer, mock_target_enforcer) - @patch('openedx_authz.engine.enforcer.AuthzEnforcer.get_enforcer') - @patch('casbin.Enforcer') - @patch('click.confirm') - @patch('click.style') + @patch("openedx_authz.engine.enforcer.AuthzEnforcer.get_enforcer") + @patch("casbin.Enforcer") + @patch("click.confirm") + @patch("click.style") def test_handle_clear_existing_roles_confirmed( self, mock_style, mock_confirm, mock_casbin_enforcer, mock_get_enforcer ): @@ -317,10 +333,10 @@ def test_handle_clear_existing_roles_confirmed( command._delete_permissions_inheritance.assert_not_called() command.migrate_policies.assert_called_once_with(mock_source_enforcer, mock_target_enforcer) - @patch('openedx_authz.engine.enforcer.AuthzEnforcer.get_enforcer') - @patch('casbin.Enforcer') - @patch('click.confirm') - @patch('click.style') + @patch("openedx_authz.engine.enforcer.AuthzEnforcer.get_enforcer") + @patch("casbin.Enforcer") + @patch("click.confirm") + @patch("click.style") def test_handle_clear_existing_permissions_confirmed( self, mock_style, mock_confirm, mock_casbin_enforcer, mock_get_enforcer ): @@ -352,9 +368,9 @@ def test_handle_clear_existing_permissions_confirmed( command._delete_permissions_inheritance.assert_called_once_with(mock_target_enforcer) command.migrate_policies.assert_called_once_with(mock_source_enforcer, mock_target_enforcer) - @patch('openedx_authz.engine.enforcer.AuthzEnforcer.get_enforcer') - @patch('casbin.Enforcer') - @patch('click.confirm') + @patch("openedx_authz.engine.enforcer.AuthzEnforcer.get_enforcer") + @patch("casbin.Enforcer") + @patch("click.confirm") def test_handle_clear_existing_both_denied(self, mock_confirm, mock_casbin_enforcer, mock_get_enforcer): """Test handle method with clear_existing but denied deletions.""" expected_mock_confirm_calls = 2 diff --git a/openedx_authz/tests/test_enforcement.py b/openedx_authz/tests/test_enforcement.py index 63fb9c5b..4938be1d 100644 --- a/openedx_authz/tests/test_enforcement.py +++ b/openedx_authz/tests/test_enforcement.py @@ -167,8 +167,19 @@ class ActionGroupingTests(CasbinEnforcementTestCase): """ POLICY = [ - ["p", make_role_key("role-1"), make_action_key("manage"), make_scope_key("org", "*"), "allow"], - ["g", make_user_key("user-1"), make_role_key("role-1"), make_scope_key("org", "any-org")], + [ + "p", + make_role_key("role-1"), + make_action_key("manage"), + make_scope_key("org", "*"), + "allow", + ], + [ + "g", + make_user_key("user-1"), + make_role_key("role-1"), + make_scope_key("org", "any-org"), + ], ] + COMMON_ACTION_GROUPING CASES = [ @@ -216,29 +227,112 @@ class RoleAssignmentTests(CasbinEnforcementTestCase): POLICY = [ # Policies ["p", make_role_key("platform_admin"), make_action_key("manage"), "*", "allow"], - ["p", make_role_key("org_admin"), make_action_key("manage"), make_scope_key("org", "*"), "allow"], - ["p", make_role_key("org_editor"), make_action_key("edit"), make_scope_key("org", "*"), "allow"], - ["p", make_role_key("org_author"), make_action_key("write"), make_scope_key("org", "*"), "allow"], - ["p", make_role_key("course_admin"), make_action_key("manage"), make_scope_key("course", "*"), "allow"], - ["p", make_role_key("library_admin"), make_action_key("manage"), make_scope_key("lib", "*"), "allow"], - ["p", make_role_key("library_editor"), make_action_key("edit"), make_scope_key("lib", "*"), "allow"], - ["p", make_role_key("library_reviewer"), make_action_key("read"), make_scope_key("lib", "*"), "allow"], - ["p", make_role_key("library_author"), make_action_key("write"), make_scope_key("lib", "*"), "allow"], + [ + "p", + make_role_key("org_admin"), + make_action_key("manage"), + make_scope_key("org", "*"), + "allow", + ], + [ + "p", + make_role_key("org_editor"), + make_action_key("edit"), + make_scope_key("org", "*"), + "allow", + ], + [ + "p", + make_role_key("org_author"), + make_action_key("write"), + make_scope_key("org", "*"), + "allow", + ], + [ + "p", + make_role_key("course_admin"), + make_action_key("manage"), + make_scope_key("course", "*"), + "allow", + ], + [ + "p", + make_role_key("library_admin"), + make_action_key("manage"), + make_scope_key("lib", "*"), + "allow", + ], + [ + "p", + make_role_key("library_editor"), + make_action_key("edit"), + make_scope_key("lib", "*"), + "allow", + ], + [ + "p", + make_role_key("library_reviewer"), + make_action_key("read"), + make_scope_key("lib", "*"), + "allow", + ], + [ + "p", + make_role_key("library_author"), + make_action_key("write"), + make_scope_key("lib", "*"), + "allow", + ], # Role assignments ["g", make_user_key("user-1"), make_role_key("platform_admin"), "*"], - ["g", make_user_key("user-2"), make_role_key("org_admin"), make_scope_key("org", "any-org")], - ["g", make_user_key("user-3"), make_role_key("org_editor"), make_scope_key("org", "any-org")], - ["g", make_user_key("user-4"), make_role_key("org_author"), make_scope_key("org", "any-org")], + [ + "g", + make_user_key("user-2"), + make_role_key("org_admin"), + make_scope_key("org", "any-org"), + ], + [ + "g", + make_user_key("user-3"), + make_role_key("org_editor"), + make_scope_key("org", "any-org"), + ], + [ + "g", + make_user_key("user-4"), + make_role_key("org_author"), + make_scope_key("org", "any-org"), + ], [ "g", make_user_key("user-5"), make_role_key("course_admin"), make_scope_key("course", "course-v1:any-org+any-course+any-course-run"), ], - ["g", make_user_key("user-6"), make_role_key("library_admin"), make_library_key("lib:DemoX:CSPROB")], - ["g", make_user_key("user-7"), make_role_key("library_editor"), make_library_key("lib:DemoX:CSPROB")], - ["g", make_user_key("user-8"), make_role_key("library_reviewer"), make_library_key("lib:DemoX:CSPROB")], - ["g", make_user_key("user-9"), make_role_key("library_author"), make_library_key("lib:DemoX:CSPROB")], + [ + "g", + make_user_key("user-6"), + make_role_key("library_admin"), + make_library_key("lib:DemoX:CSPROB"), + ], + [ + "g", + make_user_key("user-7"), + make_role_key("library_editor"), + make_library_key("lib:DemoX:CSPROB"), + ], + [ + "g", + make_user_key("user-8"), + make_role_key("library_reviewer"), + make_library_key("lib:DemoX:CSPROB"), + ], + [ + "g", + make_user_key("user-9"), + make_role_key("library_author"), + make_library_key("lib:DemoX:CSPROB"), + ], ] + COMMON_ACTION_GROUPING CASES = [ @@ -383,9 +477,27 @@ class WildcardScopeTests(CasbinEnforcementTestCase): POLICY = [ # Policies ["p", make_role_key("platform_admin"), make_action_key("manage"), "*", "allow"], - ["p", make_role_key("org_admin"), make_action_key("manage"), make_scope_key("org", "*"), "allow"], - ["p", make_role_key("course_admin"), make_action_key("manage"), make_scope_key("course", "*"), "allow"], - ["p", make_role_key("library_admin"), make_action_key("manage"), make_scope_key("lib", "*"), "allow"], + [ + "p", + make_role_key("org_admin"), + make_action_key("manage"), + make_scope_key("org", "*"), + "allow", + ], + [ + "p", + make_role_key("course_admin"), + make_action_key("manage"), + make_scope_key("course", "*"), + "allow", + ], + [ + "p", + make_role_key("library_admin"), + make_action_key("manage"), + make_scope_key("lib", "*"), + "allow", + ], # Role assignments ["g", make_user_key("user-1"), make_role_key("platform_admin"), "*"], ["g", make_user_key("user-2"), make_role_key("org_admin"), "*"], diff --git a/openedx_authz/tests/test_engine_utils.py b/openedx_authz/tests/test_engine_utils.py index 89d3b225..74f33321 100644 --- a/openedx_authz/tests/test_engine_utils.py +++ b/openedx_authz/tests/test_engine_utils.py @@ -150,7 +150,10 @@ def test_migrate_action_inheritance_from_file(self): # Verify a sample of expected g2 rules from the file self.assertIn( - [make_action_key("delete_library"), make_action_key("edit_library_content")], + [ + make_action_key("delete_library"), + make_action_key("edit_library_content"), + ], target_g2, ) self.assertIn( @@ -192,11 +195,7 @@ def test_migrate_idempotent(self): "Running migration twice should not duplicate g2 rules", ) - duplicates = ( - CasbinRule.objects.values("v0", "v1", "v2") - .annotate(total=Count("*")) - .filter(total__gt=1) - ) + duplicates = CasbinRule.objects.values("v0", "v1", "v2").annotate(total=Count("*")).filter(total__gt=1) duplicate_list = list(duplicates) self.assertEqual( len(duplicate_list), @@ -254,11 +253,7 @@ def test_migrate_partial_duplicates(self): "Should have 31 policies total, with no duplicates", ) - duplicates = ( - CasbinRule.objects.values("v0", "v1", "v2") - .annotate(total=Count("*")) - .filter(total__gt=1) - ) + duplicates = CasbinRule.objects.values("v0", "v1", "v2").annotate(total=Count("*")).filter(total__gt=1) duplicate_list = list(duplicates) self.assertEqual( len(duplicate_list), @@ -341,12 +336,8 @@ def test_migrate_preserves_existing_db_policies(self): migrate_policy_between_enforcers(self.source_enforcer, self.target_enforcer) target_policies = self.target_enforcer.get_policy() - self.assertEqual( - len(target_policies), 32, "Should have 31 file policies + 1 custom policy" - ) - self.assertIn( - custom_policy, target_policies, "Custom database policy should be preserved" - ) + self.assertEqual(len(target_policies), 32, "Should have 31 file policies + 1 custom policy") + self.assertIn(custom_policy, target_policies, "Custom database policy should be preserved") def test_migrate_preserves_user_role_assignments_in_db(self): """Test that migration preserves user role assignments (g rules) in the database. @@ -370,9 +361,7 @@ def test_migrate_preserves_user_role_assignments_in_db(self): migrate_policy_between_enforcers(self.source_enforcer, self.target_enforcer) target_grouping = self.target_enforcer.get_grouping_policy() - self.assertEqual( - len(target_grouping), 2, "User role assignments should be preserved" - ) + self.assertEqual(len(target_grouping), 2, "User role assignments should be preserved") self.assertIn( [ make_user_key("user-1"), @@ -383,6 +372,4 @@ def test_migrate_preserves_user_role_assignments_in_db(self): ) target_policies = self.target_enforcer.get_policy() - self.assertEqual( - len(target_policies), 31, "All 31 policies from file should be loaded" - ) + self.assertEqual(len(target_policies), 31, "All 31 policies from file should be loaded") diff --git a/openedx_authz/tests/test_filter.py b/openedx_authz/tests/test_filter.py index 666cb163..475aa533 100644 --- a/openedx_authz/tests/test_filter.py +++ b/openedx_authz/tests/test_filter.py @@ -40,7 +40,7 @@ def test_initialization_with_multiple_attributes(self): ptype=["p"], v0=[make_user_key("alice")], v1=[make_action_key("read")], - v2=[make_scope_key("org", "MIT")] + v2=[make_scope_key("org", "MIT")], ) self.assertEqual(f.ptype, ["p"]) self.assertEqual(f.v0, [make_user_key("alice")]) @@ -132,7 +132,7 @@ def test_filter_role_assignments(self): ptype=["g"], v0=[make_user_key("alice")], v1=[make_role_key("admin")], - v2=[make_scope_key("org", "MIT")] + v2=[make_scope_key("org", "MIT")], ) self.assertEqual(f.ptype, ["g"]) self.assertEqual(f.v0, [make_user_key("alice")]) diff --git a/requirements/dev.txt b/requirements/dev.txt index 2fa26b57..33040c84 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -223,8 +223,6 @@ pycasbin==2.2.0 # via # -r requirements/quality.txt # casbin-django-orm-adapter -pycodestyle==2.14.0 - # via -r requirements/quality.txt pycparser==2.23 # via # -r requirements/quality.txt @@ -305,6 +303,8 @@ requests==2.32.5 # via # -r requirements/quality.txt # edx-drf-extensions +ruff==0.14.1 + # via -r requirements/quality.txt semantic-version==2.10.0 # via # -r requirements/quality.txt diff --git a/requirements/quality.in b/requirements/quality.in index 93661d98..b370720d 100644 --- a/requirements/quality.in +++ b/requirements/quality.in @@ -5,6 +5,5 @@ -r test.txt # Core and testing dependencies for this package edx-lint # edX pylint rules and plugins -isort # to standardize order of imports -pycodestyle # PEP 8 compliance validation pydocstyle # PEP 257 compliance validation +ruff # Fast Python linter and formatter diff --git a/requirements/quality.txt b/requirements/quality.txt index cd19e5ae..9f1325e3 100644 --- a/requirements/quality.txt +++ b/requirements/quality.txt @@ -122,9 +122,7 @@ iniconfig==2.1.0 # -r requirements/test.txt # pytest isort==6.0.1 - # via - # -r requirements/quality.in - # pylint + # via pylint jinja2==3.1.6 # via # -r requirements/test.txt @@ -157,8 +155,6 @@ pycasbin==2.2.0 # via # -r requirements/test.txt # casbin-django-orm-adapter -pycodestyle==2.14.0 - # via -r requirements/quality.in pycparser==2.23 # via # -r requirements/test.txt @@ -222,6 +218,8 @@ requests==2.32.5 # via # -r requirements/test.txt # edx-drf-extensions +ruff==0.14.1 + # via -r requirements/quality.in semantic-version==2.10.0 # via # -r requirements/test.txt diff --git a/ruff.toml b/ruff.toml new file mode 100644 index 00000000..3aae9db8 --- /dev/null +++ b/ruff.toml @@ -0,0 +1,17 @@ +# Ruff configuration for openedx-authz +# See https://docs.astral.sh/ruff/configuration/ + +line-length = 120 + +[lint] +# E: pycodestyle errors +# I: isort import sorting +select = ["E", "I"] + +exclude = [ + ".git", + ".tox", + "migrations", + "__pycache__", + "*.pyc", +] diff --git a/setup.py b/setup.py index 91aa3307..294146fa 100755 --- a/setup.py +++ b/setup.py @@ -103,7 +103,7 @@ def add_version_constraint_or_raise(current_line, current_requirements, add_if_n add_version_constraint_or_raise(line, requirements, False) # process back into list of pkg><=constraints strings - constrained_requirements = [f'{pkg}{version or ""}' for (pkg, version) in sorted(requirements.items())] + constrained_requirements = [f"{pkg}{version or ''}" for (pkg, version) in sorted(requirements.items())] return constrained_requirements diff --git a/tests/test_models.py b/tests/test_models.py index 9f808bb1..38c29ce8 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -6,9 +6,7 @@ import pytest -@pytest.mark.skip( - reason="Placeholder to allow pytest to succeed before real tests are in place." -) +@pytest.mark.skip(reason="Placeholder to allow pytest to succeed before real tests are in place.") def test_placeholder(): """ TODO: Delete this test once there are real tests. diff --git a/tox.ini b/tox.ini index 0c2ec5cb..6326bb9d 100644 --- a/tox.ini +++ b/tox.ini @@ -5,9 +5,10 @@ envlist = py{311,312}-django{42,52} ; D001 = Line too long ignore = D001 -[pycodestyle] -exclude = .git,.tox,migrations -max-line-length = 120 +[ruff] +# Configuration moved to ruff.toml +# select = E (pycodestyle errors), I (isort) +# max-line-length = 120 [pydocstyle] ; D101 = Missing docstring in public class @@ -75,9 +76,8 @@ commands = touch tests/__init__.py pylint openedx_authz tests test_utils manage.py setup.py rm tests/__init__.py - pycodestyle openedx_authz tests manage.py setup.py + ruff check openedx_authz tests test_utils manage.py setup.py pydocstyle openedx_authz tests manage.py setup.py - isort --check-only --diff tests test_utils openedx_authz manage.py setup.py make selfcheck [testenv:pii_check]