Skip to content
11 changes: 11 additions & 0 deletions openedx_authz/api/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,8 @@ class ScopeData(AuthZData, metaclass=ScopeMeta):
"""

NAMESPACE: ClassVar[str] = "sc"
POLICY_POSITION = 2 # Position of scope in Casbin policy rules (p = sub, act, obj)
GROUPING_POLICY_POSITION = 2 # Position of scope in Casbin grouping policy rules (g = sub, role, scope)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused variable


@classmethod
def validate_external_key(cls, _: str) -> bool:
Expand All @@ -332,6 +334,15 @@ def exists(self) -> bool:
"""
raise NotImplementedError("Subclasses must implement exists method.")

@property
def policy_template(self) -> str:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add examples in the docstring? I think they are very useful for understanding.

"""Get the policy template for the scope.

Returns:
str: The policy template string.
"""
return f"{self.NAMESPACE}{self.SEPARATOR}*"
Copy link
Copy Markdown
Contributor

@BryanttV BryanttV Oct 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use the GENERIC_SCOPE_WILDCARD variable instead of the * literal character?



@define
class ContentLibraryData(ScopeData):
Expand Down
100 changes: 100 additions & 0 deletions openedx_authz/api/decorators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
"""Decorators for the authorization public API."""
from functools import wraps

from django.conf import settings

from openedx_authz.engine.enforcer import enforcer
from openedx_authz.engine.filter import Filter


def manage_policy_lifecycle(filter_on: str = ""):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: You can probably just annotate this return as Any from typing.

"""Decorator to manage policy lifecycle around API calls.

This decorator ensures proper policy loading and clearing around API function calls.
It loads relevant policies before execution and clears them afterward to prevent
stale policy issues in long-running processes.

Can be used in two ways:
@manage_policy_lifecycle() -> Loads full policy
@manage_policy_lifecycle(filter_on="scope") -> Loads filtered policy by scope

Args:
filter_on (str): The type of data class to filter on (e.g., "scope").
If empty, loads full policy.

Returns:
callable: The decorated function or decorator.

Examples:
# Without filtering (loads all policies):
@manage_policy_lifecycle()
def get_all_roles():
return enforcer.get_all_roles()

# With scope filtering (loads only relevant policies):
@manage_policy_lifecycle(filter_on="scope")
def get_roles_in_scope(scope: ScopeData):
return enforcer.get_filtered_roles(scope.namespaced_key)
Comment on lines +29 to +37
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To highlight the code:

Suggested change
# Without filtering (loads all policies):
@manage_policy_lifecycle()
def get_all_roles():
return enforcer.get_all_roles()
# With scope filtering (loads only relevant policies):
@manage_policy_lifecycle(filter_on="scope")
def get_roles_in_scope(scope: ScopeData):
return enforcer.get_filtered_roles(scope.namespaced_key)
- Without filtering (loads all policies)::
@manage_policy_lifecycle()
def get_all_roles():
return enforcer.get_all_roles()
- With scope filtering (loads only relevant policies)::
@manage_policy_lifecycle(filter_on="scope")
def get_roles_in_scope(scope: ScopeData):
return enforcer.get_filtered_roles(scope.namespaced_key)

"""

FILTER_DATA_CLASSES = { # Consider empty for no filtering
# "scope": ScopeData,
# TODO: Currently, ALLOW_FILTERED_POLICY_LOADING is set to False to prevent partial policy loads,
# so this dictionary is also intentionally left empty.
# We can enable scope-based filtering once we have a CONF model that supports it,
# ensuring the filtering is meaningful, consistent, and does not cause partial policy loads.
#
# One possible model to support safe filtering and avoid inconsistent states could be:
# 1. g -> user-role-scope bindings
# 2. p -> permission-role bindings
# 3. g2 -> role-role bindings
# 4. g3 -> permission grouping
#
# With this structure, for a given user we would only need to load g (filtered by scope or user),
# while p, g2, and g3 would be fully loaded to ensure all definitions are available.
}

def build_filter_from_args(args) -> Filter:
"""Build a Filter object from function arguments based on the filter_on parameter.

Args:
args (tuple): Positional arguments passed to the decorated function.

Returns:
Filter: A Filter object populated with relevant filter values.
"""
# Fallback to no filtering in case of misbehavior
if settings.ALLOW_FILTERED_POLICY_LOADING is False:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we planning on getting this working for Ulmo? If not, I would rather temporarily remove the code we're not using than have code that we don't trust just disabled via a flag.

Copy link
Copy Markdown
Member Author

@mariajgrimaldi mariajgrimaldi Oct 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree! I can remove it and save it in another branch. My initial idea was to start by loading the entire policy and then address any issues with this approach retroactively. I did try adding support for filtered loading, but I ran into some issues while finishing the implementation and adding tests. As I mentioned here: https://github.com/openedx/openedx-authz/pull/86/files#diff-a6773904192669f0814472909dcec3567d5a893f6a97e6b8682cdd2ba8bcb55dR40-R55, in my opinion, making filtered loading consistent would probably require a different model, which isn't a priority right now. We could revisit it later if we notice resource consumption issues.

return Filter()

filter_obj = Filter()
if not filter_on or filter_on not in FILTER_DATA_CLASSES:
return filter_obj

for arg in args:
if isinstance(arg, FILTER_DATA_CLASSES[filter_on]):
filter_value = getattr(filter_obj, f"v{arg.POLICY_POSITION}")
filter_value.append(arg.policy_template) # Used to load p type policies as well. E.g., lib^*
filter_value.append(arg.namespaced_key) # E.g., lib^lib:DemoX:CSPROB

return filter_obj

def decorator(f):
"""Inner decorator that wraps the function with policy lifecycle management."""
@wraps(f)
def wrapper(*args, **kwargs):
"""Wrapper that handles policy loading, execution, and cleanup."""
filter_obj = build_filter_from_args(args)

if any([filter_obj.ptype, filter_obj.v0, filter_obj.v1, filter_obj.v2]):
enforcer.load_filtered_policy(filter_obj)
else:
enforcer.load_policy()

# Avoid clearing policies to prevent issues with shared enforcer state
# in long-running processes or concurrent requests.
return f(*args, **kwargs)

return wrapper

return decorator
2 changes: 2 additions & 0 deletions openedx_authz/api/permissions.py
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add the decorator to the get_subjects_for_role function

Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""

from openedx_authz.api.data import ActionData, PermissionData, PolicyIndex, ScopeData, SubjectData
from openedx_authz.api.decorators import manage_policy_lifecycle
from openedx_authz.engine.enforcer import enforcer

__all__ = [
Expand Down Expand Up @@ -46,6 +47,7 @@ def get_all_permissions_in_scope(scope: ScopeData) -> list[PermissionData]:
return [get_permission_from_policy(action) for action in actions]


@manage_policy_lifecycle(filter_on="scope")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
@manage_policy_lifecycle(filter_on="scope")
@manage_policy_lifecycle()

def is_subject_allowed(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, remove the enforcer.load_policy()

subject: SubjectData,
action: ActionData,
Expand Down
26 changes: 16 additions & 10 deletions openedx_authz/api/roles.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
ScopeData,
SubjectData,
)
from openedx_authz.api.decorators import manage_policy_lifecycle
from openedx_authz.api.permissions import get_permission_from_policy
from openedx_authz.engine.enforcer import enforcer

Expand Down Expand Up @@ -48,6 +49,7 @@
# in this case, ALL the policies, but that might not be the case


@manage_policy_lifecycle()
def get_permissions_for_single_role(
Comment on lines +52 to 53
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm concerned about having to load the policy twice, since this function is called internally by other functions here.

role: RoleData,
) -> list[PermissionData]:
Expand All @@ -63,6 +65,7 @@ def get_permissions_for_single_role(
return [get_permission_from_policy(policy) for policy in policies]


@manage_policy_lifecycle()
def get_permissions_for_roles(
roles: list[RoleData],
) -> dict[str, dict[str, list[PermissionData | str]]]:
Expand All @@ -84,6 +87,7 @@ def get_permissions_for_roles(
return permissions_by_role


@manage_policy_lifecycle()
def get_permissions_for_active_roles_in_scope(
scope: ScopeData, role: RoleData | None = None
) -> dict[str, dict[str, list[PermissionData | str]]]:
Expand Down Expand Up @@ -114,7 +118,6 @@ def get_permissions_for_active_roles_in_scope(
dict[str, list[PermissionData]]: A dictionary mapping the role external_key to its
permissions and scopes.
"""
enforcer.load_policy()
filtered_policy = enforcer.get_filtered_grouping_policy(
GroupingPolicyIndex.SCOPE.value, scope.namespaced_key
)
Expand All @@ -134,6 +137,7 @@ def get_permissions_for_active_roles_in_scope(
)


@manage_policy_lifecycle()
def get_role_definitions_in_scope(scope: ScopeData) -> list[RoleData]:
"""Get all role definitions available in a specific scope.

Expand All @@ -146,7 +150,6 @@ def get_role_definitions_in_scope(scope: ScopeData) -> list[RoleData]:
Returns:
list[Role]: A list of roles.
"""
enforcer.load_policy()
policy_filtered = enforcer.get_filtered_policy(
PolicyIndex.SCOPE.value, scope.namespaced_key
)
Expand Down Expand Up @@ -174,6 +177,7 @@ def get_role_definitions_in_scope(scope: ScopeData) -> list[RoleData]:
]


@manage_policy_lifecycle()
def get_all_roles_names() -> list[str]:
"""Get all the available roles names in the current environment.

Expand All @@ -183,6 +187,7 @@ def get_all_roles_names() -> list[str]:
return enforcer.get_all_subjects()


@manage_policy_lifecycle()
def get_all_roles_in_scope(scope: ScopeData) -> list[list[str]]:
"""Get all the available role grouping policies in a specific scope.

Expand All @@ -192,12 +197,12 @@ def get_all_roles_in_scope(scope: ScopeData) -> list[list[str]]:
Returns:
list[list[str]]: A list of policies in the specified scope.
"""
enforcer.load_policy()
return enforcer.get_filtered_grouping_policy(
GroupingPolicyIndex.SCOPE.value, scope.namespaced_key
)


@manage_policy_lifecycle()
def assign_role_to_subject_in_scope(
subject: SubjectData, role: RoleData, scope: ScopeData
) -> bool:
Expand All @@ -211,14 +216,14 @@ def assign_role_to_subject_in_scope(
Returns:
bool: True if the role was assigned successfully, False otherwise.
"""
enforcer.load_policy()
return enforcer.add_role_for_user_in_domain(
subject.namespaced_key,
role.namespaced_key,
scope.namespaced_key,
)


@manage_policy_lifecycle()
def batch_assign_role_to_subjects_in_scope(
subjects: list[SubjectData], role: RoleData, scope: ScopeData
) -> None:
Expand All @@ -232,6 +237,7 @@ def batch_assign_role_to_subjects_in_scope(
assign_role_to_subject_in_scope(subject, role, scope)


@manage_policy_lifecycle()
def unassign_role_from_subject_in_scope(
subject: SubjectData, role: RoleData, scope: ScopeData
) -> bool:
Expand All @@ -245,12 +251,12 @@ def unassign_role_from_subject_in_scope(
Returns:
bool: True if the role was unassigned successfully, False otherwise.
"""
enforcer.load_policy()
return enforcer.delete_roles_for_user_in_domain(
subject.namespaced_key, role.namespaced_key, scope.namespaced_key
)


@manage_policy_lifecycle()
def batch_unassign_role_from_subjects_in_scope(
subjects: list[SubjectData], role: RoleData, scope: ScopeData
) -> None:
Expand All @@ -265,6 +271,7 @@ def batch_unassign_role_from_subjects_in_scope(
unassign_role_from_subject_in_scope(subject, role, scope)


@manage_policy_lifecycle()
def get_subject_role_assignments(subject: SubjectData) -> list[RoleAssignmentData]:
"""Get all the roles for a subject across all scopes.

Expand All @@ -291,6 +298,7 @@ def get_subject_role_assignments(subject: SubjectData) -> list[RoleAssignmentDat
return role_assignments


@manage_policy_lifecycle()
def get_subject_role_assignments_in_scope(
subject: SubjectData, scope: ScopeData
) -> list[RoleAssignmentData]:
Expand All @@ -303,7 +311,6 @@ def get_subject_role_assignments_in_scope(
Returns:
list[RoleAssignmentData]: A list of role assignments for the subject in the scope.
"""
enforcer.load_policy()
# TODO: we still need to get the remaining data for the role like email, etc
role_assignments = []
for namespaced_key in enforcer.get_roles_for_user_in_domain(
Expand All @@ -325,6 +332,7 @@ def get_subject_role_assignments_in_scope(
return role_assignments


@manage_policy_lifecycle()
def get_subject_role_assignments_for_role_in_scope(
role: RoleData, scope: ScopeData
) -> list[RoleAssignmentData]:
Expand Down Expand Up @@ -361,9 +369,8 @@ def get_subject_role_assignments_for_role_in_scope(
return role_assignments


def get_all_subject_role_assignments_in_scope(
scope: ScopeData,
) -> list[RoleAssignmentData]:
@manage_policy_lifecycle()
def get_all_subject_role_assignments_in_scope(scope: ScopeData) -> list[RoleAssignmentData]:
"""Get all the subjects assigned to any role in a specific scope.

Args:
Expand Down Expand Up @@ -402,6 +409,5 @@ def get_subjects_for_role(role: RoleData) -> list[SubjectData]:
Returns:
list[SubjectData]: A list of subjects assigned to the specified role.
"""
enforcer.load_policy()
policies = enforcer.get_filtered_grouping_policy(GroupingPolicyIndex.ROLE.value, role.namespaced_key)
return [SubjectData(namespaced_key=policy[GroupingPolicyIndex.SUBJECT.value]) for policy in policies]
5 changes: 5 additions & 0 deletions openedx_authz/settings/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,8 @@ def plugin_settings(settings):
# Redis host and port are temporarily loaded here for the MVP
settings.REDIS_HOST = "redis"
settings.REDIS_PORT = 6379

# TODO: Set to True when we have a CONF model that supports filtered policy loading meaningfully
# to avoid partial policy loads and inconsistent states.
# See comments in api/decorators.py for more details.
settings.ALLOW_FILTERED_POLICY_LOADING = False
1 change: 1 addition & 0 deletions openedx_authz/settings/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,4 @@
CASBIN_WATCHER_ENABLED = False
USE_TZ = True
ROOT_URLCONF = "openedx_authz.urls"
ALLOW_FILTERED_POLICY_LOADING = False
Loading