# ============================================================
# XGB OOF-first (5x2) — IISIGNATURE FIRMA, DATOS REALES
# + FIXED OOF (no overwrite in repeated CV)
# + TOP-5 evaluation: TRAIN & TEST confusion matrices + reports
# + Timing (search / top5 eval / total)
# + Extra: build a LaTeX-ready table for TOP-5
# ============================================================
import time
import inspect
import os
import numpy as np
import pandas as pd
from tqdm.auto import tqdm
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import (
StratifiedKFold,
RepeatedStratifiedKFold,
ParameterSampler
)
from sklearn.impute import SimpleImputer
from sklearn.feature_selection import VarianceThreshold
from sklearn.metrics import (
roc_auc_score,
confusion_matrix,
classification_report,
accuracy_score,
f1_score,
balanced_accuracy_score
)
from sklearn.utils.class_weight import compute_class_weight
from scipy.stats import randint, uniform, loguniform
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import xgboost as xgb
from xgboost import XGBClassifier
# ============================================================
# 1) LOAD + MERGE + SPLIT (80/20) [IISIGNATURE SIGNATURE, REAL DATA]
# ============================================================
# If your files have different names, only these two paths need to change.
x = pd.read_csv("/home/felorrieta/Downloads/path_signature_iisignature_M9.csv")
y = pd.read_csv("/home/felorrieta/Catalina/ts_v9.0.1_SMBH_ZTF_xmatch.csv")

# The catalogue keys objects by "oid"; mirror it into "id" so both frames
# share the join column used by the merge below.
y["id"] = y["oid"]
data = x.merge(y, on="id")

# Random (non-stratified) 80/20 split with a fixed seed; the rows that were
# not sampled form the test set.
train_idx = data.sample(frac=0.8, random_state=42).index
data_train = data.loc[train_idx].reset_index(drop=True)
data_test = data.drop(index=train_idx).reset_index(drop=True)

# Identifier and label columns are removed; everything else is a feature.
X_train = data_train.drop(
    columns=["oid", "survey_class_mapped", "survey_class", "survey_class_cat", "id"]
)
y_train = data_train["survey_class_mapped"]

# The test matrix reuses the train column order exactly.
X_test = data_test.loc[:, X_train.columns].copy()
y_test = data_test["survey_class_mapped"]

# Plain arrays are what the CV helpers index with integer fold indices.
X_train_np = np.asarray(X_train)
X_test_np = np.asarray(X_test)

print("Shapes:", X_train.shape, X_test.shape)
# ============================================================
# 2) Encoding + class weights (+ optional HARD_FACTOR)
# ============================================================
# Integer-encode the target; the encoder is fitted on the train labels only.
le = LabelEncoder()
y_train_enc = le.fit_transform(y_train)
y_test_enc = le.transform(y_test)
labels = le.classes_
n_classes = len(labels)

# "balanced" weights computed over the encoded classes present in train.
classes = np.unique(y_train_enc)
cw = compute_class_weight(class_weight="balanced", classes=classes, y=y_train_enc)
class_weight_dict = dict(zip(classes.tolist(), cw.tolist()))

# Optional: boost the weight of classes considered harder to separate.
HARD_CLASSES = ["AGN", "QSO"]
HARD_FACTOR = 1.8
for name in HARD_CLASSES or []:
    positions = np.flatnonzero(labels == name)
    if positions.size == 0:
        # Class name not present in this dataset: nothing to boost.
        continue
    hid = int(positions[0])
    class_weight_dict[hid] *= HARD_FACTOR
def make_sample_weight(y_enc, cw_dict):
    """Return one weight per sample by looking up each label in ``cw_dict``.

    Args:
        y_enc: Iterable of encoded class ids (each is cast with ``int``).
        cw_dict: Mapping from integer class id to its weight.

    Returns:
        1-D float ``numpy`` array, in the same order as ``y_enc``.

    Raises:
        KeyError: If a label in ``y_enc`` has no entry in ``cw_dict``.
    """
    per_sample = (cw_dict[int(label)] for label in y_enc)
    return np.fromiter(per_sample, dtype=float)
# Encoded id of the "Blazar" class, or None when that class is not among the
# fitted labels.
_blazar_positions = np.flatnonzero(labels == "Blazar")
BLZ_ID = int(_blazar_positions[0]) if _blazar_positions.size else None


def f1_blazar(y_true, y_pred):
    """Binary F1 for the Blazar class (Blazar vs. everything else).

    Args:
        y_true: Array of encoded true labels.
        y_pred: Array of encoded predicted labels.

    Returns:
        The F1 score of the one-vs-rest Blazar problem, or ``np.nan`` when
        ``BLZ_ID`` is None (no Blazar class in the label set).
    """
    if BLZ_ID is not None:
        is_blazar_true = (y_true == BLZ_ID).astype(int)
        is_blazar_pred = (y_pred == BLZ_ID).astype(int)
        return f1_score(is_blazar_true, is_blazar_pred)
    return np.nan
# Quick sanity report: class names, per-class train counts, final weights.
_train_ids, _train_counts = np.unique(y_train_enc, return_counts=True)
print("Labels:", list(labels))
print("Distribución train:", dict(zip(_train_ids, _train_counts)))
print("Class weights:", class_weight_dict)
# ============================================================
# 3) Preprocess + XGB fit (early stopping)
# ============================================================
def preprocess_fit_transform(X_tr_raw, X_va_raw):
    """Fit the preprocessing on the training part and apply it to both parts.

    Steps: median imputation, then removal of zero-variance features. Both
    transformers are fitted on ``X_tr_raw`` only and reused for ``X_va_raw``.

    Args:
        X_tr_raw: Raw training feature matrix.
        X_va_raw: Raw validation feature matrix (same columns).

    Returns:
        Tuple ``(X_tr, X_va, imputer, variance_filter)`` with the transformed
        matrices and the two fitted transformers.
    """
    imp = SimpleImputer(strategy="median").fit(X_tr_raw)
    vt = VarianceThreshold(0.0)

    X_tr_v = vt.fit_transform(imp.transform(X_tr_raw))
    X_va_v = vt.transform(imp.transform(X_va_raw))
    return X_tr_v, X_va_v, imp, vt
def preprocess_full(X_train_raw, X_test_raw):
    """Preprocess the full train set and apply the same transform to test.

    Same pipeline as ``preprocess_fit_transform`` (median imputation followed
    by zero-variance filtering), fitted on the whole training matrix.

    Args:
        X_train_raw: Raw full-train feature matrix.
        X_test_raw: Raw test feature matrix (same columns).

    Returns:
        Tuple ``(X_train, X_test, imputer, variance_filter)``.
    """
    # The per-fold helper performs exactly the same fit-on-first /
    # transform-both sequence, so it is reused here.
    return preprocess_fit_transform(X_train_raw, X_test_raw)
def fit_xgb_one_fold(params, X_tr, y_tr, X_va, y_va, w_tr=None, w_va=None):
    """Fit one XGBClassifier with early stopping monitored on ``(X_va, y_va)``.

    Args:
        params: Sampled hyper-parameters, forwarded to ``XGBClassifier``.
            A key that also appears in the fixed settings below raises
            ``TypeError`` (duplicate keyword argument).
        X_tr, y_tr: Training features and encoded labels.
        X_va, y_va: Validation features and encoded labels (the eval set).
        w_tr: Optional per-sample weights for training.
        w_va: Optional per-sample weights for the eval set; only forwarded
            when ``fit`` exposes ``sample_weight_eval_set``.

    Returns:
        The fitted ``XGBClassifier``.
    """
    fixed_settings = {
        "random_state": 42,
        "n_jobs": -1,
        "tree_method": "hist",
        # Large ceiling; the early-stopping callback is expected to halt
        # training well before this many rounds.
        "n_estimators": 20000,
        "verbosity": 0,
        "objective": "multi:softprob",
        "num_class": n_classes,
        "eval_metric": "mlogloss",
    }
    stopper = xgb.callback.EarlyStopping(rounds=250, save_best=True)
    model = XGBClassifier(**fixed_settings, **params, callbacks=[stopper])

    fit_kwargs = {
        "X": X_tr,
        "y": y_tr,
        "sample_weight": w_tr,
        "eval_set": [(X_va, y_va)],
        "verbose": False,
    }

    # Probe the signature so the call works whether or not this xgboost
    # build accepts eval-set weights.
    accepts_eval_weights = "sample_weight_eval_set" in inspect.signature(model.fit).parameters
    if w_va is not None and accepts_eval_weights:
        fit_kwargs["sample_weight_eval_set"] = [w_va]

    model.fit(**fit_kwargs)
    return model
def to_plain_params(d):
    """Return a copy of ``d`` with NumPy scalar values turned into builtins.

    Sampled hyper-parameters can carry NumPy scalars, which print as e.g.
    ``np.float64(0.1)`` / ``np.int64(3)`` on recent NumPy versions. Floating,
    integer and boolean NumPy scalars are converted to ``float`` / ``int`` /
    ``bool``; every other value is passed through unchanged.

    Args:
        d: Mapping of parameter name to value.

    Returns:
        A new ``dict`` with the same keys in the same order.
    """
    plain = {}
    for key, value in d.items():
        if isinstance(value, np.floating):
            value = float(value)
        elif isinstance(value, np.integer):
            value = int(value)
        elif isinstance(value, np.bool_):
            value = bool(value)
        plain[key] = value
    return plain
# ============================================================
# 4) FIXED OOF eval for repeated CV (accumulate & average)
# ============================================================
def oof_eval_xgb(params, X, y, cv):
    """Evaluate one parameter set with out-of-fold (OOF) predictions.

    Works with repeated CV: validation probabilities are accumulated per
    sample and divided by how many times that sample was validated, so later
    repeats do not overwrite earlier ones.

    Args:
        params: Hyper-parameters forwarded to ``fit_xgb_one_fold``.
        X: Feature array indexable by integer index arrays.
        y: Encoded label array indexable by integer index arrays.
        cv: Splitter exposing ``split(X, y)``.

    Returns:
        Dict with OOF metrics (macro-F1, balanced accuracy, accuracy, Blazar
        F1), per-fold means / std, and ``best_n_cv`` (median best iteration
        across folds + 1, or 800 if no fold reported one).

    NOTE(review): each fold's validation part is used both for early stopping
    and for scoring, so these metrics may be somewhat optimistic.
    """
    n_samples = len(y)
    K = len(np.unique(y))
    proba_sum = np.zeros((n_samples, K), dtype=float)
    proba_cnt = np.zeros(n_samples, dtype=float)

    per_fold = {"f1m": [], "bacc": [], "f1blz": []}
    stop_rounds = []

    for tr_idx, va_idx in cv.split(X, y):
        y_tr, y_va = y[tr_idx], y[va_idx]
        X_tr, X_va, _, _ = preprocess_fit_transform(X[tr_idx], X[va_idx])

        model = fit_xgb_one_fold(
            params, X_tr, y_tr, X_va, y_va,
            w_tr=make_sample_weight(y_tr, class_weight_dict),
            w_va=make_sample_weight(y_va, class_weight_dict),
        )

        # Accumulate instead of assigning: with repeats, a sample is
        # validated more than once.
        p_va = model.predict_proba(X_va)
        proba_sum[va_idx] += p_va
        proba_cnt[va_idx] += 1.0

        yhat_va = np.argmax(p_va, axis=1)
        per_fold["f1m"].append(f1_score(y_va, yhat_va, average="macro"))
        per_fold["bacc"].append(balanced_accuracy_score(y_va, yhat_va))
        per_fold["f1blz"].append(f1_blazar(y_va, yhat_va))

        fold_best = getattr(model, "best_iteration", None)
        if fold_best is not None:
            stop_rounds.append(fold_best)

    # The epsilon only guards against a zero count; counts are >= 1 for any
    # sample that was validated at least once.
    proba_oof = proba_sum / (proba_cnt[:, None] + 1e-12)
    yhat_oof = np.argmax(proba_oof, axis=1)

    if stop_rounds:
        best_n_cv = int(np.median(stop_rounds) + 1)
    else:
        best_n_cv = 800

    return {
        "oof_macroF1": float(f1_score(y, yhat_oof, average="macro")),
        "oof_bacc": float(balanced_accuracy_score(y, yhat_oof)),
        "oof_acc": float(accuracy_score(y, yhat_oof)),
        "oof_f1_blazar": float(f1_blazar(y, yhat_oof)),
        "fold_f1m_mean": float(np.mean(per_fold["f1m"])),
        "fold_f1m_std": float(np.std(per_fold["f1m"])),
        "fold_bacc_mean": float(np.mean(per_fold["bacc"])),
        "fold_f1blz_mean": float(np.mean(per_fold["f1blz"])),
        "best_n_cv": int(best_n_cv),
    }
# ============================================================
# 5) Search space + OOF-first run
# ============================================================
TOTAL_T0 = time.time()

# 5 folds x 2 repeats, seeded for reproducible splits.
cv_rep = RepeatedStratifiedKFold(n_splits=5, n_repeats=2, random_state=42)

# Distributions sampled by the random search.
param_dist = {
    "learning_rate": loguniform(0.007, 0.06),
    "max_depth": randint(2, 6),
    "min_child_weight": loguniform(10.0, 150.0),
    "subsample": uniform(0.65, 0.35),
    "colsample_bytree": uniform(0.65, 0.35),
    "colsample_bynode": uniform(0.65, 0.35),
    "gamma": loguniform(1e-4, 8.0),
    "reg_alpha": loguniform(1e-10, 1.0),
    "reg_lambda": loguniform(1.0, 200.0),
    "grow_policy": ["depthwise", "lossguide"],
    "max_leaves": randint(16, 129),
}
N_ITER = 30
TOP_K = 5

# Materialise the candidates up front so the progress bar has a fixed length.
sampler = list(ParameterSampler(param_dist, n_iter=N_ITER, random_state=42))

print("\n########## XGB OOF-first (5x2) — IISIGNATURE FIRMA REALES ##########")
print(f"N_ITER={N_ITER} | CV=5x2 | TOP_K={TOP_K}")

SEARCH_T0 = time.time()
rows = [
    {"params": params, **oof_eval_xgb(params, X_train_np, y_train_enc, cv_rep)}
    for params in tqdm(sampler, total=N_ITER, desc="OOF trials")
]

# Rank: highest OOF macro-F1 first; ties broken by lower fold std, then
# higher balanced accuracy, then higher Blazar F1.
results = pd.DataFrame(rows).sort_values(
    by=["oof_macroF1", "fold_f1m_std", "oof_bacc", "oof_f1_blazar"],
    ascending=[False, True, False, False],
    ignore_index=True,
)
SEARCH_T1 = time.time()

_report_columns = [
    "oof_macroF1", "oof_bacc", "oof_acc", "oof_f1_blazar",
    "fold_f1m_std", "best_n_cv", "params",
]
print("\nTOP 10 by FIXED OOF macro-F1 (and stability):")
print(results.head(10)[_report_columns].to_string(index=False))
print("\nTiempo búsqueda (hh:mm:ss):", time.strftime("%H:%M:%S", time.gmtime(SEARCH_T1 - SEARCH_T0)))
# ============================================================
# 6) Train TOP-5 on full-train and show TRAIN+TEST confusion matrices
# ============================================================
def _row_normalize(cm):
cm = cm.astype(float)
row_sums = cm.sum(axis=1, keepdims=True)
row_sums[row_sums == 0] = 1.0
return (cm / row_sums) * 100.0
def save_confusion_train_test(cm_train, cm_test, labels, outpath,
                              title_prefix="", subtitle="",
                              gap_width=0.28, wspace=0.15,
                              label_fontsize=13, tick_fontsize=13, title_fontsize=14,
                              cmap="Greens"):
    """Save a side-by-side Train / Test confusion-matrix figure to ``outpath``.

    Each cell shows the row percentage (bold) and the raw count in
    parentheses; colour encodes the row percentage on a fixed 0-100 scale.

    Args:
        cm_train: Train confusion matrix (counts).
        cm_test: Test confusion matrix (counts).
        labels: Class names used for both axes' tick labels.
        outpath: Destination image path.
        title_prefix: First line of the figure title.
        subtitle: Second line of the figure title.
        gap_width: Relative width of the empty spacer between the panels.
        wspace: Horizontal spacing passed to the grid layout.
        label_fontsize: Font size of the axis labels.
        tick_fontsize: Font size of the tick labels.
        title_fontsize: Font size of each panel title.
        cmap: Matplotlib colormap name.
    """
    pct_train = _row_normalize(cm_train)
    pct_test = _row_normalize(cm_test)

    # Layout columns: train panel | spacer | test panel | colorbar.
    fig = plt.figure(figsize=(10.8, 4.8))
    layout = gridspec.GridSpec(1, 4, width_ratios=[1, gap_width, 1, 0.08], wspace=wspace)
    ax_train = fig.add_subplot(layout[0, 0])
    spacer = fig.add_subplot(layout[0, 1])
    ax_test = fig.add_subplot(layout[0, 2])
    ax_cbar = fig.add_subplot(layout[0, 3])
    spacer.axis("off")

    # Cells above this percentage get white text for contrast.
    white_above = 50

    for ax, pct_matrix, count_matrix, panel_title in (
        (ax_train, pct_train, cm_train, "Train"),
        (ax_test, pct_test, cm_test, "Test"),
    ):
        im = ax.imshow(pct_matrix, cmap=cmap, vmin=0, vmax=100)
        ax.set_title(panel_title, fontsize=title_fontsize)
        ax.set_xticks(np.arange(len(labels)))
        ax.set_yticks(np.arange(len(labels)))
        ax.set_xticklabels(labels, rotation=45, ha="right", fontsize=tick_fontsize)
        ax.set_yticklabels(labels, fontsize=tick_fontsize)
        ax.set_xlabel("Predicho", fontsize=label_fontsize)
        ax.set_ylabel("Real", fontsize=label_fontsize)

        for r, c in np.ndindex(*pct_matrix.shape):
            pct = pct_matrix[r, c]
            cnt = int(count_matrix[r, c])
            text_color = "white" if pct > white_above else "black"
            # Percentage slightly above the cell centre, count slightly below.
            ax.text(c, r - 0.10, f"{pct:.1f}%",
                    ha="center", va="center",
                    color=text_color, fontsize=10, fontweight="bold")
            ax.text(c, r + 0.22, f"({cnt})",
                    ha="center", va="center",
                    color=text_color, fontsize=7)

    # Both panels share vmin/vmax, so the last image drives the colorbar.
    fig.colorbar(im, cax=ax_cbar, label="% por fila (clase real)")
    fig.suptitle(f"{title_prefix}\n{subtitle}", fontsize=13, y=0.98)
    fig.subplots_adjust(left=0.08, right=0.92, bottom=0.22, top=0.82)
    fig.savefig(outpath, dpi=300, bbox_inches="tight")
    plt.close(fig)
EVAL_T0 = time.time()

# The TOP_K best trials according to the OOF ranking computed above.
topk = results.head(TOP_K).reset_index(drop=True)

# Preprocessing is fitted once on the full train set and reused for test.
X_train_v, X_test_v, _, _ = preprocess_full(X_train_np, X_test_np)
w_train_full = make_sample_weight(y_train_enc, class_weight_dict)

# All encoded class ids, in the same order as `labels`. Passing them
# explicitly keeps confusion matrices and reports at n_classes rows even when
# a class is absent from the (non-stratified) test split and never predicted;
# without this the matrix shrinks and no longer matches `labels`.
class_ids = np.arange(n_classes)

print("\n" + "#" * 95)
print("TOP-5 (selected by FIXED OOF) — TRAIN & TEST evaluation (reference)")
print("#" * 95)

top5_summary = []
downloads = "/home/felorrieta/Catalina"
os.makedirs(downloads, exist_ok=True)

for i, row in topk.iterrows():
    params = row["params"]
    # Number of trees taken from the CV early-stopping estimate; the final
    # fit below has no eval set and therefore no early stopping.
    best_n = int(row["best_n_cv"])

    model = XGBClassifier(
        random_state=100 + i,
        n_jobs=-1,
        tree_method="hist",
        n_estimators=best_n,
        verbosity=0,
        objective="multi:softprob",
        num_class=n_classes,
        eval_metric="mlogloss",
        **params
    )
    model.fit(X_train_v, y_train_enc, sample_weight=w_train_full, verbose=False)

    p_tr = model.predict_proba(X_train_v)
    p_te = model.predict_proba(X_test_v)
    yhat_tr = np.argmax(p_tr, axis=1)
    yhat_te = np.argmax(p_te, axis=1)

    f1m_tr = f1_score(y_train_enc, yhat_tr, average="macro")
    f1m_te = f1_score(y_test_enc, yhat_te, average="macro")
    bacc_tr = balanced_accuracy_score(y_train_enc, yhat_tr)
    bacc_te = balanced_accuracy_score(y_test_enc, yhat_te)
    acc_tr = accuracy_score(y_train_enc, yhat_tr)
    acc_te = accuracy_score(y_test_enc, yhat_te)
    f1w_te = f1_score(y_test_enc, yhat_te, average="weighted")
    f1blz_te = f1_blazar(y_test_enc, yhat_te)

    print("\n" + "=" * 95)
    print(f"OOF-TOP{TOP_K} | Model #{i+1}")
    print("=" * 95)
    print("FIXED OOF macroF1:", f"{row['oof_macroF1']:.4f}", "| fold std:", f"{row['fold_f1m_std']:.4f}")
    print("PARAMS:", {"best_n_estimators": best_n, **to_plain_params(params)})
    print("\nMÉTRICAS (TRAIN / TEST)")
    print(f"macro-F1 train={f1m_tr:.4f} | test={f1m_te:.4f}")
    print(f"bal_acc train={bacc_tr:.4f} | test={bacc_te:.4f}")
    print(f"acc train={acc_tr:.4f} | test={acc_te:.4f}")
    print(f"F1_w test={f1w_te:.4f} | F1(Blazar) test={f1blz_te:.4f}")
    print(f"GAP macro-F1 (train-test) = {f1m_tr - f1m_te:.4f}")

    # labels=class_ids pins the matrix shape to (n_classes, n_classes).
    cm_tr = confusion_matrix(y_train_enc, yhat_tr, labels=class_ids)
    cm_te = confusion_matrix(y_test_enc, yhat_te, labels=class_ids)
    print("\nMatriz de confusión (TRAIN)")
    print(pd.DataFrame(cm_tr, index=labels, columns=labels))
    print("\nMatriz de confusión (TEST)")
    print(pd.DataFrame(cm_te, index=labels, columns=labels))

    # Same reason: target_names must line up one-to-one with the labels used.
    print("\nClassification report (TRAIN)")
    print(classification_report(y_train_enc, yhat_tr, labels=class_ids,
                                target_names=labels, zero_division=0))
    print("\nClassification report (TEST)")
    print(classification_report(y_test_enc, yhat_te, labels=class_ids,
                                target_names=labels, zero_division=0))

    f1w_tr = f1_score(y_train_enc, yhat_tr, average="weighted", zero_division=0)
    subtitle = (
        f"Acc train={acc_tr:.3f} | Acc test={acc_te:.3f} | "
        f"F1w train={f1w_tr:.3f} | "
        f"F1w test={f1w_te:.3f}"
    )
    outpath = os.path.join(downloads, f"XGB_IISIG_FIRMA_REALES_{i+1}.png")
    save_confusion_train_test(
        cm_tr, cm_te, labels,
        outpath=outpath,
        title_prefix=f"XGB_{i+1} (IISIG firma, datos reales) | n_estimators={best_n}",
        subtitle=subtitle,
        gap_width=0.28,
        wspace=0.15,
        cmap="Greens"
    )
    print(f"✅ Guardado: {outpath}")

    # Kept for the summary table built later in the script.
    top5_summary.append({
        "Modelo": f"XGB_{i+1}",
        "best_n": best_n,
        "oof_macroF1": row["oof_macroF1"],
        "oof_f1_blazar": row["oof_f1_blazar"],
        "fold_f1m_std": row["fold_f1m_std"],
        "Acc_test": acc_te,
        "F1_w_test": f1w_te,
        "macroF1_test": f1m_te,
        "bacc_test": bacc_te,
    })

EVAL_T1 = time.time()
TOTAL_T1 = time.time()
print("\nTiempo eval TOP-5 (hh:mm:ss):", time.strftime("%H:%M:%S", time.gmtime(EVAL_T1 - EVAL_T0)))
print("Tiempo TOTAL (hh:mm:ss):", time.strftime("%H:%M:%S", time.gmtime(TOTAL_T1 - TOTAL_T0)))
# ============================================================
# 7) Extra: Build LaTeX-ready table metrics for TOP-5
# ============================================================
def cv_auc_gap(params, X, y, cv):
    """Cross-validated weighted one-vs-rest ROC AUC and the train/valid gap.

    For every split a model is fitted with ``fit_xgb_one_fold`` and scored on
    both its training and validation parts.

    Args:
        params: Hyper-parameters forwarded to ``fit_xgb_one_fold``.
        X: Feature array indexable by integer index arrays.
        y: Encoded label array indexable by integer index arrays.
        cv: Splitter exposing ``split(X, y)``.

    Returns:
        Tuple ``(mean_valid_auc, std_valid_auc, gap)`` where ``gap`` is the
        mean training AUC minus the mean validation AUC.
    """
    auc_options = {"multi_class": "ovr", "average": "weighted"}
    train_scores = []
    valid_scores = []

    for tr_idx, va_idx in cv.split(X, y):
        y_tr, y_va = y[tr_idx], y[va_idx]
        X_tr, X_va, _, _ = preprocess_fit_transform(X[tr_idx], X[va_idx])

        model = fit_xgb_one_fold(
            params, X_tr, y_tr, X_va, y_va,
            w_tr=make_sample_weight(y_tr, class_weight_dict),
            w_va=make_sample_weight(y_va, class_weight_dict),
        )

        p_tr = model.predict_proba(X_tr)
        p_va = model.predict_proba(X_va)
        train_scores.append(roc_auc_score(y_tr, p_tr, **auc_options))
        valid_scores.append(roc_auc_score(y_va, p_va, **auc_options))

    mean_va = float(np.mean(valid_scores))
    std_va = float(np.std(valid_scores))
    gap = float(np.mean(train_scores) - mean_va)
    return mean_va, std_va, gap
# Two CV schemes for the table: plain 5-fold and 5-fold repeated twice.
cv_5 = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
cv_5x2 = RepeatedStratifiedKFold(n_splits=5, n_repeats=2, random_state=42)

LATEX_T0 = time.time()
latex_rows = []
for i, row in tqdm(list(topk.iterrows()), total=len(topk), desc="Building LaTeX table metrics"):
    params = row["params"]
    # Test metrics were already computed in the TOP-5 evaluation loop.
    summary = top5_summary[i]

    auc_cv, sd_cv, gap_cv = cv_auc_gap(params, X_train_np, y_train_enc, cv_5)
    auc_rep, sd_rep, gap_rep = cv_auc_gap(params, X_train_np, y_train_enc, cv_5x2)

    latex_rows.append(dict(
        Modelo=summary["Modelo"],
        AUC_CV=auc_cv,
        SD_CV=sd_cv,
        Gap_CV=gap_cv,
        AUC_rep=auc_rep,
        SD_rep=sd_rep,
        Gap_rep=gap_rep,
        Acc_test=summary["Acc_test"],
        F1_w_test=summary["F1_w_test"],
    ))
LATEX_T1 = time.time()

latex_df = pd.DataFrame(latex_rows)
print("\nTiempo métricas tabla LaTeX (hh:mm:ss):", time.strftime("%H:%M:%S", time.gmtime(LATEX_T1 - LATEX_T0)))

# NOTE(review): this view ranks the models by their test-set scores.
print("\nTabla resumen (para elegir mejor modelo):")
_ranked = latex_df.sort_values(["Acc_test", "F1_w_test"], ascending=False)
print(_ranked.to_string(index=False))

# One tabular row per model, ready to paste into a LaTeX table.
print("\n--- LaTeX rows (pegables dentro de tu tabular) ---")
for _, r in latex_df.iterrows():
    print(
        f"{r['Modelo']} & "
        f"{r['AUC_CV']:.4f} & {r['SD_CV']:.4f} & {r['Gap_CV']:.4f} & "
        f"{r['AUC_rep']:.4f} & {r['SD_rep']:.4f} & {r['Gap_rep']:.4f} & "
        f"{r['Acc_test']:.4f} & {r['F1_w_test']:.4f} \\\\"
    )