Created
October 19, 2025 21:03
-
-
Save Hammer2900/90367c6871679953f4fb3b67b3282cdd to your computer and use it in GitHub Desktop.
esper numpy
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
Esper-Numpy - a high-performance Entity Component System (ECS) for Python built on NumPy.
FIXED: the problem of data being copied when indexing.
| """ | |
| import time as _time | |
| import numpy as np | |
| import inspect as _inspect | |
| from dataclasses import is_dataclass, fields as _dataclass_fields | |
| from types import MethodType as _MethodType | |
| from typing import ( | |
| Any as _Any, | |
| Callable as _Callable, | |
| Dict as _Dict, | |
| List as _List, | |
| Set as _Set, | |
| Type as _Type, | |
| Tuple as _Tuple, | |
| TypeVar as _TypeVar, | |
| Iterable as _Iterable, | |
| Optional as _Optional, | |
| overload as _overload, | |
| ) | |
| from weakref import ref as _ref | |
| from weakref import WeakMethod as _WeakMethod | |
| from itertools import count as _count | |
# Package version string; ``version`` is a second name bound to the same value.
__version__ = version = '3.5-numpy-fixed'
def dispatch_event(name: str, *args: _Any) -> None:
    """Invoke every handler registered for the event ``name``.

    Handlers are stored as weak references, so each entry is dereferenced
    before it is called with ``args``.  An event name with no registered
    handlers is silently ignored.
    """
    weak_handlers = event_registry.get(name, [])
    for weak_handler in weak_handlers:
        handler = weak_handler()
        handler(*args)
| def _make_callback(name: str) -> _Callable[[_Any], None]: | |
| def callback(weak_method: _Any) -> None: | |
| event_registry[name].remove(weak_method) | |
| if not event_registry[name]: | |
| del event_registry[name] | |
| return callback | |
def set_handler(name: str, func: _Callable[..., None]) -> None:
    """Register ``func`` as a handler for the event ``name``.

    Only a weak reference to the handler is kept: bound methods are wrapped
    in ``WeakMethod`` and every other callable in a plain ``ref``.  Both are
    created with a callback built by ``_make_callback(name)``.
    """
    handlers = event_registry.setdefault(name, set())
    make_weak = _WeakMethod if isinstance(func, _MethodType) else _ref
    handlers.add(make_weak(func, _make_callback(name)))
def remove_handler(name: str, func: _Callable[..., None]) -> None:
    """Unregister ``func`` from the event ``name``.

    Does nothing when the event or the handler is not registered.  The
    event entry itself is removed once its last handler is gone.

    Args:
        name: Event name the handler was registered under.
        func: The function or bound method previously passed to
            ``set_handler``.
    """
    # Bound methods are registered as WeakMethod objects.  A plain ``ref``
    # to a bound method never compares equal to a WeakMethod, so the lookup
    # key must be built with the same wrapper type used at registration.
    if isinstance(func, _MethodType):
        func_ref = _WeakMethod(func)
    else:
        func_ref = _ref(func)

    handlers = event_registry.get(name)
    if not handlers or func_ref not in handlers:
        return
    handlers.remove(func_ref)
    if not handlers:
        del event_registry[name]
# Generic placeholders for component types, used by the typed query
# signatures (up to four distinct component types per overload).
_C = _TypeVar('_C')
_C2 = _TypeVar('_C2')
_C3 = _TypeVar('_C3')
_C4 = _TypeVar('_C4')
class Processor:
    """Base class for processors.

    Subclasses must override ``process``; the base implementation only
    raises ``NotImplementedError``.  ``priority`` defaults to 0.
    """

    priority = 0

    def process(self, *args: _Any, **kwargs: _Any) -> None:
        """Run this processor once; must be overridden by subclasses."""
        raise NotImplementedError()
# --- State of the currently active world -----------------------------------
# Generator of entity ids; the first id handed out is 1.
_entity_count: '_count[int]' = _count(start=1)
# Component type -> NumPy array with one row per stored component.
_components: _Dict[_Type[_Any], np.ndarray] = {}
# Component type -> dtype its array was created with.
_component_dtypes: _Dict[_Type[_Any], np.dtype] = {}
# Component type -> {entity id: row index in the component array}.
_entity_component_map: _Dict[_Type[_Any], _Dict[int, int]] = {}
# Component type -> {row index: entity id} (inverse of the map above).
_component_entity_map: _Dict[_Type[_Any], _Dict[int, int]] = {}
# Ids of created entities.
_living_entities: _Set[int] = set()
# Ids flagged for deferred deletion.
_dead_entities: _Set[int] = set()
# Memoized results of get_component() / get_components().
_get_component_cache: _Dict[_Type[_Any], _List[_Any]] = {}
_get_components_cache: _Dict[_Tuple[_Type[_Any], ...], _List[_Any]] = {}
# True when the caches above must be emptied before the next query.
_cache_dirty: bool = False
# Registered processors (kept sorted by priority) and a lookup by type.
_processors: _List[Processor] = []
_processors_dict: _Dict[_Type[Processor], Processor] = {}
# Event name -> collection of weak references to handlers.
event_registry: _Dict[str, _Any] = {}
# Processor class name -> last measured duration, filled by timed_process().
process_times: _Dict[str, int] = {}
# Name of the active world and the saved state tuple of every known world.
current_world: str = 'default'
_context_map: _Dict[str, _Tuple] = {}
| def _get_dtype_and_values(component_instance: _Any) -> _Tuple[np.dtype, tuple]: | |
| if hasattr(component_instance, '__slots__'): | |
| field_names = component_instance.__slots__ | |
| values = tuple(getattr(component_instance, name) for name in field_names) | |
| elif is_dataclass(component_instance): | |
| field_names = [f.name for f in _dataclass_fields(component_instance)] | |
| values = tuple(getattr(component_instance, name) for name in field_names) | |
| else: | |
| field_names = sorted(component_instance.__dict__.keys()) | |
| values = tuple(component_instance.__dict__[name] for name in field_names) | |
| dtype_pairs = [] | |
| for name, value in zip(field_names, values): | |
| numpy_type = np.result_type(type(value)) | |
| dtype_pairs.append((name, numpy_type)) | |
| return np.dtype(dtype_pairs), values | |
| def _recreate_instance(component_type: _Type[_C], numpy_row: np.void) -> _C: | |
| return component_type(*numpy_row.tolist()) | |
def clear_cache() -> None:
    """Flag the query caches as stale.

    Only the module-level ``_cache_dirty`` flag is set here; the caches
    themselves are not touched by this function.
    """
    global _cache_dirty
    _cache_dirty = True
def _clear_cache_now() -> None:
    """Empty both query caches and reset the dirty flag."""
    global _cache_dirty
    for cache in (_get_component_cache, _get_components_cache):
        cache.clear()
    _cache_dirty = False
def clear_database() -> None:
    """Remove all entities and components of the active world.

    The entity id counter restarts at 1, every component store and entity
    set is emptied, and the query caches are cleared.  Registered
    processors are not touched by this function.
    """
    global _entity_count
    _entity_count = _count(start=1)
    stores = (
        _components,
        _component_dtypes,
        _entity_component_map,
        _component_entity_map,
        _living_entities,
        _dead_entities,
    )
    for store in stores:
        store.clear()
    _clear_cache_now()
def add_processor(processor_instance: Processor, priority: int = 0) -> None:
    """Register a processor and keep the processor list sorted by priority.

    The instance's ``priority`` attribute is overwritten with ``priority``.
    Processors with a higher priority come first in the list.
    """

    def by_priority(proc: Processor) -> int:
        return proc.priority

    processor_instance.priority = priority
    _processors_dict[type(processor_instance)] = processor_instance
    _processors.append(processor_instance)
    _processors.sort(key=by_priority, reverse=True)
def remove_processor(processor_type: _Type[Processor]) -> None:
    """Unregister the processor of the given type, if one is registered.

    Args:
        processor_type: Class of the processor to remove.  Unknown types
            are ignored.
    """
    processor = _processors_dict.pop(processor_type, None)
    # Compare against None explicitly: a processor that defines __bool__ or
    # __len__ could be falsy and would otherwise be left in the list.
    if processor is not None:
        _processors.remove(processor)
def get_processor(processor_type: _Type[Processor]) -> _Optional[Processor]:
    """Return the registered processor of ``processor_type``, or ``None``."""
    try:
        return _processors_dict[processor_type]
    except KeyError:
        return None
def create_entity(*components: _C) -> int:
    """Create a new entity and attach the given components to it.

    Returns:
        The id of the new entity, taken from the entity id counter.
    """
    new_id = next(_entity_count)
    _living_entities.add(new_id)
    for component in components:
        add_component(new_id, component)
    return new_id
def delete_entity(entity: int, immediate: bool = False) -> None:
    """Delete an entity, either right away or at the next cleanup.

    Args:
        entity: Id of the entity to delete.  Ids that are not in the set
            of living entities are ignored.
        immediate: When True, remove every component of the entity and
            forget the entity now.  When False, only flag it as dead; the
            actual removal is left to ``clear_dead_entities``.
    """
    # Test membership in the living set directly.  ``entity_exists`` also
    # reports False for entities already flagged dead, which made it
    # impossible to finish a deferred deletion with ``immediate=True``.
    if entity not in _living_entities:
        return
    if immediate:
        owned_types = [ctype for ctype, entity_map in _entity_component_map.items() if entity in entity_map]
        for component_type in owned_types:
            remove_component(entity, component_type)
        _living_entities.discard(entity)
        _dead_entities.discard(entity)
    else:
        _dead_entities.add(entity)
    # Cached query results may still list this entity.
    clear_cache()
def entity_exists(entity: int) -> bool:
    """Return True when ``entity`` is alive and not flagged for deletion."""
    if entity in _dead_entities:
        return False
    return entity in _living_entities
def component_for_entity(entity: int, component_type: _Type[_C]) -> _C:
    """Return the ``component_type`` component of ``entity``.

    The component is rebuilt from its array row, so the returned object is
    a fresh instance.

    Raises:
        KeyError: If the entity has no component of that type.
    """
    try:
        row_index = _entity_component_map[component_type][entity]
        stored_row = _components[component_type][row_index]
        return _recreate_instance(component_type, stored_row)
    except KeyError:
        raise KeyError(f'Сущность {entity} не имеет компонента типа {component_type.__name__}')
def components_for_entity(entity: int) -> _Tuple[_C, ...]:
    """Return all components attached to ``entity`` as a tuple.

    The result is empty when the entity has no components.
    """
    return tuple(
        component_for_entity(entity, ctype)
        for ctype, entity_map in _entity_component_map.items()
        if entity in entity_map
    )
def has_component(entity: int, component_type: _Type[_C]) -> bool:
    """Return True when ``entity`` has a component of ``component_type``."""
    entity_map = _entity_component_map.get(component_type)
    return entity_map is not None and entity in entity_map
def has_components(entity: int, *component_types: _Type[_C]) -> bool:
    """Return True when ``entity`` has every one of ``component_types``.

    With no component types given the result is True.
    """
    for ctype in component_types:
        if not has_component(entity, ctype):
            return False
    return True
def add_component(entity: int, component_instance: _C, type_alias: _Optional[_Type[_C]] = None) -> None:
    """Attach a component to an entity, replacing one of the same type.

    The first component of a type defines the dtype of that type's array;
    later components are converted to that dtype.

    Args:
        entity: Id of the entity receiving the component.
        component_instance: The component object whose field values are
            stored.
        type_alias: Optional type to file the component under instead of
            ``type(component_instance)``.
    """
    component_type = type_alias or type(component_instance)
    if component_type not in _components:
        dtype, values = _get_dtype_and_values(component_instance)
        _component_dtypes[component_type] = dtype
        _components[component_type] = np.array([values], dtype=dtype)
        _entity_component_map[component_type] = {entity: 0}
        _component_entity_map[component_type] = {0: entity}
    else:
        dtype = _component_dtypes[component_type]
        _, values = _get_dtype_and_values(component_instance)
        new_row = np.array([values], dtype=dtype)
        entity_map = _entity_component_map[component_type]
        existing_index = entity_map.get(entity)
        if existing_index is not None:
            # Overwrite in place.  Appending a second row would leave the
            # old row orphaned in the array and in the index -> entity map,
            # corrupting later iteration and removal.
            _components[component_type][existing_index] = new_row[0]
        else:
            arr = _components[component_type]
            new_index = len(arr)
            _components[component_type] = np.append(arr, new_row)
            entity_map[entity] = new_index
            _component_entity_map[component_type][new_index] = entity
    clear_cache()
def remove_component(entity: int, component_type: _Type[_C]) -> _C:
    """Detach the ``component_type`` component from ``entity`` and return it.

    The array is kept dense: the last row is moved into the freed slot
    before the array is shortened by one.  When the last component of a
    type is removed, all bookkeeping for that type is dropped.

    Raises:
        KeyError: If the entity has no component of that type.
    """
    entity_to_row = _entity_component_map[component_type]
    row_to_entity = _component_entity_map[component_type]
    removed_row = entity_to_row[entity]
    removed_instance = component_for_entity(entity, component_type)

    storage = _components[component_type]
    tail_row = len(storage) - 1
    if removed_row != tail_row:
        # Move the last row into the hole and re-point its owner.
        tail_entity = row_to_entity[tail_row]
        storage[removed_row] = storage[tail_row]
        entity_to_row[tail_entity] = removed_row
        row_to_entity[removed_row] = tail_entity

    del entity_to_row[entity]
    del row_to_entity[tail_row]
    _components[component_type] = np.delete(storage, tail_row, axis=0)

    if not entity_to_row:
        for registry in (_components, _component_dtypes, _entity_component_map, _component_entity_map):
            del registry[component_type]

    clear_cache()
    return removed_instance
def _get_component(component_type: _Type[_C]) -> _Iterable[_Tuple[int, _C]]:
    """Yield ``(entity, component)`` for every stored ``component_type``.

    Entities flagged as dead are skipped.  Nothing is yielded when no
    component of that type is stored.
    """
    if component_type not in _components:
        return
    owner_of_row = _component_entity_map[component_type]
    for row_index, row in enumerate(_components[component_type]):
        owner = owner_of_row[row_index]
        if owner in _dead_entities:
            continue
        yield owner, _recreate_instance(component_type, row)
def _get_components(*component_types: _Type[_C]) -> _Iterable[_Tuple[int, _Tuple[_C, ...]]]:
    """Yield ``(entity, components)`` for entities having all given types.

    ``components`` follows the order of ``component_types``.  Entities
    flagged as dead are skipped.  Nothing is yielded when no types are
    given or when any requested type has no stored components.
    """
    if not component_types:
        return
    if any(ctype not in _entity_component_map for ctype in component_types):
        return
    owner_sets = [
        set(entity_map.keys()) for ctype, entity_map in _entity_component_map.items() if ctype in component_types
    ]
    for entity in set.intersection(*owner_sets):
        if entity in _dead_entities:
            continue
        yield entity, tuple(component_for_entity(entity, ctype) for ctype in component_types)
def get_component(component_type: _Type[_C]) -> _List[_Tuple[int, _C]]:
    """Return a cached list of ``(entity, component)`` for one type.

    A stale cache is emptied first.  The list object stored in the cache
    is returned directly, not a copy.
    """
    if _cache_dirty:
        _clear_cache_now()
    if component_type not in _get_component_cache:
        _get_component_cache[component_type] = list(_get_component(component_type))
    return _get_component_cache[component_type]
@_overload
def get_components(__c1: _Type[_C], __c2: _Type[_C2]) -> _List[_Tuple[int, _Tuple[_C, _C2]]]: ...


@_overload
def get_components(__c1: _Type[_C], __c2: _Type[_C2], __c3: _Type[_C3]) -> _List[_Tuple[int, _Tuple[_C, _C2, _C3]]]: ...


@_overload
def get_components(
    __c1: _Type[_C], __c2: _Type[_C2], __c3: _Type[_C3], __c4: _Type[_C4]
) -> _List[_Tuple[int, _Tuple[_C, _C2, _C3, _C4]]]: ...


def get_components(*component_types: _Type[_Any]) -> _List[_Tuple[int, _Tuple[_Any, ...]]]:
    """Return a cached list of ``(entity, components)`` for several types.

    The cache key is the tuple of requested types, so the same types in a
    different order are cached separately.  A stale cache is emptied
    first.  The list object stored in the cache is returned directly.
    """
    if _cache_dirty:
        _clear_cache_now()
    if component_types not in _get_components_cache:
        _get_components_cache[component_types] = list(_get_components(*component_types))
    return _get_components_cache[component_types]
def try_component(entity: int, component_type: _Type[_C]) -> _Optional[_C]:
    """Return the entity's ``component_type`` component, or ``None`` if absent."""
    if not has_component(entity, component_type):
        return None
    return component_for_entity(entity, component_type)
@_overload
def try_components(entity: int, __c1: _Type[_C], __c2: _Type[_C2]) -> _Optional[_Tuple[_C, _C2]]: ...


@_overload
def try_components(
    entity: int, __c1: _Type[_C], __c2: _Type[_C2], __c3: _Type[_C3]
) -> _Optional[_Tuple[_C, _C2, _C3]]: ...


@_overload
def try_components(
    entity: int, __c1: _Type[_C], __c2: _Type[_C2], __c3: _Type[_C3], __c4: _Type[_C4]
) -> _Optional[_Tuple[_C, _C2, _C3, _C4]]: ...


def try_components(entity: int, *component_types: _Type[_C]) -> _Optional[_Tuple[_C, ...]]:
    """Return the requested components of ``entity``, or ``None``.

    ``None`` is returned unless the entity has every requested type; the
    tuple otherwise follows the order of ``component_types``.
    """
    if not has_components(entity, *component_types):
        return None
    return tuple(component_for_entity(entity, ctype) for ctype in component_types)
def clear_dead_entities() -> None:
    """Finalize the deletion of every entity flagged as dead.

    Each flagged entity is passed to ``delete_entity`` with
    ``immediate=True``.  Does nothing when no entity is flagged.
    """
    if not _dead_entities:
        return
    pending = list(_dead_entities)
    # Un-flag the ids before deleting them, so that the immediate deletion
    # is not skipped by a guard that treats flagged entities as already
    # gone.  Without this the entities survived and the flag was simply
    # dropped afterwards.
    _dead_entities.clear()
    for entity in pending:
        delete_entity(entity, immediate=True)
def process(*args: _Any, **kwargs: _Any) -> None:
    """Clean up dead entities, then run every registered processor in order.

    All positional and keyword arguments are forwarded unchanged to each
    processor's ``process`` method.
    """
    clear_dead_entities()
    for registered in _processors:
        registered.process(*args, **kwargs)
def timed_process(*args: _Any, **kwargs: _Any) -> None:
    """Like ``process``, but also record how long each processor took.

    The duration is measured with ``time.process_time`` and stored in
    ``process_times`` as whole milliseconds, keyed by the processor's
    class name.
    """
    clear_dead_entities()
    for registered in _processors:
        started = _time.process_time()
        registered.process(*args, **kwargs)
        elapsed_ms = int((_time.process_time() - started) * 1000)
        process_times[registered.__class__.__name__] = elapsed_ms
def list_worlds() -> _List[str]:
    """Return the names of all known worlds."""
    return [world_name for world_name in _context_map]
def delete_world(name: str) -> None:
    """Delete the saved state of the world ``name``.

    Raises:
        PermissionError: If ``name`` is the currently active world.
        KeyError: If no world with that name exists.
    """
    if name == current_world:
        raise PermissionError('Активный мир (контекст) не может быть удален.')
    _context_map.pop(name)
def switch_world(name: str) -> None:
    """Make ``name`` the active world, creating it if it does not exist.

    The state of the world being left is written back to ``_context_map``
    first.  Names that get rebound during normal operation (the entity
    counter and the cache-dirty flag) would otherwise be restored from a
    stale snapshot when switching back.

    Args:
        name: Name of the world to activate.
    """
    global _entity_count, _components, _component_dtypes, _entity_component_map
    global _component_entity_map, _living_entities, _dead_entities, _get_component_cache
    global _get_components_cache, _processors, _processors_dict, _cache_dirty
    global process_times, event_registry, current_world

    # Save the current world's state before leaving it.
    _context_map[current_world] = (
        _entity_count,
        _components,
        _component_dtypes,
        _entity_component_map,
        _component_entity_map,
        _living_entities,
        _dead_entities,
        _get_component_cache,
        _get_components_cache,
        _processors,
        _processors_dict,
        _cache_dirty,
        process_times,
        event_registry,
    )

    if name not in _context_map:
        # One fresh container per state variable, in the exact order of the
        # unpacking below.
        _context_map[name] = (
            _count(start=1),  # _entity_count
            {},  # _components
            {},  # _component_dtypes
            {},  # _entity_component_map
            {},  # _component_entity_map
            set(),  # _living_entities
            set(),  # _dead_entities
            {},  # _get_component_cache
            {},  # _get_components_cache
            [],  # _processors
            {},  # _processors_dict
            False,  # _cache_dirty
            {},  # process_times
            {},  # event_registry
        )

    (
        _entity_count,
        _components,
        _component_dtypes,
        _entity_component_map,
        _component_entity_map,
        _living_entities,
        _dead_entities,
        _get_component_cache,
        _get_components_cache,
        _processors,
        _processors_dict,
        _cache_dirty,
        process_times,
        event_registry,
    ) = _context_map[name]
    current_world = name
def get_component_arrays_direct(*component_types: _Type[_C]) -> _Tuple[_Tuple[np.ndarray, np.ndarray], ...]:
    """Return an ``(array, indices)`` pair for each requested component type.

    Usage:
        (pos_arr, pos_idx), (vel_arr, vel_idx) = get_component_arrays_direct(Position, Velocity)
        # Writes through the returned arrays affect the stored data:
        pos_arr['x'][pos_idx] += vel_arr['x'][vel_idx] * dt

    Returns:
        A tuple of ``(array, indices)`` pairs, one per requested type:
          - array: the full NumPy array holding all components of that type
          - indices: row indices of the entities that have ALL requested
            component types, ordered by ascending entity id
        An empty tuple is returned when no types are requested.
    """
    if not component_types:
        return tuple()

    for ctype in component_types:
        if ctype in _entity_component_map:
            continue
        # A requested type with no stored components: every pair is empty.
        empty = np.array([], dtype=_component_dtypes.get(ctype, object))
        empty_idx = np.array([], dtype=int)
        return tuple((empty, empty_idx) for _ in component_types)

    row_maps = [_entity_component_map[ctype] for ctype in component_types]
    valid_entities = sorted(set.intersection(*(set(row_map.keys()) for row_map in row_maps)))
    if not valid_entities:
        empty = np.array([], dtype=int)
        return tuple((_components.get(ctype, np.array([])), empty) for ctype in component_types)

    return tuple(
        (_components[ctype], np.array([row_map[ent] for ent in valid_entities], dtype=int))
        for ctype, row_map in zip(component_types, row_maps)
    )
def get_entities_with_components(*component_types: _Type[_C]) -> _List[int]:
    """Return the sorted ids of entities that have all the given component types.

    The list is empty when no types are requested or when any requested
    type has no stored components.
    """
    if not component_types:
        return []
    if any(ctype not in _entity_component_map for ctype in component_types):
        return []
    owner_sets = (set(_entity_component_map[ctype].keys()) for ctype in component_types)
    return sorted(set.intersection(*owner_sets))
# Register the module-level state objects as the 'default' world.  The
# element order must match the unpacking performed in switch_world().
_context_map['default'] = (
    _entity_count,
    _components,
    _component_dtypes,
    _entity_component_map,
    _component_entity_map,
    _living_entities,
    _dead_entities,
    _get_component_cache,
    _get_components_cache,
    _processors,
    _processors_dict,
    _cache_dirty,
    process_times,
    event_registry,
)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import esper as es | |
| from pyray import * | |
| import random | |
| import time | |
| import math | |
| from dataclasses import dataclass | |
| from typing import List, Dict | |
| import numpy as np | |
# Components
class Position:
    """Component holding an ``x``/``y`` position."""

    __slots__ = ('x', 'y')

    def __init__(self, x: float = 0.0, y: float = 0.0):
        self.x, self.y = x, y
class Velocity:
    """Component holding an ``x``/``y`` velocity."""

    __slots__ = ('x', 'y')

    def __init__(self, x: float = 0.0, y: float = 0.0):
        self.x, self.y = x, y
class Renderable:
    """Component holding the ``color`` and ``radius`` used for drawing."""

    __slots__ = ('color', 'radius')

    def __init__(self, color: Color, radius: float):
        self.color, self.radius = color, radius
class Lifetime:
    """Component holding a countdown; ``time_left`` starts at ``max_time``."""

    __slots__ = ('time_left', 'max_time')

    def __init__(self, max_time: float):
        self.max_time = max_time
        self.time_left = max_time
class Acceleration:
    """Component holding an ``x``/``y`` acceleration."""

    __slots__ = ('x', 'y')

    def __init__(self, x: float = 0.0, y: float = 0.0):
        self.x, self.y = x, y
class Bouncing:
    """Component holding the ``width``/``height`` of the bounce area."""

    __slots__ = ('width', 'height')

    def __init__(self, width: int, height: int):
        self.width, self.height = width, height
# FIXED SYSTEMS
class MovementSystem(es.Processor):
    """Adds ``velocity * dt`` to the position of entities having both.

    ``iterations`` and ``total_time`` accumulate how often and for how long
    ``process`` ran to completion.
    """

    priority = 100

    def __init__(self):
        self.total_time = 0.0
        self.iterations = 0

    def process(self, dt: float):
        """Advance all positions by one step of ``dt``."""
        started = time.perf_counter()
        # Full component arrays plus the row indices of matching entities.
        arrays = es.get_component_arrays_direct(Position, Velocity)
        if not arrays:
            return
        (pos_array, pos_idx), (vel_array, vel_idx) = arrays
        if len(pos_idx) > 0:
            # Updates are written through the returned arrays.
            for axis in ('x', 'y'):
                pos_array[axis][pos_idx] += vel_array[axis][vel_idx] * dt
        self.total_time += time.perf_counter() - started
        self.iterations += 1
class AccelerationSystem(es.Processor):
    """Adds ``acceleration * dt`` to the velocity of entities having both.

    ``iterations`` and ``total_time`` accumulate how often and for how long
    ``process`` ran to completion.
    """

    priority = 90

    def __init__(self):
        self.total_time = 0.0
        self.iterations = 0

    def process(self, dt: float):
        """Advance all velocities by one step of ``dt``."""
        started = time.perf_counter()
        arrays = es.get_component_arrays_direct(Velocity, Acceleration)
        if not arrays:
            return
        (vel_array, vel_idx), (acc_array, acc_idx) = arrays
        if len(vel_idx) > 0:
            for axis in ('x', 'y'):
                vel_array[axis][vel_idx] += acc_array[axis][acc_idx] * dt
        self.total_time += time.perf_counter() - started
        self.iterations += 1
class BouncingSystem(es.Processor):
    """Bounces entities off the edges of their ``Bouncing`` area.

    For each axis, entities whose circle crosses a boundary have that
    velocity component multiplied by -0.95, and the position is clamped so
    the circle lies inside the area.
    """

    priority = 80

    def __init__(self):
        self.total_time = 0.0
        self.iterations = 0

    def process(self, dt: float):
        """Apply boundary collisions on the x axis, then on the y axis."""
        started = time.perf_counter()
        arrays = es.get_component_arrays_direct(Position, Velocity, Bouncing, Renderable)
        if not arrays:
            return
        (pos_array, pos_idx), (vel_array, vel_idx), (bounce_array, bounce_idx), (rend_array, rend_idx) = arrays
        if len(pos_idx) > 0:
            radius = rend_array['radius'][rend_idx]
            # (position/velocity field, field holding the far boundary)
            for axis, extent in (('x', 'width'), ('y', 'height')):
                coords = pos_array[axis][pos_idx]
                limit = bounce_array[extent][bounce_idx]
                near_hit = coords - radius < 0
                far_hit = coords + radius > limit
                hit_mask = near_hit | far_hit
                # Reverse and damp the velocity of everything that hit.
                vel_array[axis][vel_idx[hit_mask]] *= -0.95
                # Pull the position back inside the area.
                pos_array[axis][pos_idx] = np.clip(coords, radius, limit - radius)
        self.total_time += time.perf_counter() - started
        self.iterations += 1
class LifetimeSystem(es.Processor):
    """Counts down ``Lifetime.time_left`` and deletes expired entities.

    ``deleted_count`` accumulates the number of ``delete_entity`` calls
    issued by this system.
    """

    priority = 70

    def __init__(self):
        self.deleted_count = 0
        self.total_time = 0.0
        self.iterations = 0

    def process(self, dt: float):
        """Subtract ``dt`` from every lifetime and delete entities at or below zero."""
        started = time.perf_counter()
        entities = es.get_entities_with_components(Lifetime)
        arrays = es.get_component_arrays_direct(Lifetime)
        if not arrays or not entities:
            return
        lifetime_array, lifetime_idx = arrays[0]
        if len(lifetime_idx) > 0:
            lifetime_array['time_left'][lifetime_idx] -= dt
            dead_mask = lifetime_array['time_left'][lifetime_idx] <= 0
            # entities and lifetime_idx are indexed by the same position.
            for position, entity_id in enumerate(entities):
                if not dead_mask[position]:
                    continue
                es.delete_entity(entity_id)
                self.deleted_count += 1
        self.total_time += time.perf_counter() - started
        self.iterations += 1
class RenderSystem(es.Processor):
    """Draws a circle for every entity with ``Position`` and ``Renderable``.

    ``rendered_count`` holds the number of circles drawn by the last call.
    """

    priority = 0

    def __init__(self):
        self.rendered_count = 0
        self.total_time = 0.0
        self.iterations = 0

    def process(self, dt: float):
        """Draw all renderable entities and update the timing counters."""
        started = time.perf_counter()
        drawn = 0
        # Per-entity objects from get_components are convenient for drawing.
        for _entity, (pos, rend) in es.get_components(Position, Renderable):
            draw_circle(int(pos.x), int(pos.y), rend.radius, rend.color)
            drawn += 1
        self.rendered_count = drawn
        self.total_time += time.perf_counter() - started
        self.iterations += 1
# Benchmark (unchanged)
class BenchmarkStats:
    """Rolling statistics over the most recent 300 frames."""

    def __init__(self):
        self.start_time = time.time()
        self.frame_times: List[float] = []
        self.entity_counts: List[int] = []
        self.fps_history: List[float] = []

    def add_frame(self, frame_time: float, entity_count: int, fps: float):
        """Record one frame; the oldest sample is dropped beyond 300 frames."""
        samples = (
            (self.frame_times, frame_time),
            (self.entity_counts, entity_count),
            (self.fps_history, fps),
        )
        for series, sample in samples:
            series.append(sample)
        if len(self.frame_times) > 300:
            for series, _ in samples:
                del series[0]

    def get_avg_fps(self) -> float:
        """Return the mean recorded FPS, or 0.0 when nothing is recorded."""
        history = self.fps_history
        return sum(history) / len(history) if history else 0.0

    def get_avg_frame_time(self) -> float:
        """Return the mean recorded frame time multiplied by 1000, or 0.0."""
        times = self.frame_times
        return sum(times) / len(times) * 1000 if times else 0.0

    def get_min_fps(self) -> float:
        """Return the lowest recorded FPS, or 0.0 when nothing is recorded."""
        if not self.fps_history:
            return 0.0
        return min(self.fps_history)

    def get_max_fps(self) -> float:
        """Return the highest recorded FPS, or 0.0 when nothing is recorded."""
        if not self.fps_history:
            return 0.0
        return max(self.fps_history)
class Benchmark:
    """Interactive benchmark: spawns entities, runs the processors, draws an overlay."""

    def __init__(self, width: int = 1200, height: int = 800):
        self.screen_width, self.screen_height = width, height
        self.stats = BenchmarkStats()
        self.spawn_rate = 50
        self.auto_spawn = False
        self.auto_spawn_rate = 10
        # The processor instances are created in setup().
        self.movement_sys = None
        self.acceleration_sys = None
        self.bouncing_sys = None
        self.lifetime_sys = None
        self.render_sys = None

    def setup(self):
        """Open the window and register all processors."""
        init_window(self.screen_width, self.screen_height, 'ECS Performance Benchmark (NumPy Fixed)')
        set_target_fps(120)
        self.movement_sys = MovementSystem()
        self.acceleration_sys = AccelerationSystem()
        self.bouncing_sys = BouncingSystem()
        self.lifetime_sys = LifetimeSystem()
        self.render_sys = RenderSystem()
        registrations = (
            (self.acceleration_sys, 90),
            (self.movement_sys, 100),
            (self.bouncing_sys, 80),
            (self.lifetime_sys, 70),
            (self.render_sys, 0),
        )
        for system, priority in registrations:
            es.add_processor(system, priority=priority)

    def spawn_entity(self, with_lifetime: bool = True):
        """Create one entity with random position, velocity, color and radius.

        Every entity gets a ``Lifetime`` component for consistency: a random
        5-15 when ``with_lifetime`` is true, otherwise a very large value.
        """
        pos = Position(random.uniform(50, self.screen_width - 50), random.uniform(50, self.screen_height - 50))
        vel = Velocity(random.uniform(-200, 200), random.uniform(-200, 200))
        acc = Acceleration(0, 50)
        rend = Renderable(
            Color(random.randint(50, 255), random.randint(50, 255), random.randint(50, 255), 255), random.uniform(3, 10)
        )
        bouncing = Bouncing(self.screen_width, self.screen_height)
        lifetime = Lifetime(random.uniform(5, 15)) if with_lifetime else Lifetime(999999.0)
        es.create_entity(pos, vel, acc, rend, bouncing, lifetime)

    def spawn_batch(self, count: int, with_lifetime: bool = True):
        """Spawn ``count`` entities."""
        for _ in range(count):
            self.spawn_entity(with_lifetime)

    def get_entity_count(self) -> int:
        """Return the size of the ECS module's set of living entities."""
        return len(es._living_entities)

    def draw_ui(self, dt: float):
        """Draw the statistics overlay and the key bindings."""
        entity_count = self.get_entity_count()
        fps = get_fps()
        line_height = 25
        y_offset = 10

        avg_fps = self.stats.get_avg_fps()
        avg_frame = self.stats.get_avg_frame_time()
        min_fps = self.stats.get_min_fps()
        max_fps = self.stats.get_max_fps()
        summary = (
            (f'FPS: {fps}', GREEN),
            (f'Entities: {entity_count}', YELLOW),
            (f'Frame time: {dt * 1000:.2f} ms', ORANGE),
            (f'Avg FPS: {avg_fps:.1f}', LIGHTGRAY),
            (f'Avg Frame: {avg_frame:.2f} ms', LIGHTGRAY),
            (f'Min/Max FPS: {min_fps:.0f}/{max_fps:.0f}', LIGHTGRAY),
        )
        for text, color in summary:
            draw_text(text, 10, y_offset, 20, color)
            y_offset += line_height
        y_offset += 10

        # One line per processor that has completed at least one iteration.
        timed_systems = (
            ('MovementSystem', self.movement_sys),
            ('AccelerationSystem', self.acceleration_sys),
            ('BouncingSystem', self.bouncing_sys),
            ('LifetimeSystem', self.lifetime_sys),
            ('RenderSystem', self.render_sys),
        )
        for label, system in timed_systems:
            if not (system and system.iterations > 0):
                continue
            avg_time = (system.total_time / system.iterations) * 1000
            draw_text(f'{label}: {avg_time:.3f} ms', 10, y_offset, 18, SKYBLUE)
            if label == 'LifetimeSystem':
                draw_text(f'(deleted: {self.lifetime_sys.deleted_count})', 250, y_offset, 18, GRAY)
            y_offset += line_height
        y_offset += 20

        controls = (
            ('Controls:', 20, WHITE),
            (f'SPACE: Spawn {self.spawn_rate} entities', 18, LIGHTGRAY),
            (f"A: Auto-spawn {'ON' if self.auto_spawn else 'OFF'}", 18, GREEN if self.auto_spawn else RED),
            ('UP/DOWN: Change spawn rate', 18, LIGHTGRAY),
            ('C: Clear all entities', 18, LIGHTGRAY),
            ('R: Reset stats', 18, LIGHTGRAY),
        )
        for text, size, color in controls:
            draw_text(text, 10, y_offset, size, color)
            y_offset += line_height

    def handle_input(self):
        """React to the keys pressed during this frame."""
        if is_key_pressed(KEY_SPACE):
            self.spawn_batch(self.spawn_rate, with_lifetime=True)
        if is_key_pressed(KEY_A):
            self.auto_spawn = not self.auto_spawn
        if is_key_pressed(KEY_UP):
            self.spawn_rate = min(1000, self.spawn_rate + 10)
        if is_key_pressed(KEY_DOWN):
            self.spawn_rate = max(10, self.spawn_rate - 10)
        if is_key_pressed(KEY_C):
            es.clear_database()
            print('Database cleared!')
        if is_key_pressed(KEY_R):
            self.stats = BenchmarkStats()
            tracked = (
                self.movement_sys,
                self.acceleration_sys,
                self.bouncing_sys,
                self.lifetime_sys,
                self.render_sys,
            )
            for system in tracked:
                if system:
                    system.iterations = 0
                    system.total_time = 0.0
            print('Stats reset!')

    def run(self):
        """Run the main loop until the window is closed, then print a summary."""
        self.setup()
        self.spawn_batch(100, with_lifetime=False)
        previous = time.time()
        while not window_should_close():
            now = time.time()
            dt = now - previous
            previous = now
            if self.auto_spawn:
                self.spawn_batch(self.auto_spawn_rate, with_lifetime=True)
            self.handle_input()
            # Only the ECS processing step is timed for the statistics.
            tick = time.perf_counter()
            es.process(dt)
            frame_time = time.perf_counter() - tick
            self.stats.add_frame(frame_time, self.get_entity_count(), get_fps())
            begin_drawing()
            clear_background(Color(20, 20, 30, 255))
            self.draw_ui(dt)
            end_drawing()
        close_window()
        self.print_final_stats()

    def print_final_stats(self):
        """Print the collected statistics to standard output."""
        rule = '=' * 50
        print('\n' + rule)
        print('FINAL BENCHMARK STATISTICS')
        print(rule)
        print(f'Total runtime: {time.time() - self.stats.start_time:.2f} seconds')
        print(f'Average FPS: {self.stats.get_avg_fps():.2f}')
        print(f'Min FPS: {self.stats.get_min_fps():.2f}')
        print(f'Max FPS: {self.stats.get_max_fps():.2f}')
        print(f'Average frame time: {self.stats.get_avg_frame_time():.3f} ms')
        print(f'\nTotal entities deleted: {self.lifetime_sys.deleted_count if self.lifetime_sys else 0}')
        print(rule)
# Script entry point: build the benchmark with its default window size and
# run it until the window is closed.
if __name__ == '__main__':
    benchmark = Benchmark()
    benchmark.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment