Last active
October 19, 2025 12:19
-
-
Save Hammer2900/2874dc9a772dc95f869181fbfc49695f to your computer and use it in GitHub Desktop.
python esper speed test
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """Optimized esper - faster Entity System (ECS) for Python | |
| Key optimizations: | |
| 1. Lazy cache invalidation | |
| 2. Faster component lookups with cached min sets | |
| 3. Direct entity component dict access | |
| 4. Optimized dead entity cleanup | |
| 5. Fast processor lookup dict | |
| """ | |
| import time as _time | |
| from types import MethodType as _MethodType | |
| from typing import Any as _Any | |
| from typing import Callable as _Callable | |
| from typing import Dict as _Dict | |
| from typing import List as _List | |
| from typing import Set as _Set | |
| from typing import Type as _Type | |
| from typing import Tuple as _Tuple | |
| from typing import TypeVar as _TypeVar | |
| from typing import Iterable as _Iterable | |
| from typing import Optional as _Optional | |
| from typing import overload as _overload | |
| from weakref import ref as _ref | |
| from weakref import WeakMethod as _WeakMethod | |
| from itertools import count as _count | |
# Package version string; ``version`` is a second name bound to the same value.
__version__ = version = '3.4-optimized'
| ################### | |
| # Event system | |
| ################### | |
def dispatch_event(name: str, *args: _Any) -> None:
    """Dispatch an event by name, with optional arguments.

    Every handler currently registered for ``name`` is called with ``args``.
    Dispatching a name that has no handlers is a no-op.

    Args:
        name: The event name whose handlers should run.
        *args: Positional arguments forwarded unchanged to each handler.
    """
    # Iterate over a snapshot: a handler may register or unregister handlers
    # for this same event, and mutating a set while iterating it raises
    # RuntimeError.
    for weak_handler in tuple(event_registry.get(name, ())):
        handler = weak_handler()
        # A dead weak reference dereferences to None; skip it rather than
        # attempting to call None.
        if handler is not None:
            handler(*args)
def _make_callback(name: str) -> _Callable[[_Any], None]:
    """Create an internal callback to remove dead handlers.

    The returned function is meant to be passed as the weak-reference
    callback for a handler registered under ``name``. When the handler is
    garbage collected, the callback drops the dead reference and deletes the
    event entry once it has no handlers left.

    Args:
        name: The event name the handler is registered under.

    Returns:
        A one-argument callable receiving the dead weak reference.
    """
    # Bind the registry that is active *now*. ``event_registry`` is rebound
    # when the World context changes, so resolving the global when the
    # callback fires could target a different world's registry.
    registry = event_registry

    def callback(weak_method: _Any) -> None:
        handlers = registry.get(name)
        if handlers is None:
            # The event entry was already removed; nothing to clean up.
            return
        handlers.discard(weak_method)
        if not handlers:
            del registry[name]

    return callback
def set_handler(name: str, func: _Callable[..., None]) -> None:
    """Register ``func`` as a handler for events called ``name``.

    Only a weak reference to the handler is stored: bound methods are
    wrapped in ``WeakMethod`` and every other callable in ``ref``. Each
    reference carries a cleanup callback produced by ``_make_callback``.
    """
    handlers = event_registry.setdefault(name, set())
    make_reference = _WeakMethod if isinstance(func, _MethodType) else _ref
    handlers.add(make_reference(func, _make_callback(name)))
def remove_handler(name: str, func: _Callable[..., None]) -> None:
    """Unregister a handler from receiving events of this name.

    Removing a handler that is not registered (or an unknown event name) is
    a no-op. The event entry is deleted once its last handler is removed.

    Args:
        name: The event name the handler was registered under.
        func: The function or bound method originally passed to
            ``set_handler``.
    """
    # Build the same kind of weak reference that ``set_handler`` stores.
    # A plain ``ref`` to a bound method never compares equal to the stored
    # ``WeakMethod``, so methods could otherwise never be unregistered.
    make_reference = _WeakMethod if isinstance(func, _MethodType) else _ref
    func_ref = make_reference(func)

    handlers = event_registry.get(name)
    if not handlers or func_ref not in handlers:
        return

    handlers.remove(func_ref)
    if not handlers:
        del event_registry[name]
| ################### | |
| # ECS Classes | |
| ################### | |
# Type variables for component types. The numbered variants are used by the
# multi-type overload signatures in this module, which accept up to four types.
_C = _TypeVar('_C')
_C2 = _TypeVar('_C2')
_C3 = _TypeVar('_C3')
_C4 = _TypeVar('_C4')
class Processor:
    """Common base class for Processors.

    Subclasses override ``process``; the base implementation only raises
    ``NotImplementedError``. ``priority`` defaults to 0 at class level.
    """

    priority = 0

    def process(self, *args: _Any, **kwargs: _Any) -> None:
        """Run this Processor's logic; must be overridden by subclasses."""
        raise NotImplementedError
| ################### | |
| # ECS functions | |
| ################### | |
# Name of the active World context (private copy; see ``current_world``).
_current_world: str = "default"
# Generator of Entity ids for the active World.
_entity_count: "_count[int]" = _count(start=1)
# Component type -> set of Entity ids that have a Component of that type.
_components: _Dict[_Type[_Any], _Set[_Any]] = {}
# Entity id -> {Component type: Component instance}.
_entities: _Dict[int, _Dict[_Type[_Any], _Any]] = {}
# Entity ids queued for deferred deletion.
_dead_entities: _Set[int] = set()
# Query caches for single-type and multi-type lookups.
_get_component_cache: _Dict[_Type[_Any], _List[_Any]] = {}
_get_components_cache: _Dict[_Tuple[_Type[_Any], ...], _List[_Any]] = {}
# Processors in execution order, plus a type -> instance lookup table.
_processors: _List[Processor] = []
_processors_dict: _Dict[_Type[Processor], Processor] = {}
# True when the query caches must be emptied before their next use.
_cache_dirty: bool = False
event_registry: _Dict[str, _Any] = {}
process_times: _Dict[str, int] = {}
current_world: str = "default"

# World name -> saved state, in the order ``switch_world`` unpacks it.
# The "default" entry must reference the very same objects as the module
# globals above. Built from fresh containers instead, everything created in
# the default World before the first switch would be lost on switching back.
_context_map: _Dict[str, _Tuple[
    "_count[int]",
    _Dict[_Type[_Any], _Set[_Any]],
    _Dict[int, _Dict[_Type[_Any], _Any]],
    _Set[int],
    _Dict[_Type[_Any], _List[_Any]],
    _Dict[_Tuple[_Type[_Any], ...], _List[_Any]],
    _List[Processor],
    _Dict[_Type[Processor], Processor],
    bool,
    _Dict[str, int],
    _Dict[str, _Any]
]] = {"default": (
    _entity_count,
    _components,
    _entities,
    _dead_entities,
    _get_component_cache,
    _get_components_cache,
    _processors,
    _processors_dict,
    _cache_dirty,
    process_times,
    event_registry,
)}
def clear_cache() -> None:
    """Flag the query caches as stale.

    Nothing is emptied here; this only sets the module-level
    ``_cache_dirty`` flag to True.
    """
    global _cache_dirty
    _cache_dirty = True
def _clear_cache_now() -> None:
    """Empty both query caches and reset the dirty flag (internal use)."""
    global _cache_dirty
    for cache in (_get_component_cache, _get_components_cache):
        cache.clear()
    _cache_dirty = False
def clear_database() -> None:
    """Remove every Entity and Component from the current World.

    The Entity id counter restarts at 1, the deferred-deletion queue is
    emptied and the query caches are cleared immediately.
    """
    global _entity_count
    for store in (_components, _entities, _dead_entities):
        store.clear()
    _entity_count = _count(start=1)
    _clear_cache_now()
def add_processor(processor_instance: Processor, priority: int = 0) -> None:
    """Register a Processor instance with the current World.

    The instance's ``priority`` attribute is overwritten with ``priority``,
    the processor list is re-sorted from highest to lowest priority, and the
    instance is recorded in the by-type lookup table.
    """
    processor_instance.priority = priority
    _processors.append(processor_instance)

    def by_priority(entry: Processor) -> int:
        return entry.priority

    _processors.sort(key=by_priority, reverse=True)
    _processors_dict[type(processor_instance)] = processor_instance
def remove_processor(processor_type: _Type[Processor]) -> None:
    """Remove a Processor from the World, by type.

    Every registered instance whose exact type is ``processor_type`` is
    removed. Removing a type that is not registered is a no-op.

    Args:
        processor_type: The class of the Processor(s) to remove.
    """
    _processors_dict.pop(processor_type, None)
    # The lookup table only remembers the most recently added instance of a
    # type, so filter the list itself; otherwise earlier instances of the
    # same type would keep running with no way to remove them by type.
    # Slice assignment keeps the list object identity intact for any other
    # holder of a reference to it.
    _processors[:] = [
        processor for processor in _processors
        if type(processor) is not processor_type
    ]
def get_processor(processor_type: _Type[Processor]) -> _Optional[Processor]:
    """Return the Processor registered under ``processor_type``.

    Returns None when the by-type lookup table has no entry for that type.
    """
    try:
        return _processors_dict[processor_type]
    except KeyError:
        return None
def create_entity(*components: _C) -> int:
    """Create an Entity and return its id.

    Each Component passed in is stored under its own type and the Entity is
    added to that type's index. The query caches are marked stale.
    """
    entity = next(_entity_count)
    owned = {}
    for instance in components:
        kind = type(instance)
        _components.setdefault(kind, set()).add(entity)
        owned[kind] = instance
    _entities[entity] = owned
    clear_cache()
    return entity
def delete_entity(entity: int, immediate: bool = False) -> None:
    """Delete an Entity from the current World.

    By default the Entity is only queued in ``_dead_entities``; the actual
    removal happens in ``clear_dead_entities``. With ``immediate=True`` the
    Entity and its Components are removed right away and the query caches
    are marked stale.

    Args:
        entity: The Entity id to delete.
        immediate: Remove the Entity now instead of queueing it.

    Raises:
        KeyError: If ``immediate`` is True and the Entity does not exist.
    """
    if not immediate:
        _dead_entities.add(entity)
        return

    entity_comps = _entities[entity]
    for component_type in entity_comps:
        comp_set = _components[component_type]
        comp_set.discard(entity)
        if not comp_set:
            # Drop empty index entries so queries can bail out early.
            del _components[component_type]
    del _entities[entity]
    # The Entity may already have been queued for deferred deletion; forget
    # it so the later sweep does not look up an Entity that no longer exists.
    _dead_entities.discard(entity)
    clear_cache()
def entity_exists(entity: int) -> bool:
    """Return True if the Entity is stored and not queued for deletion."""
    if entity not in _entities:
        return False
    return entity not in _dead_entities
def component_for_entity(entity: int, component_type: _Type[_C]) -> _C:
    """Return the Entity's Component of the given type.

    Raises KeyError if the Entity or the Component type is not found.
    """
    owned = _entities[entity]
    return owned[component_type]  # type: ignore[no-any-return]
def components_for_entity(entity: int) -> _Tuple[_C, ...]:
    """Return every Component instance of the Entity as a tuple.

    Raises KeyError if the Entity does not exist.
    """
    owned = _entities[entity]
    return (*owned.values(),)
def has_component(entity: int, component_type: _Type[_C]) -> bool:
    """Return True if the Entity has a Component stored under that type.

    Raises KeyError if the Entity does not exist.
    """
    owned = _entities[entity]
    return component_type in owned
def has_components(entity: int, *component_types: _Type[_C]) -> bool:
    """Return True if the Entity has every one of the given Component types.

    Raises KeyError if the Entity does not exist.
    """
    owned = _entities[entity]
    for wanted in component_types:
        if wanted not in owned:
            return False
    return True
def add_component(entity: int, component_instance: _C, type_alias: _Optional[_Type[_C]] = None) -> None:
    """Add a new Component instance to an Entity.

    An existing Component stored under the same type is replaced. The query
    caches are marked stale.

    Args:
        entity: The Entity id to attach the Component to.
        component_instance: The Component to store.
        type_alias: Optional type to file the Component under instead of
            ``type(component_instance)``.

    Raises:
        KeyError: If the Entity does not exist.
    """
    component_type = type_alias or type(component_instance)
    # Resolve the Entity first: an unknown id must raise before the type
    # index is touched, otherwise the index would reference a missing Entity
    # and later queries over that type would fail.
    entity_comps = _entities[entity]

    if component_type not in _components:
        _components[component_type] = set()
    _components[component_type].add(entity)
    entity_comps[component_type] = component_instance
    clear_cache()
def remove_component(entity: int, component_type: _Type[_C]) -> _C:
    """Detach and return the Entity's Component of the given type.

    The Entity is dropped from the type's index (the index entry is deleted
    once empty) and the query caches are marked stale. Raises KeyError if
    the type is not indexed, or the Entity or its Component is missing.
    """
    holders = _components[component_type]
    holders.discard(entity)
    if len(holders) == 0:
        del _components[component_type]
    clear_cache()
    owned = _entities[entity]
    return owned.pop(component_type)  # type: ignore[no-any-return]
def _get_component(component_type: _Type[_C]) -> _Iterable[_Tuple[int, _C]]:
    """Internal: yield ``(entity, component)`` for one Component type.

    Yields nothing when no Entity is indexed under ``component_type``.
    """
    lookup = _entities
    holders = _components.get(component_type)
    if holders is not None:
        for entity in holders:
            yield entity, lookup[entity][component_type]
def _get_components(*component_types: _Type[_C]) -> _Iterable[_Tuple[int, _Tuple[_C, ...]]]:
    """Internal: yield ``(entity, components)`` for Entities having all types.

    The Components in each tuple follow the order of ``component_types``.
    Yields nothing when no types are given or when any type has no index
    entry.
    """
    if not component_types:
        return

    lookup = _entities
    index = _components

    candidate_sets = []
    for wanted in component_types:
        holders = index.get(wanted)
        if holders is None:
            # One of the types is not indexed, so no Entity can match.
            return
        candidate_sets.append(holders)

    # Walk the smallest index entry (the first one on ties) to keep the
    # number of membership checks down.
    smallest = min(candidate_sets, key=len)

    for entity in smallest:
        owned = lookup[entity]
        for wanted in component_types:
            if wanted not in owned:
                break
        else:
            yield entity, tuple(owned[wanted] for wanted in component_types)
def get_component(component_type: _Type[_C]) -> _List[_Tuple[int, _C]]:
    """Return a list of ``(entity, component)`` pairs for one Component type.

    Results are cached per type; a pending invalidation is applied first.
    The returned list is the cached object itself.
    """
    if _cache_dirty:
        _clear_cache_now()

    try:
        return _get_component_cache[component_type]
    except KeyError:
        pass

    pairs = list(_get_component(component_type))
    _get_component_cache[component_type] = pairs
    return pairs
@_overload
def get_components(__c1: _Type[_C], __c2: _Type[_C2]) -> _List[_Tuple[int, _Tuple[_C, _C2]]]:
    ...


@_overload
def get_components(__c1: _Type[_C], __c2: _Type[_C2], __c3: _Type[_C3]) -> _List[_Tuple[int, _Tuple[_C, _C2, _C3]]]:
    ...


@_overload
def get_components(__c1: _Type[_C], __c2: _Type[_C2], __c3: _Type[_C3], __c4: _Type[_C4]) -> _List[
        _Tuple[int, _Tuple[_C, _C2, _C3, _C4]]]:
    ...


def get_components(*component_types: _Type[_Any]) -> _List[_Tuple[int, _Tuple[_Any, ...]]]:
    """Return a list of ``(entity, (components...))`` for the given types.

    Results are cached per tuple of types; a pending invalidation is applied
    first. The returned list is the cached object itself.
    """
    if _cache_dirty:
        _clear_cache_now()

    try:
        return _get_components_cache[component_types]
    except KeyError:
        pass

    matches = list(_get_components(*component_types))
    _get_components_cache[component_types] = matches
    return matches
def try_component(entity: int, component_type: _Type[_C]) -> _Optional[_C]:
    """Return the Entity's Component of the given type, or None.

    None is returned when the Entity is unknown, has no Components, or has
    no Component stored under ``component_type``.
    """
    owned = _entities.get(entity)
    if not owned:
        return None
    if component_type not in owned:
        return None
    return owned[component_type]  # type: ignore[no-any-return]
@_overload
def try_components(entity: int, __c1: _Type[_C], __c2: _Type[_C2]) -> _Tuple[_C, _C2]:
    ...


@_overload
def try_components(entity: int, __c1: _Type[_C], __c2: _Type[_C2], __c3: _Type[_C3]) -> _Tuple[_C, _C2, _C3]:
    ...


@_overload
def try_components(entity: int, __c1: _Type[_C], __c2: _Type[_C2], __c3: _Type[_C3], __c4: _Type[_C4]) -> _Tuple[_C, _C2, _C3, _C4]:
    ...


def try_components(entity: int, *component_types: _Type[_C]) -> _Optional[_Tuple[_C, ...]]:
    """Return the Entity's Components of all given types, or None.

    None is returned when the Entity is unknown, has no Components, or lacks
    any of the requested types. The tuple follows the order of
    ``component_types``.
    """
    owned = _entities.get(entity)
    if not owned:
        return None
    for wanted in component_types:
        if wanted not in owned:
            return None
    return tuple(owned[wanted] for wanted in component_types)  # type: ignore[return-value]
def clear_dead_entities() -> None:
    """Finalize deletion of any Entities that are marked as dead.

    Each queued Entity is removed together with its index entries, the
    queue is emptied, and the query caches are marked stale. Does nothing
    when the queue is empty.
    """
    if not _dead_entities:
        return

    for entity in _dead_entities:
        # A queued id may no longer exist (for example it was also deleted
        # immediately, or it never existed). Skip it instead of raising,
        # which would abort the sweep and leave the queue uncleared.
        entity_comps = _entities.pop(entity, None)
        if entity_comps is None:
            continue
        for component_type in entity_comps:
            comp_set = _components[component_type]
            comp_set.discard(entity)
            if not comp_set:
                del _components[component_type]

    _dead_entities.clear()
    clear_cache()
def process(*args: _Any, **kwargs: _Any) -> None:
    """Run one pass over all Processors.

    Entities queued for deletion are removed first, then each Processor's
    ``process`` method is called in list order with the given arguments.
    """
    clear_dead_entities()
    for current in _processors:
        current.process(*args, **kwargs)
def timed_process(*args: _Any, **kwargs: _Any) -> None:
    """Run one pass over all Processors and record how long each one took.

    Works like ``process``, and additionally stores each Processor's elapsed
    ``time.process_time`` in whole milliseconds in ``process_times``, keyed
    by the Processor's class name.
    """
    clear_dead_entities()
    for current in _processors:
        began = _time.process_time()
        current.process(*args, **kwargs)
        elapsed_ms = int((_time.process_time() - began) * 1000)
        process_times[current.__class__.__name__] = elapsed_ms
def list_worlds() -> _List[str]:
    """Return a list of all World context names."""
    return [*_context_map]
def delete_world(name: str) -> None:
    """Delete the World context called ``name``.

    Raises PermissionError when ``name`` is the active World, and KeyError
    when no such World exists.
    """
    is_active = _current_world == name
    if is_active:
        raise PermissionError("The active World context cannot be deleted.")
    _context_map.pop(name)
def switch_world(name: str) -> None:
    """Switch to a new World context by name.

    The state of the World being left is saved, then the module-level state
    is replaced by that of ``name``. A World that does not exist yet is
    created empty.

    Args:
        name: The World context to make active.
    """
    global _current_world
    global _entity_count
    global _components
    global _entities
    global _dead_entities
    global _get_component_cache
    global _get_components_cache
    global _processors
    global _processors_dict
    global _cache_dirty
    global process_times
    global event_registry
    global current_world

    # Save the outgoing World. The containers are shared by reference, but
    # the Entity counter can be rebound (clear_database) and the dirty flag
    # is a plain bool; without saving them, a World would come back with an
    # old counter and with stale query caches that are never invalidated.
    _context_map[_current_world] = (
        _entity_count, _components, _entities, _dead_entities, _get_component_cache,
        _get_components_cache, _processors, _processors_dict, _cache_dirty,
        process_times, event_registry)

    if name not in _context_map:
        _context_map[name] = (_count(start=1), {}, {}, set(), {}, {}, [], {}, False, {}, {})

    (_entity_count, _components, _entities, _dead_entities, _get_component_cache,
     _get_components_cache, _processors, _processors_dict, _cache_dirty,
     process_times, event_registry) = _context_map[name]
    _current_world = current_world = name
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """Optimized esper - faster Entity System (ECS) for Python | |
| Key optimizations: | |
| 1. Lazy cache invalidation | |
| 2. Faster component lookups with cached min sets | |
| 3. Direct entity component dict access | |
| 4. Optimized dead entity cleanup | |
| 5. Fast processor lookup dict | |
| """ | |
| import time as _time | |
| from types import MethodType as _MethodType | |
| from typing import Any as _Any | |
| from typing import Callable as _Callable | |
| from typing import Dict as _Dict | |
| from typing import List as _List | |
| from typing import Set as _Set | |
| from typing import Type as _Type | |
| from typing import Tuple as _Tuple | |
| from typing import TypeVar as _TypeVar | |
| from typing import Iterable as _Iterable | |
| from typing import Optional as _Optional | |
| from typing import overload as _overload | |
| from weakref import ref as _ref | |
| from weakref import WeakMethod as _WeakMethod | |
| from itertools import count as _count | |
# Package version string; ``version`` is a second name bound to the same value.
__version__ = version = '3.4-optimized'
| ################### | |
| # Event system | |
| ################### | |
def dispatch_event(name: str, *args: _Any) -> None:
    """Dispatch an event by name, with optional arguments.

    Every handler currently registered for ``name`` is called with ``args``.
    Dispatching a name that has no handlers is a no-op.

    Args:
        name: The event name whose handlers should run.
        *args: Positional arguments forwarded unchanged to each handler.
    """
    # Iterate over a snapshot: a handler may register or unregister handlers
    # for this same event, and mutating a set while iterating it raises
    # RuntimeError.
    for weak_handler in tuple(event_registry.get(name, ())):
        handler = weak_handler()
        # A dead weak reference dereferences to None; skip it rather than
        # attempting to call None.
        if handler is not None:
            handler(*args)
def _make_callback(name: str) -> _Callable[[_Any], None]:
    """Create an internal callback to remove dead handlers.

    The returned function is meant to be passed as the weak-reference
    callback for a handler registered under ``name``. When the handler is
    garbage collected, the callback drops the dead reference and deletes the
    event entry once it has no handlers left.

    Args:
        name: The event name the handler is registered under.

    Returns:
        A one-argument callable receiving the dead weak reference.
    """
    # Bind the registry that is active *now*. ``event_registry`` is rebound
    # when the World context changes, so resolving the global when the
    # callback fires could target a different world's registry.
    registry = event_registry

    def callback(weak_method: _Any) -> None:
        handlers = registry.get(name)
        if handlers is None:
            # The event entry was already removed; nothing to clean up.
            return
        handlers.discard(weak_method)
        if not handlers:
            del registry[name]

    return callback
def set_handler(name: str, func: _Callable[..., None]) -> None:
    """Register ``func`` as a handler for events called ``name``.

    Only a weak reference to the handler is stored: bound methods are
    wrapped in ``WeakMethod`` and every other callable in ``ref``. Each
    reference carries a cleanup callback produced by ``_make_callback``.
    """
    handlers = event_registry.setdefault(name, set())
    make_reference = _WeakMethod if isinstance(func, _MethodType) else _ref
    handlers.add(make_reference(func, _make_callback(name)))
def remove_handler(name: str, func: _Callable[..., None]) -> None:
    """Unregister a handler from receiving events of this name.

    Removing a handler that is not registered (or an unknown event name) is
    a no-op. The event entry is deleted once its last handler is removed.

    Args:
        name: The event name the handler was registered under.
        func: The function or bound method originally passed to
            ``set_handler``.
    """
    # Build the same kind of weak reference that ``set_handler`` stores.
    # A plain ``ref`` to a bound method never compares equal to the stored
    # ``WeakMethod``, so methods could otherwise never be unregistered.
    make_reference = _WeakMethod if isinstance(func, _MethodType) else _ref
    func_ref = make_reference(func)

    handlers = event_registry.get(name)
    if not handlers or func_ref not in handlers:
        return

    handlers.remove(func_ref)
    if not handlers:
        del event_registry[name]
| ################### | |
| # ECS Classes | |
| ################### | |
# Type variables for component types. The numbered variants are used by the
# multi-type overload signatures in this module, which accept up to four types.
_C = _TypeVar('_C')
_C2 = _TypeVar('_C2')
_C3 = _TypeVar('_C3')
_C4 = _TypeVar('_C4')
class Processor:
    """Common base class for Processors.

    Subclasses override ``process``; the base implementation only raises
    ``NotImplementedError``. ``priority`` defaults to 0 at class level.
    """

    priority = 0

    def process(self, *args: _Any, **kwargs: _Any) -> None:
        """Run this Processor's logic; must be overridden by subclasses."""
        raise NotImplementedError
| ################### | |
| # ECS functions | |
| ################### | |
# Name of the active World context (private copy; see ``current_world``).
_current_world: str = "default"
# Generator of Entity ids for the active World.
_entity_count: "_count[int]" = _count(start=1)
# Component type -> set of Entity ids that have a Component of that type.
_components: _Dict[_Type[_Any], _Set[_Any]] = {}
# Entity id -> {Component type: Component instance}.
_entities: _Dict[int, _Dict[_Type[_Any], _Any]] = {}
# Entity ids queued for deferred deletion.
_dead_entities: _Set[int] = set()
# Query caches for single-type and multi-type lookups.
_get_component_cache: _Dict[_Type[_Any], _List[_Any]] = {}
_get_components_cache: _Dict[_Tuple[_Type[_Any], ...], _List[_Any]] = {}
# Processors in execution order, plus a type -> instance lookup table.
_processors: _List[Processor] = []
_processors_dict: _Dict[_Type[Processor], Processor] = {}
# True when the query caches must be emptied before their next use.
_cache_dirty: bool = False
event_registry: _Dict[str, _Any] = {}
process_times: _Dict[str, int] = {}
current_world: str = "default"

# World name -> saved state, in the order ``switch_world`` unpacks it.
# The "default" entry must reference the very same objects as the module
# globals above. Built from fresh containers instead, everything created in
# the default World before the first switch would be lost on switching back.
_context_map: _Dict[str, _Tuple[
    "_count[int]",
    _Dict[_Type[_Any], _Set[_Any]],
    _Dict[int, _Dict[_Type[_Any], _Any]],
    _Set[int],
    _Dict[_Type[_Any], _List[_Any]],
    _Dict[_Tuple[_Type[_Any], ...], _List[_Any]],
    _List[Processor],
    _Dict[_Type[Processor], Processor],
    bool,
    _Dict[str, int],
    _Dict[str, _Any]
]] = {"default": (
    _entity_count,
    _components,
    _entities,
    _dead_entities,
    _get_component_cache,
    _get_components_cache,
    _processors,
    _processors_dict,
    _cache_dirty,
    process_times,
    event_registry,
)}
def clear_cache() -> None:
    """Flag the query caches as stale.

    Nothing is emptied here; this only sets the module-level
    ``_cache_dirty`` flag to True.
    """
    global _cache_dirty
    _cache_dirty = True
def _clear_cache_now() -> None:
    """Empty both query caches and reset the dirty flag (internal use)."""
    global _cache_dirty
    for cache in (_get_component_cache, _get_components_cache):
        cache.clear()
    _cache_dirty = False
def clear_database() -> None:
    """Remove every Entity and Component from the current World.

    The Entity id counter restarts at 1, the deferred-deletion queue is
    emptied and the query caches are cleared immediately.
    """
    global _entity_count
    for store in (_components, _entities, _dead_entities):
        store.clear()
    _entity_count = _count(start=1)
    _clear_cache_now()
def add_processor(processor_instance: Processor, priority: int = 0) -> None:
    """Register a Processor instance with the current World.

    The instance's ``priority`` attribute is overwritten with ``priority``,
    the processor list is re-sorted from highest to lowest priority, and the
    instance is recorded in the by-type lookup table.
    """
    processor_instance.priority = priority
    _processors.append(processor_instance)

    def by_priority(entry: Processor) -> int:
        return entry.priority

    _processors.sort(key=by_priority, reverse=True)
    _processors_dict[type(processor_instance)] = processor_instance
def remove_processor(processor_type: _Type[Processor]) -> None:
    """Remove a Processor from the World, by type.

    Every registered instance whose exact type is ``processor_type`` is
    removed. Removing a type that is not registered is a no-op.

    Args:
        processor_type: The class of the Processor(s) to remove.
    """
    _processors_dict.pop(processor_type, None)
    # The lookup table only remembers the most recently added instance of a
    # type, so filter the list itself; otherwise earlier instances of the
    # same type would keep running with no way to remove them by type.
    # Slice assignment keeps the list object identity intact for any other
    # holder of a reference to it.
    _processors[:] = [
        processor for processor in _processors
        if type(processor) is not processor_type
    ]
def get_processor(processor_type: _Type[Processor]) -> _Optional[Processor]:
    """Return the Processor registered under ``processor_type``.

    Returns None when the by-type lookup table has no entry for that type.
    """
    try:
        return _processors_dict[processor_type]
    except KeyError:
        return None
def create_entity(*components: _C) -> int:
    """Create an Entity and return its id.

    Each Component passed in is stored under its own type and the Entity is
    added to that type's index. The query caches are marked stale.
    """
    entity = next(_entity_count)
    owned = {}
    for instance in components:
        kind = type(instance)
        _components.setdefault(kind, set()).add(entity)
        owned[kind] = instance
    _entities[entity] = owned
    clear_cache()
    return entity
def delete_entity(entity: int, immediate: bool = False) -> None:
    """Delete an Entity from the current World.

    By default the Entity is only queued in ``_dead_entities``; the actual
    removal happens in ``clear_dead_entities``. With ``immediate=True`` the
    Entity and its Components are removed right away and the query caches
    are marked stale.

    Args:
        entity: The Entity id to delete.
        immediate: Remove the Entity now instead of queueing it.

    Raises:
        KeyError: If ``immediate`` is True and the Entity does not exist.
    """
    if not immediate:
        _dead_entities.add(entity)
        return

    entity_comps = _entities[entity]
    for component_type in entity_comps:
        comp_set = _components[component_type]
        comp_set.discard(entity)
        if not comp_set:
            # Drop empty index entries so queries can bail out early.
            del _components[component_type]
    del _entities[entity]
    # The Entity may already have been queued for deferred deletion; forget
    # it so the later sweep does not look up an Entity that no longer exists.
    _dead_entities.discard(entity)
    clear_cache()
def entity_exists(entity: int) -> bool:
    """Return True if the Entity is stored and not queued for deletion."""
    if entity not in _entities:
        return False
    return entity not in _dead_entities
def component_for_entity(entity: int, component_type: _Type[_C]) -> _C:
    """Return the Entity's Component of the given type.

    Raises KeyError if the Entity or the Component type is not found.
    """
    owned = _entities[entity]
    return owned[component_type]  # type: ignore[no-any-return]
def components_for_entity(entity: int) -> _Tuple[_C, ...]:
    """Return every Component instance of the Entity as a tuple.

    Raises KeyError if the Entity does not exist.
    """
    owned = _entities[entity]
    return (*owned.values(),)
def has_component(entity: int, component_type: _Type[_C]) -> bool:
    """Return True if the Entity has a Component stored under that type.

    Raises KeyError if the Entity does not exist.
    """
    owned = _entities[entity]
    return component_type in owned
def has_components(entity: int, *component_types: _Type[_C]) -> bool:
    """Return True if the Entity has every one of the given Component types.

    Raises KeyError if the Entity does not exist.
    """
    owned = _entities[entity]
    for wanted in component_types:
        if wanted not in owned:
            return False
    return True
def add_component(entity: int, component_instance: _C, type_alias: _Optional[_Type[_C]] = None) -> None:
    """Add a new Component instance to an Entity.

    An existing Component stored under the same type is replaced. The query
    caches are marked stale.

    Args:
        entity: The Entity id to attach the Component to.
        component_instance: The Component to store.
        type_alias: Optional type to file the Component under instead of
            ``type(component_instance)``.

    Raises:
        KeyError: If the Entity does not exist.
    """
    component_type = type_alias or type(component_instance)
    # Resolve the Entity first: an unknown id must raise before the type
    # index is touched, otherwise the index would reference a missing Entity
    # and later queries over that type would fail.
    entity_comps = _entities[entity]

    if component_type not in _components:
        _components[component_type] = set()
    _components[component_type].add(entity)
    entity_comps[component_type] = component_instance
    clear_cache()
def remove_component(entity: int, component_type: _Type[_C]) -> _C:
    """Detach and return the Entity's Component of the given type.

    The Entity is dropped from the type's index (the index entry is deleted
    once empty) and the query caches are marked stale. Raises KeyError if
    the type is not indexed, or the Entity or its Component is missing.
    """
    holders = _components[component_type]
    holders.discard(entity)
    if len(holders) == 0:
        del _components[component_type]
    clear_cache()
    owned = _entities[entity]
    return owned.pop(component_type)  # type: ignore[no-any-return]
def _get_component(component_type: _Type[_C]) -> _Iterable[_Tuple[int, _C]]:
    """Internal: yield ``(entity, component)`` for one Component type.

    Yields nothing when no Entity is indexed under ``component_type``.
    """
    lookup = _entities
    holders = _components.get(component_type)
    if holders is not None:
        for entity in holders:
            yield entity, lookup[entity][component_type]
def _get_components(*component_types: _Type[_C]) -> _Iterable[_Tuple[int, _Tuple[_C, ...]]]:
    """Internal: iterate over entities with multiple components.

    Yields ``(entity, components)`` for every Entity that has all of the
    requested types, with the Components ordered like ``component_types``.

    Args:
        *component_types: The Component types an Entity must have.

    Yields:
        Tuples of the Entity id and a tuple of its matching Components.
        Nothing is yielded when no types are given or when any type has no
        index entry.
    """
    if not component_types:
        return

    entity_db = _entities
    comp_db = _components

    # Find the smallest index entry (first one on ties) and remember its
    # position, so the remaining types can be derived exactly.
    min_set = None
    min_index = -1
    for index, ct in enumerate(component_types):
        comp_set = comp_db.get(ct)
        if comp_set is None:
            return  # If any component type has no entities, return empty
        if min_set is None or len(comp_set) < len(min_set):
            min_set = comp_set
            min_index = index

    # Every Entity in min_set already has the type at min_index, so only
    # the other requested types need a membership check. Deriving this by
    # position guarantees no requested type is skipped or checked twice.
    other_types = component_types[:min_index] + component_types[min_index + 1:]

    if not other_types:
        # Only one component type - fast path, no membership checks needed.
        only_type = component_types[0]
        for entity in min_set:
            yield entity, (entity_db[entity][only_type],)
    else:
        for entity in min_set:
            entity_comps = entity_db[entity]
            for ct in other_types:
                if ct not in entity_comps:
                    break
            else:
                yield entity, tuple(entity_comps[ct] for ct in component_types)
def get_component(component_type: _Type[_C]) -> _List[_Tuple[int, _C]]:
    """Return a list of ``(entity, component)`` pairs for one Component type.

    Results are cached per type; a pending invalidation is applied first.
    The returned list is the cached object itself.
    """
    if _cache_dirty:
        _clear_cache_now()

    try:
        return _get_component_cache[component_type]
    except KeyError:
        pass

    pairs = list(_get_component(component_type))
    _get_component_cache[component_type] = pairs
    return pairs
@_overload
def get_components(__c1: _Type[_C], __c2: _Type[_C2]) -> _List[_Tuple[int, _Tuple[_C, _C2]]]:
    ...


@_overload
def get_components(__c1: _Type[_C], __c2: _Type[_C2], __c3: _Type[_C3]) -> _List[_Tuple[int, _Tuple[_C, _C2, _C3]]]:
    ...


@_overload
def get_components(__c1: _Type[_C], __c2: _Type[_C2], __c3: _Type[_C3], __c4: _Type[_C4]) -> _List[
        _Tuple[int, _Tuple[_C, _C2, _C3, _C4]]]:
    ...


def get_components(*component_types: _Type[_Any]) -> _List[_Tuple[int, _Tuple[_Any, ...]]]:
    """Return a list of ``(entity, (components...))`` for the given types.

    Results are cached per tuple of types; a pending invalidation is applied
    first. The returned list is the cached object itself.
    """
    if _cache_dirty:
        _clear_cache_now()

    try:
        return _get_components_cache[component_types]
    except KeyError:
        pass

    matches = list(_get_components(*component_types))
    _get_components_cache[component_types] = matches
    return matches
def try_component(entity: int, component_type: _Type[_C]) -> _Optional[_C]:
    """Return the Entity's Component of the given type, or None.

    None is returned when the Entity is unknown, has no Components, or has
    no Component stored under ``component_type``.
    """
    owned = _entities.get(entity)
    if not owned:
        return None
    if component_type not in owned:
        return None
    return owned[component_type]  # type: ignore[no-any-return]
@_overload
def try_components(entity: int, __c1: _Type[_C], __c2: _Type[_C2]) -> _Tuple[_C, _C2]:
    ...


@_overload
def try_components(entity: int, __c1: _Type[_C], __c2: _Type[_C2], __c3: _Type[_C3]) -> _Tuple[_C, _C2, _C3]:
    ...


@_overload
def try_components(entity: int, __c1: _Type[_C], __c2: _Type[_C2], __c3: _Type[_C3], __c4: _Type[_C4]) -> _Tuple[_C, _C2, _C3, _C4]:
    ...


def try_components(entity: int, *component_types: _Type[_C]) -> _Optional[_Tuple[_C, ...]]:
    """Return the Entity's Components of all given types, or None.

    None is returned when the Entity is unknown, has no Components, or lacks
    any of the requested types. The tuple follows the order of
    ``component_types``.
    """
    owned = _entities.get(entity)
    if not owned:
        return None
    for wanted in component_types:
        if wanted not in owned:
            return None
    return tuple(owned[wanted] for wanted in component_types)  # type: ignore[return-value]
def clear_dead_entities() -> None:
    """Finalize deletion of any Entities that are marked as dead.

    Each queued Entity is removed together with its index entries, the
    queue is emptied, and the query caches are marked stale. Does nothing
    when the queue is empty.
    """
    if not _dead_entities:
        return

    for entity in _dead_entities:
        # A queued id may no longer exist (for example it was also deleted
        # immediately, or it never existed). Skip it instead of raising,
        # which would abort the sweep and leave the queue uncleared.
        entity_comps = _entities.pop(entity, None)
        if entity_comps is None:
            continue
        for component_type in entity_comps:
            comp_set = _components[component_type]
            comp_set.discard(entity)
            if not comp_set:
                del _components[component_type]

    _dead_entities.clear()
    clear_cache()
def process(*args: _Any, **kwargs: _Any) -> None:
    """Run one pass over all Processors.

    Entities queued for deletion are removed first, then each Processor's
    ``process`` method is called in list order with the given arguments.
    """
    clear_dead_entities()
    for current in _processors:
        current.process(*args, **kwargs)
def timed_process(*args: _Any, **kwargs: _Any) -> None:
    """Run one pass over all Processors and record how long each one took.

    Works like ``process``, and additionally stores each Processor's elapsed
    ``time.process_time`` in whole milliseconds in ``process_times``, keyed
    by the Processor's class name.
    """
    clear_dead_entities()
    for current in _processors:
        began = _time.process_time()
        current.process(*args, **kwargs)
        elapsed_ms = int((_time.process_time() - began) * 1000)
        process_times[current.__class__.__name__] = elapsed_ms
def list_worlds() -> _List[str]:
    """Return a list of all World context names."""
    return [*_context_map]
def delete_world(name: str) -> None:
    """Delete the World context called ``name``.

    Raises PermissionError when ``name`` is the active World, and KeyError
    when no such World exists.
    """
    is_active = _current_world == name
    if is_active:
        raise PermissionError("The active World context cannot be deleted.")
    _context_map.pop(name)
def switch_world(name: str) -> None:
    """Switch to a new World context by name.

    The state of the World being left is saved, then the module-level state
    is replaced by that of ``name``. A World that does not exist yet is
    created empty.

    Args:
        name: The World context to make active.
    """
    global _current_world
    global _entity_count
    global _components
    global _entities
    global _dead_entities
    global _get_component_cache
    global _get_components_cache
    global _processors
    global _processors_dict
    global _cache_dirty
    global process_times
    global event_registry
    global current_world

    # Save the outgoing World. The containers are shared by reference, but
    # the Entity counter can be rebound (clear_database) and the dirty flag
    # is a plain bool; without saving them, a World would come back with an
    # old counter and with stale query caches that are never invalidated.
    _context_map[_current_world] = (
        _entity_count, _components, _entities, _dead_entities, _get_component_cache,
        _get_components_cache, _processors, _processors_dict, _cache_dirty,
        process_times, event_registry)

    if name not in _context_map:
        _context_map[name] = (_count(start=1), {}, {}, set(), {}, {}, [], {}, False, {}, {})

    (_entity_count, _components, _entities, _dead_entities, _get_component_cache,
     _get_components_cache, _processors, _processors_dict, _cache_dirty,
     process_times, event_registry) = _context_map[name]
    _current_world = current_world = name
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import esper as es | |
| from pyray import * | |
| import random | |
| import time | |
| import math | |
| from dataclasses import dataclass | |
| from typing import List, Dict | |
class Position:
    """Component holding an ``x``/``y`` coordinate pair."""

    __slots__ = ('x', 'y')

    def __init__(self, x: float = 0.0, y: float = 0.0):
        self.x, self.y = x, y
class Velocity:
    """Component holding the ``x``/``y`` parts of a velocity."""

    __slots__ = ('x', 'y')

    def __init__(self, x: float = 0.0, y: float = 0.0):
        self.x, self.y = x, y
class Renderable:
    """Component holding the ``color`` and ``radius`` used for drawing."""

    __slots__ = ('color', 'radius')

    def __init__(self, color: Color, radius: float):
        self.color, self.radius = color, radius
class Lifetime:
    """Component tracking remaining time; ``time_left`` starts at ``max_time``."""

    __slots__ = ('time_left', 'max_time')

    def __init__(self, max_time: float):
        self.time_left = self.max_time = max_time
class Acceleration:
    """Component holding the ``x``/``y`` parts of an acceleration."""

    __slots__ = ('x', 'y')

    def __init__(self, x: float = 0.0, y: float = 0.0):
        self.x, self.y = x, y
class Bouncing:
    """Component holding the ``width`` and ``height`` of the bounce area."""

    __slots__ = ('width', 'height')

    def __init__(self, width: int, height: int):
        self.width, self.height = width, height
class MovementSystem(es.Processor):
    """Update positions based on velocity."""

    priority = 100

    def __init__(self):
        # Accumulated wall-clock seconds spent in process() and call count.
        self.total_time = 0.0
        self.iterations = 0

    def process(self, dt: float):
        """Advance every Position by its Velocity scaled by ``dt``."""
        began = time.perf_counter()
        for _entity, (position, velocity) in es.get_components(Position, Velocity):
            position.x += velocity.x * dt
            position.y += velocity.y * dt
        self.total_time += time.perf_counter() - began
        self.iterations += 1
class AccelerationSystem(es.Processor):
    """Apply acceleration to velocity."""

    priority = 90

    def __init__(self):
        # Accumulated wall time spent in process() and number of calls made.
        self.total_time = 0.0
        self.iterations = 0

    def process(self, dt: float):
        """Add ``acceleration * dt`` to every (Velocity, Acceleration) entity."""
        began = time.perf_counter()
        for _entity, components in es.get_components(Velocity, Acceleration):
            velocity, acceleration = components
            velocity.x += acceleration.x * dt
            velocity.y += acceleration.y * dt
        elapsed = time.perf_counter() - began
        self.total_time += elapsed
        self.iterations += 1
class BouncingSystem(es.Processor):
    """Bounce entities off the edges of their bounding area."""

    priority = 80

    def __init__(self):
        # Accumulated wall time spent in process() and number of calls made.
        self.total_time = 0.0
        self.iterations = 0

    def process(self, dt: float):
        """Reflect and damp velocity when a circle crosses a boundary.

        On a hit the velocity component is multiplied by -0.95 and the
        position is clamped so the whole circle lies inside the bounds.
        ``dt`` is accepted for the processor interface but is not used.
        """
        began = time.perf_counter()
        matches = es.get_components(Position, Velocity, Bouncing, Renderable)
        for _entity, (position, velocity, bounds, shape) in matches:
            # Horizontal axis.
            if position.x - shape.radius < 0 or position.x + shape.radius > bounds.width:
                velocity.x *= -0.95
                right_limit = bounds.width - shape.radius
                position.x = max(shape.radius, min(position.x, right_limit))
            # Vertical axis.
            if position.y - shape.radius < 0 or position.y + shape.radius > bounds.height:
                velocity.y *= -0.95
                bottom_limit = bounds.height - shape.radius
                position.y = max(shape.radius, min(position.y, bottom_limit))
        elapsed = time.perf_counter() - began
        self.total_time += elapsed
        self.iterations += 1
class LifetimeSystem(es.Processor):
    """Delete entities whose lifetime has expired."""

    priority = 70

    def __init__(self):
        # Accumulated wall time spent in process() and number of calls made.
        self.total_time = 0.0
        self.iterations = 0
        # Running total of delete_entity() calls issued by this system.
        self.deleted_count = 0

    def process(self, dt: float):
        """Count every Lifetime down by ``dt`` and delete expired entities."""
        began = time.perf_counter()
        for entity, components in es.get_components(Lifetime):
            (lifetime,) = components
            lifetime.time_left -= dt
            if lifetime.time_left <= 0:
                es.delete_entity(entity)
                self.deleted_count += 1
        elapsed = time.perf_counter() - began
        self.total_time += elapsed
        self.iterations += 1
class RenderSystem(es.Processor):
    """Render entities as filled circles."""

    priority = 0

    def __init__(self):
        # Accumulated wall time spent in process() and number of calls made.
        self.total_time = 0.0
        self.iterations = 0
        # Number of circles drawn during the most recent process() call.
        self.rendered_count = 0

    def process(self, dt: float):
        """Draw one circle per (Position, Renderable) entity.

        ``dt`` is accepted for the processor interface but is not used.
        """
        began = time.perf_counter()
        drawn = 0
        matches = es.get_components(Position, Renderable)
        # enumerate(start=1) leaves ``drawn`` equal to the number of circles.
        for drawn, (_entity, (position, shape)) in enumerate(matches, start=1):
            draw_circle(int(position.x), int(position.y), shape.radius, shape.color)
        self.rendered_count = drawn
        elapsed = time.perf_counter() - began
        self.total_time += elapsed
        self.iterations += 1
class BenchmarkStats:
    """Rolling window of per-frame performance samples.

    Keeps the most recent ``max_samples`` frame times, entity counts and FPS
    readings, and derives simple aggregates (average / min / max) from them.
    """

    def __init__(self, max_samples: int = 300):
        """Create an empty window.

        Args:
            max_samples: Maximum number of frames retained per series;
                older samples are discarded automatically.
        """
        # Function-scope import keeps the class self-contained.
        from collections import deque

        # Bounded deques evict the oldest sample in O(1); the previous
        # list.pop(0) shifted every remaining element on each frame.
        self.frame_times: 'deque[float]' = deque(maxlen=max_samples)
        self.entity_counts: 'deque[int]' = deque(maxlen=max_samples)
        self.fps_history: 'deque[float]' = deque(maxlen=max_samples)
        # Wall-clock creation time, used by callers to report total runtime.
        self.start_time = time.time()

    def add_frame(self, frame_time: float, entity_count: int, fps: float):
        """Record one frame's samples (``frame_time`` is in seconds)."""
        self.frame_times.append(frame_time)
        self.entity_counts.append(entity_count)
        self.fps_history.append(fps)

    def get_avg_fps(self) -> float:
        """Return the mean FPS over the window, or 0.0 when it is empty."""
        if not self.fps_history:
            return 0.0
        return sum(self.fps_history) / len(self.fps_history)

    def get_avg_frame_time(self) -> float:
        """Return the mean frame time in milliseconds, or 0.0 when empty."""
        if not self.frame_times:
            return 0.0
        return sum(self.frame_times) / len(self.frame_times) * 1000

    def get_min_fps(self) -> float:
        """Return the lowest FPS in the window, or 0.0 when it is empty."""
        return min(self.fps_history) if self.fps_history else 0.0

    def get_max_fps(self) -> float:
        """Return the highest FPS in the window, or 0.0 when it is empty."""
        return max(self.fps_history) if self.fps_history else 0.0
class Benchmark:
    """Interactive raylib benchmark for measuring ECS performance."""

    def __init__(self, width: int = 1200, height: int = 800):
        """Store the window size and default spawn settings.

        No window is opened and no processors are created until ``setup()``.
        """
        self.screen_width = width
        self.screen_height = height
        self.stats = BenchmarkStats()
        # Entities created per SPACE press (adjusted with UP/DOWN).
        self.spawn_rate = 50
        # When enabled, ``auto_spawn_rate`` entities are created every frame.
        self.auto_spawn = False
        self.auto_spawn_rate = 10
        self.movement_sys = None
        self.acceleration_sys = None
        self.bouncing_sys = None
        self.lifetime_sys = None
        self.render_sys = None

    def setup(self):
        """Open the window and register every processor with the world."""
        init_window(self.screen_width, self.screen_height, 'ECS Performance Benchmark')
        set_target_fps(120)
        self.movement_sys = MovementSystem()
        self.acceleration_sys = AccelerationSystem()
        self.bouncing_sys = BouncingSystem()
        self.lifetime_sys = LifetimeSystem()
        self.render_sys = RenderSystem()
        es.add_processor(self.acceleration_sys, priority=90)
        es.add_processor(self.movement_sys, priority=100)
        es.add_processor(self.bouncing_sys, priority=80)
        es.add_processor(self.lifetime_sys, priority=70)
        es.add_processor(self.render_sys, priority=0)

    def _timed_systems(self):
        """Return ``(label, processor)`` pairs in UI display order."""
        return (
            ('MovementSystem', self.movement_sys),
            ('AccelerationSystem', self.acceleration_sys),
            ('BouncingSystem', self.bouncing_sys),
            ('LifetimeSystem', self.lifetime_sys),
            ('RenderSystem', self.render_sys),
        )

    def spawn_entity(self, with_lifetime: bool = True):
        """Create one entity with randomised components.

        Args:
            with_lifetime: When true, a ``Lifetime`` of 5-15 seconds is
                attached as well.
        """
        pos = Position(random.uniform(50, self.screen_width - 50), random.uniform(50, self.screen_height - 50))
        vel = Velocity(random.uniform(-200, 200), random.uniform(-200, 200))
        acc = Acceleration(0, 50)
        rend = Renderable(
            Color(random.randint(50, 255), random.randint(50, 255), random.randint(50, 255), 255),
            random.uniform(3, 10),
        )
        bouncing = Bouncing(self.screen_width, self.screen_height)
        components = [pos, vel, acc, rend, bouncing]
        if with_lifetime:
            components.append(Lifetime(random.uniform(5, 15)))
        es.create_entity(*components)

    def spawn_batch(self, count: int, with_lifetime: bool = True):
        """Create ``count`` entities via ``spawn_entity``."""
        for _ in range(count):
            self.spawn_entity(with_lifetime)

    def get_entity_count(self) -> int:
        """Return the number of entities currently stored in the world.

        NOTE(review): reads the private ``es._entities`` mapping; entities
        marked for deletion but not yet cleaned up are presumably still
        counted - confirm against the esper module.
        """
        return len(es._entities)

    def draw_ui(self, dt: float):
        """Draw the metrics overlay and the controls help.

        Must be called between ``begin_drawing()`` and ``end_drawing()``.

        Args:
            dt: Duration of the last frame in seconds.
        """
        entity_count = self.get_entity_count()
        fps = get_fps()
        y_offset = 10
        line_height = 25

        draw_text(f'FPS: {fps}', 10, y_offset, 20, GREEN)
        y_offset += line_height
        draw_text(f'Entities: {entity_count}', 10, y_offset, 20, YELLOW)
        y_offset += line_height
        draw_text(f'Frame time: {dt * 1000:.2f} ms', 10, y_offset, 20, ORANGE)
        y_offset += line_height

        avg_fps = self.stats.get_avg_fps()
        avg_frame = self.stats.get_avg_frame_time()
        draw_text(f'Avg FPS: {avg_fps:.1f}', 10, y_offset, 20, LIGHTGRAY)
        y_offset += line_height
        draw_text(f'Avg Frame: {avg_frame:.2f} ms', 10, y_offset, 20, LIGHTGRAY)
        y_offset += line_height

        min_fps = self.stats.get_min_fps()
        max_fps = self.stats.get_max_fps()
        draw_text(f'Min/Max FPS: {min_fps:.0f}/{max_fps:.0f}', 10, y_offset, 20, LIGHTGRAY)
        y_offset += line_height
        y_offset += 10

        # One line per processor that has run at least once.
        for label, system in self._timed_systems():
            if system and system.iterations > 0:
                avg_time = (system.total_time / system.iterations) * 1000
                draw_text(f'{label}: {avg_time:.3f} ms', 10, y_offset, 18, SKYBLUE)
                if system is self.lifetime_sys:
                    draw_text(f'(deleted: {system.deleted_count})', 250, y_offset, 18, GRAY)
                y_offset += line_height

        y_offset += 20
        draw_text('Controls:', 10, y_offset, 20, WHITE)
        y_offset += line_height
        draw_text(f'SPACE: Spawn {self.spawn_rate} entities', 10, y_offset, 18, LIGHTGRAY)
        y_offset += line_height
        draw_text(
            f"A: Auto-spawn {'ON' if self.auto_spawn else 'OFF'}", 10, y_offset, 18, GREEN if self.auto_spawn else RED
        )
        y_offset += line_height
        draw_text('UP/DOWN: Change spawn rate', 10, y_offset, 18, LIGHTGRAY)
        y_offset += line_height
        draw_text('C: Clear all entities', 10, y_offset, 18, LIGHTGRAY)
        y_offset += line_height
        draw_text('R: Reset stats', 10, y_offset, 18, LIGHTGRAY)

    def handle_input(self):
        """React to the keys pressed during this frame."""
        if is_key_pressed(KEY_SPACE):
            self.spawn_batch(self.spawn_rate, with_lifetime=True)
        if is_key_pressed(KEY_A):
            self.auto_spawn = not self.auto_spawn
        if is_key_pressed(KEY_UP):
            self.spawn_rate = min(1000, self.spawn_rate + 10)
        if is_key_pressed(KEY_DOWN):
            self.spawn_rate = max(10, self.spawn_rate - 10)
        if is_key_pressed(KEY_C):
            es.clear_database()
            print('Database cleared!')
        if is_key_pressed(KEY_R):
            self.stats = BenchmarkStats()
            # Named ``system`` rather than ``sys`` to avoid confusion with
            # the standard-library module of that name.
            for _label, system in self._timed_systems():
                if system:
                    system.iterations = 0
                    system.total_time = 0.0
            print('Stats reset!')

    def run(self):
        """Run the main loop until the window is closed, then print stats."""
        self.setup()
        try:
            self.spawn_batch(100, with_lifetime=False)
            # perf_counter() is monotonic, so dt cannot go negative or jump
            # when the system clock is adjusted (time.time() can).
            last_time = time.perf_counter()
            while not window_should_close():
                current_time = time.perf_counter()
                dt = current_time - last_time
                last_time = current_time

                if self.auto_spawn:
                    self.spawn_batch(self.auto_spawn_rate, with_lifetime=True)
                self.handle_input()

                # RenderSystem issues draw calls from es.process(), so the
                # frame is begun and cleared first; otherwise those calls are
                # made outside the begin_drawing()/end_drawing() pair and
                # before the background clear.
                begin_drawing()
                clear_background(Color(20, 20, 30, 255))

                frame_start = time.perf_counter()
                es.process(dt)
                frame_time = time.perf_counter() - frame_start
                self.stats.add_frame(frame_time, self.get_entity_count(), get_fps())

                self.draw_ui(dt)
                end_drawing()
        finally:
            # Close the window even if a processor raised.
            close_window()
        self.print_final_stats()

    def print_final_stats(self):
        """Print the final statistics to the console."""
        print('\n' + '=' * 50)
        print('FINAL BENCHMARK STATISTICS')
        print('=' * 50)
        print(f'Total runtime: {time.time() - self.stats.start_time:.2f} seconds')
        print(f'Average FPS: {self.stats.get_avg_fps():.2f}')
        print(f'Min FPS: {self.stats.get_min_fps():.2f}')
        print(f'Max FPS: {self.stats.get_max_fps():.2f}')
        print(f'Average frame time: {self.stats.get_avg_frame_time():.3f} ms')
        print(f'\nTotal entities deleted: {self.lifetime_sys.deleted_count if self.lifetime_sys else 0}')
        print('=' * 50)
# Script entry point: build a benchmark with the default window size and run
# its main loop until the window is closed.
if __name__ == '__main__':
    benchmark = Benchmark()
    benchmark.run()
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Here’s a simple English summary list of what’s happening:
Key Optimizations
Lazy Cache Invalidation
Mutating operations now only set `_cache_dirty = True`. The caches are actually cleared when `get_component(s)` is called next time, so many `dict.clear()` calls become just one.
Iterating Over the Smallest Set (Main Optimization!)
Before: `set.intersection` for all component sets → created new sets every time.
Now: only the smallest component set is iterated. Example: 10,000 entities with `Position`, 100 with `Lifetime` → now only 100 checked instead of 10,000.
Caching Dictionary Lookups
Fetch `entity_comps = _entities[entity]` once.
No repeated `_entities[entity][typeX]` lookups — faster access.
Fast Processor Lookup (O(1) Instead of O(n))
Uses `_processors_dict` to get processors instantly by type.
Early Exit Checks
Skip unnecessary work:
`if not _dead_entities: return` in `clear_dead_entities()`
`if comp_set is None: return` in `_get_component()`
How to Test
Save the file as `esper_optimized.py`.
In your benchmark, replace `import esper as es` with `import esper_optimized as es`.
Run benchmarks and compare performance metrics.
Where You’ll See the Biggest Gains