diff --git a/CHANGELOG.rst b/CHANGELOG.rst index b87f7f31..58b24951 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -14,6 +14,14 @@ Change Log Unreleased ********** +1.10.0 - 2026-04-16 +******************* + +Added +===== + +* Add ``scopes/`` endpoint to list all scopes (courses and libraries), sorted by org, with search and pagination support. + 1.9.0 - 2026-04-14 ****************** diff --git a/openedx_authz/__init__.py b/openedx_authz/__init__.py index 96948662..762138e2 100644 --- a/openedx_authz/__init__.py +++ b/openedx_authz/__init__.py @@ -4,6 +4,6 @@ import os -__version__ = "1.9.0" +__version__ = "1.10.0" ROOT_DIRECTORY = os.path.dirname(os.path.abspath(__file__)) diff --git a/openedx_authz/api/data.py b/openedx_authz/api/data.py index 2319fd8d..042c2e32 100644 --- a/openedx_authz/api/data.py +++ b/openedx_authz/api/data.py @@ -14,7 +14,12 @@ from opaque_keys.edx.locator import LibraryLocatorV2 from organizations.models import Organization -from openedx_authz.constants.permissions import COURSES_VIEW_COURSE_TEAM, VIEW_LIBRARY_TEAM +from openedx_authz.constants.permissions import ( + COURSES_MANAGE_COURSE_TEAM, + COURSES_VIEW_COURSE_TEAM, + MANAGE_LIBRARY_TEAM, + VIEW_LIBRARY_TEAM, +) from openedx_authz.data import AUTHZ_POLICY_ATTRIBUTES_SEPARATOR, ActionData, AuthzBaseClass, AuthZData, PermissionData from openedx_authz.models.scopes import get_content_library_model, get_course_overview_model @@ -357,6 +362,20 @@ def get_admin_view_permission(cls) -> PermissionData: """ raise NotImplementedError("Subclasses must implement get_admin_view_permission method.") + @classmethod + @abstractmethod + def get_admin_manage_permission(cls) -> PermissionData: + """Get the permission required to manage this scope + + This method should be implemented on every ScopeData subclass to define + which permission to check against when a user tries to manage assignations + related to this scope in the Admin Console. + + Returns: + PermissionData: The permission required to manage this scope in the admin console. + """ + raise NotImplementedError("Subclasses must implement get_admin_manage_permission method.") + @abstractmethod def get_object(self) -> Any | None: """Retrieve the underlying domain object that this scope represents. @@ -463,6 +482,15 @@ def get_admin_view_permission(cls) -> PermissionData: """ return VIEW_LIBRARY_TEAM + @classmethod + def get_admin_manage_permission(cls) -> PermissionData: + """Get the permission required to manage this scope + + Returns: + PermissionData: The permission required to manage this scope in the admin console. + """ + return MANAGE_LIBRARY_TEAM + def get_object(self) -> ContentLibrary | None: """Retrieve the ContentLibrary instance associated with this scope. @@ -585,6 +613,15 @@ def get_admin_view_permission(cls) -> PermissionData: """ return COURSES_VIEW_COURSE_TEAM + @classmethod + def get_admin_manage_permission(cls) -> PermissionData: + """Get the permission required to manage this scope + + Returns: + PermissionData: The permission required to manage this scope in the admin console. + """ + return COURSES_MANAGE_COURSE_TEAM + def get_object(self) -> CourseOverview | None: """Retrieve the CourseOverview instance associated with this scope. @@ -715,6 +752,15 @@ def build_external_key(cls, org: str) -> str: """ return f"{cls.NAMESPACE}{EXTERNAL_KEY_SEPARATOR}{org}{cls.ID_SEPARATOR}{GLOBAL_SCOPE_WILDCARD}" + @classmethod + def get_admin_manage_permission(cls) -> PermissionData: + """Get the permission required to manage this scope + + Returns: + PermissionData: The permission required to manage this scope in the admin console. + """ + raise NotImplementedError("Subclasses must implement get_admin_manage_permission method.") + @classmethod def get_org(cls, external_key: str) -> str | None: """Extract the organization identifier from the glob pattern. @@ -813,6 +859,15 @@ def get_admin_view_permission(cls) -> PermissionData: """ return VIEW_LIBRARY_TEAM + @classmethod + def get_admin_manage_permission(cls) -> PermissionData: + """Get the permission required to manage this scope + + Returns: + PermissionData: The permission required to manage this scope in the admin console. + """ + return MANAGE_LIBRARY_TEAM + @define class OrgCourseOverviewGlobData(OrgGlobData): @@ -862,6 +917,15 @@ def get_admin_view_permission(cls) -> PermissionData: """ return COURSES_VIEW_COURSE_TEAM + @classmethod + def get_admin_manage_permission(cls) -> PermissionData: + """Get the permission required to manage this scope + + Returns: + PermissionData: The permission required to manage this scope in the admin console. + """ + return COURSES_MANAGE_COURSE_TEAM + class CCXCourseOverviewData(CourseOverviewData): """CCX course scope for authorization in the Open edX platform. diff --git a/openedx_authz/rest_api/data.py b/openedx_authz/rest_api/data.py index 3e3d1e24..5f24dc9e 100644 --- a/openedx_authz/rest_api/data.py +++ b/openedx_authz/rest_api/data.py @@ -69,3 +69,19 @@ class RoleOperationError(BaseEnum): USER_DOES_NOT_HAVE_ROLE = "user_does_not_have_role" ROLE_ASSIGNMENT_ERROR = "role_assignment_error" ROLE_REMOVAL_ERROR = "role_removal_error" + + +class ScopesQuerySetFields(BaseEnum): + """Enum for the annotated fields used in the Scopes query set for the scopes endpoint""" + + SCOPE_ID = "scope_id" + DISPLAY_NAME_COL = "display_name_col" + ORG_NAME = "org_name" + SCOPE_TYPE = "scope_type" + + +class ScopesTypeField(BaseEnum): + """Enum for the scope_type query field on the scopes endpoint""" + + COURSE = "course" + LIBRARY = "library" diff --git a/openedx_authz/rest_api/v1/serializers.py b/openedx_authz/rest_api/v1/serializers.py index e095f12e..1e484036 100644 --- a/openedx_authz/rest_api/v1/serializers.py +++ b/openedx_authz/rest_api/v1/serializers.py @@ -1,11 +1,19 @@ """Serializers for the Open edX AuthZ REST API.""" from django.contrib.auth import get_user_model +from opaque_keys.edx.locator import LibraryLocatorV2 +from organizations.serializers import OrganizationSerializer from rest_framework import serializers from openedx_authz import api from openedx_authz.api.data import UserAssignments -from openedx_authz.rest_api.data import AssignmentSortField, SortField, SortOrder, UserAssignmentSortField +from openedx_authz.rest_api.data import ( + AssignmentSortField, + ScopesTypeField, + SortField, + SortOrder, + UserAssignmentSortField, +) from openedx_authz.rest_api.utils import get_generic_scope from openedx_authz.rest_api.v1.fields import ( CaseSensitiveCommaSeparatedListField, @@ -49,6 +57,12 @@ class OrderMixin(serializers.Serializer): # pylint: disable=abstract-method ) +class OrgMixin(serializers.Serializer): # pylint: disable=abstract-method + """Mixin providing org field functionality.""" + + org = serializers.CharField(required=False, max_length=255) + + class PermissionValidationSerializer(ActionMixin, ScopeMixin): # pylint: disable=abstract-method """Serializer for permission validation request.""" @@ -215,6 +229,16 @@ def get_roles(self, obj: api.RoleAssignmentData) -> list[str]: return [role.external_key for role in obj.roles] +class ListScopesQuerySerializer(OrgMixin): # pylint: disable=abstract-method + """Serializer for validating query parameters in ScopesAPIView.""" + + management_permission_only = serializers.BooleanField(required=False, default=False) + scope_type = serializers.ChoiceField( + choices=[(e.value, e.name) for e in ScopesTypeField], required=False, default=None, allow_null=True + ) + search = serializers.CharField(required=False, default="", allow_blank=True) + + class ListTeamMembersSerializer(OrderMixin): # pylint: disable=abstract-method """ Serializer for listing team members. @@ -379,3 +403,28 @@ class ListAssignmentsQuerySerializer(ListTeamMemberAssignmentsQuerySerializer): choices=[(e.value, e.name) for e in UserAssignmentSortField], default=UserAssignmentSortField.FULL_NAME, ) + + +class ScopeSerializer(serializers.Serializer): # pylint: disable=abstract-method + """ + Serializer for scope. + """ + + external_key = serializers.SerializerMethodField() + display_name = serializers.SerializerMethodField() + org = serializers.SerializerMethodField() + + def get_external_key(self, obj: dict) -> str: + """Get the external key for the given scope.""" + if obj["scope_type"] == ScopesTypeField.LIBRARY: + return str(LibraryLocatorV2(org=obj["org_name"], slug=obj["scope_id"])) + return obj["scope_id"] + + def get_display_name(self, obj: dict) -> str: + """Get the display name for the given scope.""" + return str(obj.get("display_name_col") or "") + + def get_org(self, obj: dict) -> dict | None: + """Get the org for the given scope.""" + org = self.context.get("org_map", {}).get(obj["org_name"]) + return OrganizationSerializer(org).data if org else None diff --git a/openedx_authz/rest_api/v1/urls.py b/openedx_authz/rest_api/v1/urls.py index 4bb1f896..6c7def28 100644 --- a/openedx_authz/rest_api/v1/urls.py +++ b/openedx_authz/rest_api/v1/urls.py @@ -19,4 +19,5 @@ "users//assignments/", views.TeamMemberAssignmentsAPIView.as_view(), name="user-assignment-list" ), path("assignments/", views.AssignmentsAPIView.as_view(), name="assignment-list"), + path("scopes/", views.ScopesAPIView.as_view(), name="scope-list"), ] diff --git a/openedx_authz/rest_api/v1/views.py b/openedx_authz/rest_api/v1/views.py index 766601ab..a2e43cb8 100644 --- a/openedx_authz/rest_api/v1/views.py +++ b/openedx_authz/rest_api/v1/views.py @@ -6,10 +6,13 @@ """ import logging +import operator +from functools import reduce import edx_api_doc_tools as apidocs from django.contrib.auth import get_user_model -from django.db.models import QuerySet +from django.db.models import CharField, Q, QuerySet, Value +from django.db.models.functions import Cast from django.http import HttpRequest from django.utils.decorators import method_decorator from edx_api_doc_tools import schema_for @@ -20,14 +23,24 @@ from rest_framework.views import APIView from openedx_authz import api -from openedx_authz.api.data import RoleAssignmentData, SuperAdminAssignmentData, UserAssignmentData +from openedx_authz.api.data import ( + ContentLibraryData, + CourseOverviewData, + OrgContentLibraryGlobData, + OrgCourseOverviewGlobData, + RoleAssignmentData, + SuperAdminAssignmentData, + UserAssignmentData, +) from openedx_authz.api.users import ( + get_scopes_for_user_and_permission, get_superadmin_assignments, get_visible_user_role_assignments_filtered_by_current_user, ) from openedx_authz.api.utils import get_user_map from openedx_authz.constants import permissions -from openedx_authz.rest_api.data import RoleOperationError, RoleOperationStatus +from openedx_authz.models.scopes import get_content_library_model, get_course_overview_model +from openedx_authz.rest_api.data import RoleOperationError, RoleOperationStatus, ScopesQuerySetFields, ScopesTypeField from openedx_authz.rest_api.decorators import authz_permissions, view_auth_classes from openedx_authz.rest_api.utils import ( filter_users, @@ -48,12 +61,14 @@ ListAssignmentsQuerySerializer, ListRolesWithScopeResponseSerializer, ListRolesWithScopeSerializer, + ListScopesQuerySerializer, ListTeamMemberAssignmentsQuerySerializer, ListTeamMembersSerializer, ListUsersInRoleWithScopeSerializer, PermissionValidationResponseSerializer, PermissionValidationSerializer, RemoveUsersFromRoleWithScopeSerializer, + ScopeSerializer, TeamMemberAssignmentSerializer, TeamMemberSerializer, TeamMemberUserAssignmentSerializer, @@ -66,6 +81,8 @@ logger = logging.getLogger(__name__) User = get_user_model() +ContentLibrary = get_content_library_model() +CourseOverview = get_course_overview_model() @view_auth_classes() @@ -562,6 +579,315 @@ def get_queryset(self) -> QuerySet: return Organization.objects.filter(active=True).order_by("name") +@view_auth_classes() +@method_decorator( + authz_permissions( + [ + permissions.VIEW_LIBRARY_TEAM.identifier, + permissions.COURSES_VIEW_COURSE_TEAM.identifier, + ] + ), + name="get", +) +@schema_for( + "get", + parameters=[ + apidocs.query_parameter("search", str, description="Filter scopes by display name"), + apidocs.query_parameter("org", str, description="Filter scopes by org"), + apidocs.query_parameter("page", int, description="Page number for pagination"), + apidocs.query_parameter("page_size", int, description="Number of items per page"), + apidocs.query_parameter( + "management_permission_only", + bool, + description=( + "If true, returns only scopes to which the calling user has manage team permission, " + "otherwise, returns any scope to which the user has view team permission." + ), + ), + apidocs.query_parameter( + "scope_type", + str, + description="Filter by scope type. Either 'course' or 'library'. Returns both if not specified.", + ), + ], + responses={ + status.HTTP_200_OK: ScopeSerializer(many=True), + status.HTTP_400_BAD_REQUEST: "The request parameters are invalid", + status.HTTP_401_UNAUTHORIZED: "The user is not authenticated", + status.HTTP_403_FORBIDDEN: "The user does not have the required permissions", + }, +) +class ScopesAPIView(generics.ListAPIView): + """ + API view for listing scopes + This API is used on the filters and assign roles functionality on the Admin Console. + + **Endpoints** + + - GET: Retrieve all scopes + + **Query Parameters** + + - search (Optional): Search term to filter scopes by display name + - org (Optional): Filter scopes by org + - page (Optional): Page number for pagination + - page_size (Optional): Number of items per page + - scope_type (Optional): Filter scopes by type. Supported values are `course` and `library`. + - management_permission_only (Optional): Filter scopes either by only the ones to which the user has "manage team" + permissions (if true), or just "view team" permissions. + + **Response Format** + + Returns a paginated list of scope objects, each containing: + + - external_key: The scope external key + - display_name: The scope's name + - org: The organization serialized object + + **Authentication and Permissions** + + - Requires authenticated user with either a content library or course view team permission. + + **Example Request** + + GET /api/authz/v1/scopes/?search=edx&page=1&page_size=10 + + **Example Response**:: + + { + "count": 1, + "next": null, + "previous": null, + "results": [ + { + "external_key": "course-v1:OpenedX+DemoX+DemoCourse", + "display_name": "Open edX Demo Course", + "org": { + "id": 1, + "created": "2026-04-02T19:30:36.779095Z", + "modified": "2026-04-02T19:30:36.779095Z", + "name": "OpenedX", + "short_name": "OpenedX", + "description": "", + "logo": null, + "active": true + } + }, + ] + } + """ + + serializer_class = ScopeSerializer + pagination_class = AuthZAPIViewPagination + permission_classes = [AnyScopePermission] + + # Priority for fields used for stable sorting (first has more priority) + ordering_priority = ( + ScopesQuerySetFields.ORG_NAME, + ScopesQuerySetFields.SCOPE_TYPE, + ScopesQuerySetFields.DISPLAY_NAME_COL, + ScopesQuerySetFields.SCOPE_ID, + ) + + def get_serializer_context(self): + context = super().get_serializer_context() + context["org_map"] = Organization.objects.filter(active=True).in_bulk(field_name="short_name") + return context + + def _get_courses_queryset( + self, + allowed_ids: set | None = None, + allowed_orgs: set | None = None, + search: str = "", + org: str = "", + ) -> QuerySet: + """Return a CourseOverview queryset projected to the unified scope shape. + + If allowed_ids and/or allowed_orgs are provided, filter to matching courses. + If search is provided, filter by display_name. + If org is provided, filter by org short_name. + """ + qs = CourseOverview.objects + if allowed_ids is not None or allowed_orgs is not None: + org_filter = Q(org__in=allowed_orgs) if allowed_orgs else Q() + id_filter = Q(id__in=allowed_ids) if allowed_ids else Q() + combined_filter = org_filter | id_filter + if not combined_filter: + qs = qs.none() + else: + qs = qs.filter(combined_filter) + if org: + qs = qs.filter(org=org) + if search: + qs = qs.filter(display_name__icontains=search) + return qs.annotate( + scope_id=Cast("id", output_field=CharField(db_collation="utf8mb4_unicode_ci")), + display_name_col=Cast("display_name", output_field=CharField(db_collation="utf8mb4_unicode_ci")), + org_name=Cast("org", output_field=CharField(db_collation="utf8mb4_unicode_ci")), + scope_type=Value(ScopesTypeField.COURSE, output_field=CharField(db_collation="utf8mb4_unicode_ci")), + ).values( + ScopesQuerySetFields.SCOPE_ID, + ScopesQuerySetFields.DISPLAY_NAME_COL, + ScopesQuerySetFields.ORG_NAME, + ScopesQuerySetFields.SCOPE_TYPE, + ) + + def _get_libraries_queryset( + self, + allowed_pairs: set | None = None, + allowed_orgs: set | None = None, + search: str = "", + org: str = "", + ) -> QuerySet: + """Return a ContentLibrary queryset projected to the unified scope shape. + + If allowed_pairs and/or allowed_orgs are provided, filter to matching libraries. + If search is provided, filter by learning_package__title. + If org is provided, filter by org short_name. + """ + qs = ContentLibrary.objects + if allowed_pairs is not None or allowed_orgs is not None: + org_filter = Q(org__short_name__in=allowed_orgs) if allowed_orgs else Q() + pair_filter = ( + reduce(operator.or_, (Q(org__short_name=org, slug=slug) for org, slug in allowed_pairs)) + if allowed_pairs + else Q() + ) + combined = org_filter | pair_filter + if not combined: + qs = qs.none() + else: + qs = qs.filter(combined) + if org: + qs = qs.filter(org__short_name=org) + if search: + qs = qs.filter(learning_package__title__icontains=search) + return qs.annotate( + scope_id=Cast("slug", output_field=CharField(db_collation="utf8mb4_unicode_ci")), + display_name_col=Cast("learning_package__title", output_field=CharField(db_collation="utf8mb4_unicode_ci")), + org_name=Cast("org__short_name", output_field=CharField(db_collation="utf8mb4_unicode_ci")), + scope_type=Value(ScopesTypeField.LIBRARY, output_field=CharField(db_collation="utf8mb4_unicode_ci")), + ).values( + ScopesQuerySetFields.SCOPE_ID, + ScopesQuerySetFields.DISPLAY_NAME_COL, + ScopesQuerySetFields.ORG_NAME, + ScopesQuerySetFields.SCOPE_TYPE, + ) + + @staticmethod + def _get_allowed_scope_queryset( + *, + username: str, + scope_cls: type, + glob_cls: type, + get_permission: callable, + queryset_builder: callable, + extract_ids: callable, + search: str = "", + org: str = "", + ) -> QuerySet: + """Resolve allowed scopes from Casbin and return a filtered queryset. + + This helper encapsulates the shared pattern of: + 1. Fetching allowed scopes for a user and permission. + 2. Partitioning them into specific IDs vs org-level globs. + 3. Delegating to the appropriate queryset builder. + + Args: + username: The username to check permissions for. + scope_cls: The concrete scope data class (e.g., CourseOverviewData). + glob_cls: The org-level glob class (e.g., OrgCourseOverviewGlobData). + get_permission: Callable that returns the permission for a scope class. + queryset_builder: Callable that builds the filtered queryset (e.g., _get_courses_queryset). + extract_ids: Callable that extracts specific IDs from non-glob scopes. + search: Optional search term to filter by display name. + org: Optional org short_name to filter by. + + Returns: + QuerySet: The filtered queryset projected to the unified scope shape. + """ + allowed_scopes = get_scopes_for_user_and_permission(username, get_permission(scope_cls).identifier) + specific_scopes = [s for s in allowed_scopes if not isinstance(s, glob_cls)] + allowed_ids = extract_ids(specific_scopes) + allowed_orgs = {s.org for s in allowed_scopes if isinstance(s, glob_cls)} + return queryset_builder(allowed_ids, allowed_orgs, search=search, org=org) + + def _build_queryset(self, courses_qs: QuerySet | None, libraries_qs: QuerySet | None) -> QuerySet: + """Union the provided querysets and sort deterministically. + + Orders by org_name first (satisfying the 'ordered by org' requirement), then by + scope_type, display_name_col, and scope_id as tiebreakers to ensure stable pagination. + """ + if courses_qs is not None and libraries_qs is not None: + return courses_qs.union(libraries_qs).order_by(*self.ordering_priority) + qs = courses_qs if courses_qs is not None else libraries_qs + return qs.order_by(*self.ordering_priority) + + def get_queryset(self) -> QuerySet: + """Return scopes ordered by org, filtered by the user's permissions.""" + user = self.request.user + + # Validate and parse query parameters. + params_serializer = ListScopesQuerySerializer(data=self.request.query_params) + params_serializer.is_valid(raise_exception=True) + scope_type = params_serializer.validated_data["scope_type"] + search = params_serializer.validated_data["search"] + org = params_serializer.validated_data.get("org", "") + + # Staff and superusers can see all scopes, skip permission filtering. + if user.is_staff or user.is_superuser: + return self._build_queryset( + courses_qs=( + self._get_courses_queryset(search=search, org=org) + if scope_type != ScopesTypeField.LIBRARY + else None + ), + libraries_qs=( + self._get_libraries_queryset(search=search, org=org) + if scope_type != ScopesTypeField.COURSE + else None + ), + ) + + management_only = params_serializer.validated_data["management_permission_only"] + + # Determine which permission to check based on the query parameter. + def get_permission(scope_cls): + return scope_cls.get_admin_manage_permission() if management_only else scope_cls.get_admin_view_permission() + + # Resolve allowed scopes from Casbin and build filtered querysets. + courses_qs = None + if scope_type != ScopesTypeField.LIBRARY: + courses_qs = self._get_allowed_scope_queryset( + username=user.username, + scope_cls=CourseOverviewData, + glob_cls=OrgCourseOverviewGlobData, + get_permission=get_permission, + queryset_builder=self._get_courses_queryset, + extract_ids=lambda scopes: {s.external_key for s in scopes}, + search=search, + org=org, + ) + + libraries_qs = None + if scope_type != ScopesTypeField.COURSE: + libraries_qs = self._get_allowed_scope_queryset( + username=user.username, + scope_cls=ContentLibraryData, + glob_cls=OrgContentLibraryGlobData, + get_permission=get_permission, + queryset_builder=self._get_libraries_queryset, + extract_ids=lambda scopes: { + (s.external_key.split(":")[1], s.external_key.split(":")[2]) for s in scopes + }, + search=search, + org=org, + ) + + # Union the requested querysets and sort by org at the DB level. + return self._build_queryset(courses_qs, libraries_qs) + + @view_auth_classes() class TeamMembersAPIView(APIView): """ diff --git a/openedx_authz/tests/rest_api/test_views.py b/openedx_authz/tests/rest_api/test_views.py index 713cc628..e721dc16 100644 --- a/openedx_authz/tests/rest_api/test_views.py +++ b/openedx_authz/tests/rest_api/test_views.py @@ -16,12 +16,21 @@ from rest_framework.test import APIClient from openedx_authz import api +from openedx_authz.api.data import ( + OrgContentLibraryGlobData, + OrgCourseOverviewGlobData, +) from openedx_authz.api.users import assign_role_to_user_in_scope from openedx_authz.constants import permissions, roles +from openedx_authz.models.scopes import get_content_library_model, get_course_overview_model from openedx_authz.rest_api.data import RoleOperationError, RoleOperationStatus from openedx_authz.rest_api.v1.permissions import AnyScopePermission, DynamicScopePermission -from openedx_authz.rest_api.v1.views import UserValidationAPIView +from openedx_authz.rest_api.v1.views import ScopesAPIView, UserValidationAPIView from openedx_authz.tests.api.test_roles import BaseRolesTestCase +from openedx_authz.tests.stubs.models import LearningPackage + +ContentLibrary = get_content_library_model() +CourseOverview = get_course_overview_model() User = get_user_model() @@ -839,6 +848,652 @@ def test_put_accepts_valid_full_course_key_scope(self, _mock_exists, _mock_assig self.assertEqual(len(response.data["completed"]), 1) +@ddt +class TestScopesAPIView(ViewTestMixin): + """ + Test suite for ScopesAPIView. + + Setup summary (from ViewTestMixin.setUpClass): + lib:Org1:LIB1 → admin_1 (library_admin), regular_1 (library_user), regular_2 (library_user) + lib:Org2:LIB2 → admin_2 (library_user), regular_3 (library_user), regular_4 (library_user) + lib:Org3:LIB3 → admin_3 (library_admin), regular_5 (library_admin), regular_6 (library_author), + regular_7 (library_contributor), regular_8 (library_user) + + Courses and ContentLibrary objects are mocked via get_scopes_for_user_and_permission + and the queryset helper methods, since those models live in openedx-platform. + """ + + COURSE_ORG1 = "course-v1:Org1+COURSE1+2024" + COURSE_ORG2 = "course-v1:Org2+COURSE2+2024" + LIBRARY_ORG1 = "lib:Org1:LIB1" + LIBRARY_ORG2 = "lib:Org2:LIB2" + + @classmethod + def setUpClass(cls): + """Assign course and library roles to test users.""" + super().setUpClass() + cls._assign_roles_to_users( + [ + # regular_9: can view course team on Org1 course + { + "subject_name": "regular_9", + "role_name": roles.COURSE_STAFF.external_key, + "scope_name": cls.COURSE_ORG1, + }, + # regular_10: can manage course team on Org2 course + { + "subject_name": "regular_10", + "role_name": roles.COURSE_ADMIN.external_key, + "scope_name": cls.COURSE_ORG2, + }, + ] + ) + + @classmethod + def setUpTestData(cls): + """Create Organization, CourseOverview and ContentLibrary fixtures.""" + super().setUpTestData() + + org1, _ = Organization.objects.get_or_create(name="Org1", short_name="Org1") + org2, _ = Organization.objects.get_or_create(name="Org2", short_name="Org2") + org3, _ = Organization.objects.get_or_create(name="Org3", short_name="Org3") + + CourseOverview.objects.get_or_create( + id=cls.COURSE_ORG1, defaults={"org": "Org1", "display_name": "Course Org1"} + ) + CourseOverview.objects.get_or_create( + id=cls.COURSE_ORG2, defaults={"org": "Org2", "display_name": "Course Org2"} + ) + + lp1, _ = LearningPackage.objects.get_or_create(title="Library Org1") + lp2, _ = LearningPackage.objects.get_or_create(title="Library Org2") + lp3, _ = LearningPackage.objects.get_or_create(title="Library Org3") + + ContentLibrary.objects.get_or_create( + slug="LIB1", + org=org1, + defaults={"locator": "lib:Org1:LIB1", "title": "Library Org1", "learning_package": lp1}, + ) + ContentLibrary.objects.get_or_create( + slug="LIB2", + org=org2, + defaults={"locator": "lib:Org2:LIB2", "title": "Library Org2", "learning_package": lp2}, + ) + ContentLibrary.objects.get_or_create( + slug="LIB3", + org=org3, + defaults={"locator": "lib:Org3:LIB3", "title": "Library Org3", "learning_package": lp3}, + ) + + def setUp(self): + """Set up test fixtures.""" + super().setUp() + self.url = reverse("openedx_authz:scope-list") + + # Default combined result used by most tests. + self.fake_scopes = [ + { + "scope_id": self.COURSE_ORG1, + "display_name_col": "Course Org1", + "org_name": "Org1", + "scope_type": "course", + }, + {"scope_id": "LIB1", "display_name_col": "Library LIB1", "org_name": "Org1", "scope_type": "library"}, + { + "scope_id": self.COURSE_ORG2, + "display_name_col": "Course Org2", + "org_name": "Org2", + "scope_type": "course", + }, + {"scope_id": "LIB2", "display_name_col": "Library LIB2", "org_name": "Org2", "scope_type": "library"}, + ] + + # Patch _build_queryset so tests don't need real DB querysets. + self.build_qs_patcher = patch.object( + ScopesAPIView, + "_build_queryset", + return_value=self.fake_scopes, + ) + self.build_qs_patcher.start() + self.addCleanup(self.build_qs_patcher.stop) + + # ------------------------------------------------------------------ # + # Authentication # + # ------------------------------------------------------------------ # + + def test_unauthenticated_returns_401(self): + """Unauthenticated requests are rejected.""" + self.client.force_authenticate(user=None) + + response = self.client.get(self.url) + + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + # ------------------------------------------------------------------ # + # Response shape # + # ------------------------------------------------------------------ # + + def test_response_shape(self): + """Each result contains external_key, display_name, and org fields.""" + response = self.client.get(self.url) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + for item in response.data["results"]: + self.assertIn("external_key", item) + self.assertIn("display_name", item) + self.assertIn("org", item) + + # ------------------------------------------------------------------ # + # Sorted by org # + # ------------------------------------------------------------------ # + + def test_results_sorted_by_org(self): + """Results are sorted by org_name across courses and libraries.""" + # Stop only build_qs_patcher; libraries_qs_patcher stays active (uses stub-compatible field name). + self.build_qs_patcher.stop() + + response = self.client.get(self.url) # admin_1 is staff, sees all + + self.build_qs_patcher.start() + + self.assertEqual(response.status_code, status.HTTP_200_OK) + org_names = [item["org"]["short_name"] if item["org"] else "" for item in response.data["results"]] + self.assertEqual(org_names, sorted(org_names)) + + # ------------------------------------------------------------------ # + # type param # + # ------------------------------------------------------------------ # + + @data( + ("course", "_get_courses_queryset", "_get_libraries_queryset"), + ("library", "_get_libraries_queryset", "_get_courses_queryset"), + ) + @unpack + def test_type_param_calls_only_expected_queryset(self, scope_type, called_method, skipped_method): + """When type=course only courses are fetched; when type=library only libraries.""" + self.build_qs_patcher.stop() + with ( + patch.object(ScopesAPIView, called_method, return_value=[]) as mock_called, + patch.object(ScopesAPIView, skipped_method) as mock_skipped, + patch.object(ScopesAPIView, "_build_queryset", return_value=[]), + ): + response = self.client.get(self.url, {"scope_type": scope_type}) + self.build_qs_patcher.start() + + self.assertEqual(response.status_code, status.HTTP_200_OK) + mock_called.assert_called_once() + mock_skipped.assert_not_called() + + def test_type_param_invalid_returns_400(self): + """An invalid type value returns 400.""" + response = self.client.get(self.url, {"scope_type": "invalid"}) + + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + + def test_type_param_absent_returns_both(self): + """When type is not specified, both courses and libraries are returned.""" + self.build_qs_patcher.stop() + with ( + patch.object(ScopesAPIView, "_get_courses_queryset", return_value=[]) as mock_courses, + patch.object(ScopesAPIView, "_get_libraries_queryset", return_value=[]) as mock_libraries, + patch.object(ScopesAPIView, "_build_queryset", return_value=[]), + ): + response = self.client.get(self.url) + self.build_qs_patcher.start() + + self.assertEqual(response.status_code, status.HTTP_200_OK) + mock_courses.assert_called_once() + mock_libraries.assert_called_once() + + # ------------------------------------------------------------------ # + # Search # + # ------------------------------------------------------------------ # + + def test_search_filters_by_display_name(self): + """search param filters results by display_name.""" + # Search is applied pre-union inside get_queryset. Use real DB rows (staff user, type=course) + # to avoid the union so the queryset remains filterable. + self.build_qs_patcher.stop() + + response_match = self.client.get(self.url, {"search": "Course Org1", "scope_type": "course"}) + response_no_match = self.client.get(self.url, {"search": "nonexistent_xyz", "scope_type": "course"}) + + self.build_qs_patcher.start() + + self.assertEqual(response_match.status_code, status.HTTP_200_OK) + self.assertEqual(response_match.data["count"], 1) + self.assertIn("Org1", response_match.data["results"][0]["display_name"]) + + self.assertEqual(response_no_match.status_code, status.HTTP_200_OK) + self.assertEqual(response_no_match.data["count"], 0) + + # ------------------------------------------------------------------ # + # Pagination # + # ------------------------------------------------------------------ # + + @data( + ({"page": 1, "page_size": 1}, 1, True), + ({"page": 2, "page_size": 1}, 1, True), + ({"page": 3, "page_size": 1}, 1, False), + ({"page": 1, "page_size": 3}, 3, False), + ) + @unpack + def test_pagination(self, query_params: dict, expected_page_count: int, has_next: bool): + """Results are paginated correctly.""" + mixed = [ + {"scope_id": self.COURSE_ORG1, "display_name_col": "Course 1", "org_name": "Org1", "scope_type": "course"}, + {"scope_id": "LIB1", "display_name_col": "Library 1", "org_name": "Org1", "scope_type": "library"}, + {"scope_id": self.COURSE_ORG2, "display_name_col": "Course 2", "org_name": "Org2", "scope_type": "course"}, + ] + self.build_qs_patcher.stop() + with patch.object(ScopesAPIView, "_build_queryset", return_value=mixed): + response = self.client.get(self.url, query_params) + self.build_qs_patcher.start() + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["count"], 3) + self.assertEqual(len(response.data["results"]), expected_page_count) + if has_next: + self.assertIsNotNone(response.data["next"]) + else: + self.assertIsNone(response.data["next"]) + + # ------------------------------------------------------------------ # + # Staff / superuser bypass # + # ------------------------------------------------------------------ # + + def test_staff_sees_all_scopes_without_permission_check(self): + """Staff users bypass permission filtering and see all scopes.""" + with patch("openedx_authz.rest_api.v1.views.get_scopes_for_user_and_permission") as mock_get_scopes: + response = self.client.get(self.url) # admin_1 is staff + + self.assertEqual(response.status_code, status.HTTP_200_OK) + mock_get_scopes.assert_not_called() + + def test_non_staff_triggers_permission_check(self): + """Non-staff users go through get_scopes_for_user_and_permission.""" + user = User.objects.get(username="regular_1") + self.client.force_authenticate(user=user) + + with patch( + "openedx_authz.rest_api.v1.views.get_scopes_for_user_and_permission", + return_value=[], + ) as mock_get_scopes: + response = self.client.get(self.url) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(mock_get_scopes.call_count, 2) # once per scope type + + # ------------------------------------------------------------------ # + # Permission filtering: view # + # ------------------------------------------------------------------ # + + def test_view_permission_filters_courses_for_non_staff(self): + """Non-staff user only sees courses they have VIEW_COURSE_TEAM permission for.""" + # regular_9 has COURSE_STAFF on COURSE_ORG1 → VIEW_COURSE_TEAM granted + user = User.objects.get(username="regular_9") + self.client.force_authenticate(user=user) + self.build_qs_patcher.stop() + + response = self.client.get(self.url, {"scope_type": "course"}) + + self.build_qs_patcher.start() + self.assertEqual(response.status_code, status.HTTP_200_OK) + external_keys = [item["external_key"] for item in response.data["results"]] + self.assertIn(self.COURSE_ORG1, external_keys) + self.assertNotIn(self.COURSE_ORG2, external_keys) + + def test_view_permission_filters_libraries_for_non_staff(self): + """Non-staff user only sees libraries they have VIEW_LIBRARY_TEAM permission for.""" + # regular_1 has LIBRARY_USER on lib:Org1:LIB1 → VIEW_LIBRARY_TEAM granted + user = User.objects.get(username="regular_1") + self.client.force_authenticate(user=user) + self.build_qs_patcher.stop() + + response = self.client.get(self.url, {"scope_type": "library"}) + + self.build_qs_patcher.start() + self.assertEqual(response.status_code, status.HTTP_200_OK) + external_keys = [item["external_key"] for item in response.data["results"]] + self.assertIn(self.LIBRARY_ORG1, external_keys) + self.assertNotIn(self.LIBRARY_ORG2, external_keys) + # Verify display_name is populated from the library title, not empty. + for item in response.data["results"]: + self.assertTrue(item["display_name"]) + + def test_library_display_name_populated_in_standalone_path(self): + """display_name is non-empty for libraries when type=library bypasses the union. + + Regression test: without aliasing learning_package__title as display_name, + the standalone library queryset returns 'title' as the key and the serializer + silently produces empty strings since it only reads 'display_name'. + """ + user = User.objects.get(username="regular_1") + self.client.force_authenticate(user=user) + self.build_qs_patcher.stop() + + response = self.client.get(self.url, {"scope_type": "library"}) + + self.build_qs_patcher.start() + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertGreater(response.data["count"], 0) + for item in response.data["results"]: + self.assertTrue(item["display_name"]) + + # ------------------------------------------------------------------ # + # Permission filtering: manage # + # ------------------------------------------------------------------ # + + def test_manage_permission_filters_courses_for_non_staff(self): + """management_permission_only=true filters to courses with MANAGE_COURSE_TEAM only.""" + # regular_10 has COURSE_ADMIN on COURSE_ORG2 → MANAGE_COURSE_TEAM granted + user = User.objects.get(username="regular_10") + self.client.force_authenticate(user=user) + self.build_qs_patcher.stop() + + response = self.client.get(self.url, {"scope_type": "course", "management_permission_only": "true"}) + + self.build_qs_patcher.start() + self.assertEqual(response.status_code, status.HTTP_200_OK) + external_keys = [item["external_key"] for item in response.data["results"]] + self.assertIn(self.COURSE_ORG2, external_keys) + self.assertNotIn(self.COURSE_ORG1, external_keys) + + def test_manage_permission_filters_libraries_for_non_staff(self): + """management_permission_only=true filters to libraries with MANAGE_LIBRARY_TEAM only.""" + # regular_5 has LIBRARY_ADMIN on lib:Org3:LIB3 → MANAGE_LIBRARY_TEAM granted + # regular_1 has LIBRARY_USER on lib:Org1:LIB1 → only VIEW, not MANAGE + user = User.objects.get(username="regular_5") + self.client.force_authenticate(user=user) + self.build_qs_patcher.stop() + + response = self.client.get(self.url, {"scope_type": "library", "management_permission_only": "true"}) + + self.build_qs_patcher.start() + self.assertEqual(response.status_code, status.HTTP_200_OK) + external_keys = [item["external_key"] for item in response.data["results"]] + self.assertIn("lib:Org3:LIB3", external_keys) + self.assertNotIn(self.LIBRARY_ORG1, external_keys) + + def test_empty_allowed_library_pairs_returns_no_libraries(self): + """When a non-staff user has no allowed libraries, no libraries are returned. + + Regression test: an empty allowed_pairs set must not bypass the filter + and return all libraries (reduce with Q() default was a no-op). + """ + # regular_9 has no library permissions, only a course role. + user = User.objects.get(username="regular_9") + self.client.force_authenticate(user=user) + self.build_qs_patcher.stop() + + response = self.client.get(self.url, {"scope_type": "library"}) + + self.build_qs_patcher.start() + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["count"], 0) + + def test_empty_allowed_course_ids_returns_no_courses(self): + """When a non-staff user has no allowed courses, no courses are returned. + + Regression test: an empty allowed_ids/allowed_orgs set must not bypass the filter + and return all courses (empty Q() | empty Q() was a no-op). + """ + # regular_1 has only library permissions, no course permissions. + user = User.objects.get(username="regular_1") + self.client.force_authenticate(user=user) + self.build_qs_patcher.stop() + + response = self.client.get(self.url, {"scope_type": "course"}) + + self.build_qs_patcher.start() + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["count"], 0) + + def test_library_only_user_sees_no_courses_in_mixed_listing(self): + """A user with only library permissions sees no courses in the default mixed listing. + + Regression test: without the empty-set guard, a user with library access but no + course permissions would see all courses in the combined results. + """ + # regular_1 has only library permissions, no course permissions. + user = User.objects.get(username="regular_1") + self.client.force_authenticate(user=user) + self.build_qs_patcher.stop() + + response = self.client.get(self.url) + + self.build_qs_patcher.start() + self.assertEqual(response.status_code, status.HTTP_200_OK) + scope_types = {item["external_key"].split(":")[0] for item in response.data["results"]} + self.assertNotIn("course-v1", scope_types) + self.assertIn("lib", scope_types) + + def test_org_glob_scope_returns_all_org_libraries(self): + """A user with an org-level glob permission (lib:ORG:*) sees all libraries in that org.""" + user = User.objects.get(username="regular_1") + self.client.force_authenticate(user=user) + self.build_qs_patcher.stop() + + # Simulate get_scopes_for_user_and_permission returning an org-level glob. + glob_scope = OrgContentLibraryGlobData(external_key="lib:Org1:*") + with patch( + "openedx_authz.rest_api.v1.views.get_scopes_for_user_and_permission", + return_value=[glob_scope], + ): + response = self.client.get(self.url, {"scope_type": "library"}) + + self.build_qs_patcher.start() + self.assertEqual(response.status_code, status.HTTP_200_OK) + external_keys = [item["external_key"] for item in response.data["results"]] + self.assertIn(self.LIBRARY_ORG1, external_keys) + self.assertNotIn(self.LIBRARY_ORG2, external_keys) + + def test_org_glob_scope_returns_all_org_courses(self): + """A user with an org-level glob permission (course-v1:ORG+*) sees all courses in that org.""" + user = User.objects.get(username="regular_9") + self.client.force_authenticate(user=user) + self.build_qs_patcher.stop() + + # Simulate get_scopes_for_user_and_permission returning an org-level glob. + glob_scope = OrgCourseOverviewGlobData(external_key="course-v1:Org1+*") + with patch( + "openedx_authz.rest_api.v1.views.get_scopes_for_user_and_permission", + return_value=[glob_scope], + ): + response = self.client.get(self.url, {"scope_type": "course"}) + + self.build_qs_patcher.start() + self.assertEqual(response.status_code, status.HTTP_200_OK) + external_keys = [item["external_key"] for item in response.data["results"]] + self.assertIn(self.COURSE_ORG1, external_keys) + self.assertNotIn(self.COURSE_ORG2, external_keys) + + def test_manage_permission_only_uses_manage_permission(self): + """management_permission_only=true calls get_admin_manage_permission, not get_admin_view_permission.""" + user = User.objects.get(username="regular_1") + self.client.force_authenticate(user=user) + + with patch( + "openedx_authz.rest_api.v1.views.get_scopes_for_user_and_permission", + return_value=[], + ) as mock_get_scopes: + self.client.get(self.url, {"management_permission_only": "true"}) + + called_permissions = [call.args[1] for call in mock_get_scopes.call_args_list] + self.assertIn(permissions.MANAGE_LIBRARY_TEAM.identifier, called_permissions) + self.assertIn(permissions.COURSES_MANAGE_COURSE_TEAM.identifier, called_permissions) + + def test_view_permission_only_uses_view_permission(self): + """management_permission_only=false (default) calls get_admin_view_permission.""" + user = User.objects.get(username="regular_1") + self.client.force_authenticate(user=user) + + with patch( + "openedx_authz.rest_api.v1.views.get_scopes_for_user_and_permission", + return_value=[], + ) as mock_get_scopes: + self.client.get(self.url) + + called_permissions = [call.args[1] for call in mock_get_scopes.call_args_list] + self.assertIn(permissions.VIEW_LIBRARY_TEAM.identifier, called_permissions) + self.assertIn(permissions.COURSES_VIEW_COURSE_TEAM.identifier, called_permissions) + + # ------------------------------------------------------------------ # + # Org filter # + # ------------------------------------------------------------------ # + + def test_org_filter_staff_courses(self): + """Staff user with org param sees only courses from that org.""" + self.build_qs_patcher.stop() + + response = self.client.get(self.url, {"org": "Org1", "scope_type": "course"}) + + self.build_qs_patcher.start() + self.assertEqual(response.status_code, status.HTTP_200_OK) + for item in response.data["results"]: + self.assertIn("Org1", item["external_key"]) + # Org2 course should not appear + external_keys = [item["external_key"] for item in response.data["results"]] + self.assertNotIn(self.COURSE_ORG2, external_keys) + + def test_org_filter_staff_libraries(self): + """Staff user with org param sees only libraries from that org.""" + self.build_qs_patcher.stop() + + response = self.client.get(self.url, {"org": "Org2", "scope_type": "library"}) + + self.build_qs_patcher.start() + self.assertEqual(response.status_code, status.HTTP_200_OK) + external_keys = [item["external_key"] for item in response.data["results"]] + self.assertIn(self.LIBRARY_ORG2, external_keys) + self.assertNotIn(self.LIBRARY_ORG1, external_keys) + + def test_org_filter_staff_no_match(self): + """Staff user with org param for a non-existent org gets empty results.""" + self.build_qs_patcher.stop() + + response = self.client.get(self.url, {"org": "NonExistentOrg", "scope_type": "course"}) + + self.build_qs_patcher.start() + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["count"], 0) + + def test_org_filter_non_staff_with_permission(self): + """Non-staff user with org param sees scopes only if they have permission for that org.""" + # regular_1 has LIBRARY_USER on lib:Org1:LIB1 → VIEW_LIBRARY_TEAM granted + user = User.objects.get(username="regular_1") + self.client.force_authenticate(user=user) + self.build_qs_patcher.stop() + + response = self.client.get(self.url, {"org": "Org1", "scope_type": "library"}) + + self.build_qs_patcher.start() + self.assertEqual(response.status_code, status.HTTP_200_OK) + external_keys = [item["external_key"] for item in response.data["results"]] + self.assertIn(self.LIBRARY_ORG1, external_keys) + + def test_org_filter_non_staff_without_permission(self): + """Non-staff user with org param for an org they have no permission for gets empty results.""" + # regular_1 has no permissions on Org2 + user = User.objects.get(username="regular_1") + self.client.force_authenticate(user=user) + self.build_qs_patcher.stop() + + response = self.client.get(self.url, {"org": "Org2", "scope_type": "library"}) + + self.build_qs_patcher.start() + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["count"], 0) + + def test_org_filter_non_staff_courses(self): + """Non-staff user with org param sees only courses from that org if they have permission.""" + # regular_9 has COURSE_STAFF on COURSE_ORG1 → VIEW_COURSE_TEAM granted + user = User.objects.get(username="regular_9") + self.client.force_authenticate(user=user) + self.build_qs_patcher.stop() + + response = self.client.get(self.url, {"org": "Org1", "scope_type": "course"}) + + self.build_qs_patcher.start() + self.assertEqual(response.status_code, status.HTTP_200_OK) + external_keys = [item["external_key"] for item in response.data["results"]] + self.assertIn(self.COURSE_ORG1, external_keys) + + def test_org_filter_non_staff_courses_no_permission(self): + """Non-staff user with org param for an org they have no course permission for gets empty results.""" + # regular_9 has no course permissions on Org2 + user = User.objects.get(username="regular_9") + self.client.force_authenticate(user=user) + self.build_qs_patcher.stop() + + response = self.client.get(self.url, {"org": "Org2", "scope_type": "course"}) + + self.build_qs_patcher.start() + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["count"], 0) + + def test_org_filter_with_glob_permission(self): + """Non-staff user with org glob permission and org filter sees only that org's scopes.""" + user = User.objects.get(username="regular_1") + self.client.force_authenticate(user=user) + self.build_qs_patcher.stop() + + glob_scope = OrgContentLibraryGlobData(external_key="lib:Org1:*") + with patch( + "openedx_authz.rest_api.v1.views.get_scopes_for_user_and_permission", + return_value=[glob_scope], + ): + response = self.client.get(self.url, {"org": "Org1", "scope_type": "library"}) + + self.build_qs_patcher.start() + self.assertEqual(response.status_code, status.HTTP_200_OK) + external_keys = [item["external_key"] for item in response.data["results"]] + self.assertIn(self.LIBRARY_ORG1, external_keys) + self.assertNotIn(self.LIBRARY_ORG2, external_keys) + + def test_org_filter_with_glob_permission_wrong_org(self): + """Non-staff user with org glob for Org1 but filtering by Org2 gets empty results.""" + user = User.objects.get(username="regular_1") + self.client.force_authenticate(user=user) + self.build_qs_patcher.stop() + + glob_scope = OrgContentLibraryGlobData(external_key="lib:Org1:*") + with patch( + "openedx_authz.rest_api.v1.views.get_scopes_for_user_and_permission", + return_value=[glob_scope], + ): + response = self.client.get(self.url, {"org": "Org2", "scope_type": "library"}) + + self.build_qs_patcher.start() + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["count"], 0) + + def test_org_filter_absent_returns_all_permitted(self): + """When org param is absent, all permitted scopes are returned (existing behavior).""" + self.build_qs_patcher.stop() + + response = self.client.get(self.url, {"scope_type": "course"}) + + self.build_qs_patcher.start() + self.assertEqual(response.status_code, status.HTTP_200_OK) + # Staff user sees all courses + external_keys = [item["external_key"] for item in response.data["results"]] + self.assertIn(self.COURSE_ORG1, external_keys) + self.assertIn(self.COURSE_ORG2, external_keys) + + def test_org_filter_combined_with_search(self): + """Org filter works together with search filter.""" + self.build_qs_patcher.stop() + + response = self.client.get(self.url, {"org": "Org1", "search": "Course", "scope_type": "course"}) + + self.build_qs_patcher.start() + self.assertEqual(response.status_code, status.HTTP_200_OK) + external_keys = [item["external_key"] for item in response.data["results"]] + self.assertIn(self.COURSE_ORG1, external_keys) + self.assertNotIn(self.COURSE_ORG2, external_keys) + + @ddt class TestAdminConsoleOrgsAPIView(ViewTestMixin): """Test suite for AdminConsoleOrgsAPIView.""" diff --git a/openedx_authz/tests/stubs/migrations/0002_learningpackage_contentlibrary_learning_package.py b/openedx_authz/tests/stubs/migrations/0002_learningpackage_contentlibrary_learning_package.py new file mode 100644 index 00000000..cfff5bf5 --- /dev/null +++ b/openedx_authz/tests/stubs/migrations/0002_learningpackage_contentlibrary_learning_package.py @@ -0,0 +1,28 @@ +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("stubs", "0001_initial"), + ] + + operations = [ + migrations.CreateModel( + name="LearningPackage", + fields=[ + ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("title", models.CharField(max_length=255)), + ], + ), + migrations.AddField( + model_name="contentlibrary", + name="learning_package", + field=models.OneToOneField( + blank=True, + null=True, + on_delete=django.db.models.deletion.CASCADE, + to="stubs.learningpackage", + ), + ), + ] diff --git a/openedx_authz/tests/stubs/models.py b/openedx_authz/tests/stubs/models.py index 317a4410..d443a612 100644 --- a/openedx_authz/tests/stubs/models.py +++ b/openedx_authz/tests/stubs/models.py @@ -55,6 +55,18 @@ def get_by_key(self, library_key): return obj +class LearningPackage(models.Model): + """Stub model representing a learning package for testing purposes. + + .. no_pii: + """ + + title = models.CharField(max_length=255) + + def __str__(self): + return self.title + + class ContentLibrary(models.Model): """Stub model representing a content library for testing purposes. @@ -65,10 +77,16 @@ class ContentLibrary(models.Model): title = models.CharField(max_length=255, blank=True, null=True) slug = models.SlugField(allow_unicode=True) org = models.ForeignKey(Organization, on_delete=models.PROTECT, null=True) + learning_package = models.OneToOneField(LearningPackage, on_delete=models.CASCADE, null=True, blank=True) created_at = models.DateTimeField(auto_now_add=True) objects = ContentLibraryManager() + @property + def library_key(self): + """Get the LibraryLocatorV2 opaque key for this library.""" + return LibraryLocatorV2(org=self.org.short_name, slug=self.slug) + def __str__(self): return str(self.locator)