Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Change Log
Unreleased
**********

*
* Migrate from using pycodestyle and isort to ruff for code quality checks and formatting.

0.6.0 - 2025-10-22
******************
Expand Down
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ upgrade: ## update the requirements/*.txt files with the latest packages satisfy
quality: ## check coding style with pycodestyle and pylint
tox -e quality

format: ## format code with black and isort. Enable ruff to fix E (pycodestyle) and I (isort) issues
ruff format openedx_authz tests test_utils manage.py setup.py
ruff check --fix openedx_authz tests test_utils manage.py setup.py

pii_check: ## check for PII annotations on all Django models
tox -e pii_check

Expand Down
8 changes: 2 additions & 6 deletions openedx_authz/api/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,7 @@ def get_subclass_by_external_key(mcs, external_key: str) -> Type["ScopeData"]:
scope_subclass = mcs.scope_registry.get(namespace)

if not scope_subclass:
raise ValueError(
f"Unknown scope: {namespace} for external_key: {external_key}"
)
raise ValueError(f"Unknown scope: {namespace} for external_key: {external_key}")

if not scope_subclass.validate_external_key(external_key):
raise ValueError(f"Invalid external_key format: {external_key}")
Expand Down Expand Up @@ -281,9 +279,7 @@ def validate_external_key(mcs, external_key: str) -> bool:
Returns:
bool: True if valid, False otherwise.
"""
raise NotImplementedError(
"Subclasses must implement validate_external_key method."
)
raise NotImplementedError("Subclasses must implement validate_external_key method.")


@define
Expand Down
8 changes: 2 additions & 6 deletions openedx_authz/api/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ def get_all_permissions_in_scope(scope: ScopeData) -> list[PermissionData]:
list of PermissionData: A list of PermissionData objects associated with the given scope.
"""
enforcer = AuthzEnforcer.get_enforcer()
actions = enforcer.get_filtered_policy(
PolicyIndex.SCOPE.value, scope.namespaced_key
)
actions = enforcer.get_filtered_policy(PolicyIndex.SCOPE.value, scope.namespaced_key)
return [get_permission_from_policy(action) for action in actions]


Expand All @@ -65,6 +63,4 @@ def is_subject_allowed(
bool: True if the subject has the specified permission in the scope, False otherwise.
"""
enforcer = AuthzEnforcer.get_enforcer()
return enforcer.enforce(
subject.namespaced_key, action.namespaced_key, scope.namespaced_key
)
return enforcer.enforce(subject.namespaced_key, action.namespaced_key, scope.namespaced_key)
69 changes: 17 additions & 52 deletions openedx_authz/api/roles.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,7 @@ def get_permissions_for_roles(
permissions_by_role = {}

for role in roles:
permissions_by_role[role.external_key] = {
"permissions": get_permissions_for_single_role(role)
}
permissions_by_role[role.external_key] = {"permissions": get_permissions_for_single_role(role)}

return permissions_by_role

Expand Down Expand Up @@ -116,22 +114,15 @@ def get_permissions_for_active_roles_in_scope(
permissions and scopes.
"""
enforcer = AuthzEnforcer.get_enforcer()
filtered_policy = enforcer.get_filtered_grouping_policy(
GroupingPolicyIndex.SCOPE.value, scope.namespaced_key
)
filtered_policy = enforcer.get_filtered_grouping_policy(GroupingPolicyIndex.SCOPE.value, scope.namespaced_key)

if role:
filtered_policy = [
policy
for policy in filtered_policy
if policy[GroupingPolicyIndex.ROLE.value] == role.namespaced_key
policy for policy in filtered_policy if policy[GroupingPolicyIndex.ROLE.value] == role.namespaced_key
]

return get_permissions_for_roles(
[
RoleData(namespaced_key=policy[GroupingPolicyIndex.ROLE.value])
for policy in filtered_policy
]
[RoleData(namespaced_key=policy[GroupingPolicyIndex.ROLE.value]) for policy in filtered_policy]
)


Expand All @@ -148,9 +139,7 @@ def get_role_definitions_in_scope(scope: ScopeData) -> list[RoleData]:
list[Role]: A list of roles.
"""
enforcer = AuthzEnforcer.get_enforcer()
policy_filtered = enforcer.get_filtered_policy(
PolicyIndex.SCOPE.value, scope.namespaced_key
)
policy_filtered = enforcer.get_filtered_policy(PolicyIndex.SCOPE.value, scope.namespaced_key)

permissions_per_role = defaultdict(
lambda: {
Expand All @@ -162,9 +151,7 @@ def get_role_definitions_in_scope(scope: ScopeData) -> list[RoleData]:
permissions_per_role[policy[PolicyIndex.ROLE.value]]["scopes"].append(
ScopeData(namespaced_key=policy[PolicyIndex.SCOPE.value])
) # TODO: I don't think this actually gets used anywhere
permissions_per_role[policy[PolicyIndex.ROLE.value]]["permissions"].append(
get_permission_from_policy(policy)
)
permissions_per_role[policy[PolicyIndex.ROLE.value]]["permissions"].append(get_permission_from_policy(policy))

return [
RoleData(
Expand Down Expand Up @@ -194,14 +181,10 @@ def get_all_roles_in_scope(scope: ScopeData) -> list[list[str]]:
list[list[str]]: A list of policies in the specified scope.
"""
enforcer = AuthzEnforcer.get_enforcer()
return enforcer.get_filtered_grouping_policy(
GroupingPolicyIndex.SCOPE.value, scope.namespaced_key
)
return enforcer.get_filtered_grouping_policy(GroupingPolicyIndex.SCOPE.value, scope.namespaced_key)


def assign_role_to_subject_in_scope(
subject: SubjectData, role: RoleData, scope: ScopeData
) -> bool:
def assign_role_to_subject_in_scope(subject: SubjectData, role: RoleData, scope: ScopeData) -> bool:
"""Assign a role to a subject.

Args:
Expand All @@ -220,9 +203,7 @@ def assign_role_to_subject_in_scope(
)


def batch_assign_role_to_subjects_in_scope(
subjects: list[SubjectData], role: RoleData, scope: ScopeData
) -> None:
def batch_assign_role_to_subjects_in_scope(subjects: list[SubjectData], role: RoleData, scope: ScopeData) -> None:
"""Assign a role to a list of subjects.

Args:
Expand All @@ -233,9 +214,7 @@ def batch_assign_role_to_subjects_in_scope(
assign_role_to_subject_in_scope(subject, role, scope)


def unassign_role_from_subject_in_scope(
subject: SubjectData, role: RoleData, scope: ScopeData
) -> bool:
def unassign_role_from_subject_in_scope(subject: SubjectData, role: RoleData, scope: ScopeData) -> bool:
"""Unassign a role from a subject.

Args:
Expand All @@ -247,14 +226,10 @@ def unassign_role_from_subject_in_scope(
bool: True if the role was unassigned successfully, False otherwise.
"""
enforcer = AuthzEnforcer.get_enforcer()
return enforcer.delete_roles_for_user_in_domain(
subject.namespaced_key, role.namespaced_key, scope.namespaced_key
)
return enforcer.delete_roles_for_user_in_domain(subject.namespaced_key, role.namespaced_key, scope.namespaced_key)


def batch_unassign_role_from_subjects_in_scope(
subjects: list[SubjectData], role: RoleData, scope: ScopeData
) -> None:
def batch_unassign_role_from_subjects_in_scope(subjects: list[SubjectData], role: RoleData, scope: ScopeData) -> None:
"""Unassign a role from a list of subjects.

Args:
Expand All @@ -277,9 +252,7 @@ def get_subject_role_assignments(subject: SubjectData) -> list[RoleAssignmentDat
"""
enforcer = AuthzEnforcer.get_enforcer()
role_assignments = []
for policy in enforcer.get_filtered_grouping_policy(
GroupingPolicyIndex.SUBJECT.value, subject.namespaced_key
):
for policy in enforcer.get_filtered_grouping_policy(GroupingPolicyIndex.SUBJECT.value, subject.namespaced_key):
role = RoleData(namespaced_key=policy[GroupingPolicyIndex.ROLE.value])
role.permissions = get_permissions_for_single_role(role)

Expand All @@ -293,9 +266,7 @@ def get_subject_role_assignments(subject: SubjectData) -> list[RoleAssignmentDat
return role_assignments


def get_subject_role_assignments_in_scope(
subject: SubjectData, scope: ScopeData
) -> list[RoleAssignmentData]:
def get_subject_role_assignments_in_scope(subject: SubjectData, scope: ScopeData) -> list[RoleAssignmentData]:
"""Get the roles for a subject in a specific scope.

Args:
Expand All @@ -308,9 +279,7 @@ def get_subject_role_assignments_in_scope(
enforcer = AuthzEnforcer.get_enforcer()
# TODO: we still need to get the remaining data for the role like email, etc
role_assignments = []
for namespaced_key in enforcer.get_roles_for_user_in_domain(
subject.namespaced_key, scope.namespaced_key
):
for namespaced_key in enforcer.get_roles_for_user_in_domain(subject.namespaced_key, scope.namespaced_key):
role = RoleData(namespaced_key=namespaced_key)
role_assignments.append(
RoleAssignmentData(
Expand All @@ -327,9 +296,7 @@ def get_subject_role_assignments_in_scope(
return role_assignments


def get_subject_role_assignments_for_role_in_scope(
role: RoleData, scope: ScopeData
) -> list[RoleAssignmentData]:
def get_subject_role_assignments_for_role_in_scope(role: RoleData, scope: ScopeData) -> list[RoleAssignmentData]:
"""Get the subjects assigned to a specific role in a specific scope.

Args:
Expand All @@ -341,9 +308,7 @@ def get_subject_role_assignments_for_role_in_scope(
"""
enforcer = AuthzEnforcer.get_enforcer()
role_assignments = []
for subject in enforcer.get_users_for_role_in_domain(
role.namespaced_key, scope.namespaced_key
):
for subject in enforcer.get_users_for_role_in_domain(role.namespaced_key, scope.namespaced_key):
if subject.startswith(f"{RoleData.NAMESPACE}{RoleData.SEPARATOR}"):
# Skip roles that are also subjects
continue
Expand Down
12 changes: 10 additions & 2 deletions openedx_authz/engine/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,11 @@ def is_filtered(self) -> bool:
"""
return True

def load_filtered_policy(self, model: Model, filter: Filter) -> None: # pylint: disable=redefined-builtin
def load_filtered_policy(
self,
model: Model,
filter: Filter, # pylint: disable=redefined-builtin
) -> None:
"""
Load policy rules from storage with filtering applied.

Expand All @@ -99,7 +103,11 @@ def load_filtered_policy(self, model: Model, filter: Filter) -> None: # pylint:
for line in filtered_queryset:
persist.load_policy_line(str(line), model)

def filter_query(self, queryset: QuerySet, filter: Filter) -> QuerySet: # pylint: disable=redefined-builtin
def filter_query(
self,
queryset: QuerySet,
filter: Filter, # pylint: disable=redefined-builtin
) -> QuerySet:
"""
Apply filter criteria to the policy queryset.

Expand Down
20 changes: 5 additions & 15 deletions openedx_authz/engine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,31 +51,21 @@ def migrate_policy_between_enforcers(

for grouping_policy_ptype in GROUPING_POLICY_PTYPES:
try:
grouping_policies = source_enforcer.get_named_grouping_policy(
grouping_policy_ptype
)
grouping_policies = source_enforcer.get_named_grouping_policy(grouping_policy_ptype)
for grouping in grouping_policies:
if target_enforcer.has_named_grouping_policy(
grouping_policy_ptype, *grouping
):
if target_enforcer.has_named_grouping_policy(grouping_policy_ptype, *grouping):
logger.info(
f"Grouping policy {grouping_policy_ptype}, {grouping} already exists in target, skipping."
)
continue
target_enforcer.add_named_grouping_policy(
grouping_policy_ptype, *grouping
)
target_enforcer.add_named_grouping_policy(grouping_policy_ptype, *grouping)

# Ensure latest policies are loaded in the target enforcer after each addition
# to avoid duplicates
target_enforcer.load_policy()
except KeyError as e:
logger.info(
f"Skipping {grouping_policy_ptype} policies: {e} not found in source enforcer."
)
logger.info(
f"Successfully loaded policies from {source_enforcer.get_model()} into the database."
)
logger.info(f"Skipping {grouping_policy_ptype} policies: {e} not found in source enforcer.")
logger.info(f"Successfully loaded policies from {source_enforcer.get_model()} into the database.")
except Exception as e:
logger.error(f"Error loading policies from file: {e}")
raise
26 changes: 6 additions & 20 deletions openedx_authz/management/commands/enforcement.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,7 @@ def handle(self, *args, **options):
Raises:
CommandError: If model or policy files are not found or enforcer creation fails.
"""
model_file_path = (
self._get_file_path("model.conf") or options["model_file_path"]
)
model_file_path = self._get_file_path("model.conf") or options["model_file_path"]
policy_file_path = options["policy_file_path"]

if not os.path.isfile(model_file_path):
Expand All @@ -95,9 +93,7 @@ def handle(self, *args, **options):

try:
enforcer = casbin.Enforcer(model_file_path, policy_file_path)
self.stdout.write(
self.style.SUCCESS("Casbin enforcer created successfully")
)
self.stdout.write(self.style.SUCCESS("Casbin enforcer created successfully"))

policies = enforcer.get_policy()
roles = enforcer.get_grouping_policy()
Expand Down Expand Up @@ -160,9 +156,7 @@ def _run_interactive_mode(self, enforcer: casbin.Enforcer) -> None:
self.stdout.write(self.style.ERROR("Exiting interactive mode..."))
break

def _test_interactive_request(
self, enforcer: casbin.Enforcer, user_input: str
) -> None:
def _test_interactive_request(self, enforcer: casbin.Enforcer, user_input: str) -> None:
"""Process and test a single enforcement request from user input.

Parses the input string, validates the format, executes the enforcement
Expand All @@ -180,11 +174,7 @@ def _test_interactive_request(
try:
parts = [part.strip() for part in user_input.split()]
if len(parts) != 3:
self.stdout.write(
self.style.ERROR(
f"✗ Invalid format. Expected 3 parts, got {len(parts)}"
)
)
self.stdout.write(self.style.ERROR(f"✗ Invalid format. Expected 3 parts, got {len(parts)}"))
self.stdout.write("Format: subject action scope")
self.stdout.write("Example: user^alice act^read org^OpenedX")
return
Expand All @@ -193,13 +183,9 @@ def _test_interactive_request(
result = enforcer.enforce(subject, action, scope)

if result:
self.stdout.write(
self.style.SUCCESS(f"✓ ALLOWED: {subject} {action} {scope}")
)
self.stdout.write(self.style.SUCCESS(f"✓ ALLOWED: {subject} {action} {scope}"))
else:
self.stdout.write(
self.style.ERROR(f"✗ DENIED: {subject} {action} {scope}")
)
self.stdout.write(self.style.ERROR(f"✗ DENIED: {subject} {action} {scope}"))

except (ValueError, IndexError, TypeError) as e:
self.stdout.write(self.style.ERROR(f"✗ Error processing request: {str(e)}"))
Loading