@dankkom · Last active March 5, 2026 14:09
Calculate Investor Returns

Overview

The calculate_investor_returns.py script calculates comprehensive returns for all Tesouro Direto investors with full checkpoint/resume support and optional multiprocessing.

✨ Key Features

  • Resumable - Stop anytime and resume without reprocessing
  • Multiprocessing - Parallel processing for fast computation
  • Memory Efficient - Batch writing to prevent memory bloat
  • Complete Metrics - Net position, total returns, returns by bond type, monthly time series
  • Progress Tracking - Real-time ETA and performance statistics

🚀 Quick Start

Sequential Processing (Simple)

uv run python scripts/calculate_investor_returns.py \
    --data-dir ~/data/tddata \
    --out-dir results

Parallel Processing (Recommended)

uv run python scripts/calculate_investor_returns.py \
    --data-dir ~/data/tddata \
    --out-dir results \
    --parallel \
    --workers 8

📋 Command-Line Options

| Option | Description | Default |
| --- | --- | --- |
| `--data-dir` | Directory with TD data files | Required |
| `--out-dir` | Output directory for CSV files | `results` |
| `--batch-size` | Write every N investors | 200 |
| `--parallel` | Enable parallel processing | False |
| `--workers` | Number of worker processes | All CPUs |
| `--no-cache` | Force reload data from CSV, ignoring the cache | False |
| `--clear-cache` | Clear the cache directory and exit | False |

📊 Output Files

1. investor_summary.csv

Total portfolio summary per investor.

investor_id,total_invested,total_end_value,total_return_pct,net_position,num_operations,timestamp
INV001,50000.00,55234.50,10.469,5234.50,125,2026-02-17T10:30:00Z

Columns:

  • investor_id - Unique investor identifier
  • total_invested - Total amount invested (BRL)
  • total_end_value - Current value of all positions (BRL)
  • total_return_pct - Overall return percentage
  • net_position - Net profit/loss (BRL)
  • num_operations - Total number of operations
  • timestamp - Calculation timestamp

2. investor_by_bond_type.csv

Returns aggregated by bond type for each investor.

investor_id,bond_type,invested,end_value,return_pct,net_position,timestamp
INV001,Tesouro Selic,30000.00,32145.60,7.152,2145.60,2026-02-17T10:30:00Z
INV001,Tesouro IPCA+,20000.00,23088.90,15.445,3088.90,2026-02-17T10:30:00Z

Columns:

  • investor_id - Unique investor identifier
  • bond_type - Bond type name
  • invested - Amount invested in this type (BRL)
  • end_value - Current value of this type (BRL)
  • return_pct - Return percentage for this type
  • net_position - Net profit/loss for this type (BRL)
  • timestamp - Calculation timestamp

3. investor_monthly.csv

Monthly portfolio time series (Modified Dietz method).

investor_id,month,monthly_return,cumulative_return,portfolio_value,net_cash_flow,timestamp
INV001,2024-01-01,0.523,0.523,10234.50,-10000.00,2026-02-17T10:30:00Z
INV001,2024-02-01,0.612,1.138,10296.34,0.00,2026-02-17T10:30:00Z

Columns:

  • investor_id - Unique investor identifier
  • month - Month (first day, ISO format)
  • monthly_return - Monthly return percentage
  • cumulative_return - Cumulative return from start (%)
  • portfolio_value - Portfolio value at month end (BRL)
  • net_cash_flow - Net cash flow during month (BRL)
  • timestamp - Calculation timestamp

4. investor_bond_monthly.csv

Monthly time series per investor per bond type.

investor_id,bond_type,month,monthly_return,cumulative_return,portfolio_value,net_cash_flow,timestamp
INV001,Tesouro Selic,2024-01-01,0.45,0.45,5200.00,-5000.00,2026-02-17T10:30:00Z

Same columns as investor_monthly.csv plus bond_type.

🔄 How Resumability Works

  1. Initial Run: Processes all investors, writes results to CSV
  2. Interruption: User stops the script (Ctrl+C)
  3. Resume: Run the same command again
  4. Smart Skip: Script reads existing CSVs and only processes missing investors
  5. Append: New results are appended to existing files

No data is lost or recalculated!
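The skip-and-append pattern boils down to a few lines. Below is a minimal sketch (function names here are illustrative; the full script below implements the same idea with `read_processed_ids` and `append_to_csv`):

```python
import csv
from pathlib import Path


def read_done_ids(csv_path: Path) -> set[str]:
    """Collect investor IDs already present in an output CSV."""
    if not csv_path.exists():
        return set()
    with csv_path.open(newline="", encoding="utf-8") as f:
        return {row["investor_id"] for row in csv.DictReader(f)}


def append_rows(csv_path: Path, rows: list[dict], fieldnames: list[str]) -> None:
    """Append rows to the CSV, writing the header only when the file is new."""
    write_header = not csv_path.exists()
    with csv_path.open("a", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        if write_header:
            writer.writeheader()
        writer.writerows(rows)


# On each run, only investors absent from the existing CSV are processed.
done = read_done_ids(Path("results") / "investor_summary.csv")
todo = [inv for inv in ["INV001", "INV002"] if inv not in done]
```

Because results are only ever appended, interrupting between batches loses at most the in-flight batch, which is simply recomputed on the next run.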

⚡ Performance Tips

Dataset Size Recommendations

| Operations | Mode | Workers | Batch Size |
| --- | --- | --- | --- |
| < 1M | Sequential | 1 | 50 |
| 1M - 10M | Parallel | 4 | 100 |
| > 10M | Parallel | 8-16 | 100 |

Memory Optimization

  • Use --batch-size 50 for low-memory systems
  • Use --batch-size 200 for high-memory systems with fast SSD

Speed Optimization

  • Enable --parallel for datasets with > 1,000 investors
  • Set --workers to the number of physical CPU cores (not logical cores)
  • Ensure data files are on SSD for best I/O performance

📐 Calculation Methods

Lot-Level Returns (FIFO)

Uses First-In-First-Out matching:

  1. Each buy creates a "lot" with quantity and cost
  2. Sells consume oldest lots first (FIFO)
  3. Partial sells split lots into closed/open positions
  4. Coupons allocated to lots by holding period

Formulas:

  • Simple return: ((end_value / initial_value) - 1) × 100
  • Annualized return: ((end_value / initial_value)^(365/days) - 1) × 100
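The steps and formulas above can be sketched as a toy example (illustrative only; `fifo_sell` and the return helpers are hypothetical names, not part of the script, which performs FIFO matching inside `analytics.calculate_operations_returns`):

```python
from collections import deque


def fifo_sell(lots: deque, qty: float) -> float:
    """Consume the oldest lots first and return the cost basis of the sold quantity.

    Each lot is a (quantity, unit_cost) pair; a partial sell splits the oldest
    lot, leaving the remainder as an open position. Assumes enough quantity
    is available across the lots.
    """
    cost = 0.0
    while qty > 0:
        lot_qty, unit_cost = lots[0]
        take = min(qty, lot_qty)
        cost += take * unit_cost
        qty -= take
        if take == lot_qty:
            lots.popleft()  # lot fully closed
        else:
            lots[0] = (lot_qty - take, unit_cost)  # lot split: remainder stays open
    return cost


def simple_return_pct(end_value: float, initial_value: float) -> float:
    return ((end_value / initial_value) - 1) * 100


def annualized_return_pct(end_value: float, initial_value: float, days: int) -> float:
    return ((end_value / initial_value) ** (365 / days) - 1) * 100
```

For example, buying two lots (10 units @ 100, then 10 units @ 110) and selling 15 units consumes the first lot entirely plus half of the second, for a cost basis of 10×100 + 5×110 = 1550, with a 5-unit lot @ 110 left open.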

Portfolio Returns (Modified Dietz)

Monthly time series uses Modified Dietz method:

Monthly Return = (EMV - BMV - NCF) / (BMV + NCF/2)

Where:

  • EMV = Ending Market Value
  • BMV = Beginning Market Value
  • NCF = Net Cash Flow (buys - sells + coupons)

This approximates time-weighted returns without daily valuations.
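A minimal sketch of the formula (helper names are illustrative; the script computes this inside `analytics.calculate_portfolio_monthly_returns`):

```python
def modified_dietz_return(bmv: float, emv: float, ncf: float) -> float:
    """Monthly Modified Dietz return.

    The NCF/2 weight in the denominator assumes cash flows land,
    on average, mid-month.
    """
    denominator = bmv + ncf / 2
    if denominator == 0:
        return 0.0
    return (emv - bmv - ncf) / denominator


def chain_returns(monthly_returns: list[float]) -> float:
    """Geometrically link monthly returns into a cumulative return."""
    cumulative = 1.0
    for r in monthly_returns:
        cumulative *= 1 + r
    return cumulative - 1
```

For instance, a month that starts at 10,000 BRL and ends at 10,800 BRL after a 500 BRL net inflow yields (10800 − 10000 − 500) / (10000 + 250) ≈ 2.93%.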

📁 Required Data Files

The script auto-discovers files in --data-dir:

Required

  • Operations: operacoes-do-tesouro-direto-*@*.csv

    • Multiple files automatically merged
    • Must have: investor_id, bond_type, operation_date, operation_type, quantity, operation_value, maturity_date
  • Prices: taxas-dos-titulos-ofertados*.csv

    • Latest file used
    • Must have: reference_date, bond_type, maturity_date, sell_price

Optional

  • Coupons: pagamento-de-cupom-de-juros*.csv
    • If present, coupon income included in returns
    • Must have: bond_type, maturity_date, buyback_date, unit_price

🛠️ Troubleshooting

"No operations files found"

  • Check --data-dir path
  • Ensure files match the pattern: operacoes-do-tesouro-direto-*@*.csv

"No prices file found"

  • Verify prices file exists: taxas-dos-titulos-ofertados*.csv

Out of Memory

  • Reduce --batch-size to 50 or 25
  • If using --parallel, reduce --workers
  • Process subset by filtering operations file

Slow Performance

  • Enable --parallel mode
  • Increase --workers if you have many CPU cores
  • Ensure data is on SSD, not HDD
  • Check if disk I/O is bottleneck (not CPU)

💡 Advanced Usage

Process Specific Investors

Edit operations CSV to only include target investors, or filter programmatically:

import polars as pl
ops = pl.read_csv("operations.csv", separator=";", decimal_comma=True)
target_ops = ops.filter(pl.col("investor_id").is_in(["INV001", "INV002"]))
target_ops.write_csv("operations_subset.csv", separator=";")

Then run on the filtered file.

Incremental Updates

To process new investors without touching existing results:

  1. Download latest data to --data-dir
  2. Run script (automatically skips processed investors)
  3. Only new investor IDs will be computed and appended

Custom Batch Tuning

Adjust --batch-size for your use case:

  • Fast SSD + high memory: --batch-size 200
  • HDD or low memory: --batch-size 50
  • Very low memory: --batch-size 20

🔍 Comparing to Original Script

This is a cleaner reimplementation of compute_investor_returns.py with:

Improvements:

  • Simpler, more readable code structure
  • Better separation of concerns (I/O, computation, orchestration)
  • Clearer function names and documentation
  • Simplified CSV field handling
  • More robust error handling

🔄 Same functionality:

  • All return calculations (lot-level, portfolio-level)
  • Resumability with checkpoint files
  • Multiprocessing support
  • Batch writing for memory efficiency
  • Progress tracking with ETA

Both scripts produce equivalent results and can be used interchangeably.

📚 See Also

#!/usr/bin/env python3
"""Calculate returns for all Tesouro Direto investors with checkpoint/resume support.

This script processes investor operations to calculate:
- Net position (profit/loss)
- Total accumulated returns
- Returns by bond type
- Monthly portfolio time series

Features:
- Resumable: skips already-processed investors
- Multiprocessing: parallel processing with --parallel flag
- Batch writing: periodic CSV writes for memory efficiency
- Progress tracking: real-time ETA and statistics

Usage:
    # Sequential processing
    python scripts/calculate_investor_returns.py --data-dir ~/data/tddata --out-dir results

    # Parallel processing (recommended for large datasets)
    python scripts/calculate_investor_returns.py --data-dir ~/data/tddata --out-dir results --parallel --workers 8
"""
import argparse
import csv
import logging
import multiprocessing as mp
import os
import shutil
import sys
import tempfile
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from datetime import UTC, datetime
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple, Union

import polars as pl

try:
    from tqdm import tqdm
except ImportError:
    tqdm = None

from tddata import analytics, reader, storage
from tddata.constants import Column as C
from tddata.constants import OperationType

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)

# Cache directory for loaded data
CACHE_DIR = Path(".") / ".cache" / "tddata_returns"
# ==============================================================================
# Cache management functions
# ==============================================================================


def _get_file_mtime(file_path: Optional[Path]) -> float:
    """Get file modification time, return 0 if file doesn't exist."""
    if file_path is None or not file_path.exists():
        return 0
    return file_path.stat().st_mtime


def _is_cache_valid(cache_file: Path, source_files: List[Path]) -> bool:
    """Check if cache is newer than all source files."""
    if not cache_file.exists():
        return False
    cache_mtime = cache_file.stat().st_mtime
    return all(_get_file_mtime(f) < cache_mtime for f in source_files if f is not None)
def load_operations_cached(data_dir: Path, force_reload: bool = False) -> Tuple[pl.DataFrame, List[Path]]:
    """Load operations data, using cache if available and valid."""
    logger.info("Loading operations data...")
    ops_files = list(storage.get_latest_files(data_dir))
    ops_files = [f for f in ops_files if "operacoes-do-tesouro-direto" in f.name.lower()]
    if not ops_files:
        raise FileNotFoundError(f"No operations files found in {data_dir}")
    logger.info(f"Found {len(ops_files)} operations files")

    # Check cache
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    cache_file = CACHE_DIR / "operations.ipc"
    if not force_reload and _is_cache_valid(cache_file, ops_files):
        logger.info(f"Loading operations from cache: {cache_file}")
        operations = pl.read_ipc(cache_file)
        logger.info(f"Loaded {operations.height:,} operations from cache")
        return operations, ops_files

    # Load from CSV files
    logger.info("Reading operations files (not in cache)...")
    operations_dfs = []
    for ops_file in ops_files:
        logger.info(f"  Reading {ops_file.name}...")
        operations_dfs.append(reader.read_operations(ops_file))
    operations = pl.concat(operations_dfs)
    logger.info(f"Loaded {operations.height:,} operations")

    # Save to cache
    logger.info(f"Caching operations to: {cache_file}")
    operations.write_ipc(cache_file)
    return operations, ops_files
def load_prices_cached(data_dir: Path, force_reload: bool = False) -> Tuple[pl.DataFrame, Optional[Path]]:
    """Load prices data, using cache if available and valid."""
    logger.info("Loading prices data...")
    prices_file = storage.get_latest_file(data_dir, "taxas-dos-titulos-ofertados*.csv")
    if not prices_file:
        raise FileNotFoundError(f"No prices file found in {data_dir}")

    # Check cache
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    cache_file = CACHE_DIR / "prices.ipc"
    if not force_reload and _is_cache_valid(cache_file, [prices_file]):
        logger.info(f"Loading prices from cache: {cache_file}")
        prices = pl.read_ipc(cache_file)
        logger.info(f"Loaded {prices.height:,} price records from cache")
        return prices, prices_file

    # Load from CSV
    logger.info(f"Reading {prices_file.name}...")
    prices = reader.read_prices(prices_file)
    if not isinstance(prices, pl.DataFrame):
        prices = pl.concat(list(prices))
    logger.info(f"Loaded {prices.height:,} price records")

    # Save to cache
    logger.info(f"Caching prices to: {cache_file}")
    prices.write_ipc(cache_file)
    return prices, prices_file
def load_coupons_cached(data_dir: Path, force_reload: bool = False) -> Tuple[Optional[pl.DataFrame], Optional[Path]]:
    """Load coupons data, using cache if available and valid."""
    coupons_file = storage.get_latest_file(data_dir, "pagamento-de-cupom-de-juros*.csv")
    if not coupons_file:
        logger.info("No coupon data found (optional)")
        return None, None
    logger.info("Loading coupons data...")

    # Check cache
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    cache_file = CACHE_DIR / "coupons.ipc"
    if not force_reload and _is_cache_valid(cache_file, [coupons_file]):
        logger.info(f"Loading coupons from cache: {cache_file}")
        coupons = pl.read_ipc(cache_file)
        logger.info(f"Loaded {coupons.height:,} coupon records from cache")
        return coupons, coupons_file

    # Load from CSV
    logger.info(f"Reading {coupons_file.name}...")
    coupons = reader.read_interest_coupons(coupons_file)
    if not isinstance(coupons, pl.DataFrame):
        coupons = pl.concat(list(coupons))
    logger.info(f"Loaded {coupons.height:,} coupon records")

    # Save to cache
    logger.info(f"Caching coupons to: {cache_file}")
    coupons.write_ipc(cache_file)
    return coupons, coupons_file
# ==============================================================================
# Global state for worker processes (initialized via _init_worker)
# ==============================================================================
_OPERATIONS: Optional[pl.DataFrame] = None
_PRICES: Optional[pl.DataFrame] = None
_COUPONS: Optional[pl.DataFrame] = None


def _init_worker(
    ops_path: str,
    prices_path: str,
    coupons_path: Optional[str],
):
    """Initialize worker process with global dataframes."""
    global _OPERATIONS, _PRICES, _COUPONS
    _OPERATIONS = pl.read_ipc(ops_path)
    _PRICES = pl.read_ipc(prices_path)
    if coupons_path and os.path.exists(coupons_path):
        _COUPONS = pl.read_ipc(coupons_path)
    logger.info(f"Worker {os.getpid()} initialized.")
# ==============================================================================
# Core computation functions
# ==============================================================================


def _format_month_str(month: Optional[object]) -> str:
    """Format month value to ISO string."""
    if month is not None and hasattr(month, "isoformat"):
        return month.isoformat()  # type: ignore
    return str(month) if month is not None else ""


def _compute_monthly_returns(
    investor_id_str: str,
    operations: pl.DataFrame,
    prices: pl.DataFrame,
    coupons: Optional[pl.DataFrame],
    timestamp: str,
) -> List[Dict]:
    """Compute monthly portfolio returns."""
    monthly_records = []
    monthly_df = analytics.calculate_portfolio_monthly_returns(operations, prices, coupons=coupons)
    if monthly_df is not None and monthly_df.height > 0:
        for row in monthly_df.iter_rows(named=True):
            month_str = _format_month_str(row.get("month"))
            monthly_records.append(
                {
                    "investor_id": investor_id_str,
                    "month": month_str,
                    "monthly_return": float(row.get("monthly_return") or 0.0),
                    "cumulative_return": float(row.get("cumulative_return") or 0.0),
                    "portfolio_value": float(row.get("portfolio_value") or 0.0),
                    "net_cash_flow": float(row.get("net_cash_flow") or 0.0),
                    "timestamp": timestamp,
                }
            )
    return monthly_records
def _compute_bond_monthly_returns(
    investor_id_str: str,
    operations: pl.DataFrame,
    prices: pl.DataFrame,
    coupons: Optional[pl.DataFrame],
    unique_bond_types: List[str],
    timestamp: str,
) -> List[Dict]:
    """Compute per-bond-type monthly returns."""
    bond_monthly_records = []
    for bond_type in unique_bond_types:
        bt_ops = operations.filter(pl.col(C.BOND_TYPE.value) == bond_type)
        bt_prices = prices.filter(pl.col(C.BOND_TYPE.value) == bond_type)
        bt_coupons = coupons.filter(pl.col(C.BOND_TYPE.value) == bond_type) if coupons is not None else None
        bt_monthly = analytics.calculate_portfolio_monthly_returns(bt_ops, bt_prices, coupons=bt_coupons)
        if bt_monthly is not None and bt_monthly.height > 0:
            for row in bt_monthly.iter_rows(named=True):
                month_str = _format_month_str(row.get("month"))
                bond_monthly_records.append(
                    {
                        "investor_id": investor_id_str,
                        "bond_type": bond_type,
                        "month": month_str,
                        "monthly_return": float(row.get("monthly_return") or 0.0),
                        "cumulative_return": float(row.get("cumulative_return") or 0.0),
                        "portfolio_value": float(row.get("portfolio_value") or 0.0),
                        "net_cash_flow": float(row.get("net_cash_flow") or 0.0),
                        "timestamp": timestamp,
                    }
                )
    return bond_monthly_records
def compute_investor_returns(
    investor_id: Union[str, int],
    inv_ops: pl.DataFrame,
    prices: pl.DataFrame,
    coupons: Optional[pl.DataFrame] = None,
) -> Optional[Dict]:
    """Calculate all return metrics for a single investor using vectorized operations.

    Returns a dict with:
    - summary: Total portfolio summary
    - by_bond_type: Returns aggregated by bond type
    - monthly: Monthly time series
    - bond_monthly: Monthly time series per bond type
    """
    if inv_ops.height == 0:
        return None

    # Calculate lot-level returns (FIFO matching)
    lots = analytics.calculate_operations_returns(inv_ops, prices, coupons=coupons)
    if lots.height == 0:
        return None

    investor_id_str = str(investor_id)
    timestamp = datetime.now(UTC).isoformat()

    # Vectorized calculation of summary statistics using Polars groupby
    total_invested = float(lots[C.OPERATION_VALUE.value].sum() or 0.0)
    total_end_value = float(lots["end_value"].sum() or 0.0)
    total_return_pct = ((total_end_value / total_invested) - 1) * 100 if total_invested > 0.0 else 0.0
    net_position = total_end_value - total_invested

    # Build summary record
    summary = {
        "investor_id": investor_id_str,
        "total_invested": total_invested,
        "total_end_value": total_end_value,
        "total_return_pct": total_return_pct,
        "net_position": net_position,
        "num_operations": inv_ops.height,
        "timestamp": timestamp,
    }

    # Vectorized bond type aggregation using Polars
    bond_agg = (
        lots.group_by(C.BOND_TYPE.value)
        .agg(
            [
                pl.col(C.OPERATION_VALUE.value).sum().alias("invested"),
                pl.col("end_value").sum().alias("end_value"),
            ]
        )
        .with_columns(
            [
                (((pl.col("end_value") / pl.col("invested")) - 1) * 100).fill_nan(0.0).alias("return_pct"),
                (pl.col("end_value") - pl.col("invested")).alias("net_position"),
            ]
        )
    )
    bond_type_records = [
        {
            "investor_id": investor_id_str,
            "bond_type": row[C.BOND_TYPE.value],
            "invested": float(row["invested"]),
            "end_value": float(row["end_value"]),
            "return_pct": float(row["return_pct"]),
            "net_position": float(row["net_position"]),
            "timestamp": timestamp,
        }
        for row in bond_agg.iter_rows(named=True)
    ]

    # Calculate monthly portfolio returns (Modified Dietz method)
    monthly_records = _compute_monthly_returns(investor_id_str, inv_ops, prices, coupons, timestamp)

    # Cache unique bond types and compute per-bond-type monthly returns
    unique_bond_types = inv_ops[C.BOND_TYPE.value].unique().to_list()
    bond_monthly_records = _compute_bond_monthly_returns(
        investor_id_str, inv_ops, prices, coupons, unique_bond_types, timestamp
    )

    return {
        "summary": summary,
        "by_bond_type": bond_type_records,
        "monthly": monthly_records,
        "bond_monthly": bond_monthly_records,
    }
def compute_investor_returns_worker(investor_id: Union[str, int]) -> Optional[Dict]:
    """Worker function that uses global state initialized by _init_worker."""
    try:
        if _OPERATIONS is None or _PRICES is None:
            logger.error(f"Worker not initialized properly for investor {investor_id}")
            return None
        # Filter the global dataframe for the specific investor
        inv_ops = _OPERATIONS.filter(pl.col(C.INVESTOR_ID.value) == investor_id)
        if inv_ops.height == 0:
            return None
        return compute_investor_returns(investor_id, inv_ops, _PRICES, _COUPONS)
    except Exception as e:
        logger.error(f"Error processing investor {investor_id}: {e}")
        return None


def compute_investor_batch_worker(investor_ids_batch: List[Union[str, int]]) -> List[Tuple[Union[str, int], Dict]]:
    """Worker function that processes a batch of investors efficiently.

    Instead of filtering the entire operations DataFrame per investor,
    filters once for the whole batch and uses group_by.
    """
    if _OPERATIONS is None or _PRICES is None:
        logger.error(f"Worker {os.getpid()} not initialized properly")
        return []
    results = []
    # Filter operations for the entire batch at once (one scan instead of one per investor)
    batch_ops = _OPERATIONS.filter(pl.col(C.INVESTOR_ID.value).is_in(investor_ids_batch))
    if batch_ops.height == 0:
        return []
    # Process each investor by grouping, avoiding redundant scans
    for investor_id_group, inv_ops in batch_ops.group_by(C.INVESTOR_ID.value):
        investor_id = investor_id_group if not isinstance(investor_id_group, tuple) else investor_id_group[0]
        try:
            result = compute_investor_returns(investor_id, inv_ops, _PRICES, _COUPONS)
            if result:
                results.append((investor_id, result))
        except Exception as e:
            logger.error(f"Error processing investor {investor_id}: {e}")
    logger.info(
        f"Worker {os.getpid()} finished batch of {len(investor_ids_batch)} investors "
        f"({len(results)} with results)"
    )
    return results
# ==============================================================================
# CSV I/O functions
# ==============================================================================


def read_processed_ids(csv_path: Path, id_column: str = "investor_id") -> Set[str]:
    """Read investor IDs that have already been processed."""
    if not csv_path.exists():
        return set()
    try:
        df = pl.read_csv(csv_path)
        if id_column in df.columns:
            return set(df[id_column].cast(pl.Utf8).to_list())
    except Exception as e:
        logger.warning(f"Could not read {csv_path}: {e}")
    return set()


def append_to_csv(csv_path: Path, records: List[Dict], fieldnames: List[str]):
    """Append records to CSV file, creating it if necessary. Optimized with buffering."""
    if not records:
        return
    write_header = not csv_path.exists()
    # Open file in append mode with large buffer
    with csv_path.open("a", newline="", encoding="utf-8", buffering=262144) as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        if write_header:
            writer.writeheader()
        writer.writerows(records)
# ==============================================================================
# Main processing logic
# ==============================================================================


def process_investors_sequential(
    investor_ids: List[Union[str, int]],
    operations: pl.DataFrame,
    prices: pl.DataFrame,
    coupons: Optional[pl.DataFrame],
    output_dir: Path,
    batch_size: int,
    processed_summary: Set[str],
    processed_monthly: Set[str],
) -> None:
    """Process investors sequentially with batch-level vectorized filtering."""
    csv_summary = output_dir / "investor_summary.csv"
    csv_by_bond = output_dir / "investor_by_bond_type.csv"
    csv_monthly = output_dir / "investor_monthly.csv"
    csv_bond_monthly = output_dir / "investor_bond_monthly.csv"
    fieldnames_summary = [
        "investor_id",
        "total_invested",
        "total_end_value",
        "total_return_pct",
        "net_position",
        "num_operations",
        "timestamp",
    ]
    fieldnames_bond = ["investor_id", "bond_type", "invested", "end_value", "return_pct", "net_position", "timestamp"]
    fieldnames_monthly = [
        "investor_id",
        "month",
        "monthly_return",
        "cumulative_return",
        "portfolio_value",
        "net_cash_flow",
        "timestamp",
    ]
    fieldnames_bond_monthly = [
        "investor_id",
        "bond_type",
        "month",
        "monthly_return",
        "cumulative_return",
        "portfolio_value",
        "net_cash_flow",
        "timestamp",
    ]
    start_time = time.time()

    # Process in batches with vectorized filtering
    for batch_start in range(0, len(investor_ids), batch_size):
        batch_end = min(batch_start + batch_size, len(investor_ids))
        batch_investor_ids = investor_ids[batch_start:batch_end]

        # Vectorized: filter operations for entire batch at once
        batch_ops = operations.filter(pl.col(C.INVESTOR_ID.value).is_in(batch_investor_ids))
        batch_summary = []
        batch_bond = []
        batch_monthly = []
        batch_bond_monthly = []

        # Process each investor in batch by grouping
        for investor_id_group, inv_ops in batch_ops.group_by(C.INVESTOR_ID.value):
            # Extract investor_id from the group key
            investor_id = investor_id_group if not isinstance(investor_id_group, tuple) else investor_id_group[0]
            result = compute_investor_returns(investor_id, inv_ops, prices, coupons)
            if result:
                if str(investor_id) not in processed_summary:
                    batch_summary.append(result["summary"])
                    batch_bond.extend(result["by_bond_type"])
                if str(investor_id) not in processed_monthly:
                    batch_monthly.extend(result["monthly"])
                    batch_bond_monthly.extend(result["bond_monthly"])

        # Write batch to disk
        if batch_summary:
            append_to_csv(csv_summary, batch_summary, fieldnames_summary)
            processed_summary.update(r["investor_id"] for r in batch_summary)
        if batch_bond:
            append_to_csv(csv_by_bond, batch_bond, fieldnames_bond)
        if batch_monthly:
            append_to_csv(csv_monthly, batch_monthly, fieldnames_monthly)
            processed_monthly.update(r["investor_id"] for r in batch_monthly)
        if batch_bond_monthly:
            append_to_csv(csv_bond_monthly, batch_bond_monthly, fieldnames_bond_monthly)

        # Progress tracking
        processed = min(batch_end, len(investor_ids))
        elapsed = time.time() - start_time
        rate = processed / elapsed if elapsed > 0 else 0
        eta_seconds = (len(investor_ids) - processed) / rate if rate > 0 else 0
        logger.info(
            f"Progress: {processed}/{len(investor_ids)} ({100 * processed / len(investor_ids):.1f}%) | "
            f"Rate: {rate:.1f} inv/s | ETA: {eta_seconds / 60:.1f} min"
        )
def process_investors_parallel(
    investor_ids: List[Union[str, int]],
    operations: pl.DataFrame,
    prices: pl.DataFrame,
    coupons: Optional[pl.DataFrame],
    output_dir: Path,
    batch_size: int,
    workers: int,
    processed_summary: Set[str],
    processed_monthly: Set[str],
) -> None:
    """Process investors in parallel using batch-based multiprocessing.

    Each worker receives a batch of investor IDs and filters the operations
    DataFrame once per batch (using is_in + group_by), instead of scanning
    the entire DataFrame per investor.
    """
    csv_summary = output_dir / "investor_summary.csv"
    csv_by_bond = output_dir / "investor_by_bond_type.csv"
    csv_monthly = output_dir / "investor_monthly.csv"
    csv_bond_monthly = output_dir / "investor_bond_monthly.csv"
    fieldnames_summary = [
        "investor_id",
        "total_invested",
        "total_end_value",
        "total_return_pct",
        "net_position",
        "num_operations",
        "timestamp",
    ]
    fieldnames_bond = ["investor_id", "bond_type", "invested", "end_value", "return_pct", "net_position", "timestamp"]
    fieldnames_monthly = [
        "investor_id",
        "month",
        "monthly_return",
        "cumulative_return",
        "portfolio_value",
        "net_cash_flow",
        "timestamp",
    ]
    fieldnames_bond_monthly = [
        "investor_id",
        "bond_type",
        "month",
        "monthly_return",
        "cumulative_return",
        "portfolio_value",
        "net_cash_flow",
        "timestamp",
    ]

    # Split investor IDs into batches for workers
    batches = [investor_ids[i : i + batch_size] for i in range(0, len(investor_ids), batch_size)]
    logger.info(f"Split {len(investor_ids):,} investors into {len(batches)} batches of up to {batch_size}")

    # Create temporary IPC files for worker initialization
    with tempfile.TemporaryDirectory() as tmpdir:
        logger.info("Preparing data for workers...")
        # Serialize main dataframes to IPC files for workers to load
        ops_path = Path(tmpdir) / "operations.ipc"
        prices_path = Path(tmpdir) / "prices.ipc"
        coupons_path = Path(tmpdir) / "coupons.ipc" if coupons is not None else None
        operations.write_ipc(ops_path)
        prices.write_ipc(prices_path)
        if coupons_path and coupons is not None:
            coupons.write_ipc(coupons_path)

        logger.info(f"Starting {workers} worker processes...")
        start_time = time.time()
        completed = 0
        with ProcessPoolExecutor(
            max_workers=workers,
            initializer=_init_worker,
            initargs=(
                str(ops_path),
                str(prices_path),
                str(coupons_path) if coupons_path else None,
            ),
        ) as executor:
            # Submit batch tasks (not one per investor)
            future_to_batch_idx = {}
            for batch_idx, batch in enumerate(batches):
                future = executor.submit(compute_investor_batch_worker, batch)
                future_to_batch_idx[future] = batch_idx
            logger.info(f"Submitted {len(batches)} batch tasks to {workers} workers")

            # Process results as batches complete
            for future in as_completed(future_to_batch_idx):
                batch_idx = future_to_batch_idx[future]
                batch_investor_count = len(batches[batch_idx])
                try:
                    batch_results = future.result()
                except Exception as e:
                    logger.error(f"Batch {batch_idx} failed: {e}")
                    completed += batch_investor_count
                    continue

                # Collect records from batch results
                batch_summary = []
                batch_bond = []
                batch_monthly = []
                batch_bond_monthly = []
                for investor_id, result in batch_results:
                    if str(investor_id) not in processed_summary:
                        batch_summary.append(result["summary"])
                        batch_bond.extend(result["by_bond_type"])
                    if str(investor_id) not in processed_monthly:
                        batch_monthly.extend(result["monthly"])
                        batch_bond_monthly.extend(result["bond_monthly"])

                # Write batch to disk
                if batch_summary:
                    append_to_csv(csv_summary, batch_summary, fieldnames_summary)
                    processed_summary.update(r["investor_id"] for r in batch_summary)
                if batch_bond:
                    append_to_csv(csv_by_bond, batch_bond, fieldnames_bond)
                if batch_monthly:
                    append_to_csv(csv_monthly, batch_monthly, fieldnames_monthly)
                    processed_monthly.update(r["investor_id"] for r in batch_monthly)
                if batch_bond_monthly:
                    append_to_csv(csv_bond_monthly, batch_bond_monthly, fieldnames_bond_monthly)

                completed += batch_investor_count
                elapsed = time.time() - start_time
                rate = completed / elapsed if elapsed > 0 else 0
                eta_seconds = (len(investor_ids) - completed) / rate if rate > 0 else 0
                logger.info(
                    f"Progress: {completed:,}/{len(investor_ids):,} ({100 * completed / len(investor_ids):.1f}%) | "
                    f"Batch {batch_idx + 1}/{len(batches)} done ({len(batch_results)} results) | "
                    f"Rate: {rate:.1f} inv/s | ETA: {eta_seconds / 60:.1f} min"
                )
# ==============================================================================
# CLI and main entry point
# ==============================================================================


def main():
    parser = argparse.ArgumentParser(
        description="Calculate returns for all Tesouro Direto investors",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=f"""
Examples:
    # Sequential processing
    python scripts/calculate_investor_returns.py --data-dir ~/data/tddata --out-dir results

    # Parallel processing with 8 workers
    python scripts/calculate_investor_returns.py --data-dir ~/data/tddata --out-dir results --parallel --workers 8

    # Force reload data (ignore cache)
    python scripts/calculate_investor_returns.py --data-dir ~/data/tddata --out-dir results --no-cache

Output files:
    - investor_summary.csv: Total portfolio summary per investor
    - investor_by_bond_type.csv: Returns by bond type per investor
    - investor_monthly.csv: Monthly time series per investor
    - investor_bond_monthly.csv: Monthly time series per bond type and investor

Cache location:
    - {CACHE_DIR}
""",
    )
    parser.add_argument("--data-dir", type=Path, required=True, help="Directory containing TD data files")
    parser.add_argument(
        "--out-dir", type=Path, default=Path("results"), help="Output directory for CSV files (default: results)"
    )
    parser.add_argument("--batch-size", type=int, default=200, help="Write to CSV every N investors (default: 200)")
    parser.add_argument("--parallel", action="store_true", help="Enable parallel processing")
    parser.add_argument(
        "--workers", type=int, default=mp.cpu_count(), help=f"Number of worker processes (default: {mp.cpu_count()})"
    )
    parser.add_argument("--no-cache", action="store_true", help="Force reload data from CSV (ignore cache)")
    parser.add_argument("--clear-cache", action="store_true", help="Clear cache directory and exit")
    args = parser.parse_args()

    # Handle cache clearing
    if args.clear_cache:
        if CACHE_DIR.exists():
            shutil.rmtree(CACHE_DIR)
            logger.info(f"Cache cleared: {CACHE_DIR}")
        else:
            logger.info(f"Cache directory does not exist: {CACHE_DIR}")
        return

    # Create output directory
    args.out_dir.mkdir(parents=True, exist_ok=True)

    # Load data with caching
    try:
        operations, ops_files = load_operations_cached(args.data_dir, force_reload=args.no_cache)
        prices, prices_file = load_prices_cached(args.data_dir, force_reload=args.no_cache)
        coupons, coupons_file = load_coupons_cached(args.data_dir, force_reload=args.no_cache)
    except FileNotFoundError as e:
        logger.error(f"Error loading data: {e}")
        sys.exit(1)

    # Get list of investors
    logger.info("Identifying investors...")
    # Keep investor IDs in their original type, but convert to strings for set operations
    all_investors = operations[C.INVESTOR_ID.value].unique().to_list()
    all_investors_str = [str(inv) for inv in all_investors]
    logger.info(f"Found {len(all_investors_str):,} unique investors")

    # Check which investors are already processed
    csv_summary = args.out_dir / "investor_summary.csv"
    csv_monthly = args.out_dir / "investor_monthly.csv"
    processed_summary = read_processed_ids(csv_summary)
    processed_monthly = read_processed_ids(csv_monthly)

    # Determine which investors need processing
    to_process = [
        inv
        for inv, inv_str in zip(all_investors, all_investors_str)
        if inv_str not in processed_summary or inv_str not in processed_monthly
    ]
    logger.info(f"Already processed: {len(all_investors_str) - len(to_process):,} investors")
    logger.info(f"To process: {len(to_process):,} investors")
    if not to_process:
        logger.info("All investors already processed. Nothing to do.")
        return

    # Process investors
    logger.info("=" * 70)
    if args.parallel:
        logger.info(f"Starting PARALLEL processing with {args.workers} workers")
        process_investors_parallel(
            to_process,
            operations,
            prices,
            coupons,
            args.out_dir,
            args.batch_size,
            args.workers,
            processed_summary,
            processed_monthly,
        )
    else:
        logger.info("Starting SEQUENTIAL processing")
        process_investors_sequential(
            to_process, operations, prices, coupons, args.out_dir, args.batch_size, processed_summary, processed_monthly
        )

    logger.info("=" * 70)
    logger.info("✓ Processing complete!")
    logger.info(f"Results saved to: {args.out_dir}")
    logger.info("  - investor_summary.csv")
    logger.info("  - investor_by_bond_type.csv")
    logger.info("  - investor_monthly.csv")
    logger.info("  - investor_bond_monthly.csv")


if __name__ == "__main__":
    main()