Last active
October 14, 2025 11:29
-
-
Save hasansezertasan/c2d682078cf678b2fdb399b4d1c1be98 to your computer and use it in GitHub Desktop.
An MRE for the Litestar Discord server. The script below raises `sqlalchemy.exc.InvalidRequestError: merge() with load=False option does not support objects transient (i.e. unpersisted) objects. flush() all changes on mapped instances before merging with load=False.` when updating a nested record with Litestar, Advanced Alchemy, and DTOs.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # /// script | |
| # requires-python = ">=3.12" | |
| # dependencies = ["litestar[standard]>=2.9.1", "advanced-alchemy==1.7.0", "aiosqlite>=0.20.0"] | |
| # /// | |
| import datetime | |
| from typing import Annotated, Optional | |
| from uuid import UUID | |
| import uvicorn | |
| from advanced_alchemy.extensions.litestar import ( | |
| AsyncSessionConfig, | |
| SQLAlchemyAsyncConfig, | |
| SQLAlchemyPlugin, | |
| base, | |
| filters, | |
| providers, | |
| repository, | |
| service, | |
| ) | |
| from advanced_alchemy.extensions.litestar.dto import SQLAlchemyDTO, SQLAlchemyDTOConfig | |
| from litestar import Controller, Litestar, delete, get, patch, post | |
| from litestar.params import Dependency, Parameter | |
| from sqlalchemy import ForeignKey | |
| from sqlalchemy.ext.associationproxy import AssociationProxy, association_proxy | |
| from sqlalchemy.orm import Mapped, load_only, mapped_column, relationship, selectinload | |
class BookModel(base.UUIDAuditBase):
    """Book row stored in the ``book`` table; each book points at one author."""

    __tablename__ = "book"

    title: Mapped[str]
    author_id: Mapped[UUID] = mapped_column(ForeignKey("author.id"))
    # View-only relationship: the link is written through ``author_id``.
    author: Mapped["AuthorModel"] = relationship(viewonly=True, innerjoin=True)
    # Book/genre link rows; the cascade removes them along with the book.
    genres: Mapped[list["BookGenreModel"]] = relationship(
        "BookGenreModel",
        back_populates="book",
        cascade="all, delete-orphan",
    )
class AuthorModel(base.UUIDBase):
    """Author row stored in the ``author`` table, owning a list of books."""

    __tablename__ = "author"

    name: Mapped[str]
    dob: Mapped[Optional[datetime.date]]
    # Books cascade with the author, including orphan removal.
    books: Mapped[list[BookModel]] = relationship(
        cascade="all, delete-orphan",
        back_populates="author",
    )
class AuthorDTO(SQLAlchemyDTO[AuthorModel]):
    """Response DTO for authors: id, name, date of birth and their books."""

    config = SQLAlchemyDTOConfig(include={"id", "name", "dob", "books"})
class AuthorCreateDTO(SQLAlchemyDTO[AuthorModel]):
    """Request DTO for creating an author together with nested book titles."""

    config = SQLAlchemyDTOConfig(
        # Nested books only expose ``title`` on create.
        include={"name", "dob", "books.0.title"},
    )
class AuthorUpdateDTO(SQLAlchemyDTO[AuthorModel]):
    """Request DTO for PATCH updates of an author and its nested books.

    Nested books expose ``id`` and ``title`` so an existing book can be
    addressed in the payload.
    """

    config = SQLAlchemyDTOConfig(
        include={
            "name",
            "dob",
            "books.0.id",
            "books.0.title",
        },
        # PATCH bodies may omit fields; mirrors ``GenreUpdateDTO``.
        partial=True,
    )
class AuthorService(service.SQLAlchemyAsyncRepositoryService[AuthorModel]):
    """Author service built on an inline async repository."""

    class Repo(repository.SQLAlchemyAsyncRepository[AuthorModel]):
        """Async repository bound to ``AuthorModel``."""

        model_type = AuthorModel

    # Tell the service which repository class to use.
    repository_type = Repo
