-
Notifications
You must be signed in to change notification settings - Fork 6
feat: add support for course permission in authz rest api #274
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
bb20c63
6b4dea3
c8d3d1c
528ca50
96a74e8
69b10ac
4d637ac
bdf434d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -59,13 +59,23 @@ class BaseScopePermission(BasePermission, metaclass=PermissionMeta): | |
| def get_scope_value(self, request) -> str | None: | ||
| """Extract the scope value from the request. | ||
|
|
||
| When a ``scopes`` list is provided, returns only the first element. | ||
| This is intentional: bulk requests are expected to be homogeneous | ||
| (all scopes must share the same namespace). Actual per-scope permission | ||
| validation for bulk requests is handled in ``DynamicScopePermission``. | ||
|
|
||
| Args: | ||
| request: The Django REST framework request object. | ||
|
|
||
| Returns: | ||
| str | None: The scope value if found (e.g., 'lib:DemoX:CSPROB'), or None if not present. | ||
| """ | ||
| return request.data.get("scope") or request.query_params.get("scope") | ||
| scope = request.data.get("scope") or request.query_params.get("scope") | ||
| if not scope: | ||
| scopes = request.data.get("scopes") | ||
| if scopes and isinstance(scopes, list): | ||
| scope = scopes[0] | ||
| return scope | ||
|
|
||
| def get_scope_namespace(self, request) -> str: | ||
| """Derive the namespace from the request scope value. | ||
|
|
@@ -87,6 +97,12 @@ def get_scope_namespace(self, request) -> str: | |
| >>> permission.get_scope_namespace(request) | ||
| 'global' | ||
| """ | ||
| scopes_list = request.data.get("scopes") | ||
| if scopes_list and isinstance(scopes_list, list): | ||
| if not self._scopes_have_homogeneous_namespaces(scopes_list): | ||
| raise ValueError( | ||
| f"Mixed scope namespaces in bulk request are not allowed: {scopes_list}" | ||
| ) | ||
| scope_value = self.get_scope_value(request) | ||
| if not scope_value: | ||
| return self.NAMESPACE | ||
|
|
@@ -95,6 +111,22 @@ def get_scope_namespace(self, request) -> str: | |
| except ValueError: | ||
| return self.NAMESPACE | ||
|
|
||
| def _scopes_have_homogeneous_namespaces(self, scopes_list: list[str]) -> bool: | ||
| """Check that all scopes in the list share the same namespace. | ||
|
|
||
| Args: | ||
| scopes_list: List of scope values to check. | ||
| Returns: | ||
| bool: True if all scopes share the same namespace, False otherwise. | ||
| """ | ||
| namespaces = set() | ||
| for scope in scopes_list: | ||
| try: | ||
| namespaces.add(api.ScopeData(external_key=scope).NAMESPACE) | ||
| except ValueError: | ||
| pass | ||
| return len(namespaces) <= 1 | ||
|
|
||
| def has_permission(self, request, view) -> bool: | ||
| """Fallback permission check (deny by default). | ||
|
|
||
|
|
@@ -141,6 +173,9 @@ class DynamicScopePermission(BaseScopePermission): | |
|
|
||
| Note: | ||
| Superusers and staff members always have permission regardless of scope. | ||
| Bulk requests (``scopes`` list) must be homogeneous — all scopes must share | ||
| the same namespace (e.g., all ``course-v1:`` or all ``lib:``). Mixed namespaces | ||
| will raise a ``ValueError`` during namespace resolution. | ||
| """ | ||
|
|
||
| NAMESPACE: ClassVar[None] = None | ||
|
|
@@ -175,13 +210,26 @@ def has_permission(self, request, view) -> bool: | |
| users, the permission check is delegated to the permission class registered | ||
| for the request's scope namespace. | ||
|
|
||
| For bulk PUT requests that carry a ``scopes`` list, every scope in the list | ||
| must pass at least one of the required permissions (OR logic per permission, | ||
| AND logic across scopes). | ||
|
|
||
| Examples: | ||
| >>> # Regular user gets scope-specific check | ||
| >>> request.data = {"scope": "lib:DemoX:CSPROB"} | ||
| >>> permission.has_permission(request, view) # Delegates to ContentLibraryPermission | ||
| """ | ||
| if request.user.is_superuser or request.user.is_staff: | ||
| return True | ||
| scopes_list = request.data.get("scopes") | ||
| if scopes_list and isinstance(scopes_list, list): | ||
| perm_instance = self._get_permission_instance(request) # namespace resolved from scopes[0] | ||
| if not isinstance(perm_instance, MethodPermissionMixin): | ||
| return False | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need this validation?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This is a refined answer with Claude's help; my first try wasn't as clear as this version. 🙈 Note: I copied and pasted a piece of code of a single-scope handler, you can check it here #274 (comment)
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the explanation! |
||
| required = perm_instance.get_required_permissions(request, view) | ||
| if not required: | ||
| return False | ||
|
mariajgrimaldi marked this conversation as resolved.
Outdated
|
||
| return all(perm_instance.validate_permissions(request, required, sv) for sv in scopes_list) | ||
|
MaferMazu marked this conversation as resolved.
Outdated
|
||
| return self._get_permission_instance(request).has_permission(request, view) | ||
|
|
||
| def has_object_permission(self, request, view, obj) -> bool: | ||
|
|
@@ -240,23 +288,19 @@ def get_required_permissions(self, request, view) -> list[str]: | |
| return [] | ||
|
|
||
| def validate_permissions(self, request, permissions: list[str], scope_value: str) -> bool: | ||
| """Validate that the user has all required permissions for the scope. | ||
| """Validate that the user has at least one of the required permissions for the scope. | ||
|
|
||
| Args: | ||
| request: The Django REST framework request object. | ||
| permissions: List of permission identifiers to check. | ||
| permissions: List of permission identifiers to check (OR logic — any one suffices). | ||
| scope_value: The scope to check permissions against. | ||
|
|
||
| Returns: | ||
| bool: True if user has all required permissions, False otherwise. | ||
| bool: True if user has at least one required permission, False otherwise. | ||
| """ | ||
| if not permissions: | ||
| return False | ||
|
|
||
| for permission in permissions: | ||
| if not api.is_user_allowed(request.user.username, permission, scope_value): | ||
| return False | ||
| return True | ||
| return any(api.is_user_allowed(request.user.username, permission, scope_value) for permission in permissions) | ||
|
|
||
|
|
||
| class AnyScopePermission(MethodPermissionMixin, BasePermission): | ||
|
|
@@ -282,6 +326,21 @@ def has_permission(self, request, view) -> bool: | |
| return any(api.get_scopes_for_user_and_permission(request.user.username, permission) for permission in required) | ||
|
|
||
|
|
||
| class CoursePermission(MethodPermissionMixin, BaseScopePermission): | ||
| """Permission handler for course scopes (namespace ``course-v1``).""" | ||
|
|
||
| NAMESPACE: ClassVar[str] = "course-v1" | ||
|
|
||
| def has_permission(self, request, view) -> bool: | ||
|
MaferMazu marked this conversation as resolved.
|
||
| scope_value = self.get_scope_value(request) | ||
| if not scope_value: | ||
| return False | ||
| permissions = self.get_required_permissions(request, view) | ||
| if permissions: | ||
| return self.validate_permissions(request, permissions, scope_value) | ||
|
mariajgrimaldi marked this conversation as resolved.
|
||
| return True | ||
|
|
||
|
|
||
| class ContentLibraryPermission(MethodPermissionMixin, BaseScopePermission): | ||
| """Permission handler for content library scopes. | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,143 @@ | ||
| """Unit tests for openedx_authz.rest_api.v1.permissions.""" | ||
|
|
||
| from unittest.mock import MagicMock, patch | ||
|
|
||
| from django.test import TestCase | ||
|
|
||
| from openedx_authz.rest_api.v1.permissions import ( | ||
| BaseScopePermission, | ||
| CoursePermission, | ||
| DynamicScopePermission, | ||
| ) | ||
|
|
||
|
|
||
| def _make_user(superuser=False): | ||
| """Return a mock user. Regular user by default; pass superuser=True for a superuser.""" | ||
| user = MagicMock() | ||
| user.is_superuser = superuser | ||
| user.is_staff = False | ||
| user.username = "testuser" | ||
| return user | ||
|
|
||
|
|
||
| def _make_request(data=None, query_params=None, user=None, method="GET"): | ||
| """Return a mock DRF request with the given body data, query params, user, and HTTP method.""" | ||
| request = MagicMock() | ||
| request.data = data or {} | ||
| request.query_params = query_params or {} | ||
| request.method = method | ||
| request.user = user or _make_user() | ||
| return request | ||
|
|
||
|
|
||
| def _make_view(method="get", required_permissions=None): | ||
| """Return a mock view whose handler carries required_permissions when provided, | ||
| simulating the @authz_permissions decorator. Omit required_permissions to simulate | ||
| a plain handler with no decorator.""" | ||
| view = MagicMock() | ||
| handler = MagicMock() | ||
| if required_permissions is not None: | ||
| handler.required_permissions = required_permissions | ||
| else: | ||
| del handler.required_permissions | ||
| setattr(view, method, handler) | ||
| return view | ||
|
|
||
|
|
||
| class TestGetScopeValueScopesFallback(TestCase): | ||
| """Test scopes-list fallback in BaseScopePermission.get_scope_value.""" | ||
|
|
||
| def setUp(self): | ||
| self.perm = BaseScopePermission() | ||
|
|
||
| def test_scopes_list_fallback_returns_first_element(self): | ||
| """When no 'scope' key is present, the first item of the 'scopes' list is used as the scope value.""" | ||
| request = _make_request(data={"scopes": ["lib:Org:A", "lib:Org:B"]}) | ||
| self.assertEqual(self.perm.get_scope_value(request), "lib:Org:A") | ||
|
|
||
| def test_scope_is_string_returns_value(self): | ||
| """When 'scope' is a plain string instead of a list, it is used as scope value.""" | ||
| request = _make_request(data={"scope": "lib:Org:A"}) | ||
| self.assertEqual(self.perm.get_scope_value(request), "lib:Org:A") | ||
|
|
||
|
|
||
| class TestGetScopeNamespaceMixedScopes(TestCase): | ||
| """Test that get_scope_namespace enforces namespace homogeneity for bulk scopes.""" | ||
|
|
||
| def setUp(self): | ||
| self.perm = BaseScopePermission() | ||
|
|
||
| def test_mixed_namespaces_raises_value_error(self): | ||
| """Passing scopes from different namespaces in a single bulk request raises ValueError.""" | ||
| request = _make_request(data={"scopes": ["lib:Org:A", "course-v1:Org1+C1+2024"]}) | ||
| with self.assertRaises(ValueError): | ||
| self.perm.get_scope_namespace(request) | ||
|
|
||
| def test_homogeneous_namespaces_does_not_raise(self): | ||
| """Passing scopes that all share the same namespace does not raise.""" | ||
| request = _make_request(data={"scopes": ["lib:Org:A", "lib:Org:B"]}) | ||
| # Should not raise — just verify it completes without error | ||
| namespace = self.perm.get_scope_namespace(request) | ||
| self.assertEqual(namespace, "lib") | ||
|
|
||
|
|
||
| class TestDynamicScopePermissionBulkScopes(TestCase): | ||
|
MaferMazu marked this conversation as resolved.
|
||
| """Test bulk-scopes path in DynamicScopePermission.has_permission.""" | ||
|
|
||
| def setUp(self): | ||
| self.perm = DynamicScopePermission() | ||
|
|
||
| def test_non_mixin_namespace_returns_false(self): | ||
| """A 'global' scope resolves to BaseScopePermission which does not implement MethodPermissionMixin. | ||
| The bulk path requires MethodPermissionMixin, so the check is rejected immediately.""" | ||
| request = _make_request(data={"scopes": ["global:x"]}) | ||
| self.assertFalse(self.perm.has_permission(request, _make_view(required_permissions=["p"]))) | ||
|
|
||
| def test_no_required_permissions_returns_false(self): | ||
| """When the view method has no @authz_permissions decorator, there are no required permissions | ||
| to evaluate, so the bulk check is rejected.""" | ||
| request = _make_request(data={"scopes": ["lib:Org:A", "lib:Org:B"]}) | ||
| self.assertFalse(self.perm.has_permission(request, _make_view(required_permissions=None))) | ||
|
|
||
| @patch("openedx_authz.api.is_user_allowed", return_value=True) | ||
| def test_all_scopes_pass_returns_true(self, _): | ||
| """When the user has the required permission on every scope in the list, access is granted | ||
| (AND logic across scopes — all must pass).""" | ||
| request = _make_request(data={"scopes": ["lib:Org:A", "lib:Org:B"]}, method="GET") | ||
| self.assertTrue(self.perm.has_permission(request, _make_view(method="get", required_permissions=["p"]))) | ||
|
|
||
| @patch("openedx_authz.api.is_user_allowed", side_effect=[True, False]) | ||
| def test_one_scope_fails_returns_false(self, _): | ||
| """When the user lacks the required permission on at least one scope, access is denied | ||
| (AND logic across scopes — a single failure is enough to reject).""" | ||
| request = _make_request(data={"scopes": ["lib:Org:A", "lib:Org:B"]}, method="GET") | ||
| self.assertFalse(self.perm.has_permission(request, _make_view(method="get", required_permissions=["p"]))) | ||
|
|
||
|
|
||
| class TestCoursePermission(TestCase): | ||
| """Test CoursePermission class.""" | ||
|
|
||
| def setUp(self): | ||
| self.perm = CoursePermission() | ||
|
|
||
| def test_no_scope_returns_false(self): | ||
| """A request without any scope value is always rejected — there is nothing to authorize against.""" | ||
| self.assertFalse(self.perm.has_permission(_make_request(), _make_view(required_permissions=["p"]))) | ||
|
|
||
| def test_scope_no_decorator_returns_true(self): | ||
| """When a scope is present but the view method has no @authz_permissions decorator, | ||
| the endpoint is considered open and access is granted.""" | ||
| request = _make_request(data={"scope": "course-v1:Org1+C1+2024"}) | ||
| self.assertTrue(self.perm.has_permission(request, _make_view(required_permissions=None))) | ||
|
|
||
| @patch("openedx_authz.api.is_user_allowed", return_value=True) | ||
| def test_scope_with_permission_allowed(self, _): | ||
| """When the user has the required permission on the given course scope, access is granted.""" | ||
| request = _make_request(data={"scope": "course-v1:Org1+C1+2024"}, method="GET") | ||
| self.assertTrue(self.perm.has_permission(request, _make_view(method="get", required_permissions=["p"]))) | ||
|
|
||
| @patch("openedx_authz.api.is_user_allowed", return_value=False) | ||
| def test_scope_with_permission_denied(self, _): | ||
| """When the user lacks the required permission on the course scope, access is denied.""" | ||
| request = _make_request(data={"scope": "course-v1:Org1+C1+2024"}, method="GET") | ||
| self.assertFalse(self.perm.has_permission(request, _make_view(method="get", required_permissions=["p"]))) | ||
Uh oh!
There was an error while loading. Please reload this page.