Created
March 11, 2025 12:03
-
-
Save HardMax71/0841e74abe4b6cb3051726401a54a996 to your computer and use it in GitHub Desktop.
Pydantic model <-> neomodel OGM (neo4j) <-> python dict converter
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # ./tests/conftest.py | |
| import pytest | |
| from neomodel import config, db | |
| from converter import Converter | |
@pytest.fixture(scope="session")
def db_connection():
    """Point neomodel at the test database and wipe it once the session ends.

    The Bolt URL is taken from the ``NEO4J_BOLT_URL`` environment variable when
    it is set, so CI and developers can target a different instance without
    editing this file; otherwise the original local default is used.

    Yields:
        None. The fixture exists only for its setup and teardown side effects.
    """
    import os  # local import keeps this fixture self-contained

    config.DATABASE_URL = os.environ.get(
        "NEO4J_BOLT_URL", "bolt://neo4j:password@localhost:7687"
    )
    yield
    # Teardown: delete every node together with its relationships.
    db.cypher_query("MATCH (n) DETACH DELETE n")
@pytest.fixture(autouse=True)
def clean_registry():
    """Reset the Converter registries after every test so tests stay isolated."""
    yield
    # Teardown: rebind each class-level registry to a brand-new empty dict.
    for registry_name in ("_pydantic_to_ogm", "_ogm_to_pydantic", "_type_converters"):
        setattr(Converter, registry_name, {})
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Pydantic to Neo4j OGM Converter. | |
| This module provides a utility for converting between Pydantic models and Neo4j OGM models | |
| with support for relationships, nested models, and custom type conversions. | |
| """ | |
| import inspect | |
| import logging | |
| from datetime import datetime, date | |
| from functools import lru_cache | |
| from typing import Type, TypeVar, Dict, List, Any, Optional, Union, Tuple, get_type_hints, Callable | |
| from neomodel import ( | |
| StructuredNode, RelationshipTo, RelationshipFrom, | |
| Relationship, db | |
| ) | |
| from neomodel.properties import ( | |
| StringProperty, IntegerProperty, FloatProperty, BooleanProperty, | |
| DateTimeProperty, ArrayProperty, JSONProperty | |
| ) | |
| from pydantic import BaseModel | |
| # Type variables for generic typing | |
| PydanticModel = TypeVar('PydanticModel', bound=BaseModel) | |
| OGM_Model = TypeVar('OGM_Model', bound=StructuredNode) | |
| # Configure logger | |
| logger = logging.getLogger(__name__) | |
class ConversionError(Exception):
    """Raised when a model cannot be converted between its Pydantic and OGM forms."""
| class Converter: | |
| """ | |
| A utility class for converting between Pydantic models and neomodel OGM models. | |
| This converter handles: | |
| - Basic property conversion between models. | |
| - Conversion of nested Pydantic models into related OGM nodes. | |
| - Processing of relationships at any depth (including cyclic references). | |
| - Conversion of lists and dictionaries of models. | |
| - Custom type conversions via user-registered converters. | |
| - Conversion from Python dictionaries to OGM models (via dict_to_ogm) | |
| and from OGM models to Python dictionaries (via ogm_to_dict). | |
| - Batch conversion of multiple models (via batch_to_ogm and batch_to_pydantic). | |
| Usage Example: | |
| -------------- | |
| from pydantic import BaseModel | |
| from neomodel import StructuredNode, StringProperty, IntegerProperty, config, db | |
| from converter import Converter | |
| # Setup the Neo4j connection (must be done before using neomodel models) | |
| config.DATABASE_URL = 'bolt://neo4j:password@localhost:7687' | |
| # Define a Pydantic model | |
| class UserPydantic(BaseModel): | |
| name: str | |
| email: str | |
| age: int | |
| # Define a corresponding neomodel OGM model | |
| class UserOGM(StructuredNode): | |
| name = StringProperty(required=True) | |
| email = StringProperty(unique_index=True, required=True) | |
| age = IntegerProperty(default=0) | |
| # Register the mapping between the Pydantic model and the OGM model. | |
| # This ensures that the Converter knows how to translate between the two. | |
| Converter.register_models(UserPydantic, UserOGM) | |
| # Create an instance of the Pydantic model. | |
| user_py = UserPydantic(name="Alice", email="alice@example.com", age=30) | |
| # Convert the Pydantic model to an OGM model. | |
| user_ogm = Converter.to_ogm(user_py) | |
| print("Converted to OGM:", user_ogm.name, user_ogm.email, user_ogm.age) | |
| # Convert the OGM model back to a Pydantic model. | |
| # By default, this conversion uses the registered mapping. | |
| user_py_converted = Converter.to_pydantic(user_ogm) | |
| print("Converted back to Pydantic:", user_py_converted.name, user_py_converted.email, user_py_converted.age) | |
| # The converter also supports converting from dictionaries: | |
| data_dict = {"name": "Bob", "email": "bob@example.com", "age": 25} | |
| user_ogm_from_dict = Converter.dict_to_ogm(data_dict, UserOGM) | |
| print("Converted from dict to OGM:", user_ogm_from_dict.name, user_ogm_from_dict.email, user_ogm_from_dict.age) | |
| # And converting OGM models to dictionaries: | |
| user_dict = Converter.ogm_to_dict(user_ogm_from_dict) | |
| print("OGM converted to dict:", user_dict) | |
| """ | |
| # Registry to store mappings between Pydantic and OGM models | |
| _pydantic_to_ogm: Dict[Type[BaseModel], Type[StructuredNode]] = {} | |
| _ogm_to_pydantic: Dict[Type[StructuredNode], Type[BaseModel]] = {} | |
| # Custom type converters | |
| _type_converters: Dict[Tuple[Type, Type], Callable[[Any], Any]] = {} | |
@classmethod
def register_type_converter(
    cls,
    source_type: Type,
    target_type: Type,
    converter_func: Callable[[Any], Any]
) -> None:
    """
    Register a custom type converter function.

    The converter is stored under the ``(source_type, target_type)`` pair and
    replaces any converter previously registered for the same pair.

    Args:
        source_type (Type): The source type to convert from.
        target_type (Type): The target type to convert to.
        converter_func (Callable[[Any], Any]): A function that converts a value
            from source_type to target_type.

    Returns:
        None.
    """
    cls._type_converters[(source_type, target_type)] = converter_func
    # Not every key is guaranteed to be a plain class with a __name__ (typing
    # constructs, for instance), so fall back to the object itself rather than
    # raising after the converter has already been stored.
    logger.debug(
        "Registered type converter: %s -> %s",
        getattr(source_type, "__name__", source_type),
        getattr(target_type, "__name__", target_type),
    )
@classmethod
def _is_property(cls, attr: Any) -> bool:
    """
    Determine if the given attribute is a neomodel property.

    The check is a plain ``isinstance`` test against the property types this
    module knows about. It is deliberately not memoised: caching required
    ``attr`` to be hashable (unhashable attributes raised ``TypeError`` before
    the body ever ran) and kept checked objects alive for no measurable gain.

    Args:
        attr (Any): The attribute to check.

    Returns:
        bool: True if the attribute is a neomodel property, False otherwise.
    """
    property_types = (
        StringProperty, IntegerProperty, FloatProperty, BooleanProperty,
        DateTimeProperty, ArrayProperty, JSONProperty
    )
    return isinstance(attr, property_types)
@classmethod
def _is_relationship(cls, attr: Any) -> bool:
    """
    Determine if the given attribute is a neomodel relationship.

    Not memoised on purpose: an ``lru_cache`` here needs ``attr`` to be
    hashable, so unhashable attributes raised ``TypeError`` instead of
    returning False, and the cache held references to every checked object.

    Args:
        attr (Any): The attribute to check.

    Returns:
        bool: True if the attribute is a neomodel relationship, False otherwise.
    """
    return isinstance(attr, (RelationshipTo, RelationshipFrom, Relationship))
@classmethod
def _get_ogm_properties(cls, ogm_class: Type[StructuredNode]) -> Dict[str, Any]:
    """
    Collect the property fields declared on a neomodel StructuredNode class.

    Three sources are tried in order: the class's ``__all_properties__``
    attribute, its ``defined_properties()`` method, and finally a manual scan
    of the class members.

    Args:
        ogm_class (Type[StructuredNode]): A neomodel StructuredNode class.

    Returns:
        Dict[str, Any]: Property names mapped to their property instances.
    """
    # First choice: the registry neomodel keeps on the class itself.
    if hasattr(ogm_class, '__all_properties__'):
        registry = ogm_class.__all_properties__
        return dict(registry)

    # Second choice: defined_properties(); on failure fall through to the scan.
    if hasattr(ogm_class, 'defined_properties'):
        try:
            return ogm_class.defined_properties(aliases=False, rels=False)
        except Exception as exc:
            logger.warning(f"Error using defined_properties: {exc}")

    # Last resort: inspect every member and keep the public properties.
    found = {}
    for member_name, member in inspect.getmembers(ogm_class):
        try:
            is_public_property = cls._is_property(member) and not member_name.startswith('_')
        except Exception:
            # A member that cannot be classified is simply ignored.
            continue
        if is_public_property:
            found[member_name] = member
    return found
@classmethod
def _get_ogm_relationships(
    cls, ogm_class: Type[StructuredNode]
) -> Dict[str, Union[RelationshipTo, RelationshipFrom, Relationship]]:
    """
    Collect the relationship fields declared on a neomodel StructuredNode class.

    Three sources are tried in order: the class's ``__all_relationships__``
    attribute, its ``defined_properties()`` method restricted to relationships,
    and finally a manual scan of the class members.

    Args:
        ogm_class (Type[StructuredNode]): A neomodel StructuredNode class.

    Returns:
        Dict[str, Union[RelationshipTo, RelationshipFrom, Relationship]]:
            Relationship names mapped to their relationship instances.
    """
    # First choice: the registry neomodel keeps on the class itself.
    if hasattr(ogm_class, '__all_relationships__'):
        registry = ogm_class.__all_relationships__
        return dict(registry)

    # Second choice: defined_properties(); on failure fall through to the scan.
    if hasattr(ogm_class, 'defined_properties'):
        try:
            return ogm_class.defined_properties(aliases=False, rels=True, properties=False)
        except Exception as exc:
            logger.warning(f"Error using defined_properties: {exc}")

    # Last resort: inspect every member and keep the public relationships.
    found = {}
    for member_name, member in inspect.getmembers(ogm_class):
        try:
            is_public_relationship = cls._is_relationship(member) and not member_name.startswith('_')
        except Exception:
            # A member that cannot be classified is simply ignored.
            continue
        if is_public_relationship:
            found[member_name] = member
    return found
@classmethod
def _get_property_type(cls, prop: Any) -> Type:
    """
    Map a neomodel property to the Python type used for its values.

    Args:
        prop (Any): A neomodel property.

    Returns:
        Type: The matching Python type. Properties outside the known set yield
        their ``property_type`` attribute when present, otherwise ``Any``.
    """
    # Checked in this order; the first matching property class wins.
    known_types = (
        (StringProperty, str),
        (IntegerProperty, int),
        (FloatProperty, float),
        (BooleanProperty, bool),
        (DateTimeProperty, datetime),
        (ArrayProperty, list),
        (JSONProperty, dict),
    )
    for property_class, python_type in known_types:
        if isinstance(prop, property_class):
            return python_type

    # Unknown property kind: honour an explicit property_type, else Any.
    try:
        return getattr(prop, 'property_type', Any)
    except (AttributeError, TypeError):
        return Any
@classmethod
def _convert_value(cls, value: Any, target_type: Any) -> Any:
    """
    Convert the given value to the specified target type using registered converters if available.

    Resolution order:
        1. ``None`` is returned unchanged.
        2. A converter registered for ``(type(value), target_type)`` is applied.
        3. ``Optional[X]`` / ``X | None`` targets are converted as ``X``.
        4. Built-in coercions for str, int, float, bool, datetime and date.
        5. A dict is expanded into a Pydantic model when the target is a
           BaseModel subclass.
        6. Anything else is returned unchanged.

    Args:
        value (Any): The value to convert.
        target_type (Type): The target type to which the value should be converted.

    Returns:
        Any: The converted value.

    Raises:
        ConversionError: If a string cannot be parsed as a datetime or date.
    """
    if value is None:
        return None

    # Local imports: these names are not part of the module-level typing import.
    import types
    from typing import get_args, get_origin

    source_type = type(value)

    # Check for direct registered converter
    converter = cls._type_converters.get((source_type, target_type))
    if converter:
        return converter(value)

    # Optional[X] never satisfies the identity checks below, so such fields
    # used to skip conversion entirely. Unwrap to X when it is the only
    # non-None member; wider unions fall through and are returned unchanged.
    union_origins = (Union, getattr(types, "UnionType", Union))
    if get_origin(target_type) in union_origins:
        members = [arg for arg in get_args(target_type) if arg is not type(None)]
        if len(members) == 1:
            return cls._convert_value(value, members[0])

    # Handle basic type conversions
    if target_type is str and not isinstance(value, str):
        return str(value)
    if target_type is int and isinstance(value, (str, float)):
        return int(value)
    if target_type is float and isinstance(value, (str, int)):
        return float(value)
    if target_type is bool and not isinstance(value, bool):
        if isinstance(value, str):
            # bool("false") is True; interpret the usual textual negatives.
            return value.strip().lower() not in {"", "0", "false", "f", "no", "n", "off"}
        return bool(value)

    fallback_formats = ("%Y-%m-%d", "%Y/%m/%d", "%d-%m-%Y", "%d/%m/%Y")

    def parse_with_fallbacks(text):
        """Return the first successful strptime() result, or None."""
        for fmt in fallback_formats:
            try:
                return datetime.strptime(text, fmt)
            except ValueError:
                continue
        return None

    if target_type is datetime and isinstance(value, str):
        try:
            # Try ISO format first
            return datetime.fromisoformat(value)
        except ValueError:
            parsed = parse_with_fallbacks(value)
        if parsed is None:
            raise ConversionError(f"Cannot convert string '{value}' to datetime")
        return parsed

    if target_type is date and isinstance(value, str):
        try:
            # Try ISO format first
            return date.fromisoformat(value)
        except ValueError:
            parsed = parse_with_fallbacks(value)
        if parsed is None:
            raise ConversionError(f"Cannot convert string '{value}' to date")
        return parsed.date()

    # If the target_type is a Pydantic model class and the value is a dict, build an instance
    if isinstance(target_type, type) and issubclass(target_type, BaseModel) and isinstance(value, dict):
        return target_type(**value)

    # If we get here, just return the original value
    return value
@classmethod
def register_models(cls, pydantic_class: Type[BaseModel], ogm_class: Type[StructuredNode]) -> None:
    """
    Pair a Pydantic model class with a neomodel OGM model class.

    The pairing is stored in both directions, so later conversions can look up
    the counterpart starting from either class.

    Args:
        pydantic_class (Type[BaseModel]): The Pydantic model class
        ogm_class (Type[StructuredNode]): The neomodel OGM model class
    """
    forward, backward = cls._pydantic_to_ogm, cls._ogm_to_pydantic
    forward[pydantic_class] = ogm_class
    backward[ogm_class] = pydantic_class

    message = f"Registered mapping: {pydantic_class.__name__} <-> {ogm_class.__name__}"
    logger.debug(message)
@classmethod
def _process_pydantic_field(cls, pydantic_instance, field_name, pydantic_data):
    """
    Copy one field of a Pydantic model into ``pydantic_data``, guarding against cycles.

    A field holding a single BaseModel is left out entirely. A list made up
    only of BaseModels is stored with repeated objects (same identity) removed,
    keeping first occurrences in order. Every other value is stored as is.
    Fields that raise while being read are silently skipped.

    Args:
        pydantic_instance: The Pydantic model instance
        field_name: The name of the field to process
        pydantic_data: The dictionary to update with the processed field value

    Returns:
        None - updates pydantic_data in-place
    """
    try:
        field_value = getattr(pydantic_instance, field_name)

        # Nested models are skipped to avoid circular references.
        if isinstance(field_value, BaseModel):
            return

        if isinstance(field_value, list) and all(isinstance(entry, BaseModel) for entry in field_value):
            # Keying by id() drops repeated objects; dict order keeps the
            # position of each first occurrence.
            field_value = list({id(entry): entry for entry in field_value}.values())

        pydantic_data[field_name] = field_value
    except Exception:
        # Skip fields that can't be accessed
        return
@classmethod
def _resolve_node_class(cls, class_name_or_obj: Union[str, Type[StructuredNode]]) -> Optional[Type[StructuredNode]]:
    """
    Turn a class reference into a class without touching registries or namespaces.

    Non-string arguments are returned unchanged. Strings are reduced to the
    part after the last dot and matched by ``__name__``, first against the OGM
    classes registered with this converter and then against neomodel's node
    class registry.

    Args:
        class_name_or_obj: Either a string class name or a class object

    Returns:
        The resolved class or None if not found
    """
    if not isinstance(class_name_or_obj, str):
        return class_name_or_obj

    # Keep only the bare class name of a possibly module-qualified path.
    _, _, simple_name = class_name_or_obj.rpartition('.')

    # Our own registrations take precedence.
    own_match = next(
        (candidate for candidate in cls._ogm_to_pydantic.keys() if candidate.__name__ == simple_name),
        None,
    )
    if own_match is not None:
        return own_match

    # Then whatever neomodel itself has registered.
    for registered in db._NODE_CLASS_REGISTRY.values():
        if registered.__name__ == simple_name:
            return registered

    return None
@classmethod
def to_ogm(
    cls,
    pydantic_instance: PydanticModel,
    ogm_class: Optional[Type[OGM_Model]] = None,
    processed_objects: Optional[Dict[int, OGM_Model]] = None,
    max_depth: int = 10
) -> Optional[OGM_Model]:
    """
    Convert a Pydantic model instance to a neomodel OGM model instance.

    The node is created and saved with its plain properties first, then every
    relationship declared on the OGM class that also exists as an attribute of
    the Pydantic instance is converted recursively and connected, and the node
    is saved again.

    Args:
        pydantic_instance (P): The Pydantic model instance to convert.
        ogm_class (Optional[Type[O]]): The target neomodel OGM model class. If not provided, the registered mapping is used.
        processed_objects (Optional[Dict[int, O]]): Already converted objects keyed by ``id()`` of the
            source instance, used to handle cyclic references.
        max_depth (int): The maximum recursion depth for processing nested relationships.

    Returns:
        O: The converted neomodel OGM model instance, or None when the input is
        None or the depth limit has been reached.

    Raises:
        ConversionError: If no mapping is registered, the Pydantic data cannot
            be extracted, or the final save fails.
    """
    if pydantic_instance is None:
        return None
    if max_depth <= 0:
        logger.warning("Maximum recursion depth reached during to_ogm conversion")
        return None
    if processed_objects is None:
        processed_objects = {}

    instance_id = id(pydantic_instance)
    if instance_id in processed_objects:
        return processed_objects[instance_id]

    if ogm_class is None:
        pydantic_class = type(pydantic_instance)
        if pydantic_class not in cls._pydantic_to_ogm:
            raise ConversionError(f"No mapping registered for Pydantic class {pydantic_class.__name__}")
        ogm_class = cls._pydantic_to_ogm[pydantic_class]

    # Register the node before recursing so cycles resolve to this instance.
    ogm_instance = ogm_class()
    processed_objects[instance_id] = ogm_instance

    # Extract Pydantic data (handling cyclic references).
    try:
        pydantic_data = {}
        try:
            pydantic_data = pydantic_instance.model_dump(exclude_unset=True, exclude_none=True)
        except ValueError as e:
            if "Circular reference detected" not in str(e):
                raise
            # Read the field list from the class; instance-level access to
            # model_fields is deprecated in recent Pydantic releases.
            for field_name in type(pydantic_instance).model_fields:
                cls._process_pydantic_field(pydantic_instance, field_name, pydantic_data)
    except Exception as e:
        raise ConversionError(f"Failed to get dictionary from Pydantic model: {str(e)}") from e

    ogm_properties = cls._get_ogm_properties(ogm_class)
    ogm_relationships = cls._get_ogm_relationships(ogm_class)

    for prop_name, prop in ogm_properties.items():
        if prop_name in pydantic_data:
            prop_type = cls._get_property_type(prop)
            setattr(ogm_instance, prop_name, cls._convert_value(pydantic_data[prop_name], prop_type))

    # Best effort: a failure here is reported by the final save below.
    try:
        ogm_instance.save()
    except Exception as e:
        logger.warning(f"Failed to save OGM instance with properties: {str(e)}")

    for rel_name, rel in ogm_relationships.items():
        try:
            # A relationship the Pydantic model does not declare is not an
            # error; treat it like an unset one instead of logging a failure.
            rel_data = getattr(pydantic_instance, rel_name, None)
            if rel_data is None:
                continue

            # Resolve target class without monkey patching
            target_ogm_class = cls._resolve_node_class(rel.definition['node_class'])
            if not target_ogm_class:
                logger.warning(f"Could not resolve target class for relationship {rel_name}")
                continue

            # Wrap a single related object; accept any common collection type.
            if not isinstance(rel_data, (list, tuple, set, frozenset)):
                rel_data = [rel_data]

            for item in rel_data:
                cls._process_related_item(
                    item,
                    ogm_instance,
                    rel_name,
                    target_ogm_class,
                    processed_objects,
                    max_depth,
                    instance_id
                )
        except Exception as e:
            logger.warning(f"Failed to process relationship {rel_name}: {str(e)}")

    try:
        ogm_instance.save()
    except Exception as e:
        raise ConversionError(f"Failed to save OGM instance with relationships: {str(e)}") from e

    return ogm_instance
@classmethod
def _process_related_item(cls, item, ogm_instance, rel_name, target_ogm_class,
                          processed_objects, max_depth, instance_id):
    """
    Process a single related item and connect it to the OGM instance if successful.

    BaseModel items are converted directly. Dict items are first turned into
    the Pydantic class registered for ``target_ogm_class``; failures on that
    path are logged and reported as False rather than raised. Items of any
    other type are ignored.

    Args:
        item: The item to process (BaseModel or dict)
        ogm_instance: The OGM instance to connect the related item to
        rel_name: The name of the relationship
        target_ogm_class: The target OGM class
        processed_objects: Dictionary of already processed objects
        max_depth: Maximum recursion depth
        instance_id: ID of the parent instance (to avoid circular references)

    Returns:
        bool: Whether the item was successfully processed and connected
    """
    def convert_and_connect(source_model):
        """Convert one Pydantic model and connect the resulting node."""
        related_instance = cls.to_ogm(
            source_model,
            target_ogm_class,
            processed_objects,
            max_depth - 1
        )
        if not related_instance:
            return False
        getattr(ogm_instance, rel_name).connect(related_instance)
        return True

    if isinstance(item, BaseModel):
        # An object that references itself is not connected to itself.
        if id(item) == instance_id:
            return False
        return convert_and_connect(item)

    if isinstance(item, dict):
        target_pydantic_class = cls._ogm_to_pydantic.get(target_ogm_class)
        if not target_pydantic_class:
            logger.warning(
                f"No Pydantic model registered for OGM class {target_ogm_class.__name__}")
            return False
        try:
            related_pydantic = target_pydantic_class(**item)
            # to_ogm caches results by id() of the source model. This model is
            # temporary: once it is garbage-collected its id can be handed to
            # the next temporary model, which would then receive the wrong
            # cached node. Keep it alive for as long as the cache lives. The
            # string key cannot collide with the integer ids used elsewhere.
            processed_objects.setdefault("__keepalive__", []).append(related_pydantic)
            return convert_and_connect(related_pydantic)
        except Exception as e:
            logger.warning(
                f"Failed to process dict item in relationship {rel_name}: {str(e)}")
            return False

    return False
@staticmethod
def _to_pydantic_node_key(ogm_instance):
    """Return a hashable identity for a node: its database element id when it has one, else ``id()``."""
    # NOTE(review): relies on neomodel exposing `element_id` on nodes; any
    # failure or missing value falls back to the in-memory identity.
    try:
        element_id = getattr(ogm_instance, 'element_id', None)
    except Exception:
        element_id = None
    if element_id is None:
        return id(ogm_instance)
    return (type(ogm_instance).__name__, element_id)

@classmethod
def _to_pydantic_target_class(cls, ogm_instance, pydantic_class):
    """Return ``pydantic_class`` if given, else the class registered for the node's type."""
    if pydantic_class is not None:
        return pydantic_class
    ogm_class = type(ogm_instance)
    if ogm_class not in cls._ogm_to_pydantic:
        raise ConversionError(f"No mapping registered for OGM class {ogm_class.__name__}")
    return cls._ogm_to_pydantic[ogm_class]

@classmethod
def _to_pydantic_stub(cls, ogm_instance, pydantic_class, identifiers_only):
    """Build a relationship-free Pydantic instance from the node's own properties."""
    stub_data = {}
    for prop_name, prop in cls._get_ogm_properties(type(ogm_instance)).items():
        is_identifier = getattr(prop, 'required', False) or getattr(prop, 'unique_index', False)
        if identifiers_only and not is_identifier:
            continue
        try:
            stub_data[prop_name] = getattr(ogm_instance, prop_name)
        except Exception as e:
            # Best effort: an unreadable property is left out of the stub.
            logger.debug(f"Skipping unreadable OGM property {prop_name}: {str(e)}")
    return pydantic_class(**stub_data)

@classmethod
def to_pydantic(
    cls,
    ogm_instance: OGM_Model,
    pydantic_class: Optional[Type[PydanticModel]] = None,
    processed_objects: Optional[Dict[int, PydanticModel]] = None,
    max_depth: int = 10,
    current_path: Optional[List[int]] = None
) -> Optional[PydanticModel]:
    """
    Convert a neomodel OGM model instance to a Pydantic model instance.

    Properties shared by both models are copied first and the Pydantic
    instance is created and cached; relationships that exist on both models
    are then followed recursively and assigned onto that instance, so a cycle
    resolves to the instance already being built.

    Nodes are identified by their database element id when available (falling
    back to ``id()``), because every relationship traversal returns fresh
    Python objects for the same node.

    Args:
        ogm_instance (O): The neomodel OGM model instance to convert.
        pydantic_class (Optional[Type[P]]): The target Pydantic model class. If not provided, the registered mapping is used.
        processed_objects (Optional[Dict[int, P]]): Cache of already converted nodes, shared across the recursion.
        max_depth (int): The maximum recursion depth for processing nested relationships.
        current_path (Optional[List[int]]): Identities of the nodes on the current conversion path,
            used for cycle detection. The list is not modified.

    Returns:
        P: The converted Pydantic model instance, or None when ``ogm_instance`` is None.

    Raises:
        ConversionError: If no mapping is registered for the node's class or the
            Pydantic instance cannot be created from the node's properties.
    """
    if ogm_instance is None:
        return None

    if processed_objects is None:
        processed_objects = {}
    if current_path is None:
        current_path = []

    node_key = cls._to_pydantic_node_key(ogm_instance)

    # Already converted (or currently being converted): reuse that instance.
    if node_key in processed_objects:
        return processed_objects[node_key]

    # On the current path but without an instance yet: break the cycle with a
    # stub that carries only required / uniquely indexed properties.
    if node_key in current_path:
        pydantic_class = cls._to_pydantic_target_class(ogm_instance, pydantic_class)
        stub_instance = cls._to_pydantic_stub(ogm_instance, pydantic_class, identifiers_only=True)
        processed_objects[node_key] = stub_instance
        return stub_instance

    # Depth exhausted: return the node's properties without relationships.
    # The stub is not cached, so the same node can still be converted in full
    # when it is reached again through a shorter path.
    if max_depth <= 0:
        logger.info(f"Maximum recursion depth reached for {type(ogm_instance).__name__}")
        pydantic_class = cls._to_pydantic_target_class(ogm_instance, pydantic_class)
        return cls._to_pydantic_stub(ogm_instance, pydantic_class, identifiers_only=False)

    pydantic_class = cls._to_pydantic_target_class(ogm_instance, pydantic_class)

    ogm_class = type(ogm_instance)
    ogm_properties = cls._get_ogm_properties(ogm_class)
    ogm_relationships = cls._get_ogm_relationships(ogm_class)

    # Namespace of registered Pydantic classes for resolving forward references.
    localns = {pyd_cls.__name__: pyd_cls for pyd_cls in cls._pydantic_to_ogm}
    localns[pydantic_class.__name__] = pydantic_class

    try:
        pydantic_fields = get_type_hints(pydantic_class, globalns=None, localns=localns)
    except Exception as e:
        logger.warning(f"Failed to resolve type hints for {pydantic_class.__name__}: {str(e)}")
        # Fall back to raw annotations if available
        if hasattr(pydantic_class, "__annotations__"):
            pydantic_fields = pydantic_class.__annotations__
        else:
            pydantic_fields = {}
            logger.warning(f"Could not get field types for {pydantic_class.__name__}")

    # Process properties first
    pydantic_data = {}
    for prop_name in ogm_properties:
        if prop_name not in pydantic_fields:
            continue
        try:
            value = getattr(ogm_instance, prop_name)
            pydantic_data[prop_name] = cls._convert_value(value, pydantic_fields[prop_name])
        except Exception as e:
            logger.warning(f"Failed to get OGM property {prop_name}: {str(e)}")
            pydantic_data[prop_name] = None

    # Create the instance from properties only, using the Pydantic v2 API the
    # rest of this module already relies on (model_dump / model_fields).
    try:
        pydantic_instance = pydantic_class.model_validate(pydantic_data)
    except Exception as e:
        raise ConversionError(f"Failed to create Pydantic instance: {str(e)}") from e

    # Cache before recursing so that cycles resolve to this instance.
    processed_objects[node_key] = pydantic_instance
    branch_path = [*current_path, node_key]

    for rel_name, rel in ogm_relationships.items():
        if rel_name not in pydantic_fields:
            continue

        # Reset per relationship so the error fallback below never sees the
        # cardinality of a previous iteration.
        is_single = False
        try:
            # The definition may hold a class or a class name; resolve both.
            node_class_ref = rel.definition['node_class']
            target_ogm_class = cls._resolve_node_class(node_class_ref)
            target_pydantic_class = cls._ogm_to_pydantic.get(target_ogm_class)
            if not target_pydantic_class:
                target_name = getattr(target_ogm_class, '__name__', node_class_ref)
                logger.warning(f"No Pydantic model registered for OGM class {target_name}")
                continue

            # Cardinality decides between a single object and a list.
            if hasattr(rel, 'manager'):
                cardinality_name = rel.manager.__name__
            elif 'cardinality' in rel.definition:
                cardinality_name = rel.definition['cardinality'].__name__
            else:
                cardinality_name = None
            is_single = cardinality_name in ('ZeroOrOne', 'One', 'AsyncZeroOrOne', 'AsyncOne')

            # Fetch related nodes
            rel_objects = list(getattr(ogm_instance, rel_name).all())

            if is_single:
                related_value = None
                if rel_objects:
                    related_value = cls.to_pydantic(
                        rel_objects[0],
                        target_pydantic_class,
                        processed_objects,
                        max_depth - 1,
                        branch_path
                    )
            else:
                related_value = []
                for related_node in rel_objects:
                    related = cls.to_pydantic(
                        related_node,
                        target_pydantic_class,
                        processed_objects,
                        max_depth - 1,
                        branch_path
                    )
                    if related:
                        related_value.append(related)

            setattr(pydantic_instance, rel_name, related_value)
        except Exception as e:
            logger.warning(f"Failed to process relationship {rel_name}: {str(e)}")
            # Leave the field in its "empty" shape: None for single, [] for many.
            try:
                setattr(pydantic_instance, rel_name, None if is_single else [])
            except Exception as fallback_error:
                logger.debug(f"Could not reset relationship {rel_name}: {str(fallback_error)}")

    return pydantic_instance
@classmethod
def batch_to_ogm(
    cls,
    pydantic_instances: List[PydanticModel],
    ogm_class: Optional[Type[OGM_Model]] = None,
    max_depth: int = 10
) -> List[OGM_Model]:
    """
    Convert a list of Pydantic model instances to neomodel OGM model instances within a single transaction.

    All conversions run inside one ``db.transaction`` block and share one
    cache of processed objects.

    Args:
        pydantic_instances (List[P]): A list of Pydantic model instances to convert.
        ogm_class (Optional[Type[O]]): The target neomodel OGM model class. If not provided, the registered mapping is used.
        max_depth (int): The maximum recursion depth for processing nested relationships.

    Returns:
        List[O]: The converted neomodel OGM model instances, in input order.

    Raises:
        ConversionError: If the conversion fails.
    """
    if not pydantic_instances:
        return []

    # One cache for the whole batch, so shared objects are converted once.
    shared_cache = {}
    converted = []
    with db.transaction:
        for model in pydantic_instances:
            converted.append(cls.to_ogm(model, ogm_class, shared_cache, max_depth))
    return converted
@classmethod
def batch_to_pydantic(
    cls,
    ogm_instances: List[OGM_Model],
    pydantic_class: Optional[Type[PydanticModel]] = None,
    max_depth: int = 10
) -> List[PydanticModel]:
    """
    Convert several neomodel OGM instances to Pydantic instances.

    Args:
        ogm_instances: The OGM model instances to convert.
        pydantic_class: Target Pydantic class forwarded to ``to_pydantic``;
            may be ``None``.
        max_depth: Recursion limit forwarded to ``to_pydantic``.

    Returns:
        The converted Pydantic instances in input order; an empty list when
        the input is empty or ``None``.
    """
    converted: List[PydanticModel] = []
    if not ogm_instances:
        return converted

    # The cache is shared by the whole batch; the traversal path is fresh per root.
    shared_cache = {}
    for node in ogm_instances:
        converted.append(cls.to_pydantic(node, pydantic_class, shared_cache, max_depth, []))
    return converted
@classmethod
def dict_to_ogm(
    cls,
    data_dict: dict,
    ogm_class: Type[OGM_Model],
    processed_objects: Optional[Dict[int, OGM_Model]] = None,
    max_depth: int = 10
) -> Optional[OGM_Model]:
    """
    Convert a Python dictionary to a neomodel OGM model instance.

    Properties of ``ogm_class`` that appear as keys in ``data_dict`` are converted
    and set on a new instance, which is then saved. Relationship keys holding a
    dict (or a list of dicts) are converted recursively and connected to it.

    Args:
        data_dict: Source data; ``None`` yields ``None``.
        ogm_class: The OGM class to instantiate.
        processed_objects: Cache mapping ``id(dict)`` to the instance already
            built for that dict. It resolves cycles and shared sub-dicts; a new
            cache is created when omitted.
        max_depth: Remaining recursion budget; at ``0`` or below a warning is
            logged and ``None`` is returned.

    Returns:
        The saved OGM instance, or ``None`` for ``None`` input or exhausted depth.

    Raises:
        ConversionError: If the final save of the instance fails.
    """
    if data_dict is None:
        return None
    if max_depth <= 0:
        logger.warning("Maximum recursion depth reached during dict_to_ogm conversion")
        return None
    if processed_objects is None:
        processed_objects = {}

    instance_id = id(data_dict)
    if instance_id in processed_objects:
        return processed_objects[instance_id]

    # Register the new instance before recursing so cyclic references resolve to it.
    ogm_instance = ogm_class()
    processed_objects[instance_id] = ogm_instance

    # Properties: best effort, a failing property is logged and skipped.
    for prop_name, prop in cls._get_ogm_properties(ogm_class).items():
        if prop_name not in data_dict:
            continue
        try:
            prop_type = cls._get_property_type(prop)
            converted_value = cls._convert_value(data_dict[prop_name], prop_type)
            setattr(ogm_instance, prop_name, converted_value)
        except Exception as e:
            logger.warning(f"Failed to set OGM property {prop_name}: {str(e)}")

    # Save before connecting relationships; a failure here is only logged because
    # the final save below reports the definitive error.
    try:
        ogm_instance.save()
    except Exception as e:
        logger.warning(f"Failed to save OGM instance with properties: {str(e)}")

    # Relationships: best effort per relationship.
    for rel_name, rel in cls._get_ogm_relationships(ogm_class).items():
        if rel_name not in data_dict:
            continue
        try:
            rel_data = data_dict[rel_name]
            if rel_data is None:
                continue
            # Resolve target class without caller namespace modification
            target_ogm_class = cls._resolve_node_class(rel.definition['node_class'])
            if not target_ogm_class:
                logger.warning(f"Could not resolve target class for relationship {rel_name}")
                continue
            # Treat a single value as a one-element list; non-dict entries are ignored.
            related_items = rel_data if isinstance(rel_data, list) else [rel_data]
            for item in related_items:
                if not isinstance(item, dict):
                    continue
                related_instance = cls.dict_to_ogm(
                    item, target_ogm_class, processed_objects, max_depth - 1)
                if related_instance:
                    getattr(ogm_instance, rel_name).connect(related_instance)
        except Exception as e:
            logger.warning(f"Failed to process relationship {rel_name}: {str(e)}")

    try:
        ogm_instance.save()
    except Exception as e:
        # Chain the cause so the underlying database/validation error stays visible.
        raise ConversionError(f"Failed to save OGM instance with relationships: {str(e)}") from e
    return ogm_instance
@classmethod
def ogm_to_dict(
    cls,
    ogm_instance: OGM_Model,
    processed_objects: Optional[Dict[int, dict]] = None,
    max_depth: int = 10,
    current_path: Optional[List[int]] = None,
    include_properties: bool = True,
    include_relationships: bool = True
) -> Optional[dict]:
    """
    Convert a neomodel OGM model instance to a Python dictionary.

    Properties are copied by name; relationships are followed recursively.
    Single-cardinality relationships map to a dict or ``None``, all others to
    a list of dicts.

    Args:
        ogm_instance: The instance to convert; ``None`` yields ``None``.
        processed_objects: Cache mapping ``id(instance)`` to the dict already
            produced for it; created when omitted.
        max_depth: Remaining recursion budget. At ``0`` or below only the
            properties are returned.
        current_path: ``id`` values of the instances on the current traversal
            branch, used for cycle detection; created when omitted.
        include_properties: Whether to copy properties of this instance. The
            flag is not forwarded to nested conversions.
        include_relationships: Whether to follow relationships of this
            instance. The flag is not forwarded to nested conversions.

    Returns:
        The dictionary form of the instance, or ``None`` for ``None`` input.
    """
    if ogm_instance is None:
        return None
    if processed_objects is None:
        processed_objects = {}
    if current_path is None:
        current_path = []

    instance_id = id(ogm_instance)

    def _properties_only() -> dict:
        # Fallback for cycles and exhausted depth: properties, no relationships.
        snapshot = {}
        for name in cls._get_ogm_properties(type(ogm_instance)):
            try:
                snapshot[name] = getattr(ogm_instance, name)
            except Exception as e:
                logger.debug(f"Skipping unreadable OGM property {name}: {str(e)}")
        processed_objects[instance_id] = snapshot
        return snapshot

    # An already converted (or in-progress) instance is returned as-is.
    if instance_id in processed_objects:
        return processed_objects[instance_id]
    if instance_id in current_path or max_depth <= 0:
        return _properties_only()

    current_path.append(instance_id)
    try:
        result = {}
        ogm_cls = type(ogm_instance)

        if include_properties:
            for prop_name in cls._get_ogm_properties(ogm_cls):
                try:
                    result[prop_name] = getattr(ogm_instance, prop_name)
                except Exception as e:
                    logger.warning(f"Failed to get OGM property {prop_name}: {str(e)}")
                    result[prop_name] = None

        # Register before recursing so cyclic references resolve to this dict.
        processed_objects[instance_id] = result

        if include_relationships:
            for rel_name, rel in cls._get_ogm_relationships(ogm_cls).items():
                # Reset per relationship so the error path below never sees the
                # cardinality of the previous relationship.
                is_single = False
                try:
                    # Resolve target class without caller namespace modification
                    target_ogm_class = cls._resolve_node_class(rel.definition['node_class'])
                    if not target_ogm_class:
                        logger.warning(f"Could not resolve target class for relationship {rel_name}")
                        continue
                    if hasattr(rel, 'manager'):
                        is_single = rel.manager.__name__ in (
                            'ZeroOrOne', 'One', 'AsyncZeroOrOne', 'AsyncOne')
                    rel_objs = list(getattr(ogm_instance, rel_name).all())
                    if is_single:
                        if rel_objs:
                            result[rel_name] = cls.ogm_to_dict(
                                rel_objs[0],
                                processed_objects,
                                max_depth - 1,
                                current_path.copy()
                            )
                        else:
                            result[rel_name] = None
                    else:
                        result[rel_name] = []
                        for obj in rel_objs:
                            conv = cls.ogm_to_dict(
                                obj,
                                processed_objects,
                                max_depth - 1,
                                current_path.copy()
                            )
                            if conv:
                                result[rel_name].append(conv)
                except Exception as e:
                    logger.warning(f"Failed to process relationship {rel_name}: {str(e)}")
                    result[rel_name] = None if is_single else []
    finally:
        # This instance is done; take it off the active traversal path.
        current_path.remove(instance_id)
    return result
@classmethod
def batch_dict_to_ogm(
    cls,
    data_dicts: List[dict],
    ogm_class: Type[OGM_Model],
    max_depth: int = 10
) -> List[OGM_Model]:
    """
    Batch convert a list of dictionaries to OGM model instances.

    All conversions share one ``processed_objects`` cache and run inside a single
    ``db.transaction`` block, using the module-level ``db`` like ``batch_to_ogm``.

    Args:
        data_dicts: The dictionaries to convert.
        ogm_class: The OGM class to instantiate for every dictionary.
        max_depth: Recursion limit forwarded to ``dict_to_ogm``.

    Returns:
        One entry per input dictionary, in input order (entries may be ``None``
        when ``dict_to_ogm`` returns ``None``); an empty list for empty input.

    Raises:
        ConversionError: Propagated from ``dict_to_ogm``.
    """
    # Nothing to convert: do not open a transaction.
    if not data_dicts:
        return []

    # Use a single processed_objects dictionary for the entire batch
    processed_objects = {}
    with db.transaction:
        return [
            cls.dict_to_ogm(d, ogm_class, processed_objects, max_depth)
            for d in data_dicts
        ]
@classmethod
def batch_ogm_to_dict(
    cls,
    ogm_instances: List[OGM_Model],
    max_depth: int = 10,
    include_properties: bool = True,
    include_relationships: bool = True
) -> List[dict]:
    """
    Convert several OGM model instances to dictionaries.

    Every instance is passed to ``ogm_to_dict`` with a cache shared by the whole
    batch and a fresh, empty traversal path.

    Args:
        ogm_instances: The OGM instances to convert.
        max_depth: Recursion limit forwarded to ``ogm_to_dict``.
        include_properties: Forwarded to ``ogm_to_dict``.
        include_relationships: Forwarded to ``ogm_to_dict``.

    Returns:
        One result per input instance, in input order.
    """
    shared_cache = {}
    dicts = []
    for node in ogm_instances:
        as_dict = cls.ogm_to_dict(
            node, shared_cache, max_depth, [], include_properties, include_relationships
        )
        dicts.append(as_dict)
    return dicts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Local Neo4j instance for the converter examples and tests.
services:
  neo4j:
    image: neo4j:5.16.0
    container_name: neo4j-pydantic-converter
    ports:
      # HTTP
      - "7474:7474"
      # Bolt
      - "7687:7687"
      # HTTPS
      - "7473:7473"
    environment:
      # Must match the credentials in the bolt URL used by the examples and tests
      - NEO4J_AUTH=neo4j/password
      - NEO4J_ACCEPT_LICENSE_AGREEMENT=yes
      # Load the APOC plugin; the allowlist entries below only permit procedures
      # and do not install them.
      - 'NEO4J_PLUGINS=["apoc"]'
      - NEO4J_apoc_export_file_enabled=true
      - NEO4J_apoc_import_file_enabled=true
      - NEO4J_apoc_import_file_use__neo4j__config=true
      - NEO4J_dbms_security_procedures_unrestricted=apoc.*,algo.*,gds.*
      # Enable APOC for the inspect_database functionality
      - NEO4J_dbms_security_procedures_allowlist=apoc.*,algo.*,gds.*
    volumes:
      - neo4j_data:/data
      - neo4j_logs:/logs
      - neo4j_import:/import
      - neo4j_conf:/var/lib/neo4j/conf
    networks:
      - neo4j-network
    healthcheck:
      # Healthy once the HTTP endpoint answers
      test: ["CMD", "curl", "-f", "http://localhost:7474"]
      interval: 10s
      timeout: 5s
      retries: 5
volumes:
  neo4j_data:
  neo4j_logs:
  neo4j_import:
  neo4j_conf:
networks:
  neo4j-network:
    driver: bridge
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from datetime import date | |
| from typing import List, Optional | |
| from neomodel import ( | |
| StructuredNode, StringProperty, IntegerProperty, FloatProperty, BooleanProperty, | |
| RelationshipTo, config | |
| ) | |
| from pydantic import BaseModel, Field | |
| from converter import Converter | |
| import json | |
# Configure Neo4j connection.
# Runs at import time; user, password and Bolt port are hard-coded for a local instance.
config.DATABASE_URL = 'bolt://neo4j:password@localhost:7687'
| # ===== Models for complex nested structure ===== | |
class ItemPydantic(BaseModel):
    """Pydantic model for an item with a name and a price."""
    name: str
    price: float
class OrderPydantic(BaseModel):
    """Pydantic model for an order: a uid plus a list of items."""
    uid: str
    # default_factory gives every instance its own empty list
    items: List[ItemPydantic] = Field(default_factory=list)
class CustomerPydantic(BaseModel):
    """Pydantic model for a customer: name, email and a list of orders."""
    name: str
    email: str
    # default_factory gives every instance its own empty list
    orders: List[OrderPydantic] = Field(default_factory=list)
class ItemOGM(StructuredNode):
    """neomodel node counterpart of ItemPydantic (registered in register_models)."""
    name = StringProperty()
    price = FloatProperty()
class OrderOGM(StructuredNode):
    """neomodel node counterpart of OrderPydantic (registered in register_models)."""
    uid = StringProperty()
    # Outgoing CONTAINS relationship to the order's items
    items = RelationshipTo(ItemOGM, 'CONTAINS')
class CustomerOGM(StructuredNode):
    """neomodel node counterpart of CustomerPydantic (registered in register_models)."""
    name = StringProperty()
    email = StringProperty()
    # Outgoing PLACED relationship to the customer's orders
    orders = RelationshipTo(OrderOGM, 'PLACED')
| # ===== Models for cyclic references ===== | |
class CyclicPydantic(BaseModel):
    """Self-referencing Pydantic model used to build reference cycles."""
    name: str
    # Forward reference to the class itself, given as a string
    links: List['CyclicPydantic'] = Field(default_factory=list)


CyclicPydantic.model_rebuild()  # Resolve forward references
class CyclicOGM(StructuredNode):
    """neomodel node counterpart of CyclicPydantic (registered in register_models)."""
    name = StringProperty()
    # Target class is given by name because the class refers to itself
    links = RelationshipTo('CyclicOGM', 'LINKS_TO')
| # Register models | |
def register_models():
    """Register all example model pairs and the date/str type converters."""
    model_pairs = (
        # Nested structure models
        (ItemPydantic, ItemOGM),
        (OrderPydantic, OrderOGM),
        (CustomerPydantic, CustomerOGM),
        # Cyclic models
        (CyclicPydantic, CyclicOGM),
    )
    for pydantic_model, ogm_model in model_pairs:
        Converter.register_models(pydantic_model, ogm_model)

    # Custom type converters: date -> ISO string and back
    Converter.register_type_converter(date, str, lambda d: d.isoformat())
    Converter.register_type_converter(str, date, lambda s: date.fromisoformat(s))
| # ===== Example 1: Complex Nested Structure ===== | |
def example_complex_structure():
    """Demo: convert a customer -> orders -> items tree to OGM, to dict, and back to Pydantic."""
    print("\n=== EXAMPLE 1: COMPLEX NESTED STRUCTURE ===")

    # Sample data
    laptop = ItemPydantic(name="Laptop", price=999.99)
    headphones = ItemPydantic(name="Headphones", price=89.99)
    mouse = ItemPydantic(name="Mouse", price=24.99)
    first_order = OrderPydantic(uid="ORD-001", items=[laptop, headphones])
    second_order = OrderPydantic(uid="ORD-002", items=[mouse])
    customer = CustomerPydantic(
        name="John Doe",
        email="john@example.com",
        orders=[first_order, second_order]
    )

    # Original structure
    print("Original Pydantic Structure:")
    original_view = {
        "name": customer.name,
        "email": customer.email,
        "orders": [
            {
                "uid": order.uid,
                "items": [
                    {"name": item.name, "price": item.price}
                    for item in order.items
                ]
            }
            for order in customer.orders
        ]
    }
    print(json.dumps(original_view, indent=2))

    # Pydantic -> OGM, then read the stored graph back
    customer_ogm = Converter.to_ogm(customer)
    ogm_orders = list(customer_ogm.orders.all())
    ogm_items = [item for order in ogm_orders for item in order.items.all()]
    print("\nConverted to OGM:")
    ogm_view = {
        "name": customer_ogm.name,
        "email": customer_ogm.email,
        "orders_count": len(ogm_orders),
        "order_uids": [order.uid for order in ogm_orders],
        "items_count": len(ogm_items),
        "items": [{"name": item.name, "price": item.price} for item in ogm_items]
    }
    print(json.dumps(ogm_view, indent=2))

    # OGM -> dict
    customer_dict = Converter.ogm_to_dict(customer_ogm)
    print("\nConverted to Dict:")
    print(json.dumps(customer_dict, default=str, indent=2))

    # OGM -> Pydantic
    customer_py = Converter.to_pydantic(customer_ogm)
    print("\nRound-trip to Pydantic:")
    round_trip_view = {
        "name": customer_py.name,
        "email": customer_py.email,
        "orders_count": len(customer_py.orders),
        "order_uids": [order.uid for order in customer_py.orders],
        "items": [
            {"name": item.name, "price": item.price}
            for order in customer_py.orders
            for item in order.items
        ]
    }
    print(json.dumps(round_trip_view, indent=2))
| # ===== Example 2: Cyclic References ===== | |
def example_cyclic_references():
    """Demo: convert a three-node cycle (A -> B -> C -> A) to OGM, to dict, and back to Pydantic."""
    print("\n=== EXAMPLE 2: CYCLIC REFERENCES ===")

    # Build the cycle A -> B -> C -> A
    node_a = CyclicPydantic(name="NodeA")
    node_b = CyclicPydantic(name="NodeB")
    node_c = CyclicPydantic(name="NodeC")
    node_a.links = [node_b]
    node_b.links = [node_c]
    node_c.links = [node_a]  # closes the cycle

    print("Original Cyclic Structure:")
    original_view = {
        "node": node_a.name,
        "links_to": node_a.links[0].name,
        "links_to_links_to": node_a.links[0].links[0].name,
        "links_to_links_to_links_to": node_a.links[0].links[0].links[0].name,
        "cycle_detected": node_a.links[0].links[0].links[0].name == node_a.name
    }
    print(json.dumps(original_view, indent=2))

    # Pydantic -> OGM, then walk the stored cycle one hop at a time
    node_a_ogm = Converter.to_ogm(node_a)
    ogm_bs = list(node_a_ogm.links.all())
    ogm_cs = list(ogm_bs[0].links.all())
    ogm_as = list(ogm_cs[0].links.all())
    print("\nConverted to OGM:")
    ogm_view = {
        "node": node_a_ogm.name,
        "links_to": ogm_bs[0].name,
        "links_to_links_to": ogm_cs[0].name,
        "links_to_links_to_links_to": ogm_as[0].name,
        "cycle_detected": ogm_as[0].name == node_a_ogm.name
    }
    print(json.dumps(ogm_view, indent=2))

    # OGM -> dict
    node_dict = Converter.ogm_to_dict(node_a_ogm, max_depth=4)
    print("\nConverted to Dict (trimmed for readability):")
    dict_view = {
        "name": node_dict["name"],
        "links": [
            {
                "name": node_dict["links"][0]["name"],
                "links": [
                    {
                        "name": node_dict["links"][0]["links"][0]["name"],
                        "has_links_back": "links" in node_dict["links"][0]["links"][0]
                    }
                ]
            }
        ]
    }
    print(json.dumps(dict_view, indent=2))

    # OGM -> Pydantic
    node_py = Converter.to_pydantic(node_a_ogm)
    print("\nRound-trip to Pydantic:")
    round_trip_view = {
        "node": node_py.name,
        "links_to": node_py.links[0].name,
        "links_to_links_to": node_py.links[0].links[0].name,
        "links_to_links_to_links_to": node_py.links[0].links[0].links[0].name,
        "cycle_detected": node_py.links[0].links[0].links[0].name == node_py.name
    }
    print(json.dumps(round_trip_view, indent=2))
if __name__ == "__main__":
    # Register all models (and type converters) before any conversion runs
    register_models()
    # Run examples; both call Converter.to_ogm, which uses the configured database
    example_complex_structure()
    example_cyclic_references()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # ./tests/test_complex_relationships.py | |
| from typing import List | |
| import pytest | |
| from neomodel import ( | |
| StructuredNode, StringProperty, DateTimeProperty, | |
| RelationshipTo, StructuredRel | |
| ) | |
| from pydantic import BaseModel, Field | |
| from converter import Converter | |
| # ===== Module-level model definitions ===== | |
| # Relationship model | |
class AuthoredRel(StructuredRel):
    """Relationship model with a created_at timestamp; used by BloggerOGM.posts."""
    created_at = DateTimeProperty()
| # Pydantic models | |
class CommentPydantic(BaseModel):
    """Pydantic model for a comment: content and author name."""
    content: str
    author: str
class PostPydantic(BaseModel):
    """Pydantic model for a post: title, content and a list of comments."""
    title: str
    content: str
    # default_factory gives every instance its own empty list
    comments: List[CommentPydantic] = Field(default_factory=list)
class BloggerPydantic(BaseModel):
    """Pydantic model for a blogger: name, bio, posts and followed bloggers."""
    name: str
    bio: str
    posts: List[PostPydantic] = Field(default_factory=list)
    # Self-reference, given as a string forward reference
    following: List['BloggerPydantic'] = Field(default_factory=list)


# Resolve forward references
BloggerPydantic.model_rebuild()
| # Neo4j OGM models | |
class CommentOGM(StructuredNode):
    """neomodel node counterpart of CommentPydantic; both properties are required."""
    content = StringProperty(required=True)
    author = StringProperty(required=True)
class PostOGM(StructuredNode):
    """neomodel node counterpart of PostPydantic."""
    title = StringProperty(required=True)
    content = StringProperty(required=True)
    # Outgoing HAS_COMMENT relationship to the post's comments
    comments = RelationshipTo(CommentOGM, 'HAS_COMMENT')
class BloggerOGM(StructuredNode):
    """neomodel node counterpart of BloggerPydantic."""
    name = StringProperty(required=True)
    bio = StringProperty()
    # Outgoing AUTHORED relationship; AuthoredRel is its relationship model
    posts = RelationshipTo(PostOGM, 'AUTHORED', model=AuthoredRel)
    # Self-reference: target class is given by name
    following = RelationshipTo('BloggerOGM', 'FOLLOWS')
| # ===== Fixtures ===== | |
@pytest.fixture
def registered_models():
    """Register the comment, post and blogger model pairs with the Converter."""
    model_pairs = (
        (CommentPydantic, CommentOGM),
        (PostPydantic, PostOGM),
        (BloggerPydantic, BloggerOGM),
    )
    for pydantic_model, ogm_model in model_pairs:
        Converter.register_models(pydantic_model, ogm_model)
    yield
@pytest.fixture
def blogger_fixture():
    """Build a blogger with two posts, three comments and one followed blogger."""
    neo4j_post = PostPydantic(
        title="Introduction to Neo4j",
        content="Neo4j is a graph database...",
        comments=[
            CommentPydantic(content="Great post!", author="Reader1"),
            CommentPydantic(content="I learned a lot", author="Reader2"),
        ]
    )
    pydantic_post = PostPydantic(
        title="Pydantic and Data Validation",
        content="Pydantic provides data validation...",
        comments=[CommentPydantic(content="Very helpful", author="Reader3")]
    )
    tech_writer = BloggerPydantic(
        name="Tech Writer",
        bio="I write about tech",
        posts=[neo4j_post, pydantic_post]
    )

    # A second blogger, attached after construction through the self-referencing field
    code_expert = BloggerPydantic(
        name="Code Expert",
        bio="Professional developer",
        posts=[]
    )
    tech_writer.following = [code_expert]
    return tech_writer
| # ===== Test Class ===== | |
class TestComplexRelationships:
    """Tests for models with complex nested relationships"""

    @staticmethod
    def _first_with_title(posts, title):
        """Return the first post whose title equals ``title``, or None."""
        for post in posts:
            if post.title == title:
                return post
        return None

    def test_nested_relationships(self, db_connection, registered_models, blogger_fixture):
        """
        Round-trip a blogger with nested posts and comments through the converter.

        Checks the blogger properties, the number of posts and the comments of one
        post after conversion to OGM, and again after conversion back to Pydantic.
        """
        blogger = blogger_fixture
        wanted_title = "Pydantic and Data Validation"

        # Pydantic -> OGM
        blogger_ogm = Converter.to_ogm(blogger)
        assert blogger_ogm.name == "Tech Writer", "Blogger name not preserved"
        assert blogger_ogm.bio == "I write about tech", "Blogger bio not preserved"

        # Posts stored on the node
        posts = list(blogger_ogm.posts.all())
        assert len(posts) == 2, "Incorrect number of posts"
        pyd_post = self._first_with_title(posts, wanted_title)
        assert pyd_post is not None, "Post 'Pydantic and Data Validation' not found"

        # Comments stored on that post
        comments = list(pyd_post.comments.all())
        assert len(comments) == 1, "Incorrect number of comments"
        assert comments[0].content == "Very helpful", "Comment content not preserved"
        assert comments[0].author == "Reader3", "Comment author not preserved"

        # OGM -> Pydantic
        converted_back = Converter.to_pydantic(blogger_ogm)
        assert converted_back.name == blogger.name, "Blogger name not preserved in round trip"
        assert len(converted_back.posts) == 2, "Incorrect number of posts in round trip"

        converted_post = self._first_with_title(converted_back.posts, wanted_title)
        assert converted_post is not None, "Post not found in round trip"
        assert len(converted_post.comments) == 1, "Incorrect number of comments in round trip"
        assert converted_post.comments[0].content == "Very helpful", "Comment content not preserved in round trip"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment