Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions src/app/application/commands/create_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import TypedDict
from uuid import UUID

from app.application.common.ports.flusher import Flusher
from app.application.common.ports.transaction_manager import (
TransactionManager,
)
Expand Down Expand Up @@ -52,13 +53,15 @@ class CreateUserInteractor:
def __init__(
self,
current_user_service: CurrentUserService,
user_command_gateway: UserCommandGateway,
user_service: UserService,
user_command_gateway: UserCommandGateway,
flusher: Flusher,
transaction_manager: TransactionManager,
):
self._current_user_service = current_user_service
self._user_command_gateway = user_command_gateway
self._user_service = user_service
self._user_command_gateway = user_command_gateway
self._flusher = flusher
self._transaction_manager = transaction_manager

async def __call__(self, request_data: CreateUserRequest) -> CreateUserResponse:
Expand All @@ -84,7 +87,7 @@ async def __call__(self, request_data: CreateUserRequest) -> CreateUserResponse:
self._user_command_gateway.add(user)

try:
await self._transaction_manager.flush()
await self._flusher.flush()
except UsernameAlreadyExistsError:
raise

Expand Down
17 changes: 17 additions & 0 deletions src/app/application/common/ports/flusher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from abc import abstractmethod
from typing import Protocol


class Flusher(Protocol):
"""
Interface for flushing intermediate changes during a business transaction.
"""

@abstractmethod
async def flush(self) -> None:
"""
Flush pending changes to validate constraints or trigger side effects.

:raises DataMapperError:
:raises UsernameAlreadyExists:
"""
18 changes: 4 additions & 14 deletions src/app/application/common/ports/transaction_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,15 @@

class TransactionManager(Protocol):
"""
UOW-compatible interface for flushing and
committing changes to the data source.
The actual implementation of UOW can be bundled with an ORM,
like SQLAlchemy's session.
UoW-compatible interface for committing a business transaction.
May be extended with rollback support.
The implementation may be an ORM session, such as SQLAlchemy's.
"""

@abstractmethod
async def flush(self) -> None:
"""
Mostly to check data source constraints.

:raises DataMapperError:
:raises UsernameAlreadyExists:
"""

@abstractmethod
async def commit(self) -> None:
"""
Persist changes to the data source.
Commit the successful outcome of a business transaction.

:raises DataMapperError:
"""
43 changes: 43 additions & 0 deletions src/app/infrastructure/adapters/main_flusher_sqla.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import logging
from collections.abc import Mapping
from typing import Any, cast

from sqlalchemy.exc import IntegrityError, SQLAlchemyError

from app.application.common.ports.flusher import Flusher
from app.domain.exceptions.user import UsernameAlreadyExistsError
from app.infrastructure.adapters.constants import (
DB_CONSTRAINT_VIOLATION,
DB_FLUSH_DONE,
DB_FLUSH_FAILED,
DB_QUERY_FAILED,
)
from app.infrastructure.adapters.types import MainAsyncSession
from app.infrastructure.exceptions.gateway import DataMapperError

log = logging.getLogger(__name__)


class SqlaMainFlusher(Flusher):
def __init__(self, session: MainAsyncSession):
self._session = session

async def flush(self) -> None:
"""
:raises DataMapperError:
:raises UsernameAlreadyExists:
"""
try:
await self._session.flush()
log.debug("%s Main session.", DB_FLUSH_DONE)

except IntegrityError as error:
if "uq_users_username" in str(error):
params: Mapping[str, Any] = cast(Mapping[str, Any], error.params)
username = str(params.get("username", "unknown"))
raise UsernameAlreadyExistsError(username) from error

raise DataMapperError(DB_CONSTRAINT_VIOLATION) from error

except SQLAlchemyError as error:
raise DataMapperError(f"{DB_QUERY_FAILED} {DB_FLUSH_FAILED}") from error
28 changes: 1 addition & 27 deletions src/app/infrastructure/adapters/main_transaction_manager_sqla.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
import logging
from collections.abc import Mapping
from typing import Any, cast

from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from sqlalchemy.exc import SQLAlchemyError

from app.application.common.ports.transaction_manager import (
TransactionManager,
)
from app.domain.exceptions.user import UsernameAlreadyExistsError
from app.infrastructure.adapters.constants import (
DB_COMMIT_DONE,
DB_COMMIT_FAILED,
DB_CONSTRAINT_VIOLATION,
DB_FLUSH_DONE,
DB_FLUSH_FAILED,
DB_QUERY_FAILED,
)
from app.infrastructure.adapters.types import MainAsyncSession
Expand All @@ -26,26 +20,6 @@ class SqlaMainTransactionManager(TransactionManager):
def __init__(self, session: MainAsyncSession):
self._session = session

async def flush(self) -> None:
"""
:raises DataMapperError:
:raises UsernameAlreadyExists:
"""
try:
await self._session.flush()
log.debug("%s Main session.", DB_FLUSH_DONE)

except IntegrityError as error:
if "uq_users_username" in str(error):
params: Mapping[str, Any] = cast(Mapping[str, Any], error.params)
username = str(params.get("username", "unknown"))
raise UsernameAlreadyExistsError(username) from error

raise DataMapperError(DB_CONSTRAINT_VIOLATION) from error

except SQLAlchemyError as error:
raise DataMapperError(f"{DB_QUERY_FAILED} {DB_FLUSH_FAILED}") from error

async def commit(self) -> None:
"""
:raises DataMapperError:
Expand Down
4 changes: 2 additions & 2 deletions src/app/infrastructure/auth/handlers/log_in.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ class LogInHandler:

def __init__(
self,
user_command_gateway: UserCommandGateway,
current_user_service: CurrentUserService,
user_command_gateway: UserCommandGateway,
user_service: UserService,
auth_session_service: AuthSessionService,
):
self._user_command_gateway = user_command_gateway
self._current_user_service = current_user_service
self._user_command_gateway = user_command_gateway
self._user_service = user_service
self._auth_session_service = auth_session_service

Expand Down
13 changes: 8 additions & 5 deletions src/app/infrastructure/auth/handlers/sign_up.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import TypedDict
from uuid import UUID

from app.application.common.ports.flusher import Flusher
from app.application.common.ports.transaction_manager import TransactionManager
from app.application.common.ports.user_command_gateway import UserCommandGateway
from app.application.common.services.current_user import CurrentUserService
Expand Down Expand Up @@ -48,15 +49,17 @@ class SignUpHandler:

def __init__(
self,
user_command_gateway: UserCommandGateway,
transaction_manager: TransactionManager,
current_user_service: CurrentUserService,
user_service: UserService,
user_command_gateway: UserCommandGateway,
flusher: Flusher,
transaction_manager: TransactionManager,
):
self._user_command_gateway = user_command_gateway
self._transaction_manager = transaction_manager
self._current_user_service = current_user_service
self._user_service = user_service
self._user_command_gateway = user_command_gateway
self._flusher = flusher
self._transaction_manager = transaction_manager

async def __call__(self, request_data: SignUpRequest) -> SignUpResponse:
log.info("Sign up: started. Username: '%s'.", request_data.username)
Expand All @@ -75,7 +78,7 @@ async def __call__(self, request_data: SignUpRequest) -> SignUpResponse:
self._user_command_gateway.add(user)

try:
await self._transaction_manager.flush()
await self._flusher.flush()
except UsernameAlreadyExistsError:
raise

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,15 @@ class AuthSessionTransactionManager(Protocol):
Defined to allow easier mocking and swapping
of implementations in the same layer.

UOW-compatible interface for flushing and
committing changes to the data source.
The actual implementation of UOW can be bundled with an ORM,
like SQLAlchemy's session.
UoW-compatible interface for committing a business transaction.
May be extended with rollback support.
The implementation may be an ORM session, such as SQLAlchemy's.
"""

@abstractmethod
async def commit(self) -> None:
"""
Persist changes to the data source.
Commit the successful outcome of a business transaction.

:raises DataMapperError:
"""
6 changes: 6 additions & 0 deletions src/app/setup/ioc/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from app.application.commands.reactivate_user import ReactivateUserInteractor
from app.application.commands.revoke_admin import RevokeAdminInteractor
from app.application.common.ports.access_revoker import AccessRevoker
from app.application.common.ports.flusher import Flusher
from app.application.common.ports.identity_provider import IdentityProvider
from app.application.common.ports.transaction_manager import (
TransactionManager,
Expand All @@ -15,6 +16,7 @@
from app.application.common.ports.user_query_gateway import UserQueryGateway
from app.application.common.services.current_user import CurrentUserService
from app.application.queries.list_users import ListUsersQueryService
from app.infrastructure.adapters.main_flusher_sqla import SqlaMainFlusher
from app.infrastructure.adapters.main_transaction_manager_sqla import (
SqlaMainTransactionManager,
)
Expand Down Expand Up @@ -53,6 +55,10 @@ class ApplicationProvider(Provider):
source=SqlaMainTransactionManager,
provides=TransactionManager,
)
flusher = provide(
source=SqlaMainFlusher,
provides=Flusher,
)
user_command_gateway = provide(
source=SqlaUserDataMapper,
provides=UserCommandGateway,
Expand Down