Skip to content

Instantly share code, notes, and snippets.

@Bahus
Created March 5, 2026 21:43
Show Gist options
  • Select an option

  • Save Bahus/65a1602f60be95a47ddf7ad2691859d3 to your computer and use it in GitHub Desktop.

Select an option

Save Bahus/65a1602f60be95a47ddf7ad2691859d3 to your computer and use it in GitHub Desktop.
Бенчмарки SQLAlchemy
"""
Бенчмарк SQLAlchemy (psycopg/asyncpg) через PgBouncer и прямой Postgres.
Переменные окружения (обязательные):
PGBOUNCER_DSN_NO_PREPARED — DSN к PgBouncer без prepared statements
PGBOUNCER_DSN_PREPARED — DSN к PgBouncer с включёнными prepared statements
POSTGRES_DSN_DIRECT — DSN напрямую к Postgres
Формат DSN: postgresql+psycopg://user:password@host:port/dbname
или postgresql+asyncpg://user:password@host:port/dbname
Ключевые параметры конфигурации PGBouncer:
pool_mode = transaction
max_client_conn = 1000
default_pool_size = 1000
max_prepared_statements = 500 - с включением prepared statements
max_prepared_statements = 0 - без включения prepared statements
Запуск бенчмарка через uv без pyproject.toml (one-liner):
uv run --with "sqlalchemy[asyncio]" --with "psycopg[binary,pool]" \\
--with faker --with matplotlib --with pandas --with tabulate \\
main.py [--total 100] [--repeat 25] [--interactive] [--skip-plot]
"""
import argparse
import asyncio
import logging
import os
import time
import uuid
from datetime import datetime, timezone
from dataclasses import dataclass
from statistics import fmean, median, pstdev, stdev
from typing import Any
import matplotlib.pyplot as plt
import pandas as pd
import tabulate
from faker import Faker
from sqlalchemy.engine import make_url
from sqlalchemy.ext.asyncio import (
AsyncEngine,
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from sqlalchemy.sql import text
from sqlalchemy.sql.elements import TextClause
# Names of the required environment variables holding the three DSNs compared.
ENV_PGBOUNCER_DSN_PREPARED = 'PGBOUNCER_DSN_PREPARED'
ENV_PGBOUNCER_DSN_NO_PREPARED = 'PGBOUNCER_DSN_NO_PREPARED'
ENV_POSTGRES_DSN_DIRECT = 'POSTGRES_DSN_DIRECT'
# Parameterized lookup executed on every benchmark iteration.
DEFAULT_QUERY = 'select * from messages_benchmark where sequence_id = :sequence_id;'
# Table the benchmark reads from, and the row count it is seeded up to.
BENCHMARK_TABLE_NAME = 'messages_benchmark'
BENCHMARK_ROWS_LIMIT = 1000
# Locale used by Faker when generating row content.
FAKER_LOCALE = 'ru_RU'
@dataclass(frozen=True)
class BenchmarkTarget:
    """One connection target to benchmark."""

    # Label used in printed tables and plot legends.
    name: str
    # SQLAlchemy DSN, e.g. postgresql+psycopg://user:pass@host:port/db.
    dsn: str
    # Whether prepared statements are enabled for this target's driver.
    prepared_statements_enabled: bool
@dataclass(frozen=True)
class BenchmarkResult:
    """Raw timings collected for one benchmark target."""

    # Target name, copied from BenchmarkTarget.name.
    name: str
    # Wall-clock duration of each repeated run, in seconds.
    runs_seconds: list[float]
def parse_args() -> argparse.Namespace:
    """Parse CLI flags controlling benchmark size, plotting and verbosity."""
    parser = argparse.ArgumentParser(
        description='Benchmark SQLAlchemy + psycopg через pgbouncer и прямой Postgres.',
    )
    add = parser.add_argument
    add('--total', type=int, default=100, help='Запросов в одном запуске.')
    add('--repeat', type=int, default=25, help='Количество запусков бенчмарка.')
    add('--query', type=str, default=DEFAULT_QUERY, help='SQL запрос для бенчмарка.')
    add(
        '--plot-output',
        type=str,
        default='direct_vs_prepared_not_prepared.png',
        help='Путь для сохранения графика.',
    )
    add(
        '--skip-plot',
        action='store_true',
        help='Не строить и не сохранять график.',
    )
    add(
        '--interactive',
        action='store_true',
        help='Подробные логи и запрос нажатия Enter перед каждым запуском бенчмарка.',
    )
    return parser.parse_args()
def get_required_env(var_name: str) -> str:
    """Return the value of *var_name*, failing loudly when unset or empty."""
    value = os.getenv(var_name)
    if value:
        return value
    raise RuntimeError(f'Не задана обязательная переменная окружения: {var_name}')
def build_targets() -> list[BenchmarkTarget]:
    """Describe the three connection configurations compared by the benchmark."""
    specs = (
        ('psycopg_bouncer_not_prepared', ENV_PGBOUNCER_DSN_NO_PREPARED, False),
        ('psycopg_bouncer_prepared', ENV_PGBOUNCER_DSN_PREPARED, True),
        ('psycopg_direct_prepared', ENV_POSTGRES_DSN_DIRECT, True),
    )
    return [
        BenchmarkTarget(
            name=label,
            dsn=get_required_env(env_var),
            prepared_statements_enabled=prepared,
        )
        for label, env_var, prepared in specs
    ]
def parse_driver_from_dsn(dsn: str) -> str:
    """Extract the driver name (e.g. 'psycopg', 'asyncpg') from a SQLAlchemy DSN."""
    scheme = dsn.partition('://')[0]
    _, plus, driver = scheme.partition('+')
    if not plus:
        # The benchmark must pin an explicit driver; a bare scheme is ambiguous.
        raise RuntimeError(
            'В DSN нужно явно указывать драйвер: postgresql+psycopg:// или '
            'postgresql+asyncpg://',
        )
    return driver
def build_connect_args(driver: str, prepared_statements_enabled: bool) -> dict[str, Any]:
    """Build driver-specific connect_args controlling prepared-statement usage."""
    if driver == 'psycopg':
        # Threshold of 1 prepares every statement; None disables preparing.
        threshold = 1 if prepared_statements_enabled else None
        return {'prepare_threshold': threshold}
    if driver == 'asyncpg':
        if not prepared_statements_enabled:
            # Anonymous statement names + zero caches keep asyncpg usable
            # behind PgBouncer in transaction pooling mode.
            return {
                'prepared_statement_name_func': lambda: '',
                'prepared_statement_cache_size': 0,
                'statement_cache_size': 0,
            }
        return {
            'prepared_statement_cache_size': 100,
            'statement_cache_size': 100,
        }
    raise RuntimeError(f'Неподдерживаемый драйвер в DSN: {driver}')
def build_engine(dsn: str, prepared_statements_enabled: bool) -> AsyncEngine:
    """Create an async engine wired with driver-appropriate connect_args."""
    connect_args = build_connect_args(
        parse_driver_from_dsn(dsn),
        prepared_statements_enabled,
    )
    engine = create_async_engine(
        dsn,
        isolation_level='AUTOCOMMIT',
        pool_size=-1,
        pool_reset_on_return=None,
        connect_args=connect_args,
        echo=False,
    )
    return engine
def parse_username_from_dsn(dsn: str) -> str:
    """Return the username portion of *dsn*; raise when it is absent."""
    username = make_url(dsn).username
    if username:
        return username
    raise RuntimeError(f'Не удалось извлечь username из DSN: {dsn}')
def quote_sql_identifier(value: str) -> str:
    """Double-quote *value* for safe interpolation as a SQL identifier."""
    return '"{}"'.format(value.replace('"', '""'))
def generate_benchmark_rows(rows_count: int, faker: Faker) -> list[dict[str, str]]:
    """Build *rows_count* synthetic rows with a fresh UUID id and fake text.

    Args:
        rows_count: number of rows to generate; 0 yields an empty list.
        faker: Faker instance used to produce the ``content`` field
            (up to 240 characters per row).

    Returns:
        A list of dicts with string keys ``id`` and ``content``, ready to be
        used as executemany parameters.
    """
    # Comprehension instead of the original manual append loop (idiomatic,
    # and inlined by CPython 3.12+).
    return [
        {
            'id': str(uuid.uuid4()),
            'content': faker.text(max_nb_chars=240),
        }
        for _ in range(rows_count)
    ]
async def ensure_benchmark_table_data(engine: AsyncEngine, owner: str) -> None:
    """Create the benchmark table if missing and seed it up to BENCHMARK_ROWS_LIMIT rows.

    Args:
        engine: async engine used for DDL and seeding; everything runs in a
            single ``engine.begin()`` transaction.
        owner: role name the table ownership is transferred to (quoted via
            quote_sql_identifier to survive unusual role names).
    """
    create_table_query = text(
        f'''
        create table if not exists public.{BENCHMARK_TABLE_NAME}
        (
            id uuid not null primary key,
            content text not null,
            created_at timestamp with time zone default now() not null,
            updated_at timestamp with time zone default now() not null,
            sequence_id bigserial not null
        );
        '''
    )
    table_owner_query = text(
        f'''
        alter table public.{BENCHMARK_TABLE_NAME}
        owner to {quote_sql_identifier(owner)};
        '''
    )
    count_query = text(f'select count(*) from public.{BENCHMARK_TABLE_NAME};')
    insert_query = text(
        f'''
        insert into public.{BENCHMARK_TABLE_NAME} (id, content)
        values (:id, :content);
        '''
    )
    faker = Faker(FAKER_LOCALE)
    async with engine.begin() as connection:
        await connection.execute(create_table_query)
        await connection.execute(table_owner_query)
        rows_in_table_raw = await connection.scalar(count_query)
        rows_in_table = int(rows_in_table_raw) if rows_in_table_raw is not None else 0
        # Only insert the shortfall so repeated runs do not grow the table.
        rows_to_insert = max(0, BENCHMARK_ROWS_LIMIT - rows_in_table)
        if rows_to_insert == 0:
            return
        rows_payload = generate_benchmark_rows(rows_to_insert, faker)
        # executemany-style insert: one statement with a list of parameter dicts.
        await connection.execute(insert_query, rows_payload)
def build_session_factory(engine: AsyncEngine) -> async_sessionmaker[AsyncSession]:
    """Session factory for the benchmark: no expiry-on-commit, no autoflush."""
    factory = async_sessionmaker(
        engine,
        autobegin=True,
        autoflush=False,
        expire_on_commit=False,
    )
    return factory
async def make_query(
    factory: async_sessionmaker[AsyncSession],
    query: TextClause,
) -> Exception | None:
    """Execute *query* once in a fresh session; return the exception on failure.

    Failures are logged and returned instead of raised so the caller can
    count them without aborting the whole concurrent batch.
    """
    try:
        async with factory() as session:
            await session.execute(query, {'sequence_id': 42})
    except Exception as error:
        logging.exception(error)
        return error
    else:
        return None
async def bench_factory(
    factory: async_sessionmaker[AsyncSession],
    query: TextClause,
    total: int,
    repeat: int,
) -> list[float]:
    """Time *repeat* rounds of *total* concurrent queries.

    Returns the wall-clock duration of each round in seconds; raises
    RuntimeError if any query in a round reported an error.
    """

    async def run_round() -> list[Exception | None]:
        coroutines = (make_query(factory, query) for _ in range(total))
        return await asyncio.gather(*coroutines, return_exceptions=False)

    durations: list[float] = []
    for _ in range(repeat):
        start = time.perf_counter()
        outcomes = await run_round()
        duration = time.perf_counter() - start
        # make_query returns (not raises) exceptions, so count them here.
        failures = sum(1 for outcome in outcomes if isinstance(outcome, Exception))
        if failures:
            raise RuntimeError(f'Получено {failures} ошибок во время бенчмарка.')
        durations.append(duration)
    return durations
def result_to_row(result: BenchmarkResult) -> dict[str, str | float]:
    """Flatten one benchmark result into a table row with millisecond metrics."""
    samples = result.runs_seconds
    ms = 1000
    row: dict[str, str | float] = {'Factory': result.name}
    row['Min'] = min(samples) * ms
    row['Max'] = max(samples) * ms
    row['Mean'] = fmean(samples) * ms
    row['Median'] = median(samples) * ms
    # Sample stdev is undefined for a single run; report 0.0 instead.
    row['Stdev'] = (stdev(samples) if len(samples) > 1 else 0.0) * ms
    row['PStdev'] = pstdev(samples) * ms
    for run_number, seconds in enumerate(samples, start=1):
        row[f'Run{run_number}'] = seconds * ms
    return row
def build_dataframe(results: list[BenchmarkResult]) -> pd.DataFrame:
    """Assemble one DataFrame row per benchmark result."""
    return pd.DataFrame([result_to_row(result) for result in results])
def print_stats(result: BenchmarkResult) -> None:
    """Print a compact human-readable summary of one benchmark run."""
    samples = result.runs_seconds
    print(f'Factory: {result.name}')
    print(f'Timings (s): {tuple(round(value, 5) for value in samples)}')
    summary = (
        f'Min: {min(samples):.5f}s, '
        f'Max: {max(samples):.5f}s, '
        f'Mean: {fmean(samples):.5f}s, '
        f'Median: {median(samples):.5f}s, '
        f'Stdev: {stdev(samples):.5f}s, '
        f'PStdev: {pstdev(samples):.5f}s'
    )
    print(summary)
    print('=' * 80)
def plot_results(dataframe: pd.DataFrame, output_path: str) -> None:
    """Render a 2x2 comparison figure from the results table and save it.

    Panels: per-run timing lines, summary-metric bars (Min/Max/Mean/Median),
    per-factory box plots, and standard-deviation bars. All values are in
    milliseconds (result_to_row already scales them).
    """
    # Columns 'Run1'..'RunN' hold the individual run timings.
    run_columns = [column for column in dataframe.columns if column.startswith('Run')]
    # Transpose so each factory becomes a column of per-run values.
    df_runs = dataframe[['Factory'] + run_columns].set_index('Factory').T
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    # Panel 1 (top-left): timing per run, one line per factory.
    ax1 = axes[0, 0]
    for factory in dataframe['Factory']:
        ax1.plot(
            range(1, len(run_columns) + 1),
            df_runs[factory],
            marker='o',
            label=factory,
            linewidth=2,
        )
    ax1.set_xlabel('Номер запуска')
    ax1.set_ylabel('Время выполнения (ms)')
    ax1.set_title('Времена выполнения по запускам')
    ax1.legend()
    ax1.grid(True, alpha=0.3)
    # Panel 2 (top-right): grouped bars for the four summary metrics.
    ax2 = axes[0, 1]
    metrics = ['Min', 'Max', 'Mean', 'Median']
    x_positions = range(len(metrics))
    width = 0.25
    for index, factory in enumerate(dataframe['Factory']):
        factory_row = dataframe[dataframe['Factory'] == factory].iloc[0]
        values = [float(factory_row[metric]) for metric in metrics]
        ax2.bar(
            [position + index * width for position in x_positions],
            values,
            width,
            label=factory,
        )
    ax2.set_xlabel('Метрика')
    ax2.set_ylabel('Время выполнения (ms)')
    ax2.set_title('Сравнение статистических метрик')
    ax2.set_xticks([position + width for position in x_positions])
    ax2.set_xticklabels(metrics)
    ax2.legend()
    ax2.grid(True, alpha=0.3, axis='y')
    # Panel 3 (bottom-left): distribution of run timings per factory.
    ax3 = axes[1, 0]
    data_for_box = [df_runs[factory].to_numpy() for factory in dataframe['Factory']]
    box_plot = ax3.boxplot(
        data_for_box,
        tick_labels=list(dataframe['Factory']),
        patch_artist=True,
    )
    # NOTE(review): assumes exactly three factories; extra boxes stay uncolored.
    colors = ['lightblue', 'lightgreen', 'lightcoral']
    for patch, color in zip(box_plot['boxes'], colors):
        patch.set_facecolor(color)
    ax3.set_ylabel('Время выполнения (ms)')
    ax3.set_title('Распределение времен выполнения')
    ax3.grid(True, alpha=0.3, axis='y')
    # Panel 4 (bottom-right): sample stdev per factory.
    ax4 = axes[1, 1]
    ax4.bar(
        dataframe['Factory'],
        dataframe['Stdev'],
        color=['lightblue', 'lightgreen', 'lightcoral'],
    )
    ax4.set_xlabel('Factory')
    ax4.set_ylabel('Стандартное отклонение (ms)')
    ax4.set_title('Сравнение стандартных отклонений')
    ax4.grid(True, alpha=0.3, axis='y')
    plt.setp(ax3.get_xticklabels(), rotation=45, ha='right')
    plt.setp(ax4.get_xticklabels(), rotation=45, ha='right')
    plt.tight_layout()
    fig.savefig(output_path, dpi=150, bbox_inches='tight')
    plt.close(fig)
def mask_dsn(dsn: str) -> str:
    """Return *dsn* with its password replaced by '****' for log output."""
    try:
        url = make_url(dsn)
        masked = url.set(password='****') if url.password else url
        return str(masked)
    except Exception:
        # Unparseable DSN: show only the part after the credentials, or a stub.
        return dsn.split('@')[-1] if '@' in dsn else '***'
async def run(args: argparse.Namespace) -> pd.DataFrame:
    """Benchmark every target sequentially and return the combined stats table.

    For each target: build an engine, ensure the benchmark table is seeded,
    time the runs, print per-target stats, and dispose of the engine. Finally
    print a GitHub-style summary table and return the unrounded DataFrame.
    """
    query = text(args.query)
    results: list[BenchmarkResult] = []
    targets = build_targets()
    if args.interactive:
        print('Будет запущено бенчмарков:', len(targets))
        print('Параметры: total=%s, repeat=%s' % (args.total, args.repeat))
        print('Запрос:', args.query)
        print()
    for target in targets:
        if args.interactive:
            # Pause so the operator can inspect/monitor before each run.
            print('-' * 60)
            print('Следующий запуск: %s' % target.name)
            print(' prepared_statements: %s' % target.prepared_statements_enabled)
            print(' DSN: %s' % mask_dsn(target.dsn))
            print(' Запросов за прогон: %s, прогонов: %s' % (args.total, args.repeat))
            print('-' * 60)
            input('Нажмите Enter для запуска этого бенчмарка...')
        engine = build_engine(
            dsn=target.dsn,
            prepared_statements_enabled=target.prepared_statements_enabled,
        )
        factory = build_session_factory(engine)
        try:
            if args.interactive:
                print('Проверка/создание таблицы %s...' % BENCHMARK_TABLE_NAME)
            owner = parse_username_from_dsn(target.dsn)
            await ensure_benchmark_table_data(engine, owner)
            if args.interactive:
                print('Запуск бенчмарка...')
            timings = await bench_factory(factory, query=query, total=args.total, repeat=args.repeat)
            result = BenchmarkResult(name=target.name, runs_seconds=timings)
            print_stats(result)
            results.append(result)
        finally:
            # Always release pooled connections, even when a run fails.
            await engine.dispose()
    dataframe = build_dataframe(results)
    # Round a copy for display only; the caller receives the unrounded frame.
    rounded_dataframe = dataframe.copy()
    numeric_columns = [col for col in rounded_dataframe.columns if col != 'Factory']
    rounded_dataframe[numeric_columns] = rounded_dataframe[numeric_columns].round(3)
    print(
        tabulate.tabulate(
            rounded_dataframe,
            headers='keys',
            showindex=False,
            tablefmt='github',
        )
    )
    return dataframe
def add_timestamp_to_path(path: str) -> str:
    """Insert a UTC timestamp (YYYY-MM-DD_HH-MM-SS) before the file extension."""
    stem, extension = os.path.splitext(path)
    stamp = datetime.now(tz=timezone.utc).strftime('%Y-%m-%d_%H-%M-%S')
    return ''.join((stem, '_', stamp, extension))
def main() -> None:
    """CLI entry point: run the benchmarks, then optionally render the plot."""
    args = parse_args()
    dataframe = asyncio.run(run(args))
    if args.skip_plot:
        return
    output_path = add_timestamp_to_path(args.plot_output)
    plot_results(dataframe, output_path)
    print(f'График сохранен в: {output_path}')


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment