Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save akhundMurad/21f093ee88ed0654777a2485240d758c to your computer and use it in GitHub Desktop.

Select an option

Save akhundMurad/21f093ee88ed0654777a2485240d758c to your computer and use it in GitHub Desktop.
Read Model
from src.application.ports.uow import IUnitOfWork
from src.application.ports.analytics_gateway import IAnalyticsGateway
from src.domain.analytics.queries.analytics_query import AnalyticsQuery, AnalyticsQueryResult
class AnalyticsQueryHandler:
    """Answer an ``AnalyticsQuery`` by delegating to the analytics gateway."""

    def __init__(self, uow: IUnitOfWork, analytics_gateway: IAnalyticsGateway):
        """Keep references to the collaborators used by ``handle``.

        Args:
            uow: Object entered as a context manager around the lookup.
            analytics_gateway: Object whose ``find`` method supplies the
                figures copied into the result.
        """
        self._analytics_gateway = analytics_gateway
        self._uow = uow

    def handle(self, query: AnalyticsQuery) -> AnalyticsQueryResult:
        """Look up the analytics for the queried campaign and date range.

        Args:
            query: Carries ``campaign_id``, ``start_date`` and ``end_date``,
                which are forwarded to the gateway's ``find``.

        Returns:
            An ``AnalyticsQueryResult`` populated from the ``hours``,
            ``budget`` and ``expenses`` attributes of the gateway result.
        """
        with self._uow:
            stats = self._analytics_gateway.find(
                id=query.campaign_id,
                start_date=query.start_date,
                end_date=query.end_date,
            )
            # NOTE(review): the result is built while the unit of work is
            # still entered, so attribute reads on `stats` happen inside the
            # context -- confirm this matches the intended nesting.
            return AnalyticsQueryResult(
                total_hours=stats.hours,
                total_budget=stats.budget,
                expenses=stats.expenses,
            )
from datetime import date
from uuid import UUID
from pydantic import Field
from src.domain.base.query import Query, QueryResult
class AnalyticsQuery(Query):
    # Input for the analytics read model. Every field is required: the
    # Ellipsis default marks a field as having no default value.

    # Identifier of the campaign to look up.
    campaign_id: UUID = Field(default=...)
    # Date-range bounds passed to the gateway.
    # NOTE(review): whether the bounds are inclusive is decided by the
    # gateway implementation -- confirm there.
    start_date: date = Field(default=...)
    end_date: date = Field(default=...)
class AnalyticsQueryResult(QueryResult):
    # Output of the analytics read model. Each field defaults to 0 when the
    # caller does not supply a value.

    total_hours: int = Field(default=0)
    total_budget: float = Field(default=0)
    expenses: float = Field(default=0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment