Created
October 27, 2025 22:42
-
-
Save ubaumann/5c31bd74bfb19b8c2178c456e05ffe99 to your computer and use it in GitHub Desktop.
infrahub run script to dump schema
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Script to pull schema information from Infrahub and serialize it to YAML | |
| # infrahubctl run <script> | |
| # Dependencies: infrahub-sdk, pyyaml | |
| # Alpha version; needs testing. | |
| import logging | |
| import yaml | |
| from infrahub_sdk import InfrahubClient, Config | |
| from infrahub_sdk.schema import ( | |
| SchemaRoot, | |
| NodeSchemaAPI, | |
| NodeSchema, | |
| GenericSchemaAPI, | |
| GenericSchema, | |
| ) | |
| from infrahub_sdk.schema.main import BaseSchema | |
class IndentSafeDumper(yaml.SafeDumper):
    """``yaml.SafeDumper`` variant that never asks for indentless output.

    Whatever value the emitter passes for ``indentless`` is discarded and
    ``False`` is forwarded to the parent implementation instead.
    NOTE(review): presumably this is what makes list items nested under a
    mapping key get indented in the dumped YAML -- confirm against PyYAML.
    """

    def increase_indent(self, flow=False, indentless=False):
        # ``indentless`` is accepted only for signature compatibility; it is
        # intentionally overridden with False on the way to the parent.
        return super().increase_indent(flow=flow, indentless=False)
async def pull_infrahub_info(client: InfrahubClient, log: logging.Logger, branch: str):
    """Fetch the schema of ``branch`` from Infrahub and render it as YAML.

    Args:
        client: Infrahub client used to call ``client.schema.all``.
        log: Logger that receives a debug line for every skipped entry.
        branch: Name of the branch whose schema is requested.

    Returns:
        The result of ``yaml.dump`` on the filtered schema: generics and
        nodes outside the ``Core`` namespace, wrapped in a ``SchemaRoot``
        with version ``"1.0"``.
    """
    remote_schema = await client.schema.all(branch=branch)

    skipped_namespaces = ["Core"]

    def keep(entry: BaseSchema) -> bool:
        """Return False (and log at debug level) for skipped namespaces."""
        if entry.namespace not in skipped_namespaces:
            return True
        log.debug(f"Skipping Core node: {entry.namespace}.{entry.name}")
        return False

    def rebuild(api_type, plain_type):
        """Re-create every kept ``api_type`` entry as a ``plain_type``."""
        rebuilt = []
        for entry in remote_schema.values():
            if not isinstance(entry, api_type):
                continue
            if not keep(entry):
                continue
            rebuilt.append(plain_type(**entry.model_dump()))
        return rebuilt

    # Generics are collected before nodes, in two separate passes over the
    # fetched schema, so the debug messages come out grouped by kind.
    generic_defs = rebuild(GenericSchemaAPI, GenericSchema)
    node_defs = rebuild(NodeSchemaAPI, NodeSchema)
    root = SchemaRoot(version="1.0", generics=generic_defs, nodes=node_defs)

    # Exclusion spec, built from the innermost layer outwards. "__all__"
    # means "apply to every element/field at this level".
    # Optional extra exclusions that are left disabled on purpose:
    #   per choice:        "color"
    #   per nested item:   "order_weight", "branch"
    #   per schema entry:  "human_friendly_id", "branch"
    per_choice = {"id": True}
    # NOTE(review): the nested items are presumably attributes and
    # relationships -- confirm against the SDK models.
    per_nested_item = {
        "id": True,
        "choices": {"__all__": per_choice},
    }
    per_schema_entry = {
        "id": True,
        "__all__": {"__all__": per_nested_item},
        "used_by": True,
        "default_filter": True,
    }
    exclusions = {"__all__": {"__all__": per_schema_entry}}

    payload = root.model_dump(
        mode="json",
        exclude_defaults=True,
        exclude_none=True,
        exclude=exclusions,
    )

    # sort_keys=False keeps the field order produced by model_dump.
    return yaml.dump(
        payload,
        sort_keys=False,
        default_flow_style=False,
        Dumper=IndentSafeDumper,
    )
async def run(client: InfrahubClient, log: logging.Logger, branch: str, **kwargs) -> None:
    """Print the YAML schema dump of ``branch`` to standard output.

    Per the file header, this script is meant to be started with
    ``infrahubctl run``. Extra keyword arguments are accepted and ignored.
    Any exception raised while pulling or printing the schema is logged
    through ``log.exception`` and not re-raised.
    """
    try:
        rendered = await pull_infrahub_info(client=client, log=log, branch=branch)
        print(rendered)
    except Exception as err:
        log.exception(f"Unknown exception: {err}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment