Guide complet et approfondi pour construire une application web scalable, performante et maintenable.
- Introduction et concepts fondamentaux
- Principes architecturaux
- Vue d'ensemble de l'architecture
- Structure des fichiers
- Le Backend Python
- Le Frontend Vue 3 TypeScript
- Communication Frontend-Backend
- Authentification et Sécurité
- Base de données PostgreSQL
- Routes et Middleware
- Gestion de l'état avancée
- Performances et optimisations
- Pièges courants et comment les éviter
- Scalabilité
- Déploiement
L'architecture que nous allons explorer est basée sur des principes éprouvés utilisés par des entreprises comme Netflix, Spotify, et Airbnb. Elle répond aux défis modernes du développement web :
Maintenabilité : Le code doit être compréhensible et modifiable par d'autres développeurs. La séparation des responsabilités (separation of concerns) permet à chaque partie du code d'avoir une raison unique de changer.
Scalabilité : L'application doit pouvoir croître sans refonte majeure. Une bonne architecture permet d'ajouter des features sans complexifier exponentiellement le code.
Testabilité : Un code bien structuré est facile à tester. Les tests garantissent que les changements ne cassent rien.
Résilience : L'application doit gérer gracieusement les erreurs, les pertes de connexion, et les cas limites.
Performance : Chaque milliseconde compte. Une mauvaise architecture peut rendre l'application lente et consommatrice de ressources.
┌─────────────────────────────────────────────────────────────┐
│ PRÉSENTATION (Vue 3 TypeScript) │
│ • Interface utilisateur réactive │
│ • Validation côté client │
│ • État local (composants) │
│ • Mise en cache temporaire │
└─────────────────────────────────────────────────────────────┘
↓↑
HTTP/HTTPS avec JWT
↓↑
┌─────────────────────────────────────────────────────────────┐
│ LOGIQUE MÉTIER (Python Flask/FastAPI) │
│ • Routes API (endpoints) │
│ • Validation des données │
│ • Authentification et autorisation │
│ • Orchestration de la logique métier │
│ • Transactions de base de données │
└─────────────────────────────────────────────────────────────┘
↓↑
SQL avec paramètres liés
↓↑
┌─────────────────────────────────────────────────────────────┐
│ PERSISTANCE (PostgreSQL) │
│ • Stockage des données durables │
│ • Intégrité des données (constraints) │
│ • Transactions ACID │
│ • Indexes pour les performances │
└─────────────────────────────────────────────────────────────┘
Chaque module doit avoir une seule raison de changer. Par exemple :
❌ Mauvais : Un fichier qui mélange requêtes HTTP, logique métier et stockage
async function getUserData(id: string) {
// Appel API
const response = await fetch(`/api/users/${id}`)
const data = await response.json()
// Logique métier
const processedData = data.map(user => ({...user, age: calculateAge(user.birthDate)}))
// Stockage local
localStorage.setItem('users', JSON.stringify(processedData))
return processedData
}✅ Bon : Chaque responsabilité est séparée
// Service API
class UserService {
async getUser(id: string): Promise<User> {
return api.get(`/users/${id}`)
}
}
// Logique métier
class UserProcessor {
processUsers(users: User[]): ProcessedUser[] {
return users.map(user => ({...user, age: this.calculateAge(user.birthDate)}))
}
}
// Stockage local
class UserCache {
set(users: ProcessedUser[]): void {
localStorage.setItem('users', JSON.stringify(users))
}
}Ne dupliquez pas le code. Si vous écrivez la même logique deux fois, créez une fonction.
Ces principes rendront votre code plus flexible et maintenable :
S - Single Responsibility : Une classe = une responsabilité O - Open/Closed : Ouvert à l'extension, fermé à la modification L - Liskov Substitution : Les classes dérivées doivent pouvoir remplacer les classes de base I - Interface Segregation : Les clients ne doivent pas dépendre de fonctionnalités qu'ils n'utilisent pas D - Dependency Inversion : Dépendre des abstractions, pas des implémentations concrètes
Imaginez qu'un utilisateur clique sur un bouton pour créer une commande :
1. [Navigateur] Utilisateur clique sur "Créer commande"
↓
2. [Vue Component] L'événement @click déclenche une action
↓
3. [Pinia Store] Une action du store est dispatched
↓
4. [Service API] Un appel HTTP POST est effectué
↓
5. [Intercepteur Axios] Le JWT token est attaché au header
↓
6. [Réseau] La requête traverse le réseau
↓
7. [Flask Server] La route reçoit la requête
↓
8. [Middleware CORS] Vérifier que l'origine est autorisée
↓
9. [Middleware Auth] Décoder et valider le JWT
↓
10. [Validation] Vérifier que les données sont valides
↓
11. [Service métier] Créer la commande (logique métier)
↓
12. [ORM SQLAlchemy] Générer et exécuter la requête SQL
↓
13. [PostgreSQL] Exécuter la transaction
↓
14. [ORM] Mapper le résultat en objet Python
↓
15. [Route] Sérialiser l'objet en JSON
↓
16. [Réseau] La réponse est renvoyée au navigateur
↓
17. [Intercepteur Axios] Traiter la réponse, gérer les erreurs
↓
18. [Pinia Store] Mettre à jour l'état global
↓
19. [Vue Component] L'interface se met à jour (réactivité)
↓
20. [Navigateur] L'utilisateur voit le résultat
Chaque étape est importante. Une erreur à n'importe quel niveau peut casser le flux.
CLIENT (Vue 3 + TypeScript)
├── Components (Affichage)
├── Composables (Logique réutilisable)
├── Stores (État global)
├── Services (Appels API)
└── Types (Typage TypeScript)
↓
HTTP + JWT Token
↓
SERVEUR (Python + Flask)
├── Routes (Endpoints)
├── Middleware (Authentification, validation)
├── Services (Logique métier)
├── Models (ORM)
└── Utils (Outils)
↓
SQL
↓
DATABASE (PostgreSQL)
└── Tables + Relations
mon-application/
│
├── backend/ # Application Python
│ ├── app/
│ │ ├── __init__.py # Factory pattern pour créer l'app
│ │ ├── config.py # Configuration (dev, prod, test)
│ │ │
│ │ ├── domain/ # Logique métier pure (sans dépendances)
│ │ │ ├── __init__.py
│ │ │ ├── user.py # Entités User
│ │ │ ├── product.py # Entités Product
│ │ │ ├── order.py # Entités Order
│ │ │ └── exceptions.py # Exceptions métier
│ │ │
│ │ ├── models.py # Modèles SQLAlchemy (ORM)
│ │ ├── schemas.py # Schémas validation (Marshmallow/Pydantic)
│ │ │
│ │ ├── middleware/
│ │ │ ├── __init__.py
│ │ │ ├── auth.py # JWT validation
│ │ │ ├── cors.py # CORS headers
│ │ │ ├── error_handler.py # Global error handling
│ │ │ ├── request_logger.py # Logging des requêtes
│ │ │ └── rate_limiter.py # Rate limiting
│ │ │
│ │ ├── routes/
│ │ │ ├── __init__.py
│ │ │ ├── auth.py # Auth endpoints
│ │ │ ├── users.py # User CRUD
│ │ │ ├── products.py # Product CRUD
│ │ │ ├── orders.py # Order CRUD
│ │ │ └── health.py # Health checks
│ │ │
│ │ ├── services/ # Logique métier
│ │ │ ├── __init__.py
│ │ │ ├── user_service.py # Business logic pour users
│ │ │ ├── product_service.py # Business logic pour products
│ │ │ ├── order_service.py # Business logic pour orders
│ │ │ └── auth_service.py # Business logic auth
│ │ │
│ │ ├── repositories/ # Data access layer (DAL)
│ │ │ ├── __init__.py
│ │ │ ├── base_repository.py # Classe de base pour toutes les repos
│ │ │ ├── user_repository.py # Requêtes spécifiques User
│ │ │ ├── product_repository.py # Requêtes spécifiques Product
│ │ │ └── order_repository.py # Requêtes spécifiques Order
│ │ │
│ │ ├── utils/
│ │ │ ├── __init__.py
│ │ │ ├── jwt_utils.py # JWT encoding/decoding
│ │ │ ├── password_utils.py # Password hashing
│ │ │ ├── validators.py # Custom validators
│ │ │ ├── decorators.py # Decorators réutilisables
│ │ │ └── helpers.py # Helper functions
│ │ │
│ │ └── security/
│ │ ├── __init__.py
│ │ ├── permissions.py # Permission checks
│ │ └── constants.py # Constantes de sécurité
│ │
│ ├── migrations/ # Alembic database migrations
│ │ ├── versions/
│ │ ├── env.py
│ │ └── script.py.mako
│ │
│ ├── tests/
│ │ ├── __init__.py
│ │ ├── conftest.py # Pytest fixtures
│ │ ├── test_auth.py
│ │ ├── test_users.py
│ │ ├── test_products.py
│ │ └── test_orders.py
│ │
│ ├── logs/ # Application logs
│ │ └── .gitkeep
│ │
│ ├── .env # Variables d'environnement (local)
│ ├── .env.example # Template
│ ├── .env.test # Variables de test
│ ├── requirements.txt # Dépendances Python
│ ├── requirements-dev.txt # Dépendances de développement
│ ├── wsgi.py # Entry point pour production
│ ├── run.py # Entry point pour développement
│ ├── pytest.ini # Configuration pytest
│ └── .flake8 # Configuration linter
│
├── frontend/ # Application Vue 3 TypeScript
│ ├── public/
│ │ ├── index.html
│ │ └── favicon.ico
│ │
│ ├── src/
│ │ ├── main.ts # Entry point
│ │ ├── App.vue # Root component
│ │ │
│ │ ├── types/ # Types TypeScript globaux
│ │ │ ├── index.ts
│ │ │ ├── user.ts # Types User
│ │ │ ├── product.ts # Types Product
│ │ │ ├── order.ts # Types Order
│ │ │ ├── api.ts # Types API responses
│ │ │ └── errors.ts # Types errors
│ │ │
│ │ ├── assets/
│ │ │ ├── styles/
│ │ │ │ ├── main.css # Styles globaux
│ │ │ │ ├── variables.css # Variables CSS (theming)
│ │ │ │ ├── components.css # Styles composants
│ │ │ │ ├── utilities.css # Classes utilitaires
│ │ │ │ └── animations.css # Animations
│ │ │ ├── images/
│ │ │ └── fonts/
│ │ │
│ │ ├── components/ # Composants réutilisables
│ │ │ ├── common/
│ │ │ │ ├── Navbar.vue
│ │ │ │ ├── Footer.vue
│ │ │ │ ├── Button.vue
│ │ │ │ ├── Modal.vue
│ │ │ │ ├── Loading.vue
│ │ │ │ └── ErrorAlert.vue
│ │ │ ├── forms/
│ │ │ │ ├── TextInput.vue
│ │ │ │ ├── SelectInput.vue
│ │ │ │ └── FormField.vue
│ │ │ └── products/
│ │ │ ├── ProductCard.vue
│ │ │ ├── ProductList.vue
│ │ │ └── ProductFilter.vue
│ │ │
│ │ ├── pages/ # Pages (routes)
│ │ │ ├── Home.vue
│ │ │ ├── Login.vue
│ │ │ ├── Register.vue
│ │ │ ├── Dashboard.vue
│ │ │ ├── Products.vue
│ │ │ ├── ProductDetail.vue
│ │ │ ├── Orders.vue
│ │ │ └── NotFound.vue
│ │ │
│ │ ├── router/
│ │ │ └── index.ts # Configuration Vue Router
│ │ │
│ │ ├── store/ # Pinia stores
│ │ │ ├── index.ts
│ │ │ ├── auth.ts # Auth state management
│ │ │ ├── user.ts # User state
│ │ │ ├── products.ts # Products state
│ │ │ ├── orders.ts # Orders state
│ │ │ └── ui.ts # UI state (modals, notifications)
│ │ │
│ │ ├── composables/ # Composables réutilisables
│ │ │ ├── useAuth.ts # Logique d'authentification
│ │ │ ├── useFetch.ts # Wrapper réactif pour fetch
│ │ │ ├── useForm.ts # Gestion formulaire
│ │ │ ├── useNotification.ts # Notifications toasts
│ │ │ └── useDebounce.ts # Debounce helper
│ │ │
│ │ ├── services/ # Services API
│ │ │ ├── api.ts # Configuration Axios
│ │ │ ├── authService.ts # Auth API calls
│ │ │ ├── userService.ts # User API calls
│ │ │ ├── productService.ts # Product API calls
│ │ │ └── orderService.ts # Order API calls
│ │ │
│ │ ├── utils/
│ │ │ ├── validators.ts # Form validators
│ │ │ ├── formatters.ts # Data formatters
│ │ │ ├── constants.ts # App constants
│ │ │ ├── logger.ts # Logging utility
│ │ │ └── storage.ts # LocalStorage wrapper
│ │ │
│ │ └── layouts/
│ │ ├── MainLayout.vue
│ │ ├── AuthLayout.vue
│ │ └── AdminLayout.vue
│ │
│ ├── .env.local # Variables d'environnement (local)
│ ├── .env.example
│ ├── .env.production
│ ├── package.json
│ ├── tsconfig.json # Configuration TypeScript
│ ├── vite.config.ts # Configuration Vite
│ ├── eslint.config.js # Configuration ESLint
│ ├── prettier.config.js # Configuration Prettier
│ └── vitest.config.ts # Configuration tests
│
├── docker-compose.yml # Services (DB, API, frontend)
├── .gitignore
├── .dockerignore
└── README.md
Séparation par couches : Les routes ne contiennent que le routing, la logique métier est dans les services, l'accès aux données dans les repositories.
Types TypeScript centralisés : Tous les types sont dans le dossier types/, ce qui les rend faciles à maintenir et à réutiliser.
Composables Vue : Les composables encapsulent la logique réutilisable (auth, fetch, forms) qui peut être partagée entre plusieurs composants.
Repositories : Couche d'accès aux données qui encapsule les requêtes SQL. Cela facilite les tests et les changements futurs de DB.
# Créer et activer un environnement virtuel
python -m venv venv
source venv/bin/activate # Linux/Mac
# ou
venv\Scripts\activate # Windows
# Installer les dépendances
pip install -r requirements.txt# Framework
Flask==3.0.0
Flask-CORS==4.0.0
python-dotenv==1.0.0
# Database
Flask-SQLAlchemy==3.1.1
psycopg2-binary==2.9.9
SQLAlchemy==2.0.23
alembic==1.12.1
# Security
PyJWT==2.8.1
bcrypt==4.1.1
cryptography==41.0.7
# Validation
marshmallow==3.20.1
marshmallow-sqlalchemy==0.29.0
# Production server
gunicorn==21.2.0
python-multipart==0.0.6
# Development
pytest==7.4.3
pytest-cov==4.1.0
black==23.12.0
flake8==6.1.0
from flask import Flask
from flask_cors import CORS
from flask_sqlalchemy import SQLAlchemy
import logging
from logging.handlers import RotatingFileHandler
import os
db = SQLAlchemy()
def create_app(config_name: str = 'development') -> Flask:
"""
Application factory pattern.
Avantages :
- Facilite les tests (on peut créer plusieurs instances)
- Permet différentes configurations par environnement
- Sépare la création de l'app de son utilisation
Args:
config_name: 'development', 'production', ou 'testing'
Returns:
Flask application instance
"""
app = Flask(__name__)
# Configuration
from app.config import config
app.config.from_object(config[config_name])
# Extensions
db.init_app(app)
# CORS - Important pour la communication Vue.js -> Flask
CORS(app,
resources={r"/api/*": {
"origins": app.config['CORS_ORIGINS'],
"methods": ["GET", "POST", "PUT", "DELETE", "OPTIONS"],
"allow_headers": ["Content-Type", "Authorization"],
"supports_credentials": True,
"max_age": 3600
}}
)
# Error handlers (middleware global)
from app.middleware.error_handler import register_error_handlers
register_error_handlers(app)
# Request logger
setup_logging(app)
# Blueprints (groupement de routes)
from app.routes import auth_bp, user_bp, product_bp, order_bp, health_bp
app.register_blueprint(health_bp)
app.register_blueprint(auth_bp, url_prefix='/api/auth')
app.register_blueprint(user_bp, url_prefix='/api/users')
app.register_blueprint(product_bp, url_prefix='/api/products')
app.register_blueprint(order_bp, url_prefix='/api/orders')
# Create tables
with app.app_context():
db.create_all()
# CLI commands
register_cli_commands(app)
return app
def setup_logging(app: Flask) -> None:
"""Configurer le logging pour debugging."""
if not app.debug:
if not os.path.exists('logs'):
os.mkdir('logs')
file_handler = RotatingFileHandler(
'logs/app.log',
maxBytes=10240000, # 10MB
backupCount=10
)
file_handler.setFormatter(logging.Formatter(
'%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]'
))
file_handler.setLevel(logging.INFO)
app.logger.addHandler(file_handler)
app.logger.setLevel(logging.INFO)
def register_cli_commands(app: Flask) -> None:
"""Enregistrer des commandes CLI personnalisées."""
@app.cli.command()
def create_admin():
"""Créer un utilisateur administrateur."""
from app.models import User
from app.utils.password_utils import hash_password
username = input('Username: ')
email = input('Email: ')
password = input('Password: ')
if User.query.filter_by(email=email).first():
print('Erreur: Cet email existe déjà')
return
user = User(
username=username,
email=email,
password_hash=hash_password(password),
is_admin=True,
is_active=True
)
db.session.add(user)
db.session.commit()
print(f'Admin {username} créé avec succès')import os
from datetime import timedelta
from dotenv import load_dotenv
load_dotenv()
class Config:
"""Configuration commune."""
# Flask
SECRET_KEY = os.getenv('SECRET_KEY', 'change-me-in-production')
DEBUG = False
TESTING = False
# SQLAlchemy
SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_ECHO = False
# JWT
JWT_SECRET = os.getenv('JWT_SECRET', 'jwt-secret-key')
JWT_ALGORITHM = 'HS256'
JWT_EXPIRATION = timedelta(hours=24)
JWT_REFRESH_EXPIRATION = timedelta(days=30)
# CORS
CORS_ORIGINS = os.getenv('CORS_ORIGINS', 'http://localhost:3000').split(',')
# Pagination
DEFAULT_PAGE_SIZE = 10
MAX_PAGE_SIZE = 100
class DevelopmentConfig(Config):
"""Configuration développement."""
DEBUG = True
SQLALCHEMY_ECHO = True # Log les requêtes SQL
SQLALCHEMY_DATABASE_URI = os.getenv(
'DATABASE_URL',
'postgresql://postgres:password@localhost:5432/app_dev'
)
class ProductionConfig(Config):
"""Configuration production."""
DEBUG = False
SQLALCHEMY_DATABASE_URI = os.getenv('DATABASE_URL')
# Vérifier que les variables critiques sont définies
if not SQLALCHEMY_DATABASE_URI:
raise ValueError('DATABASE_URL non défini en production')
class TestingConfig(Config):
"""Configuration tests."""
TESTING = True
SQLALCHEMY_DATABASE_URI = 'sqlite:///:memory:'
JWT_EXPIRATION = timedelta(minutes=5)
config = {
'development': DevelopmentConfig,
'production': ProductionConfig,
'testing': TestingConfig,
'default': DevelopmentConfig
}from app import db
from datetime import datetime
from uuid import uuid4
import enum
class BaseModel(db.Model):
"""Classe de base pour tous les modèles."""
__abstract__ = True
id = db.Column(db.String(36), primary_key=True, default=lambda: str(uuid4()))
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
def to_dict(self) -> dict:
"""Convertir le modèle en dictionnaire."""
raise NotImplementedError('to_dict() doit être implémenté par la classe enfant')
class User(BaseModel):
"""Modèle utilisateur."""
__tablename__ = 'users'
# Colonnes
email = db.Column(db.String(120), unique=True, nullable=False, index=True)
username = db.Column(db.String(80), unique=True, nullable=False, index=True)
password_hash = db.Column(db.String(255), nullable=False)
full_name = db.Column(db.String(120))
is_active = db.Column(db.Boolean, default=True, nullable=False)
is_admin = db.Column(db.Boolean, default=False, nullable=False)
last_login = db.Column(db.DateTime)
# Relations
orders = db.relationship('Order', backref='user', lazy='select', cascade='all, delete-orphan')
def to_dict(self) -> dict:
"""Convertir en dictionnaire (sans données sensibles)."""
return {
'id': self.id,
'email': self.email,
'username': self.username,
'full_name': self.full_name,
'is_active': self.is_active,
'is_admin': self.is_admin,
'created_at': self.created_at.isoformat() if self.created_at else None,
'updated_at': self.updated_at.isoformat() if self.updated_at else None
}
def set_last_login(self) -> None:
"""Mettre à jour le dernier login."""
self.last_login = datetime.utcnow()
db.session.commit()
class Product(BaseModel):
"""Modèle produit."""
__tablename__ = 'products'
# Colonnes
name = db.Column(db.String(255), nullable=False, index=True)
description = db.Column(db.Text)
price = db.Column(db.Float, nullable=False, index=True)
cost = db.Column(db.Float) # Prix de revient (pour calculer les marges)
stock = db.Column(db.Integer, default=0, nullable=False)
sku = db.Column(db.String(100), unique=True, index=True)
is_active = db.Column(db.Boolean, default=True, nullable=False)
# Relations
order_items = db.relationship('OrderItem', backref='product', lazy='select', cascade='all, delete-orphan')
# Validations
__table_args__ = (
db.CheckConstraint('price >= 0', name='check_positive_price'),
db.CheckConstraint('stock >= 0', name='check_non_negative_stock'),
)
def to_dict(self) -> dict:
return {
'id': self.id,
'name': self.name,
'description': self.description,
'price': float(self.price),
'cost': float(self.cost) if self.cost else None,
'stock': self.stock,
'sku': self.sku,
'is_active': self.is_active,
'created_at': self.created_at.isoformat() if self.created_at else None
}
@property
def margin_percentage(self) -> float:
"""Calculer le pourcentage de marge."""
if not self.cost or self.cost == 0:
return 0
return ((self.price - self.cost) / self.cost) * 100
def has_sufficient_stock(self, quantity: int) -> bool:
"""Vérifier s'il y a assez de stock."""
return self.stock >= quantity
def decrease_stock(self, quantity: int) -> None:
"""Diminuer le stock (pour les commandes)."""
if not self.has_sufficient_stock(quantity):
raise ValueError(f'Stock insuffisant. Disponible: {self.stock}')
self.stock -= quantity
db.session.commit()
class Order(BaseModel):
"""Modèle commande."""
__tablename__ = 'orders'
# Énumération pour les statuts
class Status(enum.Enum):
PENDING = 'pending'
CONFIRMED = 'confirmed'
SHIPPED = 'shipped'
DELIVERED = 'delivered'
CANCELLED = 'cancelled'
# Colonnes
user_id = db.Column(db.String(36), db.ForeignKey('users.id'), nullable=False)
total_amount = db.Column(db.Float, nullable=False)
status = db.Column(db.String(20), default=Status.PENDING.value, nullable=False, index=True)
shipping_address = db.Column(db.String(255))
notes = db.Column(db.Text)
# Relations
items = db.relationship('OrderItem', backref='order', lazy='select', cascade='all, delete-orphan')
# Validations
__table_args__ = (
db.CheckConstraint('total_amount >= 0', name='check_positive_amount'),
)
def to_dict(self, include_items: bool = True) -> dict:
data = {
'id': self.id,
'user_id': self.user_id,
'total_amount': float(self.total_amount),
'status': self.status,
'shipping_address': self.shipping_address,
'created_at': self.created_at.isoformat() if self.created_at else None
}
if include_items:
data['items'] = [item.to_dict() for item in self.items]
return data
def can_be_cancelled(self) -> bool:
"""Vérifier si la commande peut être annulée."""
return self.status in [self.Status.PENDING.value, self.Status.CONFIRMED.value]
def cancel(self) -> None:
"""Annuler la commande et restaurer le stock."""
if not self.can_be_cancelled():
raise ValueError(f'Impossible d\'annuler une commande avec le statut {self.status}')
for item in self.items:
item.product.stock += item.quantity
self.status = self.Status.CANCELLED.value
db.session.commit()
class OrderItem(BaseModel):
"""Modèle article de commande."""
__tablename__ = 'order_items'
# Colonnes
order_id = db.Column(db.String(36), db.ForeignKey('orders.id'), nullable=False)
product_id = db.Column(db.String(36), db.ForeignKey('products.id'), nullable=False)
quantity = db.Column(db.Integer, nullable=False)
unit_price = db.Column(db.Float, nullable=False) # Prix au moment de la commande
# Validations
__table_args__ = (
db.CheckConstraint('quantity > 0', name='check_positive_quantity'),
db.CheckConstraint('unit_price >= 0', name='check_non_negative_unit_price'),
)
def to_dict(self) -> dict:
return {
'id': self.id,
'product_id': self.product_id,
'product_name': self.product.name,
'quantity': self.quantity,
'unit_price': float(self.unit_price),
'subtotal': self.quantity * float(self.unit_price)
}
@property
def subtotal(self) -> float:
"""Calculer le sous-total."""
return self.quantity * self.unit_pricefrom app import db
from typing import List, Tuple, Optional, TypeVar, Generic
from sqlalchemy import desc
T = TypeVar('T')
class BaseRepository(Generic[T]):
"""
Classe de base pour tous les repositories.
Avantages du pattern Repository :
1. Encapsule la logique d'accès aux données
2. Facilite les tests (on peut mocker facilement)
3. Centralise les requêtes (plus facile de changer de DB)
4. Réduit la duplication de code
"""
def __init__(self, model: T):
self.model = model
def get_by_id(self, id: str) -> Optional[T]:
"""Récupérer par ID."""
return self.model.query.get(id)
def get_all(self, page: int = 1, limit: int = 10, sort_by: str = 'created_at',
descending: bool = True) -> Tuple[List[T], int]:
"""
Récupérer tous les éléments avec pagination.
Args:
page: Numéro de page (commence à 1)
limit: Nombre d'éléments par page
sort_by: Champ de tri
descending: Tri décroissant si True
Returns:
Tuple de (liste d'éléments, total)
"""
query = self.model.query
# Vérifier que le champ de tri existe
if not hasattr(self.model, sort_by):
raise ValueError(f'Champ de tri {sort_by} n\'existe pas')
# Appliquer le tri
sort_column = getattr(self.model, sort_by)
if descending:
query = query.order_by(desc(sort_column))
else:
query = query.order_by(sort_column)
# Appliquer la pagination
total = query.count()
items = query.paginate(page=page, per_page=limit).items
return items, total
def create(self, **kwargs) -> T:
"""Créer un nouvel élément."""
item = self.model(**kwargs)
db.session.add(item)
db.session.commit()
return item
def update(self, id: str, **kwargs) -> Optional[T]:
"""Mettre à jour un élément."""
item = self.get_by_id(id)
if not item:
return None
for key, value in kwargs.items():
if hasattr(item, key):
setattr(item, key, value)
db.session.commit()
return item
def delete(self, id: str) -> bool:
"""Supprimer un élément."""
item = self.get_by_id(id)
if not item:
return False
db.session.delete(item)
db.session.commit()
return True
def filter_by(self, **kwargs) -> List[T]:
"""Filtrer les éléments."""
return self.model.query.filter_by(**kwargs).all()
def exists(self, **kwargs) -> bool:
"""Vérifier si un élément existe."""
return self.model.query.filter_by(**kwargs).first() is not Nonefrom app.models import Product
from app.repositories.base_repository import BaseRepository
from sqlalchemy import and_, or_
from typing import List, Tuple
class ProductRepository(BaseRepository[Product]):
"""Repository pour les produits."""
def search(self, keyword: str, min_price: float = 0, max_price: float = float('inf'),
in_stock_only: bool = False, page: int = 1, limit: int = 10) -> Tuple[List[Product], int]:
"""
Rechercher des produits avec filtres.
Args:
keyword: Terme de recherche
min_price: Prix minimum
max_price: Prix maximum
in_stock_only: Seulement les produits en stock
page: Numéro de page
limit: Limite par page
Returns:
Tuple de (produits, total)
"""
query = Product.query.filter(
Product.name.ilike(f'%{keyword}%'),
Product.price >= min_price,
Product.price <= max_price,
Product.is_active == True
)
if in_stock_only:
query = query.filter(Product.stock > 0)
total = query.count()
items = query.paginate(page=page, per_page=limit).items
return items, total
def get_low_stock_products(self, threshold: int = 10) -> List[Product]:
"""Récupérer les produits avec peu de stock."""
return Product.query.filter(
Product.stock <= threshold,
Product.is_active == True
).all()
def get_by_sku(self, sku: str) -> Product:
"""Récupérer un produit par SKU."""
return Product.query.filter_by(sku=sku).first()
def get_best_sellers(self, limit: int = 10) -> List[Product]:
"""Récupérer les produits les plus vendus."""
from app.models import OrderItem
from sqlalchemy import func
return db.session.query(Product).join(OrderItem).group_by(Product.id).order_by(
desc(func.sum(OrderItem.quantity))
).limit(limit).all()from app.models import Product, Order, OrderItem
from app.repositories.product_repository import ProductRepository
from app import db
from typing import List, Tuple, Dict
from flask import current_app
class ProductService:
"""
Service métier pour les produits.
La séparation entre Repository et Service est importante :
- Repository : Comment accéder aux données (WHERE, JOIN, etc.)
- Service : Quoi faire avec les données (logique métier)
Exemple :
- Repository.search() retourne les produits avec un keyword
- Service.get_recommended_products() utilise plusieurs repositories
et applique une logique métier complexe
"""
def __init__(self):
self.repository = ProductRepository(Product)
def get_all_products(self, page: int = 1, limit: int = 10,
sort_by: str = 'created_at') -> Tuple[List[Dict], int]:
"""Récupérer tous les produits."""
products, total = self.repository.get_all(page=page, limit=limit, sort_by=sort_by)
return [p.to_dict() for p in products], total
def get_product_by_id(self, product_id: str) -> Dict:
"""Récupérer un produit par ID."""
product = self.repository.get_by_id(product_id)
if not product:
raise ValueError(f'Produit {product_id} non trouvé')
return product.to_dict()
def create_product(self, data: Dict) -> Dict:
"""Créer un nouveau produit."""
# Validation métier
if data.get('price', 0) < 0:
raise ValueError('Le prix ne peut pas être négatif')
if data.get('stock', 0) < 0:
raise ValueError('Le stock ne peut pas être négatif')
# Vérifier l'unicité du SKU
if 'sku' in data and self.repository.get_by_sku(data['sku']):
raise ValueError(f'SKU {data["sku"]} déjà utilisé')
product = self.repository.create(**data)
current_app.logger.info(f'Produit créé: {product.id}')
return product.to_dict()
def update_product(self, product_id: str, data: Dict) -> Dict:
"""Mettre à jour un produit."""
product = self.repository.update(product_id, **data)
if not product:
raise ValueError(f'Produit {product_id} non trouvé')
current_app.logger.info(f'Produit mis à jour: {product_id}')
return product.to_dict()
def delete_product(self, product_id: str) -> None:
"""Supprimer un produit."""
# Vérifier qu'il n'y a pas de commandes associées
if OrderItem.query.filter_by(product_id=product_id).first():
raise ValueError('Impossible de supprimer un produit ayant des commandes')
if not self.repository.delete(product_id):
raise ValueError(f'Produit {product_id} non trouvé')
current_app.logger.info(f'Produit supprimé: {product_id}')
def get_low_stock_products(self) -> List[Dict]:
"""Récupérer les produits avec peu de stock."""
threshold = current_app.config.get('LOW_STOCK_THRESHOLD', 10)
products = self.repository.get_low_stock_products(threshold)
return [p.to_dict() for p in products]
def search_products(self, keyword: str, min_price: float = 0,
max_price: float = float('inf'),
in_stock_only: bool = False,
page: int = 1, limit: int = 10) -> Tuple[List[Dict], int]:
"""Rechercher des produits."""
products, total = self.repository.search(
keyword=keyword,
min_price=min_price,
max_price=max_price,
in_stock_only=in_stock_only,
page=page,
limit=limit
)
return [p.to_dict() for p in products], total
def update_stock(self, product_id: str, new_quantity: int) -> Dict:
"""
Mettre à jour le stock.
NOTE IMPORTANTE : Cette opération est sensible !
En production, vous voudrez peut-être :
- Utiliser des transactions de base de données
- Implémenter une queue d'événements
- Ajouter de l'audit logging
"""
product = self.repository.get_by_id(product_id)
if not product:
raise ValueError(f'Produit {product_id} non trouvé')
old_quantity = product.stock
product.stock = new_quantity
db.session.commit()
current_app.logger.info(
f'Stock mis à jour pour {product_id}: {old_quantity} -> {new_quantity}'
)
return product.to_dict()from flask import Blueprint, request, jsonify
from app.services.product_service import ProductService
from app.middleware.auth import token_required, admin_required
from app.utils.validators import validate_pagination
from marshmallow import ValidationError
product_bp = Blueprint('products', __name__)
product_service = ProductService()
@product_bp.route('', methods=['GET'])
def get_products():
"""
GET /api/products
Récupérer tous les produits avec pagination.
Query params:
- page: int (default: 1)
- limit: int (default: 10, max: 100)
- sort_by: str (default: created_at)
Response:
{
"data": [...],
"total": 100,
"page": 1,
"limit": 10
}
"""
try:
page = request.args.get('page', 1, type=int)
limit = request.args.get('limit', 10, type=int)
sort_by = request.args.get('sort_by', 'created_at', type=str)
# Validation
page, limit = validate_pagination(page, limit)
products, total = product_service.get_all_products(
page=page,
limit=limit,
sort_by=sort_by
)
return jsonify({
'data': products,
'total': total,
'page': page,
'limit': limit,
'has_next': page * limit < total
}), 200
except ValueError as e:
return jsonify({'error': str(e)}), 400
except Exception as e:
return jsonify({'error': 'Erreur serveur'}), 500
@product_bp.route('/<product_id>', methods=['GET'])
def get_product(product_id: str):
"""
GET /api/products/<id>
Récupérer un produit spécifique.
"""
try:
product = product_service.get_product_by_id(product_id)
return jsonify(product), 200
except ValueError as e:
return jsonify({'error': str(e)}), 404
except Exception as e:
return jsonify({'error': 'Erreur serveur'}), 500
@product_bp.route('', methods=['POST'])
@admin_required
def create_product():
"""
POST /api/products
Créer un nouveau produit (admin seulement).
Body:
{
"name": "Product Name",
"description": "Description",
"price": 99.99,
"cost": 50.00,
"stock": 100,
"sku": "PROD-001"
}
"""
try:
data = request.get_json()
# Validation basique
if not data.get('name'):
return jsonify({'error': 'Le nom est requis'}), 400
if not data.get('price'):
return jsonify({'error': 'Le prix est requis'}), 400
product = product_service.create_product(data)
return jsonify(product), 201
except ValueError as e:
return jsonify({'error': str(e)}), 400
except Exception as e:
return jsonify({'error': 'Erreur serveur'}), 500
@product_bp.route('/<product_id>', methods=['PUT'])
@admin_required
def update_product(product_id: str):
"""
PUT /api/products/<id>
Mettre à jour un produit (admin seulement).
"""
try:
data = request.get_json()
product = product_service.update_product(product_id, data)
return jsonify(product), 200
except ValueError as e:
return jsonify({'error': str(e)}), 404
except Exception as e:
return jsonify({'error': 'Erreur serveur'}), 500
@product_bp.route('/<product_id>', methods=['DELETE'])
@admin_required
def delete_product(product_id: str):
"""
DELETE /api/products/<id>
Supprimer un produit (admin seulement).
"""
try:
product_service.delete_product(product_id)
return jsonify({'message': 'Produit supprimé'}), 200
except ValueError as e:
return jsonify({'error': str(e)}), 400
except Exception as e:
return jsonify({'error': 'Erreur serveur'}), 500
@product_bp.route('/search', methods=['GET'])
def search_products():
"""
GET /api/products/search
Rechercher des produits.
Query params:
- keyword: str (requis)
- min_price: float (default: 0)
- max_price: float (default: infinity)
- in_stock_only: bool (default: false)
- page: int (default: 1)
- limit: int (default: 10)
"""
try:
keyword = request.args.get('keyword', '', type=str)
if not keyword:
return jsonify({'error': 'Keyword requis'}), 400
min_price = request.args.get('min_price', 0, type=float)
max_price = request.args.get('max_price', float('inf'), type=float)
in_stock_only = request.args.get('in_stock_only', False, type=bool)
page = request.args.get('page', 1, type=int)
limit = request.args.get('limit', 10, type=int)
products, total = product_service.search_products(
keyword=keyword,
min_price=min_price,
max_price=max_price,
in_stock_only=in_stock_only,
page=page,
limit=limit
)
return jsonify({
'data': products,
'total': total,
'page': page
}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500# Créer le projet
npm create vite@latest mon-app -- --template vue
cd mon-app
# Installer les dépendances
npm install
npm install -D typescript vue-tsc
npm install axios pinia vue-router
npm install -D tailwindcss postcss autoprefixer/**
* Types centralisés
*
* Avantages:
* - Un seul endroit pour modifier les types
* - Partage facile entre composants
* - Autocomplétion meilleure
* - Erreurs détectées à la compilation
*/
// User types
export interface User {
id: string
email: string
username: string
full_name: string | null
is_active: boolean
is_admin: boolean
created_at: string
last_login: string | null
}
export interface LoginRequest {
email: string
password: string
}
export interface LoginResponse {
token: string
refreshToken: string
user: User
}
export interface RegisterRequest {
email: string
username: string
password: string
full_name?: string
}
// Product types
export interface Product {
id: string
name: string
description: string | null
price: number
cost: number | null
stock: number
sku: string
is_active: boolean
created_at: string
}
export interface CreateProductRequest {
name: string
description?: string
price: number
cost?: number
stock: number
sku: string
}
// Order types
export interface OrderItem {
id: string
product_id: string
product_name: string
quantity: number
unit_price: number
subtotal: number
}
export enum OrderStatus {
PENDING = 'pending',
CONFIRMED = 'confirmed',
SHIPPED = 'shipped',
DELIVERED = 'delivered',
CANCELLED = 'cancelled'
}
export interface Order {
id: string
user_id: string
total_amount: number
status: OrderStatus
shipping_address?: string
items: OrderItem[]
created_at: string
}
export interface CreateOrderRequest {
items: {
product_id: string
quantity: number
}[]
shipping_address: string
}
// API Response types
export interface ApiResponse<T> {
data: T
message?: string
}
export interface PaginatedResponse<T> {
data: T[]
total: number
page: number
limit: number
has_next: boolean
}
// Error types
export interface ApiError {
error: string
details?: Record<string, any>
status: number
}
export interface ValidationError {
field: string
message: string
}
// Auth state
export interface AuthState {
user: User | null
token: string | null
refreshToken: string | null
loading: boolean
error: string | null
}
// UI state
export interface NotificationState {
id: string
type: 'success' | 'error' | 'warning' | 'info'
message: string
duration?: number
}/**
* Configuration Axios centralisée avec interceptors
*
* Les interceptors sont essentiels pour :
* - Ajouter le JWT token à chaque requête
* - Gérer le refresh token
* - Gérer les erreurs globales
* - Logger les requêtes
*/
import axios, { AxiosInstance, AxiosError, AxiosResponse } from 'axios'
import { useAuthStore } from '../store/auth'
import type { ApiError } from '../types'
const API_BASE_URL = import.meta.env.VITE_API_URL || 'http://localhost:5000/api'
// Créer l'instance Axios
export const api: AxiosInstance = axios.create({
baseURL: API_BASE_URL,
headers: {
'Content-Type': 'application/json'
}
})
// Intercepteur de requête - Ajouter le token JWT
api.interceptors.request.use(
(config) => {
const authStore = useAuthStore()
if (authStore.token) {
config.headers.Authorization = `Bearer ${authStore.token}`
}
// Logger les requêtes en développement
if (import.meta.env.DEV) {
console.log(`[API] ${config.method?.toUpperCase()} ${config.url}`)
}
return config
},
(error) => {
return Promise.reject(error)
}
)
// Intercepteur de réponse - Gérer les erreurs et le refresh token
api.interceptors.response.use(
(response) => {
return response
},
async (error: AxiosError) => {
const authStore = useAuthStore()
const originalRequest = error.config as any
// Si 401 et qu'on a un refresh token, essayer de rafraîchir
if (error.response?.status === 401 && authStore.refreshToken) {
try {
const response = await axios.post(`${API_BASE_URL}/auth/refresh`, {
refreshToken: authStore.refreshToken
})
const { token, refreshToken } = response.data
authStore.setTokens(token, refreshToken)
// Retry la requête originale avec le nouveau token
originalRequest.headers.Authorization = `Bearer ${token}`
return api(originalRequest)
} catch (refreshError) {
// Le refresh a échoué, se déconnecter
authStore.logout()
window.location.href = '/login'
return Promise.reject(refreshError)
}
}
// Si 401 et pas de refresh token, se déconnecter
if (error.response?.status === 401) {
authStore.logout()
window.location.href = '/login'
}
// Formatter l'erreur
const apiError: ApiError = {
error: error.response?.data?.error || error.message || 'Erreur inconnue',
status: error.response?.status || 500,
details: error.response?.data?.details
}
return Promise.reject(apiError)
}
)
export default apiimport api from './api'
import type { LoginRequest, LoginResponse, RegisterRequest, User, ApiResponse } from '../types'
export const authService = {
/**
* S'inscrire
*/
async register(data: RegisterRequest) {
const response = await api.post<ApiResponse<LoginResponse>>('/auth/register', data)
return response.data.data
},
/**
* Se connecter
*/
async login(data: LoginRequest) {
const response = await api.post<ApiResponse<LoginResponse>>('/auth/login', data)
return response.data.data
},
/**
* Rafraîchir le token
*/
async refreshToken(refreshToken: string) {
const response = await api.post<ApiResponse<LoginResponse>>('/auth/refresh', {
refreshToken
})
return response.data.data
},
/**
* Se déconnecter
*/
async logout() {
try {
await api.post('/auth/logout')
} catch (error) {
// On se déconnecte quand même, même si l'appel échoue
}
},
/**
* Récupérer le profil utilisateur
*/
async getProfile(): Promise<User> {
const response = await api.get<ApiResponse<User>>('/auth/profile')
return response.data.data
}
}import api from './api'
import type { Product, CreateProductRequest, PaginatedResponse, ApiResponse } from '../types'
export const productService = {
/**
* Récupérer tous les produits
*/
async getAll(page: number = 1, limit: number = 10, sort_by: string = 'created_at') {
const response = await api.get<PaginatedResponse<Product>>('/products', {
params: { page, limit, sort_by }
})
return response.data
},
/**
* Récupérer un produit par ID
*/
async getById(id: string): Promise<Product> {
const response = await api.get<ApiResponse<Product>>(`/products/${id}`)
return response.data.data
},
/**
* Créer un produit
*/
async create(data: CreateProductRequest): Promise<Product> {
const response = await api.post<ApiResponse<Product>>('/products', data)
return response.data.data
},
/**
* Mettre à jour un produit
*/
async update(id: string, data: Partial<CreateProductRequest>): Promise<Product> {
const response = await api.put<ApiResponse<Product>>(`/products/${id}`, data)
return response.data.data
},
/**
* Supprimer un produit
*/
async delete(id: string): Promise<void> {
await api.delete(`/products/${id}`)
},
/**
* Rechercher des produits
*/
async search(
keyword: string,
filters?: {
min_price?: number
max_price?: number
in_stock_only?: boolean
page?: number
limit?: number
}
) {
const response = await api.get<PaginatedResponse<Product>>('/products/search', {
params: { keyword, ...filters }
})
return response.data
}
}/**
* Store d'authentification
*
* Pinia vs Vuex:
* - Plus simple et plus léger
* - Meilleur support TypeScript
* - API plus moderne
* - Pas de mutations
*/
import { defineStore } from 'pinia'
import { ref, computed } from 'vue'
import { authService } from '../services/authService'
import type { User, AuthState, LoginRequest, RegisterRequest } from '../types'
const STORAGE_KEYS = {
TOKEN: 'auth_token',
REFRESH_TOKEN: 'auth_refresh_token'
}
export const useAuthStore = defineStore('auth', () => {
// State
const user = ref<User | null>(null)
const token = ref<string | null>(localStorage.getItem(STORAGE_KEYS.TOKEN))
const refreshToken = ref<string | null>(localStorage.getItem(STORAGE_KEYS.REFRESH_TOKEN))
const loading = ref(false)
const error = ref<string | null>(null)
// Computed
const isAuthenticated = computed(() => !!token.value)
const currentUser = computed(() => user.value)
// Actions
async function register(data: RegisterRequest) {
loading.value = true
error.value = null
try {
const response = await authService.register(data)
setTokens(response.token, response.refreshToken)
user.value = response.user
return response
} catch (err) {
error.value = err instanceof Error ? err.message : 'Erreur lors de l\'inscription'
throw err
} finally {
loading.value = false
}
}
async function login(credentials: LoginRequest) {
loading.value = true
error.value = null
try {
const response = await authService.login(credentials)
setTokens(response.token, response.refreshToken)
user.value = response.user
return response
} catch (err) {
error.value = err instanceof Error ? err.message : 'Erreur lors de la connexion'
throw err
} finally {
loading.value = false
}
}
function setTokens(newToken: string, newRefreshToken: string) {
token.value = newToken
refreshToken.value = newRefreshToken
localStorage.setItem(STORAGE_KEYS.TOKEN, newToken)
localStorage.setItem(STORAGE_KEYS.REFRESH_TOKEN, newRefreshToken)
}
async function logout() {
try {
await authService.logout()
} finally {
clearAuth()
}
}
function clearAuth() {
token.value = null
refreshToken.value = null
user.value = null
localStorage.removeItem(STORAGE_KEYS.TOKEN)
localStorage.removeItem(STORAGE_KEYS.REFRESH_TOKEN)
}
async function fetchProfile() {
if (!token.value) return
try {
user.value = await authService.getProfile()
} catch (err) {
if (err instanceof Error && err.message.includes('401')) {
clearAuth()
}
}
}
// Hydrater le store au démarrage
async function initializeAuth() {
if (token.value) {
try {
await fetchProfile()
} catch (err) {
clearAuth()
}
}
}
return {
// State
user,
token,
refreshToken,
loading,
error,
// Computed
isAuthenticated,
currentUser,
// Actions
register,
login,
logout,
setTokens,
fetchProfile,
initializeAuth,
clearAuth
}
})import { defineStore } from 'pinia'
import { ref, computed } from 'vue'
import { productService } from '../services/productService'
import type { Product } from '../types'
export const useProductStore = defineStore('products', () => {
// State
const products = ref<Product[]>([])
const selectedProduct = ref<Product | null>(null)
const loading = ref(false)
const error = ref<string | null>(null)
const pagination = ref({ page: 1, limit: 10, total: 0, has_next: false })
// Computed
const getProductById = computed(() => (id: string) => {
return products.value.find(p => p.id === id)
})
// Actions
async function fetchProducts(page: number = 1, limit: number = 10) {
loading.value = true
error.value = null
try {
const response = await productService.getAll(page, limit)
products.value = response.data
pagination.value = {
page: response.page,
limit: response.limit,
total: response.total,
has_next: response.has_next
}
} catch (err) {
error.value = 'Erreur lors du chargement des produits'
} finally {
loading.value = false
}
}
async function fetchProductById(id: string) {
loading.value = true
error.value = null
try {
selectedProduct.value = await productService.getById(id)
return selectedProduct.value
} catch (err) {
error.value = 'Erreur lors du chargement du produit'
throw err
} finally {
loading.value = false
}
}
async function createProduct(data: any) {
loading.value = true
error.value = null
try {
const product = await productService.create(data)
products.value.unshift(product)
return product
} catch (err) {
error.value = 'Erreur lors de la création'
throw err
} finally {
loading.value = false
}
}
async function updateProduct(id: string, data: any) {
loading.value = true
error.value = null
try {
const updatedProduct = await productService.update(id, data)
const index = products.value.findIndex(p => p.id === id)
if (index !== -1) {
products.value[index] = updatedProduct
}
if (selectedProduct.value?.id === id) {
selectedProduct.value = updatedProduct
}
return updatedProduct
} catch (err) {
error.value = 'Erreur lors de la mise à jour'
throw err
} finally {
loading.value = false
}
}
async function deleteProduct(id: string) {
loading.value = true
error.value = null
try {
await productService.delete(id)
products.value = products.value.filter(p => p.id !== id)
} catch (err) {
error.value = 'Erreur lors de la suppression'
throw err
} finally {
loading.value = false
}
}
async function searchProducts(keyword: string, filters?: any) {
loading.value = true
error.value = null
try {
const response = await productService.search(keyword, filters)
products.value = response.data
pagination.value = {
page: response.page,
limit: response.limit,
total: response.total,
has_next: false
}
} catch (err) {
error.value = 'Erreur lors de la recherche'
} finally {
loading.value = false
}
}
return {
// State
products,
selectedProduct,
loading,
error,
pagination,
// Computed
getProductById,
// Actions
fetchProducts,
fetchProductById,
createProduct,
updateProduct,
deleteProduct,
searchProducts
}
})import { createRouter, createWebHistory, type RouteRecordRaw } from 'vue-router'
import { useAuthStore } from '../store/auth'
// Importation des pages
import Home from '../pages/Home.vue'
import Login from '../pages/Login.vue'
import Register from '../pages/Register.vue'
import Dashboard from '../pages/Dashboard.vue'
import Products from '../pages/Products.vue'
import ProductDetail from '../pages/ProductDetail.vue'
import NotFound from '../pages/NotFound.vue'
const routes: RouteRecordRaw[] = [
{
path: '/',
name: 'Home',
component: Home
},
{
path: '/login',
name: 'Login',
component: Login,
meta: { requiresGuest: true, title: 'Connexion' }
},
{
path: '/register',
name: 'Register',
component: Register,
meta: { requiresGuest: true, title: 'Inscription' }
},
{
path: '/dashboard',
name: 'Dashboard',
component: Dashboard,
meta: { requiresAuth: true, title: 'Tableau de bord' }
},
{
path: '/products',
name: 'Products',
component: Products,
meta: { title: 'Produits' }
},
{
path: '/products/:id',
name: 'ProductDetail',
component: ProductDetail,
meta: { title: 'Détails du produit' }
},
{
path: '/:pathMatch(.*)*',
name: 'NotFound',
component: NotFound,
meta: { title: 'Page non trouvée' }
}
]
const router = createRouter({
history: createWebHistory(),
routes
})
/**
* Navigation guards (middleware)
*
* Exécutés avant chaque navigation.
* Utiles pour :
* - Vérifier l'authentification
* - Charger les données requises
* - Mettre à jour le titre de la page
*/
router.beforeEach(async (to, from, next) => {
const authStore = useAuthStore()
// Initialiser l'auth au premier chargement
if (!authStore.user && authStore.token) {
await authStore.fetchProfile()
}
// Mettre à jour le titre de la page
document.title = `${to.meta.title || 'App'} - Mon Application`
// Vérifier si la route nécessite une authentification
if (to.meta.requiresAuth && !authStore.isAuthenticated) {
next('/login')
return
}
// Rediriger les utilisateurs authentifiés loin des pages publiques
if (to.meta.requiresGuest && authStore.isAuthenticated) {
next('/dashboard')
return
}
next()
})
export default router/**
* Composable pour gérer les formulaires
*
* Élimine la duplication de logique de formulaire
* entre les composants.
*/
import { reactive, ref } from 'vue'
import type { ValidationError } from '../types'
export function useForm<T extends Record<string, any>>(initialValues: T) {
const formData = reactive<T>(initialValues)
const errors = reactive<Record<string, string>>({})
const isSubmitting = ref(false)
const isDirty = ref(false)
/**
* Mettre à jour une valeur de formulaire
*/
function setFieldValue(field: keyof T, value: any) {
formData[field] = value
isDirty.value = true
// Effacer l'erreur du champ
if (errors[field as string]) {
delete errors[field as string]
}
}
/**
* Mettre à jour l'erreur d'un champ
*/
function setFieldError(field: string, message: string) {
errors[field] = message
}
/**
* Valider le formulaire
*/
function validate(validationRules: Record<string, (value: any) => string | null>) {
const newErrors: Record<string, string> = {}
for (const [field, rule] of Object.entries(validationRules)) {
const value = formData[field as keyof T]
const error = rule(value)
if (error) {
newErrors[field] = error
}
}
Object.assign(errors, newErrors)
return Object.keys(newErrors).length === 0
}
/**
* Réinitialiser le formulaire
*/
function reset() {
Object.assign(formData, initialValues)
Object.keys(errors).forEach(key => delete errors[key])
isDirty.value = false
}
/**
* Soumettre le formulaire
*/
async function submitForm(
onSubmit: (data: T) => Promise<void>,
validationRules?: Record<string, (value: any) => string | null>
) {
// Valider
if (validationRules && !validate(validationRules)) {
return
}
isSubmitting.value = true
try {
await onSubmit(formData)
} finally {
isSubmitting.value = false
}
}
return {
formData,
errors,
isSubmitting,
isDirty,
setFieldValue,
setFieldError,
validate,
reset,
submitForm
}
}import { ref } from 'vue'
import type { NotificationState } from '../types'
export function useNotification() {
const notifications = ref<NotificationState[]>([])
function addNotification(
message: string,
type: 'success' | 'error' | 'warning' | 'info' = 'info',
duration: number = 3000
) {
const id = Date.now().toString()
const notification: NotificationState = {
id,
message,
type,
duration
}
notifications.value.push(notification)
if (duration > 0) {
setTimeout(() => {
removeNotification(id)
}, duration)
}
return id
}
function removeNotification(id: string) {
const index = notifications.value.findIndex(n => n.id === id)
if (index !== -1) {
notifications.value.splice(index, 1)
}
}
return {
notifications,
addNotification,
removeNotification,
success: (msg: string) => addNotification(msg, 'success'),
error: (msg: string) => addNotification(msg, 'error'),
warning: (msg: string) => addNotification(msg, 'warning'),
info: (msg: string) => addNotification(msg, 'info')
}
}<template>
<div class="login-container">
<div class="login-box">
<h1>Connexion</h1>
<form @submit.prevent="handleLogin">
<!-- Email -->
<FormField
label="Email"
error={errors.email}
>
<input
v-model="formData.email"
type="email"
placeholder="votre@email.com"
class="form-input"
@blur="validateField('email')"
/>
</FormField>
<!-- Password -->
<FormField
label="Mot de passe"
:error="errors.password"
>
<input
v-model="formData.password"
type="password"
placeholder="••••••••"
class="form-input"
@blur="validateField('password')"
/>
</FormField>
<!-- Error global -->
<div v-if="authStore.error" class="error-alert">
{{ authStore.error }}
</div>
<!-- Submit button -->
<button
type="submit"
:disabled="authStore.loading || !isFormValid"
class="btn-submit"
>
{{ authStore.loading ? 'Connexion en cours...' : 'Se connecter' }}
</button>
</form>
<!-- Register link -->
<p class="register-link">
Pas encore de compte?
<router-link to="/register">S'inscrire</router-link>
</p>
</div>
</div>
</template>
<script setup lang="ts">
import { computed } from 'vue'
import { useRouter } from 'vue-router'
import { useAuthStore } from '../store/auth'
import { useForm } from '../composables/useForm'
import { useNotification } from '../composables/useNotification'
import FormField from '../components/forms/FormField.vue'
import type { LoginRequest } from '../types'
const router = useRouter()
const authStore = useAuthStore()
const notification = useNotification()
// Initialiser le formulaire
const { formData, errors, submitForm, validate } = useForm<LoginRequest>({
email: '',
password: ''
})
// Règles de validation
const validationRules = {
email: (value: string) => {
if (!value) return 'Email requis'
if (!/^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(value)) return 'Email invalide'
return null
},
password: (value: string) => {
if (!value) return 'Mot de passe requis'
if (value.length < 6) return 'Minimum 6 caractères'
return null
}
}
// Computed
const isFormValid = computed(() => {
return formData.email && formData.password && Object.keys(errors).length === 0
})
// Valider un champ
function validateField(field: string) {
const rule = validationRules[field as keyof LoginRequest]
if (rule) {
const error = rule(formData[field as keyof LoginRequest])
if (error) {
errors[field] = error
}
}
}
// Soumettre le formulaire
async function handleLogin() {
try {
await submitForm(
async () => {
await authStore.login(formData)
notification.success('Connexion réussie')
router.push('/dashboard')
},
validationRules
)
} catch (error) {
notification.error('Erreur lors de la connexion')
}
}
</script>
<style scoped>
.login-container {
display: flex;
align-items: center;
justify-content: center;
min-height: 100vh;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
}
.login-box {
background: white;
padding: 40px;
border-radius: 8px;
box-shadow: 0 10px 25px rgba(0, 0, 0, 0.2);
width: 100%;
max-width: 400px;
}
.login-box h1 {
text-align: center;
margin-bottom: 30px;
color: #333;
}
.form-input {
width: 100%;
padding: 12px;
border: 1px solid #ddd;
border-radius: 4px;
font-size: 14px;
transition: border-color 0.3s;
}
.form-input:focus {
outline: none;
border-color: #667eea;
box-shadow: 0 0 0 3px rgba(102, 126, 234, 0.1);
}
.error-alert {
background: #fee;
color: #c33;
padding: 12px;
border-radius: 4px;
margin-bottom: 20px;
font-size: 14px;
}
.btn-submit {
width: 100%;
padding: 12px;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
border: none;
border-radius: 4px;
font-size: 16px;
font-weight: 600;
cursor: pointer;
transition: opacity 0.3s;
}
.btn-submit:hover:not(:disabled) {
opacity: 0.9;
}
.btn-submit:disabled {
opacity: 0.6;
cursor: not-allowed;
}
.register-link {
text-align: center;
margin-top: 20px;
color: #666;
}
.register-link a {
color: #667eea;
text-decoration: none;
font-weight: 600;
}
.register-link a:hover {
text-decoration: underline;
}
</style>Toutes les requêtes doivent suivre ce format standardisé pour la cohérence.
// Récupérer des données avec pagination
const response = await api.get('/products', {
params: {
page: 1,
limit: 10,
sort_by: 'created_at'
}
})
// Réponse attendue
{
"data": [
{ "id": "...", "name": "..." },
...
],
"total": 100,
"page": 1,
"limit": 10,
"has_next": true
}// Créer une nouvelle ressource
const response = await api.post('/products', {
name: 'Laptop',
price: 999.99,
stock: 50
})
// Réponse (201 Created)
{
"data": {
"id": "550e8400-e29b-41d4-a716-446655440000",
"name": "Laptop",
"price": 999.99,
"stock": 50,
"created_at": "2024-01-15T10:30:00Z"
}
}// Mettre à jour une ressource
const response = await api.put(`/products/${id}`, {
name: 'Updated Laptop',
price: 1099.99
})
// Réponse (200 OK)
{
"data": { /* ressource mise à jour */ }
}// Supprimer une ressource
await api.delete(`/products/${id}`)
// Réponse (200 OK)
{
"message": "Ressource supprimée"
}// Error 400 - Bad Request
{
"error": "Validation échouée",
"details": {
"name": ["Le nom est requis"],
"price": ["Le prix doit être positif"]
}
}
// Error 401 - Unauthorized
{
"error": "Token expiré"
}
// Error 404 - Not Found
{
"error": "Ressource non trouvée"
}
// Error 500 - Server Error
{
"error": "Erreur serveur interne"
}1. Utilisateur entre email/password
↓
2. Client hash le password (optionnel, pour sécurité additionnelle)
↓
3. POST /api/auth/login avec email + password
↓
4. Serveur vérifie email + password
↓
5. Si OK, génère JWT token + refresh token
↓
6. Client reçoit tokens, les stocke en localStorage
↓
7. Pour chaque requête ultérieure:
- Client ajoute JWT au header Authorization
- Serveur valide le JWT
↓
8. Quand JWT expire (24h):
- Client utilise refresh token pour obtenir nouveau JWT
↓
9. Logout:
- Client supprime tokens du localStorage
- Serveur peut blacklister le token (optionnel)
# app/utils/jwt_utils.py
import jwt
from datetime import datetime, timedelta
from flask import current_app
from typing import Dict, Tuple
def create_tokens(user_id: str, email: str) -> Tuple[str, str]:
"""
Créer access token et refresh token.
Pourquoi deux tokens?
- Access token: Court terme (24h), utilisé pour les requêtes
- Refresh token: Long terme (30j), utilisé pour obtenir un nouveau access token
Si access token leak, l'attaquant n'a accès que pour 24h.
Avec un token unique long terme, un leak = accès permanent.
"""
now = datetime.utcnow()
# Access token
access_payload = {
'user_id': user_id,
'email': email,
'iat': now,
'exp': now + current_app.config['JWT_EXPIRATION'],
'type': 'access' # Important pour différencier
}
# Refresh token
refresh_payload = {
'user_id': user_id,
'iat': now,
'exp': now + current_app.config['JWT_REFRESH_EXPIRATION'],
'type': 'refresh'
}
access_token = jwt.encode(
access_payload,
current_app.config['JWT_SECRET'],
algorithm=current_app.config['JWT_ALGORITHM']
)
refresh_token = jwt.encode(
refresh_payload,
current_app.config['JWT_SECRET'],
algorithm=current_app.config['JWT_ALGORITHM']
)
return access_token, refresh_token
def decode_token(token: str) -> Dict:
"""
Décoder et valider un token JWT.
Levée d'exception si:
- Token expiré
- Token invalide (tamperé)
- Signature invalide
"""
try:
payload = jwt.decode(
token,
current_app.config['JWT_SECRET'],
algorithms=[current_app.config['JWT_ALGORITHM']]
)
return payload
except jwt.ExpiredSignatureError:
raise Exception('Token expiré')
except jwt.InvalidTokenError:
raise Exception('Token invalide')# app/middleware/auth.py
from functools import wraps
from flask import request, jsonify
from app.utils.jwt_utils import decode_token
from app.models import User
def token_required(f):
"""
Décorateur pour les routes protégées.
Vérifie la présence et la validité du JWT token.
Ajoute user_id et email dans request pour utilisation dans la route.
"""
@wraps(f)
def decorated(*args, **kwargs):
token = None
# Récupérer le token du header Authorization
if 'Authorization' in request.headers:
try:
auth_header = request.headers['Authorization']
token = auth_header.split(' ')[1] # Format: "Bearer TOKEN"
except IndexError:
return jsonify({'error': 'Format de token invalide (Bearer TOKEN)'}), 401
if not token:
return jsonify({'error': 'Token manquant'}), 401
try:
payload = decode_token(token)
# Vérifier que c'est un access token (pas un refresh token)
if payload.get('type') != 'access':
return jsonify({'error': 'Type de token invalide'}), 401
# Attacher les données du token à la requête
request.user_id = payload['user_id']
request.email = payload['email']
except Exception as e:
return jsonify({'error': str(e)}), 401
return f(*args, **kwargs)
return decorated
def admin_required(f):
"""
Décorateur pour les routes d'admin.
Vérifie que l'utilisateur est authentifié ET admin.
"""
@wraps(f)
@token_required
def decorated(*args, **kwargs):
user = User.query.get(request.user_id)
if not user:
return jsonify({'error': 'Utilisateur non trouvé'}), 404
if not user.is_admin:
return jsonify({'error': 'Accès réservé aux administrateurs'}), 403
return f(*args, **kwargs)
return decorated/**
* IMPORTANT: Sécurité des tokens en Vue.js
*
* localStorage vs sessionStorage vs memory:
*
* localStorage:
* ✓ Persiste après fermeture du navigateur
* ✗ Accessible à tout script (XSS vulnerability)
* ✗ Visible dans DevTools
*
* sessionStorage:
* ✓ Accessible au script
* ✓ Supprimé à la fermeture du navigateur
* ✗ Accès XSS
*
* Memory (ref Vue):
* ✓ Pas accessible au script externe
* ✓ Sûr contre XSS
* ✗ Perdu au refresh
*
* RECOMMANDATION POUR PRODUCTION:
* - Stocker le token en localStorage (pour user experience)
* - MAIS en HTTP-only cookie (côté serveur) si possible
* - Implémenter une Protection CSRF
* - Valider les origins CORS strictement
*/
// Approche recommandée: HTTP-only cookies
// Le serveur définit:
// Set-Cookie: token=...; HttpOnly; Secure; SameSite=Strict
// Vue reçoit automatiquement le cookie avec chaque requête
// Pas besoin de le gérer manuellement dans les headers
// Pour les tokens en localStorage:
const STORAGE_KEY = 'auth_token'
export function saveToken(token: string) {
// ⚠️ Attention au XSS
// Si votre app est vulnerable à XSS, ce token peut être volé
localStorage.setItem(STORAGE_KEY, token)
}
export function getToken() {
return localStorage.getItem(STORAGE_KEY)
}
export function removeToken() {
localStorage.removeItem(STORAGE_KEY)
}Users
├── id (UUID)
├── email (unique, indexed)
├── username (unique)
├── password_hash
├── is_admin
├── is_active
├── created_at
└── updated_at
↓
├─── Orders (1-N relation)
│ ├── id (UUID)
│ ├── user_id (FK)
│ ├── total_amount
│ ├── status (enum)
│ ├── created_at
│ └── updated_at
│ ↓
│ └─── OrderItems (1-N relation)
│ ├── id (UUID)
│ ├── order_id (FK)
│ ├── product_id (FK)
│ ├── quantity
│ └── unit_price
Products
├── id (UUID)
├── name (indexed)
├── description
├── price (indexed)
├── cost
├── stock
├── sku (unique, indexed)
├── is_active
├── created_at
└── updated_at
↓
└─── OrderItems (1-N relation)
# Initialiser Alembic
cd backend
alembic init migrations
# Créer une migration automatique
alembic revision --autogenerate -m "Create initial schema"
# Voir le fichier généré dans migrations/versions/
# Appliquer les migrations
alembic upgrade head
# Revenir à la migration précédente
alembic downgrade -1
# Voir l'historique
alembic history# ❌ N+1 Query Problem - MAUVAIS
users = User.query.all()
for user in users:
print(user.orders) # Une requête par utilisateur !
# ✅ Eager Loading - BON
users = User.query.options(joinedload(User.orders)).all()
# ✅ Ou avec selectinload (plus efficace pour les grandes collections)
from sqlalchemy.orm import selectinload
users = User.query.options(selectinload(User.orders)).all()
# Indexing - Améliorer les recherches
class User(db.Model):
email = db.Column(db.String(120), unique=True, nullable=False, index=True)
username = db.Column(db.String(80), unique=True, nullable=False, index=True)
created_at = db.Column(db.DateTime, default=datetime.utcnow, index=True) # Si on trie souvent par date
# Query optimization avec limit/offset au lieu de charger tout
products = Product.query.filter_by(is_active=True).limit(10).offset(0).all()# ❌ MAUVAIS - Tout dans la route
@app.route('/api/orders', methods=['POST'])
def create_order():
data = request.get_json()
user_id = request.user_id
# Validation
if not data.get('items'):
return jsonify({'error': 'Items required'}), 400
# Logique métier
user = User.query.get(user_id)
items = []
total = 0
for item_data in data['items']:
product = Product.query.get(item_data['product_id'])
if product.stock < item_data['quantity']:
return jsonify({'error': 'Insufficient stock'}), 400
product.stock -= item_data['quantity']
items.append(OrderItem(...))
total += product.price * item_data['quantity']
order = Order(user_id=user_id, items=items, total_amount=total)
db.session.add(order)
db.session.commit()
return jsonify(order.to_dict()), 201
# ✅ BON - Séparation des responsabilités
from app.services.order_service import OrderService
from app.middleware.auth import token_required
@order_bp.route('', methods=['POST'])
@token_required
def create_order():
"""Handler - Juste valider et appeler le service"""
data = request.get_json()
# Validation basique
if not data.get('items'):
return jsonify({'error': 'Items required'}), 400
try:
# Le service gère toute la logique métier
order = order_service.create_order(
user_id=request.user_id,
items=data['items']
)
return jsonify(order), 201
except ValueError as e:
return jsonify({'error': str(e)}), 400
except Exception as e:
current_app.logger.error(f'Error creating order: {e}')
return jsonify({'error': 'Server error'}), 500# app/middleware/error_handler.py
from flask import jsonify
from app.domain.exceptions import BusinessLogicError
def register_error_handlers(app):
"""Enregistrer les handlers d'erreur globaux."""
@app.errorhandler(400)
def bad_request(error):
return jsonify({'error': 'Requête invalide'}), 400
@app.errorhandler(401)
def unauthorized(error):
return jsonify({'error': 'Non authentifié'}), 401
@app.errorhandler(403)
def forbidden(error):
return jsonify({'error': 'Accès refusé'}), 403
@app.errorhandler(404)
def not_found(error):
return jsonify({'error': 'Ressource non trouvée'}), 404
@app.errorhandler(500)
def internal_error(error):
app.logger.error(f'Internal server error: {error}')
return jsonify({'error': 'Erreur serveur interne'}), 500
# Exception métier personnalisée
@app.errorhandler(BusinessLogicError)
def business_error(error):
return jsonify({'error': error.message}), error.status_code// ❌ MAUVAIS - State partagé non structuré
const state = {
user: null,
products: [],
loading: false,
error: null,
selectedProduct: null,
showModal: false,
// ... 20 autres propriétés mélangées
}
// ✅ BON - State structuré par domaine
export const useAppStore = defineStore('app', () => {
// Auth state
const auth = reactive({
user: null,
token: null,
loading: false,
error: null
})
// Products state
const products = reactive({
list: [],
selected: null,
loading: false,
error: null,
filters: { search: '', minPrice: 0, maxPrice: Infinity }
})
// UI state
const ui = reactive({
showModal: false,
notification: null,
sidebarOpen: false
})
return { auth, products, ui }
})// ❌ MAUVAIS - Pas de gestion d'erreur
const action = async () => {
const result = await api.get('/data')
state.data = result.data
}
// ✅ BON - Gestion complète
const action = async () => {
state.loading = true
state.error = null
try {
const result = await api.get('/data')
state.data = result.data
return result
} catch (error) {
// Différencier les types d'erreurs
if (error instanceof AxiosError) {
state.error = error.response?.data?.error || 'Erreur API'
} else if (error instanceof Error) {
state.error = error.message
} else {
state.error = 'Erreur inconnue'
}
throw error
} finally {
state.loading = false
}
}// 1. Lazy Loading des routes
const router = createRouter({
routes: [
{
path: '/products',
component: () => import('./pages/Products.vue') // Chargé à la demande
}
]
})
// 2. Code splitting automatique avec Vite
// Vite génère automatiquement des chunks séparés
// 3. Virtual Scrolling pour grandes listes
<template>
<VirtualScroller
:items="products"
:item-height="80"
>
<template #default="{ item }">
<ProductCard :product="item" />
</template>
</VirtualScroller>
</template>
// 4. Memoization avec computed
const expensiveComputation = computed(() => {
// Cela ne se recalcule que si ses dépendances changent
return complexCalculation(someValue)
})
// 5. Debouncing pour les recherches
const searchQuery = ref('')
const debouncedSearch = useDebounceFn(() => {
// Appel API seulement après 500ms d'inactivité
productService.search(searchQuery.value)
}, 500)
// 6. Pré-chargement des données
onMounted(async () => {
// Charger les données critiques au chargement
await productStore.fetchProducts()
// Pré-charger d'autres données en arrière-plan
setTimeout(() => {
productStore.fetchRecommendedProducts()
}, 1000)
})# 1. Caching des requêtes coûteuses
from flask_caching import Cache
cache = Cache(app, config={'CACHE_TYPE': 'simple'})
@product_bp.route('', methods=['GET'])
@cache.cached(timeout=300) # Cache 5 minutes
def get_products():
return product_service.get_all_products()
# 2. Pagination obligatoire
@product_bp.route('', methods=['GET'])
def get_products():
page = request.args.get('page', 1, type=int)
limit = min(request.args.get('limit', 10, type=int), 100) # Max 100
# Retourner seulement 10 items par défaut, pas 10000 !
products, total = product_service.get_all_products(page, limit)
# 3. Eager loading pour éviter N+1
users = User.query.options(
joinedload(User.orders).joinedload(Order.items)
).all()
# 4. Indexing des colonnes fréquemment recherchées
class Product(db.Model):
name = db.Column(db.String(255), index=True)
price = db.Column(db.Float, index=True)
created_at = db.Column(db.DateTime, index=True)
# 5. Connection pooling pour la BD
from flask_sqlalchemy import SQLAlchemy
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
'pool_size': 10,
'pool_recycle': 3600,
'pool_pre_ping': True
}// Redimensionner côté frontend avant upload
async function uploadProductImage(file: File) {
// Redimensionner l'image
const resized = await resizeImage(file, {
maxWidth: 1200,
maxHeight: 1200,
quality: 0.8
})
// Uploader la version compressée
await api.post('/products/image', resized)
}
// Servir des images optimisées
// À partir du serveur, utiliser une CDN ou un service d'optimisation d'images
// Exemple avec CloudFlare:
// <img src="https://example.com/image.jpg?width=300&quality=80" /># ❌ PIÈGE - 1 requête pour users + N requêtes pour chaque user's orders
users = User.query.all()
for user in users:
print(len(user.orders)) # Déclenche une requête !
# ✅ SOLUTION - Eager load avec joinedload
from sqlalchemy.orm import joinedload
users = User.query.options(joinedload(User.orders)).all()
for user in users:
print(len(user.orders)) # Pas de requête additionnelle !
# Ou avec selectinload pour les collections
from sqlalchemy.orm import selectinload
users = User.query.options(selectinload(User.orders)).all()# ❌ PIÈGE - Deux utilisateurs peuvent acheter le même produit
product = Product.query.get(product_id)
if product.stock >= quantity:
# Entre la vérification et la mise à jour, un autre utilisateur peut acheter !
product.stock -= quantity
db.session.commit()
# ✅ SOLUTION - Utiliser SELECT FOR UPDATE
from sqlalchemy import text
product = db.session.query(Product).with_for_update().filter_by(id=product_id).first()
if product.stock >= quantity:
product.stock -= quantity
db.session.commit()
# Ou avec une transaction
from sqlalchemy import event
@event.listens_for(db.session, 'begin')
def receive_begin(dbapi_conn):
if not dbapi_conn.isolation_level:
dbapi_conn.isolation_level = 'SERIALIZABLE'# ❌ PIÈGE - Concaténation de chaînes
search = request.args.get('search')
products = db.session.execute(f"SELECT * FROM products WHERE name = '{search}'")
# Un attaquant peut faire: search = "'; DROP TABLE users; --"
# ✅ SOLUTION - Utiliser des paramètres liés
products = db.session.query(Product).filter(Product.name == search).all()
# Ou avec text()
products = db.session.execute(text("SELECT * FROM products WHERE name = :name"), {"name": search})# ❌ PIÈGE - Si le token leak, il reste valide jusqu'à expiration
# Attaquant peut l'utiliser pendant 24 heures
# ✅ SOLUTION - Implémenter un token blacklist
class TokenBlacklist(db.Model):
id = db.Column(db.String, primary_key=True)
user_id = db.Column(db.String, db.ForeignKey('users.id'))
blacklisted_at = db.Column(db.DateTime, default=datetime.utcnow)
@auth_bp.route('/logout', methods=['POST'])
@token_required
def logout():
token = request.headers['Authorization'].split(' ')[1]
blacklist = TokenBlacklist(id=token, user_id=request.user_id)
db.session.add(blacklist)
db.session.commit()
return jsonify({'message': 'Logged out'})
def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
# ... validation habituelle ...
# Vérifier la blacklist
token = request.headers['Authorization'].split(' ')[1]
if TokenBlacklist.query.filter_by(id=token).first():
return jsonify({'error': 'Token invalide'}), 401
return f(*args, **kwargs)
return decorated# ❌ PIÈGE - Faire confiance à la validation du client
@product_bp.route('', methods=['POST'])
def create_product():
data = request.get_json()
# Le client a validé, alors c'est OK ?
product = Product(name=data['name'], price=data['price'])
# ✅ SOLUTION - Toujours valider côté serveur
from marshmallow import Schema, fields, ValidationError
class ProductSchema(Schema):
name = fields.Str(required=True, validate=lambda x: len(x) > 0)
price = fields.Float(required=True, validate=lambda x: x > 0)
stock = fields.Int(validate=lambda x: x >= 0)
@product_bp.route('', methods=['POST'])
def create_product():
data = request.get_json()
schema = ProductSchema()
try:
validated_data = schema.load(data)
except ValidationError as err:
return jsonify({'errors': err.messages}), 400
product = Product(**validated_data)
db.session.add(product)
db.session.commit()
return jsonify(product.to_dict()), 201# ❌ PIÈGE - Accepter toutes les origines
CORS(app, resources={r"/api/*": {"origins": "*"}})
# N'importe quel site peut faire des requêtes !
# ✅ SOLUTION - Whitelist stricte
CORS(app, resources={r"/api/*": {
"origins": [
"https://example.com",
"https://www.example.com"
],
"methods": ["GET", "POST", "PUT", "DELETE"],
"allow_headers": ["Content-Type", "Authorization"],
"supports_credentials": True, # Pour les cookies
"max_age": 3600
}})// ❌ PIÈGE - Afficher du contenu utilisateur sans échapper
<template>
<div>{{ userComment }}</div> <!-- Si userComment contient du HTML, ça s'exécute ! -->
</template>
// ✅ SOLUTION - Utiliser v-text ou des filtres
<template>
<div v-text="userComment"></div> <!-- Échappé automatiquement -->
<!-- Ou -->
{{ userComment }} <!-- v-interpolate échappe aussi -->
</template>
// ✅ Pour du HTML safe
<template>
<div v-html="sanitizedHtml"></div>
</template>
<script>
import DOMPurify from 'dompurify'
const sanitizedHtml = computed(() => {
return DOMPurify.sanitize(userContent)
})
</script># ❌ PIÈGE - Variable globale qui retient l'état entre requêtes
current_user = None # MAUVAIS !
@app.route('/api/profile')
def get_profile():
global current_user
current_user = User.query.get(request.user_id) # Affecte l'état global
return jsonify(current_user.to_dict())
# ✅ SOLUTION - Utiliser request context
from flask import g
@app.route('/api/profile')
def get_profile():
g.current_user = User.query.get(request.user_id) # Spécifique à cette requête
return jsonify(g.current_user.to_dict())
# Accès depuis ailleurs:
def some_helper():
user = g.current_user # Sûr, limité à la requête actuelle// ❌ PIÈGE - Pas de gestion d'erreur
async function loadProducts() {
const response = await api.get('/products')
this.products = response.data // Et si l'API échoue ?
}
// ✅ SOLUTION - Try/catch et afficher l'erreur
async function loadProducts() {
try {
this.loading = true
const response = await api.get('/products')
this.products = response.data
} catch (error) {
this.error = error instanceof Error
? error.message
: 'Erreur lors du chargement'
notification.error(this.error)
} finally {
this.loading = false
}
}// ❌ PIÈGE - Modification directe (difficile à tracker)
store.user.name = 'New Name' // Où est le changement venu ?
// ✅ SOLUTION - Actions pour modifier le state
// store/user.ts
export const useUserStore = defineStore('user', () => {
const user = ref<User | null>(null)
async function updateUserName(newName: string) {
try {
const updated = await userService.updateName(newName)
user.value!.name = updated.name
return updated
} catch (error) {
// Gestion d'erreur centralisée
throw error
}
}
return { user, updateUserName }
})
// Dans le composant
await store.updateUserName('New Name') // Traçable et sûrQuand vous avez 100+ routes, organisez-les par domaine :
app/
├── routes/
│ ├── auth/
│ │ ├── __init__.py
│ │ ├── models.py
│ │ ├── routes.py
│ │ ├── services.py
│ │ └── schemas.py
│ ├── products/
│ ├── orders/
│ └── users/
Quand l'application devient trop grande :
- Service utilisateurs (authentification, profils)
- Service produits (catalogue, recherche)
- Service commandes (création, tracking)
- Service paiement (Stripe integration)
- Service notifications (emails, SMS)
Chacun peut être déployé indépendamment.
Pour les données qui ne changent pas souvent :
import redis
redis_client = redis.Redis(host='localhost', port=6379)
@product_bp.route('', methods=['GET'])
def get_products():
cache_key = 'products:all:page:1'
# Vérifier le cache
cached = redis_client.get(cache_key)
if cached:
return jsonify(json.loads(cached))
# Sinon, requête DB
products = product_service.get_all_products(page=1)
# Mettre en cache pour 1 heure
redis_client.setex(cache_key, 3600, json.dumps(products))
return jsonify(products)Pour les tâches longues :
from celery import Celery
celery = Celery(app.name, broker='redis://localhost:6379')
@celery.task
def send_order_confirmation_email(order_id):
# Peut prendre du temps
order = Order.query.get(order_id)
send_email(order.user.email, ...)
# Dans la route
@order_bp.route('', methods=['POST'])
def create_order():
order = order_service.create_order(...)
# Envoyer l'email en arrière-plan
send_order_confirmation_email.delay(order.id)
return jsonify(order.to_dict()), 201Déployer plusieurs instances de l'app derrière un load balancer :
[Client]
↓
[Load Balancer (Nginx)]
├── [App Instance 1]
├── [App Instance 2]
└── [App Instance 3]
↓
[PostgreSQL Database] (avec réplication)
version: '3.8'
services:
postgres:
image: postgres:15-alpine
environment:
POSTGRES_USER: ${POSTGRES_USER}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
POSTGRES_DB: ${POSTGRES_DB}
volumes:
- postgres_data:/var/lib/postgresql/data
ports:
- "5432:5432"
healthcheck:
test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER}"]
interval: 10s
timeout: 5s
retries: 5
api:
build:
context: ./backend
dockerfile: Dockerfile
environment:
DATABASE_URL: postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres:5432/${POSTGRES_DB}
FLASK_ENV: ${FLASK_ENV:-development}
JWT_SECRET: ${JWT_SECRET}
CORS_ORIGINS: ${CORS_ORIGINS:-http://localhost:3000}
ports:
- "5000:5000"
depends_on:
postgres:
condition: service_healthy
volumes:
- ./backend:/app
command: python run.py
frontend:
build:
context: ./frontend
dockerfile: Dockerfile.dev
environment:
VITE_API_URL: ${VITE_API_URL:-http://localhost:5000/api}
ports:
- "3000:3000"
volumes:
- ./frontend:/app
- /app/node_modules
command: npm run dev
volumes:
postgres_data:FROM python:3.11-slim
WORKDIR /app
# Dépendances système
RUN apt-get update && apt-get install -y \
gcc \
postgresql-client \
&& rm -rf /var/lib/apt/lists/*
# Dépendances Python
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Application
COPY . .
# Créer les logs
RUN mkdir -p logs
# Gunicorn
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "--workers", "4", "--timeout", "120", "wsgi:app"]# Copier les variables d'environnement
cp backend/.env.example backend/.env
cp frontend/.env.example frontend/.env.local
# Lancer
docker-compose up -d
# Voir les logs
docker-compose logs -f api
docker-compose logs -f frontend
# S'arrêter
docker-compose down
# Nettoyer (attention, supprime les données)
docker-compose down -vCette architecture est conçue pour être :
Maintenable : Code organisé, responsabilités claires Scalable : Peut grandir sans refonte majeure Testable : Chaque partie peut être testée indépendamment Sécurisée : JWT, validation, protection contre les attaques courantes Performante : Caching, indexes BD, lazy loading
Les concepts expliqués ici s'appliquent à la plupart des frameworks et langages. L'important est de comprendre les principes, pas juste la syntaxe.
Bonne chance dans votre développement ! 🚀