Skip to content

Instantly share code, notes, and snippets.

@Hammer2900
Created October 19, 2025 21:03
Show Gist options
  • Select an option

  • Save Hammer2900/90367c6871679953f4fb3b67b3282cdd to your computer and use it in GitHub Desktop.

Select an option

Save Hammer2900/90367c6871679953f4fb3b67b3282cdd to your computer and use it in GitHub Desktop.
esper numpy
"""
Esper-Numpy - высокопроизводительная Entity Component System (ECS) для Python на базе NumPy.
ИСПРАВЛЕНА проблема с копированием данных при индексации.
"""
import time as _time
import numpy as np
import inspect as _inspect
from dataclasses import is_dataclass, fields as _dataclass_fields
from types import MethodType as _MethodType
from typing import (
Any as _Any,
Callable as _Callable,
Dict as _Dict,
List as _List,
Set as _Set,
Type as _Type,
Tuple as _Tuple,
TypeVar as _TypeVar,
Iterable as _Iterable,
Optional as _Optional,
overload as _overload,
)
from weakref import ref as _ref
from weakref import WeakMethod as _WeakMethod
from itertools import count as _count
__version__ = version = '3.5-numpy-fixed'
def dispatch_event(name: str, *args: _Any) -> None:
for func in event_registry.get(name, []):
func()(*args)
def _make_callback(name: str) -> _Callable[[_Any], None]:
def callback(weak_method: _Any) -> None:
event_registry[name].remove(weak_method)
if not event_registry[name]:
del event_registry[name]
return callback
def set_handler(name: str, func: _Callable[..., None]) -> None:
if name not in event_registry:
event_registry[name] = set()
if isinstance(func, _MethodType):
event_registry[name].add(_WeakMethod(func, _make_callback(name)))
else:
event_registry[name].add(_ref(func, _make_callback(name)))
def remove_handler(name: str, func: _Callable[..., None]) -> None:
func_ref = _ref(func)
if func_ref not in event_registry.get(name, []):
return
event_registry[name].remove(func_ref)
if not event_registry[name]:
del event_registry[name]
_C = _TypeVar('_C')
_C2 = _TypeVar('_C2')
_C3 = _TypeVar('_C3')
_C4 = _TypeVar('_C4')
class Processor:
priority = 0
def process(self, *args: _Any, **kwargs: _Any) -> None:
raise NotImplementedError
_entity_count: '_count[int]' = _count(start=1)
_components: _Dict[_Type[_Any], np.ndarray] = {}
_component_dtypes: _Dict[_Type[_Any], np.dtype] = {}
_entity_component_map: _Dict[_Type[_Any], _Dict[int, int]] = {}
_component_entity_map: _Dict[_Type[_Any], _Dict[int, int]] = {}
_living_entities: _Set[int] = set()
_dead_entities: _Set[int] = set()
_get_component_cache: _Dict[_Type[_Any], _List[_Any]] = {}
_get_components_cache: _Dict[_Tuple[_Type[_Any], ...], _List[_Any]] = {}
_cache_dirty: bool = False
_processors: _List[Processor] = []
_processors_dict: _Dict[_Type[Processor], Processor] = {}
event_registry: _Dict[str, _Any] = {}
process_times: _Dict[str, int] = {}
current_world: str = 'default'
_context_map: _Dict[str, _Tuple] = {}
def _get_dtype_and_values(component_instance: _Any) -> _Tuple[np.dtype, tuple]:
if hasattr(component_instance, '__slots__'):
field_names = component_instance.__slots__
values = tuple(getattr(component_instance, name) for name in field_names)
elif is_dataclass(component_instance):
field_names = [f.name for f in _dataclass_fields(component_instance)]
values = tuple(getattr(component_instance, name) for name in field_names)
else:
field_names = sorted(component_instance.__dict__.keys())
values = tuple(component_instance.__dict__[name] for name in field_names)
dtype_pairs = []
for name, value in zip(field_names, values):
numpy_type = np.result_type(type(value))
dtype_pairs.append((name, numpy_type))
return np.dtype(dtype_pairs), values
def _recreate_instance(component_type: _Type[_C], numpy_row: np.void) -> _C:
return component_type(*numpy_row.tolist())
def clear_cache() -> None:
global _cache_dirty
_cache_dirty = True
def _clear_cache_now() -> None:
global _cache_dirty
_get_component_cache.clear()
_get_components_cache.clear()
_cache_dirty = False
def clear_database() -> None:
global _entity_count
_entity_count = _count(start=1)
_components.clear()
_component_dtypes.clear()
_entity_component_map.clear()
_component_entity_map.clear()
_living_entities.clear()
_dead_entities.clear()
_clear_cache_now()
def add_processor(processor_instance: Processor, priority: int = 0) -> None:
processor_instance.priority = priority
_processors.append(processor_instance)
_processors.sort(key=lambda proc: proc.priority, reverse=True)
_processors_dict[type(processor_instance)] = processor_instance
def remove_processor(processor_type: _Type[Processor]) -> None:
processor = _processors_dict.pop(processor_type, None)
if processor:
_processors.remove(processor)
def get_processor(processor_type: _Type[Processor]) -> _Optional[Processor]:
return _processors_dict.get(processor_type)
def create_entity(*components: _C) -> int:
entity = next(_entity_count)
_living_entities.add(entity)
for component_instance in components:
add_component(entity, component_instance)
return entity
def delete_entity(entity: int, immediate: bool = False) -> None:
if not entity_exists(entity):
return
if immediate:
types_to_remove = [ctype for ctype, entity_map in _entity_component_map.items() if entity in entity_map]
for component_type in types_to_remove:
remove_component(entity, component_type)
_living_entities.discard(entity)
else:
_dead_entities.add(entity)
def entity_exists(entity: int) -> bool:
return entity in _living_entities and entity not in _dead_entities
def component_for_entity(entity: int, component_type: _Type[_C]) -> _C:
try:
idx = _entity_component_map[component_type][entity]
numpy_row = _components[component_type][idx]
return _recreate_instance(component_type, numpy_row)
except KeyError:
raise KeyError(f'Сущность {entity} не имеет компонента типа {component_type.__name__}')
def components_for_entity(entity: int) -> _Tuple[_C, ...]:
components_tuple = []
for ctype, entity_map in _entity_component_map.items():
if entity in entity_map:
components_tuple.append(component_for_entity(entity, ctype))
return tuple(components_tuple)
def has_component(entity: int, component_type: _Type[_C]) -> bool:
return entity in _entity_component_map.get(component_type, {})
def has_components(entity: int, *component_types: _Type[_C]) -> bool:
return all(has_component(entity, ctype) for ctype in component_types)
def add_component(entity: int, component_instance: _C, type_alias: _Optional[_Type[_C]] = None) -> None:
component_type = type_alias or type(component_instance)
if component_type not in _components:
dtype, values = _get_dtype_and_values(component_instance)
_component_dtypes[component_type] = dtype
_components[component_type] = np.array([values], dtype=dtype)
_entity_component_map[component_type] = {entity: 0}
_component_entity_map[component_type] = {0: entity}
else:
dtype = _component_dtypes[component_type]
_, values = _get_dtype_and_values(component_instance)
arr = _components[component_type]
new_index = len(arr)
_components[component_type] = np.append(arr, np.array([values], dtype=dtype))
_entity_component_map[component_type][entity] = new_index
_component_entity_map[component_type][new_index] = entity
clear_cache()
def remove_component(entity: int, component_type: _Type[_C]) -> _C:
idx_to_remove = _entity_component_map[component_type][entity]
instance_to_return = component_for_entity(entity, component_type)
arr = _components[component_type]
last_idx = len(arr) - 1
if idx_to_remove != last_idx:
last_element_entity = _component_entity_map[component_type][last_idx]
arr[idx_to_remove] = arr[last_idx]
_entity_component_map[component_type][last_element_entity] = idx_to_remove
_component_entity_map[component_type][idx_to_remove] = last_element_entity
del _entity_component_map[component_type][entity]
del _component_entity_map[component_type][last_idx]
_components[component_type] = np.delete(arr, last_idx, axis=0)
if not _entity_component_map[component_type]:
del _components[component_type]
del _component_dtypes[component_type]
del _entity_component_map[component_type]
del _component_entity_map[component_type]
clear_cache()
return instance_to_return
def _get_component(component_type: _Type[_C]) -> _Iterable[_Tuple[int, _C]]:
if component_type not in _components:
return
arr = _components[component_type]
idx_entity_map = _component_entity_map[component_type]
for i, row in enumerate(arr):
entity_id = idx_entity_map[i]
if entity_id not in _dead_entities:
yield entity_id, _recreate_instance(component_type, row)
def _get_components(*component_types: _Type[_C]) -> _Iterable[_Tuple[int, _Tuple[_C, ...]]]:
if not component_types:
return
for ctype in component_types:
if ctype not in _entity_component_map:
return
entity_sets = [
set(entity_map.keys()) for ctype, entity_map in _entity_component_map.items() if ctype in component_types
]
valid_entities = set.intersection(*entity_sets)
for entity in valid_entities:
if entity not in _dead_entities:
component_instances = tuple(component_for_entity(entity, ctype) for ctype in component_types)
yield entity, component_instances
def get_component(component_type: _Type[_C]) -> _List[_Tuple[int, _C]]:
if _cache_dirty:
_clear_cache_now()
cached = _get_component_cache.get(component_type)
if cached is not None:
return cached
result = list(_get_component(component_type))
_get_component_cache[component_type] = result
return result
@_overload
def get_components(__c1: _Type[_C], __c2: _Type[_C2]) -> _List[_Tuple[int, _Tuple[_C, _C2]]]: ...
@_overload
def get_components(__c1: _Type[_C], __c2: _Type[_C2], __c3: _Type[_C3]) -> _List[_Tuple[int, _Tuple[_C, _C2, _C3]]]: ...
@_overload
def get_components(
__c1: _Type[_C], __c2: _Type[_C2], __c3: _Type[_C3], __c4: _Type[_C4]
) -> _List[_Tuple[int, _Tuple[_C, _C2, _C3, _C4]]]: ...
def get_components(*component_types: _Type[_Any]) -> _List[_Tuple[int, _Tuple[_Any, ...]]]:
if _cache_dirty:
_clear_cache_now()
cached = _get_components_cache.get(component_types)
if cached is not None:
return cached
result = list(_get_components(*component_types))
_get_components_cache[component_types] = result
return result
def try_component(entity: int, component_type: _Type[_C]) -> _Optional[_C]:
if has_component(entity, component_type):
return component_for_entity(entity, component_type)
return None
@_overload
def try_components(entity: int, __c1: _Type[_C], __c2: _Type[_C2]) -> _Optional[_Tuple[_C, _C2]]: ...
@_overload
def try_components(
entity: int, __c1: _Type[_C], __c2: _Type[_C2], __c3: _Type[_C3]
) -> _Optional[_Tuple[_C, _C2, _C3]]: ...
@_overload
def try_components(
entity: int, __c1: _Type[_C], __c2: _Type[_C2], __c3: _Type[_C3], __c4: _Type[_C4]
) -> _Optional[_Tuple[_C, _C2, _C3, _C4]]: ...
def try_components(entity: int, *component_types: _Type[_C]) -> _Optional[_Tuple[_C, ...]]:
if has_components(entity, *component_types):
return tuple(component_for_entity(entity, ctype) for ctype in component_types)
return None
def clear_dead_entities() -> None:
if not _dead_entities:
return
for entity in list(_dead_entities):
delete_entity(entity, immediate=True)
_dead_entities.clear()
def process(*args: _Any, **kwargs: _Any) -> None:
clear_dead_entities()
for processor in _processors:
processor.process(*args, **kwargs)
def timed_process(*args: _Any, **kwargs: _Any) -> None:
clear_dead_entities()
for processor in _processors:
start_time = _time.process_time()
processor.process(*args, **kwargs)
process_times[processor.__class__.__name__] = int((_time.process_time() - start_time) * 1000)
def list_worlds() -> _List[str]:
return list(_context_map)
def delete_world(name: str) -> None:
if current_world == name:
raise PermissionError('Активный мир (контекст) не может быть удален.')
del _context_map[name]
def switch_world(name: str) -> None:
if name not in _context_map:
_context_map[name] = (_count(start=1), {}, {}, {}, {}, {}, set(), set(), [], {}, False, {}, {})
global _entity_count, _components, _component_dtypes, _entity_component_map
global _component_entity_map, _living_entities, _dead_entities, _get_component_cache
global _get_components_cache, _processors, _processors_dict, _cache_dirty
global process_times, event_registry, current_world
(
_entity_count,
_components,
_component_dtypes,
_entity_component_map,
_component_entity_map,
_living_entities,
_dead_entities,
_get_component_cache,
_get_components_cache,
_processors,
_processors_dict,
_cache_dirty,
process_times,
event_registry,
) = _context_map[name]
current_world = name
def get_component_arrays_direct(*component_types: _Type[_C]) -> _Tuple[_Tuple[np.ndarray, np.ndarray], ...]:
"""
Возвращает кортежи (full_array, indices) для каждого типа компонента.
Использование:
(pos_arr, pos_idx), (vel_arr, vel_idx) = get_component_arrays_direct(Position, Velocity)
# Теперь изменения напрямую влияют на данные:
pos_arr['x'][pos_idx] += vel_arr['x'][vel_idx] * dt
Returns:
Tuple of (array, indices) pairs, where:
- array: полный NumPy массив компонентов этого типа
- indices: индексы элементов, у которых есть ВСЕ запрошенные компоненты
"""
if not component_types:
return tuple()
for ctype in component_types:
if ctype not in _entity_component_map:
empty = np.array([], dtype=_component_dtypes.get(ctype, object))
empty_idx = np.array([], dtype=int)
return tuple((empty, empty_idx) for _ in component_types)
entity_sets = [set(_entity_component_map[ctype].keys()) for ctype in component_types]
valid_entities = sorted(list(set.intersection(*entity_sets)))
if not valid_entities:
empty = np.array([], dtype=int)
return tuple((_components.get(ctype, np.array([])), empty) for ctype in component_types)
results = []
for ctype in component_types:
entity_map = _entity_component_map[ctype]
indices = np.array([entity_map[ent] for ent in valid_entities], dtype=int)
results.append((_components[ctype], indices))
return tuple(results)
def get_entities_with_components(*component_types: _Type[_C]) -> _List[int]:
"""Возвращает список ID сущностей, у которых есть все указанные компоненты."""
if not component_types:
return []
for ctype in component_types:
if ctype not in _entity_component_map:
return []
entity_sets = [set(_entity_component_map[ctype].keys()) for ctype in component_types]
valid_entities = sorted(list(set.intersection(*entity_sets)))
return valid_entities
_context_map['default'] = (
_entity_count,
_components,
_component_dtypes,
_entity_component_map,
_component_entity_map,
_living_entities,
_dead_entities,
_get_component_cache,
_get_components_cache,
_processors,
_processors_dict,
_cache_dirty,
process_times,
event_registry,
)
import esper as es
from pyray import *
import random
import time
import math
from dataclasses import dataclass
from typing import List, Dict
import numpy as np
# Компоненты
class Position:
__slots__ = ('x', 'y')
def __init__(self, x: float = 0.0, y: float = 0.0):
self.x = x
self.y = y
class Velocity:
__slots__ = ('x', 'y')
def __init__(self, x: float = 0.0, y: float = 0.0):
self.x = x
self.y = y
class Renderable:
__slots__ = ('color', 'radius')
def __init__(self, color: Color, radius: float):
self.color = color
self.radius = radius
class Lifetime:
__slots__ = ('time_left', 'max_time')
def __init__(self, max_time: float):
self.time_left = max_time
self.max_time = max_time
class Acceleration:
__slots__ = ('x', 'y')
def __init__(self, x: float = 0.0, y: float = 0.0):
self.x = x
self.y = y
class Bouncing:
__slots__ = ('width', 'height')
def __init__(self, width: int, height: int):
self.width = width
self.height = height
# ИСПРАВЛЕННЫЕ СИСТЕМЫ
class MovementSystem(es.Processor):
priority = 100
def __init__(self):
self.iterations = 0
self.total_time = 0.0
def process(self, dt: float):
start = time.perf_counter()
# Получаем прямой доступ к массивам и индексам
arrays = es.get_component_arrays_direct(Position, Velocity)
if not arrays:
return
(pos_array, pos_idx), (vel_array, vel_idx) = arrays
# Теперь изменения напрямую влияют на исходные данные!
if len(pos_idx) > 0:
pos_array['x'][pos_idx] += vel_array['x'][vel_idx] * dt
pos_array['y'][pos_idx] += vel_array['y'][vel_idx] * dt
self.total_time += time.perf_counter() - start
self.iterations += 1
class AccelerationSystem(es.Processor):
priority = 90
def __init__(self):
self.iterations = 0
self.total_time = 0.0
def process(self, dt: float):
start = time.perf_counter()
arrays = es.get_component_arrays_direct(Velocity, Acceleration)
if not arrays:
return
(vel_array, vel_idx), (acc_array, acc_idx) = arrays
if len(vel_idx) > 0:
vel_array['x'][vel_idx] += acc_array['x'][acc_idx] * dt
vel_array['y'][vel_idx] += acc_array['y'][acc_idx] * dt
self.total_time += time.perf_counter() - start
self.iterations += 1
class BouncingSystem(es.Processor):
priority = 80
def __init__(self):
self.iterations = 0
self.total_time = 0.0
def process(self, dt: float):
start = time.perf_counter()
arrays = es.get_component_arrays_direct(Position, Velocity, Bouncing, Renderable)
if not arrays:
return
(pos_array, pos_idx), (vel_array, vel_idx), (bounce_array, bounce_idx), (rend_array, rend_idx) = arrays
if len(pos_idx) > 0:
# Отскок от левой/правой границы
left_hit = pos_array['x'][pos_idx] - rend_array['radius'][rend_idx] < 0
right_hit = pos_array['x'][pos_idx] + rend_array['radius'][rend_idx] > bounce_array['width'][bounce_idx]
# Применяем изменения через индексы
hit_mask = left_hit | right_hit
vel_array['x'][vel_idx[hit_mask]] *= -0.95
# Корректируем позицию
pos_array['x'][pos_idx] = np.clip(
pos_array['x'][pos_idx],
rend_array['radius'][rend_idx],
bounce_array['width'][bounce_idx] - rend_array['radius'][rend_idx],
)
# Отскок от верхней/нижней границы
top_hit = pos_array['y'][pos_idx] - rend_array['radius'][rend_idx] < 0
bottom_hit = pos_array['y'][pos_idx] + rend_array['radius'][rend_idx] > bounce_array['height'][bounce_idx]
hit_mask = top_hit | bottom_hit
vel_array['y'][vel_idx[hit_mask]] *= -0.95
pos_array['y'][pos_idx] = np.clip(
pos_array['y'][pos_idx],
rend_array['radius'][rend_idx],
bounce_array['height'][bounce_idx] - rend_array['radius'][rend_idx],
)
self.total_time += time.perf_counter() - start
self.iterations += 1
class LifetimeSystem(es.Processor):
priority = 70
def __init__(self):
self.iterations = 0
self.total_time = 0.0
self.deleted_count = 0
def process(self, dt: float):
start = time.perf_counter()
entities = es.get_entities_with_components(Lifetime)
arrays = es.get_component_arrays_direct(Lifetime)
if not arrays or not entities:
return
(lifetime_array, lifetime_idx) = arrays[0]
if len(lifetime_idx) > 0:
# Уменьшаем время жизни
lifetime_array['time_left'][lifetime_idx] -= dt
# Находим умерших
dead_mask = lifetime_array['time_left'][lifetime_idx] <= 0
# Удаляем всех умерших
for i, entity_id in enumerate(entities):
if dead_mask[i]:
es.delete_entity(entity_id)
self.deleted_count += 1
self.total_time += time.perf_counter() - start
self.iterations += 1
class RenderSystem(es.Processor):
priority = 0
def __init__(self):
self.iterations = 0
self.total_time = 0.0
self.rendered_count = 0
def process(self, dt: float):
start = time.perf_counter()
count = 0
# Для рендеринга удобнее использовать get_components
for ent, (pos, rend) in es.get_components(Position, Renderable):
draw_circle(int(pos.x), int(pos.y), rend.radius, rend.color)
count += 1
self.rendered_count = count
self.total_time += time.perf_counter() - start
self.iterations += 1
# Бенчмарк (без изменений)
class BenchmarkStats:
def __init__(self):
self.frame_times: List[float] = []
self.entity_counts: List[int] = []
self.fps_history: List[float] = []
self.start_time = time.time()
def add_frame(self, frame_time: float, entity_count: int, fps: float):
self.frame_times.append(frame_time)
self.entity_counts.append(entity_count)
self.fps_history.append(fps)
if len(self.frame_times) > 300:
self.frame_times.pop(0)
self.entity_counts.pop(0)
self.fps_history.pop(0)
def get_avg_fps(self) -> float:
if not self.fps_history:
return 0.0
return sum(self.fps_history) / len(self.fps_history)
def get_avg_frame_time(self) -> float:
if not self.frame_times:
return 0.0
return sum(self.frame_times) / len(self.frame_times) * 1000
def get_min_fps(self) -> float:
return min(self.fps_history) if self.fps_history else 0.0
def get_max_fps(self) -> float:
return max(self.fps_history) if self.fps_history else 0.0
class Benchmark:
def __init__(self, width: int = 1200, height: int = 800):
self.screen_width = width
self.screen_height = height
self.stats = BenchmarkStats()
self.spawn_rate = 50
self.auto_spawn = False
self.auto_spawn_rate = 10
self.movement_sys = None
self.acceleration_sys = None
self.bouncing_sys = None
self.lifetime_sys = None
self.render_sys = None
def setup(self):
init_window(self.screen_width, self.screen_height, 'ECS Performance Benchmark (NumPy Fixed)')
set_target_fps(120)
self.movement_sys = MovementSystem()
self.acceleration_sys = AccelerationSystem()
self.bouncing_sys = BouncingSystem()
self.lifetime_sys = LifetimeSystem()
self.render_sys = RenderSystem()
es.add_processor(self.acceleration_sys, priority=90)
es.add_processor(self.movement_sys, priority=100)
es.add_processor(self.bouncing_sys, priority=80)
es.add_processor(self.lifetime_sys, priority=70)
es.add_processor(self.render_sys, priority=0)
def spawn_entity(self, with_lifetime: bool = True):
pos = Position(random.uniform(50, self.screen_width - 50), random.uniform(50, self.screen_height - 50))
vel = Velocity(random.uniform(-200, 200), random.uniform(-200, 200))
acc = Acceleration(0, 50)
rend = Renderable(
Color(random.randint(50, 255), random.randint(50, 255), random.randint(50, 255), 255), random.uniform(3, 10)
)
bouncing = Bouncing(self.screen_width, self.screen_height)
components = [pos, vel, acc, rend, bouncing]
# ИСПРАВЛЕНИЕ: Всегда добавляем Lifetime для консистентности
if with_lifetime:
components.append(Lifetime(random.uniform(5, 15)))
else:
# Даже "постоянным" сущностям даем очень большой Lifetime
components.append(Lifetime(999999.0))
es.create_entity(*components)
def spawn_batch(self, count: int, with_lifetime: bool = True):
for _ in range(count):
self.spawn_entity(with_lifetime)
def get_entity_count(self) -> int:
return len(es._living_entities)
def draw_ui(self, dt: float):
entity_count = self.get_entity_count()
fps = get_fps()
y_offset = 10
line_height = 25
draw_text(f'FPS: {fps}', 10, y_offset, 20, GREEN)
y_offset += line_height
draw_text(f'Entities: {entity_count}', 10, y_offset, 20, YELLOW)
y_offset += line_height
draw_text(f'Frame time: {dt * 1000:.2f} ms', 10, y_offset, 20, ORANGE)
y_offset += line_height
avg_fps = self.stats.get_avg_fps()
avg_frame = self.stats.get_avg_frame_time()
draw_text(f'Avg FPS: {avg_fps:.1f}', 10, y_offset, 20, LIGHTGRAY)
y_offset += line_height
draw_text(f'Avg Frame: {avg_frame:.2f} ms', 10, y_offset, 20, LIGHTGRAY)
y_offset += line_height
min_fps = self.stats.get_min_fps()
max_fps = self.stats.get_max_fps()
draw_text(f'Min/Max FPS: {min_fps:.0f}/{max_fps:.0f}', 10, y_offset, 20, LIGHTGRAY)
y_offset += line_height
y_offset += 10
if self.movement_sys and self.movement_sys.iterations > 0:
avg_time = (self.movement_sys.total_time / self.movement_sys.iterations) * 1000
draw_text(f'MovementSystem: {avg_time:.3f} ms', 10, y_offset, 18, SKYBLUE)
y_offset += line_height
if self.acceleration_sys and self.acceleration_sys.iterations > 0:
avg_time = (self.acceleration_sys.total_time / self.acceleration_sys.iterations) * 1000
draw_text(f'AccelerationSystem: {avg_time:.3f} ms', 10, y_offset, 18, SKYBLUE)
y_offset += line_height
if self.bouncing_sys and self.bouncing_sys.iterations > 0:
avg_time = (self.bouncing_sys.total_time / self.bouncing_sys.iterations) * 1000
draw_text(f'BouncingSystem: {avg_time:.3f} ms', 10, y_offset, 18, SKYBLUE)
y_offset += line_height
if self.lifetime_sys and self.lifetime_sys.iterations > 0:
avg_time = (self.lifetime_sys.total_time / self.lifetime_sys.iterations) * 1000
draw_text(f'LifetimeSystem: {avg_time:.3f} ms', 10, y_offset, 18, SKYBLUE)
draw_text(f'(deleted: {self.lifetime_sys.deleted_count})', 250, y_offset, 18, GRAY)
y_offset += line_height
if self.render_sys and self.render_sys.iterations > 0:
avg_time = (self.render_sys.total_time / self.render_sys.iterations) * 1000
draw_text(f'RenderSystem: {avg_time:.3f} ms', 10, y_offset, 18, SKYBLUE)
y_offset += line_height
y_offset += 20
draw_text('Controls:', 10, y_offset, 20, WHITE)
y_offset += line_height
draw_text(f'SPACE: Spawn {self.spawn_rate} entities', 10, y_offset, 18, LIGHTGRAY)
y_offset += line_height
draw_text(
f"A: Auto-spawn {'ON' if self.auto_spawn else 'OFF'}", 10, y_offset, 18, GREEN if self.auto_spawn else RED
)
y_offset += line_height
draw_text('UP/DOWN: Change spawn rate', 10, y_offset, 18, LIGHTGRAY)
y_offset += line_height
draw_text('C: Clear all entities', 10, y_offset, 18, LIGHTGRAY)
y_offset += line_height
draw_text('R: Reset stats', 10, y_offset, 18, LIGHTGRAY)
def handle_input(self):
if is_key_pressed(KEY_SPACE):
self.spawn_batch(self.spawn_rate, with_lifetime=True)
if is_key_pressed(KEY_A):
self.auto_spawn = not self.auto_spawn
if is_key_pressed(KEY_UP):
self.spawn_rate = min(1000, self.spawn_rate + 10)
if is_key_pressed(KEY_DOWN):
self.spawn_rate = max(10, self.spawn_rate - 10)
if is_key_pressed(KEY_C):
es.clear_database()
print('Database cleared!')
if is_key_pressed(KEY_R):
self.stats = BenchmarkStats()
for sys in [
self.movement_sys,
self.acceleration_sys,
self.bouncing_sys,
self.lifetime_sys,
self.render_sys,
]:
if sys:
sys.iterations = 0
sys.total_time = 0.0
print('Stats reset!')
def run(self):
self.setup()
self.spawn_batch(100, with_lifetime=False)
last_time = time.time()
while not window_should_close():
current_time = time.time()
dt = current_time - last_time
last_time = current_time
if self.auto_spawn:
self.spawn_batch(self.auto_spawn_rate, with_lifetime=True)
self.handle_input()
frame_start = time.perf_counter()
es.process(dt)
frame_time = time.perf_counter() - frame_start
self.stats.add_frame(frame_time, self.get_entity_count(), get_fps())
begin_drawing()
clear_background(Color(20, 20, 30, 255))
self.draw_ui(dt)
end_drawing()
close_window()
self.print_final_stats()
def print_final_stats(self):
print('\n' + '=' * 50)
print('FINAL BENCHMARK STATISTICS')
print('=' * 50)
print(f'Total runtime: {time.time() - self.stats.start_time:.2f} seconds')
print(f'Average FPS: {self.stats.get_avg_fps():.2f}')
print(f'Min FPS: {self.stats.get_min_fps():.2f}')
print(f'Max FPS: {self.stats.get_max_fps():.2f}')
print(f'Average frame time: {self.stats.get_avg_frame_time():.3f} ms')
print(f'\nTotal entities deleted: {self.lifetime_sys.deleted_count if self.lifetime_sys else 0}')
print('=' * 50)
if __name__ == '__main__':
benchmark = Benchmark()
benchmark.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment