diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 084075e1..1dc83133 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -14,12 +14,21 @@ Change Log Unreleased ********** +1.7.0 - 2026-04-14 +****************** + +Added +===== + +* Add ``users/validate`` endpoint for bulk validation of user identifiers (usernames or emails). + 1.6.0 - 2026-04-10 ****************** Added ===== +* Add ``users/validate`` endpoint for bulk validation of user identifiers (usernames or emails). * Add org-wide support to migration commands for forward and backward migration of course authoring permissions. 1.5.0 - 2026-04-09 @@ -378,4 +387,4 @@ Added Added ===== -* Basic repo structure and initial setup. +* Basic repo structure and initial setup. \ No newline at end of file diff --git a/openedx_authz/__init__.py b/openedx_authz/__init__.py index 192ac7e1..2a978b1f 100644 --- a/openedx_authz/__init__.py +++ b/openedx_authz/__init__.py @@ -4,6 +4,6 @@ import os -__version__ = "1.6.0" +__version__ = "1.7.0" ROOT_DIRECTORY = os.path.dirname(os.path.abspath(__file__)) diff --git a/openedx_authz/api/users.py b/openedx_authz/api/users.py index b2dd2654..6edd736d 100644 --- a/openedx_authz/api/users.py +++ b/openedx_authz/api/users.py @@ -9,6 +9,8 @@ (e.g., 'user^john_doe'). """ +from django.contrib.auth import get_user_model + from openedx_authz.api.data import ( ActionData, PermissionData, @@ -35,6 +37,7 @@ unassign_subject_from_all_roles, ) from openedx_authz.api.utils import filter_user_assignments, get_user_assignment_map +from openedx_authz.utils import get_user_by_username_or_email __all__ = [ "assign_role_to_user_in_scope", @@ -51,6 +54,7 @@ "get_scopes_for_user_and_permission", "get_users_for_role_in_scope", "unassign_all_roles_from_user", + "validate_users", ] @@ -339,3 +343,29 @@ def unassign_all_roles_from_user(user_external_key: str) -> bool: bool: True if any roles were removed, False otherwise. """ return unassign_subject_from_all_roles(UserData(external_key=user_external_key)) + + +def validate_users(user_identifiers: list[str]) -> tuple[list[str], list[str]]: + """Validate a list of user identifiers. + + Args: + user_identifiers (list[str]): List of usernames or emails to validate + + Returns: + tuple: (valid_users, invalid_users) lists + """ + User = get_user_model() + valid_users = [] + invalid_users = [] + + for user_identifier in user_identifiers: + try: + user = get_user_by_username_or_email(user_identifier) + if user.is_active: + valid_users.append(user_identifier) + else: + invalid_users.append(user_identifier) + except User.DoesNotExist: + invalid_users.append(user_identifier) + + return valid_users, invalid_users diff --git a/openedx_authz/rest_api/v1/serializers.py b/openedx_authz/rest_api/v1/serializers.py index 551329ed..3f6c8b39 100644 --- a/openedx_authz/rest_api/v1/serializers.py +++ b/openedx_authz/rest_api/v1/serializers.py @@ -259,3 +259,36 @@ def get_email(self, obj: UserAssignments) -> str: def get_assignation_count(self, obj: UserAssignments) -> int: """Get the assignation count for the given role assignment.""" return len(obj.assignments) + + +class UserValidationAPIViewSerializer(serializers.Serializer): # pylint: disable=abstract-method + """Serializer for validating user existence.""" + + users = serializers.ListField( + child=serializers.CharField(max_length=255), allow_empty=False, help_text="List of user identifiers to validate" + ) + + def validate_users(self, value) -> list[str]: + """Eliminate duplicates preserving order""" + return list(dict.fromkeys(value)) + + +class UserValidationSummarySerializer(serializers.Serializer): # pylint: disable=abstract-method + """Serializer for user validation summary statistics.""" + + total = serializers.IntegerField(help_text="Total number of users validated") + valid_count = serializers.IntegerField(help_text="Number of valid users found") + invalid_count = serializers.IntegerField(help_text="Number of invalid users") + + +class UserValidationAPIViewResponseSerializer(serializers.Serializer): # pylint: disable=abstract-method + """Serializer for user validation response.""" + + valid_users = serializers.ListField( + child=serializers.CharField(max_length=255), help_text="List of user identifiers that were found to be valid" + ) + invalid_users = serializers.ListField( + child=serializers.CharField(max_length=255), + help_text="List of user identifiers that were not found or are invalid", + ) + summary = UserValidationSummarySerializer(help_text="Summary statistics for the validation operation") diff --git a/openedx_authz/rest_api/v1/urls.py b/openedx_authz/rest_api/v1/urls.py index dafdc1c5..8dbca5c5 100644 --- a/openedx_authz/rest_api/v1/urls.py +++ b/openedx_authz/rest_api/v1/urls.py @@ -14,4 +14,5 @@ path("roles/users/", views.RoleUserAPIView.as_view(), name="role-user-list"), path("orgs/", views.AdminConsoleOrgsAPIView.as_view(), name="orgs-list"), path("users/", views.TeamMembersAPIView.as_view(), name="user-list"), + path("users/validate/", views.UserValidationAPIView.as_view(), name="user-validation"), ] diff --git a/openedx_authz/rest_api/v1/views.py b/openedx_authz/rest_api/v1/views.py index 55fd7322..26bdd2a4 100644 --- a/openedx_authz/rest_api/v1/views.py +++ b/openedx_authz/rest_api/v1/views.py @@ -43,6 +43,8 @@ RemoveUsersFromRoleWithScopeSerializer, TeamMemberSerializer, UserRoleAssignmentSerializer, + UserValidationAPIViewResponseSerializer, + UserValidationAPIViewSerializer, ) from openedx_authz.utils import get_user_by_username_or_email @@ -590,3 +592,86 @@ def get(self, request: HttpRequest) -> Response: paginator = self.pagination_class() paginated_response_data = paginator.paginate_queryset(team_members, request) return paginator.get_paginated_response(paginated_response_data) + + +@view_auth_classes() +class UserValidationAPIView(APIView): + """API view for validating that provided user identifiers correspond to existing users. + + This view allows clients to verify that a list of user identifiers (usernames or emails) + correspond to valid users in the system. It is designed to support bulk validation of multiple + user identifiers in a single request, providing a convenient way to check the validity of users before + performing operations such as role assignments. + + **Endpoints** + - POST: Validate that the provided list of usernames or emails correspond to existing users + + **Request Format (POST)** + - users: List of user identifiers (username or email) + + **Response Format (POST)** + + Returns HTTP 200 OK with:: + + { + "valid_users": ["john_doe", "jane@example.com"], + "invalid_users": ["nonexistent_user"], + "summary": { + "total": 3, + "valid_count": 2, + "invalid_count": 1 + } + } + + **Authentication and Permissions** + + - Requires authenticated user. + - Requires ``manage_library_team`` or ``manage_course_team`` permission in any scope. + + **Example Request** + + POST /api/authz/v1/users/validate/ :: + + { + "users": ["john_doe", "jane@example.com", "nonexistent_user"] + } + """ + + permission_classes = [AnyScopePermission] + + @apidocs.schema( + body=UserValidationAPIViewSerializer, + responses={ + status.HTTP_200_OK: UserValidationAPIViewResponseSerializer, + status.HTTP_400_BAD_REQUEST: "The request data is invalid", + status.HTTP_401_UNAUTHORIZED: "The user is not authenticated", + status.HTTP_403_FORBIDDEN: "The user does not have the required permissions", + status.HTTP_500_INTERNAL_SERVER_ERROR: "An unexpected error occurred while validating users", + }, + ) + @authz_permissions([permissions.MANAGE_LIBRARY_TEAM.identifier, permissions.COURSES_MANAGE_COURSE_TEAM.identifier]) + def post(self, request: HttpRequest) -> Response: + """Validates the provided usernames or emails correspond to existing users.""" + request_serializer = UserValidationAPIViewSerializer(data=request.data) + request_serializer.is_valid(raise_exception=True) + serialized_request_users = request_serializer.validated_data["users"] + try: + valid_users, invalid_users = api.validate_users(serialized_request_users) + except Exception as e: # pylint: disable=broad-exception-caught + logger.error(f"Error validating users: {e}") + return Response( + data={"message": "An error occurred while validating users"}, + status=status.HTTP_500_INTERNAL_SERVER_ERROR, + ) + + response_data = { + "valid_users": valid_users, + "invalid_users": invalid_users, + "summary": { + "total": len(serialized_request_users), + "valid_count": len(valid_users), + "invalid_count": len(invalid_users), + }, + } + response_serializer = UserValidationAPIViewResponseSerializer(response_data) + return Response(response_serializer.data, status=status.HTTP_200_OK) diff --git a/openedx_authz/tests/api/test_users.py b/openedx_authz/tests/api/test_users.py index 2da05e21..08b89327 100644 --- a/openedx_authz/tests/api/test_users.py +++ b/openedx_authz/tests/api/test_users.py @@ -1,6 +1,9 @@ """Test suite for user-role assignment API functions.""" +from unittest.mock import patch + from ddt import data, ddt, unpack +from django.contrib.auth import get_user_model from openedx_authz.api.data import ContentLibraryData, RoleAssignmentData, RoleData, UserData from openedx_authz.api.users import ( @@ -14,6 +17,7 @@ is_user_allowed, unassign_all_roles_from_user, unassign_role_from_user, + validate_users, ) from openedx_authz.constants import permissions, roles from openedx_authz.constants.roles import LIBRARY_ADMIN_PERMISSIONS, LIBRARY_AUTHOR_PERMISSIONS @@ -514,3 +518,55 @@ def test_is_user_allowed(self, username, action, scope_name, expected_result): scope_external_key=scope_name, ) self.assertEqual(result, expected_result) + + +@ddt +class TestValidateUsersAPI(UserAssignmentsSetupMixin): + """Test suite for validate_users API function - focused on business logic.""" + + def test_validate_users_empty_list(self): + """Test validate_users with empty input list.""" + valid_users, invalid_users = validate_users([]) + + self.assertEqual(valid_users, []) + self.assertEqual(invalid_users, []) + + def test_validate_users_inactive_user_edge_case(self): + """Test that inactive users are correctly identified as invalid.""" + User = get_user_model() + + # Create an inactive user for this test + inactive_user = User.objects.create_user( + username="inactive_api_test", email="inactive_api@example.com", is_active=False + ) + + valid_users, invalid_users = validate_users([inactive_user.username]) + + # Cleanup + inactive_user.delete() + + self.assertEqual(valid_users, []) + self.assertEqual(invalid_users, [inactive_user.username]) + + @patch("openedx_authz.api.users.get_user_by_username_or_email") + def test_validate_users_unexpected_exception_propagation(self, mock_get_user): + """Test that unexpected exceptions from get_user_by_username_or_email are re-raised.""" + # Simulate an unexpected database error + mock_get_user.side_effect = Exception("Database connection lost") + + with self.assertRaises(Exception) as cm: + validate_users(["any_user"]) + + self.assertEqual(str(cm.exception), "Database connection lost") + mock_get_user.assert_called_once_with("any_user") + + @patch("openedx_authz.api.users.get_user_by_username_or_email") + def test_validate_users_user_does_not_exist_handling(self, mock_get_user): + """Test handling of User.DoesNotExist exception.""" + User = get_user_model() + mock_get_user.side_effect = User.DoesNotExist("User not found") + + valid_users, invalid_users = validate_users(["nonexistent_user"]) + + self.assertEqual(valid_users, []) + self.assertEqual(invalid_users, ["nonexistent_user"]) diff --git a/openedx_authz/tests/rest_api/test_views.py b/openedx_authz/tests/rest_api/test_views.py index 45aaa472..bd97aa60 100644 --- a/openedx_authz/tests/rest_api/test_views.py +++ b/openedx_authz/tests/rest_api/test_views.py @@ -19,7 +19,8 @@ from openedx_authz.api.users import assign_role_to_user_in_scope from openedx_authz.constants import permissions, roles from openedx_authz.rest_api.data import RoleOperationError, RoleOperationStatus -from openedx_authz.rest_api.v1.permissions import DynamicScopePermission +from openedx_authz.rest_api.v1.permissions import AnyScopePermission, DynamicScopePermission +from openedx_authz.rest_api.v1.views import UserValidationAPIView from openedx_authz.tests.api.test_roles import BaseRolesTestCase User = get_user_model() @@ -1447,3 +1448,301 @@ def test_get_roles_permissions(self, username: str, status_code: int): if status_code == status.HTTP_200_OK: self.assertIn("results", response.data) self.assertIn("count", response.data) + + +@ddt +class TestUserValidationAPIView(ViewTestMixin): + """Test suite for UserValidationAPIView.""" + + def setUp(self): + """Set up test fixtures.""" + super().setUp() + self.url = reverse("openedx_authz:user-validation") + + @data( + # All users valid - usernames + (["admin_1", "regular_1"], ["admin_1", "regular_1"], []), + # All users valid - emails + (["admin_1@example.com", "regular_1@example.com"], ["admin_1@example.com", "regular_1@example.com"], []), + # Mixed usernames and emails + (["admin_1", "regular_1@example.com"], ["admin_1", "regular_1@example.com"], []), + # Single user + (["admin_1"], ["admin_1"], []), + ) + @unpack + def test_post_all_users_valid(self, input_users: list, expected_valid: list, expected_invalid: list): + """Test user validation when all users are valid. + + Expected result: + - Returns 200 OK status + - All users are in valid_users list + - invalid_users list is empty + - Summary contains correct counts + """ + self.client.force_authenticate(user=self.admin_user) + request_data = {"users": input_users} + response = self.client.post(self.url, data=request_data, format="json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["valid_users"], expected_valid) + self.assertEqual(response.data["invalid_users"], expected_invalid) + self.assertEqual(response.data["summary"]["total"], len(input_users)) + self.assertEqual(response.data["summary"]["valid_count"], len(expected_valid)) + self.assertEqual(response.data["summary"]["invalid_count"], len(expected_invalid)) + + @data( + # Mix of valid and invalid users + (["admin_1", "nonexistent_user"], ["admin_1"], ["nonexistent_user"]), + # Mix of valid and invalid with emails + (["admin_1@example.com", "fake@example.com"], ["admin_1@example.com"], ["fake@example.com"]), + # Mix of usernames and emails with some invalid + ( + ["admin_1", "fake@example.com", "regular_1@example.com"], + ["admin_1", "regular_1@example.com"], + ["fake@example.com"], + ), + # More complex mix + ( + ["admin_1", "nonexistent1", "regular_1@example.com", "nonexistent2"], + ["admin_1", "regular_1@example.com"], + ["nonexistent1", "nonexistent2"], + ), + ) + @unpack + def test_post_mixed_valid_invalid_users(self, input_users: list, expected_valid: list, expected_invalid: list): + """Test user validation when some users are valid and others are invalid. + + Expected result: + - Returns 200 OK status + - Valid users are in valid_users list + - Invalid users are in invalid_users list + - Summary contains correct counts + """ + self.client.force_authenticate(user=self.admin_user) + request_data = {"users": input_users} + response = self.client.post(self.url, data=request_data, format="json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(set(response.data["valid_users"]), set(expected_valid)) + self.assertEqual(set(response.data["invalid_users"]), set(expected_invalid)) + self.assertEqual(response.data["summary"]["total"], len(input_users)) + self.assertEqual(response.data["summary"]["valid_count"], len(expected_valid)) + self.assertEqual(response.data["summary"]["invalid_count"], len(expected_invalid)) + + @data( + # All users invalid + (["nonexistent1", "nonexistent2"], [], ["nonexistent1", "nonexistent2"]), + # All invalid emails + (["fake1@example.com", "fake2@example.com"], [], ["fake1@example.com", "fake2@example.com"]), + # Single invalid user + (["nonexistent_user"], [], ["nonexistent_user"]), + # Single invalid email + (["fake@example.com"], [], ["fake@example.com"]), + ) + @unpack + def test_post_all_users_invalid(self, input_users: list, expected_valid: list, expected_invalid: list): + """Test user validation when all users are invalid. + + Expected result: + - Returns 200 OK status + - valid_users list is empty + - All users are in invalid_users list + - Summary contains correct counts + """ + self.client.force_authenticate(user=self.admin_user) + request_data = {"users": input_users} + response = self.client.post(self.url, data=request_data, format="json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["valid_users"], expected_valid) + self.assertEqual(set(response.data["invalid_users"]), set(expected_invalid)) + self.assertEqual(response.data["summary"]["total"], len(input_users)) + self.assertEqual(response.data["summary"]["valid_count"], len(expected_valid)) + self.assertEqual(response.data["summary"]["invalid_count"], len(expected_invalid)) + + @data( + # Missing users field + {}, + {"other_field": "value"}, + # Empty users list (not allowed by serializer) + {"users": []}, + # Invalid data types + {"users": "not_a_list"}, + {"users": [{"not": "string"}]}, + # Null values + {"users": None}, + {"users": [None, "admin_1"]}, + # Users with strings too long (over 255 characters) + {"users": ["a" * 256]}, + ) + def test_post_invalid_request_data(self, request_data: dict): + """Test user validation with invalid request data. + + Test cases: + - Missing required fields + - Empty users list (not allowed) + - Invalid data types + - Null values + - Strings exceeding max length + + Expected result: + - Returns 400 BAD REQUEST status + """ + self.client.force_authenticate(user=self.admin_user) + response = self.client.post(self.url, data=request_data, format="json") + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + + @data( + # Unauthenticated request + (None, status.HTTP_401_UNAUTHORIZED), + # Admin user with proper permissions (superuser) + ("admin_1", status.HTTP_200_OK), + # Regular user without required permissions (only LIBRARY_USER) + ("regular_1", status.HTTP_403_FORBIDDEN), + # Regular user with LIBRARY_ADMIN role (has MANAGE_LIBRARY_TEAM permission) + ("regular_5", status.HTTP_200_OK), + ) + @unpack + def test_post_authentication_and_permissions(self, username: str, expected_status: int): + """Test user validation with different authentication and permission scenarios. + + Expected result: + - Returns 401 UNAUTHORIZED for unauthenticated requests + - Returns 403 FORBIDDEN for authenticated users without permissions + - Returns 200 OK for users with proper permissions + """ + if username: + user = User.objects.get(username=username) + self.client.force_authenticate(user=user) + else: + self.client.force_authenticate(user=None) + request_data = {"users": ["admin_1", "regular_1"]} + response = self.client.post(self.url, data=request_data, format="json") + self.assertEqual(response.status_code, expected_status) + if expected_status == status.HTTP_200_OK: + self.assertIn("valid_users", response.data) + self.assertIn("invalid_users", response.data) + self.assertIn("summary", response.data) + self.assertIn("total", response.data["summary"]) + self.assertIn("valid_count", response.data["summary"]) + self.assertIn("invalid_count", response.data["summary"]) + + def test_post_serializer_deduplication(self): + """Test that serializer properly deduplicates users while preserving order. + + The serializer automatically removes duplicates using dict.fromkeys(). + + Expected result: + - Returns 200 OK status + - Duplicates are automatically removed by the serializer + - Order is preserved for first occurrence + """ + self.client.force_authenticate(user=self.admin_user) + request_data = {"users": ["admin_1", "admin_1", "nonexistent", "nonexistent", "regular_1"]} + response = self.client.post(self.url, data=request_data, format="json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["valid_users"], ["admin_1", "regular_1"]) + self.assertEqual(response.data["invalid_users"], ["nonexistent"]) + self.assertEqual(response.data["summary"]["total"], 3) + self.assertEqual(response.data["summary"]["valid_count"], 2) + self.assertEqual(response.data["summary"]["invalid_count"], 1) + + def test_post_large_user_list(self): + """Test user validation with a large list of users. + + Expected result: + - Returns 200 OK status + - Correctly processes all users in the list + - Response structure is maintained + """ + self.client.force_authenticate(user=self.admin_user) + valid_users = ["admin_1", "admin_2", "regular_1", "regular_2"] + invalid_users = [f"nonexistent_{i}" for i in range(10)] + all_users = valid_users + invalid_users + request_data = {"users": all_users} + response = self.client.post(self.url, data=request_data, format="json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(set(response.data["valid_users"]), set(valid_users)) + self.assertEqual(set(response.data["invalid_users"]), set(invalid_users)) + self.assertEqual(response.data["summary"]["total"], len(all_users)) + self.assertEqual(response.data["summary"]["valid_count"], len(valid_users)) + self.assertEqual(response.data["summary"]["invalid_count"], len(invalid_users)) + + def test_post_response_serializer_structure(self): + """Test that response matches UserValidationAPIViewResponseSerializer structure. + + Expected result: + - Returns 200 OK status + - Response contains all required fields + - Field types match serializer definition + """ + self.client.force_authenticate(user=self.admin_user) + request_data = {"users": ["admin_1", "nonexistent"]} + response = self.client.post(self.url, data=request_data, format="json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + required_fields = ["valid_users", "invalid_users", "summary"] + for field in required_fields: + self.assertIn(field, response.data) + summary_fields = ["total", "valid_count", "invalid_count"] + for field in summary_fields: + self.assertIn(field, response.data["summary"]) + self.assertIsInstance(response.data["summary"][field], int) + self.assertIsInstance(response.data["valid_users"], list) + self.assertIsInstance(response.data["invalid_users"], list) + + def test_post_inactive_user_validation(self): + """Test that inactive users are returned as invalid. + + Expected result: + - Inactive users appear in invalid_users list + - Summary counts reflect inactive users as invalid + - Active users appear in valid_users list + """ + User.objects.create(username="inactive_user", email="inactive@example.com", is_active=False) + self.client.force_authenticate(user=self.admin_user) + request_data = {"users": ["inactive_user", "inactive@example.com", "admin_1"]} + response = self.client.post(self.url, data=request_data, format="json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertIn("inactive_user", response.data["invalid_users"]) + self.assertIn("inactive@example.com", response.data["invalid_users"]) + self.assertIn("admin_1", response.data["valid_users"]) + self.assertEqual(response.data["summary"]["total"], 3) + self.assertEqual(response.data["summary"]["valid_count"], 1) + self.assertEqual(response.data["summary"]["invalid_count"], 2) + + def test_post_with_validate_users_exception(self): + """Test handling of unexpected exceptions from validate_users.""" + self.client.force_authenticate(user=self.admin_user) + with patch.object(api, "validate_users") as mock_validate_users: + mock_validate_users.side_effect = Exception("Database connection error") + request_data = {"users": ["admin_1"]} + response = self.client.post(self.url, data=request_data, format="json") + self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR) + self.assertEqual(response.data["message"], "An error occurred while validating users") + + def test_post_global_permission_inheritance(self): + """Test that UserValidationAPIView properly inherits from AnyScopePermission class.""" + self.assertIn(AnyScopePermission, UserValidationAPIView.permission_classes) + + def test_post_multiple_roles_user_access(self): + """Test access for a user with multiple roles that include management permissions.""" + test_user = User.objects.create(username="multi_role_user", email="multi@example.com") + assign_role_to_user_in_scope( + user_external_key="multi_role_user", + role_external_key=roles.LIBRARY_ADMIN.external_key, + scope_external_key="lib:Org1:LIB1", + ) + assign_role_to_user_in_scope( + user_external_key="multi_role_user", + role_external_key=roles.LIBRARY_USER.external_key, + scope_external_key="lib:Org2:LIB2", + ) + self.client.force_authenticate(user=test_user) + request_data = {"users": ["admin_1", "regular_1"]} + response = self.client.post(self.url, data=request_data, format="json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + + def test_post_empty_role_assignments_denied(self): + """Test that a user with no role assignments is properly denied access.""" + test_user = User.objects.create(username="no_roles_user", email="noroles@example.com") + self.client.force_authenticate(user=test_user) + request_data = {"users": ["admin_1", "regular_1"]} + response = self.client.post(self.url, data=request_data, format="json") + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)