Last active
September 17, 2025 08:46
-
-
Save yipo/78a94c60afe207ea9401fab3918aa2b5 to your computer and use it in GitHub Desktop.
event message
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| *.pyc | |
| /.venv/ |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse
import logging
from datetime import datetime
from typing import Optional

from sqlalchemy import create_engine, engine, select
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column
| logger = logging.getLogger(__name__) | |
class Base(DeclarativeBase):
    """Declarative base class that every ORM model in this script derives from."""
class SolarEvent(Base):
    """ORM mapping for a row of the ``SolarEvent`` table.

    No explicit column names are given, so each attribute maps to a column of
    the same (camelCase) name.
    """

    __tablename__ = 'SolarEvent'

    id: Mapped[int] = mapped_column(primary_key=True)
    startTimestamp: Mapped[datetime]
    endTimestamp: Mapped[datetime]
    deviceVendorName: Mapped[str]
    deviceModelName: Mapped[str]
    eventType: Mapped[str]
    value: Mapped[str]
    # Declared Optional because this script selects rows whose eventMessageId
    # IS NULL and then fills the value in; a plain ``Mapped[int]`` would
    # declare the column NOT NULL and mislead type checkers.
    eventMessageId: Mapped[Optional[int]]
class DeviceModel(Base):
    """ORM mapping for a row of the ``device_models`` table.

    ``load_series_names`` reads this table to build a lookup from
    ``(vendor_name, model_name, device_type)`` to ``series_name``.
    """

    __tablename__ = 'device_models'

    # Primary key.
    id: Mapped[int] = mapped_column(primary_key=True)

    # The three fields that together form the lookup key.
    vendor_name: Mapped[str]
    model_name: Mapped[str]
    device_type: Mapped[str]

    # The looked-up value; rows where it is falsy are skipped by the loader.
    series_name: Mapped[str]
class EventMessage(Base):
    """ORM mapping for a row of the ``SolarEventMessage`` table.

    ``load_message_ids`` reads this table to build a lookup from
    ``(vendorName, seriesName, type, no)`` to the row ``id``.
    """

    __tablename__ = 'SolarEventMessage'

    # Primary key; this is the value stored into SolarEvent.eventMessageId.
    id: Mapped[int] = mapped_column(primary_key=True)

    # The four fields that together form the lookup key.
    vendorName: Mapped[str]
    seriesName: Mapped[str]
    type: Mapped[str]
    # NOTE(review): matched against SolarEvent.value by the updater; presumably
    # an event/error number stored as text -- confirm against the schema.
    no: Mapped[str]
def main():
    """Backfill ``eventMessageId`` for every configured owner database.

    Loads the two lookup tables once, then runs the update for each owner key
    against events that started after 2025-09-01.
    """
    owner_keys = ('americas', 'chailease', 'demo', 'plus', 'thailand', 'thingnario', 'tsc')
    begin = datetime(2025, 9, 1)

    options = get_args()
    series_by_model = load_series_names(options.icms_db)
    ids_by_event = load_message_ids(options.photon_db)

    for key in owner_keys:
        update_event_message_id(options.photon_db, key, begin, series_by_model, ids_by_event)
def get_args():
    """Parse the command line and return the resulting namespace.

    Both ``--photon-db`` and ``--icms-db`` are required; each value is
    converted with ``engine.make_url`` and exposed as ``photon_db`` and
    ``icms_db`` respectively.
    """
    parser = argparse.ArgumentParser()
    # The two options are configured identically, so register them in a loop.
    for option in ('--photon-db', '--icms-db'):
        parser.add_argument(option, required=True, type=engine.make_url)
    return parser.parse_args()
def load_series_names(url: engine.URL):
    """Build the device-model to series-name lookup from the ``device_models`` table.

    Args:
        url: Database URL to connect to.

    Returns:
        A dict mapping ``(vendor_name, model_name, device_type)`` to
        ``series_name``. Rows whose ``series_name`` is falsy are left out.
    """
    # Named ``db`` rather than ``engine`` so the imported ``sqlalchemy.engine``
    # module is not shadowed inside this function.
    db = create_engine(url)
    try:
        with Session(db) as session:
            return {
                (model.vendor_name, model.model_name, model.device_type): model.series_name
                for model in session.scalars(select(DeviceModel))
                if model.series_name
            }
    finally:
        # Release the connection pool; this engine is not used again.
        db.dispose()
def load_message_ids(url: engine.URL):
    """Build the event-message lookup from the ``SolarEventMessage`` table.

    The table is always read from the ``solar_PV_demo`` database, whatever
    database name ``url`` carries.

    Args:
        url: Database URL; only its database name is replaced.

    Returns:
        A dict mapping ``(vendorName, seriesName, type, no)`` to the row ``id``.
    """
    # Named ``db`` rather than ``engine`` so the imported ``sqlalchemy.engine``
    # module is not shadowed inside this function.
    db = create_engine(url.set(database='solar_PV_demo'))
    try:
        with Session(db) as session:
            return {
                (message.vendorName, message.seriesName, message.type, message.no): message.id
                for message in session.scalars(select(EventMessage))
            }
    finally:
        # Release the connection pool; this engine is not used again.
        db.dispose()
def update_event_message_id(
        url: engine.URL, owner_key: str, begin: datetime, series_names: dict[tuple, str],
        message_ids: dict[tuple, int]):
    """Fill in ``eventMessageId`` on events that do not have one yet.

    Connects to the ``solar_PV_<owner_key>`` database, selects every
    ``SolarEvent`` that started after ``begin`` and whose ``eventMessageId`` is
    NULL, resolves the message id through the two lookup tables, and commits
    all assignments in a single transaction.

    Args:
        url: Database URL; only its database name is replaced.
        owner_key: Suffix of the target database name; also used in log lines.
        begin: Only events with ``startTimestamp`` strictly later are touched.
        series_names: Maps ``(vendor, model, type)`` to a series name.
        message_ids: Maps ``(vendor, series, type, value)`` to a message id.

    Events whose key is missing from either lookup are logged as errors and
    left unchanged.
    """
    # Named ``db`` rather than ``engine`` so the imported ``sqlalchemy.engine``
    # module is not shadowed inside this function.
    db = create_engine(url.set(database=f'solar_PV_{owner_key}'))
    statement = (
        select(SolarEvent)
        .where(SolarEvent.startTimestamp > begin)
        .where(SolarEvent.eventMessageId.is_(None))
    )
    try:
        with Session(db) as session:
            for row in session.scalars(statement):
                model_key = (row.deviceVendorName, row.deviceModelName, row.eventType)
                # Keep the try body down to the two lookups that can raise KeyError.
                try:
                    event_key = (row.deviceVendorName, series_names[model_key], row.eventType, row.value)
                    message_id = message_ids[event_key]
                except KeyError as e:
                    # Use the module logger, consistent with the info line below.
                    logger.error('%s %s: [%s] %s', owner_key, model_key, type(e).__name__, e)
                    continue
                logger.info('%s %s %s %d', owner_key, model_key, event_key, message_id)
                row.eventMessageId = message_id
            session.commit()
    finally:
        # Release the connection pool; one engine is created per owner database.
        db.dispose()
if __name__ == '__main__':
    # Configure root logging (INFO and above, timestamped) before any work starts.
    logging.basicConfig(
        format='%(asctime)s %(levelname)-8s| %(message)s',
        level=logging.INFO,
    )
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| [tool.yapf] | |
| column_limit = 120 | |
| continuation_align_style = 'fixed' | |
| [tool.isort] | |
| line_length = 120 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| PyMySQL==1.1.2 | |
| SQLAlchemy==2.0.43 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment