Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ Change Log
Unreleased
**********

1.6.0 - 2026-04-10
******************

Added
=====

* Add ``users/validate`` endpoint for bulk validation of user identifiers (usernames or emails).
Comment thread
mariajgrimaldi marked this conversation as resolved.

1.5.0 - 2026-04-09
******************

Expand Down Expand Up @@ -370,4 +378,4 @@ Added
Added
=====

* Basic repo structure and initial setup.
* Basic repo structure and initial setup.
2 changes: 1 addition & 1 deletion openedx_authz/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@

import os

__version__ = "1.5.0"
__version__ = "1.6.0"

ROOT_DIRECTORY = os.path.dirname(os.path.abspath(__file__))
30 changes: 30 additions & 0 deletions openedx_authz/api/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
(e.g., 'user^john_doe').
"""

from django.contrib.auth import get_user_model

from openedx_authz.api.data import (
ActionData,
PermissionData,
Expand All @@ -35,6 +37,7 @@
unassign_subject_from_all_roles,
)
from openedx_authz.api.utils import filter_user_assignments, get_user_assignment_map
from openedx_authz.utils import get_user_by_username_or_email

__all__ = [
"assign_role_to_user_in_scope",
Expand All @@ -51,6 +54,7 @@
"get_scopes_for_user_and_permission",
"get_users_for_role_in_scope",
"unassign_all_roles_from_user",
"validate_users",
]


Expand Down Expand Up @@ -339,3 +343,29 @@ def unassign_all_roles_from_user(user_external_key: str) -> bool:
bool: True if any roles were removed, False otherwise.
"""
return unassign_subject_from_all_roles(UserData(external_key=user_external_key))


def validate_users(user_identifiers: list[str]) -> tuple[list[str], list[str]]:
"""Validate a list of user identifiers.

Args:
user_identifiers (list[str]): List of usernames or emails to validate

Returns:
tuple: (valid_users, invalid_users) lists
"""
User = get_user_model()
valid_users = []
invalid_users = []

for user_identifier in user_identifiers:
try:
user = get_user_by_username_or_email(user_identifier)
if user.is_active:
valid_users.append(user_identifier)
else:
invalid_users.append(user_identifier)
except User.DoesNotExist:
invalid_users.append(user_identifier)

return valid_users, invalid_users
33 changes: 33 additions & 0 deletions openedx_authz/rest_api/v1/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,3 +259,36 @@ def get_email(self, obj: UserAssignments) -> str:
def get_assignation_count(self, obj: UserAssignments) -> int:
"""Get the assignation count for the given role assignment."""
return len(obj.assignments)
Comment thread
rodmgwgu marked this conversation as resolved.


class UserValidationAPIViewSerializer(serializers.Serializer): # pylint: disable=abstract-method
Comment thread
rodmgwgu marked this conversation as resolved.
"""Serializer for validating user existence."""

users = serializers.ListField(
child=serializers.CharField(max_length=255), allow_empty=False, help_text="List of user identifiers to validate"
)

def validate_users(self, value) -> list[str]:
"""Eliminate duplicates preserving order"""
return list(dict.fromkeys(value))


class UserValidationSummarySerializer(serializers.Serializer): # pylint: disable=abstract-method
"""Serializer for user validation summary statistics."""

total = serializers.IntegerField(help_text="Total number of users validated")
valid_count = serializers.IntegerField(help_text="Number of valid users found")
invalid_count = serializers.IntegerField(help_text="Number of invalid users")


class UserValidationAPIViewResponseSerializer(serializers.Serializer): # pylint: disable=abstract-method
"""Serializer for user validation response."""

valid_users = serializers.ListField(
child=serializers.CharField(max_length=255), help_text="List of user identifiers that were found to be valid"
)
invalid_users = serializers.ListField(
child=serializers.CharField(max_length=255),
help_text="List of user identifiers that were not found or are invalid",
)
summary = UserValidationSummarySerializer(help_text="Summary statistics for the validation operation")
1 change: 1 addition & 0 deletions openedx_authz/rest_api/v1/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@
path("roles/users/", views.RoleUserAPIView.as_view(), name="role-user-list"),
path("orgs/", views.AdminConsoleOrgsAPIView.as_view(), name="orgs-list"),
path("users/", views.TeamMembersAPIView.as_view(), name="user-list"),
path("users/validate/", views.UserValidationAPIView.as_view(), name="user-validation"),
]
85 changes: 85 additions & 0 deletions openedx_authz/rest_api/v1/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
RemoveUsersFromRoleWithScopeSerializer,
TeamMemberSerializer,
UserRoleAssignmentSerializer,
UserValidationAPIViewResponseSerializer,
UserValidationAPIViewSerializer,
)
from openedx_authz.utils import get_user_by_username_or_email

Expand Down Expand Up @@ -586,3 +588,86 @@ def get(self, request: HttpRequest) -> Response:
paginator = self.pagination_class()
paginated_response_data = paginator.paginate_queryset(team_members, request)
return paginator.get_paginated_response(paginated_response_data)


@view_auth_classes()
class UserValidationAPIView(APIView):
"""API view for validating that provided user identifiers correspond to existing users.

This view allows clients to verify that a list of user identifiers (usernames or emails)
correspond to valid users in the system. It is designed to support bulk validation of multiple
user identifiers in a single request, providing a convenient way to check the validity of users before
performing operations such as role assignments.

**Endpoints**
- POST: Validate that the provided list of usernames or emails correspond to existing users

**Request Format (POST)**
- users: List of user identifiers (username or email)

**Response Format (POST)**

Returns HTTP 200 OK with::

{
"valid_users": ["john_doe", "[email protected]"],
"invalid_users": ["nonexistent_user"],
"summary": {
"total": 3,
"valid_count": 2,
"invalid_count": 1
}
}

**Authentication and Permissions**

- Requires authenticated user.
- Requires ``manage_library_team`` or ``manage_course_team`` permission in any scope.

**Example Request**

POST /api/authz/v1/users/validate/ ::

{
"users": ["john_doe", "[email protected]", "nonexistent_user"]
}
"""

permission_classes = [AnyScopePermission]

@apidocs.schema(
body=UserValidationAPIViewSerializer,
responses={
status.HTTP_200_OK: UserValidationAPIViewResponseSerializer,
status.HTTP_400_BAD_REQUEST: "The request data is invalid",
status.HTTP_401_UNAUTHORIZED: "The user is not authenticated",
status.HTTP_403_FORBIDDEN: "The user does not have the required permissions",
Comment thread
jacobo-dominguez-wgu marked this conversation as resolved.
status.HTTP_500_INTERNAL_SERVER_ERROR: "An unexpected error occurred while validating users",
},
)
@authz_permissions([permissions.MANAGE_LIBRARY_TEAM.identifier, permissions.COURSES_MANAGE_COURSE_TEAM.identifier])
def post(self, request: HttpRequest) -> Response:
"""Validates the provided usernames or emails correspond to existing users."""
request_serializer = UserValidationAPIViewSerializer(data=request.data)
request_serializer.is_valid(raise_exception=True)
serialized_request_users = request_serializer.validated_data["users"]
try:
valid_users, invalid_users = api.validate_users(serialized_request_users)
except Exception as e: # pylint: disable=broad-exception-caught
logger.error(f"Error validating users: {e}")
return Response(
data={"message": "An error occurred while validating users"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)

response_data = {
"valid_users": valid_users,
"invalid_users": invalid_users,
"summary": {
"total": len(serialized_request_users),
"valid_count": len(valid_users),
"invalid_count": len(invalid_users),
},
}
Comment thread
rodmgwgu marked this conversation as resolved.
response_serializer = UserValidationAPIViewResponseSerializer(response_data)
return Response(response_serializer.data, status=status.HTTP_200_OK)
56 changes: 56 additions & 0 deletions openedx_authz/tests/api/test_users.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
"""Test suite for user-role assignment API functions."""

from unittest.mock import patch

from ddt import data, ddt, unpack
from django.contrib.auth import get_user_model

from openedx_authz.api.data import ContentLibraryData, RoleAssignmentData, RoleData, UserData
from openedx_authz.api.users import (
Expand All @@ -14,6 +17,7 @@
is_user_allowed,
unassign_all_roles_from_user,
unassign_role_from_user,
validate_users,
)
from openedx_authz.constants import permissions, roles
from openedx_authz.constants.roles import LIBRARY_ADMIN_PERMISSIONS, LIBRARY_AUTHOR_PERMISSIONS
Expand Down Expand Up @@ -514,3 +518,55 @@ def test_is_user_allowed(self, username, action, scope_name, expected_result):
scope_external_key=scope_name,
)
self.assertEqual(result, expected_result)


@ddt
class TestValidateUsersAPI(UserAssignmentsSetupMixin):
"""Test suite for validate_users API function - focused on business logic."""

def test_validate_users_empty_list(self):
"""Test validate_users with empty input list."""
valid_users, invalid_users = validate_users([])

self.assertEqual(valid_users, [])
self.assertEqual(invalid_users, [])

def test_validate_users_inactive_user_edge_case(self):
"""Test that inactive users are correctly identified as invalid."""
User = get_user_model()

# Create an inactive user for this test
inactive_user = User.objects.create_user(
username="inactive_api_test", email="[email protected]", is_active=False
)

valid_users, invalid_users = validate_users([inactive_user.username])

# Cleanup
inactive_user.delete()

self.assertEqual(valid_users, [])
self.assertEqual(invalid_users, [inactive_user.username])

@patch("openedx_authz.api.users.get_user_by_username_or_email")
def test_validate_users_unexpected_exception_propagation(self, mock_get_user):
"""Test that unexpected exceptions from get_user_by_username_or_email are re-raised."""
# Simulate an unexpected database error
mock_get_user.side_effect = Exception("Database connection lost")

with self.assertRaises(Exception) as cm:
validate_users(["any_user"])

self.assertEqual(str(cm.exception), "Database connection lost")
mock_get_user.assert_called_once_with("any_user")

@patch("openedx_authz.api.users.get_user_by_username_or_email")
def test_validate_users_user_does_not_exist_handling(self, mock_get_user):
"""Test handling of User.DoesNotExist exception."""
User = get_user_model()
mock_get_user.side_effect = User.DoesNotExist("User not found")

valid_users, invalid_users = validate_users(["nonexistent_user"])

self.assertEqual(valid_users, [])
self.assertEqual(invalid_users, ["nonexistent_user"])
Loading