-
Notifications
You must be signed in to change notification settings - Fork 74
Expand file tree
/
Copy pathlog_in.py
More file actions
92 lines (76 loc) · 3.12 KB
/
log_in.py
File metadata and controls
92 lines (76 loc) · 3.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import logging
from dataclasses import dataclass
from app.application.common.ports.user_command_gateway import UserCommandGateway
from app.application.common.services.current_user import CurrentUserService
from app.domain.entities.user import User
from app.domain.exceptions.user import UserNotFoundByUsernameError
from app.domain.services.user import UserService
from app.domain.value_objects.raw_password.raw_password import RawPassword
from app.domain.value_objects.username.username import Username
from app.infrastructure.auth.exceptions import (
AlreadyAuthenticatedError,
AuthenticationError,
)
from app.infrastructure.auth.handlers.constants import (
AUTH_ACCOUNT_INACTIVE,
AUTH_ALREADY_AUTHENTICATED,
)
from app.infrastructure.auth.session.constants import AUTH_INVALID_PASSWORD
from app.infrastructure.auth.session.service import AuthSessionService
log = logging.getLogger(__name__)
@dataclass(frozen=True, slots=True, kw_only=True)
class LogInRequest:
username: str
password: str
class LogInHandler:
"""
Open to everyone.
Authenticates registered user,
sets a JWT access token with a session ID in cookies,
and creates a session.
A logged-in user cannot log in again
until the session expires or is terminated.
Authentication renews automatically
when accessing protected routes before expiration.
If the JWT is invalid, expired, or the session is terminated,
the user loses authentication.
:raises AlreadyAuthenticatedError:
:raises AuthorizationError:
:raises DataMapperError:
:raises DomainFieldError:
:raises UserNotFoundByUsername:
"""
def __init__(
self,
current_user_service: CurrentUserService,
user_command_gateway: UserCommandGateway,
user_service: UserService,
auth_session_service: AuthSessionService,
):
self._current_user_service = current_user_service
self._user_command_gateway = user_command_gateway
self._user_service = user_service
self._auth_session_service = auth_session_service
async def __call__(self, request_data: LogInRequest) -> None:
log.info("Log in: started. Username: '%s'.", request_data.username)
try:
await self._current_user_service.get_current_user()
raise AlreadyAuthenticatedError(AUTH_ALREADY_AUTHENTICATED)
except AuthenticationError:
pass
username = Username(request_data.username)
password = RawPassword(request_data.password)
user: User | None = await self._user_command_gateway.read_by_username(username)
if user is None:
raise UserNotFoundByUsernameError(username)
if not self._user_service.is_password_valid(user, password):
raise AuthenticationError(AUTH_INVALID_PASSWORD)
if not user.is_active:
raise AuthenticationError(AUTH_ACCOUNT_INACTIVE)
await self._auth_session_service.create_session(user.id_)
log.info(
"Log in: done. User, ID: '%s', username '%s', role '%s'.",
user.id_.value,
user.username.value,
user.role.value,
)