|
#!/usr/bin/env python3 |
|
"""Calculate returns for all Tesouro Direto investors with checkpoint/resume support. |
|
|
|
This script processes investor operations to calculate: |
|
- Net position (profit/loss) |
|
- Total accumulated returns |
|
- Returns by bond type |
|
- Monthly portfolio time series |
|
|
|
Features: |
|
- Resumable: skips already-processed investors |
|
- Multiprocessing: parallel processing with --parallel flag |
|
- Batch writing: periodic CSV writes for memory efficiency |
|
- Progress tracking: real-time ETA and statistics |
|
|
|
Usage: |
|
# Sequential processing |
|
python scripts/calculate_investor_returns.py --data-dir ~/data/tddata --out-dir results |
|
|
|
# Parallel processing (recommended for large datasets) |
|
python scripts/calculate_investor_returns.py --data-dir ~/data/tddata --out-dir results --parallel --workers 8 |
|
""" |
|
|
|
import argparse |
|
import csv |
|
import logging |
|
import multiprocessing as mp |
|
import os |
|
import shutil |
|
import sys |
|
import tempfile |
|
import time |
|
from concurrent.futures import ProcessPoolExecutor, as_completed |
|
from datetime import UTC, datetime |
|
from pathlib import Path |
|
from typing import Dict, List, Optional, Set, Tuple, Union |
|
|
|
import polars as pl |
|
|
|
try: |
|
from tqdm import tqdm |
|
except ImportError: |
|
tqdm = None |
|
|
|
from tddata import analytics, reader, storage |
|
from tddata.constants import Column as C |
|
from tddata.constants import OperationType |
|
|
|
# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
logger = logging.getLogger(__name__)

# Cache directory for loaded data (holds Arrow IPC snapshots of the parsed CSVs).
# NOTE(review): path is relative to the current working directory, so running
# the script from a different directory re-creates the cache — confirm intended.
CACHE_DIR = Path(".") / ".cache" / "tddata_returns"
|
|
|
|
|
# ============================================================================== |
|
# Cache management functions |
|
# ============================================================================== |
|
|
|
|
|
def _get_file_mtime(file_path: Optional[Path]) -> float: |
|
"""Get file modification time, return 0 if file doesn't exist.""" |
|
if file_path is None or not file_path.exists(): |
|
return 0 |
|
return file_path.stat().st_mtime |
|
|
|
|
|
def _is_cache_valid(cache_file: Path, source_files: List[Path]) -> bool:
    """Return True when *cache_file* exists and is newer than every source file."""
    if not cache_file.exists():
        return False
    cache_stamp = cache_file.stat().st_mtime
    # The cache is stale as soon as any source file is at least as new as it.
    for src in source_files:
        if src is not None and _get_file_mtime(src) >= cache_stamp:
            return False
    return True
|
|
|
|
|
def load_operations_cached(data_dir: Path, force_reload: bool = False) -> Tuple[pl.DataFrame, List[Path]]:
    """Load operations data, using cache if available and valid.

    Args:
        data_dir: Directory holding the raw Tesouro Direto CSV files.
        force_reload: When True, bypass the IPC cache and re-read the CSVs.

    Returns:
        Tuple of (operations DataFrame, list of source CSV paths).

    Raises:
        FileNotFoundError: If no operations files exist in *data_dir*.
    """
    logger.info("Loading operations data...")

    ops_files = [
        f
        for f in storage.get_latest_files(data_dir)
        if "operacoes-do-tesouro-direto" in f.name.lower()
    ]
    if not ops_files:
        raise FileNotFoundError(f"No operations files found in {data_dir}")

    logger.info(f"Found {len(ops_files)} operations files")

    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    cache_file = CACHE_DIR / "operations.ipc"

    # Fast path: serve from the IPC cache when it is newer than every source.
    if not force_reload and _is_cache_valid(cache_file, ops_files):
        logger.info(f"Loading operations from cache: {cache_file}")
        cached = pl.read_ipc(cache_file)
        logger.info(f"Loaded {cached.height:,} operations from cache")
        return cached, ops_files

    # Slow path: parse each CSV and concatenate into a single frame.
    logger.info("Reading operations files (not in cache)...")
    frames = []
    for ops_file in ops_files:
        logger.info(f"  Reading {ops_file.name}...")
        frames.append(reader.read_operations(ops_file))
    operations = pl.concat(frames)
    logger.info(f"Loaded {operations.height:,} operations")

    # Persist the parsed data for the next run.
    logger.info(f"Caching operations to: {cache_file}")
    operations.write_ipc(cache_file)

    return operations, ops_files
|
|
|
|
|
def load_prices_cached(data_dir: Path, force_reload: bool = False) -> Tuple[pl.DataFrame, Optional[Path]]:
    """Load prices data, using cache if available and valid.

    Args:
        data_dir: Directory holding the raw CSV files.
        force_reload: When True, bypass the IPC cache and re-read the CSV.

    Returns:
        Tuple of (prices DataFrame, path of the source prices file).

    Raises:
        FileNotFoundError: If no prices file exists in *data_dir*.
    """
    logger.info("Loading prices data...")

    prices_file = storage.get_latest_file(data_dir, "taxas-dos-titulos-ofertados*.csv")
    if not prices_file:
        raise FileNotFoundError(f"No prices file found in {data_dir}")

    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    cache_file = CACHE_DIR / "prices.ipc"

    # Fast path: reuse the IPC snapshot when still fresh.
    if not force_reload and _is_cache_valid(cache_file, [prices_file]):
        logger.info(f"Loading prices from cache: {cache_file}")
        cached = pl.read_ipc(cache_file)
        logger.info(f"Loaded {cached.height:,} price records from cache")
        return cached, prices_file

    # Slow path: parse the CSV; the reader may yield chunks, so concat if needed.
    logger.info(f"Reading {prices_file.name}...")
    prices = reader.read_prices(prices_file)
    if not isinstance(prices, pl.DataFrame):
        prices = pl.concat(list(prices))
    logger.info(f"Loaded {prices.height:,} price records")

    # Persist for the next run.
    logger.info(f"Caching prices to: {cache_file}")
    prices.write_ipc(cache_file)

    return prices, prices_file
|
|
|
|
|
def load_coupons_cached(data_dir: Path, force_reload: bool = False) -> Tuple[Optional[pl.DataFrame], Optional[Path]]:
    """Load coupons data, using cache if available and valid.

    Coupon data is optional: when no file matches, ``(None, None)`` is
    returned instead of raising.

    Args:
        data_dir: Directory holding the raw CSV files.
        force_reload: When True, bypass the IPC cache and re-read the CSV.

    Returns:
        Tuple of (coupons DataFrame or None, source path or None).
    """
    coupons_file = storage.get_latest_file(data_dir, "pagamento-de-cupom-de-juros*.csv")
    if not coupons_file:
        logger.info("No coupon data found (optional)")
        return None, None

    logger.info("Loading coupons data...")

    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    cache_file = CACHE_DIR / "coupons.ipc"

    # Fast path: reuse the IPC snapshot when still fresh.
    if not force_reload and _is_cache_valid(cache_file, [coupons_file]):
        logger.info(f"Loading coupons from cache: {cache_file}")
        cached = pl.read_ipc(cache_file)
        logger.info(f"Loaded {cached.height:,} coupon records from cache")
        return cached, coupons_file

    # Slow path: parse the CSV; the reader may yield chunks, so concat if needed.
    logger.info(f"Reading {coupons_file.name}...")
    coupons = reader.read_interest_coupons(coupons_file)
    if not isinstance(coupons, pl.DataFrame):
        coupons = pl.concat(list(coupons))
    logger.info(f"Loaded {coupons.height:,} coupon records")

    # Persist for the next run.
    logger.info(f"Caching coupons to: {cache_file}")
    coupons.write_ipc(cache_file)

    return coupons, coupons_file
|
|
|
|
|
# ==============================================================================
# Global state for worker processes (initialized via _init_worker)
# ==============================================================================

# Operations frame, loaded once per worker by _init_worker(); None in the parent.
_OPERATIONS: Optional[pl.DataFrame] = None

# Prices frame shared by all batch tasks within a worker process.
_PRICES: Optional[pl.DataFrame] = None

# Optional coupons frame; remains None when no coupon file was provided.
_COUPONS: Optional[pl.DataFrame] = None
|
|
|
|
|
def _init_worker(
    ops_path: str,
    prices_path: str,
    coupons_path: Optional[str],
):
    """Initialize worker process with global dataframes.

    Passed as the ProcessPoolExecutor ``initializer`` so each worker reads the
    serialized IPC files exactly once, before any batch task runs.

    Args:
        ops_path: Path to the operations IPC file.
        prices_path: Path to the prices IPC file.
        coupons_path: Optional path to the coupons IPC file (may be None).
    """
    global _OPERATIONS, _PRICES, _COUPONS

    _OPERATIONS = pl.read_ipc(ops_path)

    _PRICES = pl.read_ipc(prices_path)

    # Coupons are optional: _COUPONS stays None when no path was supplied.
    if coupons_path and os.path.exists(coupons_path):
        _COUPONS = pl.read_ipc(coupons_path)

    logger.info(f"Worker {os.getpid()} initialized.")
|
|
|
|
|
# ============================================================================== |
|
|
|
# Core computation functions |
|
|
|
# ============================================================================== |
|
|
|
|
|
def _format_month_str(month: Optional[object]) -> str: |
|
"""Format month value to ISO string.""" |
|
|
|
if month is not None and hasattr(month, "isoformat"): |
|
return month.isoformat() # type: ignore |
|
|
|
return str(month) if month is not None else "" |
|
|
|
|
|
def _compute_monthly_returns(
    investor_id_str: str,
    operations: pl.DataFrame,
    prices: pl.DataFrame,
    coupons: Optional[pl.DataFrame],
    timestamp: str,
) -> List[Dict]:
    """Build monthly portfolio return records for one investor.

    Returns:
        Flat dicts (one per month) ready for CSV writing; empty list when the
        analytics call produces no rows.
    """
    monthly_df = analytics.calculate_portfolio_monthly_returns(operations, prices, coupons=coupons)
    if monthly_df is None or monthly_df.height == 0:
        return []

    return [
        {
            "investor_id": investor_id_str,
            "month": _format_month_str(row.get("month")),
            "monthly_return": float(row.get("monthly_return") or 0.0),
            "cumulative_return": float(row.get("cumulative_return") or 0.0),
            "portfolio_value": float(row.get("portfolio_value") or 0.0),
            "net_cash_flow": float(row.get("net_cash_flow") or 0.0),
            "timestamp": timestamp,
        }
        for row in monthly_df.iter_rows(named=True)
    ]
|
|
|
|
|
def _compute_bond_monthly_returns(
    investor_id_str: str,
    operations: pl.DataFrame,
    prices: pl.DataFrame,
    coupons: Optional[pl.DataFrame],
    unique_bond_types: List[str],
    timestamp: str,
) -> List[Dict]:
    """Build per-bond-type monthly return records for one investor."""
    records: List[Dict] = []
    bond_col = C.BOND_TYPE.value

    for bond_type in unique_bond_types:
        # Restrict every input frame to the current bond type before computing.
        bt_ops = operations.filter(pl.col(bond_col) == bond_type)
        bt_prices = prices.filter(pl.col(bond_col) == bond_type)
        bt_coupons = None if coupons is None else coupons.filter(pl.col(bond_col) == bond_type)

        bt_monthly = analytics.calculate_portfolio_monthly_returns(bt_ops, bt_prices, coupons=bt_coupons)
        if bt_monthly is None or bt_monthly.height == 0:
            continue

        for row in bt_monthly.iter_rows(named=True):
            records.append(
                {
                    "investor_id": investor_id_str,
                    "bond_type": bond_type,
                    "month": _format_month_str(row.get("month")),
                    "monthly_return": float(row.get("monthly_return") or 0.0),
                    "cumulative_return": float(row.get("cumulative_return") or 0.0),
                    "portfolio_value": float(row.get("portfolio_value") or 0.0),
                    "net_cash_flow": float(row.get("net_cash_flow") or 0.0),
                    "timestamp": timestamp,
                }
            )

    return records
|
|
|
|
|
def compute_investor_returns(
    investor_id: Union[str, int],
    inv_ops: pl.DataFrame,
    prices: pl.DataFrame,
    coupons: Optional[pl.DataFrame] = None,
) -> Optional[Dict]:
    """Calculate all return metrics for a single investor using vectorized operations.

    Args:
        investor_id: Investor identifier (any type; stringified for output).
        inv_ops: Operations belonging to this investor only.
        prices: Prices DataFrame used for valuation.
        coupons: Optional coupon payments DataFrame.

    Returns:
        ``None`` when the investor has no operations or no matched lots;
        otherwise a dict with:
        - "summary": total portfolio summary
        - "by_bond_type": returns aggregated by bond type
        - "monthly": monthly time series
        - "bond_monthly": monthly time series per bond type
    """
    if inv_ops.height == 0:
        return None

    # Lot-level returns via FIFO matching of operations against prices/coupons.
    lots = analytics.calculate_operations_returns(inv_ops, prices, coupons=coupons)
    if lots.height == 0:
        return None

    inv_id = str(investor_id)
    stamp = datetime.now(UTC).isoformat()

    # Portfolio-level totals computed as vectorized column sums over the lots.
    invested_total = float(lots[C.OPERATION_VALUE.value].sum() or 0.0)
    end_total = float(lots["end_value"].sum() or 0.0)
    pct_total = ((end_total / invested_total) - 1) * 100 if invested_total > 0.0 else 0.0

    summary = {
        "investor_id": inv_id,
        "total_invested": invested_total,
        "total_end_value": end_total,
        "total_return_pct": pct_total,
        "net_position": end_total - invested_total,
        "num_operations": inv_ops.height,
        "timestamp": stamp,
    }

    # Per-bond-type aggregation performed entirely inside Polars.
    per_bond = (
        lots.group_by(C.BOND_TYPE.value)
        .agg(
            [
                pl.col(C.OPERATION_VALUE.value).sum().alias("invested"),
                pl.col("end_value").sum().alias("end_value"),
            ]
        )
        .with_columns(
            [
                (((pl.col("end_value") / pl.col("invested")) - 1) * 100).fill_nan(0.0).alias("return_pct"),
                (pl.col("end_value") - pl.col("invested")).alias("net_position"),
            ]
        )
    )

    bond_type_records = []
    for row in per_bond.iter_rows(named=True):
        bond_type_records.append(
            {
                "investor_id": inv_id,
                "bond_type": row[C.BOND_TYPE.value],
                "invested": float(row["invested"]),
                "end_value": float(row["end_value"]),
                "return_pct": float(row["return_pct"]),
                "net_position": float(row["net_position"]),
                "timestamp": stamp,
            }
        )

    # Monthly portfolio series (Modified Dietz) plus the per-bond-type series.
    monthly_records = _compute_monthly_returns(inv_id, inv_ops, prices, coupons, stamp)
    bond_types = inv_ops[C.BOND_TYPE.value].unique().to_list()
    bond_monthly_records = _compute_bond_monthly_returns(inv_id, inv_ops, prices, coupons, bond_types, stamp)

    return {
        "summary": summary,
        "by_bond_type": bond_type_records,
        "monthly": monthly_records,
        "bond_monthly": bond_monthly_records,
    }
|
|
|
|
|
def compute_investor_returns_worker(investor_id: Union[str, int]) -> Optional[Dict]:
    """Compute returns for one investor using the worker-global dataframes.

    Returns ``None`` when the worker is uninitialized, the investor has no
    operations, or the computation raises (the error is logged, not re-raised).
    """
    try:
        # Guard: _init_worker must have populated the globals in this process.
        if _OPERATIONS is None or _PRICES is None:
            logger.error(f"Worker not initialized properly for investor {investor_id}")
            return None

        # Slice the shared operations frame down to this single investor.
        inv_ops = _OPERATIONS.filter(pl.col(C.INVESTOR_ID.value) == investor_id)
        if inv_ops.height == 0:
            return None

        return compute_investor_returns(investor_id, inv_ops, _PRICES, _COUPONS)
    except Exception as e:
        # Never let one bad investor kill the whole batch run.
        logger.error(f"Error processing investor {investor_id}: {e}")
        return None
|
|
|
|
|
def compute_investor_batch_worker(investor_ids_batch: List[Union[str, int]]) -> List[Tuple[Union[str, int], Dict]]:
    """Process a batch of investors using the worker-global dataframes.

    Filters the global operations frame once for the whole batch (``is_in``)
    and splits per investor via ``group_by``, instead of scanning the full
    frame once per investor.

    Returns:
        ``(investor_id, result)`` pairs for every investor that produced a result.
    """
    if _OPERATIONS is None or _PRICES is None:
        logger.error(f"Worker {os.getpid()} not initialized properly")
        return []

    # One vectorized filter for the entire batch.
    batch_ops = _OPERATIONS.filter(pl.col(C.INVESTOR_ID.value).is_in(investor_ids_batch))
    if batch_ops.height == 0:
        return []

    output: List[Tuple[Union[str, int], Dict]] = []
    for group_key, inv_ops in batch_ops.group_by(C.INVESTOR_ID.value):
        # group_by may yield the key wrapped in a 1-tuple; unwrap it.
        investor_id = group_key[0] if isinstance(group_key, tuple) else group_key
        try:
            computed = compute_investor_returns(investor_id, inv_ops, _PRICES, _COUPONS)
        except Exception as e:
            # One failing investor must not abort the rest of the batch.
            logger.error(f"Error processing investor {investor_id}: {e}")
            continue
        if computed:
            output.append((investor_id, computed))

    logger.info(f"Worker {os.getpid()} finished batch of {len(investor_ids_batch)} investors ({len(output)} with results)")
    return output
|
|
|
|
|
# ============================================================================== |
|
# CSV I/O functions |
|
# ============================================================================== |
|
|
|
|
|
def read_processed_ids(csv_path: Path, id_column: str = "investor_id") -> Set[str]:
    """Return the set of IDs already written to *csv_path* (empty if absent).

    Read failures are logged as warnings and treated as "nothing processed",
    so a corrupt checkpoint file degrades to reprocessing rather than crashing.
    """
    if not csv_path.exists():
        return set()

    try:
        frame = pl.read_csv(csv_path)
        if id_column in frame.columns:
            return set(frame[id_column].cast(pl.Utf8).to_list())
    except Exception as e:
        logger.warning(f"Could not read {csv_path}: {e}")

    return set()
|
|
|
|
|
def append_to_csv(csv_path: Path, records: List[Dict], fieldnames: List[str]):
    """Append *records* to a CSV file, writing the header only on creation.

    No-op for an empty record list. Uses a large write buffer so many small
    row writes do not each hit the OS.
    """
    if not records:
        return

    needs_header = not csv_path.exists()

    with csv_path.open("a", newline="", encoding="utf-8", buffering=262144) as fh:
        writer = csv.DictWriter(fh, fieldnames=fieldnames)
        if needs_header:
            writer.writeheader()
        writer.writerows(records)
|
|
|
# ============================================================================== |
|
# Main processing logic |
|
# ============================================================================== |
|
|
|
|
|
def process_investors_sequential(
    investor_ids: List[Union[str, int]],
    operations: pl.DataFrame,
    prices: pl.DataFrame,
    coupons: Optional[pl.DataFrame],
    output_dir: Path,
    batch_size: int,
    processed_summary: Set[str],
    processed_monthly: Set[str],
) -> None:
    """Process investors sequentially with batch-level vectorized filtering.

    Args:
        investor_ids: IDs still needing processing (kept in their original type).
        operations: Full operations DataFrame covering all investors.
        prices: Prices DataFrame.
        coupons: Optional coupon payments DataFrame.
        output_dir: Directory where the four output CSVs are appended.
        batch_size: Number of investors per filter/write cycle.
        processed_summary: String IDs already present in investor_summary.csv;
            mutated in place as batches are flushed (checkpoint/resume support).
        processed_monthly: String IDs already present in investor_monthly.csv;
            mutated in place as batches are flushed.
    """
    # Output CSVs are appended to incrementally so an interrupted run can resume.
    csv_summary = output_dir / "investor_summary.csv"
    csv_by_bond = output_dir / "investor_by_bond_type.csv"
    csv_monthly = output_dir / "investor_monthly.csv"
    csv_bond_monthly = output_dir / "investor_bond_monthly.csv"

    # Column orders for the CSV writers (must stay stable across runs so
    # appended rows line up with the existing header).
    fieldnames_summary = [
        "investor_id",
        "total_invested",
        "total_end_value",
        "total_return_pct",
        "net_position",
        "num_operations",
        "timestamp",
    ]
    fieldnames_bond = ["investor_id", "bond_type", "invested", "end_value", "return_pct", "net_position", "timestamp"]
    fieldnames_monthly = [
        "investor_id",
        "month",
        "monthly_return",
        "cumulative_return",
        "portfolio_value",
        "net_cash_flow",
        "timestamp",
    ]
    fieldnames_bond_monthly = [
        "investor_id",
        "bond_type",
        "month",
        "monthly_return",
        "cumulative_return",
        "portfolio_value",
        "net_cash_flow",
        "timestamp",
    ]

    start_time = time.time()

    # Process in batches with vectorized filtering
    for batch_start in range(0, len(investor_ids), batch_size):
        batch_end = min(batch_start + batch_size, len(investor_ids))
        batch_investor_ids = investor_ids[batch_start:batch_end]

        # Vectorized: filter operations for entire batch at once
        batch_ops = operations.filter(pl.col(C.INVESTOR_ID.value).is_in(batch_investor_ids))

        # Per-batch accumulators, flushed to disk after the batch completes.
        batch_summary = []
        batch_bond = []
        batch_monthly = []
        batch_bond_monthly = []

        # Process each investor in batch by grouping
        for investor_id_group, inv_ops in batch_ops.group_by(C.INVESTOR_ID.value):
            # Extract investor_id from the group key (group_by may wrap it in a tuple)
            investor_id = investor_id_group if not isinstance(investor_id_group, tuple) else investor_id_group[0]

            result = compute_investor_returns(investor_id, inv_ops, prices, coupons)

            if result:
                # Skip records already checkpointed in a previous run.
                if str(investor_id) not in processed_summary:
                    batch_summary.append(result["summary"])
                    batch_bond.extend(result["by_bond_type"])
                if str(investor_id) not in processed_monthly:
                    batch_monthly.extend(result["monthly"])
                    batch_bond_monthly.extend(result["bond_monthly"])

        # Write batch to disk; update the processed sets only after a
        # successful append so the checkpoint never runs ahead of the files.
        if batch_summary:
            append_to_csv(csv_summary, batch_summary, fieldnames_summary)
            processed_summary.update(r["investor_id"] for r in batch_summary)

        if batch_bond:
            append_to_csv(csv_by_bond, batch_bond, fieldnames_bond)

        if batch_monthly:
            append_to_csv(csv_monthly, batch_monthly, fieldnames_monthly)
            processed_monthly.update(r["investor_id"] for r in batch_monthly)

        if batch_bond_monthly:
            append_to_csv(csv_bond_monthly, batch_bond_monthly, fieldnames_bond_monthly)

        # Progress tracking: throughput-based ETA over the remaining investors.
        processed = min(batch_end, len(investor_ids))
        elapsed = time.time() - start_time
        rate = processed / elapsed if elapsed > 0 else 0
        eta_seconds = (len(investor_ids) - processed) / rate if rate > 0 else 0

        logger.info(
            f"Progress: {processed}/{len(investor_ids)} ({100*processed/len(investor_ids):.1f}%) | "
            f"Rate: {rate:.1f} inv/s | ETA: {eta_seconds/60:.1f} min"
        )
|
|
|
|
|
def process_investors_parallel(
    investor_ids: List[Union[str, int]],
    operations: pl.DataFrame,
    prices: pl.DataFrame,
    coupons: Optional[pl.DataFrame],
    output_dir: Path,
    batch_size: int,
    workers: int,
    processed_summary: Set[str],
    processed_monthly: Set[str],
) -> None:
    """Process investors in parallel using batch-based multiprocessing.

    Each worker receives a batch of investor IDs and filters the operations
    DataFrame once per batch (using is_in + group_by), instead of scanning
    the entire DataFrame per investor. The dataframes are handed to workers
    as Arrow IPC files written to a temporary directory, loaded once per
    worker by ``_init_worker``. All CSV writing happens in the parent
    process, so no file-level locking is needed.

    Args:
        investor_ids: IDs still needing processing (kept in their original type).
        operations: Full operations DataFrame covering all investors.
        prices: Prices DataFrame.
        coupons: Optional coupon payments DataFrame.
        output_dir: Directory where the four output CSVs are appended.
        batch_size: Number of investors per worker task.
        workers: Number of worker processes.
        processed_summary: String IDs already in investor_summary.csv; mutated
            in place as results are flushed (checkpoint/resume support).
        processed_monthly: String IDs already in investor_monthly.csv; mutated
            in place as results are flushed.
    """
    # Output CSVs are appended to incrementally so an interrupted run can resume.
    csv_summary = output_dir / "investor_summary.csv"
    csv_by_bond = output_dir / "investor_by_bond_type.csv"
    csv_monthly = output_dir / "investor_monthly.csv"
    csv_bond_monthly = output_dir / "investor_bond_monthly.csv"

    # Column orders for the CSV writers (must stay stable across runs).
    fieldnames_summary = [
        "investor_id",
        "total_invested",
        "total_end_value",
        "total_return_pct",
        "net_position",
        "num_operations",
        "timestamp",
    ]
    fieldnames_bond = ["investor_id", "bond_type", "invested", "end_value", "return_pct", "net_position", "timestamp"]
    fieldnames_monthly = [
        "investor_id",
        "month",
        "monthly_return",
        "cumulative_return",
        "portfolio_value",
        "net_cash_flow",
        "timestamp",
    ]
    fieldnames_bond_monthly = [
        "investor_id",
        "bond_type",
        "month",
        "monthly_return",
        "cumulative_return",
        "portfolio_value",
        "net_cash_flow",
        "timestamp",
    ]

    # Split investor IDs into batches for workers
    batches = [investor_ids[i : i + batch_size] for i in range(0, len(investor_ids), batch_size)]
    logger.info(f"Split {len(investor_ids):,} investors into {len(batches)} batches of up to {batch_size}")

    # Create temporary IPC files for worker initialization
    with tempfile.TemporaryDirectory() as tmpdir:
        logger.info("Preparing data for workers...")

        # Serialize main dataframes to IPC files for workers to load
        ops_path = Path(tmpdir) / "operations.ipc"
        prices_path = Path(tmpdir) / "prices.ipc"
        coupons_path = Path(tmpdir) / "coupons.ipc" if coupons is not None else None

        operations.write_ipc(ops_path)
        prices.write_ipc(prices_path)
        if coupons_path and coupons is not None:
            coupons.write_ipc(coupons_path)

        logger.info(f"Starting {workers} worker processes...")

        start_time = time.time()
        completed = 0

        with ProcessPoolExecutor(
            max_workers=workers,
            initializer=_init_worker,
            initargs=(
                str(ops_path),
                str(prices_path),
                str(coupons_path) if coupons_path else None,
            ),
        ) as executor:
            # Submit batch tasks (not one per investor) to amortize IPC overhead.
            future_to_batch_idx = {}
            for batch_idx, batch in enumerate(batches):
                future = executor.submit(compute_investor_batch_worker, batch)
                future_to_batch_idx[future] = batch_idx

            logger.info(f"Submitted {len(batches)} batch tasks to {workers} workers")

            # Process results as batches complete (order is not deterministic).
            for future in as_completed(future_to_batch_idx):
                batch_idx = future_to_batch_idx[future]
                batch_investor_count = len(batches[batch_idx])

                try:
                    batch_results = future.result()
                except Exception as e:
                    # A failed batch is counted as completed so the ETA stays honest.
                    logger.error(f"Batch {batch_idx} failed: {e}")
                    completed += batch_investor_count
                    continue

                # Collect records from batch results
                batch_summary = []
                batch_bond = []
                batch_monthly = []
                batch_bond_monthly = []

                for investor_id, result in batch_results:
                    # Skip records already checkpointed in a previous run.
                    if str(investor_id) not in processed_summary:
                        batch_summary.append(result["summary"])
                        batch_bond.extend(result["by_bond_type"])
                    if str(investor_id) not in processed_monthly:
                        batch_monthly.extend(result["monthly"])
                        batch_bond_monthly.extend(result["bond_monthly"])

                # Write batch to disk; update the processed sets only after a
                # successful append so the checkpoint never runs ahead of the files.
                if batch_summary:
                    append_to_csv(csv_summary, batch_summary, fieldnames_summary)
                    processed_summary.update(r["investor_id"] for r in batch_summary)
                if batch_bond:
                    append_to_csv(csv_by_bond, batch_bond, fieldnames_bond)
                if batch_monthly:
                    append_to_csv(csv_monthly, batch_monthly, fieldnames_monthly)
                    processed_monthly.update(r["investor_id"] for r in batch_monthly)
                if batch_bond_monthly:
                    append_to_csv(csv_bond_monthly, batch_bond_monthly, fieldnames_bond_monthly)

                # Progress tracking: throughput-based ETA over remaining investors.
                completed += batch_investor_count
                elapsed = time.time() - start_time
                rate = completed / elapsed if elapsed > 0 else 0
                eta_seconds = (len(investor_ids) - completed) / rate if rate > 0 else 0

                logger.info(
                    f"Progress: {completed:,}/{len(investor_ids):,} ({100*completed/len(investor_ids):.1f}%) | "
                    f"Batch {batch_idx+1}/{len(batches)} done ({len(batch_results)} results) | "
                    f"Rate: {rate:.1f} inv/s | ETA: {eta_seconds/60:.1f} min"
                )
|
|
|
|
|
# ============================================================================== |
|
# CLI and main entry point |
|
# ============================================================================== |
|
|
|
|
|
def main():
    """CLI entry point: parse args, load data, and compute investor returns.

    Workflow:
      1. Optionally clear the cache and exit (``--clear-cache``).
      2. Load operations/prices/coupons, each backed by an IPC cache.
      3. Diff all investor IDs against already-written output CSVs (resume).
      4. Process pending investors sequentially or in parallel.

    Exits with status 1 when required input files are missing.
    """
    parser = argparse.ArgumentParser(
        description="Calculate returns for all Tesouro Direto investors",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=f"""
Examples:
  # Sequential processing
  python scripts/calculate_investor_returns.py --data-dir ~/data/tddata --out-dir results

  # Parallel processing with 8 workers
  python scripts/calculate_investor_returns.py --data-dir ~/data/tddata --out-dir results --parallel --workers 8

  # Force reload data (ignore cache)
  python scripts/calculate_investor_returns.py --data-dir ~/data/tddata --out-dir results --no-cache

Output files:
  - investor_summary.csv: Total portfolio summary per investor
  - investor_by_bond_type.csv: Returns by bond type per investor
  - investor_monthly.csv: Monthly time series per investor
  - investor_bond_monthly.csv: Monthly time series per bond type and investor

Cache location:
  - {CACHE_DIR}
""",
    )

    parser.add_argument("--data-dir", type=Path, required=True, help="Directory containing TD data files")
    parser.add_argument(
        "--out-dir", type=Path, default=Path("results"), help="Output directory for CSV files (default: results)"
    )
    parser.add_argument("--batch-size", type=int, default=200, help="Write to CSV every N investors (default: 200)")
    parser.add_argument("--parallel", action="store_true", help="Enable parallel processing")
    parser.add_argument(
        "--workers", type=int, default=mp.cpu_count(), help=f"Number of worker processes (default: {mp.cpu_count()})"
    )
    parser.add_argument("--no-cache", action="store_true", help="Force reload data from CSV (ignore cache)")
    parser.add_argument("--clear-cache", action="store_true", help="Clear cache directory and exit")

    args = parser.parse_args()

    # Handle cache clearing (shutil is imported at module level).
    if args.clear_cache:
        if CACHE_DIR.exists():
            shutil.rmtree(CACHE_DIR)
            logger.info(f"Cache cleared: {CACHE_DIR}")
        else:
            logger.info(f"Cache directory does not exist: {CACHE_DIR}")
        return

    # Create output directory
    args.out_dir.mkdir(parents=True, exist_ok=True)

    # Load data with caching
    try:
        operations, _ops_files = load_operations_cached(args.data_dir, force_reload=args.no_cache)
        prices, _prices_file = load_prices_cached(args.data_dir, force_reload=args.no_cache)
        coupons, _coupons_file = load_coupons_cached(args.data_dir, force_reload=args.no_cache)
    except FileNotFoundError as e:
        logger.error(f"Error loading data: {e}")
        sys.exit(1)

    # Get list of investors
    logger.info("Identifying investors...")
    # Keep investor IDs in their original type, but convert to strings for set operations
    all_investors = operations[C.INVESTOR_ID.value].unique().to_list()
    all_investors_str = [str(inv) for inv in all_investors]
    logger.info(f"Found {len(all_investors_str):,} unique investors")

    # Check which investors are already processed (resume support)
    csv_summary = args.out_dir / "investor_summary.csv"
    csv_monthly = args.out_dir / "investor_monthly.csv"
    processed_summary = read_processed_ids(csv_summary)
    processed_monthly = read_processed_ids(csv_monthly)

    # An investor needs processing if it is missing from either output file.
    to_process = [
        inv
        for inv, inv_str in zip(all_investors, all_investors_str)
        if inv_str not in processed_summary or inv_str not in processed_monthly
    ]

    logger.info(f"Already processed: {len(all_investors_str) - len(to_process):,} investors")
    logger.info(f"To process: {len(to_process):,} investors")

    if not to_process:
        logger.info("All investors already processed. Nothing to do.")
        return

    # Process investors
    logger.info("=" * 70)
    if args.parallel:
        logger.info(f"Starting PARALLEL processing with {args.workers} workers")
        process_investors_parallel(
            to_process,
            operations,
            prices,
            coupons,
            args.out_dir,
            args.batch_size,
            args.workers,
            processed_summary,
            processed_monthly,
        )
    else:
        logger.info("Starting SEQUENTIAL processing")
        process_investors_sequential(
            to_process, operations, prices, coupons, args.out_dir, args.batch_size, processed_summary, processed_monthly
        )

    logger.info("=" * 70)
    logger.info("✓ Processing complete!")
    logger.info(f"Results saved to: {args.out_dir}")
    # Plain strings: these messages contain no interpolation.
    logger.info("  - investor_summary.csv")
    logger.info("  - investor_by_bond_type.csv")
    logger.info("  - investor_monthly.csv")
    logger.info("  - investor_bond_monthly.csv")
|
|
|
|
|
if __name__ == "__main__": |
|
main() |