Skip to content

Instantly share code, notes, and snippets.

@HardMax71
Created March 11, 2025 12:03
Show Gist options
  • Select an option

  • Save HardMax71/0841e74abe4b6cb3051726401a54a996 to your computer and use it in GitHub Desktop.

Select an option

Save HardMax71/0841e74abe4b6cb3051726401a54a996 to your computer and use it in GitHub Desktop.
Pydantic model <-> neomodel OGM (neo4j) <-> python dict converter
# ./tests/conftest.py
import pytest
from neomodel import config, db
from converter import Converter
@pytest.fixture(scope="session")
def db_connection():
    """
    Configure the Neo4j connection once for the whole test session.

    The bolt URL is read from the ``NEO4J_BOLT_URL`` environment variable so the
    suite can target a different server or credentials; when the variable is not
    set the local development default is used.

    After the last test has run, every node (and its relationships) is deleted.
    """
    import os

    config.DATABASE_URL = os.environ.get(
        'NEO4J_BOLT_URL', 'bolt://neo4j:password@localhost:7687'
    )
    yield
    # Clear database after all tests
    db.cypher_query("MATCH (n) DETACH DELETE n")
@pytest.fixture(autouse=True)
def clean_registry():
    """Isolate tests from one another by resetting the Converter registries after each test."""
    yield
    # Rebind each registry to a brand-new dict (instead of clearing in place),
    # so a dict object captured before the reset is not mutated.
    for registry_name in ("_pydantic_to_ogm", "_ogm_to_pydantic", "_type_converters"):
        setattr(Converter, registry_name, {})
"""
Pydantic to Neo4j OGM Converter.
This module provides a utility for converting between Pydantic models and Neo4j OGM models
with support for relationships, nested models, and custom type conversions.
"""
import inspect
import logging
from datetime import datetime, date
from functools import lru_cache
from typing import Type, TypeVar, Dict, List, Any, Optional, Union, Tuple, get_type_hints, Callable
from neomodel import (
StructuredNode, RelationshipTo, RelationshipFrom,
Relationship, db
)
from neomodel.properties import (
StringProperty, IntegerProperty, FloatProperty, BooleanProperty,
DateTimeProperty, ArrayProperty, JSONProperty
)
from pydantic import BaseModel
# Type variables for generic typing
PydanticModel = TypeVar('PydanticModel', bound=BaseModel)
OGM_Model = TypeVar('OGM_Model', bound=StructuredNode)
# Configure logger
logger = logging.getLogger(__name__)
class ConversionError(Exception):
    """Signals that a value or model could not be converted."""
class Converter:
"""
A utility class for converting between Pydantic models and neomodel OGM models.
This converter handles:
- Basic property conversion between models.
- Conversion of nested Pydantic models into related OGM nodes.
- Processing of relationships at any depth (including cyclic references).
- Conversion of lists and dictionaries of models.
- Custom type conversions via user-registered converters.
- Conversion from Python dictionaries to OGM models (via dict_to_ogm)
and from OGM models to Python dictionaries (via ogm_to_dict).
- Batch conversion of multiple models (via batch_to_ogm and batch_to_pydantic).
Usage Example:
--------------
from pydantic import BaseModel
from neomodel import StructuredNode, StringProperty, IntegerProperty, config, db
from converter import Converter
# Setup the Neo4j connection (must be done before using neomodel models)
config.DATABASE_URL = 'bolt://neo4j:password@localhost:7687'
# Define a Pydantic model
class UserPydantic(BaseModel):
name: str
email: str
age: int
# Define a corresponding neomodel OGM model
class UserOGM(StructuredNode):
name = StringProperty(required=True)
email = StringProperty(unique_index=True, required=True)
age = IntegerProperty(default=0)
# Register the mapping between the Pydantic model and the OGM model.
# This ensures that the Converter knows how to translate between the two.
Converter.register_models(UserPydantic, UserOGM)
# Create an instance of the Pydantic model.
user_py = UserPydantic(name="Alice", email="alice@example.com", age=30)
# Convert the Pydantic model to an OGM model.
user_ogm = Converter.to_ogm(user_py)
print("Converted to OGM:", user_ogm.name, user_ogm.email, user_ogm.age)
# Convert the OGM model back to a Pydantic model.
# By default, this conversion uses the registered mapping.
user_py_converted = Converter.to_pydantic(user_ogm)
print("Converted back to Pydantic:", user_py_converted.name, user_py_converted.email, user_py_converted.age)
# The converter also supports converting from dictionaries:
data_dict = {"name": "Bob", "email": "bob@example.com", "age": 25}
user_ogm_from_dict = Converter.dict_to_ogm(data_dict, UserOGM)
print("Converted from dict to OGM:", user_ogm_from_dict.name, user_ogm_from_dict.email, user_ogm_from_dict.age)
# And converting OGM models to dictionaries:
user_dict = Converter.ogm_to_dict(user_ogm_from_dict)
print("OGM converted to dict:", user_dict)
"""
# Registry to store mappings between Pydantic and OGM models
_pydantic_to_ogm: Dict[Type[BaseModel], Type[StructuredNode]] = {}
_ogm_to_pydantic: Dict[Type[StructuredNode], Type[BaseModel]] = {}
# Custom type converters
_type_converters: Dict[Tuple[Type, Type], Callable[[Any], Any]] = {}
@classmethod
def register_type_converter(
        cls,
        source_type: Type,
        target_type: Type,
        converter_func: Callable[[Any], Any]
) -> None:
    """
    Register a custom type converter function.

    The converter is stored under the exact ``(source_type, target_type)`` pair and
    replaces any converter previously registered for that pair.

    Args:
        source_type (Type): The source type to convert from.
        target_type (Type): The target type to convert to.
        converter_func (Callable[[Any], Any]): A function that converts a value from
            source_type to target_type.

    Returns:
        None.
    """
    cls._type_converters[(source_type, target_type)] = converter_func
    # Not every type object exposes __name__ (e.g. some typing aliases), so fall
    # back to repr() instead of letting the log call fail after registration.
    logger.debug(
        "Registered type converter: %s -> %s",
        getattr(source_type, '__name__', repr(source_type)),
        getattr(target_type, '__name__', repr(target_type)),
    )
@classmethod
def _is_property(cls, attr: Any) -> bool:
    """
    Determine if the given attribute is a neomodel property.

    The attribute is checked against the neomodel property classes imported by
    this module. The check is deliberately not memoised: a cache keyed on ``attr``
    would raise ``TypeError`` for unhashable attributes (lists, dicts, ...) and
    would keep references to the inspected objects alive.

    Args:
        attr (Any): The attribute to check.

    Returns:
        bool: True if the attribute is a neomodel property, False otherwise.
    """
    property_types = (
        StringProperty, IntegerProperty, FloatProperty, BooleanProperty,
        DateTimeProperty, ArrayProperty, JSONProperty
    )
    try:
        return isinstance(attr, property_types)
    except TypeError:
        return False
@classmethod
def _is_relationship(cls, attr: Any) -> bool:
    """
    Determine if the given attribute is a neomodel relationship.

    The check is not memoised: a cache keyed on ``attr`` would raise ``TypeError``
    for unhashable attributes and would keep the inspected objects alive.

    Args:
        attr (Any): The attribute to check.

    Returns:
        bool: True if the attribute is a RelationshipTo, RelationshipFrom or
        Relationship instance, False otherwise.
    """
    return isinstance(attr, (RelationshipTo, RelationshipFrom, Relationship))
@classmethod
def _get_ogm_properties(cls, ogm_class: Type[StructuredNode]) -> Dict[str, Any]:
    """
    Collect the property definitions declared on a neomodel node class.

    Three strategies are tried in order: the ``__all_properties__`` attribute,
    the ``defined_properties`` method, and finally manual member inspection.

    Args:
        ogm_class (Type[StructuredNode]): A neomodel StructuredNode class.

    Returns:
        Dict[str, Any]: Property name -> property instance.
    """
    # 1) Attribute exposed by StructuredNode classes.
    if hasattr(ogm_class, '__all_properties__'):
        return dict(ogm_class.__all_properties__)

    # 2) Ask the class itself; on failure log and fall through to inspection.
    if hasattr(ogm_class, 'defined_properties'):
        try:
            return ogm_class.defined_properties(aliases=False, rels=False)
        except Exception as e:
            logger.warning(f"Error using defined_properties: {e}")

    # 3) Inspect every member and keep the public ones that are properties.
    discovered = {}
    for member_name, member in inspect.getmembers(ogm_class):
        try:
            is_property = cls._is_property(member)
        except Exception:
            continue
        if is_property and not member_name.startswith('_'):
            discovered[member_name] = member
    return discovered
@classmethod
def _get_ogm_relationships(
        cls, ogm_class: Type[StructuredNode]
) -> Dict[str, Union[RelationshipTo, RelationshipFrom, Relationship]]:
    """
    Collect the relationship definitions declared on a neomodel node class.

    Three strategies are tried in order: the ``__all_relationships__`` attribute,
    the ``defined_properties`` method, and finally manual member inspection.

    Args:
        ogm_class (Type[StructuredNode]): A neomodel StructuredNode class.

    Returns:
        Dict[str, Union[RelationshipTo, RelationshipFrom, Relationship]]:
            Relationship name -> relationship instance.
    """
    # 1) Attribute exposed by StructuredNode classes.
    if hasattr(ogm_class, '__all_relationships__'):
        return dict(ogm_class.__all_relationships__)

    # 2) Ask the class itself; on failure log and fall through to inspection.
    if hasattr(ogm_class, 'defined_properties'):
        try:
            return ogm_class.defined_properties(aliases=False, rels=True, properties=False)
        except Exception as e:
            logger.warning(f"Error using defined_properties: {e}")

    # 3) Inspect every member and keep the public ones that are relationships.
    discovered = {}
    for member_name, member in inspect.getmembers(ogm_class):
        try:
            is_relationship = cls._is_relationship(member)
        except Exception:
            continue
        if is_relationship and not member_name.startswith('_'):
            discovered[member_name] = member
    return discovered
@classmethod
def _get_property_type(cls, prop: Any) -> Type:
    """
    Map a neomodel property instance to the Python type it stores.

    Args:
        prop (Any): A neomodel property.

    Returns:
        Type: The matching Python type. For property classes not listed here the
        property's ``property_type`` attribute is used, or ``Any`` when absent.
    """
    # Checked in order; the first matching property class wins.
    known_types = (
        (StringProperty, str),
        (IntegerProperty, int),
        (FloatProperty, float),
        (BooleanProperty, bool),
        (DateTimeProperty, datetime),
        (ArrayProperty, list),
        (JSONProperty, dict),
    )
    for property_class, python_type in known_types:
        if isinstance(prop, property_class):
            return python_type

    # Unknown property class: defer to what the property advertises, if anything.
    try:
        return getattr(prop, 'property_type', Any)
    except (AttributeError, TypeError):
        return Any
@classmethod
def _convert_value(cls, value: Any, target_type: Any) -> Any:
    """
    Convert the given value to the specified target type.

    Resolution order:
      1. ``None`` is returned unchanged.
      2. A converter registered for the exact ``(type(value), target_type)`` pair.
      3. Built-in conversions for str, int, float, bool, datetime and date.
      4. A dict is expanded into ``target_type`` when that is a Pydantic model class.
      5. Otherwise the value is returned unchanged.

    Args:
        value (Any): The value to convert.
        target_type (Type): The target type to which the value should be converted.

    Returns:
        Any: The converted value.

    Raises:
        ConversionError: If a string cannot be parsed as a datetime/date.
    """
    if value is None:
        return None

    # Check for direct registered converter
    converter = cls._type_converters.get((type(value), target_type))
    if converter:
        return converter(value)

    # Handle basic type conversions
    if target_type is str and not isinstance(value, str):
        return str(value)
    if target_type is int and isinstance(value, (str, float)):
        return int(value)
    if target_type is float and isinstance(value, (str, int)):
        return float(value)
    if target_type is bool and not isinstance(value, bool):
        if isinstance(value, str):
            # bool("false") / bool("0") would be True, so interpret the usual
            # textual spellings explicitly before falling back to truthiness.
            token = value.strip().lower()
            if token in ('true', 't', 'yes', 'y', 'on', '1'):
                return True
            if token in ('false', 'f', 'no', 'n', 'off', '0', ''):
                return False
        return bool(value)

    if isinstance(value, str) and (target_type is datetime or target_type is date):
        wants_date = target_type is date
        try:
            # Try ISO format first
            return date.fromisoformat(value) if wants_date else datetime.fromisoformat(value)
        except ValueError:
            pass
        # Try other common formats
        for fmt in ("%Y-%m-%d", "%Y/%m/%d", "%d-%m-%Y", "%d/%m/%Y"):
            try:
                parsed = datetime.strptime(value, fmt)
            except ValueError:
                continue
            return parsed.date() if wants_date else parsed
        # If all else fails, raise an error
        type_label = 'date' if wants_date else 'datetime'
        raise ConversionError(f"Cannot convert string '{value}' to {type_label}")

    # If the target_type is a Pydantic model class and the value is a dict, build an instance
    if isinstance(target_type, type) and issubclass(target_type, BaseModel) and isinstance(value, dict):
        return target_type(**value)

    # If we get here, just return the original value
    return value
@classmethod
def register_models(cls, pydantic_class: Type[BaseModel], ogm_class: Type[StructuredNode]) -> None:
    """
    Pair a Pydantic model class with a neomodel OGM class in both directions.

    Args:
        pydantic_class (Type[BaseModel]): The Pydantic model class
        ogm_class (Type[StructuredNode]): The neomodel OGM model class
    """
    forward, backward = cls._pydantic_to_ogm, cls._ogm_to_pydantic
    forward[pydantic_class] = ogm_class
    backward[ogm_class] = pydantic_class
    logger.debug(f"Registered mapping: {pydantic_class.__name__} <-> {ogm_class.__name__}")
@classmethod
def _process_pydantic_field(cls, pydantic_instance, field_name, pydantic_data):
    """
    Copy one field of a Pydantic instance into ``pydantic_data``.

    Used when a full dump of the model is not possible. Fields holding a single
    BaseModel are left out; lists consisting only of BaseModels are copied with
    repeated objects (same identity) collapsed to their first occurrence.

    Args:
        pydantic_instance: The Pydantic model instance
        field_name: The name of the field to process
        pydantic_data: The dictionary to update with the processed field value

    Returns:
        None - updates pydantic_data in-place
    """
    try:
        field_value = getattr(pydantic_instance, field_name)

        # Nested models are skipped to avoid circular references.
        if isinstance(field_value, BaseModel):
            return

        if isinstance(field_value, list) and all(isinstance(entry, BaseModel) for entry in field_value):
            # Keyed by id(): keeps the first occurrence of each object, in order.
            first_seen = {}
            for entry in field_value:
                first_seen.setdefault(id(entry), entry)
            field_value = list(first_seen.values())

        pydantic_data[field_name] = field_value
    except Exception:
        # Skip fields that can't be accessed
        return
@classmethod
def _resolve_node_class(cls, class_name_or_obj: Union[str, Type[StructuredNode]]) -> Optional[Type[StructuredNode]]:
    """
    Turn a relationship target (class object or class name) into a class.

    Non-string input is returned untouched. A string is reduced to its last
    dotted component and matched by ``__name__``, first against the OGM classes
    registered with this converter and then against neomodel's node class registry.

    Args:
        class_name_or_obj: Either a string class name or a class object

    Returns:
        The resolved class or None if not found
    """
    if not isinstance(class_name_or_obj, str):
        return class_name_or_obj

    wanted = class_name_or_obj.rsplit('.', 1)[-1]

    # Our own mapping takes precedence over neomodel's registry.
    own_match = next(
        (candidate for candidate in cls._ogm_to_pydantic if candidate.__name__ == wanted),
        None,
    )
    if own_match is not None:
        return own_match

    return next(
        (candidate for candidate in db._NODE_CLASS_REGISTRY.values() if candidate.__name__ == wanted),
        None,
    )
@classmethod
def to_ogm(
        cls,
        pydantic_instance: PydanticModel,
        ogm_class: Optional[Type[OGM_Model]] = None,
        processed_objects: Optional[Dict[int, OGM_Model]] = None,
        max_depth: int = 10
) -> Optional[OGM_Model]:
    """
    Convert a Pydantic model instance to a neomodel OGM model instance.

    The node is created, populated with the properties present in the Pydantic
    data, saved, then connected to the nodes converted from its relationship
    fields and saved again.

    Args:
        pydantic_instance (PydanticModel): The Pydantic model instance to convert.
        ogm_class (Optional[Type[OGM_Model]]): The target neomodel OGM model class.
            If not provided, the registered mapping is used.
        processed_objects (Optional[Dict[int, OGM_Model]]): Already converted objects,
            keyed by ``id()`` of the Pydantic instance, used to handle cyclic references.
        max_depth (int): The maximum recursion depth for processing nested relationships.

    Returns:
        Optional[OGM_Model]: The converted node, or None when ``pydantic_instance``
        is None or the depth budget is exhausted.

    Raises:
        ConversionError: If no mapping is registered, the Pydantic data cannot be
            extracted, or the final save fails.
    """
    if pydantic_instance is None:
        return None
    if max_depth <= 0:
        logger.warning("Maximum recursion depth reached during to_ogm conversion")
        return None
    if processed_objects is None:
        processed_objects = {}

    instance_id = id(pydantic_instance)
    if instance_id in processed_objects:
        return processed_objects[instance_id]

    if ogm_class is None:
        pydantic_class = type(pydantic_instance)
        if pydantic_class not in cls._pydantic_to_ogm:
            raise ConversionError(f"No mapping registered for Pydantic class {pydantic_class.__name__}")
        ogm_class = cls._pydantic_to_ogm[pydantic_class]

    # Create the OGM instance and register it before recursing so that cyclic
    # references resolve to this same node.
    ogm_instance = ogm_class()
    processed_objects[instance_id] = ogm_instance

    # Extract Pydantic data (handling cyclic references).
    try:
        pydantic_data = {}
        try:
            pydantic_data = pydantic_instance.model_dump(exclude_unset=True, exclude_none=True)
        except ValueError as e:
            if "Circular reference detected" in str(e):
                # Fall back to a field-by-field copy. The field map is read from
                # the class rather than the instance.
                for field_name in type(pydantic_instance).model_fields.keys():
                    cls._process_pydantic_field(pydantic_instance, field_name, pydantic_data)
            else:
                raise
    except Exception as e:
        raise ConversionError(f"Failed to get dictionary from Pydantic model: {str(e)}") from e

    ogm_properties = cls._get_ogm_properties(ogm_class)
    ogm_relationships = cls._get_ogm_relationships(ogm_class)

    for prop_name, prop in ogm_properties.items():
        if prop_name in pydantic_data:
            value = pydantic_data[prop_name]
            prop_type = cls._get_property_type(prop)
            converted_value = cls._convert_value(value, prop_type)
            setattr(ogm_instance, prop_name, converted_value)

    # Best effort: a failure here is only logged; the final save below is the
    # one that raises.
    try:
        ogm_instance.save()
    except Exception as e:
        logger.warning(f"Failed to save OGM instance with properties: {str(e)}")

    for rel_name, rel in ogm_relationships.items():
        try:
            # A relationship that has no counterpart field on the Pydantic model
            # is simply skipped instead of being reported as a failure.
            rel_data = getattr(pydantic_instance, rel_name, None)
            if rel_data is None:
                continue

            # Resolve target class without monkey patching
            target_ogm_class = cls._resolve_node_class(rel.definition['node_class'])
            if not target_ogm_class:
                logger.warning(f"Could not resolve target class for relationship {rel_name}")
                continue

            # Convert to list if needed
            if not isinstance(rel_data, list):
                rel_data = [rel_data]

            for item in rel_data:
                cls._process_related_item(
                    item,
                    ogm_instance,
                    rel_name,
                    target_ogm_class,
                    processed_objects,
                    max_depth,
                    instance_id
                )
        except Exception as e:
            logger.warning(f"Failed to process relationship {rel_name}: {str(e)}")

    try:
        ogm_instance.save()
    except Exception as e:
        raise ConversionError(f"Failed to save OGM instance with relationships: {str(e)}") from e

    return ogm_instance
@classmethod
def _process_related_item(cls, item, ogm_instance, rel_name, target_ogm_class,
                          processed_objects, max_depth, instance_id):
    """
    Convert one related item to a node and connect it to ``ogm_instance``.

    Args:
        item: The item to process (BaseModel or dict); anything else is ignored
        ogm_instance: The OGM instance to connect the related item to
        rel_name: The name of the relationship
        target_ogm_class: The target OGM class
        processed_objects: Dictionary of already processed objects
        max_depth: Maximum recursion depth (the related item is converted with one less)
        instance_id: ID of the parent instance (to avoid circular references)

    Returns:
        bool: Whether the item was successfully processed and connected
    """
    if isinstance(item, BaseModel):
        # An item that is the parent itself is never connected.
        if id(item) == instance_id:
            return False
        node = cls.to_ogm(item, target_ogm_class, processed_objects, max_depth - 1)
        if not node:
            return False
        getattr(ogm_instance, rel_name).connect(node)
        return True

    if not isinstance(item, dict):
        return False

    # Dicts are first validated through the Pydantic class registered for the target.
    target_pydantic_class = cls._ogm_to_pydantic.get(target_ogm_class)
    if not target_pydantic_class:
        logger.warning(
            f"No Pydantic model registered for OGM class {target_ogm_class.__name__}")
        return False

    try:
        parsed_item = target_pydantic_class(**item)
        node = cls.to_ogm(parsed_item, target_ogm_class, processed_objects, max_depth - 1)
        if node:
            getattr(ogm_instance, rel_name).connect(node)
            return True
    except Exception as e:
        logger.warning(
            f"Failed to process dict item in relationship {rel_name}: {str(e)}")
    return False
@classmethod
def _lookup_pydantic_class(
        cls,
        ogm_instance: StructuredNode,
        pydantic_class: Optional[Type[BaseModel]]
) -> Type[BaseModel]:
    """Return ``pydantic_class`` if given, otherwise the class registered for the node's type."""
    if pydantic_class is not None:
        return pydantic_class
    ogm_class = type(ogm_instance)
    if ogm_class not in cls._ogm_to_pydantic:
        raise ConversionError(f"No mapping registered for OGM class {ogm_class.__name__}")
    return cls._ogm_to_pydantic[ogm_class]

@classmethod
def _make_pydantic_stub(
        cls,
        ogm_instance: StructuredNode,
        pydantic_class: Type[BaseModel],
        identifiers_only: bool
) -> BaseModel:
    """Build a relationship-free Pydantic instance from the node's own properties."""
    stub_data = {}
    for prop_name, prop in cls._get_ogm_properties(type(ogm_instance)).items():
        # When breaking a cycle only required / unique-indexed properties are copied.
        if identifiers_only and not (
                getattr(prop, 'required', False) or getattr(prop, 'unique_index', False)
        ):
            continue
        try:
            stub_data[prop_name] = getattr(ogm_instance, prop_name)
        except Exception:
            # Best effort: a property that cannot be read is left out of the stub.
            pass
    return pydantic_class(**stub_data)

@classmethod
def to_pydantic(
        cls,
        ogm_instance: OGM_Model,
        pydantic_class: Optional[Type[PydanticModel]] = None,
        processed_objects: Optional[Dict[int, PydanticModel]] = None,
        max_depth: int = 10,
        current_path: Optional[List[int]] = None
) -> Optional[PydanticModel]:
    """
    Convert a neomodel OGM model instance to a Pydantic model instance.

    The node's properties are converted first and validated into a Pydantic
    instance; relationships that have a same-named field on the Pydantic class
    are then fetched, converted recursively and assigned on that instance.

    Args:
        ogm_instance (OGM_Model): The neomodel OGM model instance to convert.
        pydantic_class (Optional[Type[PydanticModel]]): The target Pydantic model class.
            If not provided, the registered mapping is used.
        processed_objects (Optional[Dict[int, PydanticModel]]): Already converted objects,
            keyed by ``id()`` of the OGM instance, used to handle cycles.
        max_depth (int): The maximum recursion depth for processing nested relationships.
            When exhausted, a properties-only instance is returned.
        current_path (Optional[List[int]]): Object IDs on the current conversion path,
            used for cycle detection.

    Returns:
        Optional[PydanticModel]: The converted instance, or None if ``ogm_instance`` is None.

    Raises:
        ConversionError: If no mapping is registered or the Pydantic instance
            cannot be created from the node's properties.
    """
    if ogm_instance is None:
        return None

    # Initialize tracking for cyclic references and path
    if processed_objects is None:
        processed_objects = {}
    if current_path is None:
        current_path = []

    # NOTE(review): identity is tracked with id(). If the related nodes fetched
    # below are fresh Python objects on every query, ids can be reused once those
    # objects are garbage collected - confirm whether a stable node identifier
    # should be used as the key instead.
    instance_id = id(ogm_instance)

    # Already converted (fully, or as a stub): reuse the same Pydantic object.
    if instance_id in processed_objects:
        return processed_objects[instance_id]

    # On the current path but not yet converted: break the cycle with a minimal stub.
    if instance_id in current_path:
        pydantic_class = cls._lookup_pydantic_class(ogm_instance, pydantic_class)
        stub_instance = cls._make_pydantic_stub(ogm_instance, pydantic_class, identifiers_only=True)
        processed_objects[instance_id] = stub_instance
        return stub_instance

    # Depth budget exhausted: return properties only, no relationships.
    if max_depth <= 0:
        logger.info(f"Maximum recursion depth reached for {type(ogm_instance).__name__}")
        pydantic_class = cls._lookup_pydantic_class(ogm_instance, pydantic_class)
        stub_instance = cls._make_pydantic_stub(ogm_instance, pydantic_class, identifiers_only=False)
        processed_objects[instance_id] = stub_instance
        return stub_instance

    pydantic_class = cls._lookup_pydantic_class(ogm_instance, pydantic_class)

    # Get OGM properties and relationships
    ogm_class = type(ogm_instance)
    ogm_properties = cls._get_ogm_properties(ogm_class)
    ogm_relationships = cls._get_ogm_relationships(ogm_class)

    # Namespace with all registered Pydantic classes for resolving forward references
    localns = {pyd_cls.__name__: pyd_cls for pyd_cls in cls._pydantic_to_ogm}
    localns[pydantic_class.__name__] = pydantic_class

    try:
        pydantic_fields = get_type_hints(pydantic_class, globalns=None, localns=localns)
    except Exception as e:
        logger.warning(f"Failed to resolve type hints for {pydantic_class.__name__}: {str(e)}")
        # Fall back to raw annotations if available
        if hasattr(pydantic_class, "__annotations__"):
            pydantic_fields = pydantic_class.__annotations__
        else:
            pydantic_fields = {}
            logger.warning(f"Could not get field types for {pydantic_class.__name__}")

    current_path.append(instance_id)
    try:
        # Process properties first
        pydantic_data = {}
        for prop_name, prop in ogm_properties.items():
            if prop_name not in pydantic_fields:
                continue
            try:
                value = getattr(ogm_instance, prop_name)
                pydantic_data[prop_name] = cls._convert_value(value, pydantic_fields[prop_name])
            except Exception as e:
                logger.warning(f"Failed to get OGM property {prop_name}: {str(e)}")
                pydantic_data[prop_name] = None

        # Create the Pydantic instance with properties only.
        try:
            pydantic_instance = pydantic_class.model_validate(pydantic_data)
        except Exception as e:
            raise ConversionError(f"Failed to create Pydantic instance: {str(e)}") from e
        # Register before touching relationships so cycles resolve to this instance.
        processed_objects[instance_id] = pydantic_instance

        for rel_name, rel in ogm_relationships.items():
            if rel_name not in pydantic_fields:
                continue
            # Reset per relationship so the error fallback below never sees the
            # cardinality of a previously processed relationship.
            is_single = False
            try:
                # The target may be declared as a class or as a class name.
                target_ogm_class = cls._resolve_node_class(rel.definition['node_class'])
                if not target_ogm_class:
                    logger.warning(f"Could not resolve target class for relationship {rel_name}")
                    continue

                target_pydantic_class = cls._ogm_to_pydantic.get(target_ogm_class)
                if not target_pydantic_class:
                    logger.warning(
                        f"No Pydantic model registered for OGM class {target_ogm_class.__name__}"
                    )
                    continue

                # Cardinality decides between a single value and a list.
                cardinality_name = None
                if hasattr(rel, 'manager'):
                    cardinality_name = rel.manager.__name__
                elif 'cardinality' in rel.definition:
                    cardinality_name = rel.definition['cardinality'].__name__
                if cardinality_name:
                    is_single = cardinality_name in ('ZeroOrOne', 'One', 'AsyncZeroOrOne', 'AsyncOne')

                # Fetch related nodes
                rel_objects = list(getattr(ogm_instance, rel_name).all())

                if is_single:
                    related_pydantic = None
                    if rel_objects:
                        related_pydantic = cls.to_pydantic(
                            rel_objects[0],
                            target_pydantic_class,
                            processed_objects,
                            max_depth - 1,
                            current_path.copy()  # each branch gets its own path
                        )
                    setattr(pydantic_instance, rel_name, related_pydantic)
                else:
                    related_pydantics = []
                    for obj in rel_objects:
                        related = cls.to_pydantic(
                            obj,
                            target_pydantic_class,
                            processed_objects,
                            max_depth - 1,
                            current_path.copy()  # each branch gets its own path
                        )
                        if related:
                            related_pydantics.append(related)
                    setattr(pydantic_instance, rel_name, related_pydantics)
            except Exception as e:
                logger.warning(f"Failed to process relationship {rel_name}: {str(e)}")
                # Use None (single) or an empty list (collection) for the field
                try:
                    setattr(pydantic_instance, rel_name, None if is_single else [])
                except Exception:
                    setattr(pydantic_instance, rel_name, [])
    finally:
        # Leave the caller's path as it was, whether conversion succeeded or failed.
        current_path.remove(instance_id)

    return pydantic_instance
@classmethod
def batch_to_ogm(
        cls,
        pydantic_instances: List[PydanticModel],
        ogm_class: Optional[Type[OGM_Model]] = None,
        max_depth: int = 10
) -> List[OGM_Model]:
    """
    Convert several Pydantic instances to OGM nodes inside one database transaction.

    All conversions share one ``processed_objects`` dictionary, so an object that
    appears more than once in the batch is converted a single time.

    Args:
        pydantic_instances (List[PydanticModel]): A list of Pydantic model instances to convert.
        ogm_class (Optional[Type[OGM_Model]]): The target neomodel OGM model class.
            If not provided, the registered mapping is used.
        max_depth (int): The maximum recursion depth for processing nested relationships.

    Returns:
        List[OGM_Model]: One converted node per input, in input order; an empty
        list when the input is empty.

    Raises:
        ConversionError: If the conversion fails.
    """
    if not pydantic_instances:
        return []

    shared_cache = {}
    converted = []
    with db.transaction:
        for source in pydantic_instances:
            converted.append(cls.to_ogm(source, ogm_class, shared_cache, max_depth))
    return converted
@classmethod
def batch_to_pydantic(
        cls,
        ogm_instances: List[OGM_Model],
        pydantic_class: Optional[Type[PydanticModel]] = None,
        max_depth: int = 10
) -> List[PydanticModel]:
    """
    Convert several OGM nodes to Pydantic instances.

    All conversions share one ``processed_objects`` dictionary, while every node
    starts with its own empty conversion path.

    Args:
        ogm_instances (List[OGM_Model]): A list of neomodel OGM model instances to convert.
        pydantic_class (Optional[Type[PydanticModel]]): The target Pydantic model class.
            If not provided, the registered mapping is used.
        max_depth (int): The maximum recursion depth for processing nested relationships.

    Returns:
        List[PydanticModel]: One converted instance per input, in input order; an
        empty list when the input is empty.
    """
    if not ogm_instances:
        return []

    shared_cache = {}
    converted = []
    for node in ogm_instances:
        converted.append(cls.to_pydantic(node, pydantic_class, shared_cache, max_depth, []))
    return converted
@classmethod
def dict_to_ogm(
        cls,
        data_dict: dict,
        ogm_class: Type[OGM_Model],
        processed_objects: Optional[Dict[int, OGM_Model]] = None,
        max_depth: int = 10
) -> Optional[OGM_Model]:
    """
    Build (and save) a neomodel node from a plain dictionary.

    Keys matching the node's properties are converted and assigned; keys matching
    its relationships are expected to hold a dict or a list of dicts, each of which
    is converted recursively and connected. Non-dict relationship items are ignored.

    Args:
        data_dict (dict): The source data.
        ogm_class (Type[OGM_Model]): The node class to instantiate.
        processed_objects (Optional[Dict[int, OGM_Model]]): Already converted dicts,
            keyed by ``id()``, used for cycle handling.
        max_depth (int): The maximum recursion depth for nested dictionaries.

    Returns:
        Optional[OGM_Model]: The node, or None when ``data_dict`` is None or the
        depth budget is exhausted.

    Raises:
        ConversionError: If the final save fails.
    """
    if data_dict is None:
        return None
    if max_depth <= 0:
        logger.warning("Maximum recursion depth reached during dict_to_ogm conversion")
        return None

    seen = {} if processed_objects is None else processed_objects
    dict_key = id(data_dict)
    if dict_key in seen:
        return seen[dict_key]

    # Register the new node before recursing so cycles resolve to it.
    node = ogm_class()
    seen[dict_key] = node

    # Properties
    for prop_name, prop in cls._get_ogm_properties(ogm_class).items():
        if prop_name not in data_dict:
            continue
        try:
            raw_value = data_dict[prop_name]
            setattr(node, prop_name, cls._convert_value(raw_value, cls._get_property_type(prop)))
        except Exception as e:
            logger.warning(f"Failed to set OGM property {prop_name}: {str(e)}")

    # Best effort: a failure here is only logged; the final save below raises.
    try:
        node.save()
    except Exception as e:
        logger.warning(f"Failed to save OGM instance with properties: {str(e)}")

    # Relationships
    for rel_name, rel in cls._get_ogm_relationships(ogm_class).items():
        if rel_name not in data_dict:
            continue
        try:
            payload = data_dict[rel_name]
            if payload is None:
                continue

            target_ogm_class = cls._resolve_node_class(rel.definition['node_class'])
            if not target_ogm_class:
                logger.warning(f"Could not resolve target class for relationship {rel_name}")
                continue

            # Treat a single value like a one-element list.
            children = payload if isinstance(payload, list) else [payload]
            for child in children:
                if not isinstance(child, dict):
                    continue
                related_node = cls.dict_to_ogm(child, target_ogm_class, seen, max_depth - 1)
                if related_node:
                    getattr(node, rel_name).connect(related_node)
        except Exception as e:
            logger.warning(f"Failed to process relationship {rel_name}: {str(e)}")

    try:
        node.save()
    except Exception as e:
        raise ConversionError(f"Failed to save OGM instance with relationships: {str(e)}")

    return node
@classmethod
def ogm_to_dict(
    cls,
    ogm_instance: OGM_Model,
    processed_objects: Optional[Dict[int, dict]] = None,
    max_depth: int = 10,
    current_path: Optional[List[int]] = None,
    include_properties: bool = True,
    include_relationships: bool = True
) -> Optional[dict]:
    """
    Convert a neomodel OGM model instance to a Python dictionary.

    Property values are copied into the result and every relationship is
    followed recursively, so related nodes appear as nested dictionaries:
    a single dict (or ``None``) for ``ZeroOrOne``/``One`` style managers and
    a list of dicts for every other manager.

    Args:
        ogm_instance: Node to convert. ``None`` is passed straight through.
        processed_objects: Cache of finished (or in-progress) conversions keyed
            by ``id(instance)``. A cache hit returns the very same dict object,
            which is how cycles end up as shared references.
        max_depth: Remaining relationship hops. At ``0`` (or below) only the
            instance's properties are returned.
        current_path: ``id()`` values of the instances on the current recursion
            path, used for cycle detection. Recursive calls receive a copy.
        include_properties: Copy the property values of ``ogm_instance``.
        include_relationships: Follow the relationships of ``ogm_instance``.

    Returns:
        The dictionary built for ``ogm_instance``, or ``None`` if it is ``None``.

    Notes:
        * ``include_properties`` / ``include_relationships`` only apply to the
          instance passed in; recursive calls use the defaults (both enabled).
        * A property that cannot be read is logged and stored as ``None``; a
          relationship that cannot be processed is logged and stored as ``[]``
          (or ``None`` once it is known to be single-valued).
        * Related conversions that come back falsy (``None`` or an empty dict)
          are left out of list-valued relationships.
        * NOTE(review): identity is based on ``id()``. If the relationship
          manager hands back a new Python object for the same graph node on
          every query, graph cycles are only cut by ``max_depth``, and the id
          of a short-lived object may be reused by a later one - confirm
          against neomodel's behaviour.
    """
    if ogm_instance is None:
        return None
    if processed_objects is None:
        processed_objects = {}
    if current_path is None:
        current_path = []

    instance_id = id(ogm_instance)
    ogm_cls = type(ogm_instance)

    # A cached conversion always wins, whether we arrived through a cycle or
    # through a second path to the same instance.
    if instance_id in processed_objects:
        return processed_objects[instance_id]

    # Cycle without a cached entry, or depth budget exhausted: stop recursing
    # and return the properties only.
    if instance_id in current_path or max_depth <= 0:
        minimal = {}
        for prop_name in cls._get_ogm_properties(ogm_cls):
            try:
                minimal[prop_name] = getattr(ogm_instance, prop_name)
            except Exception as e:
                # Best effort: an unreadable property is simply left out.
                logger.debug(f"Skipping unreadable OGM property {prop_name}: {str(e)}")
        processed_objects[instance_id] = minimal
        return minimal

    current_path.append(instance_id)
    try:
        result = {}
        ogm_properties = cls._get_ogm_properties(ogm_cls)
        if include_properties:
            for prop_name in ogm_properties:
                try:
                    result[prop_name] = getattr(ogm_instance, prop_name)
                except Exception as e:
                    logger.warning(f"Failed to get OGM property {prop_name}: {str(e)}")
                    result[prop_name] = None

        # Register before recursing so a cycle resolves to this (still filling) dict.
        processed_objects[instance_id] = result

        if include_relationships:
            for rel_name, rel in cls._get_ogm_relationships(ogm_cls).items():
                # Reset for every relationship so the error fallback below never
                # inherits the cardinality of the previously processed one.
                is_single = False
                try:
                    # Resolve target class without caller namespace modification
                    target_ogm_class = cls._resolve_node_class(rel.definition['node_class'])
                    if not target_ogm_class:
                        logger.warning(f"Could not resolve target class for relationship {rel_name}")
                        continue
                    if hasattr(rel, 'manager'):
                        is_single = rel.manager.__name__ in (
                            'ZeroOrOne', 'One', 'AsyncZeroOrOne', 'AsyncOne')
                    rel_objs = list(getattr(ogm_instance, rel_name).all())
                    if is_single:
                        if rel_objs:
                            result[rel_name] = cls.ogm_to_dict(
                                rel_objs[0],
                                processed_objects,
                                max_depth - 1,
                                current_path.copy()
                            )
                        else:
                            result[rel_name] = None
                    else:
                        result[rel_name] = []
                        for obj in rel_objs:
                            conv = cls.ogm_to_dict(
                                obj,
                                processed_objects,
                                max_depth - 1,
                                current_path.copy()
                            )
                            if conv:
                                result[rel_name].append(conv)
                except Exception as e:
                    logger.warning(f"Failed to process relationship {rel_name}: {str(e)}")
                    result[rel_name] = None if is_single else []
        return result
    finally:
        # Always unwind the path, even if a helper raised outside the inner guards.
        current_path.remove(instance_id)
@classmethod
def batch_dict_to_ogm(
    cls,
    data_dicts: List[dict],
    ogm_class: Type[OGM_Model],
    max_depth: int = 10
) -> List[OGM_Model]:
    """
    Batch convert a list of dictionaries to OGM model instances.

    All conversions run inside a single ``db.transaction`` block and share one
    ``processed_objects`` registry, which is handed to every ``dict_to_ogm``
    call.

    Args:
        data_dicts: Dictionaries to convert, one OGM instance per entry.
        ogm_class: OGM class every dictionary is converted to.
        max_depth: Depth budget forwarded unchanged to ``dict_to_ogm``.

    Returns:
        The converted instances, in the order of ``data_dicts``.
    """
    processed_objects = {}
    # Uses the module-level ``db`` imported from ``neomodel``; the former local
    # import from the internal ``neomodel.sync_.core`` path only re-bound the
    # same name and tied this method to one package layout.
    with db.transaction:
        return [
            cls.dict_to_ogm(d, ogm_class, processed_objects, max_depth)
            for d in data_dicts
        ]
@classmethod
def batch_ogm_to_dict(
    cls,
    ogm_instances: List[OGM_Model],
    max_depth: int = 10,
    include_properties: bool = True,
    include_relationships: bool = True
) -> List[dict]:
    """
    Convert several OGM model instances to dictionaries in one pass.

    Every conversion shares the same cache of processed objects, while each
    top-level instance starts with its own empty recursion path.

    Returns:
        One entry per instance, in input order.
    """
    shared_cache = {}
    converted = []
    for node in ogm_instances:
        as_dict = cls.ogm_to_dict(
            node,
            shared_cache,
            max_depth,
            [],
            include_properties,
            include_relationships,
        )
        converted.append(as_dict)
    return converted
# Compose definition for a single Neo4j container, its named volumes and a
# bridge network.
services:
neo4j:
image: neo4j:5.16.0
container_name: neo4j-pydantic-converter
# Host:container port mappings
ports:
# HTTP
- "7474:7474"
# Bolt
- "7687:7687"
# HTTPS
- "7473:7473"
environment:
# Credentials in user/password form; the bolt URLs used by the Python code
# in this gist carry the same neo4j/password pair.
- NEO4J_AUTH=neo4j/password
- NEO4J_ACCEPT_LICENSE_AGREEMENT=yes
- NEO4J_apoc_export_file_enabled=true
- NEO4J_apoc_import_file_enabled=true
- NEO4J_apoc_import_file_use__neo4j__config=true
- NEO4J_dbms_security_procedures_unrestricted=apoc.*,algo.*,gds.*
# Enable APOC for the inspect_database functionality
- NEO4J_dbms_security_procedures_allowlist=apoc.*,algo.*,gds.*
# Named volumes (declared under the top-level `volumes` key below) mounted
# into the container.
volumes:
- neo4j_data:/data
- neo4j_logs:/logs
- neo4j_import:/import
- neo4j_conf:/var/lib/neo4j/conf
networks:
- neo4j-network
# Health probe: curl against the HTTP port inside the container,
# every 10s with a 5s timeout and 5 retries.
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:7474"]
interval: 10s
timeout: 5s
retries: 5
# Declarations of the named volumes referenced by the service above
volumes:
neo4j_data:
neo4j_logs:
neo4j_import:
neo4j_conf:
# Network the service is attached to
networks:
neo4j-network:
driver: bridge
from datetime import date
from typing import List, Optional
from neomodel import (
StructuredNode, StringProperty, IntegerProperty, FloatProperty, BooleanProperty,
RelationshipTo, config
)
from pydantic import BaseModel, Field
from converter import Converter
import json
# Configure Neo4j connection.
# Runs at import time. The bolt URL embeds the credentials (user "neo4j",
# password "password") and points at localhost:7687.
config.DATABASE_URL = 'bolt://neo4j:password@localhost:7687'
# ===== Models for complex nested structure =====
class ItemPydantic(BaseModel):
    """Pydantic schema for an item: a name and a price."""

    name: str
    price: float
class OrderPydantic(BaseModel):
    """Pydantic schema for an order: a uid plus its nested items."""

    uid: str
    # A fresh empty list per instance when no items are given
    items: List[ItemPydantic] = Field(default_factory=list)
class CustomerPydantic(BaseModel):
    """Pydantic schema for a customer: name, email and nested orders."""

    name: str
    email: str
    # A fresh empty list per instance when no orders are given
    orders: List[OrderPydantic] = Field(default_factory=list)
class ItemOGM(StructuredNode):
    """Graph node counterpart of ``ItemPydantic``."""

    name = StringProperty()
    price = FloatProperty()
class OrderOGM(StructuredNode):
    """Graph node counterpart of ``OrderPydantic``."""

    uid = StringProperty()
    # Outgoing CONTAINS relationship to the items of this order
    items = RelationshipTo(ItemOGM, 'CONTAINS')
class CustomerOGM(StructuredNode):
    """Graph node counterpart of ``CustomerPydantic``."""

    name = StringProperty()
    email = StringProperty()
    # Outgoing PLACED relationship to the orders of this customer
    orders = RelationshipTo(OrderOGM, 'PLACED')
# ===== Models for cyclic references =====
class CyclicPydantic(BaseModel):
    """Pydantic schema whose ``links`` field refers back to the same model."""

    name: str
    # Forward reference by name: the class is not bound yet while its body runs
    links: List['CyclicPydantic'] = Field(default_factory=list)


# Resolve forward references
CyclicPydantic.model_rebuild()
class CyclicOGM(StructuredNode):
    """Graph node counterpart of ``CyclicPydantic``."""

    name = StringProperty()
    # Target class is given by name because it is the class being defined
    links = RelationshipTo('CyclicOGM', 'LINKS_TO')
# Register models
def register_models():
    """Register every Pydantic/OGM pair and the date <-> str type converters."""
    model_pairs = (
        # Nested structure models
        (ItemPydantic, ItemOGM),
        (OrderPydantic, OrderOGM),
        (CustomerPydantic, CustomerOGM),
        # Cyclic models
        (CyclicPydantic, CyclicOGM),
    )
    for pydantic_cls, ogm_cls in model_pairs:
        Converter.register_models(pydantic_cls, ogm_cls)

    # Custom type converters: dates travel as ISO-8601 strings
    Converter.register_type_converter(date, str, lambda d: d.isoformat())
    Converter.register_type_converter(str, date, lambda s: date.fromisoformat(s))
# ===== Example 1: Complex Nested Structure =====
def example_complex_structure():
    """Send a customer -> orders -> items structure through each conversion and print every stage."""
    print("\n=== EXAMPLE 1: COMPLEX NESTED STRUCTURE ===")

    # Sample data: one customer, two orders, three items
    laptop = ItemPydantic(name="Laptop", price=999.99)
    headphones = ItemPydantic(name="Headphones", price=89.99)
    mouse = ItemPydantic(name="Mouse", price=24.99)
    first_order = OrderPydantic(uid="ORD-001", items=[laptop, headphones])
    second_order = OrderPydantic(uid="ORD-002", items=[mouse])
    customer = CustomerPydantic(
        name="John Doe",
        email="john@example.com",
        orders=[first_order, second_order],
    )

    # Stage 0: the original Pydantic structure
    print("Original Pydantic Structure:")
    original_orders = []
    for order in customer.orders:
        order_items = [{"name": item.name, "price": item.price} for item in order.items]
        original_orders.append({"uid": order.uid, "items": order_items})
    original_view = {
        "name": customer.name,
        "email": customer.email,
        "orders": original_orders,
    }
    print(json.dumps(original_view, indent=2))

    # Stage 1: Pydantic -> OGM, then read the related nodes back
    customer_ogm = Converter.to_ogm(customer)
    ogm_orders = list(customer_ogm.orders.all())
    ogm_items = [item for ogm_order in ogm_orders for item in ogm_order.items.all()]
    print("\nConverted to OGM:")
    ogm_view = {
        "name": customer_ogm.name,
        "email": customer_ogm.email,
        "orders_count": len(ogm_orders),
        "order_uids": [ogm_order.uid for ogm_order in ogm_orders],
        "items_count": len(ogm_items),
        "items": [{"name": item.name, "price": item.price} for item in ogm_items],
    }
    print(json.dumps(ogm_view, indent=2))

    # Stage 2: OGM -> dict (default=str stringifies anything json cannot encode)
    customer_dict = Converter.ogm_to_dict(customer_ogm)
    print("\nConverted to Dict:")
    print(json.dumps(customer_dict, default=str, indent=2))

    # Stage 3: OGM -> Pydantic
    customer_py = Converter.to_pydantic(customer_ogm)
    print("\nRound-trip to Pydantic:")
    round_trip_items = []
    for order in customer_py.orders:
        for item in order.items:
            round_trip_items.append({"name": item.name, "price": item.price})
    round_trip_view = {
        "name": customer_py.name,
        "email": customer_py.email,
        "orders_count": len(customer_py.orders),
        "order_uids": [order.uid for order in customer_py.orders],
        "items": round_trip_items,
    }
    print(json.dumps(round_trip_view, indent=2))
# ===== Example 2: Cyclic References =====
def example_cyclic_references():
    """Send a three-node cycle (A -> B -> C -> A) through each conversion and print every stage."""

    def summarize_chain(start):
        """Follow ``links[0]`` three times from ``start`` and report the names seen."""
        hop1 = start.links[0]
        hop2 = hop1.links[0]
        hop3 = hop2.links[0]
        return {
            "node": start.name,
            "links_to": hop1.name,
            "links_to_links_to": hop2.name,
            "links_to_links_to_links_to": hop3.name,
            "cycle_detected": hop3.name == start.name,
        }

    print("\n=== EXAMPLE 2: CYCLIC REFERENCES ===")

    # Create cycle: A -> B -> C -> A
    node_a = CyclicPydantic(name="NodeA")
    node_b = CyclicPydantic(name="NodeB")
    node_c = CyclicPydantic(name="NodeC")
    node_a.links = [node_b]
    node_b.links = [node_c]
    node_c.links = [node_a]  # Creates cycle

    print("Original Cyclic Structure:")
    print(json.dumps(summarize_chain(node_a), indent=2))

    # Pydantic -> OGM, then follow the stored relationships hop by hop
    node_a_ogm = Converter.to_ogm(node_a)
    ogm_bs = list(node_a_ogm.links.all())
    ogm_cs = list(ogm_bs[0].links.all())
    ogm_as = list(ogm_cs[0].links.all())
    print("\nConverted to OGM:")
    ogm_view = {
        "node": node_a_ogm.name,
        "links_to": ogm_bs[0].name,
        "links_to_links_to": ogm_cs[0].name,
        "links_to_links_to_links_to": ogm_as[0].name,
        "cycle_detected": ogm_as[0].name == node_a_ogm.name,
    }
    print(json.dumps(ogm_view, indent=2))

    # OGM -> dict, limited to four hops
    node_dict = Converter.ogm_to_dict(node_a_ogm, max_depth=4)
    print("\nConverted to Dict (trimmed for readability):")
    first_hop = node_dict["links"][0]
    second_hop = first_hop["links"][0]
    trimmed_view = {
        "name": node_dict["name"],
        "links": [
            {
                "name": first_hop["name"],
                "links": [
                    {
                        "name": second_hop["name"],
                        "has_links_back": "links" in second_hop,
                    }
                ],
            }
        ],
    }
    print(json.dumps(trimmed_view, indent=2))

    # OGM -> Pydantic
    node_py = Converter.to_pydantic(node_a_ogm)
    print("\nRound-trip to Pydantic:")
    print(json.dumps(summarize_chain(node_py), indent=2))
if __name__ == "__main__":
    # Registration runs first, then each example in order
    for step in (register_models, example_complex_structure, example_cyclic_references):
        step()
# ./tests/test_complex_relationships.py
from typing import List
import pytest
from neomodel import (
StructuredNode, StringProperty, DateTimeProperty,
RelationshipTo, StructuredRel
)
from pydantic import BaseModel, Field
from converter import Converter
# ===== Module-level model definitions =====
# Relationship model
class AuthoredRel(StructuredRel):
    """Relationship model carrying a ``created_at`` timestamp."""

    created_at = DateTimeProperty()
# Pydantic models
class CommentPydantic(BaseModel):
    """Pydantic schema for a comment: its content and the author's name."""

    content: str
    author: str
class PostPydantic(BaseModel):
    """Pydantic schema for a post: title, content and nested comments."""

    title: str
    content: str
    # A fresh empty list per instance when no comments are given
    comments: List[CommentPydantic] = Field(default_factory=list)
class BloggerPydantic(BaseModel):
    """Pydantic schema for a blogger: name, bio, posts and followed bloggers."""

    name: str
    bio: str
    posts: List[PostPydantic] = Field(default_factory=list)
    # Forward reference by name: the field points back at this same model
    following: List['BloggerPydantic'] = Field(default_factory=list)


# Resolve forward references
BloggerPydantic.model_rebuild()
# Neo4j OGM models
class CommentOGM(StructuredNode):
    """Graph node counterpart of ``CommentPydantic``; both properties are required."""

    content = StringProperty(required=True)
    author = StringProperty(required=True)
class PostOGM(StructuredNode):
    """Graph node counterpart of ``PostPydantic``; title and content are required."""

    title = StringProperty(required=True)
    content = StringProperty(required=True)
    # Outgoing HAS_COMMENT relationship to the comments of this post
    comments = RelationshipTo(CommentOGM, 'HAS_COMMENT')
class BloggerOGM(StructuredNode):
    """Graph node counterpart of ``BloggerPydantic``; only ``name`` is required."""

    name = StringProperty(required=True)
    bio = StringProperty()
    # AUTHORED relationships use the AuthoredRel relationship model
    posts = RelationshipTo(PostOGM, 'AUTHORED', model=AuthoredRel)
    # Same-class relationship, so the target is given by name
    following = RelationshipTo('BloggerOGM', 'FOLLOWS')
# ===== Fixtures =====
@pytest.fixture
def registered_models():
    """Register the comment, post and blogger model pairs with the converter."""
    model_pairs = (
        (CommentPydantic, CommentOGM),
        (PostPydantic, PostOGM),
        (BloggerPydantic, BloggerOGM),
    )
    for pydantic_cls, ogm_cls in model_pairs:
        Converter.register_models(pydantic_cls, ogm_cls)
    # Nothing to tear down here; the fixture only performs the registration
    yield
@pytest.fixture
def blogger_fixture():
    """Build a blogger with two posts, three comments and one followed blogger."""
    neo4j_post = PostPydantic(
        title="Introduction to Neo4j",
        content="Neo4j is a graph database...",
        comments=[
            CommentPydantic(content="Great post!", author="Reader1"),
            CommentPydantic(content="I learned a lot", author="Reader2"),
        ],
    )
    pydantic_post = PostPydantic(
        title="Pydantic and Data Validation",
        content="Pydantic provides data validation...",
        comments=[CommentPydantic(content="Very helpful", author="Reader3")],
    )
    followed_blogger = BloggerPydantic(
        name="Code Expert",
        bio="Professional developer",
        posts=[],
    )
    main_blogger = BloggerPydantic(
        name="Tech Writer",
        bio="I write about tech",
        posts=[neo4j_post, pydantic_post],
    )
    # Same-type reference: a blogger following another blogger
    main_blogger.following = [followed_blogger]
    return main_blogger
# ===== Test Class =====
class TestComplexRelationships:
    """Tests for models with complex nested relationships"""

    def test_nested_relationships(self, db_connection, registered_models, blogger_fixture):
        """
        Round-trip a blogger with nested posts and comments through the converter.

        Checks the blogger -> posts -> comments chain after converting to OGM
        and again after converting back to Pydantic.
        """
        wanted_title = "Pydantic and Data Validation"
        source_blogger = blogger_fixture

        # Pydantic -> OGM
        blogger_node = Converter.to_ogm(source_blogger)
        assert blogger_node.name == "Tech Writer", "Blogger name not preserved"
        assert blogger_node.bio == "I write about tech", "Blogger bio not preserved"

        # Posts attached to the stored blogger
        stored_posts = list(blogger_node.posts.all())
        assert len(stored_posts) == 2, "Incorrect number of posts"

        stored_matches = (p for p in stored_posts if p.title == wanted_title)
        stored_post = next(stored_matches, None)
        assert stored_post is not None, "Post 'Pydantic and Data Validation' not found"

        # Comments attached to that post
        stored_comments = list(stored_post.comments.all())
        assert len(stored_comments) == 1, "Incorrect number of comments"
        only_comment = stored_comments[0]
        assert only_comment.content == "Very helpful", "Comment content not preserved"
        assert only_comment.author == "Reader3", "Comment author not preserved"

        # OGM -> Pydantic
        round_tripped = Converter.to_pydantic(blogger_node)
        assert round_tripped.name == source_blogger.name, "Blogger name not preserved in round trip"
        assert len(round_tripped.posts) == 2, "Incorrect number of posts in round trip"

        round_trip_matches = (p for p in round_tripped.posts if p.title == wanted_title)
        round_trip_post = next(round_trip_matches, None)
        assert round_trip_post is not None, "Post not found in round trip"
        assert len(round_trip_post.comments) == 1, "Incorrect number of comments in round trip"
        assert round_trip_post.comments[0].content == "Very helpful", "Comment content not preserved in round trip"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment