"""
train_pipeline.py
Produces:
- model_artifacts/lgbm_booster.txt (fast C++ booster)
- model_artifacts/lgbm_model.pkl (sklearn wrapper saved)
- model_artifacts/feature_list.pkl
- model_artifacts/target_encoders.pkl (dict: col -> {value: mean})
- model_artifacts/global_mean.pkl
- model_artifacts/scaler.pkl
"""
import os
import argparse
import pickle
from pathlib import Path
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import RobustScaler
import lightgbm as lgb
from sklearn.metrics import average_precision_score, classification_report
ARTIFACT_DIR = "model_artifacts"
os.makedirs(ARTIFACT_DIR, exist_ok=True)
# -------------------------
# Utility / Feature fns
# -------------------------
def haversine(lat1, lon1, lat2, lon2):
    # approximate haversine distance (km)
    R = 6371.0
    lat1, lon1, lat2, lon2 = map(np.radians, (lat1, lon1, lat2, lon2))
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = np.sin(dlat / 2.0) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2.0) ** 2
    c = 2 * np.arcsin(np.sqrt(a))
    return R * c
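# Quick sanity check for haversine (not part of the original pipeline): one
# degree of longitude at the equator is ~111.19 km, i.e. 6371 * pi / 180.
#   >>> round(haversine(0.0, 0.0, 0.0, 1.0), 2)
#   111.19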
def create_time_features(df):
    # trans_date -> day-of-week, month, is_weekend, day_of_year
    if "trans_date" in df.columns:
        df["trans_date"] = pd.to_datetime(df["trans_date"], errors="coerce")
        df["trans_dow"] = df["trans_date"].dt.weekday.fillna(-1).astype(int)
        df["trans_month"] = df["trans_date"].dt.month.fillna(-1).astype(int)
        df["trans_dayofyear"] = df["trans_date"].dt.dayofyear.fillna(-1).astype(int)
        df["trans_is_weekend"] = (df["trans_dow"] >= 5).astype(int)
        df.drop(columns=["trans_date"], inplace=True)
    # trans_time -> hour, minute
    if "trans_time" in df.columns:
        # handle various time formats
        df["trans_time"] = pd.to_datetime(df["trans_time"], errors="coerce", format="%H:%M:%S")
        df["trans_hour"] = df["trans_time"].dt.hour.fillna(-1).astype(int)
        df["trans_minute"] = df["trans_time"].dt.minute.fillna(-1).astype(int)
        df.drop(columns=["trans_time"], inplace=True)
    # dob -> age
    if "dob" in df.columns:
        df["dob"] = pd.to_datetime(df["dob"], errors="coerce")
        # if dob is NaT, age falls back to -1
        today = pd.Timestamp.now()
        df["age"] = ((today - df["dob"]).dt.days // 365).fillna(-1).astype(int)
        df.drop(columns=["dob"], inplace=True)
    return df
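# Illustrative example (hypothetical row, not from the dataset): a transaction
# dated 2025-01-04 (a Saturday) yields trans_dow=5, trans_month=1,
# trans_dayofyear=4, trans_is_weekend=1; a trans_time of "13:45:10" yields
# trans_hour=13, trans_minute=45.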
def safe_drop_cols(df, drop_cols):
    return df.drop(columns=[c for c in drop_cols if c in df.columns])
# -------------------------
# Target encoding utility (KFold OOF for training)
# -------------------------
def compute_target_encoding_maps(X, y, cols, n_splits=5, random_state=42):
    """
    Returns:
    - enc_maps: dict: col -> {'oof_values': pd.Series of OOF encodings, 'mapping': dict(value -> mean)}
    - global_mean: float
    Approach:
    - For each column, compute OOF mean encode (so training encoding is leakage-free)
    - After OOF, compute final mapping on full data (to use for test)
    """
    enc_maps = {}
    global_mean = y.mean()
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=random_state)
    for col in cols:
        oof = pd.Series(index=X.index, dtype=float)
        # treat nulls as the literal string '___MISSING___' (fill before casting to str)
        vals = X[col].fillna("___MISSING___").astype(str)
        for train_idx, val_idx in skf.split(X, y):
            tr_vals = vals.iloc[train_idx]
            tr_y = y.iloc[train_idx]
            mapping = tr_y.groupby(tr_vals).mean()
            # map the held-out fold; unseen categories fall back to the global mean
            oof.iloc[val_idx] = vals.iloc[val_idx].map(mapping).fillna(global_mean)
        # final mapping trained on full data (for test/inference)
        full_map = y.groupby(vals).mean().to_dict()
        enc_maps[col] = {
            "oof_values": oof,
            "mapping": full_map
        }
    return enc_maps, global_mean
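# Sketch of the returned structure (illustrative only; "category" and the
# numbers are made up):
#   enc_maps = {
#       "category": {
#           "oof_values": <pd.Series of leakage-free encodings, aligned to X.index>,
#           "mapping": {"grocery_pos": 0.012, "travel": 0.034, "___MISSING___": 0.02},
#       }
#   }
#   global_mean = y.mean()  # fallback for categories unseen at inference time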
def apply_target_encoding_for_test(df, enc_maps, global_mean):
    # df: test dataframe
    for col, meta in enc_maps.items():
        if col in df.columns:
            vals = df[col].fillna("___MISSING___").astype(str)
            mapping = meta["mapping"]
            df[col] = vals.map(lambda v: mapping.get(v, global_mean))
        else:
            # if the column is missing entirely, fill with the global mean
            df[col] = global_mean
    return df
# -------------------------
# Main train function
# -------------------------
def train(train_csv, tune=False, n_trials=30, random_state=42):
    print("Loading", train_csv)
    df = pd.read_csv(train_csv)
    # normalize column names: strip spaces
    df.columns = df.columns.str.strip()
    df = df.dropna(subset=["is_fraud"])
    df.reset_index(drop=True, inplace=True)
    # target
    y = df["is_fraud"].astype(int)
    X = df.drop(columns=["is_fraud"])
    # -------------------------
    # DROP high-cardinality ID columns (strongly recommended)
    # -------------------------
    # NOTE: "dob" is not dropped here so that create_time_features can derive "age" from it.
    drop_cols = [
        "ssn", "first", "last", "street", "city", "job",
        "trans_num", "acct_num", "customer_card_number",
        "customer_transaction_ip", "merch_transaction_ip",
        "merch_url", "merch_name", "profile"
    ]
    X = safe_drop_cols(X, drop_cols)
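    # These columns are near-unique identifiers (and partly PII): target-encoding
    # them would mostly memorize individual rows rather than generalize, hence the drop.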
    # -------------------------
    # Feature engineering: dates, time, age
    # -------------------------
    X = create_time_features(X)
    # -------------------------
    # Create distance feature if lat/lon available
    # -------------------------
    if set(["lat", "long", "merch_lat", "merch_long"]).issubset(X.columns):
        X["dist_km"] = haversine(X["lat"].astype(float).fillna(0),
                                 X["long"].astype(float).fillna(0),
                                 X["merch_lat"].astype(float).fillna(0),
                                 X["merch_long"].astype(float).fillna(0))
    else:
        # if partial, fill zeros
        X["dist_km"] = 0.0
    # -------------------------
    # Split (stratified)
    # -------------------------
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=0.2, random_state=random_state, stratify=y
    )
    # -------------------------
    # CATEGORICAL SELECTION (after dropping)
    # We'll only target-encode object dtype columns; numeric stay numeric.
    # -------------------------
    cat_cols = X_train.select_dtypes(include=["object"]).columns.tolist()
    print("Categorical columns to TE:", cat_cols)
    # -------------------------
    # Compute target encoding maps with KFold OOF (avoid leakage)
    # -------------------------
    print("Computing target encoding maps (KFold OOF)...")
    enc_maps, global_mean = compute_target_encoding_maps(X_train, y_train, cat_cols, n_splits=5, random_state=random_state)
    # Replace training categorical columns with OOF encodings; validation/test use the final mapping
    for col in cat_cols:
        X_train[col] = enc_maps[col]["oof_values"].values
        # For validation, map using the final mapping with fallback to global_mean
        X_val[col] = X_val[col].fillna("___MISSING___").astype(str).map(
            lambda v, m=enc_maps[col]["mapping"]: m.get(v, global_mean)
        )
    # -------------------------
    # Impute numeric NaNs (simple)
    # -------------------------
    numeric_cols = X_train.select_dtypes(include=[np.number]).columns.tolist()
    # fill with median from train
    medians = X_train[numeric_cols].median()
    X_train[numeric_cols] = X_train[numeric_cols].fillna(medians)
    X_val[numeric_cols] = X_val[numeric_cols].fillna(medians)
    # -------------------------
    # Scale numeric features using RobustScaler
    # -------------------------
    scaler = RobustScaler()
    X_train[numeric_cols] = scaler.fit_transform(X_train[numeric_cols])
    X_val[numeric_cols] = scaler.transform(X_val[numeric_cols])
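    # RobustScaler centers each feature on its median and scales by the IQR,
    # so extreme transaction amounts influence the scale far less than with
    # standard (mean/std) scaling.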
    # -------------------------
    # Save artifacts: enc_maps mapping, scaler, feature_list, global_mean
    # -------------------------
    # But we only save final mapping dicts (value->mean)
    final_maps = {col: enc_maps[col]["mapping"] for col in enc_maps}
    with open(os.path.join(ARTIFACT_DIR, "target_encoders.pkl"), "wb") as f:
        pickle.dump(final_maps, f)
    with open(os.path.join(ARTIFACT_DIR, "global_mean.pkl"), "wb") as f:
        pickle.dump(global_mean, f)
    with open(os.path.join(ARTIFACT_DIR, "scaler.pkl"), "wb") as f:
        pickle.dump(scaler, f)
    feature_list = X_train.columns.tolist()
    with open(os.path.join(ARTIFACT_DIR, "feature_list.pkl"), "wb") as f:
        pickle.dump(feature_list, f)
    # -------------------------
    # LightGBM training (use tuned params; optionally run optuna)
    # -------------------------
    default_params = {
        "objective": "binary",
        "boosting_type": "gbdt",
        # LightGBM's PR-AUC metric is named "average_precision"
        "metric": "average_precision",
        "verbosity": -1,
        "n_jobs": -1,
        "learning_rate": 0.03,
        "num_leaves": 90,
        "min_child_samples": 30,
        "subsample": 0.9,
        "colsample_bytree": 0.9,
        "reg_alpha": 1.0,
        "reg_lambda": 2.0,
        "class_weight": "balanced"
    }
    if tune:
        try:
            import optuna

            def objective(trial):
                param = default_params.copy()
                param.update({
                    "learning_rate": trial.suggest_float("learning_rate", 0.005, 0.2, log=True),
                    "num_leaves": trial.suggest_int("num_leaves", 31, 256),
                    "min_child_samples": trial.suggest_int("min_child_samples", 5, 200),
                    "subsample": trial.suggest_float("subsample", 0.5, 1.0),
                    "colsample_bytree": trial.suggest_float("colsample_bytree", 0.4, 1.0),
                    "reg_alpha": trial.suggest_float("reg_alpha", 1e-3, 10.0, log=True),
                    "reg_lambda": trial.suggest_float("reg_lambda", 1e-3, 10.0, log=True)
                })
                model = lgb.LGBMClassifier(n_estimators=2000, **param)
                # early stopping via callbacks (the early_stopping_rounds/verbose
                # fit kwargs were removed in recent LightGBM versions)
                model.fit(X_train, y_train,
                          eval_set=[(X_val, y_val)],
                          eval_metric="average_precision",
                          callbacks=[lgb.early_stopping(100), lgb.log_evaluation(0)])
                val_probs = model.predict_proba(X_val)[:, 1]
                return average_precision_score(y_val, val_probs)

            study = optuna.create_study(direction="maximize")
            study.optimize(objective, n_trials=n_trials)
            best = study.best_trial.params
            default_params.update(best)
            print("Optuna best params:", best)
        except Exception as e:
            print("Optuna tuning failed or not installed; falling back to defaults.", str(e))
    # Train final model
    print("Training LightGBM with params:", {k: default_params[k] for k in ["learning_rate", "num_leaves", "subsample", "colsample_bytree"] if k in default_params})
    model = lgb.LGBMClassifier(n_estimators=2000, **default_params)
    model.fit(
        X_train, y_train,
        eval_set=[(X_val, y_val)],
        eval_metric="average_precision",
        callbacks=[lgb.early_stopping(100), lgb.log_evaluation(100)]
    )
    # -------------------------
    # Evaluation
    # -------------------------
    val_probs = model.predict_proba(X_val)[:, 1]
    print("Validation PR-AUC (avg precision):", average_precision_score(y_val, val_probs))
    val_preds = (val_probs >= 0.5).astype(int)
    print(classification_report(y_val, val_preds))
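    # NOTE: the 0.5 threshold above is only for a quick classification_report;
    # with heavy class imbalance the operating threshold is better chosen from
    # the precision-recall curve. PR-AUC above is the headline metric.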
    # -------------------------
    # Save model artifacts
    # -------------------------
    # Save sklearn wrapper
    with open(os.path.join(ARTIFACT_DIR, "lgbm_model.pkl"), "wb") as f:
        pickle.dump(model, f)
    # Save native booster (fast inference)
    model.booster_.save_model(os.path.join(ARTIFACT_DIR, "lgbm_booster.txt"))
    print("Saved artifacts to", ARTIFACT_DIR)
    return
# -------------------------
# CLI
# -------------------------
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--train_csv", default="train_data.csv")
parser.add_argument("--tune", action="store_true", help="Run optuna tuning (optional)")
parser.add_argument("--n_trials", type=int, default=30, help="Optuna trials if tune")
args = parser.parse_args()
train(args.train_csv, tune=args.tune, n_trials=args.n_trials)
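# Example invocations (file names are placeholders):
#   python train_pipeline.py --train_csv train_data.csv
#   python train_pipeline.py --train_csv train_data.csv --tune --n_trials 50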
-----------------------------------------------------------------------------
"""
predict_pipeline.py
Loads:
- model_artifacts/feature_list.pkl
- model_artifacts/target_encoders.pkl
- model_artifacts/global_mean.pkl
- model_artifacts/scaler.pkl
- model_artifacts/lgbm_booster.txt
Produces:
- prediction_scores.csv (index, pred_prob)
"""
import os
import pickle
import pandas as pd
import numpy as np
import lightgbm as lgb
ARTIFACT_DIR = "model_artifacts"
def create_time_features_for_test(df):
    # same logic as training pipeline
    if "trans_date" in df.columns:
        df["trans_date"] = pd.to_datetime(df["trans_date"], errors="coerce")
        df["trans_dow"] = df["trans_date"].dt.weekday.fillna(-1).astype(int)
        df["trans_month"] = df["trans_date"].dt.month.fillna(-1).astype(int)
        df["trans_dayofyear"] = df["trans_date"].dt.dayofyear.fillna(-1).astype(int)
        df["trans_is_weekend"] = (df["trans_dow"] >= 5).astype(int)
        df.drop(columns=["trans_date"], inplace=True)
    if "trans_time" in df.columns:
        df["trans_time"] = pd.to_datetime(df["trans_time"], errors="coerce", format="%H:%M:%S")
        df["trans_hour"] = df["trans_time"].dt.hour.fillna(-1).astype(int)
        df["trans_minute"] = df["trans_time"].dt.minute.fillna(-1).astype(int)
        df.drop(columns=["trans_time"], inplace=True)
    if "dob" in df.columns:
        df["dob"] = pd.to_datetime(df["dob"], errors="coerce")
        today = pd.Timestamp.now()
        df["age"] = ((today - df["dob"]).dt.days // 365).fillna(-1).astype(int)
        df.drop(columns=["dob"], inplace=True)
    return df
def apply_target_encoding_test(df, enc_maps, global_mean):
    # enc_maps: dict col -> mapping dict
    for col, mapping in enc_maps.items():
        if col in df.columns:
            vals = df[col].fillna("___MISSING___").astype(str)
            df[col] = vals.map(lambda v, m=mapping: m.get(v, global_mean))
        else:
            df[col] = global_mean
    return df
def main(test_csv="test_data.csv", out_csv="prediction_scores.csv", chunk_size=None):
    print("Loading artifacts...")
    with open(os.path.join(ARTIFACT_DIR, "feature_list.pkl"), "rb") as f:
        feature_list = pickle.load(f)
    with open(os.path.join(ARTIFACT_DIR, "target_encoders.pkl"), "rb") as f:
        enc_maps = pickle.load(f)
    with open(os.path.join(ARTIFACT_DIR, "global_mean.pkl"), "rb") as f:
        global_mean = pickle.load(f)
    with open(os.path.join(ARTIFACT_DIR, "scaler.pkl"), "rb") as f:
        scaler = pickle.load(f)
    # use booster for fastest inference
    booster = lgb.Booster(model_file=os.path.join(ARTIFACT_DIR, "lgbm_booster.txt"))
    # If chunking is desired, read in chunks; for simplicity, do one load (430k rows is OK).
    print("Loading test csv:", test_csv)
    df = pd.read_csv(test_csv)
    df.columns = df.columns.str.strip()  # normalize
    # DROP the same high-cardinality columns as in training (defensive)
    drop_cols = [
        "ssn", "first", "last", "street", "city", "job",
        "trans_num", "acct_num", "customer_card_number",
        "customer_transaction_ip", "merch_transaction_ip",
        "merch_url", "merch_name", "profile"
    ]  # "dob" is kept so create_time_features_for_test can derive "age"
    df = df.drop(columns=[c for c in drop_cols if c in df.columns], errors="ignore")
    # feature engineering, same as training
    df = create_time_features_for_test(df)
    # create distance if possible
    if set(["lat", "long", "merch_lat", "merch_long"]).issubset(df.columns):
        df["dist_km"] = haversine(df["lat"].astype(float).fillna(0),
                                  df["long"].astype(float).fillna(0),
                                  df["merch_lat"].astype(float).fillna(0),
                                  df["merch_long"].astype(float).fillna(0))
    else:
        if "dist_km" not in df.columns:
            df["dist_km"] = 0.0
    # ensure every feature in feature_list is present; add defaults for missing ones
    for c in feature_list:
        if c not in df.columns:
            df[c] = 0.0
    # keep only feature_list, in the same order as training
    df = df[feature_list]
    # Apply target encoding (mapping) safely
    df = apply_target_encoding_test(df, enc_maps, global_mean)
    # Fill numeric missing values with 0 (the scaler expects the same columns).
    # Better would be to reuse the training medians, but they are not saved, so fall back to 0.
    numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    df[numeric_cols] = df[numeric_cols].fillna(0)
    # scale numeric columns (the scaler was fit on the same training columns)
    df[numeric_cols] = scaler.transform(df[numeric_cols])
    # convert to numpy and predict using the booster
    X = df.to_numpy()
    print("Predicting on shape", X.shape)
    preds = booster.predict(X)  # returns probabilities
    out = pd.DataFrame({"index": range(len(preds)), "pred_prob": preds})
    out.to_csv(out_csv, index=False)
    print("Saved predictions to", out_csv)
# local haversine needed
def haversine(lat1, lon1, lat2, lon2):
    R = 6371.0
    lat1, lon1, lat2, lon2 = map(np.radians, (lat1, lon1, lat2, lon2))
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = np.sin(dlat / 2.0) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2.0) ** 2
    c = 2 * np.arcsin(np.sqrt(a))
    return R * c
if __name__ == "__main__":
main()
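# Example invocation (expects model_artifacts/ from train_pipeline.py and a
# test_data.csv in the working directory; writes prediction_scores.csv):
#   python predict_pipeline.py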