import os
import shutil
import extinction
import numpy as np
import torch
from astropy.coordinates import SkyCoord
from dustmaps.config import config as dustmaps_config
from dustmaps.sfd import SFDQuery
from torch.utils.data import TensorDataset
from torch.utils.tensorboard import SummaryWriter
from superphot_plus.file_paths import (
CLASSIFY_LOG_FILE,
FIT_PLOTS_FOLDER,
PROBS_FILE,
PROBS_FILE2,
WRONGLY_CLASSIFIED_FOLDER,
)
from superphot_plus.sfd import dust_filepath
[docs]def get_band_extinctions_from_mwebv(mwebv, wvs):
"""Get extinction list from MWEBV value and wavelengths.
"""
Av_sfd = 2.742 * mwebv
band_wvs = 1.0 / (0.0001 * np.asarray(wvs)) # in inverse microns
# Now figure out how much the magnitude is affected by this dust
ext_list = extinction.fm07(band_wvs, Av_sfd, unit="invum") # in magnitudes
return ext_list
[docs]def get_band_extinctions(ra, dec, wvs):
"""Get g- and r-band extinctions in magnitudes for a single
supernova lightcurve based on right ascension (RA) and declination
(DEC).
Parameters
----------
ra : float
The right ascension of the object of interest, in degrees.
dec : float
The declination of the object of interest, in degrees.
wvs : list or np.ndarray
Array of wavelengths, in angstroms.
Returns
-------
ext_dict : Dict
A dictionary mapping bands to extinction magnitudes for the given coordinates.
"""
dustmaps_config["data_dir"] = dust_filepath
sfd = SFDQuery()
# First look up the amount of mw dust at this location
coords = SkyCoord(ra, dec, frame="icrs", unit="deg")
# from https://dustmaps.readthedocs.io/en/latest/examples.html
mwebv = sfd(coords)
return get_band_extinctions_from_mwebv(mwebv, wvs)
[docs]def calc_accuracy(pred_classes, test_labels):
"""Calculates the accuracy of the random forest after predicting all
classes.
Parameters
----------
pred_classes : np.ndarray of int
Classes predicted by MLP.
test_labels : np.ndarray of int
True spectroscopic classes.
Returns
-------
Accuracy: float
The percentage of the predictions that are correct.
Raises
------
ValueError
If the pred_classes or test_labels arrays are empty or if they
are of mismatched sizes.
"""
num_total = len(pred_classes)
if num_total == 0:
raise ValueError("Empty array provided to calc_accuracy.")
if num_total != len(test_labels):
raise ValueError(f"Array size mismatch for calc_accuracy {num_total} vs {len(test_labels)}.")
num_correct = np.sum(np.where(pred_classes == test_labels, 1, 0))
return num_correct / num_total
[docs]def f1_score(pred_classes, true_classes, class_average=False):
"""Calculates the F1 score for the classifier. If
class_average=True, then the macro-F1 is used. Else, uses the
weighted-F1 score.
Parameters
----------
pred_classes : np.ndarray of int
Classes predicted by MLP.
true_classes : np.ndarray of int
True spectroscopic classes.
class_average : bool, optional
Determines whether F1 score is weighted equally for each class,
or by number of samples per class. Defaults to False.
Returns
-------
float
The calculated F1 score.
"""
samples_per_class = {}
for true_class in true_classes:
if true_class in samples_per_class:
samples_per_class[true_class] += 1
else:
samples_per_class[true_class] = 1
f1_sum = 0.0
for true_class, count in samples_per_class.items():
true_positive = len(pred_classes[(pred_classes == true_class) & (true_classes == true_class)])
if len(pred_classes[pred_classes == true_class]) == 0:
f_1 = 0.0
else:
purity = true_positive / len(pred_classes[pred_classes == true_class])
completeness = true_positive / len(true_classes[true_classes == true_class])
if purity + completeness == 0: # pragma: no cover
f_1 = 0.0
else:
f_1 = 2.0 * purity * completeness / (purity + completeness)
if class_average:
f1_sum += f_1
else:
f1_sum += count * f_1
if class_average:
return f1_sum / len(samples_per_class.keys())
return f1_sum / len(true_classes)
[docs]def convert_mags_to_flux(mag, merr, zero_point):
"""Converts magnitudes to flux.
Parameters
----------
mag : array-like
The magnitudes.
merr : array-like
The error in magnitudes.
zero_point : float
The zero point in the magnitude system.
Returns
-------
fluxes, flux_unc : tuple of numpy array
The calculated fluxes and their uncertainties.
"""
fluxes = 10.0 ** (-1.0 * (mag - zero_point) / 2.5)
flux_unc = np.log(10.0) / 2.5 * fluxes * merr
return fluxes, flux_unc
[docs]def flux_model(cube, t_data, b_data, ordered_bands, ref_band):
"""Given "cube" of fit parameters, returns the flux measurements for
a given set of time and band data.
Parameters
----------
cube : array-like
The cube of fit parameters.
t_data : array-like
The time data.
b_data : array-like
The band data.
ordered_bands : array-like
The band names in the cube's parameter order.
ref_band : str
The base/reference band which all other values are multiples of.
Returns
-------
f_model : numpy array
The flux model for the given set of time and band data.
"""
b_data = np.array(b_data)
ref_band_idx = np.argmax(ref_band == np.array(ordered_bands))
start_idx = ref_band_idx * 7
amp, beta, gamma, t_0, tau_rise, tau_fall, _ = cube[start_idx : start_idx + 7]
phase = t_data - t_0
f_model = (
amp / (1.0 + np.exp(-phase / tau_rise)) * (1.0 - beta * gamma) * np.exp((gamma - phase) / tau_fall)
)
f_model[phase < gamma] = (
amp / (1.0 + np.exp(-phase[phase < gamma] / tau_rise)) * (1.0 - beta * phase[phase < gamma])
)
for band_idx, ordered_band in enumerate(ordered_bands):
if ordered_band == ref_band:
continue
start_idx = 7 * band_idx
amp_b = amp * cube[start_idx]
beta_b = beta * cube[start_idx + 1]
gamma_b = gamma * cube[start_idx + 2]
t0_b = t_0 * cube[start_idx + 3]
tau_rise_b = tau_rise * cube[start_idx + 4]
tau_fall_b = tau_fall * cube[start_idx + 5]
inc_band_ix = b_data == ordered_band
phase_b = (t_data - t0_b)[inc_band_ix]
phase_b2 = (t_data - t0_b)[inc_band_ix & (t_data - t0_b < gamma_b)]
f_model[inc_band_ix] = (
amp_b
/ (1.0 + np.exp(-phase_b / tau_rise_b))
* (1.0 - beta_b * gamma_b)
* np.exp((gamma_b - phase_b) / tau_fall_b)
)
f_model[inc_band_ix & (t_data - t0_b < gamma_b)] = (
amp_b / (1.0 + np.exp(-phase_b2 / tau_rise_b)) * (1.0 - phase_b2 * beta_b)
)
return f_model
[docs]def params_valid(beta, gamma, tau_rise, tau_fall):
"""Check if parameters are valid given certain model constraints.
Parameters
----------
beta : float
Parameter beta.
gamma : float
Parameter gamma.
tau_rise : float
Parameter tau_rise.
tau_fall : float
Parameter tau_fall.
Returns
-------
bool
True if parameters are valid, False otherwise.
"""
if tau_fall > 1.0 / beta:
return False
if gamma > (1.0 - beta * tau_fall) / beta:
return False
if tau_rise * (1.0 + np.exp(gamma / tau_rise)) < tau_fall:
return False
return True
[docs]def get_numpyro_cube(params, max_flux, ref_band, ordered_bands):
"""
Convert output param dict from numpyro sampler to match that
of dynesty.
Parameters
----------
params : dict
Parameter dictionary
max_flux : float
Max flux of light curve
aux_bands : array-like, optional
The names of auxiliary bands, in order. If None or excluded,
attempts to infer them from the dictionary.
Returns
----------
cube : np.ndarray
Array of all equal-weight parameter draws
aux_bands : np.ndarray
Auxiliary bands, including those inferred if input arg was None.
"""
logA, beta, log_gamma = params["logA"], params["beta"], params["log_gamma"]
t0, log_tau_rise, log_tau_fall, log_extra_sigma = (
params["t0"],
params["log_tau_rise"],
params["log_tau_fall"],
params["log_extra_sigma"],
)
A = max_flux * 10**logA
gamma = 10**log_gamma
tau_rise = 10**log_tau_rise
tau_fall = 10**log_tau_fall
extra_sigma = 10**log_extra_sigma # pylint: disable=unused-variable
cube = []
#cube = [A, beta, gamma, t0, tau_rise, tau_fall, extra_sigma]
for b in ordered_bands:
if b == ref_band:
cube.extend(
[
A, beta, gamma, t0,
tau_rise, tau_fall, extra_sigma
]
)
else:
cube.extend(
[
params[f"A_{b}"],
params[f"beta_{b}"],
params[f"gamma_{b}"],
params[f"t0_{b}"],
params[f"tau_rise_{b}"],
params[f"tau_fall_{b}"],
params[f"extra_sigma_{b}"],
]
)
return np.array(cube).T, np.array(ordered_bands)
[docs]def calculate_log_likelihood(cube, lightcurve, unique_bands, ref_band):
"""Calculate the log-likelihood of a single lightcurve.
Copied from dynesty_sampler.py
Parameters
----------
cube : np.ndarray
Array of parameters. Must be length 7 * B where B is
the number of unique bands.
lightcurve : Lightcurve
The lightcurve object to evaluate.
unique_bands : list
A list of bands to use.
ref_band : str
The reference band.
Returns
-------
logL : float
Log-likelihood value.
"""
if ref_band not in unique_bands:
raise ValueError("Reference band not included in unique_bands.")
if 7 * len(unique_bands) != len(cube):
raise ValueError(
f"Size mismatch with curve parameters. Expected {7 * len(unique_bands)}. Found {len(cube)}."
)
if len(lightcurve.times) == 0:
raise ValueError("Empty light curve provided.")
# Generate points from 'cube' for comparison.
max_flux, _ = lightcurve.find_max_flux(band=ref_band)
f_model = flux_model(cube, lightcurve.times, lightcurve.bands, unique_bands, ref_band)
extra_sigma_arr = np.ones(len(lightcurve.times)) * cube[6] * max_flux
for band_idx, ordered_band in enumerate(unique_bands):
if ordered_band == ref_band:
continue
extra_sigma_arr[lightcurve.bands == ordered_band] *= cube[7 * band_idx + 6]
# Compute the loglikelihood based on the differences in flux between the observed
# and generated.
sigma_sq = lightcurve.flux_errors**2 + extra_sigma_arr**2
logL = np.sum(
np.log(1.0 / np.sqrt(2.0 * np.pi * sigma_sq)) - 0.5 * (f_model - lightcurve.fluxes) ** 2 / sigma_sq
)
return logL
[docs]def calculate_mse(cube, lightcurve, unique_bands, ref_band):
"""Calculate the mean-square error for a lightcurve.
Parameters
----------
cube : np.ndarray
Array of parameters. Must be length 7 * B where B is
the number of unique bands.
lightcurve : Lightcurve
The lightcurve object to evaluate.
unique_bands : list
A list of bands to use.
ref_band : str
The reference band.
Returns
-------
mse : float
The mean square error of the predictions
"""
if ref_band not in unique_bands:
raise ValueError("Reference band not included in unique_bands.")
if 7 * len(unique_bands) != len(cube):
raise ValueError(
f"Size mismatch with curve parameters. Expected {7 * len(unique_bands)}. Found {len(cube)}."
)
if len(lightcurve.times) == 0:
raise ValueError("Empty light curve provided.")
# Generate points from 'cube' for comparison.
f_model = flux_model(cube, lightcurve.times, lightcurve.bands, unique_bands, ref_band)
mse_sum = np.sum(np.square(f_model - lightcurve.fluxes))
return mse_sum / len(lightcurve.times)
[docs]def calculate_neg_chi_squareds(cubes, t, f, ferr, b, ordered_bands=None, ref_band="r"):
"""Gets the negative chi-squared of posterior fits from the model
parameters and original data files.
Parameters
----------
names : list of str
The names of the objects.
fit_dir : str
The directory where the fit files are located.
data_dirs : list of str
The directories where the data files are located.
ordered_bands : list of str
Bands in order they appear in cubes. Defaults to ZTF band order.
ref_band : str
Base/reference band. Defaults to 'r'.
Returns
-------
log_likelihoods : np.ndarray
The log likelihoods for each object.
"""
if ordered_bands is None:
ordered_bands = ["r", "g"]
model_f = np.array(
[flux_model(cube, t, b, ordered_bands, ref_band) for cube in cubes]
) # in future, maybe vectorize flux_model
extra_sigma_arr = np.ones((len(cubes), len(t))) * np.max(f[b == ref_band]) * cubes[:, 6][:, np.newaxis]
for i, ordered_band in enumerate(ordered_bands[ordered_bands != ref_band]):
extra_sigma_arr[:, b == ordered_band] *= cubes[:, 7 * i + 13][:, np.newaxis]
sigma_sq = extra_sigma_arr**2 + ferr**2
log_likelihoods = np.sum(
np.log(1.0 / np.sqrt(2.0 * np.pi * sigma_sq)) - 0.5 * (f - model_f) ** 2 / sigma_sq, axis=1
) / len(t)
return log_likelihoods
[docs]def create_dataset(features, labels, idxs=None):
"""Creates a PyTorch dataset object from numpy arrays.
Parameters
----------
features : np.ndarray
The features array.
labels : np.ndarray
The labels array.
idxs : np.ndarray, optional
The indices array. Defaults to None.
Returns
-------
torch.utils.data.TensorDataset
The created dataset.
"""
tensor_x = torch.Tensor(features) # transform to torch tensor
tensor_y = torch.Tensor(labels).type(torch.LongTensor)
if idxs is None:
return TensorDataset(tensor_x, tensor_y) # create your datset
tensor_z = torch.Tensor(idxs)
return TensorDataset(tensor_x, tensor_y, tensor_z) # create your datset
[docs]def calculate_accuracy(y_pred, y):
"""Calculate the prediction accuracy.
Parameters
----------
y_pred : torch.Tensor
The predicted tensor.
y : torch.Tensor
The true tensor.
Returns
-------
torch.Tensor
The calculated accuracy.
"""
top_pred = y_pred.argmax(1, keepdim=True)
correct = top_pred.eq(y.view_as(top_pred)).sum()
acc = correct.float() / y.shape[0]
return acc
[docs]def epoch_time(start_time, end_time):
"""Sets the time it takes for each epoch to train.
Parameters
----------
start_time : float
The start time.
end_time : float
The end time.
Returns
-------
tuple
A tuple containing the elapsed minutes and elapsed seconds.
"""
elapsed_time = end_time - start_time
elapsed_mins = int(elapsed_time / 60)
elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
return elapsed_mins, elapsed_secs
[docs]def save_test_probabilities(
output_filename, pred_probabilities, true_label=None, output_dir=None, save_file=None
):
"""Saves probabilities to a separate file for ROC curve generation.
Parameters
----------
output_filename : str
The file name to save to.
pred_probabilities : array-like
The prediction probabilities.
true_label : str or int
The true label.
output_dir: str
Where to store the generated file.
"""
default_dir = PROBS_FILE2 if true_label is None else PROBS_FILE
default_dir = default_dir if save_file is None else save_file
output_path = default_dir if output_dir is None else os.path.join(output_dir, default_dir)
with open(output_path, "a+", encoding="utf-8") as probs_file:
if true_label is None:
probs_file.write(output_filename)
else:
probs_file.write(f"{output_filename},{str(true_label)}")
for prob in pred_probabilities:
probs_file.write(f",{prob:.04f}")
probs_file.write("\n")
[docs]def adjust_log_dists(features_orig, redshift=False):
"""Takes log of fit parameters with log-Gaussian priors before
feeding into classifier. Also removes apparent amplitude and t0.
Parameters
----------
features_orig : np.ndarray
Array of fit features of all samples.
redshift : boolean, optional
Whether to keep redshift data or not.
Returns
---------
features : np.ndarray
Array of adjusted fit features.
"""
features = np.copy(features_orig)
features[:, 4:7] = np.log10(features[:, 4:7])
features[:, 2] = np.log10(features[:, 2])
if redshift: # keep amplitude
return np.delete(
features,
[
3,
],
1,
)
return np.delete(features, [0, 3], 1)
[docs]def write_metrics_to_file(
config,
true_classes,
pred_classes,
prob_above_07,
log_file=CLASSIFY_LOG_FILE,
):
"""Calculates the accuracy and f1 score metrics for the
test set and outputs them to a log file.
Parameters
----------
config : ModelConfig
The configuration of the model used for evaluation.
true_classes : np.ndarray
The ground truth for the test ZTF objects.
pred_classes : np.ndarray
The predicted classes for the test ZTF objects.
prob_above_07 : np.ndarray
Indicates which predictions had a 70% confidence.
log_file : str
The file where the metrics information will be written.
"""
test_acc = calc_accuracy(pred_classes, true_classes)
test_f1_score = f1_score(pred_classes, true_classes, class_average=True)
with open(log_file, "a+", encoding="utf-8") as the_file:
the_file.write(str(config.goal_per_class) + " samples per class\n")
the_file.write(
str(config.neurons_per_layer)
+ " neurons per each of "
+ str(config.num_hidden_layers)
+ " layers\n"
)
the_file.write(str(config.num_epochs) + " epochs\n")
the_file.write(
"HOW MANY CERTAIN " + str(len(true_classes)) + " " + str(len(true_classes[prob_above_07])) + "\n"
)
the_file.write(f"MLP class-averaged F1-score: {test_f1_score:.04f}\n")
the_file.write(f"Accuracy: {test_acc:.04f}\n")
the_file.write(f"Best Validation Loss: {config.best_val_loss:.04f}\n\n")
[docs]def get_session_metrics(metrics):
"""Calculates the validation loss and accuracy for the hyperparameter set.
The best model is considered the one with the lowest validation loss.
Parameters
----------
metrics : tuple
Tuple containing the training accuracies and losses, and the validation
accuracies and losses, for each epoch and fold.
Returns
-------
tuple
The mean validation loss and accuracy for the hyperparameter set.
"""
_, _, val_accs, val_losses = list(zip(*metrics))
# Find min loss value in all folds
min_val_losses = list(map(min, val_losses))
# Find indices for corresponding min values
min_indices = [val_losses[i].index(min_val_loss) for i, min_val_loss in enumerate(min_val_losses)]
# Get the accuracies for the best validation losses
val_accs = [val_accs[min_indices[i]] for i, val_accs in enumerate(val_accs)]
return np.mean(min_val_losses), np.mean(val_accs)
[docs]def log_metrics_to_tensorboard(metrics, config, trial_id, base_dir="runs"):
"""Calculates the training and validation accuracies and losses
for each epoch (by averaging each fold) and logs these metrics to
Tensorboard using a SummaryWriter. It also stores the run
configuration for further reference.
Parameters
----------
metrics : tuple
Tuple containing the training accuracies and losses,
and the validation accuracies and losses, for each
epoch and fold.
config : ModelConfig
The model's training configuration.
trial_id : str
The experiment identifier.
base_dir : str
The directory where all tensorboard metrics should be stored.
Defaults to "runs".
Returns
-------
tuple
The training losses and accuracies, followed by the validation
losses and accuracies, for each epoch.
"""
train_accs, train_losses, val_accs, val_losses = list(zip(*metrics))
avg_train_losses = np.array(train_losses).mean(axis=0)
avg_val_losses = np.array(val_losses).mean(axis=0)
avg_train_accs = np.array(train_accs).mean(axis=0)
avg_val_accs = np.array(val_accs).mean(axis=0)
run_dir = os.path.join(base_dir, trial_id)
writer = SummaryWriter(run_dir)
for i in range(config.num_epochs):
writer.add_scalar("Loss/train", avg_train_losses[i], i)
writer.add_scalar("Loss/val", avg_val_losses[i], i)
writer.add_scalar("Accuracy/train", avg_train_accs[i], i)
writer.add_scalar("Accuracy/val", avg_val_accs[i], i)
# Store current config to file
config.write_to_file(f"{run_dir}/config.yaml")
return avg_train_losses, avg_train_accs, avg_val_losses, avg_val_accs