Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ omit =
*admin.py
*/static/*
*/templates/*
*/tests/*
1 change: 1 addition & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,7 @@ def on_init(app): # pylint: disable=unused-argument
docs_path,
os.path.join(root_path, "openedx_authz"),
os.path.join(root_path, "openedx_authz/migrations"),
os.path.join(root_path, "openedx_authz/tests"),
]
)

Expand Down
6 changes: 5 additions & 1 deletion openedx_authz/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
"""
One-line description for README and other doc files.
Open edX AuthZ provides the architecture and foundations of the authorization framework.
"""

import os

__version__ = "0.1.0"

ROOT_DIRECTORY = os.path.dirname(os.path.abspath(__file__))
1 change: 1 addition & 0 deletions openedx_authz/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ class OpenedxAuthzConfig(AppConfig):
"""

name = "openedx_authz"
plugin_app = {}
11 changes: 11 additions & 0 deletions openedx_authz/engine/config/authz.policy
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# ===== ACTION GROUPING (g2) =====

# manage implies edit, delete, read, write
g2, act:manage, act:edit
g2, act:manage, act:delete
g2, act:edit, act:read
g2, act:edit, act:write

# edit implies read, write
g2, act:edit, act:read
g2, act:edit, act:write
91 changes: 91 additions & 0 deletions openedx_authz/engine/config/model.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
############################################
# Open edX AuthZ — Casbin Model Configuration
#
# This model supports:
# - Scoped role assignments (user roles tied to specific contexts)
# - Action grouping (manage → read/write/edit/delete to reduce duplication)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the difference between write and edit here? Is write just create + edit?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, this example can also be seen like this:

# manage implies edit and delete
["g2", "act:manage", "act:edit"],
["g2", "act:manage", "act:delete"],
# edit implies read and write
["g2", "act:edit", "act:read"],
["g2", "act:edit", "act:write"],

by inheritance manage would include all permissions

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah thank you, I think I missed that there could be multiple levels of inheritance

# - System-wide roles (global scope "*" applies everywhere)
# - Negative rules (deny overrides allow for exceptions)
# - Namespace support (course:*, lib:*, org:*, etc.)
# - Extensibility (new resource types just need new namespaces)
############################################

[request_definition]
# Request format: subject (user), action, scope (specific resource being accessed)
#
# sub = subject/principal with namespace (e.g., "user:alice", "service:lms")
# act = action with namespace (e.g., "act:read", "act:manage", "act:edit-courses")
# scope = authorization scope context (e.g., "org:OpenedX", "course-v1:...", "*" for global)
#
# SCOPE SEMANTICS:
# Scope determines the authorization context and which role assignments apply
# - "*" = global scope (system-wide roles apply everywhere)
# - "org:OpenedX" = organization-scoped roles (apply within OpenedX org)
# - "course-v1:..." = course-scoped roles (apply within specific course)
#
# Application must provide appropriate scope based on business logic.
r = sub, act, scope

[policy_definition]
# Policy format: subject (role), action, scope (pattern), effect
#
# sub = role or user with namespace (e.g., "role:org_admin", "user:bob")
# act = action identifier (e.g., "act:manage", "act:read", "act:edit-courses")
# scope = scope where policy applies (e.g., "*", "org:*", "course-v1:*", "lib:*")
# eft = "allow" or "deny" (deny overrides allow for exceptions)
p = sub, act, scope, eft

[role_definition]
# g: Role assignments with scope
# Format: user/subject, role, scope
#
# Examples:
# g, user:alice, role:org_admin, org:OpenedX # Alice is org admin for OpenedX
# g, user:bob, role:course_instructor, course-v1:... # Bob is instructor for specific course
# g, user:carol, role:library_admin, * # Carol is global library admin
#
# Role hierarchy (optional):
# g, role:org_admin, role:org_editor, org:OpenedX # org_admin inherits org_editor permissions
g = _, _, _

# g2: Action grouping and implications
# Maps high-level actions to specific actions to reduce policy duplication
#
# Examples:
# g2, act:manage, act:edit # manage implies edit
# g2, act:manage, act:delete # manage implies delete
# g2, act:edit-courses, act:read # edit-courses implies read (for resource access)
# g2, act:edit-courses, act:write # edit-courses implies write (for resource modification)
g2 = _, _

[policy_effect]
# Deny-override policy: allow if any rule allows AND no rule denies
# This enables negative rules/exceptions (e.g., "manage all courses except course Z")
#
# Evaluation order:
# 1. Check if any policy grants allow
# 2. Check if any policy specifies deny
# 3. If deny found, result is deny (exceptions win)
# 4. If allow found and no deny, result is allow
# 5. If no matches, result is deny (default secure)
e = some(where (p.eft == allow)) && !some(where (p.eft == deny))

[matchers]
# Authorization matching logic
#
# ROLE MATCHING:
# - g(r.sub, p.sub, r.scope): check if subject has role in requested scope
# - g(r.sub, p.sub, "*"): check if subject has role in all resources in the scope
#
# SCOPE MATCHING:
# - keyMatch(r.scope, p.scope): scope matches pattern
#
# ACTION MATCHING:
# - r.act == p.act: exact action match
# - g2(p.act, r.act): policy action implies requested action via grouping
#
# All conditions must be true for a policy to match:
# 1. Subject must have role in scope OR global role
# 2. Scope must match pattern
# 3. Action must match OR inherit via action grouping
m = (g(r.sub, p.sub, r.scope) || g(r.sub, p.sub, "*")) && keyMatch(r.scope, p.scope) && (r.act == p.act || g2(p.act, r.act))
Empty file.
Empty file.
187 changes: 187 additions & 0 deletions openedx_authz/management/commands/enforcement.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
"""
Django management command for interactive Casbin enforcement testing.

This command creates a Casbin enforcer using the model.conf configuration and a
user-specified policy file, then provides an interactive mode for testing
authorization enforcement requests.

The command supports:
- Loading Casbin model from the built-in model.conf file
- Using custom policy files (specified via --policy-file-path argument)
- Interactive testing with format: subject action scope
- Real-time enforcement results with visual feedback (✓ ALLOWED / ✗ DENIED)
- Display of loaded policies, role assignments, and action grouping rules

Example usage:
python manage.py enforcement --policy-file-path /path/to/authz.policy

Example test input:
user:alice act:read org:OpenedX
"""

import os

import casbin
from django.core.management.base import BaseCommand, CommandError

from openedx_authz import ROOT_DIRECTORY


class Command(BaseCommand):
"""
Django management command for interactive Casbin enforcement testing.

This command loads a Casbin model configuration and user-specified policy file
to create an enforcer instance, then provides an interactive shell for testing
authorization requests in real-time with immediate feedback.
"""

help = (
"Interactive mode for testing Casbin enforcement policies using model.conf and a custom policy file. "
"Provides real-time authorization testing with format: subject action scope. "
"Use --policy-file-path to specify the policy file location."
)
Comment on lines +42 to +47
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More than testing I see this as a simulation for a real environment to test whether our model is working properly. Can we change the wording to reflect that? Also I think we need to change requests.txt for requests.sample.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I updated the command. What do you think?


def add_arguments(self, parser) -> None:
"""Add command-line arguments to the argument parser.

Args:
parser: The Django argument parser instance to configure.
"""
parser.add_argument(
"--policy-file-path",
type=str,
required=True,
help="Path to the Casbin policy file (supports CSV format with policies, roles, and action grouping)",
)
parser.add_argument(
"--model-file-path",
type=str,
required=False,
help="Path to the Casbin model file. If not provided, the default model.conf file will be used.",
)

def handle(self, *args, **options):
"""Execute the enforcement testing command.

Loads the Casbin model and policy files, creates an enforcer instance,
displays configuration summary, and starts the interactive testing mode.

Args:
*args: Positional command arguments (unused).
**options: Command options including `policy_file_path`.

Raises:
CommandError: If model or policy files are not found or enforcer creation fails.
"""
model_file_path = self._get_file_path("model.conf") or options["model_file_path"]
policy_file_path = options["policy_file_path"]

if not os.path.isfile(model_file_path):
raise CommandError(f"Model file not found: {model_file_path}")
if not os.path.isfile(policy_file_path):
raise CommandError(f"Policy file not found: {policy_file_path}")

self.stdout.write(self.style.SUCCESS("Casbin Interactive Enforcement"))
self.stdout.write(f"Model file path: {model_file_path}")
self.stdout.write(f"Policy file path: {policy_file_path}")
self.stdout.write("")

try:
enforcer = casbin.Enforcer(model_file_path, policy_file_path)
self.stdout.write(self.style.SUCCESS("Casbin enforcer created successfully"))

policies = enforcer.get_policy()
roles = enforcer.get_grouping_policy()
action_grouping = enforcer.get_named_grouping_policy("g2")

self.stdout.write(f"✓ Loaded {len(policies)} policies")
self.stdout.write(f"✓ Loaded {len(roles)} role assignments")
self.stdout.write(f"✓ Loaded {len(action_grouping)} action grouping rules")
self.stdout.write("")

self._run_interactive_mode(enforcer)

except Exception as e:
raise CommandError(f"Error creating Casbin enforcer: {str(e)}") from e

def _get_file_path(self, file_name: str) -> str:
"""Construct the full file path for a configuration file.

Args:
file_name (str): The name of the configuration file (e.g., 'model.conf').

Returns:
str: The absolute path to the configuration file in the engine/config directory.
"""
return os.path.join(ROOT_DIRECTORY, "engine", "config", file_name)

def _run_interactive_mode(self, enforcer: casbin.Enforcer) -> None:
"""Start the interactive enforcement testing shell.

Provides a continuous loop where users can input enforcement requests
in the format 'subject action scope' and receive immediate
authorization results with visual feedback.

Args:
enforcer (casbin.Enforcer): The configured Casbin enforcer instance for testing.

Note:
Exit the interactive mode with Ctrl+C or Ctrl+D.
"""
self.stdout.write(self.style.SUCCESS("Interactive Mode"))
self.stdout.write("Test custom enforcement requests interactively.")
self.stdout.write("Enter 'quit', 'exit', or 'q' to exit the interactive mode.")
self.stdout.write("")
self.stdout.write("Format: subject action scope")
self.stdout.write("Example: user:alice act:read org:OpenedX")
self.stdout.write("")

while True:
try:
user_input = input("Enter enforcement test: ").strip()

if not user_input:
continue

if user_input.lower() in ["quit", "exit", "q"]:
break

self._test_interactive_request(enforcer, user_input)
except (KeyboardInterrupt, EOFError):
self.stdout.write(self.style.ERROR("Exiting interactive mode..."))
break

def _test_interactive_request(self, enforcer: casbin.Enforcer, user_input: str) -> None:
"""Process and test a single enforcement request from user input.

Parses the input string, validates the format, executes the enforcement
check, and displays the result with appropriate styling.

Args:
enforcer (casbin.Enforcer): The Casbin enforcer instance to use for testing.
user_input (str): The user's input string in format 'subject action scope'.

Expected format:
subject: The requesting entity (e.g., 'user:alice')
action: The requested action (e.g., 'act:read')
scope: The authorization context (e.g., 'org:OpenedX')
"""
try:
parts = [part.strip() for part in user_input.split()]
if len(parts) != 3:
self.stdout.write(self.style.ERROR(f"✗ Invalid format. Expected 3 parts, got {len(parts)}"))
self.stdout.write("Format: subject action scope")
self.stdout.write("Example: user:alice act:read org:OpenedX")
return

subject, action, scope = parts
result = enforcer.enforce(subject, action, scope)

if result:
self.stdout.write(self.style.SUCCESS(f"✓ ALLOWED: {subject} {action} {scope}"))
else:
self.stdout.write(self.style.ERROR(f"✗ DENIED: {subject} {action} {scope}"))

except (ValueError, IndexError, TypeError) as e:
self.stdout.write(self.style.ERROR(f"✗ Error processing request: {str(e)}"))
5 changes: 5 additions & 0 deletions openedx_authz/tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""
Tests package for openedx_authz.

This package contains all tests for the openedx_authz application.
"""
Loading