Files
EDSS-calc/audit.py
2026-02-23 18:19:50 +01:00

2372 lines
86 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# %% Confirm EDSS missing
import pandas as pd
import numpy as np
def clean_series(s):
    """Normalize a pandas Series to lowercase, whitespace-trimmed strings."""
    as_text = s.astype(str)
    return as_text.str.strip().str.lower()
def gt_edss_audit(ground_truth_path, edss_col="EDSS"):
    """Audit the ground-truth CSV: row/key counts, unparseable EDSS, duplicate keys.

    Reads a semicolon-separated CSV, normalizes unique_id/MedDatum (strip +
    lower), builds key = unique_id + "_" + MedDatum, coerces EDSS to numeric
    (German decimal commas handled), and prints an audit log. Returns the
    DataFrame with the added 'key' and (if EDSS present) '_edss_num' columns.
    """
    df_gt = pd.read_csv(ground_truth_path, sep=';')

    # normalize keys (strip + lowercase, same normalization as clean_series)
    for id_col in ('unique_id', 'MedDatum'):
        df_gt[id_col] = df_gt[id_col].astype(str).str.strip().str.lower()
    df_gt['key'] = df_gt['unique_id'] + "_" + df_gt['MedDatum']

    print("GT rows:", len(df_gt))
    print("GT unique keys:", df_gt['key'].nunique())

    # Guard clause: bail out early when the EDSS column is absent.
    if edss_col not in df_gt.columns:
        print(f"EDSS column '{edss_col}' not found in GT columns:", df_gt.columns.tolist())
        return df_gt

    # IMPORTANT: parse EDSS robustly (German decimal commas etc.)
    normalized_edss = (
        df_gt[edss_col]
        .astype(str)
        .str.replace(",", ".", regex=False)
        .str.strip()
    )
    df_gt["_edss_num"] = pd.to_numeric(normalized_edss, errors="coerce")
    print(f"GT missing EDSS look (numeric-coerce): {df_gt['_edss_num'].isna().sum()}")
    print(f"GT missing EDSS unique keys: {df_gt.loc[df_gt['_edss_num'].isna(), 'key'].nunique()}")

    # duplicates on key
    dup = df_gt['key'].duplicated(keep=False)
    print("GT duplicate-key rows:", dup.sum())
    if dup.any():
        # how many duplicate keys exist?
        print("GT duplicate keys:", df_gt.loc[dup, 'key'].nunique())
        # of duplicate-key rows, how many have missing EDSS?
        print("Duplicate-key rows with missing EDSS:", df_gt.loc[dup, "_edss_num"].isna().sum())
        # show the worst offenders
        print("\nTop duplicate keys (by count):")
        print(df_gt.loc[dup, 'key'].value_counts().head(10))
    return df_gt
# Run the audit against the ground-truth export (semicolon-separated CSV).
df_gt = gt_edss_audit("/home/shahin/Lab/Doktorarbeit/Barcelona/Data/GT_Numbers.csv", edss_col="EDSS")
##
# %% trace missing ones
import json, glob, os
import pandas as pd
def load_preds(json_dir_path):
    """Load prediction keys from every successful entry in *.json files.

    Parameters
    ----------
    json_dir_path : str
        Directory containing iteration result JSONs (each a list of entries
        with a "success" flag and a "result" dict).

    Returns
    -------
    pandas.DataFrame
        Columns: unique_id, MedDatum, file, key — one row per successful
        entry; unique_id/MedDatum are normalized (strip + lower) and
        key = unique_id + "_" + MedDatum.

    Fix: an empty directory (or zero successful entries) used to produce a
    column-less DataFrame, so df_pred["unique_id"] raised KeyError. Columns
    are now declared explicitly, so an empty result is returned cleanly.
    """
    all_preds = []
    for file_path in glob.glob(os.path.join(json_dir_path, "*.json")):
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        file_name = os.path.basename(file_path)
        for entry in data:
            if entry.get("success"):
                res = entry["result"]
                all_preds.append({
                    "unique_id": str(res.get("unique_id")).strip().lower(),
                    "MedDatum": str(res.get("MedDatum")).strip().lower(),
                    "file": file_name
                })
    # Explicit columns keep the frame well-formed even when all_preds == [].
    df_pred = pd.DataFrame(all_preds, columns=["unique_id", "MedDatum", "file"])
    df_pred["key"] = df_pred["unique_id"] + "_" + df_pred["MedDatum"]
    return df_pred
# Load every prediction JSON from the iteration folder, then cross-reference
# prediction keys against GT rows whose EDSS failed numeric coercion.
# NOTE(review): relies on df_gt (with 'key' and '_edss_num') produced by the
# gt_edss_audit cell above — run that cell first.
df_pred = load_preds("/home/shahin/Lab/Doktorarbeit/Barcelona/Data/iteration")
print("Pred rows:", len(df_pred))
print("Pred unique keys:", df_pred["key"].nunique())
# Suppose df_gt was returned from step 1 and has _edss_num + key
missing_gt_keys = set(df_gt.loc[df_gt["_edss_num"].isna(), "key"])
df_pred["gt_key_missing_edss"] = df_pred["key"].isin(missing_gt_keys)
print("Pred rows whose GT key has missing EDSS:", df_pred["gt_key_missing_edss"].sum())
print("Unique keys (among preds) whose GT EDSS missing:", df_pred.loc[df_pred["gt_key_missing_edss"], "key"].nunique())
print("\nTop files contributing to missing-GT-EDSS rows:")
print(df_pred.loc[df_pred["gt_key_missing_edss"], "file"].value_counts().head(20))
print("\nTop keys replicated in predictions (why count inflates):")
print(df_pred.loc[df_pred["gt_key_missing_edss"], "key"].value_counts().head(20))
##
# %% verify
# Left-merge predictions onto GT; validate="many_to_one" makes pandas raise a
# MergeError if GT still contains duplicate keys, so row inflation cannot
# happen silently.
merged = df_pred.merge(
    df_gt[["key", "_edss_num"]],  # use the numeric-coerced GT EDSS
    on="key",
    how="left",
    validate="many_to_one"  # will ERROR if GT has duplicate keys (GOOD!)
)
print("Merged rows:", len(merged))
print("Merged missing GT EDSS:", merged["_edss_num"].isna().sum())
##
# %% 1json (rewritten with robust parsing + detailed data log)
import pandas as pd
import numpy as np
import json
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.patches import Patch
from matplotlib.lines import Line2D
def plot_single_json_error_analysis_with_log(
    json_file_path,
    ground_truth_path,
    edss_gt_col="EDSS",
    min_bin_count=5,
):
    """Audit one prediction JSON against ground truth and plot MAE per confidence bin.

    Pipeline (each stage logs its data loss):
      1. Load the GT CSV (semicolon-separated), normalize unique_id/MedDatum,
         build key = unique_id + "_" + MedDatum, robustly parse EDSS
         (comma decimals, blanks, "nan"/"none"/"null" tokens).
      2. Load entries from json_file_path; keep only success entries with a
         non-empty unique_id and MedDatum.
      3. Inner-merge predictions with GT on key (validate="many_to_one"
         raises if GT has duplicate keys).
      4. Keep rows with EDSS_gt, EDSS_pred and confidence present; bin by
         confidence into fixed brackets (0-70, 70-80, 80-90, 90-100).
      5. Bar-plot mean absolute EDSS error per bin with SEM error bars and an
         optional linear trend line.

    Parameters:
        json_file_path: path to one iteration's results JSON (a list of dicts
            with "success" and "result"; result holds unique_id, MedDatum,
            EDSS, certainty_percent).
        ground_truth_path: path to the GT CSV; must contain unique_id,
            MedDatum and edss_gt_col (ValueError otherwise).
        edss_gt_col: name of the EDSS column in the GT file.
        min_bin_count: bins with fewer rows trigger an instability warning.

    Returns None; prints logs and shows a matplotlib figure. Returns early
    (after an [ERROR] log) if no usable or no complete rows remain.
    """
    def norm_str(x):
        # normalize identifiers and dates consistently
        return str(x).strip().lower()

    def parse_edss(x):
        # robust numeric parse: handles "3,5" as 3.5, blanks, "nan", etc.
        if x is None:
            return np.nan
        s = str(x).strip()
        if s == "" or s.lower() in {"nan", "none", "null"}:
            return np.nan
        s = s.replace(",", ".")
        return pd.to_numeric(s, errors="coerce")

    print("\n" + "="*80)
    print("SINGLE-JSON ERROR ANALYSIS (WITH LOG)")
    print("="*80)
    print(f"JSON: {json_file_path}")
    print(f"GT: {ground_truth_path}")
    # ------------------------------------------------------------------
    # 1) Load Ground Truth
    # ------------------------------------------------------------------
    df_gt = pd.read_csv(ground_truth_path, sep=";")
    required_gt_cols = {"unique_id", "MedDatum", edss_gt_col}
    missing_cols = required_gt_cols - set(df_gt.columns)
    if missing_cols:
        raise ValueError(f"GT is missing required columns: {missing_cols}. Available: {df_gt.columns.tolist()}")
    df_gt["unique_id"] = df_gt["unique_id"].map(norm_str)
    df_gt["MedDatum"] = df_gt["MedDatum"].map(norm_str)
    df_gt["key"] = df_gt["unique_id"] + "_" + df_gt["MedDatum"]
    # Robust EDSS parsing (important!)
    df_gt["EDSS_gt"] = df_gt[edss_gt_col].map(parse_edss)
    # GT logs
    print("\n--- GT LOG ---")
    print(f"GT rows: {len(df_gt)}")
    print(f"GT unique keys: {df_gt['key'].nunique()}")
    gt_dup = df_gt["key"].duplicated(keep=False).sum()
    print(f"GT duplicate-key rows: {gt_dup}")
    print(f"GT missing EDSS (numeric): {df_gt['EDSS_gt'].isna().sum()}")
    print(f"GT missing EDSS unique keys: {df_gt.loc[df_gt['EDSS_gt'].isna(), 'key'].nunique()}")
    if gt_dup > 0:
        print("\n[WARNING] GT has duplicate keys. Merge can duplicate rows. Example duplicate keys:")
        print(df_gt.loc[df_gt["key"].duplicated(keep=False), "key"].value_counts().head(10))
    # ------------------------------------------------------------------
    # 2) Load Predictions from the specific JSON
    # ------------------------------------------------------------------
    with open(json_file_path, "r", encoding="utf-8") as f:
        data = json.load(f)
    total_entries = len(data)
    success_entries = sum(1 for e in data if e.get("success"))
    all_preds = []
    # Per-reason skip/flag counters; missing_edss and missing_conf rows are
    # still appended (only counted here) — they get dropped at the filter stage.
    skipped = {
        "not_success": 0,
        "missing_uid_or_date": 0,
        "missing_edss": 0,
        "missing_conf": 0,
    }
    for entry in data:
        if not entry.get("success"):
            skipped["not_success"] += 1
            continue
        res = entry.get("result", {})
        uid = res.get("unique_id")
        md = res.get("MedDatum")
        if uid is None or md is None or str(uid).strip() == "" or str(md).strip() == "":
            skipped["missing_uid_or_date"] += 1
            continue
        edss_pred = parse_edss(res.get("EDSS"))
        conf = pd.to_numeric(res.get("certainty_percent"), errors="coerce")
        if pd.isna(edss_pred):
            skipped["missing_edss"] += 1
        if pd.isna(conf):
            skipped["missing_conf"] += 1
        all_preds.append({
            "unique_id": norm_str(uid),
            "MedDatum": norm_str(md),
            "key": norm_str(uid) + "_" + norm_str(md),
            "EDSS_pred": edss_pred,
            "confidence": conf,
        })
    df_pred = pd.DataFrame(all_preds)
    # Pred logs
    print("\n--- PRED LOG ---")
    print(f"JSON total entries: {total_entries}")
    print(f"JSON success entries: {success_entries}")
    print(f"Pred rows loaded (success + has keys): {len(df_pred)}")
    if len(df_pred) == 0:
        print("[ERROR] No usable prediction rows found. Nothing to plot.")
        return
    print(f"Pred unique keys: {df_pred['key'].nunique()}")
    print(f"Pred missing EDSS (numeric): {df_pred['EDSS_pred'].isna().sum()}")
    print(f"Pred missing confidence: {df_pred['confidence'].isna().sum()}")
    print("Skipped counts:", skipped)
    # Are keys duplicated within this JSON? (often yes if multiple notes map to same key)
    key_counts = df_pred["key"].value_counts()
    dup_pred_rows = (key_counts > 1).sum()
    max_rep = int(key_counts.max())
    print(f"Keys with >1 prediction in this JSON: {dup_pred_rows}")
    print(f"Max repetitions of a single key in this JSON: {max_rep}")
    if max_rep > 1:
        print("Top repeated keys in this JSON:")
        print(key_counts.head(10))
    # ------------------------------------------------------------------
    # 3) Merge (and diagnose why rows drop)
    # ------------------------------------------------------------------
    # Diagnose how many pred keys exist in GT
    gt_key_set = set(df_gt["key"])
    df_pred["key_in_gt"] = df_pred["key"].isin(gt_key_set)
    not_in_gt = df_pred.loc[~df_pred["key_in_gt"]]
    print("\n--- KEY MATCH LOG ---")
    print(f"Pred rows with key found in GT: {df_pred['key_in_gt'].sum()} / {len(df_pred)}")
    print(f"Pred rows with key NOT found in GT: {len(not_in_gt)}")
    if len(not_in_gt) > 0:
        print("[WARNING] Some prediction keys are not present in GT. First 10:")
        print(not_in_gt[["unique_id", "MedDatum", "key"]].head(10))
    # Now merge; we expect GT is one-to-many with pred (many_to_one)
    # If GT had duplicates, validate would raise.
    df_merged = df_pred.merge(
        df_gt[["key", "EDSS_gt"]],
        on="key",
        how="inner",
        validate="many_to_one"
    )
    print("\n--- MERGE LOG ---")
    print(f"Merged rows (inner join): {len(df_merged)}")
    print(f"Merged unique keys: {df_merged['key'].nunique()}")
    print(f"Merged missing GT EDSS: {df_merged['EDSS_gt'].isna().sum()}")
    print(f"Merged missing pred EDSS: {df_merged['EDSS_pred'].isna().sum()}")
    print(f"Merged missing confidence:{df_merged['confidence'].isna().sum()}")
    # How many rows will be removed by dropna() in your old code?
    # Old code did .dropna() on ALL columns, which can remove rows for missing confidence too.
    rows_complete = df_merged.dropna(subset=["EDSS_gt", "EDSS_pred", "confidence"])
    print("\n--- FILTER LOG (what will be used for stats/plot) ---")
    print(f"Rows with all required fields (EDSS_gt, EDSS_pred, confidence): {len(rows_complete)}")
    if len(rows_complete) == 0:
        print("[ERROR] No complete rows after filtering. Nothing to plot.")
        return
    # Compute abs error
    rows_complete = rows_complete.copy()
    rows_complete["abs_error"] = (rows_complete["EDSS_pred"] - rows_complete["EDSS_gt"]).abs()
    # ------------------------------------------------------------------
    # 4) Binning + stats (with guardrails)
    # ------------------------------------------------------------------
    bins = [0, 70, 80, 90, 100]
    labels = ["Low (<70%)", "Moderate (70-80%)", "High (80-90%)", "Very High (90-100%)"]
    # Confidence outside bins becomes NaN; log it
    rows_complete["conf_bin"] = pd.cut(rows_complete["confidence"], bins=bins, labels=labels, include_lowest=True)
    conf_outside = rows_complete["conf_bin"].isna().sum()
    print(f"Rows with confidence outside [0,100] or outside bin edges: {conf_outside}")
    if conf_outside > 0:
        print("Example confidences outside bins:")
        print(rows_complete.loc[rows_complete["conf_bin"].isna(), "confidence"].head(20).to_list())
    df_plot = rows_complete.dropna(subset=["conf_bin"])
    # reindex(labels) keeps empty bins as NaN rows so the plot shows all 4 brackets
    stats = (
        df_plot.groupby("conf_bin", observed=True)["abs_error"]
        .agg(mean="mean", std="std", count="count")
        .reindex(labels)
        .reset_index()
    )
    print("\n--- BIN STATS ---")
    print(stats)
    # Warn about low counts
    low_bins = stats.loc[stats["count"].fillna(0) < min_bin_count, ["conf_bin", "count"]]
    if not low_bins.empty:
        print(f"\n[WARNING] Some bins have < {min_bin_count} rows; error bars/trend may be unstable:")
        print(low_bins)
    # ------------------------------------------------------------------
    # 5) Plot
    # ------------------------------------------------------------------
    plt.figure(figsize=(13, 8))
    colors = sns.color_palette("Blues", n_colors=len(labels))
    # Replace NaNs in mean for plotting bars (empty bins)
    means = stats["mean"].to_numpy()
    counts = stats["count"].fillna(0).astype(int).to_numpy()
    stds = stats["std"].to_numpy()
    # For bins with no data, bar height 0 (and no errorbar)
    means_plot = np.nan_to_num(means, nan=0.0)
    bars = plt.bar(labels, means_plot, color=colors, edgecolor="black", alpha=0.85)
    # Error bars only where count>1 and std is not NaN
    sem = np.where((counts > 1) & (~np.isnan(stds)), stds / np.sqrt(counts), np.nan)
    plt.errorbar(labels, means_plot, yerr=sem, fmt="none", c="black", capsize=8, elinewidth=1.5)
    # Trend line only if at least 2 non-empty bins
    valid_idx = np.where(~np.isnan(means))[0]
    if len(valid_idx) >= 2:
        x_idx = np.arange(len(labels))
        # fit on the non-empty bin positions, draw across all bin positions
        z = np.polyfit(valid_idx, means[valid_idx], 1)
        p = np.poly1d(z)
        plt.plot(x_idx, p(x_idx), color="#e74c3c", linestyle="--", linewidth=3, zorder=5)
        trend_label = "Trend Line"
    else:
        trend_label = "Trend Line (insufficient bins)"
        print("\n[INFO] Not enough non-empty bins to fit a trend line.")
    # Data labels
    for i, bar in enumerate(bars):
        n_count = int(counts[i])
        mae_val = means[i]
        if np.isnan(mae_val) or n_count == 0:
            txt = "empty"
            y = 0.02
        else:
            txt = f"MAE: {mae_val:.2f}\nn={n_count}"
            y = bar.get_height() + 0.04
        plt.text(
            bar.get_x() + bar.get_width()/2,
            y,
            txt,
            ha="center",
            va="bottom",
            fontweight="bold",
            fontsize=10
        )
    # Legend
    legend_elements = [
        Patch(facecolor=colors[0], edgecolor="black", label=f"Bin 1: {labels[0]}"),
        Patch(facecolor=colors[1], edgecolor="black", label=f"Bin 2: {labels[1]}"),
        Patch(facecolor=colors[2], edgecolor="black", label=f"Bin 3: {labels[2]}"),
        Patch(facecolor=colors[3], edgecolor="black", label=f"Bin 4: {labels[3]}"),
        Line2D([0], [0], color="#e74c3c", linestyle="--", lw=3, label=trend_label),
        Line2D([0], [0], color="black", marker="_", linestyle="None", markersize=10, label="Std Error (SEM)"),
        Patch(color="none", label="Metric: Mean Absolute Error (MAE)")
    ]
    plt.legend(handles=legend_elements, loc="upper right", frameon=True, shadow=True, title="Legend")
    plt.title("Validation: Confidence vs. Error Magnitude (Single JSON)", fontsize=15, pad=30)
    plt.ylabel("Mean Absolute Error (EDSS Points)", fontsize=12)
    plt.xlabel("LLM Confidence Bracket", fontsize=12)
    plt.grid(axis="y", linestyle=":", alpha=0.5)
    ymax = np.nanmax(means) if np.any(~np.isnan(means)) else 0.0
    plt.ylim(0, max(0.5, float(ymax) + 0.6))
    plt.tight_layout()
    plt.show()
    print("\n" + "="*80)
    print("DONE")
    print("="*80)
# --- RUN ---
# Single-iteration audit; point json_path at a different iteration file to compare.
json_path = "/home/shahin/Lab/Doktorarbeit/Barcelona/Data/iteration/MS_Briefe_400_with_unique_id_SHA3_explore_cleaned_unique_results_iter_1_20260212_020628.json"
gt_path = "/home/shahin/Lab/Doktorarbeit/Barcelona/Data/GT_Numbers.csv"
plot_single_json_error_analysis_with_log(json_path, gt_path)
##
# %% Certainty vs Delta (rewritten with robust parsing + detailed data loss logs)
import pandas as pd
import numpy as np
import json
import glob
import os
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.patches import Patch
from matplotlib.lines import Line2D
def plot_confidence_vs_abs_error_with_log(
    json_dir_path,
    ground_truth_path,
    edss_gt_col="EDSS",
    min_bin_count=5,
    include_lowest=True,
):
    """Pool predictions from every JSON in a folder and plot MAE per confidence bin.

    Same pipeline as the single-JSON variant, but aggregates all iteration
    files and adds per-file ingest summaries plus an explicit data-loss
    accounting (overlap-aware loss reasons) before binning:
      1. Load + normalize GT (key = unique_id + "_" + MedDatum, robust EDSS parse).
      2. Load every *.json in json_dir_path; keep success entries with keys.
      3. Log which prediction keys are absent from GT, then inner-merge on key
         (validate="many_to_one" raises if GT has duplicate keys).
      4. Require EDSS_gt, EDSS_pred, confidence; log rows lost and why.
      5. Bin by confidence (0-70/70-80/80-90/90-100) and bar-plot MAE with
         SEM error bars and an optional linear trend.

    Parameters:
        json_dir_path: folder of iteration result JSONs (FileNotFoundError
            if it contains no *.json files).
        ground_truth_path: semicolon-separated GT CSV; must contain
            unique_id, MedDatum and edss_gt_col (ValueError otherwise).
        edss_gt_col: name of the EDSS column in the GT file.
        min_bin_count: bins with fewer rows trigger an instability warning.
        include_lowest: passed to pd.cut — whether confidence 0 falls in the
            first bracket.

    Returns None; prints stage-by-stage logs and shows a matplotlib figure.
    Returns early (after an [ERROR] log) when no usable/complete rows remain.
    """
    def norm_str(x):
        return str(x).strip().lower()

    def parse_edss(x):
        # robust numeric parse: handles comma decimals and empty tokens
        if x is None:
            return np.nan
        s = str(x).strip()
        if s == "" or s.lower() in {"nan", "none", "null"}:
            return np.nan
        s = s.replace(",", ".")
        return pd.to_numeric(s, errors="coerce")

    print("\n" + "="*90)
    print("CERTAINTY vs ABS ERROR (ALL JSONs) — WITH DATA LOSS LOG")
    print("="*90)
    print(f"JSON DIR: {json_dir_path}")
    print(f"GT FILE: {ground_truth_path}")
    # ------------------------------------------------------------------
    # 1) Load GT
    # ------------------------------------------------------------------
    df_gt = pd.read_csv(ground_truth_path, sep=";")
    required_gt_cols = {"unique_id", "MedDatum", edss_gt_col}
    missing_cols = required_gt_cols - set(df_gt.columns)
    if missing_cols:
        raise ValueError(f"GT missing columns: {missing_cols}. Available: {df_gt.columns.tolist()}")
    df_gt["unique_id"] = df_gt["unique_id"].map(norm_str)
    df_gt["MedDatum"] = df_gt["MedDatum"].map(norm_str)
    df_gt["key"] = df_gt["unique_id"] + "_" + df_gt["MedDatum"]
    df_gt["EDSS_gt"] = df_gt[edss_gt_col].map(parse_edss)
    # GT logs
    print("\n--- GT LOG ---")
    print(f"GT rows: {len(df_gt)}")
    print(f"GT unique keys: {df_gt['key'].nunique()}")
    gt_dup_rows = df_gt["key"].duplicated(keep=False).sum()
    print(f"GT duplicate-key rows: {gt_dup_rows}")
    print(f"GT missing EDSS (numeric): {df_gt['EDSS_gt'].isna().sum()}")
    print(f"GT missing EDSS unique keys: {df_gt.loc[df_gt['EDSS_gt'].isna(), 'key'].nunique()}")
    if gt_dup_rows > 0:
        print("\n[WARNING] GT has duplicate keys; merge can explode rows. Top duplicate keys:")
        print(df_gt.loc[df_gt["key"].duplicated(keep=False), "key"].value_counts().head(10))
    gt_key_set = set(df_gt["key"])
    # ------------------------------------------------------------------
    # 2) Load predictions from all JSON files (with per-file logs)
    # ------------------------------------------------------------------
    json_files = sorted(glob.glob(os.path.join(json_dir_path, "*.json")))
    if not json_files:
        raise FileNotFoundError(f"No JSON files found in: {json_dir_path}")
    all_preds = []
    per_file_summary = []
    total_entries_all = 0
    total_success_all = 0
    skipped_all = {"not_success": 0, "missing_uid_or_date": 0}
    for file_path in json_files:
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        total_entries = len(data)
        success_entries = sum(1 for e in data if e.get("success"))
        total_entries_all += total_entries
        total_success_all += success_entries
        skipped = {"not_success": 0, "missing_uid_or_date": 0}
        loaded_rows = 0
        for entry in data:
            if not entry.get("success"):
                skipped["not_success"] += 1
                continue
            res = entry.get("result", {})
            uid = res.get("unique_id")
            md = res.get("MedDatum")
            if uid is None or md is None or str(uid).strip() == "" or str(md).strip() == "":
                skipped["missing_uid_or_date"] += 1
                continue
            all_preds.append({
                "file": os.path.basename(file_path),
                "unique_id": norm_str(uid),
                "MedDatum": norm_str(md),
                "key": norm_str(uid) + "_" + norm_str(md),
                "EDSS_pred": parse_edss(res.get("EDSS")),
                "confidence": pd.to_numeric(res.get("certainty_percent"), errors="coerce"),
            })
            loaded_rows += 1
        skipped_all["not_success"] += skipped["not_success"]
        skipped_all["missing_uid_or_date"] += skipped["missing_uid_or_date"]
        per_file_summary.append({
            "file": os.path.basename(file_path),
            "entries_total": total_entries,
            "entries_success": success_entries,
            "pred_rows_loaded": loaded_rows,
            "skipped_not_success": skipped["not_success"],
            "skipped_missing_uid_or_date": skipped["missing_uid_or_date"],
        })
    df_pred = pd.DataFrame(all_preds)
    df_file = pd.DataFrame(per_file_summary)
    # PRED logs
    print("\n--- PRED LOG (ALL FILES) ---")
    print(f"JSON files found: {len(json_files)}")
    print(f"Total JSON entries: {total_entries_all}")
    print(f"Total success entries:{total_success_all}")
    print(f"Pred rows loaded (success + has keys): {len(df_pred)}")
    if len(df_pred) == 0:
        print("[ERROR] No usable prediction rows found. Nothing to plot.")
        return
    print(f"Pred unique keys (across all files): {df_pred['key'].nunique()}")
    print(f"Pred missing EDSS (numeric): {df_pred['EDSS_pred'].isna().sum()}")
    print(f"Pred missing confidence: {df_pred['confidence'].isna().sum()}")
    print("Skipped totals:", skipped_all)
    # show per-file quick check (useful when one iteration is broken)
    print("\nPer-file loaded rows (head):")
    print(df_file.sort_values("file").head(10))
    # ------------------------------------------------------------------
    # 3) Key match log (pred -> GT)
    # ------------------------------------------------------------------
    df_pred["key_in_gt"] = df_pred["key"].isin(gt_key_set)
    not_in_gt = df_pred.loc[~df_pred["key_in_gt"]]
    print("\n--- KEY MATCH LOG ---")
    print(f"Pred rows with key found in GT: {df_pred['key_in_gt'].sum()} / {len(df_pred)}")
    print(f"Pred rows with key NOT in GT: {len(not_in_gt)}")
    if len(not_in_gt) > 0:
        print("[WARNING] Example keys not found in GT (first 10):")
        print(not_in_gt[["file", "unique_id", "MedDatum", "key"]].head(10))
        print("\n[WARNING] Files contributing most to key-mismatch:")
        print(not_in_gt["file"].value_counts().head(10))
    # ------------------------------------------------------------------
    # 4) Merge (no dropna yet) + detailed data loss accounting
    # ------------------------------------------------------------------
    df_merged = df_pred.merge(
        df_gt[["key", "EDSS_gt"]],
        on="key",
        how="inner",
        validate="many_to_one"  # catches GT duplicates
    )
    print("\n--- MERGE LOG ---")
    print(f"Merged rows (inner join): {len(df_merged)}")
    print(f"Merged unique keys: {df_merged['key'].nunique()}")
    # Now quantify what you lose at each filter stage
    n0 = len(df_merged)
    miss_gt = df_merged["EDSS_gt"].isna()
    miss_pred = df_merged["EDSS_pred"].isna()
    miss_conf = df_merged["confidence"].isna()
    print("\n--- MISSINGNESS IN MERGED ---")
    print(f"Missing GT EDSS: {miss_gt.sum()}")
    print(f"Missing Pred EDSS: {miss_pred.sum()}")
    print(f"Missing Confidence: {miss_conf.sum()}")
    # IMPORTANT: your old code used .dropna() with no subset => drops if ANY column is NaN.
    # We'll replicate the intended logic explicitly and log counts.
    df_complete = df_merged.dropna(subset=["EDSS_gt", "EDSS_pred", "confidence"])
    n1 = len(df_complete)
    print("\n--- FILTER LOG ---")
    print(f"Rows before filtering: {n0}")
    print(f"Rows after requiring EDSS_gt, EDSS_pred, confidence: {n1}")
    print(f"Rows lost due to missing required fields: {n0 - n1}")
    # Break down why rows were lost (overlap-aware)
    lost_mask = df_merged[["EDSS_gt", "EDSS_pred", "confidence"]].isna().any(axis=1)
    lost = df_merged.loc[lost_mask].copy()
    if len(lost) > 0:
        # Build a combined reason tag, e.g. "GT", "PRED+CONF", by concatenating
        # per-column markers; the leading "+" (when GT is present) is stripped below.
        lost_reason = (
            (lost["EDSS_gt"].isna()).astype(int).map({1:"GT",0:""}) +
            (lost["EDSS_pred"].isna()).astype(int).map({1:"+PRED",0:""}) +
            (lost["confidence"].isna()).astype(int).map({1:"+CONF",0:""})
        )
        lost["loss_reason"] = lost_reason.str.replace(r"^\+", "", regex=True).replace("", "UNKNOWN")
        print("\nTop loss reasons (overlap-aware):")
        print(lost["loss_reason"].value_counts().head(10))
        print("\nFiles contributing most to lost rows:")
        print(lost["file"].value_counts().head(10))
    if len(df_complete) == 0:
        print("[ERROR] No complete rows left after filtering. Nothing to plot.")
        return
    # ------------------------------------------------------------------
    # 5) Abs error + binning
    # ------------------------------------------------------------------
    df_complete = df_complete.copy()
    df_complete["abs_error"] = (df_complete["EDSS_pred"] - df_complete["EDSS_gt"]).abs()
    bins = [0, 70, 80, 90, 100]
    labels = ["Low (<70%)", "Moderate (70-80%)", "High (80-90%)", "Very High (90-100%)"]
    df_complete["conf_bin"] = pd.cut(
        df_complete["confidence"],
        bins=bins,
        labels=labels,
        include_lowest=include_lowest
    )
    conf_outside = df_complete["conf_bin"].isna().sum()
    print("\n--- BINNING LOG ---")
    print(f"Rows with confidence outside bin edges / invalid: {conf_outside}")
    if conf_outside > 0:
        print("Example out-of-bin confidences:")
        print(df_complete.loc[df_complete["conf_bin"].isna(), "confidence"].head(20).to_list())
    df_plot = df_complete.dropna(subset=["conf_bin"])
    print(f"Rows kept for bin stats/plot (after dropping out-of-bin): {len(df_plot)}")
    print(f"Rows lost due to out-of-bin confidence: {len(df_complete) - len(df_plot)}")
    # reindex(labels) keeps empty bins as NaN rows so all 4 brackets are plotted
    stats = (
        df_plot.groupby("conf_bin", observed=True)["abs_error"]
        .agg(mean="mean", std="std", count="count")
        .reindex(labels)
        .reset_index()
    )
    print("\n--- BIN STATS ---")
    print(stats)
    low_bins = stats.loc[stats["count"].fillna(0) < min_bin_count, ["conf_bin", "count"]]
    if not low_bins.empty:
        print(f"\n[WARNING] Some bins have < {min_bin_count} rows (unstable SEM/trend):")
        print(low_bins)
    # ------------------------------------------------------------------
    # 6) Plot
    # ------------------------------------------------------------------
    plt.figure(figsize=(12, 8))
    colors = sns.color_palette("Blues", n_colors=len(labels))
    means = stats["mean"].to_numpy()
    counts = stats["count"].fillna(0).astype(int).to_numpy()
    stds = stats["std"].to_numpy()
    # empty bins plot as height-0 bars without error bars
    means_plot = np.nan_to_num(means, nan=0.0)
    bars = plt.bar(labels, means_plot, color=colors, edgecolor="black", linewidth=1.2)
    sem = np.where((counts > 1) & (~np.isnan(stds)), stds / np.sqrt(counts), np.nan)
    plt.errorbar(labels, means_plot, yerr=sem, fmt="none", c="black", capsize=6, elinewidth=1.5)
    # Trend line only if >=2 non-empty bins
    valid_idx = np.where(~np.isnan(means))[0]
    if len(valid_idx) >= 2:
        x_idx = np.arange(len(labels))
        z = np.polyfit(valid_idx, means[valid_idx], 1)
        p = np.poly1d(z)
        plt.plot(x_idx, p(x_idx), color="#e74c3c", linestyle="--", linewidth=2.5)
        trend_label = "Correlation Trend"
    else:
        trend_label = "Correlation Trend (insufficient bins)"
        print("\n[INFO] Not enough non-empty bins to fit a trend line.")
    # Bar annotations (MAE + n)
    for i, bar in enumerate(bars):
        n = int(counts[i])
        m = means[i]
        if n == 0 or np.isnan(m):
            txt = "empty"
            y = 0.02
        else:
            txt = f"MAE: {m:.2f}\nn={n}"
            y = bar.get_height() + 0.05
        plt.text(bar.get_x() + bar.get_width()/2, y, txt, ha="center", fontweight="bold")
    legend_elements = [
        Patch(facecolor=colors[0], edgecolor="black", label=f"Bin 1: {labels[0]}"),
        Patch(facecolor=colors[1], edgecolor="black", label=f"Bin 2: {labels[1]}"),
        Patch(facecolor=colors[2], edgecolor="black", label=f"Bin 3: {labels[2]}"),
        Patch(facecolor=colors[3], edgecolor="black", label=f"Bin 4: {labels[3]}"),
        Line2D([0], [0], color="black", marker="_", linestyle="None", markersize=10, label="Standard Error (SEM)"),
        Line2D([0], [0], color="#e74c3c", linestyle="--", lw=2.5, label=trend_label),
        Patch(color="none", label="Metric: Mean Absolute Error (MAE)")
    ]
    plt.legend(handles=legend_elements, loc="upper right", frameon=True, shadow=True, fontsize=10, title="Legend")
    plt.title("Validation: Inverse Correlation of Confidence vs. Error Magnitude", fontsize=15, pad=20)
    plt.ylabel("Mean Absolute Error (Δ EDSS Points)", fontsize=12)
    plt.xlabel("LLM Confidence Bracket", fontsize=12)
    plt.grid(axis="y", linestyle=":", alpha=0.5)
    ymax = np.nanmax(means) if np.any(~np.isnan(means)) else 0.0
    plt.ylim(0, max(0.5, float(ymax) + 0.6))
    plt.tight_layout()
    plt.show()
    print("\n" + "="*90)
    print("DONE")
    print("="*90)
# Example run:
# Pools every iteration JSON in the folder before binning by confidence.
plot_confidence_vs_abs_error_with_log("/home/shahin/Lab/Doktorarbeit/Barcelona/Data/iteration", "/home/shahin/Lab/Doktorarbeit/Barcelona/Data/GT_Numbers.csv")
##
# %% Empirical Confidence
# Empirical stability confidence (from 10 runs) + LLM certainty_percent as secondary signal
# - Reads all JSONs in a folder (your 10 iterations)
# - Aggregates by key = unique_id + MedDatum
# - Computes:
# * EDSS_mean, EDSS_std, EDSS_iqr, mode/share
# * empirical_conf_0_100 (based on stability)
# * llm_conf_mean_0_100 (mean certainty_percent)
# * combined_conf_0_100 (weighted blend)
# - Optional: merges GT EDSS and computes abs error on the aggregated prediction
import os, glob, json
import numpy as np
import pandas as pd
def build_empirical_confidence_table(
json_dir_path: str,
ground_truth_path: str | None = None,
gt_sep: str = ";",
gt_edss_col: str = "EDSS",
w_empirical: float = 0.7, # weight for empirical stability
w_llm: float = 0.3, # weight for LLM self-reported confidence
tol_mode: float = 0.5, # tolerance to treat EDSS as "same" (EDSS often in 0.5 steps)
min_runs_expected: int = 10,
):
# -----------------------------
# Helpers
# -----------------------------
def norm_str(x):
return str(x).strip().lower()
def parse_number(x):
if x is None:
return np.nan
s = str(x).strip()
if s == "" or s.lower() in {"nan", "none", "null"}:
return np.nan
s = s.replace(",", ".")
return pd.to_numeric(s, errors="coerce")
def robust_iqr(x: pd.Series):
x = x.dropna()
if len(x) == 0:
return np.nan
return float(x.quantile(0.75) - x.quantile(0.25))
def stability_to_confidence(std_val: float) -> float:
"""
Map EDSS variability across runs to a 0..100 confidence.
EDSS is typically on 0.5 steps. A natural scale:
std ~= 0.0 -> ~100
std ~= 0.25 -> ~75-90
std ~= 0.5 -> ~50-70
std >= 1.0 -> low
Use a smooth exponential mapping.
"""
if np.isnan(std_val):
return np.nan
# scale parameter: std=0.5 -> exp(-1)=0.367 -> ~36.7
scale = 0.5
conf = 100.0 * np.exp(-(std_val / scale))
# clamp
return float(np.clip(conf, 0.0, 100.0))
def mode_share_with_tolerance(values: np.ndarray, tol: float) -> tuple[float, float]:
"""
Compute a 'mode' under tolerance: pick the cluster center (median) and count
how many values fall within +/- tol. Return (mode_center, share).
This is robust to tiny float differences.
"""
vals = values[~np.isnan(values)]
if len(vals) == 0:
return (np.nan, np.nan)
center = float(np.median(vals))
share = float(np.mean(np.abs(vals - center) <= tol))
return (center, share)
# -----------------------------
# Load predictions from all JSONs
# -----------------------------
json_files = sorted(glob.glob(os.path.join(json_dir_path, "*.json")))
if not json_files:
raise FileNotFoundError(f"No JSON files found in: {json_dir_path}")
rows = []
per_file = []
total_entries_all = 0
total_success_all = 0
skipped_all = {"not_success": 0, "missing_uid_or_date": 0}
for fp in json_files:
with open(fp, "r", encoding="utf-8") as f:
data = json.load(f)
total_entries = len(data)
success_entries = sum(1 for e in data if e.get("success"))
total_entries_all += total_entries
total_success_all += success_entries
skipped = {"not_success": 0, "missing_uid_or_date": 0}
loaded = 0
for entry in data:
if not entry.get("success"):
skipped["not_success"] += 1
continue
res = entry.get("result", {})
uid = res.get("unique_id")
md = res.get("MedDatum")
if uid is None or md is None or str(uid).strip() == "" or str(md).strip() == "":
skipped["missing_uid_or_date"] += 1
continue
edss = parse_number(res.get("EDSS"))
conf = parse_number(res.get("certainty_percent"))
it = res.get("iteration", None)
rows.append({
"file": os.path.basename(fp),
"iteration": it,
"unique_id": norm_str(uid),
"MedDatum": norm_str(md),
"key": norm_str(uid) + "_" + norm_str(md),
"EDSS_pred": edss,
"llm_conf": conf,
})
loaded += 1
skipped_all["not_success"] += skipped["not_success"]
skipped_all["missing_uid_or_date"] += skipped["missing_uid_or_date"]
per_file.append({
"file": os.path.basename(fp),
"entries_total": total_entries,
"entries_success": success_entries,
"rows_loaded": loaded,
"skipped_not_success": skipped["not_success"],
"skipped_missing_uid_or_date": skipped["missing_uid_or_date"],
})
df_pred = pd.DataFrame(rows)
df_file = pd.DataFrame(per_file)
# -----------------------------
# Logs: ingestion
# -----------------------------
print("\n" + "="*90)
print("EMPIRICAL CONFIDENCE (10-RUN STABILITY) + LLM CONFIDENCE (SECONDARY)")
print("="*90)
print(f"JSON DIR: {json_dir_path}")
print(f"JSON files: {len(json_files)}")
print("\n--- INGEST LOG ---")
print(f"Total JSON entries: {total_entries_all}")
print(f"Total success entries:{total_success_all}")
print(f"Pred rows loaded: {len(df_pred)}")
print(f"Unique keys in preds: {df_pred['key'].nunique() if len(df_pred) else 0}")
print(f"Missing EDSS_pred: {df_pred['EDSS_pred'].isna().sum() if len(df_pred) else 0}")
print(f"Missing llm_conf: {df_pred['llm_conf'].isna().sum() if len(df_pred) else 0}")
print("Skipped totals:", skipped_all)
print("\nPer-file summary (top 10 by name):")
print(df_file.sort_values("file").head(10))
# -----------------------------
# Aggregate by key (empirical stability)
# -----------------------------
if len(df_pred) == 0:
print("[ERROR] No usable prediction rows.")
return None
# how many runs per key (expect ~10)
runs_per_key = df_pred.groupby("key")["EDSS_pred"].size().rename("n_rows").reset_index()
print("\n--- RUNS PER KEY LOG ---")
print(f"Keys with at least 1 row: {len(runs_per_key)}")
print("Distribution of rows per key (value_counts):")
print(runs_per_key["n_rows"].value_counts().sort_index())
# Aggregate stats
    def agg_block(g: pd.DataFrame) -> pd.Series:
        """Collapse all runs for one key into stability + confidence stats.

        Expects columns EDSS_pred, llm_conf, unique_id, MedDatum. Relies on
        the enclosing scope for tol_mode, w_empirical, w_llm and the helpers
        robust_iqr, mode_share_with_tolerance, stability_to_confidence —
        those are defined outside this view; semantics assumed from their
        names (TODO confirm).
        """
        ed = g["EDSS_pred"].to_numpy(dtype=float)
        ll = g["llm_conf"].to_numpy(dtype=float)
        n_rows = len(g)
        # Non-NaN counts decide which statistics are defined below.
        n_edss = int(np.sum(~np.isnan(ed)))
        n_llm = int(np.sum(~np.isnan(ll)))
        ed_mean = float(np.nanmean(ed)) if n_edss else np.nan
        # Sample std needs >= 2 values (ddof=1); exactly one value is treated
        # as perfectly stable (std 0.0); none -> NaN.
        ed_std = float(np.nanstd(ed, ddof=1)) if n_edss >= 2 else (0.0 if n_edss == 1 else np.nan)
        ed_iqr = robust_iqr(pd.Series(ed))
        mode_center, mode_share = mode_share_with_tolerance(ed, tol=tol_mode)
        llm_mean = float(np.nanmean(ll)) if n_llm else np.nan
        llm_std = float(np.nanstd(ll, ddof=1)) if n_llm >= 2 else (0.0 if n_llm == 1 else np.nan)
        # Empirical confidence is derived from the run-to-run EDSS spread.
        emp_conf = stability_to_confidence(ed_std) if not np.isnan(ed_std) else np.nan
        # Combined confidence (weighted). If one side missing, fall back to the other.
        if np.isnan(emp_conf) and np.isnan(llm_mean):
            comb = np.nan
        elif np.isnan(emp_conf):
            comb = llm_mean
        elif np.isnan(llm_mean):
            comb = emp_conf
        else:
            comb = w_empirical * emp_conf + w_llm * llm_mean
        return pd.Series({
            "unique_id": g["unique_id"].iloc[0],
            "MedDatum": g["MedDatum"].iloc[0],
            "n_rows": n_rows,
            "n_edss": n_edss,
            "n_llm_conf":n_llm,
            "EDSS_mean": ed_mean,
            "EDSS_std": ed_std,
            "EDSS_iqr": ed_iqr,
            "EDSS_mode_center": mode_center,
            "EDSS_mode_share": mode_share, # fraction within ±tol_mode of median center
            "llm_conf_mean": llm_mean,
            "llm_conf_std": llm_std,
            "empirical_conf_0_100": emp_conf,
            # combined score is clamped to the 0..100 confidence scale
            "combined_conf_0_100": float(np.clip(comb, 0.0, 100.0)) if not np.isnan(comb) else np.nan,
        })
df_agg = df_pred.groupby("key", as_index=False).apply(agg_block)
# groupby+apply returns a multiindex sometimes depending on pandas version
if isinstance(df_agg.index, pd.MultiIndex):
df_agg = df_agg.reset_index(drop=True)
# Logs: aggregation + losses
print("\n--- AGGREGATION LOG ---")
print(f"Aggregated keys: {len(df_agg)}")
print(f"Keys with EDSS in >=1 run: {(df_agg['n_edss'] >= 1).sum()}")
print(f"Keys with EDSS in >=2 runs (std meaningful): {(df_agg['n_edss'] >= 2).sum()}")
print(f"Keys missing EDSS in all runs: {(df_agg['n_edss'] == 0).sum()}")
print(f"Keys missing llm_conf in all runs: {(df_agg['n_llm_conf'] == 0).sum()}")
# Expected runs check
if min_runs_expected is not None:
print(f"\nKeys with < {min_runs_expected} rows (potential missing iterations):")
print(df_agg.loc[df_agg["n_rows"] < min_runs_expected, ["key", "n_rows"]].sort_values("n_rows").head(20))
# -----------------------------
# Optional: merge GT and compute error on aggregated EDSS_mean
# -----------------------------
if ground_truth_path is not None:
df_gt = pd.read_csv(ground_truth_path, sep=gt_sep)
need = {"unique_id", "MedDatum", gt_edss_col}
miss = need - set(df_gt.columns)
if miss:
raise ValueError(f"GT missing columns: {miss}. Available: {df_gt.columns.tolist()}")
df_gt["unique_id"] = df_gt["unique_id"].map(norm_str)
df_gt["MedDatum"] = df_gt["MedDatum"].map(norm_str)
df_gt["key"] = df_gt["unique_id"] + "_" + df_gt["MedDatum"]
df_gt["EDSS_gt"] = df_gt[gt_edss_col].apply(parse_number)
print("\n--- GT MERGE LOG ---")
print(f"GT rows: {len(df_gt)} | GT unique keys: {df_gt['key'].nunique()}")
print(f"GT missing EDSS (numeric): {df_gt['EDSS_gt'].isna().sum()}")
df_final = df_agg.merge(df_gt[["key", "EDSS_gt"]], on="key", how="left", validate="one_to_one")
print(f"Aggregated keys with GT match: {df_final['EDSS_gt'].notna().sum()} / {len(df_final)}")
print(f"Aggregated keys missing GT EDSS: {df_final['EDSS_gt'].isna().sum()}")
df_final["abs_error_mean"] = (df_final["EDSS_mean"] - df_final["EDSS_gt"]).abs()
# How many keys usable for evaluation?
usable = df_final.dropna(subset=["EDSS_mean", "EDSS_gt"])
print("\n--- EVAL LOG (AGGREGATED) ---")
print(f"Keys with both EDSS_mean and EDSS_gt: {len(usable)}")
if len(usable) > 0:
print(f"MAE on EDSS_mean vs GT: {usable['abs_error_mean'].mean():.3f}")
print(f"Median abs error: {usable['abs_error_mean'].median():.3f}")
return df_final
return df_agg
# Example usage:
df = build_empirical_confidence_table(
    json_dir_path="/home/shahin/Lab/Doktorarbeit/Barcelona/Data/iteration",
    ground_truth_path="/home/shahin/Lab/Doktorarbeit/Barcelona/Data/GT_Numbers.csv",
    w_empirical=0.7,
    w_llm=0.3,
    tol_mode=0.5,
    min_runs_expected=10,
)
# build_empirical_confidence_table returns None when it finds no usable
# prediction rows; guard so we do not crash on .to_csv in that case.
if df is not None:
    df.to_csv("empirical_confidence_table.csv", index=False)
##
# %% Executive Boxplot
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.patches import Patch
def plot_exec_boxplots(df, min_bin_size_warn=10):
    """
    Two side-by-side boxplots:
      - Left:  abs_error_mean grouped by empirical_conf_0_100 quantile bins
      - Right: abs_error_mean grouped by llm_conf_mean quantile bins
    Adds:
      - Robust qcut labeling (handles ties; bins may be < 4)
      - Data logs + per-bin summary table printed
      - Clear legend explaining each panel and what box elements mean

    Parameters
    ----------
    df : pd.DataFrame
        Must contain abs_error_mean, empirical_conf_0_100, llm_conf_mean
        (as produced by build_empirical_confidence_table).
    min_bin_size_warn : int
        Bins with fewer points than this trigger a printed caution.

    Raises
    ------
    ValueError
        If any required column is missing from df.
    """
    need_cols = ["abs_error_mean", "empirical_conf_0_100", "llm_conf_mean"]
    missing = [c for c in need_cols if c not in df.columns]
    if missing:
        raise ValueError(f"Missing columns in df: {missing}. Available: {df.columns.tolist()}")
    d = df[need_cols].copy()
    # -----------------------------
    # Data logs: survivorship
    # -----------------------------
    d_emp = d.dropna(subset=["abs_error_mean", "empirical_conf_0_100"]).copy()
    d_llm = d.dropna(subset=["abs_error_mean", "llm_conf_mean"]).copy()
    print("\n" + "="*90)
    print("EXECUTIVE BOXPLOTS — DATA LOG + SUMMARY")
    print("="*90)
    print(f"Total rows in df: {len(df)}")
    print(f"Rows for empirical plot: {len(d_emp)} (dropped {len(df) - len(d_emp)})")
    print(f"Rows for LLM plot: {len(d_llm)} (dropped {len(df) - len(d_llm)})")
    if len(d_emp) == 0 or len(d_llm) == 0:
        print("[ERROR] Not enough data after dropping NaNs to build both plots.")
        return
    # -----------------------------
    # Robust quantile binning (handles ties)
    # -----------------------------
    # qcut is computed ONCE per panel; the interval categories are then
    # renamed to Q1..Qk. (Previously qcut ran twice per panel — redundant
    # work and a risk of the two calls diverging.)
    emp_bins = pd.qcut(d_emp["empirical_conf_0_100"], q=4, duplicates="drop")
    emp_intervals = emp_bins.cat.categories
    emp_labels = [f"Q{i+1}" for i in range(emp_intervals.size)]
    d_emp["emp_q"] = emp_bins.cat.rename_categories(emp_labels)
    # LLM
    llm_bins = pd.qcut(d_llm["llm_conf_mean"], q=4, duplicates="drop")
    llm_intervals = llm_bins.cat.categories
    llm_labels = [f"Q{i+1}" for i in range(llm_intervals.size)]
    d_llm["llm_q"] = llm_bins.cat.rename_categories(llm_labels)
    # Print bin edges (so you can discuss exact thresholds)
    print("\n--- BIN EDGES (actual ranges) ---")
    print("Empirical confidence bins:")
    for i, interval in enumerate(emp_intervals):
        print(f"  {emp_labels[i]}: {interval}")
    print("LLM confidence bins:")
    for i, interval in enumerate(llm_intervals):
        print(f"  {llm_labels[i]}: {interval}")
    # -----------------------------
    # Summary tables (per bin)
    # -----------------------------
    def summarize_bins(df_in, bin_col, conf_col, label):
        # One row per bin: n, MAE stats and confidence stats for one panel.
        g = df_in.groupby(bin_col, observed=True).agg(
            n=("abs_error_mean", "size"),
            mae_mean=("abs_error_mean", "mean"),
            mae_median=("abs_error_mean", "median"),
            mae_q25=("abs_error_mean", lambda x: x.quantile(0.25)),
            mae_q75=("abs_error_mean", lambda x: x.quantile(0.75)),
            conf_mean=(conf_col, "mean"),
            conf_median=(conf_col, "median"),
        ).reset_index().rename(columns={bin_col: "bin"})
        g["panel"] = label
        return g[["panel", "bin", "n", "mae_mean", "mae_median", "mae_q25", "mae_q75", "conf_mean", "conf_median"]]
    summary_emp = summarize_bins(d_emp, "emp_q", "empirical_conf_0_100", "Empirical")
    summary_llm = summarize_bins(d_llm, "llm_q", "llm_conf_mean", "LLM")
    print("\n--- SUMMARY TABLE: Empirical confidence quartiles (or fewer if ties) ---")
    print(summary_emp.to_string(index=False, float_format=lambda x: f"{x:.3f}"))
    print("\n--- SUMMARY TABLE: LLM confidence quartiles (or fewer if ties) ---")
    print(summary_llm.to_string(index=False, float_format=lambda x: f"{x:.3f}"))
    # Warn about small bins
    small_emp = summary_emp.loc[summary_emp["n"] < min_bin_size_warn, ["bin", "n"]]
    small_llm = summary_llm.loc[summary_llm["n"] < min_bin_size_warn, ["bin", "n"]]
    if not small_emp.empty or not small_llm.empty:
        print(f"\n[WARNING] Some bins have < {min_bin_size_warn} points; compare them cautiously.")
        if not small_emp.empty:
            print("  Empirical small bins:")
            print(small_emp.to_string(index=False))
        if not small_llm.empty:
            print("  LLM small bins:")
            print(small_llm.to_string(index=False))
    # -----------------------------
    # Prepare data for boxplots
    # -----------------------------
    emp_cats = list(d_emp["emp_q"].cat.categories)
    llm_cats = list(d_llm["llm_q"].cat.categories)
    emp_groups = [d_emp.loc[d_emp["emp_q"] == q, "abs_error_mean"].values for q in emp_cats]
    llm_groups = [d_llm.loc[d_llm["llm_q"] == q, "abs_error_mean"].values for q in llm_cats]
    # -----------------------------
    # Plot
    # -----------------------------
    fig, axes = plt.subplots(1, 2, figsize=(12, 5), sharey=True)
    bp0 = axes[0].boxplot(emp_groups, labels=emp_cats, showfliers=False, patch_artist=True)
    bp1 = axes[1].boxplot(llm_groups, labels=llm_cats, showfliers=False, patch_artist=True)
    # Make panels visually distinct but still simple (no extra clutter)
    for patch in bp0["boxes"]:
        patch.set_alpha(0.6)
    for patch in bp1["boxes"]:
        patch.set_alpha(0.6)
    axes[0].set_title("Error by Empirical Confidence (quantile bins)")
    axes[0].set_xlabel("Empirical confidence bin")
    axes[0].set_ylabel("Absolute Error (|EDSS_mean − EDSS_gt|)")
    axes[1].set_title("Error by LLM Confidence (quantile bins)")
    axes[1].set_xlabel("LLM confidence bin")
    for ax in axes:
        ax.grid(axis="y", linestyle=":", alpha=0.5)
    # -----------------------------
    # Legend (simple, but useful)
    # -----------------------------
    legend_elements = [
        Patch(facecolor="white", edgecolor="black", label="Box = IQR (25%–75%)"),
        Patch(facecolor="white", edgecolor="black", label="Center line = median"),
        Patch(facecolor="white", edgecolor="black", label="Whiskers = typical range (no outliers shown)"),
        Patch(facecolor="white", edgecolor="white", label="Left panel: empirical stability bins"),
        Patch(facecolor="white", edgecolor="white", label="Right panel: LLM self-reported bins"),
    ]
    fig.legend(handles=legend_elements, loc="upper center", ncol=3, frameon=True)
    # leave headroom for the figure-level legend above both axes
    plt.tight_layout(rect=[0, 0, 1, 0.90])
    plt.show()
    print("\n" + "="*90)
    print("DONE")
    print("="*90)
# Example (complete):
df_final = build_empirical_confidence_table(
    json_dir_path="/home/shahin/Lab/Doktorarbeit/Barcelona/Data/iteration",
    ground_truth_path="/home/shahin/Lab/Doktorarbeit/Barcelona/Data/GT_Numbers.csv",
    w_empirical=0.7,
    w_llm=0.3,
    tol_mode=0.5,
    min_runs_expected=10,
)
# The builder returns None when no usable prediction rows exist (its error
# path); skip plotting instead of crashing inside plot_exec_boxplots.
if df_final is not None:
    plot_exec_boxplots(df_final)
##
# %% Scatter
import os, json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
def scatter_abs_error_by_conf_bins_single_json(
    json_file_path,
    ground_truth_path,
    gt_sep=";",
    gt_edss_col="EDSS",
):
    """
    Scatter of absolute EDSS error vs. LLM confidence category for ONE JSON run.

    Loads predictions from a single iteration JSON, merges them with the
    ground-truth CSV on the normalized (unique_id, MedDatum) key, bins
    certainty_percent into four fixed categories and plots jittered points
    per category. Bin counts are printed before plotting.

    Parameters
    ----------
    json_file_path : str
        Path to one iteration-results JSON file.
    ground_truth_path : str
        Path to the ground-truth CSV.
    gt_sep : str
        Separator of the GT CSV.
    gt_edss_col : str
        Name of the EDSS column in the GT file.

    Raises
    ------
    ValueError
        If the GT file lacks unique_id, MedDatum or the EDSS column.
    """
    def norm_str(x):
        # Normalize a key component: string, trimmed, lower-case.
        return str(x).strip().lower()
    def parse_edss(x):
        # Robust EDSS parser: None/empty/textual-NaN -> NaN; decimal comma -> dot.
        if x is None:
            return np.nan
        s = str(x).strip()
        if s == "" or s.lower() in {"nan", "none", "null"}:
            return np.nan
        s = s.replace(",", ".")
        return pd.to_numeric(s, errors="coerce")
    # ---- Load GT (schema validated first, consistent with the sibling loaders)
    df_gt = pd.read_csv(ground_truth_path, sep=gt_sep)
    for col in ["unique_id", "MedDatum", gt_edss_col]:
        if col not in df_gt.columns:
            raise ValueError(f"GT missing column '{col}'. Available: {df_gt.columns.tolist()}")
    df_gt["unique_id"] = df_gt["unique_id"].map(norm_str)
    df_gt["MedDatum"] = df_gt["MedDatum"].map(norm_str)
    df_gt["key"] = df_gt["unique_id"] + "_" + df_gt["MedDatum"]
    df_gt["EDSS_gt"] = df_gt[gt_edss_col].map(parse_edss)
    # ---- Load preds from JSON
    with open(json_file_path, "r", encoding="utf-8") as f:
        data = json.load(f)
    rows = []
    for entry in data:
        if not entry.get("success"):
            continue
        res = entry.get("result", {})
        uid, md = res.get("unique_id"), res.get("MedDatum")
        if uid is None or md is None or str(uid).strip() == "" or str(md).strip() == "":
            continue
        rows.append({
            "key": norm_str(uid) + "_" + norm_str(md),
            "EDSS_pred": parse_edss(res.get("EDSS")),
            "confidence": pd.to_numeric(res.get("certainty_percent"), errors="coerce"),
        })
    df_pred = pd.DataFrame(rows)
    # ---- Merge + filter (many_to_one: GT keys must be unique)
    df = df_pred.merge(df_gt[["key", "EDSS_gt"]], on="key", how="inner", validate="many_to_one")
    df = df.dropna(subset=["EDSS_gt", "EDSS_pred", "confidence"]).copy()
    df["abs_error"] = (df["EDSS_pred"] - df["EDSS_gt"]).abs()
    # ---- Bin confidence into 4 categories
    bins = [0, 70, 80, 90, 100]
    labels = ["Low (<70%)", "Moderate (70-80%)", "High (80-90%)", "Very High (90-100%)"]
    df["conf_bin"] = pd.cut(df["confidence"], bins=bins, labels=labels, include_lowest=True)
    df = df.dropna(subset=["conf_bin"]).copy()
    # ---- Logs
    print("\n--- BIN COUNTS (points plotted) ---")
    print(df["conf_bin"].value_counts().reindex(labels).fillna(0).astype(int))
    print(f"Total points plotted: {len(df)}")
    # ---- Scatter (categorical x with jitter so overlapping points stay visible)
    x_map = {lab: i for i, lab in enumerate(labels)}
    x = df["conf_bin"].map(x_map).astype(float).to_numpy()
    jitter = np.random.uniform(-0.12, 0.12, size=len(df))
    xj = x + jitter
    plt.figure(figsize=(12, 6))
    plt.scatter(xj, df["abs_error"].to_numpy(), alpha=0.55)
    plt.xticks(range(len(labels)), labels)
    plt.xlabel("certainty_percent category (Iteration 1)")
    plt.ylabel("Absolute Error (|EDSS_pred − EDSS_gt|)")
    plt.title("Absolute Error vs LLM Confidence Category (Single JSON)")
    plt.grid(axis="y", linestyle=":", alpha=0.5)
    plt.tight_layout()
    plt.show()
# Run the single-iteration scatter against iteration 1 and the GT table.
scatter_abs_error_by_conf_bins_single_json(
    ground_truth_path="/home/shahin/Lab/Doktorarbeit/Barcelona/Data/GT_Numbers.csv",
    json_file_path="/home/shahin/Lab/Doktorarbeit/Barcelona/Data/iteration/MS_Briefe_400_with_unique_id_SHA3_explore_cleaned_unique_results_iter_1_20260212_020628.json",
)
##
# %% Boxplot2
# Boxplot + light jittered points
# - Single JSON (iteration 1)
# - X: confidence bin (<70, 70-80, 80-90, 90-100)
# - Y: absolute error
# - Legend includes n per bin
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.patches import Patch
def boxplot_with_jitter_abs_error_by_conf_bins_single_json(
    json_file_path,
    ground_truth_path,
    gt_sep=";",
    gt_edss_col="EDSS",
    jitter_width=0.12,
    point_alpha=0.25,
    show_outliers=False,
):
    """Boxplot of absolute EDSS error per LLM-confidence bin, one JSON run.

    Reads predictions from a single iteration JSON, joins them to the
    ground-truth CSV on the normalized (unique_id, MedDatum) key, bins the
    self-reported certainty_percent into four fixed categories, and draws a
    boxplot with the raw points jittered on top. Per-bin counts appear in
    the legend and are printed afterwards.
    """
    def _norm(value):
        # Normalized join-key component: trimmed, lower-cased string.
        return str(value).strip().lower()

    def _to_edss(value):
        # Tolerates None, empty strings, textual NaNs and German decimal
        # commas; anything unparseable becomes np.nan.
        if value is None:
            return np.nan
        text = str(value).strip()
        if text == "" or text.lower() in {"nan", "none", "null"}:
            return np.nan
        return pd.to_numeric(text.replace(",", "."), errors="coerce")

    # Ground truth: validate schema, then build join key + numeric EDSS.
    gt = pd.read_csv(ground_truth_path, sep=gt_sep)
    for col in ["unique_id", "MedDatum", gt_edss_col]:
        if col not in gt.columns:
            raise ValueError(f"GT missing column '{col}'. Available: {gt.columns.tolist()}")
    gt["unique_id"] = gt["unique_id"].map(_norm)
    gt["MedDatum"] = gt["MedDatum"].map(_norm)
    gt["key"] = gt["unique_id"] + "_" + gt["MedDatum"]
    gt["EDSS_gt"] = gt[gt_edss_col].map(_to_edss)

    # Predictions from the single JSON run; skip failed / key-less entries.
    with open(json_file_path, "r", encoding="utf-8") as handle:
        entries = json.load(handle)
    records = []
    for entry in entries:
        if not entry.get("success"):
            continue
        result = entry.get("result", {})
        uid = result.get("unique_id")
        med = result.get("MedDatum")
        if uid is None or med is None or str(uid).strip() == "" or str(med).strip() == "":
            continue
        records.append({
            "key": _norm(uid) + "_" + _norm(med),
            "EDSS_pred": _to_edss(result.get("EDSS")),
            "confidence": pd.to_numeric(result.get("certainty_percent"), errors="coerce"),
        })

    # Join, drop incomplete rows, compute the absolute error.
    merged = pd.DataFrame(records).merge(
        gt[["key", "EDSS_gt"]], on="key", how="inner", validate="many_to_one"
    )
    merged = merged.dropna(subset=["EDSS_gt", "EDSS_pred", "confidence"]).copy()
    merged["abs_error"] = (merged["EDSS_pred"] - merged["EDSS_gt"]).abs()

    # Fixed confidence bins (right-closed; include_lowest covers exactly 0).
    bins = [0, 70, 80, 90, 100]
    labels = ["Low (<70%)", "Moderate (70-80%)", "High (80-90%)", "Very High (90-100%)"]
    merged["conf_bin"] = pd.cut(merged["confidence"], bins=bins, labels=labels, include_lowest=True)
    merged = merged.dropna(subset=["conf_bin"]).copy()

    bin_arrays = [merged.loc[merged["conf_bin"] == lab, "abs_error"].to_numpy() for lab in labels]
    n_counts = [len(arr) for arr in bin_arrays]

    # Boxplot (fliers hidden by default to reduce clutter).
    fig, ax = plt.subplots(figsize=(12, 6))
    bp = ax.boxplot(
        bin_arrays,
        labels=labels,
        showfliers=show_outliers,
        patch_artist=True,
        widths=0.55,
    )
    for box in bp["boxes"]:
        box.set_alpha(0.35)

    # Raw points with horizontal jitter so overlapping values stay visible.
    for pos, arr in enumerate(bin_arrays, start=1):
        if len(arr) == 0:
            continue
        xs = pos + np.random.uniform(-jitter_width, jitter_width, size=len(arr))
        ax.scatter(xs, arr, alpha=point_alpha, s=18)

    ax.set_title("Absolute Error by LLM Confidence Bin (Iteration 1)")
    ax.set_xlabel("certainty_percent category")
    ax.set_ylabel("Absolute Error (|EDSS_pred − EDSS_gt|)")
    ax.grid(axis="y", linestyle=":", alpha=0.5)

    # Legend doubles as the per-bin sample-size report.
    legend_handles = [
        Patch(facecolor="white", edgecolor="black", label=f"{lab}: n={n}")
        for lab, n in zip(labels, n_counts)
    ]
    ax.legend(handles=legend_handles, title="Bin counts", loc="upper right", frameon=True)
    plt.tight_layout()
    plt.show()

    # Console copy of the counts (useful for discussion).
    print("\n--- BIN COUNTS (points plotted) ---")
    for lab, n in zip(labels, n_counts):
        print(f"{lab:>18}: n={n}")
    print(f"Total points plotted: {sum(n_counts)}")
# Single-iteration run: absolute error per confidence bin.
boxplot_with_jitter_abs_error_by_conf_bins_single_json(
    ground_truth_path="/home/shahin/Lab/Doktorarbeit/Barcelona/Data/GT_Numbers.csv",
    json_file_path="/home/shahin/Lab/Doktorarbeit/Barcelona/Data/iteration/MS_Briefe_400_with_unique_id_SHA3_explore_cleaned_unique_results_iter_1_20260212_020628.json",
)
##
# %% Boxplot3
# Boxplot + jitter with SIGNED error (direction)
# - Y-axis: signed error = EDSS_pred - EDSS_gt (negative = underestimation, positive = overestimation)
# - Also prints per-bin summary (n, mean signed error, median, MAE)
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.patches import Patch
from matplotlib.lines import Line2D
def boxplot_with_jitter_signed_error_by_conf_bins_single_json(
    json_file_path,
    ground_truth_path,
    gt_sep=";",
    gt_edss_col="EDSS",
    jitter_width=0.12,
    point_alpha=0.25,
    show_outliers=False,
):
    """Boxplot + jittered points of SIGNED EDSS error per confidence bin.

    Signed error = EDSS_pred - EDSS_gt: negative values mean
    underestimation, positive mean overestimation; a dashed zero line marks
    the unbiased level. Uses a single iteration JSON joined to the
    ground-truth CSV on the normalized (unique_id, MedDatum) key, bins
    certainty_percent into four fixed categories, and prints a per-bin
    summary (n, mean/median signed error, MAE) after plotting.

    Raises ValueError if the GT file lacks a required column.
    """
    def norm_str(x):
        # Normalize a key component: string, trimmed, lower-case.
        return str(x).strip().lower()
    def parse_edss(x):
        # Robust EDSS parser: None/empty/textual-NaN -> NaN; decimal comma -> dot.
        if x is None:
            return np.nan
        s = str(x).strip()
        if s == "" or s.lower() in {"nan", "none", "null"}:
            return np.nan
        s = s.replace(",", ".")
        return pd.to_numeric(s, errors="coerce")
    # ---- Load GT
    df_gt = pd.read_csv(ground_truth_path, sep=gt_sep)
    for col in ["unique_id", "MedDatum", gt_edss_col]:
        if col not in df_gt.columns:
            raise ValueError(f"GT missing column '{col}'. Available: {df_gt.columns.tolist()}")
    df_gt["unique_id"] = df_gt["unique_id"].map(norm_str)
    df_gt["MedDatum"] = df_gt["MedDatum"].map(norm_str)
    df_gt["key"] = df_gt["unique_id"] + "_" + df_gt["MedDatum"]
    df_gt["EDSS_gt"] = df_gt[gt_edss_col].map(parse_edss)
    # ---- Load preds from JSON
    with open(json_file_path, "r", encoding="utf-8") as f:
        data = json.load(f)
    rows = []
    for entry in data:
        # only successful extractions with a usable (unique_id, MedDatum) key
        if not entry.get("success"):
            continue
        res = entry.get("result", {})
        uid, md = res.get("unique_id"), res.get("MedDatum")
        if uid is None or md is None or str(uid).strip() == "" or str(md).strip() == "":
            continue
        rows.append({
            "key": norm_str(uid) + "_" + norm_str(md),
            "EDSS_pred": parse_edss(res.get("EDSS")),
            "confidence": pd.to_numeric(res.get("certainty_percent"), errors="coerce"),
        })
    df_pred = pd.DataFrame(rows)
    # ---- Merge + filter (many_to_one: GT keys must be unique)
    df = df_pred.merge(df_gt[["key", "EDSS_gt"]], on="key", how="inner", validate="many_to_one")
    df = df.dropna(subset=["EDSS_gt", "EDSS_pred", "confidence"]).copy()
    # SIGNED ERROR (direction)
    df["signed_error"] = df["EDSS_pred"] - df["EDSS_gt"]
    df["abs_error"] = df["signed_error"].abs()
    # ---- Bin confidence (right-closed; include_lowest covers exactly 0)
    bins = [0, 70, 80, 90, 100]
    labels = ["Low (<70%)", "Moderate (70-80%)", "High (80-90%)", "Very High (90-100%)"]
    df["conf_bin"] = pd.cut(df["confidence"], bins=bins, labels=labels, include_lowest=True)
    df = df.dropna(subset=["conf_bin"]).copy()
    # ---- Prepare arrays (one per bin, in label order)
    bin_arrays = [df.loc[df["conf_bin"] == lab, "signed_error"].to_numpy() for lab in labels]
    n_counts = [len(a) for a in bin_arrays]
    # ---- Plot
    fig, ax = plt.subplots(figsize=(12, 6))
    bp = ax.boxplot(
        bin_arrays,
        labels=labels,
        showfliers=show_outliers,
        patch_artist=True,
        widths=0.55,
    )
    for b in bp["boxes"]:
        b.set_alpha(0.35)
    # Jittered points
    for i, arr in enumerate(bin_arrays, start=1):
        if len(arr) == 0:
            continue
        x = np.full(len(arr), i, dtype=float)
        x += np.random.uniform(-jitter_width, jitter_width, size=len(arr))
        ax.scatter(x, arr, alpha=point_alpha, s=18)
    # Zero line to show over/under clearly
    ax.axhline(0, linewidth=1.5, linestyle="--")
    ax.set_title("Signed Error by LLM Confidence Bin (Iteration 1)")
    ax.set_xlabel("certainty_percent category")
    ax.set_ylabel("Signed Error (EDSS_pred − EDSS_gt)")
    ax.grid(axis="y", linestyle=":", alpha=0.5)
    # Legend with n per bin + zero-line meaning
    legend_handles = [
        Patch(facecolor="white", edgecolor="black", label=f"{lab}: n={n}")
        for lab, n in zip(labels, n_counts)
    ]
    legend_handles.append(Line2D([0], [0], linestyle="--", color="black", label="0 = unbiased (over/under split)"))
    ax.legend(handles=legend_handles, title="Bin counts", loc="upper right", frameon=True)
    plt.tight_layout()
    plt.show()
    # ---- Print per-bin summary to discuss
    print("\n--- PER-BIN SUMMARY (points plotted) ---")
    for lab in labels:
        sub = df.loc[df["conf_bin"] == lab]
        n = len(sub)
        if n == 0:
            print(f"{lab:>18}: n=0")
            continue
        print(
            f"{lab:>18}: n={n:3d} | "
            f"mean signed={sub['signed_error'].mean(): .3f} | "
            f"median signed={sub['signed_error'].median(): .3f} | "
            f"MAE={sub['abs_error'].mean(): .3f}"
        )
    print(f"Total points plotted: {len(df)}")
# Single-iteration run: signed error (direction of bias) per confidence bin.
boxplot_with_jitter_signed_error_by_conf_bins_single_json(
    ground_truth_path="/home/shahin/Lab/Doktorarbeit/Barcelona/Data/GT_Numbers.csv",
    json_file_path="/home/shahin/Lab/Doktorarbeit/Barcelona/Data/iteration/MS_Briefe_400_with_unique_id_SHA3_explore_cleaned_unique_results_iter_1_20260212_020628.json",
)
##
# %% jitter and violin 10x10
# Violin + jitter (all JSONs in folder), with signed error
# - X: confidence bins (<70, 70-80, 80-90, 90-100)
# - Y: signed error = EDSS_pred - EDSS_gt (direction)
# - Prints bin counts (n) and puts n into the legend
import os, glob, json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.patches import Patch
from matplotlib.lines import Line2D
def violin_jitter_signed_error_all_jsons(
    json_dir_path,
    ground_truth_path,
    gt_sep=";",
    gt_edss_col="EDSS",
    jitter_width=0.12,
    point_alpha=0.20,
    point_size=10,
    violin_inner="quartile", # 'quartile', 'box', 'stick', or None
):
    """Violin + jittered points of signed EDSS error across ALL iteration JSONs.

    Pools every *.json in json_dir_path, joins each prediction to the
    ground-truth CSV on the normalized (unique_id, MedDatum) key, computes
    signed error (EDSS_pred - EDSS_gt), bins certainty_percent into four
    fixed categories and draws one violin per bin with the raw points
    jittered on top. Bin counts are printed and repeated in the legend.

    Raises ValueError if the GT file lacks a required column and
    FileNotFoundError if the folder contains no JSON files.
    """
    def norm_str(x):
        # Normalize a key component: string, trimmed, lower-case.
        return str(x).strip().lower()
    def parse_edss(x):
        # Robust EDSS parser: None/empty/textual-NaN -> NaN; decimal comma -> dot.
        if x is None:
            return np.nan
        s = str(x).strip()
        if s == "" or s.lower() in {"nan", "none", "null"}:
            return np.nan
        s = s.replace(",", ".")
        return pd.to_numeric(s, errors="coerce")
    # ---- Load GT
    df_gt = pd.read_csv(ground_truth_path, sep=gt_sep)
    for col in ["unique_id", "MedDatum", gt_edss_col]:
        if col not in df_gt.columns:
            raise ValueError(f"GT missing column '{col}'. Available: {df_gt.columns.tolist()}")
    df_gt["unique_id"] = df_gt["unique_id"].map(norm_str)
    df_gt["MedDatum"] = df_gt["MedDatum"].map(norm_str)
    df_gt["key"] = df_gt["unique_id"] + "_" + df_gt["MedDatum"]
    df_gt["EDSS_gt"] = df_gt[gt_edss_col].map(parse_edss)
    # ---- Load preds from ALL JSONs
    json_files = sorted(glob.glob(os.path.join(json_dir_path, "*.json")))
    if not json_files:
        raise FileNotFoundError(f"No JSON files found in: {json_dir_path}")
    rows = []
    for fp in json_files:
        with open(fp, "r", encoding="utf-8") as f:
            data = json.load(f)
        for entry in data:
            # only successful extractions with a usable (unique_id, MedDatum) key
            if not entry.get("success"):
                continue
            res = entry.get("result", {})
            uid, md = res.get("unique_id"), res.get("MedDatum")
            if uid is None or md is None or str(uid).strip() == "" or str(md).strip() == "":
                continue
            rows.append({
                "file": os.path.basename(fp),
                "key": norm_str(uid) + "_" + norm_str(md),
                "EDSS_pred": parse_edss(res.get("EDSS")),
                "confidence": pd.to_numeric(res.get("certainty_percent"), errors="coerce"),
            })
    df_pred = pd.DataFrame(rows)
    # ---- Merge + filter (many_to_one: GT keys must be unique)
    df = df_pred.merge(df_gt[["key", "EDSS_gt"]], on="key", how="inner", validate="many_to_one")
    df = df.dropna(subset=["EDSS_gt", "EDSS_pred", "confidence"]).copy()
    df["signed_error"] = df["EDSS_pred"] - df["EDSS_gt"]
    # ---- Bin confidence (right-closed; include_lowest covers exactly 0)
    bins = [0, 70, 80, 90, 100]
    labels = ["Low (<70%)", "Moderate (70-80%)", "High (80-90%)", "Very High (90-100%)"]
    df["conf_bin"] = pd.cut(df["confidence"], bins=bins, labels=labels, include_lowest=True)
    df = df.dropna(subset=["conf_bin"]).copy()
    # ---- Counts + log
    counts = df["conf_bin"].value_counts().reindex(labels).fillna(0).astype(int)
    print("\n--- BIN COUNTS (all JSONs) ---")
    for lab in labels:
        print(f"{lab:>18}: n={counts[lab]}")
    print(f"Total points plotted: {len(df)}")
    print(f"JSON files: {len(json_files)}")
    # Ensure ordering for seaborn
    df["conf_bin"] = pd.Categorical(df["conf_bin"], categories=labels, ordered=True)
    # ---- Plot
    plt.figure(figsize=(12, 6))
    # Violin (density)
    sns.violinplot(
        data=df,
        x="conf_bin",
        y="signed_error",
        order=labels,
        inner=violin_inner,
        cut=0
    )
    # Jittered points (manual jitter to keep it consistent and fast)
    # Categories sit at integer x positions 0..len(labels)-1, matching seaborn.
    x_map = {lab: i for i, lab in enumerate(labels)}
    x = df["conf_bin"].map(x_map).astype(float).to_numpy()
    xj = x + np.random.uniform(-jitter_width, jitter_width, size=len(df))
    plt.scatter(xj, df["signed_error"].to_numpy(), alpha=point_alpha, s=point_size)
    # Zero line (over/under split)
    plt.axhline(0, linestyle="--", linewidth=1.5)
    plt.xticks(range(len(labels)), labels)
    plt.xlabel("certainty_percent category (all iterations)")
    plt.ylabel("Signed Error (EDSS_pred − EDSS_gt)")
    plt.title("Signed Error vs LLM Confidence Category — Violin + Jitter (All JSONs)")
    plt.grid(axis="y", linestyle=":", alpha=0.5)
    # Legend with n per bin
    legend_handles = [
        Patch(facecolor="white", edgecolor="black", label=f"{lab}: n={int(counts[lab])}")
        for lab in labels
    ]
    legend_handles.append(Line2D([0], [0], linestyle="--", color="black", label="0 = unbiased (over/under split)"))
    plt.legend(handles=legend_handles, title="Bin counts", loc="upper right", frameon=True)
    plt.tight_layout()
    plt.show()
# Pooled run: every iteration JSON contributes points; bins from each row's
# own certainty_percent.
violin_jitter_signed_error_all_jsons(
    ground_truth_path="/home/shahin/Lab/Doktorarbeit/Barcelona/Data/GT_Numbers.csv",
    json_dir_path="/home/shahin/Lab/Doktorarbeit/Barcelona/Data/iteration",
)
##
# %% jitter and violin 10x1
# Adjusted: Violin + jitter (ALL JSONs for points) but X-bins come ONLY from JSON #1 (reference)
# Fixes:
# 1) Legend has colors matching bins
# 2) Legend placed OUTSIDE plot area
# 3) X-axis binning uses certainty_percent from JSON1 (by key), then all iterations' points inherit that bin
import os, glob, json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.patches import Patch
from matplotlib.lines import Line2D
def violin_jitter_signed_error_all_jsons_xbins_from_json1(
    json_dir_path,
    json1_file_path,
    ground_truth_path,
    gt_sep=";",
    gt_edss_col="EDSS",
    jitter_width=0.12,
    point_alpha=0.18,
    point_size=10,
    violin_inner="quartile", # 'quartile', 'box', 'stick', or None
):
    """
    Violin + jitter of signed EDSS error where ALL iteration JSONs supply the
    points, but the x-axis confidence bins come ONLY from JSON 1 (reference):
    each key is binned by its JSON-1 certainty_percent, and every iteration's
    points for that key inherit that bin.

    Parameters
    ----------
    json_dir_path : str
        Folder with all iteration *.json files (supplies the points).
    json1_file_path : str
        Reference JSON whose certainty_percent defines the bins.
    ground_truth_path : str
        Ground-truth CSV path (separator gt_sep, EDSS column gt_edss_col).
    jitter_width, point_alpha, point_size
        Styling of the jittered raw points.
    violin_inner
        Passed to seaborn's violinplot `inner` option.

    Raises
    ------
    ValueError
        If the GT file lacks a required column.
    FileNotFoundError
        If json_dir_path contains no *.json files.
    """
    def norm_str(x):
        # Normalize a key component: string, trimmed, lower-case.
        return str(x).strip().lower()
    def parse_edss(x):
        # Robust EDSS parser: None/empty/textual-NaN -> NaN; decimal comma -> dot.
        if x is None:
            return np.nan
        s = str(x).strip()
        if s == "" or s.lower() in {"nan", "none", "null"}:
            return np.nan
        s = s.replace(",", ".")
        return pd.to_numeric(s, errors="coerce")
    # ----------------------------
    # Load GT
    # ----------------------------
    df_gt = pd.read_csv(ground_truth_path, sep=gt_sep)
    for col in ["unique_id", "MedDatum", gt_edss_col]:
        if col not in df_gt.columns:
            raise ValueError(f"GT missing column '{col}'. Available: {df_gt.columns.tolist()}")
    df_gt["unique_id"] = df_gt["unique_id"].map(norm_str)
    df_gt["MedDatum"] = df_gt["MedDatum"].map(norm_str)
    df_gt["key"] = df_gt["unique_id"] + "_" + df_gt["MedDatum"]
    df_gt["EDSS_gt"] = df_gt[gt_edss_col].map(parse_edss)
    # ----------------------------
    # Load JSON1 and build reference bins by KEY
    # ----------------------------
    with open(json1_file_path, "r", encoding="utf-8") as f:
        data1 = json.load(f)
    ref_rows = []
    for entry in data1:
        if not entry.get("success"):
            continue
        res = entry.get("result", {})
        uid, md = res.get("unique_id"), res.get("MedDatum")
        if uid is None or md is None or str(uid).strip() == "" or str(md).strip() == "":
            continue
        ref_rows.append({
            "key": norm_str(uid) + "_" + norm_str(md),
            "confidence_ref": pd.to_numeric(res.get("certainty_percent"), errors="coerce"),
        })
    df_ref = pd.DataFrame(ref_rows)
    # If JSON1 has duplicates for a key (unlikely, but safe), keep the FIRST
    # non-null confidence in file order — the same dedup the correlation cell
    # uses. BUGFIX: the previous sort_values + `s.dropna().any()` version
    # (a) picked the minimum instead of the first value, and (b) dropped keys
    # whose only confidence was 0, because .any() tests float truthiness
    # rather than presence of values.
    df_ref = (df_ref.dropna(subset=["confidence_ref"])
              .groupby("key", as_index=False)["confidence_ref"]
              .first())
    # Confidence bins (right-closed; include_lowest covers exactly 0)
    bins = [0, 70, 80, 90, 100]
    labels = ["Low (<70%)", "Moderate (70-80%)", "High (80-90%)", "Very High (90-100%)"]
    df_ref["conf_bin_ref"] = pd.cut(df_ref["confidence_ref"], bins=bins, labels=labels, include_lowest=True)
    df_ref = df_ref.dropna(subset=["conf_bin_ref"]).copy()
    # ----------------------------
    # Load ALL JSONs (all points)
    # ----------------------------
    json_files = sorted(glob.glob(os.path.join(json_dir_path, "*.json")))
    if not json_files:
        raise FileNotFoundError(f"No JSON files found in: {json_dir_path}")
    rows = []
    for fp in json_files:
        with open(fp, "r", encoding="utf-8") as f:
            data = json.load(f)
        for entry in data:
            if not entry.get("success"):
                continue
            res = entry.get("result", {})
            uid, md = res.get("unique_id"), res.get("MedDatum")
            if uid is None or md is None or str(uid).strip() == "" or str(md).strip() == "":
                continue
            rows.append({
                "file": os.path.basename(fp),
                "key": norm_str(uid) + "_" + norm_str(md),
                "EDSS_pred": parse_edss(res.get("EDSS")),
            })
    df_pred = pd.DataFrame(rows)
    # ----------------------------
    # Merge: preds + GT + reference bins (from JSON1)
    # ----------------------------
    # many_to_one is safe: GT keys and df_ref keys are unique after dedup.
    df = df_pred.merge(df_gt[["key", "EDSS_gt"]], on="key", how="inner", validate="many_to_one")
    df = df.merge(df_ref[["key", "conf_bin_ref"]], on="key", how="inner", validate="many_to_one")
    # filter for plotting
    df = df.dropna(subset=["EDSS_gt", "EDSS_pred", "conf_bin_ref"]).copy()
    df["signed_error"] = df["EDSS_pred"] - df["EDSS_gt"]
    # ordering
    df["conf_bin_ref"] = pd.Categorical(df["conf_bin_ref"], categories=labels, ordered=True)
    # ----------------------------
    # Logs + counts
    # ----------------------------
    counts = df["conf_bin_ref"].value_counts().reindex(labels).fillna(0).astype(int)
    print("\n--- BIN COUNTS (ALL JSON points, binned by JSON1 confidence) ---")
    for lab in labels:
        print(f"{lab:>18}: n={int(counts[lab])}")
    print(f"Total points plotted: {len(df)}")
    print(f"JSON files used for points: {len(json_files)}")
    print(f"Reference JSON1 bins derived from: {os.path.basename(json1_file_path)}")
    print(f"Keys in reference (after binning & non-null): {df_ref['key'].nunique()}")
    # ----------------------------
    # Colors + legend patches (legend colors match the violin colors)
    # ----------------------------
    palette = sns.color_palette("Blues", n_colors=len(labels))
    bin_colors = {lab: palette[i] for i, lab in enumerate(labels)}
    legend_handles = [
        Patch(facecolor=bin_colors[lab], edgecolor="black", label=f"{lab}: n={int(counts[lab])}")
        for lab in labels
    ]
    legend_handles.append(Line2D([0], [0], linestyle="--", color="black", label="0 = unbiased (over/under split)"))
    # ----------------------------
    # Plot (legend outside)
    # ----------------------------
    fig, ax = plt.subplots(figsize=(12.5, 6))
    # NOTE(review): seaborn >= 0.13 warns on `palette` without `hue`; kept
    # as-is to preserve the current rendering — confirm seaborn version.
    sns.violinplot(
        data=df,
        x="conf_bin_ref",
        y="signed_error",
        order=labels,
        inner=violin_inner,
        cut=0,
        palette=[bin_colors[l] for l in labels],
        ax=ax,
    )
    # jittered points (manual jitter; categories sit at integer x positions)
    x_map = {lab: i for i, lab in enumerate(labels)}
    x = df["conf_bin_ref"].map(x_map).astype(float).to_numpy()
    xj = x + np.random.uniform(-jitter_width, jitter_width, size=len(df))
    ax.scatter(xj, df["signed_error"].to_numpy(), alpha=point_alpha, s=point_size)
    ax.axhline(0, linestyle="--", linewidth=1.5)
    ax.set_xlabel("certainty_percent category (from JSON 1 as reference)")
    ax.set_ylabel("Signed Error (EDSS_pred − EDSS_gt)")
    ax.set_title("Signed Error vs LLM Confidence Category — Violin + Jitter (All JSONs)\nBinned by JSON 1 certainty_percent")
    ax.grid(axis="y", linestyle=":", alpha=0.5)
    # Legend outside (right)
    ax.legend(
        handles=legend_handles,
        title="Bin counts",
        loc="center left",
        bbox_to_anchor=(1.02, 0.5),
        frameon=True
    )
    plt.tight_layout()
    plt.show()
# The reference iteration (JSON 1) supplies the x-axis confidence bins;
# every iteration in the folder contributes points.
json1_path = "/home/shahin/Lab/Doktorarbeit/Barcelona/Data/iteration/MS_Briefe_400_with_unique_id_SHA3_explore_cleaned_unique_results_iter_1_20260212_020628.json"
violin_jitter_signed_error_all_jsons_xbins_from_json1(
    json1_file_path=json1_path,
    json_dir_path="/home/shahin/Lab/Doktorarbeit/Barcelona/Data/iteration",
    ground_truth_path="/home/shahin/Lab/Doktorarbeit/Barcelona/Data/GT_Numbers.csv",
)
##
# %% Coorelation
# Correlation plot (RAW certainty_percent) vs error
# - Uses ALL JSONs as points
# - Uses JSON1 certainty_percent as the x-value reference (per key)
# - Y can be abs_error or signed_error (choose with y_mode)
# - Prints Spearman + Pearson correlations
# - Adds a simple linear trend line
import os, glob, json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
def correlation_scatter_raw_certainty_json1_reference(
    json_dir_path,
    json1_file_path,
    ground_truth_path,
    gt_sep=";",
    gt_edss_col="EDSS",
    y_mode="abs",        # "abs" or "signed"
    point_alpha=0.18,
    point_size=12,
):
    """Scatter of raw JSON1 certainty_percent (x) vs EDSS error (y), all JSONs.

    Every successful prediction from every ``*.json`` file in ``json_dir_path``
    contributes one point; the x-value of a point is the certainty_percent that
    JSON1 reported for the same ``unique_id + MedDatum`` key.  Prints Pearson
    and Spearman correlations and overlays a simple linear trend line.

    Parameters
    ----------
    json_dir_path : str
        Directory containing the per-iteration result JSONs (points).
    json1_file_path : str
        JSON whose certainty_percent serves as the per-key x reference.
    ground_truth_path : str
        CSV with columns ``unique_id``, ``MedDatum`` and the EDSS column.
    gt_sep : str
        Field separator of the ground-truth CSV.
    gt_edss_col : str
        Name of the EDSS column in the ground-truth CSV.
    y_mode : str
        ``"abs"`` -> absolute error on y; anything else -> signed error.
    point_alpha, point_size : float
        Scatter styling.

    Raises
    ------
    ValueError
        If the ground-truth CSV lacks a required column.
    FileNotFoundError
        If no ``*.json`` files exist in ``json_dir_path``.
    """
    def norm_str(x):
        return str(x).strip().lower()

    def parse_edss(x):
        # Robust EDSS parser: tolerate None / empty / "nan" strings and
        # German decimal commas ("4,5" -> 4.5).
        if x is None:
            return np.nan
        s = str(x).strip()
        if s == "" or s.lower() in {"nan", "none", "null"}:
            return np.nan
        s = s.replace(",", ".")
        return pd.to_numeric(s, errors="coerce")

    def rankdata(a):
        # Average-rank for ties (Spearman needs ranks).
        return pd.Series(a).rank(method="average").to_numpy()

    def iter_success_results(path):
        # Yield (key, result_dict) for every successful entry that carries a
        # non-empty unique_id and MedDatum.  Shared by the JSON1-reference
        # pass and the all-files prediction pass.
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
        for entry in data:
            if not entry.get("success"):
                continue
            res = entry.get("result", {})
            uid, md = res.get("unique_id"), res.get("MedDatum")
            if uid is None or md is None or str(uid).strip() == "" or str(md).strip() == "":
                continue
            yield norm_str(uid) + "_" + norm_str(md), res

    # ----------------------------
    # Load GT
    # ----------------------------
    df_gt = pd.read_csv(ground_truth_path, sep=gt_sep)
    for col in ["unique_id", "MedDatum", gt_edss_col]:
        if col not in df_gt.columns:
            raise ValueError(f"GT missing column '{col}'. Available: {df_gt.columns.tolist()}")
    df_gt["unique_id"] = df_gt["unique_id"].map(norm_str)
    df_gt["MedDatum"] = df_gt["MedDatum"].map(norm_str)
    df_gt["key"] = df_gt["unique_id"] + "_" + df_gt["MedDatum"]
    df_gt["EDSS_gt"] = df_gt[gt_edss_col].map(parse_edss)
    # The GT file can contain duplicate unique_id+MedDatum rows (the audit cell
    # at the top of this file reports them).  Merging with
    # validate="many_to_one" would raise MergeError on those, so keep the
    # first row per key.
    df_gt_keyed = df_gt[["key", "EDSS_gt"]].drop_duplicates(subset="key", keep="first")
    # ----------------------------
    # Load JSON1 reference certainty_percent (per key)
    # ----------------------------
    ref_rows = [
        {"key": key,
         "certainty_ref": pd.to_numeric(res.get("certainty_percent"), errors="coerce")}
        for key, res in iter_success_results(json1_file_path)
    ]
    # Explicit columns keep the dropna/groupby below valid even with zero rows.
    df_ref = pd.DataFrame(ref_rows, columns=["key", "certainty_ref"])
    # Deduplicate keys if needed: take first non-null certainty
    df_ref = (df_ref.dropna(subset=["certainty_ref"])
                    .groupby("key", as_index=False)["certainty_ref"]
                    .first())
    # ----------------------------
    # Load ALL JSON predictions (points)
    # ----------------------------
    json_files = sorted(glob.glob(os.path.join(json_dir_path, "*.json")))
    if not json_files:
        raise FileNotFoundError(f"No JSON files found in: {json_dir_path}")
    rows = []
    for fp in json_files:
        fname = os.path.basename(fp)
        for key, res in iter_success_results(fp):
            rows.append({
                "file": fname,
                "key": key,
                "EDSS_pred": parse_edss(res.get("EDSS")),
            })
    df_pred = pd.DataFrame(rows, columns=["file", "key", "EDSS_pred"])
    # ----------------------------
    # Merge: preds + GT + JSON1 reference certainty
    # ----------------------------
    df = df_pred.merge(df_gt_keyed, on="key", how="inner", validate="many_to_one")
    df = df.merge(df_ref[["key", "certainty_ref"]], on="key", how="inner", validate="many_to_one")
    # Keep only fully usable points.
    df = df.dropna(subset=["EDSS_gt", "EDSS_pred", "certainty_ref"]).copy()
    df["signed_error"] = df["EDSS_pred"] - df["EDSS_gt"]
    df["abs_error"] = df["signed_error"].abs()
    y_col = "abs_error" if y_mode == "abs" else "signed_error"
    # ----------------------------
    # Logs
    # ----------------------------
    print("\n" + "="*90)
    print("CORRELATION: RAW certainty_percent (JSON1 reference) vs ERROR (ALL JSON points)")
    print("="*90)
    print(f"JSON DIR (points): {json_dir_path} | files: {len(json_files)}")
    print(f"JSON1 reference: {os.path.basename(json1_file_path)}")
    print(f"Points available after merge+filter: {len(df)}")
    print(f"Unique keys in plot: {df['key'].nunique()}")
    print(f"Y mode: {y_mode} ({y_col})")
    if df.empty:
        # Nothing to correlate; bail out instead of drawing an empty figure.
        print("[WARN] No overlapping points after merge+filter - nothing to plot.")
        return
    # ----------------------------
    # Correlations (Pearson + Spearman)
    # ----------------------------
    x = df["certainty_ref"].to_numpy(dtype=float)
    y = df[y_col].to_numpy(dtype=float)
    # Pearson
    pearson = np.corrcoef(x, y)[0, 1] if len(df) >= 2 else np.nan
    # Spearman = Pearson correlation of the ranks
    spearman = np.corrcoef(rankdata(x), rankdata(y))[0, 1] if len(df) >= 2 else np.nan
    print(f"\nPearson r: {pearson:.4f}")
    print(f"Spearman ρ: {spearman:.4f}")
    # ----------------------------
    # Trend line (simple linear fit y = a*x + b)
    # ----------------------------
    if len(df) >= 2:
        a, b = np.polyfit(x, y, 1)
    else:
        a, b = np.nan, np.nan
    # ----------------------------
    # Plot
    # ----------------------------
    plt.figure(figsize=(12, 6))
    plt.scatter(x, y, alpha=point_alpha, s=point_size)
    # trend line across full x-range
    if np.isfinite(a) and np.isfinite(b):
        xs = np.linspace(np.nanmin(x), np.nanmax(x), 200)
        plt.plot(xs, a * xs + b, linestyle="--", linewidth=2)
    plt.xlabel("certainty_percent (from JSON 1, per key)")
    ylabel = "Absolute Error |EDSS_pred − EDSS_gt|" if y_mode == "abs" else "Signed Error (EDSS_pred − EDSS_gt)"
    plt.ylabel(ylabel)
    plt.title(f"Correlation of JSON1 certainty_percent vs {y_col} (All iterations)\n"
              f"Pearson r={pearson:.3f} | Spearman ρ={spearman:.3f}")
    plt.grid(linestyle=":", alpha=0.5)
    plt.tight_layout()
    plt.show()
# Example run of the raw-certainty correlation scatter (iter-1 as reference).
json1_path = "/home/shahin/Lab/Doktorarbeit/Barcelona/Data/iteration/MS_Briefe_400_with_unique_id_SHA3_explore_cleaned_unique_results_iter_1_20260212_020628.json"
correlation_scatter_raw_certainty_json1_reference(
    y_mode="abs",  # switch to "signed" to see over/under-estimation direction
    ground_truth_path="/home/shahin/Lab/Doktorarbeit/Barcelona/Data/GT_Numbers.csv",
    json1_file_path=json1_path,
    json_dir_path="/home/shahin/Lab/Doktorarbeit/Barcelona/Data/iteration",
)
##
# %% Correlation adjusted
# Correlation scatter (RAW certainty_percent from JSON1) vs error (all JSON points)
# Adds:
# 1) Legend (points, trend line) + Pearson/Spearman shown in legend and title
# 2) Trend line color set to high-contrast (black by default)
# 3) Density coloring: dots colored by local point density (bluer = more cases) + colorbar
import os, glob, json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.lines import Line2D
from matplotlib.colors import LogNorm, PowerNorm
def correlation_scatter_raw_certainty_json1_reference(
    json_dir_path,
    json1_file_path,
    ground_truth_path,
    gt_sep=";",
    gt_edss_col="EDSS",
    y_mode="abs",          # "abs" or "signed"
    point_alpha=0.85,      # higher alpha works better with density coloring
    point_size=14,
    trend_color="black",   # high-contrast line
    save_svg_path=None,
    dpi=300
):
    """Density-colored scatter of JSON1 certainty_percent (x) vs EDSS error (y).

    Data pipeline: every successful prediction from every ``*.json`` file in
    ``json_dir_path`` is one point; the x-value comes from the
    certainty_percent that JSON1 reported for the same ``unique_id + MedDatum``
    key.  Points are colored by local 2-D bin density (log-scaled colorbar),
    a linear trend line is drawn in ``trend_color``, and the figure can
    optionally be written to SVG.

    Parameters
    ----------
    json_dir_path : str
        Directory containing the per-iteration result JSONs (points).
    json1_file_path : str
        JSON whose certainty_percent serves as the per-key x reference.
    ground_truth_path : str
        CSV with columns ``unique_id``, ``MedDatum`` and the EDSS column.
    gt_sep, gt_edss_col : str
        CSV separator and EDSS column name of the ground-truth file.
    y_mode : str
        ``"abs"`` -> absolute error on y; anything else -> signed error.
    point_alpha, point_size : float
        Scatter styling.
    trend_color : str
        Color of the linear trend line.
    save_svg_path : str or None
        If given, the figure is additionally saved to this SVG path.
    dpi : int
        Passed to ``savefig`` (mostly relevant for any raster elements).

    Raises
    ------
    ValueError
        If the ground-truth CSV lacks a required column.
    FileNotFoundError
        If no ``*.json`` files exist in ``json_dir_path``.
    """
    def norm_str(x):
        return str(x).strip().lower()

    def parse_edss(x):
        # Robust EDSS parser: tolerate None / empty / "nan" strings and
        # German decimal commas ("4,5" -> 4.5).
        if x is None:
            return np.nan
        s = str(x).strip()
        if s == "" or s.lower() in {"nan", "none", "null"}:
            return np.nan
        s = s.replace(",", ".")
        return pd.to_numeric(s, errors="coerce")

    def rankdata(a):
        # Average-rank for ties (Spearman needs ranks).
        return pd.Series(a).rank(method="average").to_numpy()

    def iter_success_results(path):
        # Yield (key, result_dict) for every successful entry that carries a
        # non-empty unique_id and MedDatum.
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
        for entry in data:
            if not entry.get("success"):
                continue
            res = entry.get("result", {})
            uid, md = res.get("unique_id"), res.get("MedDatum")
            if uid is None or md is None or str(uid).strip() == "" or str(md).strip() == "":
                continue
            yield norm_str(uid) + "_" + norm_str(md), res

    # ----------------------------
    # Load GT
    # ----------------------------
    df_gt = pd.read_csv(ground_truth_path, sep=gt_sep)
    for col in ["unique_id", "MedDatum", gt_edss_col]:
        if col not in df_gt.columns:
            raise ValueError(f"GT missing column '{col}'. Available: {df_gt.columns.tolist()}")
    df_gt["unique_id"] = df_gt["unique_id"].map(norm_str)
    df_gt["MedDatum"] = df_gt["MedDatum"].map(norm_str)
    df_gt["key"] = df_gt["unique_id"] + "_" + df_gt["MedDatum"]
    df_gt["EDSS_gt"] = df_gt[gt_edss_col].map(parse_edss)
    # The GT file can contain duplicate unique_id+MedDatum rows (the audit cell
    # at the top of this file reports them).  Merging with
    # validate="many_to_one" would raise MergeError on those, so keep the
    # first row per key.
    df_gt_keyed = df_gt[["key", "EDSS_gt"]].drop_duplicates(subset="key", keep="first")
    # ----------------------------
    # Load JSON1 reference certainty_percent (per key)
    # ----------------------------
    ref_rows = [
        {"key": key,
         "certainty_ref": pd.to_numeric(res.get("certainty_percent"), errors="coerce")}
        for key, res in iter_success_results(json1_file_path)
    ]
    # Explicit columns keep the dropna/groupby below valid even with zero rows.
    df_ref = pd.DataFrame(ref_rows, columns=["key", "certainty_ref"])
    # Deduplicate keys if needed: take first non-null certainty.
    df_ref = (df_ref.dropna(subset=["certainty_ref"])
                    .groupby("key", as_index=False)["certainty_ref"]
                    .first())
    # ----------------------------
    # Load ALL JSON predictions (points)
    # ----------------------------
    json_files = sorted(glob.glob(os.path.join(json_dir_path, "*.json")))
    if not json_files:
        raise FileNotFoundError(f"No JSON files found in: {json_dir_path}")
    rows = []
    for fp in json_files:
        fname = os.path.basename(fp)
        for key, res in iter_success_results(fp):
            rows.append({
                "file": fname,
                "key": key,
                "EDSS_pred": parse_edss(res.get("EDSS")),
            })
    df_pred = pd.DataFrame(rows, columns=["file", "key", "EDSS_pred"])
    # ----------------------------
    # Merge: preds + GT + JSON1 reference certainty
    # ----------------------------
    df = df_pred.merge(df_gt_keyed, on="key", how="inner", validate="many_to_one")
    df = df.merge(df_ref[["key", "certainty_ref"]], on="key", how="inner", validate="many_to_one")
    df = df.dropna(subset=["EDSS_gt", "EDSS_pred", "certainty_ref"]).copy()
    df["signed_error"] = df["EDSS_pred"] - df["EDSS_gt"]
    df["abs_error"] = df["signed_error"].abs()
    y_col = "abs_error" if y_mode == "abs" else "signed_error"
    if df.empty:
        # Without points, the density/percentile/nanmin steps below would all
        # raise on empty arrays — bail out with a warning instead.
        print("[WARN] No overlapping points after merge+filter - nothing to plot.")
        return
    # ----------------------------
    # Correlations
    # ----------------------------
    x = df["certainty_ref"].to_numpy(dtype=float)
    y = df[y_col].to_numpy(dtype=float)
    pearson = np.corrcoef(x, y)[0, 1] if len(df) >= 2 else np.nan
    # Spearman = Pearson correlation of the ranks.
    spearman = np.corrcoef(rankdata(x), rankdata(y))[0, 1] if len(df) >= 2 else np.nan
    # ----------------------------
    # Trend line (linear fit y = a*x + b)
    # ----------------------------
    if len(df) >= 2:
        a, b = np.polyfit(x, y, 1)
    else:
        a, b = np.nan, np.nan
    # ----------------------------
    # Density coloring (2D histogram bin counts)
    # "how blue" = how many points are around that location
    # ----------------------------
    # Binning resolution (balanced for ~thousands of points).
    x_bins = 50
    y_bins = 50
    # Vectorized 2D counts (replaces a per-point Python loop); histogram2d
    # spans [min, max] on each axis, matching the previous linspace edges.
    counts2d, x_edges, y_edges = np.histogram2d(x, y, bins=[x_bins, y_bins])
    xi = np.clip(np.digitize(x, x_edges) - 1, 0, x_bins - 1)
    yi = np.clip(np.digitize(y, y_edges) - 1, 0, y_bins - 1)
    # density per point = count of the bin that point falls into
    density = counts2d[xi, yi]
    # Plot low density first, high density last (so dense points are visible).
    order = np.argsort(density)
    x_o, y_o, d_o = x[order], y[order], density[order]
    # ----------------------------
    # Plot (log-scaled density colors)
    # ----------------------------
    fig, ax = plt.subplots(figsize=(12.5, 6))
    # Logarithmic color scaling; +1 avoids log(0) for singleton bins.
    d_plot = d_o + 1
    # Clip vmax so one extreme bin doesn't wash everything out.
    vmax = np.percentile(d_plot, 99)  # try 95 or 99 depending on your data
    norm = LogNorm(vmin=1, vmax=max(2, vmax))
    sc = ax.scatter(
        x_o, y_o,
        c=d_plot,
        cmap="Blues",
        norm=norm,
        s=point_size,
        alpha=point_alpha,
        linewidths=0
    )
    # Trend line (high-contrast color)
    if np.isfinite(a) and np.isfinite(b):
        xs = np.linspace(np.nanmin(x), np.nanmax(x), 200)
        ax.plot(xs, a * xs + b, linestyle="--", linewidth=2.5, color=trend_color)
    ax.set_xlabel("certainty percent")
    ax.set_ylabel("Absolute Error" if y_mode == "abs" else "Signed Error (EDSS_pred − EDSS_gt)")
    ax.grid(linestyle=":", alpha=0.5)
    # Colorbar
    cbar = plt.colorbar(sc, ax=ax)
    cbar.set_label("Local density (count of cases in bin, log-scaled)")
    # Legend
    legend_items = [
        Line2D([0], [0], marker="o", linestyle="None", color="navy",
               label=f"Data points (n={len(df)})"),
        Line2D([0], [0], linestyle="--", color=trend_color, linewidth=2.5,
               label=f"Linear trend (Pearson r={pearson:.3f})"),
    ]
    ax.legend(handles=legend_items, loc="upper right", frameon=True, title="Legend")
    plt.tight_layout()
    # Save as SVG (optional)
    if save_svg_path:
        fig.savefig(save_svg_path, format="svg", bbox_inches="tight", dpi=dpi)
        print(f"[SAVED] {save_svg_path}")
    plt.show()
# Example run: density-colored correlation plot; uncomment save_svg_path to export.
json1_path = "/home/shahin/Lab/Doktorarbeit/Barcelona/Data/iteration/MS_Briefe_400_with_unique_id_SHA3_explore_cleaned_unique_results_iter_1_20260212_020628.json"
correlation_scatter_raw_certainty_json1_reference(
    y_mode="abs",
    ground_truth_path="/home/shahin/Lab/Doktorarbeit/Barcelona/Data/GT_Numbers.csv",
    json1_file_path=json1_path,
    json_dir_path="/home/shahin/Lab/Doktorarbeit/Barcelona/Data/iteration",
    # save_svg_path="/home/shahin/Lab/Doktorarbeit/Barcelona/results/corr_json1_abs_error.svg",
)
##