diff --git a/src/app/application/commands/create_user.py b/src/app/application/commands/create_user.py index 6e8f81e..95e38be 100644 --- a/src/app/application/commands/create_user.py +++ b/src/app/application/commands/create_user.py @@ -3,6 +3,7 @@ from typing import TypedDict from uuid import UUID +from app.application.common.ports.flusher import Flusher from app.application.common.ports.transaction_manager import ( TransactionManager, ) @@ -52,13 +53,15 @@ class CreateUserInteractor: def __init__( self, current_user_service: CurrentUserService, - user_command_gateway: UserCommandGateway, user_service: UserService, + user_command_gateway: UserCommandGateway, + flusher: Flusher, transaction_manager: TransactionManager, ): self._current_user_service = current_user_service - self._user_command_gateway = user_command_gateway self._user_service = user_service + self._user_command_gateway = user_command_gateway + self._flusher = flusher self._transaction_manager = transaction_manager async def __call__(self, request_data: CreateUserRequest) -> CreateUserResponse: @@ -84,7 +87,7 @@ async def __call__(self, request_data: CreateUserRequest) -> CreateUserResponse: self._user_command_gateway.add(user) try: - await self._transaction_manager.flush() + await self._flusher.flush() except UsernameAlreadyExistsError: raise diff --git a/src/app/application/common/ports/flusher.py b/src/app/application/common/ports/flusher.py new file mode 100644 index 0000000..22e6673 --- /dev/null +++ b/src/app/application/common/ports/flusher.py @@ -0,0 +1,17 @@ +from abc import abstractmethod +from typing import Protocol + + +class Flusher(Protocol): + """ + Interface for flushing intermediate changes during a business transaction. + """ + + @abstractmethod + async def flush(self) -> None: + """ + Flush pending changes to validate constraints or trigger side effects. + + :raises DataMapperError: + :raises UsernameAlreadyExists: + """ diff --git a/src/app/application/common/ports/transaction_manager.py b/src/app/application/common/ports/transaction_manager.py index b5836de..445e605 100644 --- a/src/app/application/common/ports/transaction_manager.py +++ b/src/app/application/common/ports/transaction_manager.py @@ -4,25 +4,15 @@ class TransactionManager(Protocol): """ - UOW-compatible interface for flushing and - committing changes to the data source. - The actual implementation of UOW can be bundled with an ORM, - like SQLAlchemy's session. + UoW-compatible interface for committing a business transaction. + May be extended with rollback support. + The implementation may be an ORM session, such as SQLAlchemy's. """ - @abstractmethod - async def flush(self) -> None: - """ - Mostly to check data source constraints. - - :raises DataMapperError: - :raises UsernameAlreadyExists: - """ - @abstractmethod async def commit(self) -> None: """ - Persist changes to the data source. + Commit the successful outcome of a business transaction. :raises DataMapperError: """ diff --git a/src/app/infrastructure/adapters/main_flusher_sqla.py b/src/app/infrastructure/adapters/main_flusher_sqla.py new file mode 100644 index 0000000..b0798d4 --- /dev/null +++ b/src/app/infrastructure/adapters/main_flusher_sqla.py @@ -0,0 +1,43 @@ +import logging +from collections.abc import Mapping +from typing import Any, cast + +from sqlalchemy.exc import IntegrityError, SQLAlchemyError + +from app.application.common.ports.flusher import Flusher +from app.domain.exceptions.user import UsernameAlreadyExistsError +from app.infrastructure.adapters.constants import ( + DB_CONSTRAINT_VIOLATION, + DB_FLUSH_DONE, + DB_FLUSH_FAILED, + DB_QUERY_FAILED, +) +from app.infrastructure.adapters.types import MainAsyncSession +from app.infrastructure.exceptions.gateway import DataMapperError + +log = logging.getLogger(__name__) + + +class SqlaMainFlusher(Flusher): + def __init__(self, session: MainAsyncSession): + self._session = session + + async def flush(self) -> None: + """ + :raises DataMapperError: + :raises UsernameAlreadyExists: + """ + try: + await self._session.flush() + log.debug("%s Main session.", DB_FLUSH_DONE) + + except IntegrityError as error: + if "uq_users_username" in str(error): + params: Mapping[str, Any] = cast(Mapping[str, Any], error.params) + username = str(params.get("username", "unknown")) + raise UsernameAlreadyExistsError(username) from error + + raise DataMapperError(DB_CONSTRAINT_VIOLATION) from error + + except SQLAlchemyError as error: + raise DataMapperError(f"{DB_QUERY_FAILED} {DB_FLUSH_FAILED}") from error diff --git a/src/app/infrastructure/adapters/main_transaction_manager_sqla.py b/src/app/infrastructure/adapters/main_transaction_manager_sqla.py index fddec11..ce34b13 100644 --- a/src/app/infrastructure/adapters/main_transaction_manager_sqla.py +++ b/src/app/infrastructure/adapters/main_transaction_manager_sqla.py @@ -1,19 +1,13 @@ import logging -from collections.abc import Mapping -from typing import Any, cast -from sqlalchemy.exc import IntegrityError, SQLAlchemyError +from sqlalchemy.exc import SQLAlchemyError from app.application.common.ports.transaction_manager import ( TransactionManager, ) -from app.domain.exceptions.user import UsernameAlreadyExistsError from app.infrastructure.adapters.constants import ( DB_COMMIT_DONE, DB_COMMIT_FAILED, - DB_CONSTRAINT_VIOLATION, - DB_FLUSH_DONE, - DB_FLUSH_FAILED, DB_QUERY_FAILED, ) from app.infrastructure.adapters.types import MainAsyncSession @@ -26,26 +20,6 @@ class SqlaMainTransactionManager(TransactionManager): def __init__(self, session: MainAsyncSession): self._session = session - async def flush(self) -> None: - """ - :raises DataMapperError: - :raises UsernameAlreadyExists: - """ - try: - await self._session.flush() - log.debug("%s Main session.", DB_FLUSH_DONE) - - except IntegrityError as error: - if "uq_users_username" in str(error): - params: Mapping[str, Any] = cast(Mapping[str, Any], error.params) - username = str(params.get("username", "unknown")) - raise UsernameAlreadyExistsError(username) from error - - raise DataMapperError(DB_CONSTRAINT_VIOLATION) from error - - except SQLAlchemyError as error: - raise DataMapperError(f"{DB_QUERY_FAILED} {DB_FLUSH_FAILED}") from error - async def commit(self) -> None: """ :raises DataMapperError: diff --git a/src/app/infrastructure/auth/handlers/log_in.py b/src/app/infrastructure/auth/handlers/log_in.py index 4d6e4d1..5b15d55 100644 --- a/src/app/infrastructure/auth/handlers/log_in.py +++ b/src/app/infrastructure/auth/handlers/log_in.py @@ -50,13 +50,13 @@ class LogInHandler: def __init__( self, - user_command_gateway: UserCommandGateway, current_user_service: CurrentUserService, + user_command_gateway: UserCommandGateway, user_service: UserService, auth_session_service: AuthSessionService, ): - self._user_command_gateway = user_command_gateway self._current_user_service = current_user_service + self._user_command_gateway = user_command_gateway self._user_service = user_service self._auth_session_service = auth_session_service diff --git a/src/app/infrastructure/auth/handlers/sign_up.py b/src/app/infrastructure/auth/handlers/sign_up.py index 82b2aaf..8a5ed30 100644 --- a/src/app/infrastructure/auth/handlers/sign_up.py +++ b/src/app/infrastructure/auth/handlers/sign_up.py @@ -3,6 +3,7 @@ from typing import TypedDict from uuid import UUID +from app.application.common.ports.flusher import Flusher from app.application.common.ports.transaction_manager import TransactionManager from app.application.common.ports.user_command_gateway import UserCommandGateway from app.application.common.services.current_user import CurrentUserService @@ -48,15 +49,17 @@ class SignUpHandler: def __init__( self, - user_command_gateway: UserCommandGateway, - transaction_manager: TransactionManager, current_user_service: CurrentUserService, user_service: UserService, + user_command_gateway: UserCommandGateway, + flusher: Flusher, + transaction_manager: TransactionManager, ): - self._user_command_gateway = user_command_gateway - self._transaction_manager = transaction_manager self._current_user_service = current_user_service self._user_service = user_service + self._user_command_gateway = user_command_gateway + self._flusher = flusher + self._transaction_manager = transaction_manager async def __call__(self, request_data: SignUpRequest) -> SignUpResponse: log.info("Sign up: started. Username: '%s'.", request_data.username) @@ -75,7 +78,7 @@ async def __call__(self, request_data: SignUpRequest) -> SignUpResponse: self._user_command_gateway.add(user) try: - await self._transaction_manager.flush() + await self._flusher.flush() except UsernameAlreadyExistsError: raise diff --git a/src/app/infrastructure/auth/session/ports/transaction_manager.py b/src/app/infrastructure/auth/session/ports/transaction_manager.py index 654c51e..9d25c68 100644 --- a/src/app/infrastructure/auth/session/ports/transaction_manager.py +++ b/src/app/infrastructure/auth/session/ports/transaction_manager.py @@ -7,16 +7,15 @@ class AuthSessionTransactionManager(Protocol): Defined to allow easier mocking and swapping of implementations in the same layer. - UOW-compatible interface for flushing and - committing changes to the data source. - The actual implementation of UOW can be bundled with an ORM, - like SQLAlchemy's session. + UoW-compatible interface for committing a business transaction. + May be extended with rollback support. + The implementation may be an ORM session, such as SQLAlchemy's. """ @abstractmethod async def commit(self) -> None: """ - Persist changes to the data source. + Commit the successful outcome of a business transaction. :raises DataMapperError: """ diff --git a/src/app/setup/ioc/application.py b/src/app/setup/ioc/application.py index d933e81..01a866e 100644 --- a/src/app/setup/ioc/application.py +++ b/src/app/setup/ioc/application.py @@ -7,6 +7,7 @@ from app.application.commands.reactivate_user import ReactivateUserInteractor from app.application.commands.revoke_admin import RevokeAdminInteractor from app.application.common.ports.access_revoker import AccessRevoker +from app.application.common.ports.flusher import Flusher from app.application.common.ports.identity_provider import IdentityProvider from app.application.common.ports.transaction_manager import ( TransactionManager, @@ -15,6 +16,7 @@ from app.application.common.ports.user_query_gateway import UserQueryGateway from app.application.common.services.current_user import CurrentUserService from app.application.queries.list_users import ListUsersQueryService +from app.infrastructure.adapters.main_flusher_sqla import SqlaMainFlusher from app.infrastructure.adapters.main_transaction_manager_sqla import ( SqlaMainTransactionManager, ) @@ -53,6 +55,10 @@ class ApplicationProvider(Provider): source=SqlaMainTransactionManager, provides=TransactionManager, ) + flusher = provide( + source=SqlaMainFlusher, + provides=Flusher, + ) user_command_gateway = provide( source=SqlaUserDataMapper, provides=UserCommandGateway,