Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Unreleased
Added
=====

* Add ``users/validate`` endpoint for bulk validation of user identifiers (usernames or emails).
Comment thread
mariajgrimaldi marked this conversation as resolved.
* Add org-wide support to migration commands for forward and backward migration of course authoring permissions.

1.5.0 - 2026-04-09
Expand Down Expand Up @@ -378,4 +379,4 @@ Added
Added
=====

* Basic repo structure and initial setup.
* Basic repo structure and initial setup.
30 changes: 30 additions & 0 deletions openedx_authz/api/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
(e.g., 'user^john_doe').
"""

from django.contrib.auth import get_user_model

from openedx_authz.api.data import (
ActionData,
PermissionData,
Expand All @@ -35,6 +37,7 @@
unassign_subject_from_all_roles,
)
from openedx_authz.api.utils import filter_user_assignments, get_user_assignment_map
from openedx_authz.utils import get_user_by_username_or_email

__all__ = [
"assign_role_to_user_in_scope",
Expand All @@ -51,6 +54,7 @@
"get_scopes_for_user_and_permission",
"get_users_for_role_in_scope",
"unassign_all_roles_from_user",
"validate_users",
]


Expand Down Expand Up @@ -339,3 +343,29 @@ def unassign_all_roles_from_user(user_external_key: str) -> bool:
bool: True if any roles were removed, False otherwise.
"""
return unassign_subject_from_all_roles(UserData(external_key=user_external_key))


def validate_users(user_identifiers: list[str]) -> tuple[list[str], list[str]]:
"""Validate a list of user identifiers.

Args:
user_identifiers (list[str]): List of usernames or emails to validate

Returns:
tuple: (valid_users, invalid_users) lists
"""
User = get_user_model()
valid_users = []
invalid_users = []

for user_identifier in user_identifiers:
try:
user = get_user_by_username_or_email(user_identifier)
if user.is_active:
valid_users.append(user_identifier)
else:
invalid_users.append(user_identifier)
except User.DoesNotExist:
invalid_users.append(user_identifier)

return valid_users, invalid_users
33 changes: 33 additions & 0 deletions openedx_authz/rest_api/v1/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,3 +259,36 @@ def get_email(self, obj: UserAssignments) -> str:
def get_assignation_count(self, obj: UserAssignments) -> int:
"""Get the assignation count for the given role assignment."""
return len(obj.assignments)
Comment thread
rodmgwgu marked this conversation as resolved.


class UserValidationAPIViewSerializer(serializers.Serializer): # pylint: disable=abstract-method
Comment thread
rodmgwgu marked this conversation as resolved.
"""Serializer for validating user existence."""

users = serializers.ListField(
child=serializers.CharField(max_length=255), allow_empty=False, help_text="List of user identifiers to validate"
)

def validate_users(self, value) -> list[str]:
"""Eliminate duplicates preserving order"""
return list(dict.fromkeys(value))


class UserValidationSummarySerializer(serializers.Serializer): # pylint: disable=abstract-method
"""Serializer for user validation summary statistics."""

total = serializers.IntegerField(help_text="Total number of users validated")
valid_count = serializers.IntegerField(help_text="Number of valid users found")
invalid_count = serializers.IntegerField(help_text="Number of invalid users")


class UserValidationAPIViewResponseSerializer(serializers.Serializer): # pylint: disable=abstract-method
"""Serializer for user validation response."""

valid_users = serializers.ListField(
child=serializers.CharField(max_length=255), help_text="List of user identifiers that were found to be valid"
)
invalid_users = serializers.ListField(
child=serializers.CharField(max_length=255),
help_text="List of user identifiers that were not found or are invalid",
)
summary = UserValidationSummarySerializer(help_text="Summary statistics for the validation operation")
1 change: 1 addition & 0 deletions openedx_authz/rest_api/v1/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@
path("roles/users/", views.RoleUserAPIView.as_view(), name="role-user-list"),
path("orgs/", views.AdminConsoleOrgsAPIView.as_view(), name="orgs-list"),
path("users/", views.TeamMembersAPIView.as_view(), name="user-list"),
path("users/validate/", views.UserValidationAPIView.as_view(), name="user-validation"),
]
85 changes: 85 additions & 0 deletions openedx_authz/rest_api/v1/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
RemoveUsersFromRoleWithScopeSerializer,
TeamMemberSerializer,
UserRoleAssignmentSerializer,
UserValidationAPIViewResponseSerializer,
UserValidationAPIViewSerializer,
)
from openedx_authz.utils import get_user_by_username_or_email

Expand Down Expand Up @@ -590,3 +592,86 @@ def get(self, request: HttpRequest) -> Response:
paginator = self.pagination_class()
paginated_response_data = paginator.paginate_queryset(team_members, request)
return paginator.get_paginated_response(paginated_response_data)


@view_auth_classes()
class UserValidationAPIView(APIView):
"""API view for validating that provided user identifiers correspond to existing users.

This view allows clients to verify that a list of user identifiers (usernames or emails)
correspond to valid users in the system. It is designed to support bulk validation of multiple
user identifiers in a single request, providing a convenient way to check the validity of users before
performing operations such as role assignments.

**Endpoints**
- POST: Validate that the provided list of usernames or emails correspond to existing users

**Request Format (POST)**
- users: List of user identifiers (username or email)

**Response Format (POST)**

Returns HTTP 200 OK with::

{
"valid_users": ["john_doe", "[email protected]"],
"invalid_users": ["nonexistent_user"],
"summary": {
"total": 3,
"valid_count": 2,
"invalid_count": 1
}
}

**Authentication and Permissions**

- Requires authenticated user.
- Requires ``manage_library_team`` or ``manage_course_team`` permission in any scope.

**Example Request**

POST /api/authz/v1/users/validate/ ::

{
"users": ["john_doe", "[email protected]", "nonexistent_user"]
}
"""

permission_classes = [AnyScopePermission]

@apidocs.schema(
body=UserValidationAPIViewSerializer,
responses={
status.HTTP_200_OK: UserValidationAPIViewResponseSerializer,
status.HTTP_400_BAD_REQUEST: "The request data is invalid",
status.HTTP_401_UNAUTHORIZED: "The user is not authenticated",
status.HTTP_403_FORBIDDEN: "The user does not have the required permissions",
Comment thread
jacobo-dominguez-wgu marked this conversation as resolved.
status.HTTP_500_INTERNAL_SERVER_ERROR: "An unexpected error occurred while validating users",
},
)
@authz_permissions([permissions.MANAGE_LIBRARY_TEAM.identifier, permissions.COURSES_MANAGE_COURSE_TEAM.identifier])
def post(self, request: HttpRequest) -> Response:
"""Validates the provided usernames or emails correspond to existing users."""
request_serializer = UserValidationAPIViewSerializer(data=request.data)
request_serializer.is_valid(raise_exception=True)
serialized_request_users = request_serializer.validated_data["users"]
try:
valid_users, invalid_users = api.validate_users(serialized_request_users)
except Exception as e: # pylint: disable=broad-exception-caught
logger.error(f"Error validating users: {e}")
return Response(
data={"message": "An error occurred while validating users"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)

response_data = {
"valid_users": valid_users,
"invalid_users": invalid_users,
"summary": {
"total": len(serialized_request_users),
"valid_count": len(valid_users),
"invalid_count": len(invalid_users),
},
}
Comment thread
rodmgwgu marked this conversation as resolved.
response_serializer = UserValidationAPIViewResponseSerializer(response_data)
return Response(response_serializer.data, status=status.HTTP_200_OK)
56 changes: 56 additions & 0 deletions openedx_authz/tests/api/test_users.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
"""Test suite for user-role assignment API functions."""

from unittest.mock import patch

from ddt import data, ddt, unpack
from django.contrib.auth import get_user_model

from openedx_authz.api.data import ContentLibraryData, RoleAssignmentData, RoleData, UserData
from openedx_authz.api.users import (
Expand All @@ -14,6 +17,7 @@
is_user_allowed,
unassign_all_roles_from_user,
unassign_role_from_user,
validate_users,
)
from openedx_authz.constants import permissions, roles
from openedx_authz.constants.roles import LIBRARY_ADMIN_PERMISSIONS, LIBRARY_AUTHOR_PERMISSIONS
Expand Down Expand Up @@ -514,3 +518,55 @@ def test_is_user_allowed(self, username, action, scope_name, expected_result):
scope_external_key=scope_name,
)
self.assertEqual(result, expected_result)


@ddt
class TestValidateUsersAPI(UserAssignmentsSetupMixin):
"""Test suite for validate_users API function - focused on business logic."""

def test_validate_users_empty_list(self):
"""Test validate_users with empty input list."""
valid_users, invalid_users = validate_users([])

self.assertEqual(valid_users, [])
self.assertEqual(invalid_users, [])

def test_validate_users_inactive_user_edge_case(self):
"""Test that inactive users are correctly identified as invalid."""
User = get_user_model()

# Create an inactive user for this test
inactive_user = User.objects.create_user(
username="inactive_api_test", email="[email protected]", is_active=False
)

valid_users, invalid_users = validate_users([inactive_user.username])

# Cleanup
inactive_user.delete()

self.assertEqual(valid_users, [])
self.assertEqual(invalid_users, [inactive_user.username])

@patch("openedx_authz.api.users.get_user_by_username_or_email")
def test_validate_users_unexpected_exception_propagation(self, mock_get_user):
"""Test that unexpected exceptions from get_user_by_username_or_email are re-raised."""
# Simulate an unexpected database error
mock_get_user.side_effect = Exception("Database connection lost")

with self.assertRaises(Exception) as cm:
validate_users(["any_user"])

self.assertEqual(str(cm.exception), "Database connection lost")
mock_get_user.assert_called_once_with("any_user")

@patch("openedx_authz.api.users.get_user_by_username_or_email")
def test_validate_users_user_does_not_exist_handling(self, mock_get_user):
"""Test handling of User.DoesNotExist exception."""
User = get_user_model()
mock_get_user.side_effect = User.DoesNotExist("User not found")

valid_users, invalid_users = validate_users(["nonexistent_user"])

self.assertEqual(valid_users, [])
self.assertEqual(invalid_users, ["nonexistent_user"])
Loading