"""This module contains helper functions to access/manipulate data for plotting more concisely."""
import colorsys
import os
import matplotlib.colors as mc
import numpy as np
import pandas as pd
from alerce.core import Alerce
from scipy.stats import binned_statistic
from superphot_plus.lightcurve import Lightcurve
from superphot_plus.supernova_class import SupernovaClass as SnClass
[docs]
def lighten_color(color, amount=0.5):
"""
Lightens the given color by multiplying (1-luminosity) by the given amount.
Input can be matplotlib color string, hex string, or RGB tuple.
"""
try:
color_val = mc.cnames[color]
except:
color_val = color
color_hls = colorsys.rgb_to_hls(*mc.to_rgb(color_val))
return colorsys.hls_to_rgb(color_hls[0], max(0, min(1, amount * color_hls[1])), color_hls[2])
[docs]
def get_survey_fracs():
"""Return catalog with supernova fractions from existing
catalogue datasets. referenced in papers.
"""
yse_counts = np.array([314, 107, 15, 2, 32])
yse_fracs = yse_counts / np.sum(yse_counts)
psmds_counts = np.array([404, 94, 24, 17, 19])
psmds_fracs = psmds_counts / np.sum(psmds_counts)
return {"YSE": yse_fracs, "PS-MDS": psmds_fracs}
[docs]
def read_probs_csv(probs_fn):
"""Helper function to read in a probability csv file
and return the columns as numpy arrays.
"""
df = pd.read_csv(probs_fn)
names = df.Name.to_numpy()
try:
labels = df.Label.to_numpy()
except:
labels = None
probs = df[['pSNIa', 'pSNII', 'pSNIIn', 'pSLSNI', 'pSNIbc']].astype(float).to_numpy()
try:
folds = df.Fold.to_numpy()
except:
folds = None
pred_classes = np.argmax(probs, axis=1)
return names, labels, probs, pred_classes, folds, df
[docs]
def get_alerce_pred_class(ztf_name, alerce, superphot_style=False):
"""Get alerce probabilities corresponding to the four (no SN IIn)
classes in our ZTF classifier.
Parameters
----------
ztf_name : str
ZTF name of the object.
superphot_style : bool, optional
If True, change format of output labels. Default is False.
Returns
-------
str
Predicted class label.
"""
try:
query = alerce.query_probabilities(oid=ztf_name, format="pandas")
query_transient = query[query["classifier_name"] == "lc_classifier_transient"]
label = query_transient[query_transient["ranking"] == 1]["class_name"].iat[0]
return SnClass.from_alerce_to_superphot_format(label) if superphot_style else label
except:
return "None"
[docs]
def create_alerce_pred_csv(probs_fn, save_fn):
"""Generate csv with ALeRCE's predicted
classes for SN names in probs_csv.
"""
alerce_obj = Alerce()
names = pd.read_csv(probs_fn).Name.to_numpy()
alerce_labels = []
for sn_name in names:
alerce_labels.append(get_alerce_pred_class(sn_name, alerce_obj, superphot_style=True))
df = pd.DataFrame({"name": names, "alerce_label": alerce_labels})
df.to_csv(save_fn, index=False)
[docs]
def retrieve_four_class_info(probs_csv, probs_alerce_csv, p07=False):
"""Extract Superphot+ and ALeRCE predictions and true class info."""
_, classes_to_labels = SnClass.get_type_maps()
(sn_names, true_classes, class_probs, pred_classes, folds, _) = read_probs_csv(probs_csv)
secondary_pred_classes = np.argsort(class_probs, axis=1)[:,-2]
if true_classes is None:
true_classes = np.zeros(len(sn_names)) # filler
if folds is None:
folds = np.zeros(len(sn_names))
try:
true_labels = np.array([classes_to_labels[x] for x in true_classes])
except:
true_labels = np.array([SnClass.canonicalize(x) for x in true_classes])
pred_labels = np.array([classes_to_labels[x] for x in pred_classes])
pred_labels2 = np.array([classes_to_labels[x] for x in secondary_pred_classes])
# read in ALeRCE classes
df_alerce = pd.read_csv(probs_alerce_csv)
pred_alerce = df_alerce.alerce_label.to_numpy().astype(str)
ignore_mask = (pred_alerce == "None") | (pred_alerce == "nan") | (pred_alerce == "SKIP")
# ignore true SNe IIn
ignore_mask = ignore_mask | (true_labels == "SN IIn")
(sn_names, true_labels, class_probs, pred_labels2, pred_labels, pred_alerce, folds) = (
sn_names[~ignore_mask],
true_labels[~ignore_mask],
class_probs[~ignore_mask],
pred_labels2[~ignore_mask],
pred_labels[~ignore_mask],
pred_alerce[~ignore_mask],
folds[~ignore_mask]
)
print(np.unique(pred_labels2[pred_labels == "SN IIn"], return_counts=True))
# merge SN IIn predictions with SN II
pred_labels[pred_labels == "SN IIn"] = pred_labels2[pred_labels == "SN IIn"]
if p07:
p07_mask = np.max(class_probs, axis=1) > 0.7
(sn_names, true_labels, class_probs, pred_labels, pred_alerce, folds) = (
sn_names[p07_mask],
true_labels[p07_mask],
class_probs[p07_mask],
pred_labels[p07_mask],
pred_alerce[p07_mask],
folds[p07_mask]
)
return (sn_names, true_labels, class_probs, pred_labels, pred_alerce, folds)
[docs]
def gaussian(inputs, amp, mean, sigma):
"""Evaluate a gaussian with params A at the values in x.
Parameters
----------
inputs : array-like or float
Value(s) to evaluate gaussian at
amp : float
Amplitude of the Gaussian.
mean : float
Mean of Gaussian
sigma : float
Standard deviation of Gaussian
Returns
----------
array-like or float:
Gaussian values evaluated at x
"""
if ~np.isscalar(inputs):
inputs = np.array(inputs)
return amp * np.exp(-((inputs - mean) ** 2) / sigma**2 / 2.0)
[docs]
def histedges_equalN(vals, nbin):
"""Generate histogram bin edges, such that counts are equal in each bin.
Parameters
----------
vals : array-like or float
Value(s) to bin in histogram
nbin : integer
number of bins
"""
npt = len(vals)
return np.interp(np.linspace(0, npt, nbin + 1), np.arange(npt), np.sort(vals))
[docs]
def add_snr_to_prob_csv(probs_csv, data_dir, new_csv):
"""
Adds 10% SNR and num of SNR > 5 points columns
to probability CSV. Useful for plots.
"""
#names, _, _, _, _, probs_df = read_probs_csv(probs_csv)
probs_df = pd.read_csv(probs_csv)
names = probs_df.Name.to_numpy()
extended_df = probs_df.copy()
n_snr_3 = []
n_snr_5 = []
n_snr_10 = []
snr_ten_percent = []
max_flux = []
for name in names:
try:
filename = os.path.join(data_dir, name + ".npz")
lightcurve = Lightcurve.from_file(filename)
snr = np.abs(lightcurve.fluxes / lightcurve.flux_errors)
n_snr_3.append(len(snr[(snr > 3.0)]))
n_snr_5.append(len(snr[(snr > 5.0)]))
n_snr_10.append(len(snr[(snr > 10.0)]))
snr_ten_percent.append(np.quantile(snr, 0.9))
max_flux.append(lightcurve.find_max_flux(band="r")[0])
except:
n_snr_3.append(-1)
n_snr_5.append(-1)
n_snr_10.append(-1)
snr_ten_percent.append(-1)
max_flux.append(-1)
extended_df["Fmax"] = np.array(max_flux)
extended_df["SNR90"] = np.array(snr_ten_percent)
extended_df["nSNR3"] = np.array(n_snr_3)
extended_df["nSNR5"] = np.array(n_snr_5)
extended_df["nSNR10"] = np.array(n_snr_10)
extended_df.to_csv(new_csv, index=False)
[docs]
def calc_precision_recall(y_true, y_score, folds):
"""Calculate purity recall values at
multiple threshholds for plot. Assumes y_true only
contains 0s and 1s (target), and y_score are
probabilities of being class 1.
"""
threshholds = np.linspace(0, 1, num=1000)
all_precs = []
all_recalls = []
all_thresh = []
y_true = y_true.astype(bool)
for t in threshholds:
y_pred = y_score > t
intersect = (y_pred & y_true).astype(int)
for f in np.unique(folds):
f_idx = folds == f
if sum(y_pred[f_idx].astype(int)) == 0:
precision = 1.0
else:
precision = sum(intersect[f_idx]) / sum(y_pred[f_idx].astype(int))
recall = sum(intersect[f_idx]) / sum(y_true[f_idx].astype(int))
all_precs.append(precision)
all_recalls.append(recall)
all_thresh.append(t)
recall_bin_edges = np.unique(histedges_equalN(all_recalls, 30))
#r_centers = (recall_bin_edges[1:] + recall_bin_edges[:-1]) / 2
p, _, _ = binned_statistic(all_recalls, all_precs, statistic='mean', bins=recall_bin_edges)
perr, _, _ = binned_statistic(all_recalls, all_precs, statistic='std', bins=recall_bin_edges)
t, _, _ = binned_statistic(all_recalls, all_thresh, statistic='mean', bins=recall_bin_edges)
p = np.append(p, sum(y_true)/len(y_true)) # end of curve
perr = np.append(perr, 0.0)
return np.asarray(t), np.asarray(recall_bin_edges), np.asarray(p), np.asarray(perr)
[docs]
def roc_curve_w_uncertainties(y_true, y_score, folds):
"""Incorporate K-fold uncertainties."""
threshholds = np.linspace(0, 1, num=1000)
all_tpr = []
all_fpr = []
all_thresh = []
y_true = y_true.astype(bool)
for t in threshholds:
y_pred = y_score > t
false_pos = (y_pred & ~y_true).astype(int)
true_pos = (y_pred & y_true).astype(int)
for f in np.unique(folds):
f_idx = folds == f
single_tpr = sum(true_pos[f_idx]) / sum(y_true[f_idx].astype(int))
single_fpr = sum(false_pos[f_idx]) / sum((~y_true[f_idx]).astype(int))
all_tpr.append(single_tpr)
all_fpr.append(single_fpr)
all_thresh.append(t)
fpr_bin_edges = np.unique(histedges_equalN(all_fpr, 30))
fpr_centers = (fpr_bin_edges[1:] + fpr_bin_edges[:-1]) / 2
tpr, _, _ = binned_statistic(all_fpr, all_tpr, statistic='mean', bins=fpr_bin_edges)
tpr_err, _, _ = binned_statistic(all_fpr, all_tpr, statistic='std', bins=fpr_bin_edges)
print(tpr, fpr_bin_edges)
bin_widths = fpr_bin_edges[1:] - fpr_bin_edges[:-1]
auc = np.sum(tpr*bin_widths)
print(auc)
t, _, _ = binned_statistic(all_fpr, all_thresh, statistic='mean', bins=fpr_bin_edges)
return (
np.append(np.asarray(t), t[-1]),
np.asarray(fpr_bin_edges),
np.append(np.asarray(tpr), tpr[-1]),
np.append(np.asarray(tpr_err), tpr_err[-1])
)
[docs]
def rebin_prec_recall(t, r, rerr, p, perr):
"""Turn completeness to the independent variable,
and bin precision accordingly.
"""
r_bin_centers = r
r_bin_edges = (r[1:] + r[:-1]) / 2.0
# add endpoints
r_bin_edges = np.insert(r_bin_edges, 0, 2*r_bin_centers[0] - r_bin_edges[0])
r_bin_edges = np.append(r_bin_edges, 2*r_bin_centers[-1] - r_bin_edges[-1])
# find purity idxs that fall into each bin
pmin_updated = []
pmax_updated = []
for i, b_center in enumerate(r_bin_centers):
contained_idxs = (
r - rerr < r_bin_edges[i+1]
) & (
r + rerr >= r_bin_edges[i]
)
if len(p[contained_idxs]) == 0:
pmin_updated.append(p[i])
pmax_updated.append(p[i])
else:
pmin_updated.append(
p[contained_idxs][0] - perr[contained_idxs][0]
)
pmax_updated.append(
p[contained_idxs][-1] + perr[contained_idxs][-1]
)
pmin_updated.append(pmin_updated[-1])
pmax_updated.append(pmax_updated[-1])
p_copy = np.append(p, p[-1])
return np.asarray(r_bin_edges), np.asarray(p_copy), np.asarray(pmin_updated), np.asarray(pmax_updated)
[docs]
def calc_calibration_curve(y_true, y_score, folds):
"""Return confidence vs. fraction of true positives."""
thresh_bins = histedges_equalN(y_score, 50)
thresh_bins[-1] = 1.0
f_means = []
f_errs = []
y_true = y_true.astype(bool)
for i in range(len(thresh_bins)-1):
y_pred = (thresh_bins[i] <= y_score) & (y_score < thresh_bins[i+1])
num_true = (y_pred & y_true).astype(int)
fracs = []
for f in np.unique(folds):
f_idx = folds == f
frac = sum(num_true[f_idx]) / sum(y_pred[f_idx].astype(int))
fracs.append(frac)
f_means.append(np.mean(fracs))
f_errs.append(np.std(fracs))
return thresh_bins, np.append(np.asarray(f_means), 1.0), np.append(np.asarray(f_errs), 0.0)