Skip to content

Instantly share code, notes, and snippets.

@yipo
Last active September 17, 2025 08:46
Show Gist options
  • Select an option

  • Save yipo/78a94c60afe207ea9401fab3918aa2b5 to your computer and use it in GitHub Desktop.

Select an option

Save yipo/78a94c60afe207ea9401fab3918aa2b5 to your computer and use it in GitHub Desktop.
event message
*.pyc
/.venv/
import argparse
import logging
from datetime import datetime
from typing import Optional

from sqlalchemy import create_engine, engine, select
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class Base(DeclarativeBase):
    """Declarative base class shared by the ORM models defined in this script."""
class SolarEvent(Base):
    """ORM mapping for the ``SolarEvent`` table."""

    __tablename__ = 'SolarEvent'

    id: Mapped[int] = mapped_column(primary_key=True)
    startTimestamp: Mapped[datetime]
    endTimestamp: Mapped[datetime]
    deviceVendorName: Mapped[str]
    deviceModelName: Mapped[str]
    eventType: Mapped[str]
    value: Mapped[str]
    # Optional: update_event_message_id() selects the rows where this column
    # is NULL and fills it in, so the attribute can legitimately be None.
    eventMessageId: Mapped[Optional[int]]
class DeviceModel(Base):
    """ORM mapping for the ``device_models`` table."""

    __tablename__ = 'device_models'

    id: Mapped[int] = mapped_column(primary_key=True)
    # The remaining columns take their type from the annotation alone.
    vendor_name: Mapped[str] = mapped_column()
    model_name: Mapped[str] = mapped_column()
    device_type: Mapped[str] = mapped_column()
    series_name: Mapped[str] = mapped_column()
class EventMessage(Base):
    """ORM mapping for the ``SolarEventMessage`` table."""

    __tablename__ = 'SolarEventMessage'

    id: Mapped[int] = mapped_column(primary_key=True)
    # The remaining columns take their type from the annotation alone.
    vendorName: Mapped[str] = mapped_column()
    seriesName: Mapped[str] = mapped_column()
    type: Mapped[str] = mapped_column()
    no: Mapped[str] = mapped_column()
def main():
    """Run the ``eventMessageId`` update for each hard-coded owner key.

    Parses the command-line options, loads the two lookup tables once, and
    then calls ``update_event_message_id`` per owner key with
    ``datetime(2025, 9, 1)`` as the ``begin`` cutoff.
    """
    options = get_args()
    series_by_model = load_series_names(options.icms_db)
    id_by_message = load_message_ids(options.photon_db)

    since = datetime(2025, 9, 1)
    owner_keys = ('americas', 'chailease', 'demo', 'plus', 'thailand', 'thingnario', 'tsc')
    for key in owner_keys:
        update_event_message_id(options.photon_db, key, since, series_by_model, id_by_message)
def get_args():
    """Parse the command-line options.

    Both ``--photon-db`` and ``--icms-db`` are required, and each value is
    converted with ``engine.make_url``.

    Returns:
        The namespace produced by ``ArgumentParser.parse_args()``, carrying
        the ``photon_db`` and ``icms_db`` attributes.
    """
    parser = argparse.ArgumentParser()
    for flag in ('--photon-db', '--icms-db'):
        parser.add_argument(flag, required=True, type=engine.make_url)
    return parser.parse_args()
def load_series_names(url: engine.URL) -> dict[tuple[str, str, str], str]:
    """Load the device-model to series-name lookup table.

    Args:
        url: Database URL, passed to ``create_engine`` unchanged.

    Returns:
        A dict mapping ``(vendor_name, model_name, device_type)`` to
        ``series_name`` for every ``device_models`` row whose ``series_name``
        is truthy.
    """
    # Named ``db`` rather than ``engine`` so the imported ``sqlalchemy.engine``
    # module is not shadowed inside the function.
    db = create_engine(url)
    try:
        with Session(db) as session:
            return {
                (model.vendor_name, model.model_name, model.device_type): model.series_name
                for model in session.scalars(select(DeviceModel))
                if model.series_name
            }
    finally:
        # This engine is not used again, so release its connection pool.
        db.dispose()
def load_message_ids(url: engine.URL) -> dict[tuple[str, str, str, str], int]:
    """Load the event-message lookup table from the ``solar_PV_demo`` database.

    Args:
        url: Database URL; its database component is replaced with
            ``solar_PV_demo`` before connecting.

    Returns:
        A dict mapping ``(vendorName, seriesName, type, no)`` to the ``id`` of
        the matching ``SolarEventMessage`` row.
    """
    # Named ``db`` rather than ``engine`` so the imported ``sqlalchemy.engine``
    # module is not shadowed inside the function.
    db = create_engine(url.set(database='solar_PV_demo'))
    try:
        with Session(db) as session:
            return {
                (message.vendorName, message.seriesName, message.type, message.no): message.id
                for message in session.scalars(select(EventMessage))
            }
    finally:
        # This engine is not used again, so release its connection pool.
        db.dispose()
def update_event_message_id(
    url: engine.URL, owner_key: str, begin: datetime, series_names: dict[tuple, str],
    message_ids: dict[tuple, int]):
    """Fill in ``eventMessageId`` on one owner's ``SolarEvent`` rows.

    Selects the events whose ``startTimestamp`` is strictly greater than
    ``begin`` and whose ``eventMessageId`` is NULL, resolves each one through
    the two lookup tables, and commits all assignments in a single transaction.
    Events that cannot be resolved are logged as errors and left untouched.

    Args:
        url: Database URL; its database component is replaced with
            ``solar_PV_<owner_key>`` before connecting.
        owner_key: Suffix selecting the owner's database; also used as the
            prefix of every log line.
        begin: Exclusive lower bound on ``startTimestamp``.
        series_names: Maps ``(vendor, model, type)`` to a series name.
        message_ids: Maps ``(vendor, series, type, value)`` to a message id.
    """
    # Named ``db`` rather than ``engine`` so the imported ``sqlalchemy.engine``
    # module is not shadowed inside the function.
    db = create_engine(url.set(database=f'solar_PV_{owner_key}'))
    statement = select(SolarEvent).where(
        SolarEvent.startTimestamp > begin,
        SolarEvent.eventMessageId.is_(None),
    )
    try:
        with Session(db) as session:
            for event in session.scalars(statement):
                # NOTE(review): series_names is built with device_type as the third
                # key element, while eventType is used here - confirm the two
                # columns share the same value domain.
                model_key = (event.deviceVendorName, event.deviceModelName, event.eventType)
                try:
                    series_name = series_names[model_key]
                    event_key = (event.deviceVendorName, series_name, event.eventType, event.value)
                    message_id = message_ids[event_key]
                except KeyError as e:
                    # Either lookup may miss; report it and leave the row as is.
                    logger.error('%s %s: [%s] %s', owner_key, model_key, type(e).__name__, e)
                    continue
                logger.info('%s %s %s %d', owner_key, model_key, event_key, message_id)
                event.eventMessageId = message_id
            session.commit()
    finally:
        # One engine is created per owner key; release its connection pool.
        db.dispose()
if __name__ == '__main__':
    # Configure logging first so the INFO-level progress lines are emitted.
    log_format = '%(asctime)s %(levelname)-8s| %(message)s'
    logging.basicConfig(format=log_format, level=logging.INFO)
    main()
[tool.yapf]
column_limit = 120
continuation_align_style = 'fixed'
[tool.isort]
line_length = 120
PyMySQL==1.1.2
SQLAlchemy==2.0.43
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment