diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 82e50821..e202c8a0 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -14,6 +14,14 @@ Change Log Unreleased ********** +1.15.0 - 2026-04-30 +******************* + +Added +===== + +* Add support for course permission in Authz REST APIs (#274) + 1.14.0 - 2026-04-22 ******************* diff --git a/openedx_authz/__init__.py b/openedx_authz/__init__.py index 28e97ad9..c55ab814 100644 --- a/openedx_authz/__init__.py +++ b/openedx_authz/__init__.py @@ -4,6 +4,6 @@ import os -__version__ = "1.14.0" +__version__ = "1.15.0" ROOT_DIRECTORY = os.path.dirname(os.path.abspath(__file__)) diff --git a/openedx_authz/rest_api/v1/permissions.py b/openedx_authz/rest_api/v1/permissions.py index 8d5c18f7..fa55e163 100644 --- a/openedx_authz/rest_api/v1/permissions.py +++ b/openedx_authz/rest_api/v1/permissions.py @@ -59,13 +59,23 @@ class BaseScopePermission(BasePermission, metaclass=PermissionMeta): def get_scope_value(self, request) -> str | None: """Extract the scope value from the request. + When a ``scopes`` list is provided, returns only the first element. + This is intentional: bulk requests are expected to be homogeneous + (all scopes must share the same namespace). Actual per-scope permission + validation for bulk requests is handled in ``DynamicScopePermission``. + Args: request: The Django REST framework request object. Returns: str | None: The scope value if found (e.g., 'lib:DemoX:CSPROB'), or None if not present. """ - return request.data.get("scope") or request.query_params.get("scope") + scope = request.data.get("scope") or request.query_params.get("scope") + if not scope: + scopes = request.data.get("scopes") + if scopes and isinstance(scopes, list): + scope = scopes[0] + return scope def get_scope_namespace(self, request) -> str: """Derive the namespace from the request scope value. @@ -87,6 +97,12 @@ def get_scope_namespace(self, request) -> str: >>> permission.get_scope_namespace(request) 'global' """ + scopes_list = request.data.get("scopes") + if scopes_list and isinstance(scopes_list, list): + if not self._scopes_have_homogeneous_namespaces(scopes_list): + raise ValueError( + f"Mixed scope namespaces in bulk request are not allowed: {scopes_list}" + ) scope_value = self.get_scope_value(request) if not scope_value: return self.NAMESPACE @@ -95,6 +111,22 @@ def get_scope_namespace(self, request) -> str: except ValueError: return self.NAMESPACE + def _scopes_have_homogeneous_namespaces(self, scopes_list: list[str]) -> bool: + """Check that all scopes in the list share the same namespace. + + Args: + scopes_list: List of scope values to check. + Returns: + bool: True if all scopes share the same namespace, False otherwise. + """ + namespaces = set() + for scope in scopes_list: + try: + namespaces.add(api.ScopeData(external_key=scope).NAMESPACE) + except ValueError: + pass + return len(namespaces) <= 1 + def has_permission(self, request, view) -> bool: """Fallback permission check (deny by default). @@ -141,6 +173,9 @@ class DynamicScopePermission(BaseScopePermission): Note: Superusers and staff members always have permission regardless of scope. + Bulk requests (``scopes`` list) must be homogeneous — all scopes must share + the same namespace (e.g., all ``course-v1:`` or all ``lib:``). Mixed namespaces + will raise a ``ValueError`` during namespace resolution. """ NAMESPACE: ClassVar[None] = None @@ -168,6 +203,36 @@ def _get_permission_instance(self, request) -> BaseScopePermission: perm_class = PermissionMeta.get_permission_class(scope_namespace) return perm_class() + def _has_bulk_permission(self, request, view, scopes_list: list[str]) -> bool: + """Check permissions for a bulk request carrying multiple scopes. + + Bulk operations are only supported for endpoints decorated with + ``@authz_permissions``. A handler that does not use the decorator (i.e. does + not mix in ``MethodPermissionMixin``) has no declared permissions to evaluate + per-scope, so bulk access is denied outright. + + Every scope in ``scopes_list`` must pass at least one of the required + permissions declared by the decorator (OR logic per permission, AND logic + across scopes). + + Args: + request: The Django REST framework request object. + view: The view being accessed. + scopes_list: The list of scope values from ``request.data["scopes"]``. + + Returns: + bool: True only if every scope passes at least one required permission. + """ + perm_instance = self._get_permission_instance(request) # namespace resolved from scopes[0] + # Bulk without @authz_permissions decorator is not supported: there are no + # per-method permissions to iterate over, so we cannot safely grant access. + if not isinstance(perm_instance, MethodPermissionMixin): + return False + required = perm_instance.get_required_permissions(request, view) + if not required: + return False + return all(perm_instance.validate_permissions(request, required, sv) for sv in scopes_list) + def has_permission(self, request, view) -> bool: """Delegate permission check to the appropriate scope-specific permission class. @@ -175,6 +240,10 @@ def has_permission(self, request, view) -> bool: users, the permission check is delegated to the permission class registered for the request's scope namespace. + For bulk requests that carry a ``scopes`` list, delegates to + ``_has_bulk_permission``: every scope must pass at least one of the required + permissions (OR logic per permission, AND logic across scopes). + Examples: >>> # Regular user gets scope-specific check >>> request.data = {"scope": "lib:DemoX:CSPROB"} @@ -182,6 +251,9 @@ def has_permission(self, request, view) -> bool: """ if request.user.is_superuser or request.user.is_staff: return True + scopes_list = request.data.get("scopes") + if scopes_list and isinstance(scopes_list, list): + return self._has_bulk_permission(request, view, scopes_list) return self._get_permission_instance(request).has_permission(request, view) def has_object_permission(self, request, view, obj) -> bool: @@ -240,23 +312,19 @@ def get_required_permissions(self, request, view) -> list[str]: return [] def validate_permissions(self, request, permissions: list[str], scope_value: str) -> bool: - """Validate that the user has all required permissions for the scope. + """Validate that the user has at least one of the required permissions for the scope. Args: request: The Django REST framework request object. - permissions: List of permission identifiers to check. + permissions: List of permission identifiers to check (OR logic — any one suffices). scope_value: The scope to check permissions against. Returns: - bool: True if user has all required permissions, False otherwise. + bool: True if user has at least one required permission, False otherwise. """ if not permissions: return False - - for permission in permissions: - if not api.is_user_allowed(request.user.username, permission, scope_value): - return False - return True + return any(api.is_user_allowed(request.user.username, permission, scope_value) for permission in permissions) class AnyScopePermission(MethodPermissionMixin, BasePermission): @@ -282,6 +350,37 @@ def has_permission(self, request, view) -> bool: return any(api.get_scopes_for_user_and_permission(request.user.username, permission) for permission in required) +class CoursePermission(MethodPermissionMixin, BaseScopePermission): + """Permission handler for course scopes. + + This class implements permission checks specific to course operations. + It uses the authz API to verify whether a user has the necessary permissions + to perform actions on course team members or course resources. + """ + + NAMESPACE: ClassVar[str] = "course-v1" + """``course-v1`` for course scopes.""" + + def has_permission(self, request, view) -> bool: + """Check if the user has permission to perform the requested action. + + First checks if the view method has @authz_permissions decorator. + If present, validates all required permissions. If not present, + allows access by default. + + Returns: + bool: True if the user has the required permission, False otherwise. + Also returns False if no scope value is provided in the request. + """ + scope_value = self.get_scope_value(request) + if not scope_value: + return False + permissions = self.get_required_permissions(request, view) + if permissions: + return self.validate_permissions(request, permissions, scope_value) + return True + + class ContentLibraryPermission(MethodPermissionMixin, BaseScopePermission): """Permission handler for content library scopes. diff --git a/openedx_authz/rest_api/v1/views.py b/openedx_authz/rest_api/v1/views.py index 7e1d3946..201bd75b 100644 --- a/openedx_authz/rest_api/v1/views.py +++ b/openedx_authz/rest_api/v1/views.py @@ -292,7 +292,7 @@ class RoleUserAPIView(APIView): status.HTTP_401_UNAUTHORIZED: "The user is not authenticated or does not have the required permissions", }, ) - @authz_permissions([permissions.VIEW_LIBRARY.identifier]) + @authz_permissions([permissions.VIEW_LIBRARY.identifier, permissions.COURSES_VIEW_COURSE_TEAM.identifier]) def get(self, request: HttpRequest) -> Response: """Retrieve all users with role assignments within a specific scope.""" serializer = ListUsersInRoleWithScopeSerializer(data=request.query_params) @@ -319,7 +319,7 @@ def get(self, request: HttpRequest) -> Response: status.HTTP_401_UNAUTHORIZED: "The user is not authenticated or does not have the required permissions", }, ) - @authz_permissions([permissions.MANAGE_LIBRARY_TEAM.identifier]) + @authz_permissions([permissions.MANAGE_LIBRARY_TEAM.identifier, permissions.COURSES_MANAGE_COURSE_TEAM.identifier]) def put(self, request: HttpRequest) -> Response: """Assign multiple users to a specific role within one or more scopes.""" serializer = AddUsersToRoleWithScopeSerializer(data=request.data) @@ -366,7 +366,7 @@ def put(self, request: HttpRequest) -> Response: status.HTTP_401_UNAUTHORIZED: "The user is not authenticated or does not have the required permissions", }, ) - @authz_permissions([permissions.MANAGE_LIBRARY_TEAM.identifier]) + @authz_permissions([permissions.MANAGE_LIBRARY_TEAM.identifier, permissions.COURSES_MANAGE_COURSE_TEAM.identifier]) def delete(self, request: HttpRequest) -> Response: """Remove multiple users from a specific role within a scope.""" serializer = RemoveUsersFromRoleWithScopeSerializer(data=request.query_params) @@ -468,7 +468,7 @@ class RoleListView(APIView): status.HTTP_401_UNAUTHORIZED: "The user is not authenticated or does not have the required permissions", }, ) - @authz_permissions([permissions.VIEW_LIBRARY.identifier]) + @authz_permissions([permissions.VIEW_LIBRARY.identifier, permissions.COURSES_VIEW_COURSE_TEAM.identifier]) def get(self, request: HttpRequest) -> Response: """Retrieve all roles and their permissions for a specific scope.""" serializer = ListRolesWithScopeSerializer(data=request.query_params) diff --git a/openedx_authz/tests/rest_api/test_permissions.py b/openedx_authz/tests/rest_api/test_permissions.py new file mode 100644 index 00000000..79f391ce --- /dev/null +++ b/openedx_authz/tests/rest_api/test_permissions.py @@ -0,0 +1,250 @@ +"""Unit tests for openedx_authz.rest_api.v1.permissions.""" + +from unittest.mock import MagicMock, patch + +import ddt +from django.test import TestCase + +from openedx_authz.rest_api.v1.permissions import ( + BaseScopePermission, + ContentLibraryPermission, + CoursePermission, + DynamicScopePermission, +) + + +def _make_user(superuser=False): + """Return a mock user. Regular user by default; pass superuser=True for a superuser.""" + user = MagicMock() + user.is_superuser = superuser + user.is_staff = False + user.username = "testuser" + return user + + +def _make_request(data=None, query_params=None, user=None, method="GET"): + """Return a mock DRF request with the given body data, query params, user, and HTTP method.""" + request = MagicMock() + request.data = data or {} + request.query_params = query_params or {} + request.method = method + request.user = user or _make_user() + return request + + +def _make_view(method="get", required_permissions=None): + """Return a mock view whose handler carries required_permissions when provided, + simulating the @authz_permissions decorator. Omit required_permissions to simulate + a plain handler with no decorator.""" + view = MagicMock() + handler = MagicMock() + if required_permissions is not None: + handler.required_permissions = required_permissions + else: + del handler.required_permissions + setattr(view, method, handler) + return view + + +class TestGetScopeValueScopesFallback(TestCase): + """Test scopes-list fallback in BaseScopePermission.get_scope_value.""" + + def setUp(self): + self.perm = BaseScopePermission() + + def test_scopes_list_fallback_returns_first_element(self): + """When no 'scope' key is present, the first item of the 'scopes' list is used as the scope value.""" + request = _make_request(data={"scopes": ["lib:Org:A", "lib:Org:B"]}) + self.assertEqual(self.perm.get_scope_value(request), "lib:Org:A") + + def test_scope_is_string_returns_value(self): + """When 'scope' is a plain string instead of a list, it is used as scope value.""" + request = _make_request(data={"scope": "lib:Org:A"}) + self.assertEqual(self.perm.get_scope_value(request), "lib:Org:A") + + +class TestGetScopeNamespaceMixedScopes(TestCase): + """Test that get_scope_namespace enforces namespace homogeneity for bulk scopes.""" + + def setUp(self): + self.perm = BaseScopePermission() + + def test_mixed_namespaces_raises_value_error(self): + """Passing scopes from different namespaces in a single bulk request raises ValueError.""" + request = _make_request(data={"scopes": ["lib:Org:A", "course-v1:Org1+C1+2024"]}) + with self.assertRaises(ValueError): + self.perm.get_scope_namespace(request) + + def test_homogeneous_namespaces_does_not_raise(self): + """Passing scopes that all share the same namespace does not raise.""" + request = _make_request(data={"scopes": ["lib:Org:A", "lib:Org:B"]}) + # Should not raise — just verify it completes without error + namespace = self.perm.get_scope_namespace(request) + self.assertEqual(namespace, "lib") + + +class TestDynamicScopePermissionBulkScopes(TestCase): + """Test bulk-scopes path in DynamicScopePermission.has_permission.""" + + def setUp(self): + self.perm = DynamicScopePermission() + + def test_non_mixin_namespace_returns_false(self): + """A 'global' scope resolves to BaseScopePermission which does not implement MethodPermissionMixin. + The bulk path requires MethodPermissionMixin, so the check is rejected immediately.""" + request = _make_request(data={"scopes": ["global:x"]}) + self.assertFalse(self.perm.has_permission(request, _make_view(required_permissions=["p"]))) + + def test_no_required_permissions_returns_false(self): + """When the view method has no @authz_permissions decorator, there are no required permissions + to evaluate, so the bulk check is rejected.""" + request = _make_request(data={"scopes": ["lib:Org:A", "lib:Org:B"]}) + self.assertFalse(self.perm.has_permission(request, _make_view(required_permissions=None))) + + @patch("openedx_authz.api.is_user_allowed", return_value=True) + def test_all_scopes_pass_returns_true(self, _): + """When the user has the required permission on every scope in the list, access is granted + (AND logic across scopes — all must pass).""" + request = _make_request(data={"scopes": ["lib:Org:A", "lib:Org:B"]}, method="GET") + self.assertTrue(self.perm.has_permission(request, _make_view(method="get", required_permissions=["p"]))) + + @patch("openedx_authz.api.is_user_allowed", side_effect=[True, False]) + def test_one_scope_fails_returns_false(self, _): + """When the user lacks the required permission on at least one scope, access is denied + (AND logic across scopes — a single failure is enough to reject).""" + request = _make_request(data={"scopes": ["lib:Org:A", "lib:Org:B"]}, method="GET") + self.assertFalse(self.perm.has_permission(request, _make_view(method="get", required_permissions=["p"]))) + + +@ddt.ddt +class TestDynamicScopePermissionBulkScopesMixed(TestCase): + """Test DynamicScopePermission bulk-scopes behaviour when mixing specific and org-level scopes. + + Parameterized over lib (lib:Org:A / lib:Org:*) and course-v1 (course-v1:Org1+C1+2024 / course-v1:Org1+*) + namespaces to verify that the AND-logic holds regardless of whether a scope targets a specific + resource or an entire org. + """ + + def setUp(self): + self.perm = DynamicScopePermission() + + def test_mixed_namespaces_raises_value_error(self): + """Mixing lib and course-v1 scopes in the same bulk request raises ValueError.""" + request = _make_request(data={"scopes": ["lib:Org:A", "course-v1:Org1+C1+2024"]}) + with self.assertRaises(ValueError): + self.perm.has_permission(request, _make_view(required_permissions=["p"])) + + @ddt.data( + (["lib:Org:A", "lib:Org:*"], "get"), + (["course-v1:Org1+C1+2024", "course-v1:Org1+*"], "get"), + ) + @ddt.unpack + @patch("openedx_authz.api.is_user_allowed", return_value=True) + def test_specific_and_org_scope_both_pass_returns_true(self, scopes, method, _): + """When the user has permission on both the specific scope and the org-level scope, access is granted.""" + request = _make_request(data={"scopes": scopes}, method=method.upper()) + self.assertTrue(self.perm.has_permission(request, _make_view(method=method, required_permissions=["p"]))) + + @ddt.data( + (["lib:Org:A", "lib:Org:*"], "get"), + (["course-v1:Org1+C1+2024", "course-v1:Org1+*"], "get"), + ) + @ddt.unpack + @patch("openedx_authz.api.is_user_allowed", side_effect=[True, False]) + def test_specific_passes_org_fails_returns_false(self, scopes, method, _): + """When the user has permission on the specific scope but not the org-level scope, access is denied.""" + request = _make_request(data={"scopes": scopes}, method=method.upper()) + self.assertFalse(self.perm.has_permission(request, _make_view(method=method, required_permissions=["p"]))) + + @ddt.data( + (["lib:Org:A", "lib:Org:*"], "get"), + (["course-v1:Org1+C1+2024", "course-v1:Org1+*"], "get"), + ) + @ddt.unpack + @patch("openedx_authz.api.is_user_allowed", side_effect=[False, True]) + def test_specific_fails_org_passes_returns_false(self, scopes, method, _): + """When the user has permission on the org-level scope but not the specific scope, access is denied.""" + request = _make_request(data={"scopes": scopes}, method=method.upper()) + self.assertFalse(self.perm.has_permission(request, _make_view(method=method, required_permissions=["p"]))) + + +class TestCoursePermission(TestCase): + """Test CoursePermission class.""" + + def setUp(self): + self.perm = CoursePermission() + + def test_no_scope_returns_false(self): + """A request without any scope value is always rejected — there is nothing to authorize against.""" + self.assertFalse(self.perm.has_permission(_make_request(), _make_view(required_permissions=["p"]))) + + def test_scope_no_decorator_returns_true(self): + """When a scope is present but the view method has no @authz_permissions decorator, + the endpoint is considered open and access is granted.""" + request = _make_request(data={"scope": "course-v1:Org1+C1+2024"}) + self.assertTrue(self.perm.has_permission(request, _make_view(required_permissions=None))) + + @patch("openedx_authz.api.is_user_allowed", return_value=True) + def test_scope_with_permission_allowed(self, _): + """When the user has the required permission on the given course scope, access is granted.""" + request = _make_request(data={"scope": "course-v1:Org1+C1+2024"}, method="GET") + self.assertTrue(self.perm.has_permission(request, _make_view(method="get", required_permissions=["p"]))) + + @patch("openedx_authz.api.is_user_allowed", return_value=False) + def test_scope_with_permission_denied(self, _): + """When the user lacks the required permission on the course scope, access is denied.""" + request = _make_request(data={"scope": "course-v1:Org1+C1+2024"}, method="GET") + self.assertFalse(self.perm.has_permission(request, _make_view(method="get", required_permissions=["p"]))) + + @patch("openedx_authz.api.is_user_allowed", return_value=True) + def test_org_scope_allowed(self, _): + """An org-level course scope ('course-v1:Org1+*') grants access when the user has the required permission.""" + request = _make_request(data={"scope": "course-v1:Org1+*"}, method="GET") + self.assertTrue(self.perm.has_permission(request, _make_view(method="get", required_permissions=["p"]))) + + @patch("openedx_authz.api.is_user_allowed", return_value=False) + def test_org_scope_denied(self, _): + """An org-level course scope ('course-v1:Org1+*') denies access when the user lacks the required permission.""" + request = _make_request(data={"scope": "course-v1:Org1+*"}, method="GET") + self.assertFalse(self.perm.has_permission(request, _make_view(method="get", required_permissions=["p"]))) + + +class TestContentLibraryPermission(TestCase): + """Test ContentLibraryPermission class.""" + + def setUp(self): + self.perm = ContentLibraryPermission() + + def test_no_scope_returns_false(self): + """A request without any scope value is always rejected — there is nothing to authorize against.""" + self.assertFalse(self.perm.has_permission(_make_request(), _make_view(required_permissions=["p"]))) + + def test_scope_no_decorator_returns_true(self): + """When a scope is present but the view method has no @authz_permissions decorator, + the endpoint is considered open and access is granted.""" + request = _make_request(data={"scope": "lib:Org1:A"}) + self.assertTrue(self.perm.has_permission(request, _make_view(required_permissions=None))) + + @patch("openedx_authz.api.is_user_allowed", return_value=True) + def test_scope_with_permission_allowed(self, _): + """When the user has the required permission on the given library scope, access is granted.""" + request = _make_request(data={"scope": "lib:Org1:A"}, method="GET") + self.assertTrue(self.perm.has_permission(request, _make_view(method="get", required_permissions=["p"]))) + + @patch("openedx_authz.api.is_user_allowed", return_value=False) + def test_scope_with_permission_denied(self, _): + """When the user lacks the required permission on the library scope, access is denied.""" + request = _make_request(data={"scope": "lib:Org1:A"}, method="GET") + self.assertFalse(self.perm.has_permission(request, _make_view(method="get", required_permissions=["p"]))) + + @patch("openedx_authz.api.is_user_allowed", return_value=True) + def test_org_scope_allowed(self, _): + """An org-level lib scope ('lib:Org1:*') grants access when the user has the required permission.""" + request = _make_request(data={"scope": "lib:Org1:*"}, method="GET") + self.assertTrue(self.perm.has_permission(request, _make_view(method="get", required_permissions=["p"]))) + + @patch("openedx_authz.api.is_user_allowed", return_value=False) + def test_org_scope_denied(self, _): + """An org-level lib scope ('lib:Org1:*') denies access when the user lacks the required permission.""" + request = _make_request(data={"scope": "lib:Org1:*"}, method="GET") + self.assertFalse(self.perm.has_permission(request, _make_view(method="get", required_permissions=["p"]))) diff --git a/openedx_authz/tests/rest_api/test_views.py b/openedx_authz/tests/rest_api/test_views.py index 72001aac..65c29f0e 100644 --- a/openedx_authz/tests/rest_api/test_views.py +++ b/openedx_authz/tests/rest_api/test_views.py @@ -17,6 +17,7 @@ from openedx_authz import api from openedx_authz.api.data import ( + CourseOverviewData, OrgContentLibraryGlobData, OrgCourseOverviewGlobData, ) @@ -34,6 +35,9 @@ User = get_user_model() +COURSE_SCOPE_ORG1 = "course-v1:Org1+COURSE1+2024" +COURSE_ORG1_GLOB = OrgCourseOverviewGlobData.build_external_key(CourseOverviewData(external_key=COURSE_SCOPE_ORG1).org) + class ViewTestMixin(BaseRolesTestCase): """Mixin providing common test utilities for view tests.""" @@ -141,12 +145,22 @@ def create_admin_users(cls, quantity: int): user.is_staff = True user.save() + @classmethod + def create_course_users(cls): + """Create course users (plain, non-staff).""" + users = ["course_admin", "course_editor", "course_auditor", "course_admin_org"] + for username in users: + User.objects.get_or_create( + username=username, defaults={"email": f"{username}@example.com"} + ) + @classmethod def setUpTestData(cls): """Set up test fixtures once for the entire test class.""" super().setUpTestData() cls.create_admin_users(quantity=3) cls.create_regular_users(quantity=10) + cls.create_course_users() def setUp(self): """Set up test fixtures.""" @@ -315,6 +329,29 @@ def test_permission_validation_exception_handling(self, exception: Exception, st class TestRoleUserAPIView(ViewTestMixin): """Test suite for RoleUserAPIView.""" + _COURSE_ASSIGNMENTS = [ + { + "subject_name": "course_admin", + "role_name": roles.COURSE_ADMIN.external_key, + "scope_name": COURSE_SCOPE_ORG1, + }, + { + "subject_name": "course_editor", + "role_name": roles.COURSE_EDITOR.external_key, + "scope_name": COURSE_SCOPE_ORG1, + }, + { + "subject_name": "course_auditor", + "role_name": roles.COURSE_AUDITOR.external_key, + "scope_name": COURSE_SCOPE_ORG1, + }, + ] + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls._assign_roles_to_users(assignments=cls._COURSE_ASSIGNMENTS) + def setUp(self): """Set up test fixtures.""" super().setUp() @@ -421,6 +458,36 @@ def test_get_users_by_scope_permissions(self, username: str, status_code: int): self.assertEqual(response.status_code, status_code) + # --- Course scope equivalents --- + + @data( + # Unauthenticated + (None, status.HTTP_401_UNAUTHORIZED), + # Django superuser always passes + ("admin_1", status.HTTP_200_OK), + # course_admin has COURSES_MANAGE_COURSE_TEAM ⊇ COURSES_VIEW_COURSE_TEAM + ("course_admin", status.HTTP_200_OK), + # course_editor has COURSES_VIEW_COURSE_TEAM + ("course_editor", status.HTTP_200_OK), + # course_auditor has COURSES_VIEW_COURSE_TEAM + ("course_auditor", status.HTTP_200_OK), + # Library-only user has no course permission + ("regular_1", status.HTTP_403_FORBIDDEN), + ) + @unpack + def test_get_users_by_scope_course_permissions(self, username: str, status_code: int): + """Mirror of test_get_users_by_scope_permissions for course scopes. + + Expected result: + - Returns appropriate status code based on course-scope permissions. + """ + user = User.objects.filter(username=username).first() + self.client.force_authenticate(user=user) + + response = self.client.get(self.url, {"scope": COURSE_SCOPE_ORG1}) + + self.assertEqual(response.status_code, status_code) + @data( # With username ----------------------------- # Single user - success (admin user) @@ -661,6 +728,42 @@ def test_add_users_to_role_permissions(self, username: str, status_code: int): self.assertEqual(response.status_code, status_code) + # --- Course scope equivalents --- + + @data( + # Unauthenticated + (None, status.HTTP_401_UNAUTHORIZED), + # Django superuser always passes + ("admin_1", status.HTTP_207_MULTI_STATUS), + # course_admin has COURSES_MANAGE_COURSE_TEAM + ("course_admin", status.HTTP_207_MULTI_STATUS), + # course_editor has COURSES_VIEW_COURSE_TEAM only — cannot manage team + ("course_editor", status.HTTP_403_FORBIDDEN), + # course_auditor has COURSES_VIEW_COURSE_TEAM only — cannot manage team + ("course_auditor", status.HTTP_403_FORBIDDEN), + # Library-only user has no course permission + ("regular_1", status.HTTP_403_FORBIDDEN), + ) + @unpack + def test_add_users_to_role_course_permissions(self, username: str, status_code: int): + """Mirror of test_add_users_to_role_permissions for course scopes. + + Expected result: + - Returns appropriate status code based on course-scope permissions. + """ + request_data = { + "role": roles.COURSE_ADMIN.external_key, + "scope": COURSE_SCOPE_ORG1, + "users": ["regular_2"], + } + user = User.objects.filter(username=username).first() + self.client.force_authenticate(user=user) + + with patch.object(api.CourseOverviewData, "exists", return_value=True): + response = self.client.put(self.url, data=request_data, format="json") + + self.assertEqual(response.status_code, status_code) + @data( # With username ----------------------------- # Single user - success (admin user) @@ -799,6 +902,42 @@ def test_remove_users_from_role_permissions(self, username: str, status_code: in self.assertEqual(response.status_code, status_code) + # --- Course scope equivalents --- + + @data( + # Unauthenticated + (None, status.HTTP_401_UNAUTHORIZED), + # Django superuser always passes + ("admin_1", status.HTTP_207_MULTI_STATUS), + # course_admin has COURSES_MANAGE_COURSE_TEAM + ("course_admin", status.HTTP_207_MULTI_STATUS), + # course_editor has COURSES_VIEW_COURSE_TEAM only — cannot manage team + ("course_editor", status.HTTP_403_FORBIDDEN), + # course_auditor has COURSES_VIEW_COURSE_TEAM only — cannot manage team + ("course_auditor", status.HTTP_403_FORBIDDEN), + # Library-only user has no course permission + ("regular_1", status.HTTP_403_FORBIDDEN), + ) + @unpack + def test_remove_users_from_role_course_permissions(self, username: str, status_code: int): + """Mirror of test_remove_users_from_role_permissions for course scopes. + + Expected result: + - Returns appropriate status code based on course-scope permissions. + """ + query_params = { + "role": roles.COURSE_ADMIN.external_key, + "scope": COURSE_SCOPE_ORG1, + "users": "regular_2", + } + user = User.objects.filter(username=username).first() + self.client.force_authenticate(user=user) + + with patch.object(api.CourseOverviewData, "exists", return_value=True): + response = self.client.delete(f"{self.url}?{urlencode(query_params)}") + + self.assertEqual(response.status_code, status_code) + @ddt class TestRoleUserAPIViewScopeStringValidation(ViewTestMixin): @@ -952,7 +1091,7 @@ class TestScopesAPIView(ViewTestMixin): and the queryset helper methods, since those models live in openedx-platform. """ - COURSE_ORG1 = "course-v1:Org1+COURSE1+2024" + COURSE_ORG1 = COURSE_SCOPE_ORG1 COURSE_ORG2 = "course-v1:Org2+COURSE2+2024" LIBRARY_ORG1 = "lib:Org1:LIB1" LIBRARY_ORG2 = "lib:Org2:LIB2" @@ -1384,7 +1523,7 @@ def test_org_glob_scope_returns_all_org_courses(self): self.build_qs_patcher.stop() # Simulate get_scopes_for_user_and_permission returning an org-level glob. - glob_scope = OrgCourseOverviewGlobData(external_key="course-v1:Org1+*") + glob_scope = OrgCourseOverviewGlobData(external_key=COURSE_ORG1_GLOB) with patch( "openedx_authz.rest_api.v1.views.get_scopes_for_user_and_permission", return_value=[glob_scope], @@ -1764,7 +1903,7 @@ def setUpClass(cls): { "subject_name": "regular_9", "role_name": roles.COURSE_STAFF.external_key, - "scope_name": "course-v1:Org1+COURSE1+2024", + "scope_name": COURSE_SCOPE_ORG1, }, ] ) @@ -1922,7 +2061,7 @@ def test_get_orgs_user_with_both_permissions_allowed(self): { "subject_name": "regular_1", "role_name": roles.COURSE_STAFF.external_key, - "scope_name": "course-v1:Org1+COURSE1+2024", + "scope_name": COURSE_SCOPE_ORG1, }, ] ) @@ -2509,6 +2648,29 @@ def test_response_shape(self): class TestRoleListView(ViewTestMixin): """Test suite for RoleListView.""" + _COURSE_ASSIGNMENTS = [ + { + "subject_name": "course_admin", + "role_name": roles.COURSE_ADMIN.external_key, + "scope_name": COURSE_SCOPE_ORG1, + }, + { + "subject_name": "course_editor", + "role_name": roles.COURSE_EDITOR.external_key, + "scope_name": COURSE_SCOPE_ORG1, + }, + { + "subject_name": "course_auditor", + "role_name": roles.COURSE_AUDITOR.external_key, + "scope_name": COURSE_SCOPE_ORG1, + }, + ] + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls._assign_roles_to_users(assignments=cls._COURSE_ASSIGNMENTS) + def setUp(self): """Set up test fixtures.""" super().setUp() @@ -2644,6 +2806,34 @@ def test_get_roles_permissions(self, username: str, status_code: int): self.assertIn("results", response.data) self.assertIn("count", response.data) + # --- Course scope equivalents --- + + @data( + # Unauthenticated + (None, status.HTTP_401_UNAUTHORIZED), + # Django superuser always passes + ("admin_1", status.HTTP_200_OK), + # course_admin has COURSES_MANAGE_COURSE_TEAM ⊇ COURSES_VIEW_COURSE_TEAM + ("course_admin", status.HTTP_200_OK), + # course_auditor has COURSES_VIEW_COURSE_TEAM + ("course_auditor", status.HTTP_200_OK), + # Library-only user has no course permission + ("regular_9", status.HTTP_403_FORBIDDEN), + ) + @unpack + def test_get_roles_course_permissions(self, username: str, status_code: int): + """Mirror of test_get_roles_permissions for course scopes. + + Expected result: + - Returns appropriate status code based on course-scope permissions. + """ + user = User.objects.filter(username=username).first() + self.client.force_authenticate(user=user) + + response = self.client.get(self.url, {"scope": COURSE_SCOPE_ORG1}) + + self.assertEqual(response.status_code, status_code) + @ddt class TestUserValidationAPIView(ViewTestMixin): @@ -3536,12 +3726,12 @@ def setUpClass(cls): { "subject_name": "regular_9", "role_name": roles.COURSE_STAFF.external_key, - "scope_name": "course-v1:Org1+COURSE1+2024", + "scope_name": COURSE_SCOPE_ORG1, }, { "subject_name": "regular_10", "role_name": roles.COURSE_AUDITOR.external_key, - "scope_name": "course-v1:Org1+COURSE1+2024", + "scope_name": COURSE_SCOPE_ORG1, }, ] ) @@ -3720,7 +3910,7 @@ def test_user_with_org_course_permission_sees_org_course_assignments(self): { "subject_name": "regular_10", "role_name": roles.COURSE_STAFF.external_key, - "scope_name": "course-v1:Org1+*", + "scope_name": COURSE_ORG1_GLOB, }, ] ) @@ -3838,3 +4028,100 @@ def test_user_with_both_library_and_course_permissions(self): scope_types = {item["scope"].split(":")[0] for item in non_superadmin_items} self.assertIn("lib", scope_types) self.assertIn("course-v1", scope_types) + + +@ddt +class TestBulkPutScopesAllLogic(ViewTestMixin): + """Test that DynamicScopePermission enforces AND logic across scopes in bulk PUT. + + validate_permissions uses OR logic (any permission suffices per scope), but + DynamicScopePermission wraps that with all(...for sv in scopes_list), meaning + the user must pass the permission check for EVERY scope in the list. + + Two users illustrate the difference between specific-scope and org-level permissions: + - course_admin: COURSE_ADMIN on COURSE_SCOPE_ORG1 only (specific course). + - course_admin_org: COURSE_ADMIN on COURSE_ORG1_GLOB (all courses in Org1). + """ + + ANOTHER_COURSE_SCOPE = "course-v1:Org2+COURSE2+2024" + _COURSE_ASSIGNMENTS = [ + { + "subject_name": "course_admin", + "role_name": roles.COURSE_ADMIN.external_key, + "scope_name": COURSE_SCOPE_ORG1, + }, + { + "subject_name": "course_admin_org", + "role_name": roles.COURSE_ADMIN.external_key, + "scope_name": COURSE_ORG1_GLOB, + }, + ] + + def setUp(self): + super().setUp() + self.user = User.objects.get(username="course_admin") + self.client.force_authenticate(user=self.user) + self.url = reverse("openedx_authz:role-user-list") + self._assign_roles_to_users(assignments=self._COURSE_ASSIGNMENTS) + + def _put_course(self, scopes): + request_data = {"role": roles.COURSE_ADMIN.external_key, "scopes": scopes, "users": ["regular_2"]} + with patch.object(api.CourseOverviewData, "exists", return_value=True), \ + patch.object(api.OrgCourseOverviewGlobData, "exists", return_value=True): + return self.client.put(self.url, data=request_data, format="json") + + def _put_lib(self, scopes): + request_data = {"role": roles.LIBRARY_ADMIN.external_key, "scopes": scopes, "users": ["regular_2"]} + with patch.object(api.ContentLibraryData, "exists", return_value=True): + return self.client.put(self.url, data=request_data, format="json") + + def test_all_scopes_permitted_succeeds(self): + """course_admin has permission on their specific scope → 207.""" + response = self._put_course([COURSE_SCOPE_ORG1]) + self.assertEqual(response.status_code, status.HTTP_207_MULTI_STATUS) + + def test_one_scope_not_permitted_denied(self): + """User lacks permission on one of the scopes → 403. + + course_admin has no role on ANOTHER_COURSE_SCOPE, so the all() check must + fail even though they pass for COURSE_SCOPE_ORG1. + """ + response = self._put_course([COURSE_SCOPE_ORG1, self.ANOTHER_COURSE_SCOPE]) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + def test_course_user_cannot_add_library_roles(self): + """A course-only user is denied when trying to assign library roles. + + course_admin has no library permissions at all, so a bulk PUT targeting + a library scope must be denied regardless of the OR logic inside + validate_permissions. + """ + response = self._put_lib(["lib:Org1:LIB1"]) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + @data( + # course_admin has COURSE_ADMIN on the specific course only. + # Passes for the exact scope, but fails for the org-level glob or any combo including it. + ("course_admin", [COURSE_SCOPE_ORG1], status.HTTP_207_MULTI_STATUS), + ("course_admin", [COURSE_ORG1_GLOB], status.HTTP_403_FORBIDDEN), + ("course_admin", [COURSE_SCOPE_ORG1, COURSE_ORG1_GLOB], status.HTTP_403_FORBIDDEN), + # course_admin_org has COURSE_ADMIN on the org-level glob. + # Via Casbin glob matching, this covers both the glob itself and any specific course within the org. + ("course_admin_org", [COURSE_SCOPE_ORG1], status.HTTP_207_MULTI_STATUS), + ("course_admin_org", [COURSE_ORG1_GLOB], status.HTTP_207_MULTI_STATUS), + ("course_admin_org", [COURSE_SCOPE_ORG1, COURSE_ORG1_GLOB], status.HTTP_207_MULTI_STATUS), + ) + @unpack + def test_scope_permission_vs_org_permission(self, username, scopes, expected_status): + """A user with a specific-scope role and one with an org-level role behave differently + when bulk PUT includes the org-level glob alongside specific scopes. + + course_admin (specific scope) fails on any scope list that includes the org glob, + because they have no permission at that level. + course_admin_org (org-level glob) passes for both the glob and any specific course + within that org, thanks to Casbin's glob matching. + """ + user = User.objects.get(username=username) + self.client.force_authenticate(user=user) + response = self._put_course(scopes) + self.assertEqual(response.status_code, expected_status)