class AuthorController(Controller):
    """CRUD endpoints for authors, served under ``/authors``."""

    # Provides ``authors_service`` (loading ``AuthorModel.books``) and the
    # ``filters`` dependency to every handler on this controller.
    dependencies = providers.create_service_dependencies(
        AuthorService,
        "authors_service",
        load=[AuthorModel.books],
        filters={
            "pagination_type": "limit_offset",
            "id_filter": UUID,
            "search": "name",
            "search_ignore_case": True,
        },
    )
    return_dto = AuthorDTO

    @get(path="/authors")
    async def list_authors(
        self,
        authors_service: AuthorService,
        filters: Annotated[list[filters.FilterTypes], Dependency(skip_validation=True)],
    ) -> service.OffsetPagination[AuthorModel]:
        """Return a page of authors matching the supplied filters."""
        authors, count = await authors_service.list_and_count(*filters)
        return authors_service.to_schema(authors, count, filters=filters)

    @post(path="/authors", dto=AuthorCreateDTO)
    async def create_author(self, authors_service: AuthorService, data: AuthorModel) -> AuthorModel:
        """Persist a new author built from the request payload."""
        created = await authors_service.create(data)
        return authors_service.to_schema(created)

    @get(path="/authors/{author_id:uuid}")
    async def get_author(
        self,
        authors_service: AuthorService,
        author_id: UUID = Parameter(
            title="Author ID",
            description="The author to retrieve.",
        ),
    ) -> AuthorModel:
        """Fetch a single author by primary key."""
        found = await authors_service.get(author_id)
        return authors_service.to_schema(found)

    @patch(path="/authors/{author_id:uuid}", dto=AuthorUpdateDTO)
    async def update_author(
        self,
        authors_service: AuthorService,
        data: AuthorModel,
        author_id: UUID = Parameter(
            title="Author ID",
            description="The author to update.",
        ),
    ) -> AuthorModel:
        """Apply the payload to the author identified by ``author_id``."""
        updated = await authors_service.update(data, item_id=author_id, auto_commit=True)
        return authors_service.to_schema(updated)

    @delete(path="/authors/{author_id:uuid}", return_dto=None)
    async def delete_author(
        self,
        authors_service: AuthorService,
        author_id: UUID = Parameter(
            title="Author ID",
            description="The author to delete.",
        ),
    ) -> None:
        """Delete an author from the system."""
        # The value returned by the service is intentionally discarded.
        await authors_service.delete(author_id)
class GenreModel(base.UUIDBase):
    """Genre row stored in the ``genre`` table."""

    __tablename__ = "genre"

    name: Mapped[str]
    # Book/genre link rows; the cascade removes them along with the genre.
    books: Mapped[list["BookGenreModel"]] = relationship(
        "BookGenreModel",
        back_populates="genre",
        cascade="all, delete-orphan",
    )
class BookGenreModel(base.UUIDBase):
    """Association row in ``book_genre`` linking one book to one genre."""

    __tablename__ = "book_genre"

    book_id: Mapped[UUID] = mapped_column(ForeignKey("book.id"))
    book: Mapped[BookModel] = relationship("BookModel", back_populates="genres")
    genre_id: Mapped[UUID] = mapped_column(ForeignKey("genre.id"))
    genre: Mapped[GenreModel] = relationship("GenreModel", back_populates="books")
    # Proxies the ``title`` attribute of the related book onto the link row.
    book_title: AssociationProxy[str] = association_proxy("book", "title")
class GenreDTO(SQLAlchemyDTO[GenreModel]):
    """Response DTO for genres, including id, book id and title per link row."""

    config = SQLAlchemyDTOConfig(
        include={
            "id",
            "name",
            # Fields exposed on each nested book/genre link row.
            "books.0.id",
            "books.0.book_id",
            "books.0.book_title",
        },
    )
class GenreCreateDTO(SQLAlchemyDTO[GenreModel]):
    """Request DTO for creating a genre and linking it to books by id."""

    config = SQLAlchemyDTOConfig(include={"name", "books.0.book_id"})
class GenreUpdateDTO(SQLAlchemyDTO[GenreModel]):
    """Partial request DTO for PATCH updates of a genre and its book links."""

    config = SQLAlchemyDTOConfig(
        partial=True,
        include={"name", "books.0.id", "books.0.book_id"},
    )
class GenreService(service.SQLAlchemyAsyncRepositoryService[GenreModel]):
    """Genre service built on an inline async repository."""

    class Repo(repository.SQLAlchemyAsyncRepository[GenreModel]):
        """Async repository bound to ``GenreModel``."""

        model_type = GenreModel

    # Tell the service which repository class to use.
    repository_type = Repo
class GenreController(Controller):
    """Create, read and update endpoints for genres, served under ``/genres``."""

    # Provides ``genres_service`` and the ``filters`` dependency. The loader
    # walks genre -> link rows -> book and loads only the book title.
    dependencies = providers.create_service_dependencies(
        GenreService,
        "genres_service",
        load=[
            selectinload(GenreModel.books).options(
                selectinload(BookGenreModel.book).options(load_only(BookModel.title)),
            ),
        ],
        filters={
            "pagination_type": "limit_offset",
            "id_filter": UUID,
            "search": "name",
            "search_ignore_case": True,
        },
    )
    return_dto = GenreDTO

    @get(path="/genres")
    async def list_genres(
        self,
        genres_service: GenreService,
        filters: Annotated[list[filters.FilterTypes], Dependency(skip_validation=True)],
    ) -> service.OffsetPagination[GenreModel]:
        """Return a page of genres matching the supplied filters."""
        genres, count = await genres_service.list_and_count(*filters)
        return genres_service.to_schema(genres, count, filters=filters)

    @post(path="/genres", dto=GenreCreateDTO)
    async def create_genre(self, genres_service: GenreService, data: GenreModel) -> GenreModel:
        """Persist a new genre and return it as re-read from the service."""
        created = await genres_service.create(data)
        # Fetch the row again by id; presumably so the configured loader
        # options populate the nested relationships for the response.
        reloaded = await genres_service.get(created.id)
        return genres_service.to_schema(reloaded)

    @get(path="/genres/{genre_id:uuid}")
    async def get_genre(
        self,
        genres_service: GenreService,
        genre_id: UUID = Parameter(
            title="Genre ID",
            description="The genre to retrieve.",
        ),
    ) -> GenreModel:
        """Fetch a single genre by primary key."""
        found = await genres_service.get(genre_id)
        return genres_service.to_schema(found)

    @patch(path="/genres/{genre_id:uuid}", dto=GenreUpdateDTO)
    async def update_genre(
        self,
        genres_service: GenreService,
        data: GenreModel,
        genre_id: UUID = Parameter(
            title="Genre ID",
            description="The genre to update.",
        ),
    ) -> GenreModel:
        """Apply the payload to the genre identified by ``genre_id``."""
        updated = await genres_service.update(data, item_id=genre_id, auto_commit=True)
        return genres_service.to_schema(updated)
# Async SQLAlchemy setup: file-backed SQLite via aiosqlite, with
# ``create_all`` enabled and the "autocommit" before-send handler.
alchemy_config = SQLAlchemyAsyncConfig(
    connection_string="sqlite+aiosqlite:///db.sqlite3",
    create_all=True,
    session_config=AsyncSessionConfig(expire_on_commit=False),
    before_send_handler="autocommit",
)

# The application under test: both controllers plus the SQLAlchemy plugin.
app = Litestar(
    plugins=[SQLAlchemyPlugin(config=alchemy_config)],
    route_handlers=[AuthorController, GenreController],
    debug=True,
)

if __name__ == "__main__":
    # Serve the app on all interfaces, port 8000, when run as a script.
    uvicorn.run(app, port=8000, host="0.0.0.0")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """Tests for the MRE application.""" | |
| from __future__ import annotations | |
| from mre import app | |
| from litestar.testing import TestClient | |
def test_authors_route() -> None:
    """Exercise the author list, create and nested-update endpoints end to end.

    The first assertion expects an empty author list, so the test assumes the
    application's database holds no authors when it starts.
    """
    app.debug = True
    with TestClient(app=app) as client:
        # Listing authors before anything is created.
        response = client.get("/authors")
        assert response.status_code == 200, response.text
        listing = response.json()
        assert listing == {"items": [], "total": 0, "offset": 0, "limit": 20}, listing

        # Create one author with a single nested book.
        response = client.post(
            "/authors",
            json={"name": "John Doe", "dob": "1990-01-01", "books": [{"title": "The Great Gatsby"}]},
        )
        assert response.status_code == 201, response.text

        # Read the generated ids back from the listing.
        response = client.get("/authors")
        assert response.status_code == 200, response.text
        author = response.json()["items"][0]
        author_id = author["id"]
        book_id = author["books"][0]["id"]

        # Update the author while referencing the existing nested book by id.
        # The response text is attached so a failure shows the server's error.
        response = client.patch(
            f"/authors/{author_id}",
            json={"name": "Jane Doe", "dob": "1990-01-01", "books": [{"title": "The Great Gatsby", "id": book_id}]},
        )
        assert response.status_code == 200, response.text
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment