From b0de0051f3437e86f5b89ae3c8be230adf62a06c Mon Sep 17 00:00:00 2001 From: Maria Grimaldi Date: Tue, 7 Oct 2025 21:12:03 +0200 Subject: [PATCH 1/9] feat: add decorator to manage policy lifecycle for low-level APIs --- openedx_authz/api/data.py | 11 + openedx_authz/api/decorators.py | 79 ++++ openedx_authz/api/permissions.py | 2 + openedx_authz/api/roles.py | 20 +- openedx_authz/engine/adapter.py | 2 +- openedx_authz/tests/api/test_decorators.py | 377 +++++++++++++++++ openedx_authz/tests/api/test_permissions.py | 371 +++++++++++++++++ openedx_authz/tests/api/test_roles.py | 1 - openedx_authz/tests/test_adapter.py | 233 +++++++++++ openedx_authz/tests/test_enforcer.py | 435 +++++++++++++++++++- 10 files changed, 1524 insertions(+), 7 deletions(-) create mode 100644 openedx_authz/api/decorators.py create mode 100644 openedx_authz/tests/api/test_decorators.py create mode 100644 openedx_authz/tests/api/test_permissions.py create mode 100644 openedx_authz/tests/test_adapter.py diff --git a/openedx_authz/api/data.py b/openedx_authz/api/data.py index a4c1b359..641fdc4b 100644 --- a/openedx_authz/api/data.py +++ b/openedx_authz/api/data.py @@ -306,6 +306,8 @@ class ScopeData(AuthZData, metaclass=ScopeMeta): """ NAMESPACE: ClassVar[str] = "sc" + POLICY_POSITION = 2 # Position of scope in Casbin policy rules (p = sub, act, obj) + GROUPING_POLICY_POSITION = 2 # Position of scope in Casbin grouping policy rules (g = sub, role, scope) @classmethod def validate_external_key(cls, _: str) -> bool: @@ -332,6 +334,15 @@ def exists(self) -> bool: """ raise NotImplementedError("Subclasses must implement exists method.") + @property + def policy_template(self) -> str: + """Get the policy template for the scope. + + Returns: + str: The policy template string. + """ + return f"{self.NAMESPACE}{self.SEPARATOR}*" + @define class ContentLibraryData(ScopeData): diff --git a/openedx_authz/api/decorators.py b/openedx_authz/api/decorators.py new file mode 100644 index 00000000..e129c30a --- /dev/null +++ b/openedx_authz/api/decorators.py @@ -0,0 +1,79 @@ +"""Decorators for the authorization public API.""" +from functools import wraps + +from openedx_authz.api.data import ScopeData +from openedx_authz.engine.enforcer import enforcer +from openedx_authz.engine.filter import Filter + + +def manage_policy_lifecycle(filter_on: str = ""): + """Decorator to manage policy lifecycle around API calls. + + This decorator ensures proper policy loading and clearing around API function calls. + It loads relevant policies before execution and clears them afterward to prevent + stale policy issues in long-running processes. + + Can be used in two ways: + @manage_policy_lifecycle() -> Loads full policy + @manage_policy_lifecycle(filter_on="scope") -> Loads filtered policy by scope + + Args: + filter_on (str): The type of data class to filter on (e.g., "scope"). + If empty, loads full policy. + + Returns: + callable: The decorated function or decorator. + + Examples: + # Without filtering (loads all policies): + @manage_policy_lifecycle() + def get_all_roles(): + return enforcer.get_all_roles() + + # With scope filtering (loads only relevant policies): + @manage_policy_lifecycle(filter_on="scope") + def get_roles_in_scope(scope: ScopeData): + return enforcer.get_filtered_roles(scope.namespaced_key) + """ + FILTER_DATA_CLASSES = { + "scope": ScopeData, + } + + def build_filter_from_args(args) -> Filter: + """Build a Filter object from function arguments based on the filter_on parameter. + + Args: + args (tuple): Positional arguments passed to the decorated function. + + Returns: + Filter: A Filter object populated with relevant filter values. + """ + filter_obj = Filter() + if filter_on and filter_on in FILTER_DATA_CLASSES: + for arg in args: + if isinstance(arg, FILTER_DATA_CLASSES[filter_on]): + filter_value = getattr(filter_obj, f"v{arg.POLICY_POSITION}") + filter_value.append(arg.policy_template) # Used to load p type policies as well. E.g., lib^* + filter_value.append(arg.namespaced_key) # E.g., lib^lib:DemoX:CSPROB + return filter_obj + + def decorator(f): + """Inner decorator that wraps the function with policy lifecycle management.""" + @wraps(f) + def wrapper(*args, **kwargs): + """Wrapper that handles policy loading, execution, and cleanup.""" + filter_obj = build_filter_from_args(args) + + if any([filter_obj.ptype, filter_obj.v0, filter_obj.v1, filter_obj.v2]): + enforcer.load_filtered_policy(filter_obj) + else: + enforcer.load_policy() + + try: + return f(*args, **kwargs) + finally: + enforcer.clear_policy() + + return wrapper + + return decorator diff --git a/openedx_authz/api/permissions.py b/openedx_authz/api/permissions.py index 097ebb81..1594a0c3 100644 --- a/openedx_authz/api/permissions.py +++ b/openedx_authz/api/permissions.py @@ -6,6 +6,7 @@ """ from openedx_authz.api.data import ActionData, PermissionData, PolicyIndex, ScopeData, SubjectData +from openedx_authz.api.decorators import manage_policy_lifecycle from openedx_authz.engine.enforcer import enforcer __all__ = [ @@ -46,6 +47,7 @@ def get_all_permissions_in_scope(scope: ScopeData) -> list[PermissionData]: return [get_permission_from_policy(action) for action in actions] +@manage_policy_lifecycle(filter_on="scope") def is_subject_allowed( subject: SubjectData, action: ActionData, diff --git a/openedx_authz/api/roles.py b/openedx_authz/api/roles.py index c1db6c9f..3d4b11d8 100644 --- a/openedx_authz/api/roles.py +++ b/openedx_authz/api/roles.py @@ -21,6 +21,7 @@ ) from openedx_authz.api.permissions import get_permission_from_policy from openedx_authz.engine.enforcer import enforcer +from openedx_authz.api.decorators import manage_policy_lifecycle __all__ = [ "get_permissions_for_single_role", @@ -48,6 +49,7 @@ # in this case, ALL the policies, but that might not be the case +@manage_policy_lifecycle() def get_permissions_for_single_role( role: RoleData, ) -> list[PermissionData]: @@ -63,6 +65,7 @@ def get_permissions_for_single_role( return [get_permission_from_policy(policy) for policy in policies] +@manage_policy_lifecycle() def get_permissions_for_roles( roles: list[RoleData], ) -> dict[str, dict[str, list[PermissionData | str]]]: @@ -84,6 +87,7 @@ def get_permissions_for_roles( return permissions_by_role +@manage_policy_lifecycle(filter_on="scope") def get_permissions_for_active_roles_in_scope( scope: ScopeData, role: RoleData | None = None ) -> dict[str, dict[str, list[PermissionData | str]]]: @@ -134,6 +138,7 @@ def get_permissions_for_active_roles_in_scope( ) +@manage_policy_lifecycle(filter_on="scope") def get_role_definitions_in_scope(scope: ScopeData) -> list[RoleData]: """Get all role definitions available in a specific scope. @@ -173,7 +178,7 @@ def get_role_definitions_in_scope(scope: ScopeData) -> list[RoleData]: for role, permissions in permissions_per_role.items() ] - +@manage_policy_lifecycle() def get_all_roles_names() -> list[str]: """Get all the available roles names in the current environment. @@ -183,6 +188,7 @@ def get_all_roles_names() -> list[str]: return enforcer.get_all_subjects() +@manage_policy_lifecycle(filter_on="scope") def get_all_roles_in_scope(scope: ScopeData) -> list[list[str]]: """Get all the available role grouping policies in a specific scope. @@ -198,6 +204,7 @@ def get_all_roles_in_scope(scope: ScopeData) -> list[list[str]]: ) +@manage_policy_lifecycle(filter_on="scope") def assign_role_to_subject_in_scope( subject: SubjectData, role: RoleData, scope: ScopeData ) -> bool: @@ -219,6 +226,7 @@ def assign_role_to_subject_in_scope( ) +@manage_policy_lifecycle(filter_on="scope") def batch_assign_role_to_subjects_in_scope( subjects: list[SubjectData], role: RoleData, scope: ScopeData ) -> None: @@ -232,6 +240,7 @@ def batch_assign_role_to_subjects_in_scope( assign_role_to_subject_in_scope(subject, role, scope) +@manage_policy_lifecycle(filter_on="scope") def unassign_role_from_subject_in_scope( subject: SubjectData, role: RoleData, scope: ScopeData ) -> bool: @@ -251,6 +260,7 @@ def unassign_role_from_subject_in_scope( ) +@manage_policy_lifecycle(filter_on="scope") def batch_unassign_role_from_subjects_in_scope( subjects: list[SubjectData], role: RoleData, scope: ScopeData ) -> None: @@ -265,6 +275,7 @@ def batch_unassign_role_from_subjects_in_scope( unassign_role_from_subject_in_scope(subject, role, scope) +@manage_policy_lifecycle() def get_subject_role_assignments(subject: SubjectData) -> list[RoleAssignmentData]: """Get all the roles for a subject across all scopes. @@ -291,6 +302,7 @@ def get_subject_role_assignments(subject: SubjectData) -> list[RoleAssignmentDat return role_assignments +@manage_policy_lifecycle(filter_on="scope") def get_subject_role_assignments_in_scope( subject: SubjectData, scope: ScopeData ) -> list[RoleAssignmentData]: @@ -325,6 +337,7 @@ def get_subject_role_assignments_in_scope( return role_assignments +@manage_policy_lifecycle(filter_on="scope") def get_subject_role_assignments_for_role_in_scope( role: RoleData, scope: ScopeData ) -> list[RoleAssignmentData]: @@ -361,9 +374,8 @@ def get_subject_role_assignments_for_role_in_scope( return role_assignments -def get_all_subject_role_assignments_in_scope( - scope: ScopeData, -) -> list[RoleAssignmentData]: +@manage_policy_lifecycle(filter_on="scope") +def get_all_subject_role_assignments_in_scope(scope: ScopeData) -> list[RoleAssignmentData]: """Get all the subjects assigned to any role in a specific scope. Args: diff --git a/openedx_authz/engine/adapter.py b/openedx_authz/engine/adapter.py index c68cfe82..0fe49828 100644 --- a/openedx_authz/engine/adapter.py +++ b/openedx_authz/engine/adapter.py @@ -18,7 +18,7 @@ from casbin.persist import FilteredAdapter from casbin_adapter.adapter import Adapter from casbin_adapter.models import CasbinRule -from django.db.models import QuerySet +from django.db.models import QuerySet, Q from openedx_authz.engine.filter import Filter diff --git a/openedx_authz/tests/api/test_decorators.py b/openedx_authz/tests/api/test_decorators.py new file mode 100644 index 00000000..5b7b443c --- /dev/null +++ b/openedx_authz/tests/api/test_decorators.py @@ -0,0 +1,377 @@ +"""Test cases for decorator functions. + +This test suite verifies the functionality of the `manage_policy_lifecycle` decorator, +which manages policy loading and clearing around API function calls. +""" + +import casbin +from ddt import data as ddt_data +from ddt import ddt, unpack +from django.test import TestCase + +from openedx_authz.api.data import ( + ActionData, + RoleData, + ScopeData, + SubjectData, +) +from openedx_authz.api.decorators import manage_policy_lifecycle +from openedx_authz.api.roles import ( + assign_role_to_subject_in_scope, + get_permissions_for_active_roles_in_scope, +) +from openedx_authz.engine.enforcer import enforcer as global_enforcer +from openedx_authz.engine.filter import Filter +from openedx_authz.engine.utils import migrate_policy_between_enforcers + + +@ddt +class TestPolicyLifecycleDecorator(TestCase): + """Test cases for the manage_policy_lifecycle decorator. + + These tests verify that the decorator properly manages the enforcer policy lifecycle + by loading filtered policies before function execution and clearing policies after. + + The enforcer used in these test cases is the default global enforcer + instance from `openedx_authz.engine.enforcer` to ensure consistency + with production environments. + """ + + @classmethod + def _seed_database_with_policies(cls): + """Seed the database with policies from the policy file.""" + global_enforcer.load_policy() + migrate_policy_between_enforcers( + source_enforcer=casbin.Enforcer( + "openedx_authz/engine/config/model.conf", + "openedx_authz/engine/config/authz.policy", + ), + target_enforcer=global_enforcer, + ) + global_enforcer.clear_policy() + + @classmethod + def setUpClass(cls): + """Set up test class environment.""" + super().setUpClass() + cls._seed_database_with_policies() + # Create test role assignments for various subjects and scopes + assignments = [ + { + "subject_name": "alice", + "role_name": "library_admin", + "scope_name": "lib:Org1:math_101", + }, + { + "subject_name": "bob", + "role_name": "library_user", + "scope_name": "lib:Org2:science_201", + }, + { + "subject_name": "carol", + "role_name": "library_author", + "scope_name": "lib:Org3:history_301", + }, + ] + for assignment in assignments: + assign_role_to_subject_in_scope( + subject=SubjectData(external_key=assignment["subject_name"]), + role=RoleData(external_key=assignment["role_name"]), + scope=ScopeData(external_key=assignment["scope_name"]), + ) + + def setUp(self): + """Set up test environment.""" + super().setUp() + global_enforcer.clear_policy() + + def tearDown(self): + """Clean up after each test to ensure isolation.""" + super().tearDown() + global_enforcer.clear_policy() + + def test_decorator_filters_by_scope_and_clears(self): + """Test decorator loads filtered policies by scope and clears after execution. + + Expected result: + - Decorator loads filtered policies for the given scope + - Function can access and count the filtered policies + - Policies enable correct enforcement decisions + - Enforcer is cleared after execution + """ + scope = ScopeData(external_key="lib:Org1:math_101") + + @manage_policy_lifecycle(filter_on="scope") + def get_policy_info(scope_arg): + policy_count = len(global_enforcer.get_policy()) + grouping_policy_count = len(global_enforcer.get_grouping_policy()) + return { + "policies": policy_count, + "grouping_policies": grouping_policy_count, + "total": policy_count + grouping_policy_count, + } + + result = get_policy_info(scope) + + # Verify exact counts by loading filtered policy and checking + # Load both wildcard and specific scope like the decorator does + global_enforcer.load_filtered_policy(Filter(v2=[scope.policy_template, scope.namespaced_key])) + expected_policies = len(global_enforcer.get_policy()) + expected_grouping = len(global_enforcer.get_grouping_policy()) + global_enforcer.clear_policy() + + self.assertEqual(result["policies"], expected_policies) + self.assertEqual(result["grouping_policies"], expected_grouping) + self.assertEqual(result["total"], expected_policies + expected_grouping) + + def test_decorator_loads_full_policy_without_filter(self): + """Test decorator loads full policy when no filter criteria is provided. + + Expected result: + - load_policy is called when no scope arguments present + - Enforcer has all policies loaded during execution + """ + + @manage_policy_lifecycle(filter_on="scope") + def get_full_policy_count(some_arg): + """Function that does not take a scope argument. + + This should cause the decorator to load the full policy. + """ + policy_count = len(global_enforcer.get_policy()) + grouping_count = len(global_enforcer.get_grouping_policy()) + return policy_count + grouping_count + + total_count = get_full_policy_count("some_string") + + global_enforcer.load_policy() + self.assertEqual( + total_count, + len(global_enforcer.get_policy()) + len(global_enforcer.get_grouping_policy()), + "Should have loaded full policy" + ) + + def test_decorator_clears_policy_on_exception(self): + """Test decorator clears policy even when decorated function raises exception. + + Expected result: + - Policy is loaded before function execution + - Exception propagates correctly + - Enforcer is cleared even when exception occurs + """ + + @manage_policy_lifecycle(filter_on="scope") + def failing_function(scope_arg): + """Function that raises an exception to test decorator cleanup.""" + if len(global_enforcer.get_policy()) >= 0: + raise ValueError("Intentional test exception") + return "should not reach here" + + scope = ScopeData(external_key="lib:Org1:math_101") + + with self.assertRaises(ValueError) as context: + failing_function(scope) + + self.assertEqual(str(context.exception), "Intentional test exception") + + def test_decorator_with_enforcement_checks(self): + """Test that policies loaded by decorator enable correct enforcement decisions. + + Expected result: + - Decorator loads policies that can be used for authorization checks + - Enforcer.enforce() works correctly with loaded policies + - Correct authorization decisions are made based on roles + - Policy counts are verified + """ + scope = ScopeData(external_key="lib:Org1:math_101") + subject = SubjectData(external_key="alice") + + @manage_policy_lifecycle(filter_on="scope") + def check_permissions(scope_arg, subject_arg): + """Check if subject has permissions in the given scope. + + Expected scenario: + - Alice has library_admin role in lib:Org1:math_101 + - library_admin role should allow delete_library action + - library_admin role should NOT allow some_nonexistent_action + """ + can_delete = global_enforcer.enforce( + subject_arg.namespaced_key, + ActionData(external_key="delete_library").namespaced_key, + scope_arg.namespaced_key, + ) + + cannot_do_fake = not global_enforcer.enforce( + subject_arg.namespaced_key, + ActionData(external_key="some_nonexistent_action").namespaced_key, + scope_arg.namespaced_key, + ) + + policy_count = len(global_enforcer.get_policy()) + grouping_count = len(global_enforcer.get_grouping_policy()) + + return { + "can_delete": can_delete, + "cannot_do_fake": cannot_do_fake, + "policy_count": policy_count, + "grouping_count": grouping_count, + } + + result = check_permissions(scope, subject) + + self.assertTrue(result["can_delete"], "Alice should be able to delete library") + self.assertTrue( + result["cannot_do_fake"], + "Alice should not be able to do nonexistent action", + ) + + # Verify exact counts + # Load both wildcard and specific scope like the decorator does + global_enforcer.load_filtered_policy(Filter(v2=[scope.policy_template, scope.namespaced_key])) + expected_policies = len(global_enforcer.get_policy()) + expected_grouping = len(global_enforcer.get_grouping_policy()) + global_enforcer.clear_policy() + + self.assertEqual(result["policy_count"], expected_policies) + self.assertEqual(result["grouping_count"], expected_grouping) + + def test_decorator_enforcement_with_different_subjects(self): + """Test enforcement with different subjects having different roles. + + Expected result: + - Each subject's role-based permissions are correctly enforced + - Different subjects have different access rights + - Policy loading and clearing works correctly for complex scenarios + """ + scope = ScopeData(external_key="lib:Org2:science_201") + alice = SubjectData(external_key="alice") + bob = SubjectData(external_key="bob") + + @manage_policy_lifecycle(filter_on="scope") + def check_multiple_subjects(scope_arg): + """Check permissions for multiple subjects in the same scope. + + Expected scenario: + - Bob has library_user role - can view but not delete + - Alice has no role in this scope - cannot view or delete + """ + bob_can_view = global_enforcer.enforce( + bob.namespaced_key, + ActionData(external_key="view_library").namespaced_key, + scope_arg.namespaced_key, + ) + + bob_cannot_delete = not global_enforcer.enforce( + bob.namespaced_key, + ActionData(external_key="delete_library").namespaced_key, + scope_arg.namespaced_key, + ) + + alice_cannot_view = not global_enforcer.enforce( + alice.namespaced_key, + ActionData(external_key="view_library").namespaced_key, + scope_arg.namespaced_key, + ) + + policy_count = len(global_enforcer.get_policy()) + grouping_count = len(global_enforcer.get_grouping_policy()) + + return { + "bob_can_view": bob_can_view, + "bob_cannot_delete": bob_cannot_delete, + "alice_cannot_view": alice_cannot_view, + "policy_count": policy_count, + "grouping_count": grouping_count, + } + + result = check_multiple_subjects(scope) + + self.assertTrue(result["bob_can_view"], "Bob should be able to view library") + self.assertTrue( + result["bob_cannot_delete"], "Bob should not be able to delete library" + ) + self.assertTrue( + result["alice_cannot_view"], + "Alice should not be able to view (no role in this scope)", + ) + + # Verify exact counts + # Load both wildcard and specific scope like the decorator does + global_enforcer.load_filtered_policy(Filter(v2=[scope.policy_template, scope.namespaced_key])) + expected_policies = len(global_enforcer.get_policy()) + expected_grouping = len(global_enforcer.get_grouping_policy()) + global_enforcer.clear_policy() + + self.assertEqual(result["policy_count"], expected_policies) + self.assertEqual(result["grouping_count"], expected_grouping) + + def test_decorator_integration_with_real_api_function(self): + """Test decorator behavior with actual API function. + + This verifies the decorator works correctly in its intended use case: + wrapping API functions that query policies. + + Expected result: + - Decorator loads policies filtered by scope + - API function returns correct permissions for active roles + - Enforcer is cleared after execution + """ + scope = ScopeData(external_key="lib:Org1:math_101") + + permissions = get_permissions_for_active_roles_in_scope(scope) + + self.assertIsInstance(permissions, dict) + self.assertIn("library_admin", permissions) + self.assertIn("permissions", permissions["library_admin"]) + self.assertIsInstance(permissions["library_admin"]["permissions"], list) + + # Verify exact permission count + # Load both wildcard and specific scope like the decorator does + global_enforcer.load_filtered_policy(Filter(v2=[scope.policy_template, scope.namespaced_key])) + expected_perms_count = len([p for p in global_enforcer.get_policy() if "role^library_admin" in p[0]]) + global_enforcer.clear_policy() + + self.assertEqual( + len(permissions["library_admin"]["permissions"]), + expected_perms_count, + ) + + @ddt_data( + ("lib:Org1:math_101", "library_admin", True), + ("lib:Org2:science_201", "library_user", True), + ("lib:Org3:history_301", "library_author", True), + ("lib:NonExistent:scope", "any_role", False), + ) + @unpack + def test_decorator_with_different_scopes( + self, scope_name, expected_role, should_find_role + ): + """Test decorator behavior with different scope values. + + Expected result: + - Decorator loads appropriate policies for each scope + - API functions return correct results for each scope + - Enforcer is cleared after each call + - Policy filtering works correctly for different scopes + """ + scope = ScopeData(external_key=scope_name) + + permissions = get_permissions_for_active_roles_in_scope(scope) + + if should_find_role: + self.assertIn(expected_role, permissions) + self.assertIn("permissions", permissions[expected_role]) + self.assertIsInstance(permissions[expected_role]["permissions"], list) + + # Verify exact permission count + # Load both wildcard and specific scope like the decorator does + global_enforcer.load_filtered_policy(Filter(v2=[scope.policy_template, scope.namespaced_key])) + expected_perms_count = len([p for p in global_enforcer.get_policy() if f"role^{expected_role}" in p[0]]) + global_enforcer.clear_policy() + + self.assertEqual( + len(permissions[expected_role]["permissions"]), + expected_perms_count, + ) + else: + self.assertEqual(len(permissions), 0) diff --git a/openedx_authz/tests/api/test_permissions.py b/openedx_authz/tests/api/test_permissions.py new file mode 100644 index 00000000..c8e16af5 --- /dev/null +++ b/openedx_authz/tests/api/test_permissions.py @@ -0,0 +1,371 @@ +"""Test cases for permissions API functions. + +This test suite verifies the functionality of the permissions API, +including permission retrieval and authorization checks for subjects +within specific scopes. +""" + +import casbin +from ddt import data as ddt_data +from ddt import ddt, unpack +from django.test import TestCase + +from openedx_authz.api.data import ( + ActionData, + RoleData, + ScopeData, + SubjectData, +) +from openedx_authz.api.permissions import ( + is_subject_allowed, +) +from openedx_authz.api.roles import ( + assign_role_to_subject_in_scope, +) +from openedx_authz.engine.enforcer import enforcer as global_enforcer +from openedx_authz.engine.utils import migrate_policy_between_enforcers + + +@ddt +class TestPermissionsAPI(TestCase): + """Test cases for permissions API functions. + + The enforcer used in these test cases is the default global enforcer + instance from `openedx_authz.engine.enforcer` to ensure consistency + with production environments. + + These tests verify that the is_subject_allowed function correctly + checks permissions for subjects in various scopes. + """ + + @classmethod + def _seed_database_with_policies(cls): + """Seed the database with policies from the policy file.""" + migrate_policy_between_enforcers( + source_enforcer=casbin.Enforcer( + "openedx_authz/engine/config/model.conf", + "openedx_authz/engine/config/authz.policy", + ), + target_enforcer=global_enforcer, + ) + + @classmethod + def setUpClass(cls): + """Set up test class environment.""" + super().setUpClass() + cls._seed_database_with_policies() + # Create test role assignments for various subjects and scopes + assignments = [ + { + "subject_name": "alice", + "role_name": "library_admin", + "scope_name": "lib:Org1:math_101", + }, + { + "subject_name": "bob", + "role_name": "library_user", + "scope_name": "lib:Org2:science_201", + }, + { + "subject_name": "carol", + "role_name": "library_author", + "scope_name": "lib:Org3:history_301", + }, + { + "subject_name": "dave", + "role_name": "library_collaborator", + "scope_name": "lib:Org4:english_401", + }, + ] + for assignment in assignments: + assign_role_to_subject_in_scope( + subject=SubjectData(external_key=assignment["subject_name"]), + role=RoleData(external_key=assignment["role_name"]), + scope=ScopeData(external_key=assignment["scope_name"]), + ) + + def setUp(self): + """Set up test environment.""" + super().setUp() + global_enforcer.clear_policy() + + def tearDown(self): + """Clean up after each test to ensure isolation.""" + super().tearDown() + global_enforcer.clear_policy() + + @ddt_data( + # Library admin permissions - should allow admin actions + ( + "alice", + "delete_library", + "lib:Org1:math_101", + True, + ), + ( + "alice", + "publish_library", + "lib:Org1:math_101", + True, + ), + ( + "alice", + "manage_library_team", + "lib:Org1:math_101", + True, + ), + ( + "alice", + "manage_library_tags", + "lib:Org1:math_101", + True, + ), + ( + "alice", + "create_library", + "lib:Org1:math_101", + True, + ), + # Library user permissions - should allow view actions only + ( + "bob", + "view_library", + "lib:Org2:science_201", + True, + ), + ( + "bob", + "view_library_team", + "lib:Org2:science_201", + True, + ), + ( + "bob", + "reuse_library_content", + "lib:Org2:science_201", + True, + ), + # Library user should NOT be able to delete + ( + "bob", + "delete_library", + "lib:Org2:science_201", + False, + ), + ( + "bob", + "publish_library", + "lib:Org2:science_201", + False, + ), + # Library author permissions + ( + "carol", + "edit_library", + "lib:Org3:history_301", + True, + ), + ( + "carol", + "delete_library_content", + "lib:Org3:history_301", + True, + ), + ( + "carol", + "publish_library_content", + "lib:Org3:history_301", + True, + ), + ( + "carol", + "manage_library_tags", + "lib:Org3:history_301", + True, + ), + # Library author should NOT be able to delete library itself + ( + "carol", + "delete_library", + "lib:Org3:history_301", + False, + ), + # Library collaborator permissions + ( + "dave", + "edit_library", + "lib:Org4:english_401", + True, + ), + ( + "dave", + "delete_library_content", + "lib:Org4:english_401", + True, + ), + ( + "dave", + "manage_library_tags", + "lib:Org4:english_401", + True, + ), + # Library collaborator should NOT be able to publish library + ( + "dave", + "publish_library", + "lib:Org4:english_401", + False, + ), + # Non-existent user should have no permissions + ( + "nonexistent_user", + "view_library", + "lib:Org1:math_101", + False, + ), + # User in wrong scope should have no permissions + ( + "alice", + "delete_library", + "lib:Org2:science_201", + False, + ), + # Non-existent action should always be denied + ( + "alice", + "nonexistent_action", + "lib:Org1:math_101", + False, + ), + ) + @unpack + def test_is_subject_allowed( + self, subject_name, action_name, scope_name, expected_allowed + ): + """Test checking if a subject is allowed to perform an action in a scope. + + Expected result: + - Subject with appropriate role can perform allowed actions + - Subject cannot perform actions not granted by their role + - Subject cannot perform actions in scopes where they have no role + - Non-existent subjects are denied all actions + - Non-existent actions are always denied + """ + subject = SubjectData(external_key=subject_name) + action = ActionData(external_key=action_name) + scope = ScopeData(external_key=scope_name) + + result = is_subject_allowed(subject, action, scope) + + self.assertEqual( + result, + expected_allowed, + f"Expected {subject_name} to be {'allowed' if expected_allowed else 'denied'} " + f"for action {action_name} in scope {scope_name}", + ) + + def test_is_subject_allowed_with_multiple_roles_in_different_scopes(self): + """Test subject with multiple roles in different scopes. + + Expected result: + - Subject can perform actions in scopes where they have appropriate roles + - Subject permissions are isolated to their assigned scopes + """ + # Assign multiple roles to same subject in different scopes + eve = SubjectData(external_key="eve") + + assign_role_to_subject_in_scope( + subject=eve, + role=RoleData(external_key="library_admin"), + scope=ScopeData(external_key="lib:Org5:project_alpha"), + ) + + assign_role_to_subject_in_scope( + subject=eve, + role=RoleData(external_key="library_user"), + scope=ScopeData(external_key="lib:Org5:project_beta"), + ) + + # Should have admin permissions in project_alpha + self.assertTrue( + is_subject_allowed( + eve, + ActionData(external_key="delete_library"), + ScopeData(external_key="lib:Org5:project_alpha"), + ) + ) + + # Should NOT have admin permissions in project_beta (only user role) + self.assertFalse( + is_subject_allowed( + eve, + ActionData(external_key="delete_library"), + ScopeData(external_key="lib:Org5:project_beta"), + ) + ) + + # Should have view permissions in project_beta + self.assertTrue( + is_subject_allowed( + eve, + ActionData(external_key="view_library"), + ScopeData(external_key="lib:Org5:project_beta"), + ) + ) + + def test_is_subject_allowed_enforcer_cleared_after_execution(self): + """Test that enforcer is cleared after is_subject_allowed execution. + + Expected result: + - Function loads policies during execution + - Function returns correct result + - Enforcer is cleared after execution (due to decorator) + """ + subject = SubjectData(external_key="alice") + action = ActionData(external_key="delete_library") + scope = ScopeData(external_key="lib:Org1:math_101") + + result = is_subject_allowed(subject, action, scope) + + self.assertTrue(result, "Alice should be allowed to delete library") + + def test_is_subject_allowed_with_case_sensitivity(self): + """Test that permission checks are case-sensitive. + + Expected result: + - Actions with different casing are treated as different actions + - Subject identifiers are case-sensitive + """ + subject = SubjectData(external_key="alice") + action_lowercase = ActionData(external_key="delete_library") + action_uppercase = ActionData(external_key="DELETE_LIBRARY") + scope = ScopeData(external_key="lib:Org1:math_101") + + self.assertTrue(is_subject_allowed(subject, action_lowercase, scope)) + self.assertFalse(is_subject_allowed(subject, action_uppercase, scope)) + + @ddt_data( + # Test various library scope formats + ("alice", "delete_library", "lib:Org1:math_101", True), + # Different subject in different org + ("bob", "view_library", "lib:Org2:science_201", True), + # Cross-org access should fail + ("alice", "delete_library", "lib:Org2:science_201", False), + ) + @unpack + def test_is_subject_allowed_with_different_scope_formats( + self, subject_name, action_name, scope_name, expected_allowed + ): + """Test permission checks with different scope naming formats. + + Expected result: + - Scope format is correctly parsed and used for permission checks + - Permissions are properly scoped to the correct resource + """ + subject = SubjectData(external_key=subject_name) + action = ActionData(external_key=action_name) + scope = ScopeData(external_key=scope_name) + + result = is_subject_allowed(subject, action, scope) + + self.assertEqual(result, expected_allowed) diff --git a/openedx_authz/tests/api/test_roles.py b/openedx_authz/tests/api/test_roles.py index 6a8df637..2474fc1b 100644 --- a/openedx_authz/tests/api/test_roles.py +++ b/openedx_authz/tests/api/test_roles.py @@ -50,7 +50,6 @@ def _seed_database_with_policies(cls): This simulates the one-time database seeding that would happen during application deployment, separate from the runtime policy loading. """ - global_enforcer.load_policy() migrate_policy_between_enforcers( source_enforcer=casbin.Enforcer( "openedx_authz/engine/config/model.conf", diff --git a/openedx_authz/tests/test_adapter.py b/openedx_authz/tests/test_adapter.py new file mode 100644 index 00000000..5925cf7a --- /dev/null +++ b/openedx_authz/tests/test_adapter.py @@ -0,0 +1,233 @@ +""" +Tests for the ExtendedAdapter class used in Casbin policy management. + +This module contains unit tests for the ExtendedAdapter class, which extends +the base Django adapter with filtering capabilities for efficient policy loading. +The tests verify proper query filtering, ordering, and the adapter's filtering +capability reporting. +""" + +from casbin_adapter.models import CasbinRule +from ddt import data as ddt_data +from ddt import ddt, unpack +from django.db.models import QuerySet +from django.test import TestCase + +from openedx_authz.engine.adapter import ExtendedAdapter +from openedx_authz.engine.filter import Filter +from openedx_authz.tests.test_utils import make_action_key, make_role_key, make_scope_key, make_user_key + + +@ddt +class TestExtendedAdapter(TestCase): + """ + Tests for the ExtendedAdapter class. + + This test class verifies the behavior of the ExtendedAdapter, including: + - Query filtering with various filter criteria + - Filter combinations with policy types, roles, and scopes + - Query result ordering + - Filtering capability reporting + """ + + def setUp(self): + """Set up test environment with sample policy data.""" + super().setUp() + self.adapter = ExtendedAdapter() + + # Create test policy rules + CasbinRule.objects.create( + ptype="p", + v0=make_role_key("library_admin"), + v1=make_action_key("edit"), + v2=make_scope_key("lib", "*"), + v3="allow" + ) + CasbinRule.objects.create( + ptype="p", + v0=make_role_key("library_user"), + v1=make_action_key("read"), + v2=make_scope_key("lib", "*"), + v3="allow" + ) + CasbinRule.objects.create( + ptype="g", + v0=make_user_key("alice"), + v1=make_role_key("library_admin"), + v2=make_scope_key("lib", "test-lib") + ) + CasbinRule.objects.create( + ptype="g", + v0=make_user_key("bob"), + v1=make_role_key("library_user"), + v2=make_scope_key("lib", "test-lib") + ) + + def test_is_filtered_returns_true(self): + """Test that adapter correctly reports filtering capability. + + Expected result: + - The adapter's is_filtered() method returns True + """ + self.assertTrue(self.adapter.is_filtered()) + + def test_filter_query_no_filter(self): + """Test filtering policies without any filter criteria. + + When no filter criteria are provided, all policy rules should be returned. + + Expected result: + - All CasbinRule objects are returned + - Result is a QuerySet instance + """ + filter_obj = Filter() + queryset = CasbinRule.objects.all() + filtered = self.adapter.filter_query(queryset, filter_obj) + + self.assertIsInstance(filtered, QuerySet) + self.assertEqual(filtered.count(), CasbinRule.objects.count()) + + def test_filter_query_with_ptype_filter(self): + """Test filtering policies by policy type. + + When filtering by ptype='p', only policy rules (not grouping rules) + should be returned. + + Expected result: + - Only 'p' type policy rules are returned + - No grouping rules ('g') are included + """ + filter_obj = Filter(ptype=["p"]) + queryset = CasbinRule.objects.all() + filtered = self.adapter.filter_query(queryset, filter_obj) + + self.assertIsInstance(filtered, QuerySet) + self.assertGreater(filtered.count(), 0) + + for rule in filtered: + self.assertEqual(rule.ptype, "p") + + def test_filter_query_with_role_filter(self): + """Test filtering policies by role (v0 attribute). + + When filtering by a specific role, only policies for that role + should be returned. + + Expected result: + - Only policies for library_admin role are returned + - Exactly 1 policy matches the filter + """ + filter_obj = Filter(ptype=["p"], v0=[make_role_key("library_admin")]) + queryset = CasbinRule.objects.all() + filtered = self.adapter.filter_query(queryset, filter_obj) + + self.assertIsInstance(filtered, QuerySet) + self.assertEqual(filtered.count(), 1) + + for rule in filtered: + self.assertEqual(rule.ptype, "p") + self.assertEqual(rule.v0, make_role_key("library_admin")) + + def test_filter_query_with_multiple_ptypes(self): + """Test filtering with multiple policy types. + + When filtering with ptype=['p', 'g'], both policy and grouping + rules should be returned. + + Expected result: + - Both 'p' and 'g' type rules are returned + - All CasbinRule objects match (4 total from setUp) + """ + filter_obj = Filter(ptype=["p", "g"]) + queryset = CasbinRule.objects.all() + filtered = self.adapter.filter_query(queryset, filter_obj) + + self.assertIsInstance(filtered, QuerySet) + self.assertEqual(filtered.count(), CasbinRule.objects.count()) + + for rule in filtered: + self.assertIn(rule.ptype, ["p", "g"]) + + def test_filter_query_with_scope_filter(self): + """Test filtering policies by scope (v2 attribute). + + When filtering by scope, only policies for that scope should be returned. + + Expected result: + - Only policies with 'lib^*' scope are returned + - At least one matching policy exists + """ + filter_obj = Filter(v2=[make_scope_key("lib", "*")]) + queryset = CasbinRule.objects.all() + filtered = self.adapter.filter_query(queryset, filter_obj) + + self.assertIsInstance(filtered, QuerySet) + self.assertGreater(filtered.count(), 0) + + for rule in filtered: + self.assertEqual(rule.v2, make_scope_key("lib", "*")) + + def test_filter_query_ordering(self): + """Test that filtered queries are ordered by id. + + Results should always be ordered consistently by id for predictable behavior. + + Expected result: + - Query results are ordered by id in ascending order + - IDs are sequential and sorted + """ + filter_obj = Filter() + queryset = CasbinRule.objects.all() + filtered = self.adapter.filter_query(queryset, filter_obj) + + ids = list(filtered.values_list('id', flat=True)) + self.assertEqual(ids, sorted(ids)) + + @ddt_data( + (Filter(ptype=["p"]), 2), + (Filter(ptype=["g"]), 2), + (Filter(ptype=["p", "g"]), 4), + (Filter(v0=[make_role_key("library_admin")]), 1), + (Filter(v0=[make_user_key("alice")]), 1), + (Filter(v2=[make_scope_key("lib", "*")]), 2), + ) + @unpack + def test_filter_query_counts(self, filter_obj, expected_count): + """ + Test that various filter combinations return expected counts. + + This verifies that different filter criteria produce the expected + number of matching policy rules. + + Expected result: + - Each filter produces the expected number of matching CasbinRule objects + - Filter combinations work correctly + """ + queryset = CasbinRule.objects.all() + filtered = self.adapter.filter_query(queryset, filter_obj) + + self.assertEqual(filtered.count(), expected_count) + + def test_filter_query_combined_filters(self): + """Test filtering with multiple criteria combined. + + When combining ptype and v0 filters, only rules matching both + criteria should be returned. + + Expected result: + - Only grouping rules ('g' type) are returned + - Only rules for alice or bob are included + - Exactly 2 matching rules exist + """ + filter_obj = Filter( + ptype=["g"], + v0=[make_user_key("alice"), make_user_key("bob")] + ) + queryset = CasbinRule.objects.all() + filtered = self.adapter.filter_query(queryset, filter_obj) + + self.assertEqual(filtered.count(), 2) + + for rule in filtered: + self.assertEqual(rule.ptype, "g") + self.assertIn(rule.v0, [make_user_key("alice"), make_user_key("bob")]) diff --git a/openedx_authz/tests/test_enforcer.py b/openedx_authz/tests/test_enforcer.py index 3d3b033c..6a1a8397 100644 --- a/openedx_authz/tests/test_enforcer.py +++ b/openedx_authz/tests/test_enforcer.py @@ -6,13 +6,15 @@ """ import casbin +from casbin_adapter.models import CasbinRule from ddt import data as ddt_data -from ddt import ddt +from ddt import ddt, unpack from django.test import TestCase from openedx_authz.engine.enforcer import enforcer as global_enforcer from openedx_authz.engine.filter import Filter from openedx_authz.engine.utils import migrate_policy_between_enforcers +from openedx_authz.tests.test_utils import make_action_key, make_role_key, make_scope_key, make_user_key class PolicyLoadingTestSetupMixin(TestCase): @@ -406,3 +408,434 @@ def test_multi_scope_filtering(self): total_count = len(global_enforcer.get_policy()) self.assertEqual(total_count, lib_count + course_count + org_count) + + +@ddt +class TestFilteredPolicyEnforcement(TestCase): + """ + Integration tests for filtered policy loading with enforcement decisions. + + These tests verify that after loading filtered policies and role assignments, + the enforcer can correctly make allow/deny decisions based on what's loaded. + This ensures filtered loading works end-to-end for scope-based authorization. + """ + + def setUp(self): + """Set up test environment with enforcer and sample data.""" + super().setUp() + self.enforcer = global_enforcer + self.enforcer.clear_policy() + + # Create policy rules with wildcard scope templates (like authz.policy) + # These define what roles can do in ANY library/course + CasbinRule.objects.create( + ptype="p", + v0=make_role_key("library_admin"), + v1=make_action_key("edit"), + v2=make_scope_key("lib", "*"), + v3="allow" + ) + CasbinRule.objects.create( + ptype="p", + v0=make_role_key("library_admin"), + v1=make_action_key("delete"), + v2=make_scope_key("lib", "*"), + v3="allow" + ) + CasbinRule.objects.create( + ptype="p", + v0=make_role_key("library_user"), + v1=make_action_key("view"), + v2=make_scope_key("lib", "*"), + v3="allow" + ) + # Create role assignments for specific scope instances + # These assign users to roles in SPECIFIC libraries + CasbinRule.objects.create( + ptype="g", + v0=make_user_key("alice"), + v1=make_role_key("library_admin"), + v2=make_scope_key("lib", "test-lib-1") # Specific instance + ) + CasbinRule.objects.create( + ptype="g", + v0=make_user_key("bob"), + v1=make_role_key("library_user"), + v2=make_scope_key("lib", "test-lib-2") # Specific instance + ) + + def tearDown(self): + """Clean up after test.""" + self.enforcer.clear_policy() + super().tearDown() + + def test_enforcement_with_filtered_scope_allows_action(self): + """Test that filtering by scope allows correct enforcement decisions. + + When loading policies for a specific scope with role assignments, + users should be allowed to perform actions defined in that scope. + + Expected result: + - Alice can edit in test-lib-1 (her scope, her permission) + - Alice can delete in test-lib-1 (her scope, her permission) + """ + # Load policies and role assignments for test-lib-1 scope + scope_filter = Filter( + v2=[ + make_scope_key("lib", "test-lib-1"), + make_scope_key("lib", "*"), # Load wildcard policies too + ], + ) + self.enforcer.load_filtered_policy(scope_filter) + + # Alice should be allowed to edit in test-lib-1 + result = self.enforcer.enforce( + make_user_key("alice"), + make_action_key("edit"), + make_scope_key("lib", "test-lib-1") + ) + self.assertTrue(result) + + # Alice should be allowed to delete in test-lib-1 + result = self.enforcer.enforce( + make_user_key("alice"), + make_action_key("delete"), + make_scope_key("lib", "test-lib-1") + ) + self.assertTrue(result) + + def test_enforcement_with_filtered_scope_denies_out_of_scope(self): + """Test that filtering by scope denies actions outside the loaded scope. + + When only loading policies for one specific scope instance, actions in + other scope instances should be denied even if the user has roles there. + Note: Since policies use wildcards (lib^*), we filter by specific instances + in the role assignments. + + Expected result: + - Alice cannot view in test-lib-2 (role assignment not loaded) + - Bob's actions are not allowed (his role assignment not loaded) + """ + # Load only test-lib-1 scope (alice's role assignment + wildcard policies) + scope_filter = Filter(v2=[make_scope_key("lib", "test-lib-1")]) + self.enforcer.load_filtered_policy(scope_filter) + + # Alice should NOT be allowed to act in test-lib-2 (role assignment not loaded) + result = self.enforcer.enforce( + make_user_key("alice"), + make_action_key("view"), + make_scope_key("lib", "test-lib-2") + ) + self.assertFalse(result) + + # Bob should NOT be allowed (his role assignment not loaded) + result = self.enforcer.enforce( + make_user_key("bob"), + make_action_key("view"), + make_scope_key("lib", "test-lib-2") + ) + self.assertFalse(result) + + def test_enforcement_with_multiple_scopes_loaded(self): + """Test enforcement when multiple scopes are loaded. + + When loading policies for multiple scopes, users should have + access according to their roles in each loaded scope. + + Expected result: + - Alice can edit in test-lib-1 + - Bob can view in test-lib-2 + - Alice cannot view in test-lib-2 (no role there) + - Bob cannot edit in test-lib-1 (no role there) + """ + scope_filter = Filter(v2=[ + make_scope_key("lib", "test-lib-1"), + make_scope_key("lib", "test-lib-2"), + make_scope_key("lib", "*"), # Load wildcard policies too to load definitions + ]) + self.enforcer.load_filtered_policy(scope_filter) + + # Alice can edit in test-lib-1 + self.assertTrue(self.enforcer.enforce( + make_user_key("alice"), + make_action_key("edit"), + make_scope_key("lib", "test-lib-1") + )) + + # Bob can view in test-lib-2 + self.assertTrue(self.enforcer.enforce( + make_user_key("bob"), + make_action_key("view"), + make_scope_key("lib", "test-lib-2") + )) + + # Alice cannot view in test-lib-2 (no role assignment) + self.assertFalse(self.enforcer.enforce( + make_user_key("alice"), + make_action_key("view"), + make_scope_key("lib", "test-lib-2") + )) + + # Bob cannot edit in test-lib-1 (no role assignment) + self.assertFalse(self.enforcer.enforce( + make_user_key("bob"), + make_action_key("edit"), + make_scope_key("lib", "test-lib-1") + )) + + def test_enforcement_without_grouping_policy_denies(self): + """Test that loading only policies without role assignments denies access. + + When only loading 'p' type policies without 'g' grouping policies, + users cannot access anything because role assignments aren't loaded. + Note: We filter by wildcard scope since that's what's in the policies. + + Expected result: + - Alice cannot edit even though the policy exists + - No users can perform any actions + """ + # Load only 'p' type policies with wildcard scope, no role assignments + policy_filter = Filter(ptype=["p"], v2=[make_scope_key("lib", "*")]) + self.enforcer.load_filtered_policy(policy_filter) + + # Alice should NOT be allowed (no role assignment loaded) + result = self.enforcer.enforce( + make_user_key("alice"), + make_action_key("edit"), + make_scope_key("lib", "test-lib-1") + ) + self.assertFalse(result) + + def test_enforcement_with_only_grouping_policy_denies(self): + """Test that loading only role assignments without policies denies access. + + When only loading 'g' type grouping policies without 'p' policies, + users cannot access anything because the permissions aren't defined. + + Expected result: + - Alice cannot edit even though she has the role assignment + """ + # Load only 'g' type grouping policies, no permission policies + grouping_filter = Filter(ptype=["g"], v2=[make_scope_key("lib", "test-lib-1")]) + self.enforcer.load_filtered_policy(grouping_filter) + + # Alice should NOT be allowed (no permission policies loaded) + result = self.enforcer.enforce( + make_user_key("alice"), + make_action_key("edit"), + make_scope_key("lib", "test-lib-1") + ) + self.assertFalse(result) + + # TODO: add tests for global scopes once supported + + +@ddt +class TestUserContextPolicyLoading(TestCase): + """ + Tests for loading policies in a user-specific context. + + These tests demonstrate strategies for loading only the policies relevant + to a specific user, which is a common production scenario for optimizing + memory usage and performance in multi-tenant applications. + """ + + def setUp(self): + """Set up test environment with user-specific policy data.""" + super().setUp() + self.enforcer = global_enforcer + self.enforcer.clear_policy() + + CasbinRule.objects.create( + ptype="p", + v0=make_role_key("library_admin"), + v1=make_action_key("edit"), + v2=make_scope_key("lib", "*"), + v3="allow" + ) + CasbinRule.objects.create( + ptype="p", + v0=make_role_key("library_admin"), + v1=make_action_key("delete"), + v2=make_scope_key("lib", "*"), + v3="allow" + ) + CasbinRule.objects.create( + ptype="p", + v0=make_role_key("library_user"), + v1=make_action_key("view"), + v2=make_scope_key("lib", "*"), + v3="allow" + ) + CasbinRule.objects.create( + ptype="p", + v0=make_role_key("course_instructor"), + v1=make_action_key("manage"), + v2=make_scope_key("course", "*"), + v3="allow" + ) + + CasbinRule.objects.create( + ptype="g", + v0=make_user_key("alice"), + v1=make_role_key("library_admin"), + v2=make_scope_key("lib", "alice-lib") + ) + CasbinRule.objects.create( + ptype="g", + v0=make_user_key("alice"), + v1=make_role_key("course_instructor"), + v2=make_scope_key("course", "alice-course") + ) + CasbinRule.objects.create( + ptype="g", + v0=make_user_key("bob"), + v1=make_role_key("library_user"), + v2=make_scope_key("lib", "bob-lib") + ) + + def tearDown(self): + """Clean up after test.""" + self.enforcer.clear_policy() + super().tearDown() + + def test_load_user_context_by_scope(self): + """Test loading policies for a user by their accessible scopes. + + This is the simplest approach: if you know which scopes a user + can access, filter by those scopes to load all relevant policies + and role assignments. Must include both wildcard template and specific scope. + + Expected result: + - Only library-related policies are loaded + - Alice can edit in her library scope + - Alice cannot manage courses (not loaded) + """ + # Load library policies: both the template (lib^*) and alice's specific scope + user_scopes = [make_scope_key("lib", "*"), make_scope_key("lib", "alice-lib")] + scope_filter = Filter(v2=user_scopes) + self.enforcer.load_filtered_policy(scope_filter) + + # Alice should be able to edit in her library + self.assertTrue(self.enforcer.enforce( + make_user_key("alice"), + make_action_key("edit"), + make_scope_key("lib", "alice-lib") + )) + + # Alice should NOT be able to manage courses (not loaded) + self.assertFalse(self.enforcer.enforce( + make_user_key("alice"), + make_action_key("manage"), + make_scope_key("course", "alice-course") + )) + + def test_load_multiple_users_in_shared_scope(self): + """Test loading policies for multiple users in a shared scope. + + When multiple users share access to the same scope, loading + by scope is more efficient than loading per-user. Must include + both wildcard template and specific scopes. + + Expected result: + - All users in the library scope are loaded + - Each user has appropriate access based on their roles + """ + # Load all library policies: template and specific instances + lib_filter = Filter(v2=[ + make_scope_key("lib", "*"), # Policy template + make_scope_key("lib", "alice-lib"), # Alice's role assignment + make_scope_key("lib", "bob-lib"), # Bob's role assignment + ]) + self.enforcer.load_filtered_policy(lib_filter) + + # Both alice and bob should have access based on their roles + self.assertTrue(self.enforcer.enforce( + make_user_key("alice"), + make_action_key("edit"), + make_scope_key("lib", "alice-lib") + )) + + self.assertTrue(self.enforcer.enforce( + make_user_key("bob"), + make_action_key("view"), + make_scope_key("lib", "bob-lib") + )) + + def test_load_user_specific_scope_policies(self): + """Test loading policies for specific user-scope combination. + + This is useful when you want to load only the policies for a user + in a specific scope they're currently working in. Must include + both wildcard template and specific scope. + + Expected result: + - Only alice's library policies are loaded + - Alice's course policies are not loaded + - Bob's library policies are not loaded + """ + # Load alice's library scope: template + specific scope + alice_lib_filter = Filter(v2=[ + make_scope_key("lib", "*"), # Policy template + make_scope_key("lib", "alice-lib") # Alice's role assignment + ]) + self.enforcer.load_filtered_policy(alice_lib_filter) + + # Alice can act in her library + self.assertTrue(self.enforcer.enforce( + make_user_key("alice"), + make_action_key("edit"), + make_scope_key("lib", "alice-lib") + )) + + # Alice cannot act in courses (not loaded) + self.assertFalse(self.enforcer.enforce( + make_user_key("alice"), + make_action_key("manage"), + make_scope_key("course", "alice-course") + )) + + # Bob cannot act in his library (not loaded) + self.assertFalse(self.enforcer.enforce( + make_user_key("bob"), + make_action_key("view"), + make_scope_key("lib", "bob-lib") + )) + + @ddt_data( + # (user, scopes_to_load, expected_accessible_scopes) + ( + make_user_key("alice"), + [make_scope_key("lib", "alice-lib")], + {make_scope_key("lib", "alice-lib")} + ), + ( + make_user_key("alice"), + [make_scope_key("lib", "alice-lib"), make_scope_key("course", "alice-course")], + {make_scope_key("lib", "alice-lib"), make_scope_key("course", "alice-course")} + ), + ( + make_user_key("bob"), + [make_scope_key("lib", "bob-lib")], + {make_scope_key("lib", "bob-lib")} + ), + ) + @unpack + def test_user_context_loading_scenarios(self, user, scopes_to_load, expected_accessible_scopes): + """Test various user context loading scenarios. + + This parameterized test verifies that loading policies for different + user-scope combinations produces the expected access patterns. + + Expected result: + - User can only access scopes that were loaded + - User cannot access scopes that were not loaded + """ + scope_filter = Filter(v2=scopes_to_load) + self.enforcer.load_filtered_policy(scope_filter) + + all_grouping = self.enforcer.get_grouping_policy() + user_assignments = [g for g in all_grouping if g[0] == user] + + loaded_scopes = {assignment[2] for assignment in user_assignments} + self.assertEqual(loaded_scopes, expected_accessible_scopes) From 33a14f447cb27317b1891250fa9db56e4eb0088a Mon Sep 17 00:00:00 2001 From: Maria Grimaldi Date: Fri, 10 Oct 2025 12:36:21 +0200 Subject: [PATCH 2/9] refactor: remove level of nested --- openedx_authz/api/decorators.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/openedx_authz/api/decorators.py b/openedx_authz/api/decorators.py index e129c30a..7a26fa5d 100644 --- a/openedx_authz/api/decorators.py +++ b/openedx_authz/api/decorators.py @@ -49,12 +49,15 @@ def build_filter_from_args(args) -> Filter: Filter: A Filter object populated with relevant filter values. """ filter_obj = Filter() - if filter_on and filter_on in FILTER_DATA_CLASSES: - for arg in args: - if isinstance(arg, FILTER_DATA_CLASSES[filter_on]): - filter_value = getattr(filter_obj, f"v{arg.POLICY_POSITION}") - filter_value.append(arg.policy_template) # Used to load p type policies as well. E.g., lib^* - filter_value.append(arg.namespaced_key) # E.g., lib^lib:DemoX:CSPROB + if not filter_on or filter_on not in FILTER_DATA_CLASSES: + return filter_obj + + for arg in args: + if isinstance(arg, FILTER_DATA_CLASSES[filter_on]): + filter_value = getattr(filter_obj, f"v{arg.POLICY_POSITION}") + filter_value.append(arg.policy_template) # Used to load p type policies as well. E.g., lib^* + filter_value.append(arg.namespaced_key) # E.g., lib^lib:DemoX:CSPROB + return filter_obj def decorator(f): From c27328e275af4ad1bc296e07f6687b4e9999caed Mon Sep 17 00:00:00 2001 From: Maria Grimaldi Date: Fri, 10 Oct 2025 13:29:17 +0200 Subject: [PATCH 3/9] refactor: consider no filtering as first version --- openedx_authz/api/decorators.py | 18 +++++++- openedx_authz/settings/common.py | 5 +++ openedx_authz/tests/api/test_decorators.py | 48 ++++++++++++++++++++++ 3 files changed, 70 insertions(+), 1 deletion(-) diff --git a/openedx_authz/api/decorators.py b/openedx_authz/api/decorators.py index 7a26fa5d..a7830189 100644 --- a/openedx_authz/api/decorators.py +++ b/openedx_authz/api/decorators.py @@ -1,6 +1,8 @@ """Decorators for the authorization public API.""" from functools import wraps +from django.conf import settings + from openedx_authz.api.data import ScopeData from openedx_authz.engine.enforcer import enforcer from openedx_authz.engine.filter import Filter @@ -35,8 +37,18 @@ def get_all_roles(): def get_roles_in_scope(scope: ScopeData): return enforcer.get_filtered_roles(scope.namespaced_key) """ - FILTER_DATA_CLASSES = { + FILTER_DATA_CLASSES = { # Consider empty for no filtering "scope": ScopeData, + # TODO: currently ALLOW_FILTERED_POLICY_LOADING is False to avoid partial policy loads. We can + # Allow filtering on scope (initially) once we have a CONF model that supports this so filtering is meaningful, + # consistent and doesn't lead to partial policy loads. + # We can consider this modeling to avoid partial loads and inconsistent states: + # 1. g only for user-role-scope bindings + # 2. p only for permission-role bindings + # 3. g2 only for role-role bindings + # 4. g3 only for permission grouping + # This way for a user we'd only need to load g ( filter only for the scope or user) , p, g2, g3 policies in each request + # The only filter binding would be g, the rest loads entirely to avoid not loading definitions. } def build_filter_from_args(args) -> Filter: @@ -48,6 +60,10 @@ def build_filter_from_args(args) -> Filter: Returns: Filter: A Filter object populated with relevant filter values. """ + # Fallback to no filtering in case of misbehavior + if settings.ALLOW_FILTERED_POLICY_LOADING is False: + return Filter() + filter_obj = Filter() if not filter_on or filter_on not in FILTER_DATA_CLASSES: return filter_obj diff --git a/openedx_authz/settings/common.py b/openedx_authz/settings/common.py index 22feabd3..0744da34 100644 --- a/openedx_authz/settings/common.py +++ b/openedx_authz/settings/common.py @@ -28,3 +28,8 @@ def plugin_settings(settings): # Redis host and port are temporarily loaded here for the MVP settings.REDIS_HOST = "redis" settings.REDIS_PORT = 6379 + + # TODO: Set to True when we have a CONF model that supports filtered policy loading meaningfully + # to avoid partial policy loads and inconsistent states. + # See comments in api/decorators.py for more details. + settings.ALLOW_FILTERED_POLICY_LOADING = False diff --git a/openedx_authz/tests/api/test_decorators.py b/openedx_authz/tests/api/test_decorators.py index 5b7b443c..bf09e2f0 100644 --- a/openedx_authz/tests/api/test_decorators.py +++ b/openedx_authz/tests/api/test_decorators.py @@ -375,3 +375,51 @@ def test_decorator_with_different_scopes( ) else: self.assertEqual(len(permissions), 0) + + def test_decorator_with_permission_grouping(self): + """Test decorator behavior with permission grouping in policies. + + For example: + - manage_library_team includes view_library_team through g2 + + This test verifies that when a user has edit permissions, they also implicitly have + delete permissions due to the permission grouping defined in the policy. + + Expected result: + - Decorator loads filtered policies for the given scope + - User with manage_library_team role can also view_library_team + - Enforcer is cleared after execution + """ + scope = ScopeData(external_key="lib:Org1:math_101") + subject = SubjectData(external_key="alice") + + @manage_policy_lifecycle(filter_on="scope") + def check_grouped_permissions(scope_arg, subject_arg): + """Check if subject has grouped permissions in the given scope. + + Expected scenario: + - Alice has library_admin role in lib:Org1:math_101 + - library_admin role includes manage_library_team + - manage_library_team includes view_library_team through g2 + """ + can_manage_team = global_enforcer.enforce( + subject_arg.namespaced_key, + ActionData(external_key="manage_library_team").namespaced_key, + scope_arg.namespaced_key, + ) + + can_view_team = global_enforcer.enforce( + subject_arg.namespaced_key, + ActionData(external_key="view_library_team").namespaced_key, + scope_arg.namespaced_key, + ) + + return { + "can_manage_team": can_manage_team, + "can_view_team": can_view_team, + } + + result = check_grouped_permissions(scope, subject) + + self.assertTrue(result["can_manage_team"], "Alice should be able to manage library team") + self.assertTrue(result["can_view_team"], "Alice should be able to view library team due to grouping") From 4d0cfb5582eb80b4bf9f176ea7184416e876c95e Mon Sep 17 00:00:00 2001 From: Maria Grimaldi Date: Mon, 13 Oct 2025 12:00:19 +0200 Subject: [PATCH 4/9] refactor: address quality issues --- openedx_authz/api/decorators.py | 4 ++-- openedx_authz/api/roles.py | 2 +- openedx_authz/engine/adapter.py | 2 +- openedx_authz/settings/test.py | 1 + openedx_authz/tests/api/test_decorators.py | 25 ++++++++++------------ 5 files changed, 16 insertions(+), 18 deletions(-) diff --git a/openedx_authz/api/decorators.py b/openedx_authz/api/decorators.py index a7830189..9b5100de 100644 --- a/openedx_authz/api/decorators.py +++ b/openedx_authz/api/decorators.py @@ -47,8 +47,8 @@ def get_roles_in_scope(scope: ScopeData): # 2. p only for permission-role bindings # 3. g2 only for role-role bindings # 4. g3 only for permission grouping - # This way for a user we'd only need to load g ( filter only for the scope or user) , p, g2, g3 policies in each request - # The only filter binding would be g, the rest loads entirely to avoid not loading definitions. + # This way for a user we'd only need to load g ( filter only for the scope or user) , p, g2, g3 policies in + # each request. The only filter binding would be g, the rest loads entirely to avoid not loading definitions. } def build_filter_from_args(args) -> Filter: diff --git a/openedx_authz/api/roles.py b/openedx_authz/api/roles.py index 3d4b11d8..e837c3ee 100644 --- a/openedx_authz/api/roles.py +++ b/openedx_authz/api/roles.py @@ -19,9 +19,9 @@ ScopeData, SubjectData, ) +from openedx_authz.api.decorators import manage_policy_lifecycle from openedx_authz.api.permissions import get_permission_from_policy from openedx_authz.engine.enforcer import enforcer -from openedx_authz.api.decorators import manage_policy_lifecycle __all__ = [ "get_permissions_for_single_role", diff --git a/openedx_authz/engine/adapter.py b/openedx_authz/engine/adapter.py index 0fe49828..c68cfe82 100644 --- a/openedx_authz/engine/adapter.py +++ b/openedx_authz/engine/adapter.py @@ -18,7 +18,7 @@ from casbin.persist import FilteredAdapter from casbin_adapter.adapter import Adapter from casbin_adapter.models import CasbinRule -from django.db.models import QuerySet, Q +from django.db.models import QuerySet from openedx_authz.engine.filter import Filter diff --git a/openedx_authz/settings/test.py b/openedx_authz/settings/test.py index 8e2ea3e7..1f02ab81 100644 --- a/openedx_authz/settings/test.py +++ b/openedx_authz/settings/test.py @@ -58,3 +58,4 @@ CASBIN_WATCHER_ENABLED = False USE_TZ = True ROOT_URLCONF = "openedx_authz.urls" +ALLOW_FILTERED_POLICY_LOADING = False diff --git a/openedx_authz/tests/api/test_decorators.py b/openedx_authz/tests/api/test_decorators.py index bf09e2f0..9b4405cb 100644 --- a/openedx_authz/tests/api/test_decorators.py +++ b/openedx_authz/tests/api/test_decorators.py @@ -5,21 +5,15 @@ """ import casbin +import pytest from ddt import data as ddt_data from ddt import ddt, unpack +from django.conf import settings from django.test import TestCase -from openedx_authz.api.data import ( - ActionData, - RoleData, - ScopeData, - SubjectData, -) +from openedx_authz.api.data import ActionData, RoleData, ScopeData, SubjectData from openedx_authz.api.decorators import manage_policy_lifecycle -from openedx_authz.api.roles import ( - assign_role_to_subject_in_scope, - get_permissions_for_active_roles_in_scope, -) +from openedx_authz.api.roles import assign_role_to_subject_in_scope, get_permissions_for_active_roles_in_scope from openedx_authz.engine.enforcer import enforcer as global_enforcer from openedx_authz.engine.filter import Filter from openedx_authz.engine.utils import migrate_policy_between_enforcers @@ -90,6 +84,7 @@ def tearDown(self): super().tearDown() global_enforcer.clear_policy() + @pytest.mark.skipif(settings.ALLOW_FILTERED_POLICY_LOADING is False, reason="Filtered policy loading not allowed") def test_decorator_filters_by_scope_and_clears(self): """Test decorator loads filtered policies by scope and clears after execution. @@ -102,7 +97,7 @@ def test_decorator_filters_by_scope_and_clears(self): scope = ScopeData(external_key="lib:Org1:math_101") @manage_policy_lifecycle(filter_on="scope") - def get_policy_info(scope_arg): + def get_policy_info(scope_arg): # pylint: disable=unused-argument policy_count = len(global_enforcer.get_policy()) grouping_policy_count = len(global_enforcer.get_grouping_policy()) return { @@ -133,7 +128,7 @@ def test_decorator_loads_full_policy_without_filter(self): """ @manage_policy_lifecycle(filter_on="scope") - def get_full_policy_count(some_arg): + def get_full_policy_count(some_arg): # pylint: disable=unused-argument """Function that does not take a scope argument. This should cause the decorator to load the full policy. @@ -161,7 +156,7 @@ def test_decorator_clears_policy_on_exception(self): """ @manage_policy_lifecycle(filter_on="scope") - def failing_function(scope_arg): + def failing_function(scope_arg): # pylint: disable=unused-argument """Function that raises an exception to test decorator cleanup.""" if len(global_enforcer.get_policy()) >= 0: raise ValueError("Intentional test exception") @@ -174,7 +169,8 @@ def failing_function(scope_arg): self.assertEqual(str(context.exception), "Intentional test exception") - def test_decorator_with_enforcement_checks(self): + @pytest.mark.skipif(settings.ALLOW_FILTERED_POLICY_LOADING is False, reason="Filtered policy loading not allowed") + def test_decorator_with_enforcement_checks_with_filtered_loading(self): """Test that policies loaded by decorator enable correct enforcement decisions. Expected result: @@ -235,6 +231,7 @@ def check_permissions(scope_arg, subject_arg): self.assertEqual(result["policy_count"], expected_policies) self.assertEqual(result["grouping_count"], expected_grouping) + @pytest.mark.skipif(settings.ALLOW_FILTERED_POLICY_LOADING is False, reason="Filtered policy loading not allowed") def test_decorator_enforcement_with_different_subjects(self): """Test enforcement with different subjects having different roles. From 78cdd84a2d2e9b5577db05e6e960222fac996fe0 Mon Sep 17 00:00:00 2001 From: Maria Grimaldi Date: Tue, 14 Oct 2025 10:54:28 +0200 Subject: [PATCH 5/9] refactor: address quality issues --- openedx_authz/api/decorators.py | 3 ++- openedx_authz/api/roles.py | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/openedx_authz/api/decorators.py b/openedx_authz/api/decorators.py index 9b5100de..b23fe970 100644 --- a/openedx_authz/api/decorators.py +++ b/openedx_authz/api/decorators.py @@ -37,8 +37,9 @@ def get_all_roles(): def get_roles_in_scope(scope: ScopeData): return enforcer.get_filtered_roles(scope.namespaced_key) """ + FILTER_DATA_CLASSES = { # Consider empty for no filtering - "scope": ScopeData, + # "scope": ScopeData, # TODO: currently ALLOW_FILTERED_POLICY_LOADING is False to avoid partial policy loads. We can # Allow filtering on scope (initially) once we have a CONF model that supports this so filtering is meaningful, # consistent and doesn't lead to partial policy loads. diff --git a/openedx_authz/api/roles.py b/openedx_authz/api/roles.py index e837c3ee..90339e7e 100644 --- a/openedx_authz/api/roles.py +++ b/openedx_authz/api/roles.py @@ -178,6 +178,7 @@ def get_role_definitions_in_scope(scope: ScopeData) -> list[RoleData]: for role, permissions in permissions_per_role.items() ] + @manage_policy_lifecycle() def get_all_roles_names() -> list[str]: """Get all the available roles names in the current environment. From d666e354e505173db232ed39ad69f2c13bb089fd Mon Sep 17 00:00:00 2001 From: Maria Grimaldi Date: Tue, 14 Oct 2025 11:07:29 +0200 Subject: [PATCH 6/9] docs: update inline doc with better structure --- openedx_authz/api/decorators.py | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/openedx_authz/api/decorators.py b/openedx_authz/api/decorators.py index b23fe970..8f6befcb 100644 --- a/openedx_authz/api/decorators.py +++ b/openedx_authz/api/decorators.py @@ -3,7 +3,6 @@ from django.conf import settings -from openedx_authz.api.data import ScopeData from openedx_authz.engine.enforcer import enforcer from openedx_authz.engine.filter import Filter @@ -38,18 +37,21 @@ def get_roles_in_scope(scope: ScopeData): return enforcer.get_filtered_roles(scope.namespaced_key) """ - FILTER_DATA_CLASSES = { # Consider empty for no filtering + FILTER_DATA_CLASSES = { # Consider empty for no filtering # "scope": ScopeData, - # TODO: currently ALLOW_FILTERED_POLICY_LOADING is False to avoid partial policy loads. We can - # Allow filtering on scope (initially) once we have a CONF model that supports this so filtering is meaningful, - # consistent and doesn't lead to partial policy loads. - # We can consider this modeling to avoid partial loads and inconsistent states: - # 1. g only for user-role-scope bindings - # 2. p only for permission-role bindings - # 3. g2 only for role-role bindings - # 4. g3 only for permission grouping - # This way for a user we'd only need to load g ( filter only for the scope or user) , p, g2, g3 policies in - # each request. The only filter binding would be g, the rest loads entirely to avoid not loading definitions. + # TODO: Currently, ALLOW_FILTERED_POLICY_LOADING is set to False to prevent partial policy loads, + # so this dictionary is also intentionally left empty. + # We can enable scope-based filtering once we have a CONF model that supports it, + # ensuring the filtering is meaningful, consistent, and does not cause partial policy loads. + # + # One possible model to support safe filtering and avoid inconsistent states could be: + # 1. g -> user-role-scope bindings + # 2. p -> permission-role bindings + # 3. g2 -> role-role bindings + # 4. g3 -> permission grouping + # + # With this structure, for a given user we would only need to load g (filtered by scope or user), + # while p, g2, and g3 would be fully loaded to ensure all definitions are available. } def build_filter_from_args(args) -> Filter: From 3dca2a242126bdd0d33395a1da23bf3bc6de57cf Mon Sep 17 00:00:00 2001 From: Maria Grimaldi Date: Tue, 14 Oct 2025 11:08:32 +0200 Subject: [PATCH 7/9] docs: address quality issues --- openedx_authz/tests/api/test_permissions.py | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/openedx_authz/tests/api/test_permissions.py b/openedx_authz/tests/api/test_permissions.py index c8e16af5..5bf43821 100644 --- a/openedx_authz/tests/api/test_permissions.py +++ b/openedx_authz/tests/api/test_permissions.py @@ -10,18 +10,9 @@ from ddt import ddt, unpack from django.test import TestCase -from openedx_authz.api.data import ( - ActionData, - RoleData, - ScopeData, - SubjectData, -) -from openedx_authz.api.permissions import ( - is_subject_allowed, -) -from openedx_authz.api.roles import ( - assign_role_to_subject_in_scope, -) +from openedx_authz.api.data import ActionData, RoleData, ScopeData, SubjectData +from openedx_authz.api.permissions import is_subject_allowed +from openedx_authz.api.roles import assign_role_to_subject_in_scope from openedx_authz.engine.enforcer import enforcer as global_enforcer from openedx_authz.engine.utils import migrate_policy_between_enforcers From 1f6d978794aa450fd266b6bb2cd80af9c773bd84 Mon Sep 17 00:00:00 2001 From: Maria Grimaldi Date: Tue, 14 Oct 2025 11:24:11 +0200 Subject: [PATCH 8/9] refactor: drop filter_on in this first version --- openedx_authz/api/roles.py | 27 ++++++++++----------------- 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/openedx_authz/api/roles.py b/openedx_authz/api/roles.py index 90339e7e..44a839cd 100644 --- a/openedx_authz/api/roles.py +++ b/openedx_authz/api/roles.py @@ -87,7 +87,7 @@ def get_permissions_for_roles( return permissions_by_role -@manage_policy_lifecycle(filter_on="scope") +@manage_policy_lifecycle() def get_permissions_for_active_roles_in_scope( scope: ScopeData, role: RoleData | None = None ) -> dict[str, dict[str, list[PermissionData | str]]]: @@ -118,7 +118,6 @@ def get_permissions_for_active_roles_in_scope( dict[str, list[PermissionData]]: A dictionary mapping the role external_key to its permissions and scopes. """ - enforcer.load_policy() filtered_policy = enforcer.get_filtered_grouping_policy( GroupingPolicyIndex.SCOPE.value, scope.namespaced_key ) @@ -138,7 +137,7 @@ def get_permissions_for_active_roles_in_scope( ) -@manage_policy_lifecycle(filter_on="scope") +@manage_policy_lifecycle() def get_role_definitions_in_scope(scope: ScopeData) -> list[RoleData]: """Get all role definitions available in a specific scope. @@ -151,7 +150,6 @@ def get_role_definitions_in_scope(scope: ScopeData) -> list[RoleData]: Returns: list[Role]: A list of roles. """ - enforcer.load_policy() policy_filtered = enforcer.get_filtered_policy( PolicyIndex.SCOPE.value, scope.namespaced_key ) @@ -189,7 +187,7 @@ def get_all_roles_names() -> list[str]: return enforcer.get_all_subjects() -@manage_policy_lifecycle(filter_on="scope") +@manage_policy_lifecycle() def get_all_roles_in_scope(scope: ScopeData) -> list[list[str]]: """Get all the available role grouping policies in a specific scope. @@ -199,13 +197,12 @@ def get_all_roles_in_scope(scope: ScopeData) -> list[list[str]]: Returns: list[list[str]]: A list of policies in the specified scope. """ - enforcer.load_policy() return enforcer.get_filtered_grouping_policy( GroupingPolicyIndex.SCOPE.value, scope.namespaced_key ) -@manage_policy_lifecycle(filter_on="scope") +@manage_policy_lifecycle() def assign_role_to_subject_in_scope( subject: SubjectData, role: RoleData, scope: ScopeData ) -> bool: @@ -219,7 +216,6 @@ def assign_role_to_subject_in_scope( Returns: bool: True if the role was assigned successfully, False otherwise. """ - enforcer.load_policy() return enforcer.add_role_for_user_in_domain( subject.namespaced_key, role.namespaced_key, @@ -227,7 +223,7 @@ def assign_role_to_subject_in_scope( ) -@manage_policy_lifecycle(filter_on="scope") +@manage_policy_lifecycle() def batch_assign_role_to_subjects_in_scope( subjects: list[SubjectData], role: RoleData, scope: ScopeData ) -> None: @@ -241,7 +237,7 @@ def batch_assign_role_to_subjects_in_scope( assign_role_to_subject_in_scope(subject, role, scope) -@manage_policy_lifecycle(filter_on="scope") +@manage_policy_lifecycle() def unassign_role_from_subject_in_scope( subject: SubjectData, role: RoleData, scope: ScopeData ) -> bool: @@ -255,13 +251,12 @@ def unassign_role_from_subject_in_scope( Returns: bool: True if the role was unassigned successfully, False otherwise. """ - enforcer.load_policy() return enforcer.delete_roles_for_user_in_domain( subject.namespaced_key, role.namespaced_key, scope.namespaced_key ) -@manage_policy_lifecycle(filter_on="scope") +@manage_policy_lifecycle() def batch_unassign_role_from_subjects_in_scope( subjects: list[SubjectData], role: RoleData, scope: ScopeData ) -> None: @@ -303,7 +298,7 @@ def get_subject_role_assignments(subject: SubjectData) -> list[RoleAssignmentDat return role_assignments -@manage_policy_lifecycle(filter_on="scope") +@manage_policy_lifecycle() def get_subject_role_assignments_in_scope( subject: SubjectData, scope: ScopeData ) -> list[RoleAssignmentData]: @@ -316,7 +311,6 @@ def get_subject_role_assignments_in_scope( Returns: list[RoleAssignmentData]: A list of role assignments for the subject in the scope. """ - enforcer.load_policy() # TODO: we still need to get the remaining data for the role like email, etc role_assignments = [] for namespaced_key in enforcer.get_roles_for_user_in_domain( @@ -338,7 +332,7 @@ def get_subject_role_assignments_in_scope( return role_assignments -@manage_policy_lifecycle(filter_on="scope") +@manage_policy_lifecycle() def get_subject_role_assignments_for_role_in_scope( role: RoleData, scope: ScopeData ) -> list[RoleAssignmentData]: @@ -375,7 +369,7 @@ def get_subject_role_assignments_for_role_in_scope( return role_assignments -@manage_policy_lifecycle(filter_on="scope") +@manage_policy_lifecycle() def get_all_subject_role_assignments_in_scope(scope: ScopeData) -> list[RoleAssignmentData]: """Get all the subjects assigned to any role in a specific scope. @@ -415,6 +409,5 @@ def get_subjects_for_role(role: RoleData) -> list[SubjectData]: Returns: list[SubjectData]: A list of subjects assigned to the specified role. """ - enforcer.load_policy() policies = enforcer.get_filtered_grouping_policy(GroupingPolicyIndex.ROLE.value, role.namespaced_key) return [SubjectData(namespaced_key=policy[GroupingPolicyIndex.SUBJECT.value]) for policy in policies] From 9223611b28568987728febef2e92a1b34ce292ed Mon Sep 17 00:00:00 2001 From: Maria Grimaldi Date: Tue, 14 Oct 2025 13:53:04 +0200 Subject: [PATCH 9/9] refactor: avoid clearing policies after finishing to avoid inconsistent states --- openedx_authz/api/decorators.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/openedx_authz/api/decorators.py b/openedx_authz/api/decorators.py index 8f6befcb..91c41010 100644 --- a/openedx_authz/api/decorators.py +++ b/openedx_authz/api/decorators.py @@ -91,10 +91,9 @@ def wrapper(*args, **kwargs): else: enforcer.load_policy() - try: - return f(*args, **kwargs) - finally: - enforcer.clear_policy() + # Avoid clearing policies to prevent issues with shared enforcer state + # in long-running processes or concurrent requests. + return f(*args, **kwargs) return wrapper