Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ Unreleased

*

0.4.1 - 2025-10-16
******************

Fixed
=====

* Load policy before adding policies in the loading script to avoid duplicates.

0.4.0 - 2025-16-10
******************

Expand Down
2 changes: 1 addition & 1 deletion openedx_authz/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@

import os

__version__ = "0.4.0"
__version__ = "0.4.1"

ROOT_DIRECTORY = os.path.dirname(os.path.abspath(__file__))
38 changes: 32 additions & 6 deletions openedx_authz/engine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,50 @@ def migrate_policy_between_enforcers(
# Load latest policies from the source enforcer
source_enforcer.load_policy()
policies = source_enforcer.get_policy()
logger.info(f"Loaded {len(policies)} policies from source enforcer.")

# Load target enforcer policies to check for duplicates
target_enforcer.load_policy()
logger.info(f"Target enforcer has {len(target_enforcer.get_policy())} existing policies before migration.")

# TODO: this operations use the enforcer directly, which may not be ideal
# since we have to load the policy after each addition to avoid duplicates.
# I think we should consider using an API which can validate whether
# all policies exist before adding them or we have the
# latest policies loaded in the enforcer.

for policy in policies:
if not target_enforcer.has_policy(*policy):
target_enforcer.add_policy(*policy)
if target_enforcer.has_policy(*policy):
logger.info(f"Policy {policy} already exists in target, skipping.")
continue
target_enforcer.add_policy(*policy)

# Ensure latest policies are loaded in the target enforcer after each addition
# to avoid duplicates
target_enforcer.load_policy()

for grouping_policy_ptype in GROUPING_POLICY_PTYPES:
try:
grouping_policies = source_enforcer.get_named_grouping_policy(
grouping_policy_ptype
)
for grouping in grouping_policies:
if not target_enforcer.has_named_grouping_policy(
if target_enforcer.has_named_grouping_policy(
grouping_policy_ptype, *grouping
):
target_enforcer.add_named_grouping_policy(
grouping_policy_ptype, *grouping
logger.info(
f"Grouping policy {grouping_policy_ptype}, {grouping} already exists in target, skipping."
)
continue
target_enforcer.add_named_grouping_policy(
grouping_policy_ptype, *grouping
)

# Ensure latest policies are loaded in the target enforcer after each addition
# to avoid duplicates
target_enforcer.load_policy()
except KeyError as e:
logger.debug(
logger.info(
f"Skipping {grouping_policy_ptype} policies: {e} not found in source enforcer."
)
logger.info(
Expand Down
Loading