Created
March 5, 2026 21:43
-
-
Save Bahus/65a1602f60be95a47ddf7ad2691859d3 to your computer and use it in GitHub Desktop.
Бенчмарки SQLAlchemy
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
Benchmark of SQLAlchemy (psycopg/asyncpg) through PgBouncer and direct Postgres.

Required environment variables:
    PGBOUNCER_DSN_NO_PREPARED — DSN to PgBouncer without prepared statements
    PGBOUNCER_DSN_PREPARED    — DSN to PgBouncer with prepared statements enabled
    POSTGRES_DSN_DIRECT       — DSN directly to Postgres

DSN format: postgresql+psycopg://user:password@host:port/dbname
         or postgresql+asyncpg://user:password@host:port/dbname

Key PgBouncer configuration parameters:
    pool_mode = transaction
    max_client_conn = 1000
    default_pool_size = 1000
    max_prepared_statements = 500  — with prepared statements enabled
    max_prepared_statements = 0    — with prepared statements disabled

Running the benchmark via uv without a pyproject.toml (one-liner):
    uv run --with "sqlalchemy[asyncio]" --with "psycopg[binary,pool]" \\
        --with faker --with matplotlib --with pandas --with tabulate \\
        main.py [--total 100] [--repeat 25] [--interactive] [--skip-plot]
"""
| import argparse | |
| import asyncio | |
| import logging | |
| import os | |
| import time | |
| import uuid | |
| from datetime import datetime, timezone | |
| from dataclasses import dataclass | |
| from statistics import fmean, median, pstdev, stdev | |
| from typing import Any | |
| import matplotlib.pyplot as plt | |
| import pandas as pd | |
| import tabulate | |
| from faker import Faker | |
| from sqlalchemy.engine import make_url | |
| from sqlalchemy.ext.asyncio import ( | |
| AsyncEngine, | |
| AsyncSession, | |
| async_sessionmaker, | |
| create_async_engine, | |
| ) | |
| from sqlalchemy.sql import text | |
| from sqlalchemy.sql.elements import TextClause | |
# Names of the environment variables holding the DSN of each benchmark target.
ENV_PGBOUNCER_DSN_PREPARED = 'PGBOUNCER_DSN_PREPARED'
ENV_PGBOUNCER_DSN_NO_PREPARED = 'PGBOUNCER_DSN_NO_PREPARED'
ENV_POSTGRES_DSN_DIRECT = 'POSTGRES_DSN_DIRECT'
# Default benchmarked statement; :sequence_id is bound per query in make_query().
DEFAULT_QUERY = 'select * from messages_benchmark where sequence_id = :sequence_id;'
# Table created/populated by ensure_benchmark_table_data().
BENCHMARK_TABLE_NAME = 'messages_benchmark'
# Target row count the benchmark table is topped up to before each run.
BENCHMARK_ROWS_LIMIT = 1000
# Locale for Faker-generated row content.
FAKER_LOCALE = 'ru_RU'
@dataclass(frozen=True)
class BenchmarkTarget:
    """One benchmark configuration: a labelled DSN plus prepared-statement mode."""

    # Human-readable label used in console output, tables and plots.
    name: str
    # Full SQLAlchemy DSN, e.g. postgresql+psycopg://user:pass@host:port/db.
    dsn: str
    # Whether the driver is configured to use server-side prepared statements.
    prepared_statements_enabled: bool
@dataclass(frozen=True)
class BenchmarkResult:
    """Timing outcome of one target: wall-clock seconds for each repeated run."""

    # Label of the target this result belongs to (BenchmarkTarget.name).
    name: str
    # Elapsed wall-clock time of each run, in seconds (one entry per --repeat).
    runs_seconds: list[float]
def parse_args() -> argparse.Namespace:
    """Parse the benchmark's command-line options.

    Returns a namespace with: total, repeat, query, plot_output,
    skip_plot and interactive.
    """
    parser = argparse.ArgumentParser(
        description='Benchmark SQLAlchemy + psycopg через pgbouncer и прямой Postgres.',
    )
    add = parser.add_argument
    add('--total', type=int, default=100, help='Запросов в одном запуске.')
    add('--repeat', type=int, default=25, help='Количество запусков бенчмарка.')
    add('--query', type=str, default=DEFAULT_QUERY, help='SQL запрос для бенчмарка.')
    add(
        '--plot-output',
        type=str,
        default='direct_vs_prepared_not_prepared.png',
        help='Путь для сохранения графика.',
    )
    add('--skip-plot', action='store_true', help='Не строить и не сохранять график.')
    add(
        '--interactive',
        action='store_true',
        help='Подробные логи и запрос нажатия Enter перед каждым запуском бенчмарка.',
    )
    return parser.parse_args()
def get_required_env(var_name: str) -> str:
    """Return the value of *var_name*, raising if it is unset or empty."""
    if value := os.getenv(var_name):
        return value
    raise RuntimeError(f'Не задана обязательная переменная окружения: {var_name}')
def build_targets() -> list[BenchmarkTarget]:
    """Assemble the three benchmark targets from required environment DSNs.

    Order matters for reports: bouncer without prepared statements,
    bouncer with prepared statements, then direct Postgres.
    """
    specs = (
        ('psycopg_bouncer_not_prepared', ENV_PGBOUNCER_DSN_NO_PREPARED, False),
        ('psycopg_bouncer_prepared', ENV_PGBOUNCER_DSN_PREPARED, True),
        ('psycopg_direct_prepared', ENV_POSTGRES_DSN_DIRECT, True),
    )
    return [
        BenchmarkTarget(
            name=name,
            dsn=get_required_env(env_var),
            prepared_statements_enabled=prepared,
        )
        for name, env_var, prepared in specs
    ]
def parse_driver_from_dsn(dsn: str) -> str:
    """Extract the driver name ('psycopg'/'asyncpg') from the DSN scheme.

    Raises RuntimeError when the scheme has no explicit '+driver' suffix.
    """
    scheme, _, _ = dsn.partition('://')
    _, plus, driver = scheme.partition('+')
    if not plus:
        raise RuntimeError(
            'В DSN нужно явно указывать драйвер: postgresql+psycopg:// или '
            'postgresql+asyncpg://',
        )
    return driver
| def build_connect_args(driver: str, prepared_statements_enabled: bool) -> dict[str, Any]: | |
| if driver == 'psycopg': | |
| return { | |
| # включаем prepared statements для всех запросов | |
| 'prepare_threshold': 1 if prepared_statements_enabled else None, | |
| } | |
| if driver == 'asyncpg': | |
| if prepared_statements_enabled: | |
| return { | |
| 'prepared_statement_cache_size': 100, | |
| 'statement_cache_size': 100, | |
| } | |
| return { | |
| 'prepared_statement_name_func': lambda: '', | |
| 'prepared_statement_cache_size': 0, | |
| 'statement_cache_size': 0, | |
| } | |
| raise RuntimeError(f'Неподдерживаемый драйвер в DSN: {driver}') | |
def build_engine(dsn: str, prepared_statements_enabled: bool) -> AsyncEngine:
    """Create an AUTOCOMMIT async engine configured for the DSN's driver."""
    driver = parse_driver_from_dsn(dsn)
    return create_async_engine(
        dsn,
        isolation_level='AUTOCOMMIT',
        # NOTE(review): pool_size=-1 looks intended as "no pool limit" —
        # confirm against the pool class actually in use.
        pool_size=-1,
        pool_reset_on_return=None,
        connect_args=build_connect_args(driver, prepared_statements_enabled),
        echo=False,
    )
def parse_username_from_dsn(dsn: str) -> str:
    """Return the username component of *dsn*; raise if it is absent."""
    if username := make_url(dsn).username:
        return username
    raise RuntimeError(f'Не удалось извлечь username из DSN: {dsn}')
def quote_sql_identifier(value: str) -> str:
    """Double-quote *value* as a SQL identifier, doubling embedded quotes."""
    return '"{}"'.format(value.replace('"', '""'))
def generate_benchmark_rows(rows_count: int, faker: Faker) -> list[dict[str, str]]:
    """Produce *rows_count* synthetic rows: random UUID ids plus fake text."""
    return [
        {
            'id': str(uuid.uuid4()),
            'content': faker.text(max_nb_chars=240),
        }
        for _ in range(rows_count)
    ]
async def ensure_benchmark_table_data(engine: AsyncEngine, owner: str) -> None:
    """Create the benchmark table if missing and top it up to BENCHMARK_ROWS_LIMIT rows.

    All statements run in one transaction via engine.begin(). The table name
    is interpolated from a module constant (not user input); only the owner
    name comes from the DSN, and it goes through quote_sql_identifier().
    """
    create_table_query = text(
        f'''
        create table if not exists public.{BENCHMARK_TABLE_NAME}
        (
            id uuid not null primary key,
            content text not null,
            created_at timestamp with time zone default now() not null,
            updated_at timestamp with time zone default now() not null,
            sequence_id bigserial not null
        );
        '''
    )
    table_owner_query = text(
        f'''
        alter table public.{BENCHMARK_TABLE_NAME}
            owner to {quote_sql_identifier(owner)};
        '''
    )
    count_query = text(f'select count(*) from public.{BENCHMARK_TABLE_NAME};')
    insert_query = text(
        f'''
        insert into public.{BENCHMARK_TABLE_NAME} (id, content)
        values (:id, :content);
        '''
    )
    faker = Faker(FAKER_LOCALE)
    async with engine.begin() as connection:
        await connection.execute(create_table_query)
        await connection.execute(table_owner_query)
        rows_in_table_raw = await connection.scalar(count_query)
        rows_in_table = int(rows_in_table_raw) if rows_in_table_raw is not None else 0
        # Insert only the shortfall so repeated runs do not keep growing the table.
        rows_to_insert = max(0, BENCHMARK_ROWS_LIMIT - rows_in_table)
        if rows_to_insert == 0:
            return
        rows_payload = generate_benchmark_rows(rows_to_insert, faker)
        # executemany-style insert: one statement with a list of parameter dicts.
        await connection.execute(insert_query, rows_payload)
def build_session_factory(engine: AsyncEngine) -> async_sessionmaker[AsyncSession]:
    """Session factory tuned for benchmarking: no expiry-on-commit, no autoflush."""
    return async_sessionmaker(
        engine,
        autobegin=True,
        autoflush=False,
        expire_on_commit=False,
    )
async def make_query(
    factory: async_sessionmaker[AsyncSession],
    query: TextClause,
) -> Exception | None:
    """Run *query* once in a fresh session.

    Returns the caught exception on failure (after logging it) and None on
    success, so callers can tally errors without gather() aborting.
    """
    try:
        async with factory() as session:
            await session.execute(query, {'sequence_id': 42})
    except Exception as exc:
        logging.exception(exc)
        return exc
    else:
        return None
async def bench_factory(
    factory: async_sessionmaker[AsyncSession],
    query: TextClause,
    total: int,
    repeat: int,
) -> list[float]:
    """Time *repeat* rounds of *total* concurrent queries; return seconds per round.

    Raises RuntimeError if any round produced errors (make_query returns
    exceptions instead of raising, so gather never aborts mid-round).
    """
    timings: list[float] = []
    for _ in range(repeat):
        started_at = time.perf_counter()
        outcomes = await asyncio.gather(
            *(make_query(factory, query) for _ in range(total)),
            return_exceptions=False,
        )
        elapsed = time.perf_counter() - started_at
        errors = [item for item in outcomes if isinstance(item, Exception)]
        if errors:
            raise RuntimeError(f'Получено {len(errors)} ошибок во время бенчмарка.')
        timings.append(elapsed)
    return timings
def result_to_row(result: BenchmarkResult) -> dict[str, str | float]:
    """Flatten one benchmark result into a report row, all times in milliseconds.

    Key order is significant for the rendered table: Factory, the summary
    metrics, then Run1..RunN columns.
    """
    runs = result.runs_seconds
    summary = {
        'Min': min(runs),
        'Max': max(runs),
        'Mean': fmean(runs),
        'Median': median(runs),
        # Sample stdev needs at least two runs; fall back to 0.0 for one.
        'Stdev': stdev(runs) if len(runs) > 1 else 0.0,
        'PStdev': pstdev(runs),
    }
    row: dict[str, str | float] = {'Factory': result.name}
    row.update({metric: seconds * 1000 for metric, seconds in summary.items()})
    row.update({f'Run{index}': seconds * 1000 for index, seconds in enumerate(runs, start=1)})
    return row
def build_dataframe(results: list[BenchmarkResult]) -> pd.DataFrame:
    """Collect per-target report rows into a single DataFrame."""
    return pd.DataFrame([result_to_row(result) for result in results])
def print_stats(result: BenchmarkResult) -> None:
    """Print per-run timings and summary statistics for one benchmark result.

    Fix: stdev() raises StatisticsError for fewer than two samples, so a run
    with --repeat 1 crashed here; guard with 0.0 exactly as result_to_row does.
    """
    runs = result.runs_seconds
    stdev_value = stdev(runs) if len(runs) > 1 else 0.0
    print(f'Factory: {result.name}')
    print(f'Timings (s): {tuple(round(value, 5) for value in runs)}')
    print(
        f'Min: {min(runs):.5f}s, '
        f'Max: {max(runs):.5f}s, '
        f'Mean: {fmean(runs):.5f}s, '
        f'Median: {median(runs):.5f}s, '
        f'Stdev: {stdev_value:.5f}s, '
        f'PStdev: {pstdev(runs):.5f}s'
    )
    print('=' * 80)
# Fixed palette for the box/stdev panels; cycled so >3 factories still work.
_PLOT_COLORS = ('lightblue', 'lightgreen', 'lightcoral')


def _plot_run_timings(ax, dataframe, df_runs, runs_count):
    """Panel 1: per-run execution time, one line per factory."""
    for factory in dataframe['Factory']:
        ax.plot(
            range(1, runs_count + 1),
            df_runs[factory],
            marker='o',
            label=factory,
            linewidth=2,
        )
    ax.set_xlabel('Номер запуска')
    ax.set_ylabel('Время выполнения (ms)')
    ax.set_title('Времена выполнения по запускам')
    ax.legend()
    ax.grid(True, alpha=0.3)


def _plot_metric_bars(ax, dataframe):
    """Panel 2: grouped bars comparing Min/Max/Mean/Median across factories."""
    metrics = ['Min', 'Max', 'Mean', 'Median']
    x_positions = range(len(metrics))
    width = 0.25
    for index, factory in enumerate(dataframe['Factory']):
        factory_row = dataframe[dataframe['Factory'] == factory].iloc[0]
        values = [float(factory_row[metric]) for metric in metrics]
        ax.bar(
            [position + index * width for position in x_positions],
            values,
            width,
            label=factory,
        )
    ax.set_xlabel('Метрика')
    ax.set_ylabel('Время выполнения (ms)')
    ax.set_title('Сравнение статистических метрик')
    ax.set_xticks([position + width for position in x_positions])
    ax.set_xticklabels(metrics)
    ax.legend()
    ax.grid(True, alpha=0.3, axis='y')


def _plot_boxplots(ax, dataframe, df_runs):
    """Panel 3: distribution of run times per factory."""
    factories = list(dataframe['Factory'])
    box_plot = ax.boxplot(
        [df_runs[factory].to_numpy() for factory in factories],
        tick_labels=factories,
        patch_artist=True,
    )
    # Fix: the original zipped a 3-color list with the boxes, leaving any
    # extra factories uncolored; cycle the palette instead.
    for index, patch in enumerate(box_plot['boxes']):
        patch.set_facecolor(_PLOT_COLORS[index % len(_PLOT_COLORS)])
    ax.set_ylabel('Время выполнения (ms)')
    ax.set_title('Распределение времен выполнения')
    ax.grid(True, alpha=0.3, axis='y')


def _plot_stdev_bars(ax, dataframe):
    """Panel 4: sample standard deviation per factory."""
    # Fix: hard-coded 3-color list mismatched the bar count for >3 factories.
    colors = [_PLOT_COLORS[i % len(_PLOT_COLORS)] for i in range(len(dataframe))]
    ax.bar(dataframe['Factory'], dataframe['Stdev'], color=colors)
    ax.set_xlabel('Factory')
    ax.set_ylabel('Стандартное отклонение (ms)')
    ax.set_title('Сравнение стандартных отклонений')
    ax.grid(True, alpha=0.3, axis='y')


def plot_results(dataframe: pd.DataFrame, output_path: str) -> None:
    """Render the four comparison panels and save them as one PNG.

    Expects the frame produced by build_dataframe(): a 'Factory' column,
    the summary metric columns, and RunN columns with per-run times in ms.
    """
    run_columns = [column for column in dataframe.columns if column.startswith('Run')]
    df_runs = dataframe[['Factory'] + run_columns].set_index('Factory').T
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    _plot_run_timings(axes[0, 0], dataframe, df_runs, len(run_columns))
    _plot_metric_bars(axes[0, 1], dataframe)
    _plot_boxplots(axes[1, 0], dataframe, df_runs)
    _plot_stdev_bars(axes[1, 1], dataframe)
    plt.setp(axes[1, 0].get_xticklabels(), rotation=45, ha='right')
    plt.setp(axes[1, 1].get_xticklabels(), rotation=45, ha='right')
    plt.tight_layout()
    fig.savefig(output_path, dpi=150, bbox_inches='tight')
    plt.close(fig)
def mask_dsn(dsn: str) -> str:
    """Return *dsn* with the password replaced by '****' for safe log output."""
    try:
        url = make_url(dsn)
        return str(url.set(password='****')) if url.password else str(url)
    except Exception:
        # Unparseable DSN: show at most the host part, never credentials.
        return dsn.split('@')[-1] if '@' in dsn else '***'
async def run(args: argparse.Namespace) -> pd.DataFrame:
    """Benchmark every target sequentially and return the results DataFrame.

    For each target: build an engine, ensure the benchmark table exists and
    is populated, time the query rounds, print per-target stats, and dispose
    of the engine. Finally print a rounded summary table (times in ms) and
    return the full-precision frame.
    """
    query = text(args.query)
    results: list[BenchmarkResult] = []
    targets = build_targets()
    if args.interactive:
        print('Будет запущено бенчмарков:', len(targets))
        print('Параметры: total=%s, repeat=%s' % (args.total, args.repeat))
        print('Запрос:', args.query)
        print()
    for target in targets:
        if args.interactive:
            # Pause before each target so external state (pgbouncer stats,
            # monitoring dashboards) can be inspected between runs.
            print('-' * 60)
            print('Следующий запуск: %s' % target.name)
            print(' prepared_statements: %s' % target.prepared_statements_enabled)
            print(' DSN: %s' % mask_dsn(target.dsn))
            print(' Запросов за прогон: %s, прогонов: %s' % (args.total, args.repeat))
            print('-' * 60)
            input('Нажмите Enter для запуска этого бенчмарка...')
        engine = build_engine(
            dsn=target.dsn,
            prepared_statements_enabled=target.prepared_statements_enabled,
        )
        factory = build_session_factory(engine)
        try:
            if args.interactive:
                print('Проверка/создание таблицы %s...' % BENCHMARK_TABLE_NAME)
            owner = parse_username_from_dsn(target.dsn)
            await ensure_benchmark_table_data(engine, owner)
            if args.interactive:
                print('Запуск бенчмарка...')
            timings = await bench_factory(factory, query=query, total=args.total, repeat=args.repeat)
            result = BenchmarkResult(name=target.name, runs_seconds=timings)
            print_stats(result)
            results.append(result)
        finally:
            # Dispose even on failure so pooled connections are not leaked
            # into the next target's run.
            await engine.dispose()
    dataframe = build_dataframe(results)
    # Round a copy for display only; the caller receives full precision.
    rounded_dataframe = dataframe.copy()
    numeric_columns = [col for col in rounded_dataframe.columns if col != 'Factory']
    rounded_dataframe[numeric_columns] = rounded_dataframe[numeric_columns].round(3)
    print(
        tabulate.tabulate(
            rounded_dataframe,
            headers='keys',
            showindex=False,
            tablefmt='github',
        )
    )
    return dataframe
def add_timestamp_to_path(path: str) -> str:
    """Insert a UTC timestamp (YYYY-MM-DD_HH-MM-SS) before the file extension."""
    base, ext = os.path.splitext(path)
    stamp = datetime.now(tz=timezone.utc).strftime('%Y-%m-%d_%H-%M-%S')
    return ''.join((base, '_', stamp, ext))
def main() -> None:
    """Entry point: run all benchmarks, then optionally save the comparison plot."""
    args = parse_args()
    dataframe = asyncio.run(run(args))
    if args.skip_plot:
        return
    output_path = add_timestamp_to_path(args.plot_output)
    plot_results(dataframe, output_path)
    print(f'График сохранен в: {output_path}')


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment