Created
August 17, 2025 09:57
-
-
Save giograno/f4921af94b6a7181feeeebf698aae7ee to your computer and use it in GitHub Desktop.
Scratch file that demonstrate how to extract an Avro schema from a store definition
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import json | |
| import typing | |
| from typing import Type | |
| import py_avro_schema | |
| from py_avro_schema._schemas import RecordSchema, _type_from_annotated, RecordField, Option | |
| from localstack.pro.core.services.codebuild.models import CodeBuildStore | |
| from localstack.services.stores import BaseStore | |
| class BaseStoreSchema(RecordSchema): | |
| @classmethod | |
| def handles_type(cls, py_type: Type) -> bool: | |
| if not isinstance(py_type, type): | |
| return False | |
| return issubclass(py_type, BaseStore) | |
| def __init__(self, py_type: Type, namespace: str | None = None, options: Option = Option(0)): | |
| super().__init__(py_type, namespace=namespace, options=options) | |
| py_type = _type_from_annotated(py_type) | |
| self.py_fields: dict[str, type] = typing.get_type_hints(py_type) | |
| # special store variables we might want to exclude for the serialization | |
| skip_attributes = ["_account_id", "_global", "_region_name", "_service_name", "_universal"] | |
| self.py_fields = {k:v for k,v in self.py_fields.items() if k not in skip_attributes} | |
| self.record_fields = [self._record_field(field) for field in self.py_fields.items()] | |
| def _record_field(self, py_field: tuple[str, str]) -> RecordField: | |
| _name, _type = py_field | |
| # NB: record field does the recursion for the fields | |
| field_obj = RecordField( | |
| py_type=_type_from_annotated(_type), | |
| name=_name, | |
| namespace=self.namespace_override, | |
| options=self.options, | |
| ) | |
| return field_obj | |
| # Our own extension on py-avro-schema for typed dicts | |
| class TypedDictSchema(RecordSchema): | |
| """An Avro record schema for Python TypedDicts. Uses `get_type_hints` for extract the fields.""" | |
| @classmethod | |
| def handles_type(cls, py_type: Type) -> bool: | |
| """Whether this schema can represent a TypedDict""" | |
| return typing.is_typeddict(py_type) | |
| def __init__(self, py_type: Type, namespace: str | None = None, options: Option = Option(0)): | |
| """ | |
| An Avro record schema for a given Python TypedDicts | |
| :param py_type: The Python class to generate a schema for. | |
| :param namespace: The Avro namespace to add to schemas. | |
| :param options: Schema generation options. | |
| """ | |
| super().__init__(py_type, namespace=namespace, options=options) | |
| py_type = _type_from_annotated(py_type) | |
| self.py_fields: dict[str, type] = typing.get_type_hints(py_type) | |
| self.record_fields = [self._record_field(field) for field in self.py_fields.items()] | |
| def _record_field(self, py_field: tuple[str, str]) -> RecordField: | |
| """Return an Avro record field object for a given dataclass field""" | |
| _name, _type = py_field | |
| field_obj = RecordField( | |
| py_type=_type_from_annotated(_type), | |
| name=_name, | |
| namespace=self.namespace_override, | |
| options=self.options, | |
| ) | |
| return field_obj | |
| setattr(py_avro_schema._schemas, 'TypedDictSchema', TypedDictSchema) | |
| setattr(py_avro_schema._schemas, 'BaseStoreSchema', BaseStoreSchema) | |
| actual_schema = py_avro_schema._schemas.schema(CodeBuildStore, options=Option.AUTO_NAMESPACE_MODULE) | |
| print(json.dumps(actual_schema, indent=2)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment