def _pair_to_mat_indices(pair_labels, mat_labels):
"""
Map each pair label 'A-B' to (mat_idx_A, mat_idx_B).
"""
mat_to_idx = {label: i for i, label in enumerate(mat_labels)}
return [tuple(mat_to_idx[s] for s in pl.split("-")) for pl in pair_labels]
def _pair_label_to_repeat_fold_ids(pair_label):
# 'rep0_f0-rep0_f1', 'rep1_f0-rep1_f1', ...
pattern = r"rep(\d+)_f(\d+)-rep(\d+)_f(\d+)"
m = re.match(pattern, pair_label)
repeat_id = int(m.group(1))
if repeat_id != int(m.group(3)):
raise ValueError(f"Repeat IDs dont match: {m.group(1)} and {m.group(3)}")
fold_id_1 = int(m.group(2))
fold_id_2 = int(m.group(4))
return repeat_id, fold_id_1, fold_id_2
def spectral_gap_from_record(record):
rows = []
nucnorm = int(round(float(record["nucnorm"])))
mat_labels = record["mat_labels"]
pair_labels = record["pair_labels"]
n_matrices = record["n_matrices"]
n_pairs = record["n_pairs"]
by_k = sorted(record["by_k"], key=lambda x: int(x["k"]))
cum_energy = np.array([e["energies"] for e in by_k], dtype=np.float64) # (n_factors, n_mats)
dist_kp = np.array([e["distances"] for e in by_k], dtype=np.float64) # (n_factors, n_pairs)
k_values = [int(e["k"]) for e in by_k]
E0 = np.zeros((1, n_matrices), dtype=float)
E = np.vstack([E0, cum_energy])
delta_E = np.diff(E, axis=0) # rows corresponding to k_values
delta_E = np.clip(delta_E, 0.0, None) # Numerical cleanup
# normalized singular values: s_k / ||s||_2
s_norm = np.sqrt(delta_E)
pair_mats = _pair_to_mat_indices(pair_labels, mat_labels)
for p, (ia, ib) in enumerate(pair_mats):
for idx, k in enumerate(k_values):
row = {
"nucnorm": nucnorm,
"k": k,
"pair_label": pair_labels[p],
"pair_idx": p,
"distance": dist_kp[idx, p],
"energy": np.nanmin([cum_energy[idx, ia], cum_energy[idx, ib]]),
"s_norm": np.nanmin([s_norm[idx, ia], s_norm[idx, ib]]),
}
# Gap at k requires k+1, so cannot compute for the last stored k.
if idx + 1 < len(k_values):
s_ka, s_kb = s_norm[idx, ia], s_norm[idx, ib]
s_next_a, s_next_b = s_norm[idx + 1, ia], s_norm[idx + 1, ib]
gap_a = s_ka - s_next_a
gap_b = s_kb - s_next_b
rel_gap_a = gap_a / s_ka if s_ka > 0 else np.nan
rel_gap_b = gap_b / s_kb if s_kb > 0 else np.nan
gap_ratio_a = s_next_a / s_ka if s_ka > 0 else np.nan
gap_ratio_b = s_next_b / s_kb if s_kb > 0 else np.nan
row.update({
"s_gap": np.nanmin([gap_a, gap_b]),
"rel_s_gap": np.nanmin([rel_gap_a, rel_gap_b]),
"gap_ratio": np.nanmax([gap_ratio_a, gap_ratio_b]),
})
else:
row.update({
"s_gap": np.nan,
"rel_s_gap": np.nan,
"gap_ratio": np.nan,
})
rows.append(row)
return rows
def load_subspace_npz(subspace_dir, prefix):
subspace_dir = Path(subspace_dir)
rows = []
for path in subspace_dir.glob(f"{prefix}_r*.npz"):
with np.load(path, allow_pickle=False) as data:
n_folds = int(data["n_folds"])
nucnorm = data["nucnorm"].item()
repeat_id = data["repeat_id"].item()
for f in range(n_folds):
s = data[f"s_f{f}"]
n_factors = len(s)
row = {
"nucnorm": nucnorm,
"repeat_id": repeat_id,
"fold_id": f,
}
row.update({f"s{k}": s[k] for k in range(n_factors)})
rows.append(row)
df = (pd.DataFrame(rows)
.sort_values(["nucnorm","repeat_id", "fold_id"])
.reset_index(drop=True))
return df
def spectral_gap_from_subspace(subspace_df, record):
def get_subspace_spectrum(subspace_df, repeat_id, fold_id):
df = subspace_df[
(subspace_df["repeat_id"] == repeat_id) &
(subspace_df["fold_id"] == fold_id)
].reset_index(drop=True)
return df.sort_values(["nucnorm"]).reset_index(drop=True)
def get_eigvalue(df, r, k_idx):
return float(df[df["nucnorm"] == r][f"s{k_idx}"].iloc[0])
rows = []
nucnorm = int(round(float(record["nucnorm"])))
mat_labels = record["mat_labels"]
pair_labels = record["pair_labels"]
n_matrices = record["n_matrices"]
n_pairs = record["n_pairs"]
by_k = sorted(record["by_k"], key=lambda x: int(x["k"]))
dist_kp = np.array([e["distances"] for e in by_k], dtype=np.float64) # (n_factors, n_pairs)
k_values = [int(e["k"]) for e in by_k]
for p, pair_label in enumerate(pair_labels):
repeat_id, fold_id_1, fold_id_2 = _pair_label_to_repeat_fold_ids(pair_label)
df_fold_1 = get_subspace_spectrum(subspace_df, repeat_id, fold_id_1)
df_fold_2 = get_subspace_spectrum(subspace_df, repeat_id, fold_id_2)
s_0a = get_eigvalue(df_fold_1, nucnorm, 0)
s_0b = get_eigvalue(df_fold_2, nucnorm, 0)
for idx, k in enumerate(k_values):
s_ka = get_eigvalue(df_fold_1, nucnorm, idx)
s_kb = get_eigvalue(df_fold_2, nucnorm, idx)
row = {
"nucnorm": nucnorm,
"k": k,
"pair_label": pair_label,
"pair_idx": p,
"distance": dist_kp[idx, p],
"eigvalue": np.min([s_ka, s_kb]),
"s_norm": np.nanmin([s_ka/s_0a, s_kb/s_0b])
}
# Gap at k requires k+1, so cannot compute for the last stored k.
if idx + 1 < len(k_values):
s_next_a = get_eigvalue(df_fold_1, nucnorm, idx+1)
s_next_b = get_eigvalue(df_fold_2, nucnorm, idx+1)
gap_a = s_ka - s_next_a
gap_b = s_kb - s_next_b
rel_gap_a = gap_a / s_ka if s_ka > 0 else np.nan
rel_gap_b = gap_b / s_kb if s_kb > 0 else np.nan
gap_ratio_a = s_next_a / s_ka if s_ka > 0 else np.nan
gap_ratio_b = s_next_b / s_kb if s_kb > 0 else np.nan
row.update({
"s_gap": np.nanmin([gap_a, gap_b]),
"rel_s_gap": np.nanmin([rel_gap_a, rel_gap_b]),
"gap_ratio": np.nanmax([gap_ratio_a, gap_ratio_b]),
})
else:
row.update({
"s_gap": np.nan,
"rel_s_gap": np.nan,
"gap_ratio": np.nan,
})
rows.append(row)
return rows
def build_spectral_gap_pair(records, subspace = None):
rows = []
for record in records:
if subspace is None:
rows.extend(spectral_gap_from_record(record))
else:
rows.extend(spectral_gap_from_subspace(subspace, record))
return (
pd.DataFrame(rows)
.sort_values(["k", "nucnorm", "pair_idx"])
.reset_index(drop=True)
)
subspace_df = load_subspace_npz(subspace_out_dir, prefix)
energy_df = build_spectral_gap_pair(records, subspace = subspace_df)