Source code for superraenn

"""Run SuperRAENN on superphot+ light curves."""
import pandas as pd
import os
import numpy as np
from astropy.table import Table
from astropy.cosmology import Planck13 as cosmo

from superphot_plus.supernova_class import SupernovaClass as SnClass
from superphot_plus.utils import convert_mags_to_flux
from superphot_plus.lightcurve import Lightcurve
from superphot_plus.posterior_samples import PosteriorSamples
from superphot_plus.file_utils import get_posterior_filename
from tensorflow.keras.optimizers.legacy import Adam

from superraenn.preprocess import save_lcs
from superraenn.lc import LightCurve
from superraenn.raenn import *
from superraenn.feature_extraction import *
from tensorflow.keras.models import load_model
from tensorflow.keras.layers import Input, RepeatVector, concatenate
import tensorflow as tf

[docs] NEURON_N_DEFAULT = 100
[docs] ENCODING_N_DEFAULT = 10
[docs] N_EPOCH_DEFAULT = 2000
[docs] def prep_lcs_superraenn( dataset_csv, probs_csv, data_dir, save_dir, metafile ): """Run equivalent of superraenn-prep on processed light curves.""" print("STARTS") os.makedirs(save_dir, exist_ok=True) full_df = pd.read_csv(dataset_csv) all_names = full_df.NAME.to_numpy() labels = full_df.CLASS.to_numpy() redshifts = full_df.Z.to_numpy() final_names = pd.read_csv(probs_csv).Name.to_numpy() my_lcs = [] with open(metafile, 'w+') as f: f.write('# SN Redshift Type T_explosion MW(EBV)\n') for i, name in enumerate(all_names): if i % 500 == 0: print(i) if name not in final_names: continue l_canon = SnClass.canonicalize(labels[i]) l_oneword = l_canon.replace(" ", "") lc = Lightcurve.from_file( os.path.join( data_dir, name + ".npz" ) ) my_lc = LightCurve( name, lc.times[lc.bands != 'i'], lc.fluxes[lc.bands != 'i'], lc.flux_errors[lc.bands != 'i'], lc.bands[lc.bands != 'i'] ) filt_dict = {'g': 0, 'r': 1} my_lc.add_LC_info( zpt=26.3, redshift=redshifts[i], lim_mag=20.6, obj_type=l_canon ) my_lc.get_abs_mags() my_lc.sort_lc() my_lc.correct_time_dilation() my_lc.filter_names_to_numbers(filt_dict) my_lc.cut_lc() try: my_lc.make_dense_LC(2) my_lcs.append(my_lc) with open(metafile, 'a') as f: f.write(f'{name} {redshifts[i]} {l_oneword} 0.0 0.0\n') except: print("skipped") continue save_lcs(my_lcs, save_dir)
[docs] def run_superraenn_raenn(lcfile, outdir, load_file=None): sequence, outseq, ids, maxlen, nfilts = prep_input(lcfile, save=True, outdir=outdir) model, callbacks_list, input_1, encoded = make_model( NEURON_N_DEFAULT, ENCODING_N_DEFAULT, int(maxlen), 2 ) if load_file is not None: model = tf.keras.models.load_model( load_file, custom_objects={'customLoss': customLoss}, compile=False ) new_optimizer = Adam(learning_rate=1e-4, beta_1=0.9, beta_2=0.999, decay=0) model.compile(optimizer=new_optimizer, loss=customLoss) model = fit_model(model, callbacks_list, sequence, outseq, N_EPOCH_DEFAULT) encoder = get_encoder(model, input_1, encoded) # These comments used in testing, and sould be removed... # lms = outseq[:, 0, 1] # test_model(sequence_test, model, lm, maxlen, plot=True) # decoder = get_decoder(model, args.encodingN) # get_decodings(decoder, encoder, sequence, lms, args.encodingN, \ # maxlen, plot=False) if outdir[-1] != '/': outdir += '/' save_model(model, ENCODING_N_DEFAULT, NEURON_N_DEFAULT, outdir=outdir) save_encodings(model, encoder, sequence, ids, lcfile, ENCODING_N_DEFAULT, NEURON_N_DEFAULT, len(ids), maxlen, outdir=outdir)
[docs] def encode_raenn_features( lc_file, model_base, prep_file, save_dir, raenn_only=False ): """Encode RAENN features in PosteriorSamples objects. """ input_lcs = np.load(lc_file, allow_pickle=True)['lcs'] ids = [] feat_names = [] for input_lc in input_lcs: ids.append(input_lc.name) feat = feat_from_raenn( lc_file, model_base=model_base, prep_file=prep_file, plot=False ) features = feat if raenn_only: for i, input_lc in enumerate(input_lcs): ps = PosteriorSamples( features[i], name=input_lc.name, sampling_method='superraenn', redshift=input_lc.redshift, sn_class=input_lc.obj_type ) ps.save_to_file(save_dir) else: nfilts = 2 feats_all = [] for i, input_lc in enumerate(input_lcs): if i % 100 == 0: print(i) save_path = get_posterior_filename( input_lc.name, fits_dir=save_dir, sampler='superraenn' ) #if os.path.exists(save_path): # continue gp = input_lc.gp gp_mags = input_lc.gp_mags feats = [] for j in np.arange(nfilts): new_times = np.linspace(-100, 100, 500) x_stacked = np.asarray([new_times, [j] * 500]).T pred, var = gp.predict(gp_mags, x_stacked) feats.append( np.nanmin(input_lc.dense_lc[:, j, 0], axis=0) ) max_ind = np.nanargmin(pred) max_mag = pred[max_ind] max_t = new_times[max_ind] for n_mag in [1,2,3]: trise = np.where( (new_times < max_t) & (pred > (max_mag + n_mag)) ) tfall = np.where( (new_times > max_t) & (pred > (max_mag + n_mag)) ) if len(trise[0]) == 0: trise = max_t - np.min(new_times) else: trise = max_t - new_times[trise][-1] if len(tfall[0]) == 0: tfall = np.max(new_times) - max_t else: tfall = new_times[tfall][0] - max_t feats.append(trise) feats.append(tfall) new_times_sub = new_times - max_t lc_grad = np.gradient(pred, new_times_sub) gindmean = np.where( ( new_times_sub > 10 ) & ( new_times_sub < 30 ) ) feats.append( np.nanmedian(lc_grad[gindmean]) ) feats.append(np.trapz(pred)) features_i = np.append(features[i], feats) ps = PosteriorSamples( features_i, name=input_lc.name, sampling_method='superraenn', redshift=input_lc.redshift, sn_class=input_lc.obj_type ) ps.save_to_file(save_dir)
[docs] def retrieve_decodings( model_path, lc_file, prep_fn, sn_name=None ): model = load_model( model_path, custom_objects={"customLoss": customLoss}, compile=False ) input_1 = Input((None, 5)) mid_layer = model.layers[1](input_1) encoded = model.layers[2](mid_layer) sequence, outsequence, ids, maxlen, nfilts = prep_input( lc_file, save=False, ) ids = np.array(ids) if sn_name is not None: seq = sequence[ids == sn_name] outseq = outsequence[ids == sn_name] outseq_single = outsequence[ids == sn_name] u_time = np.unique(outseq_single[0,:,:]) outseq = np.zeros((1, 200, 2)) outseq[0, :, 0] = np.linspace( np.min(u_time[:-1]), np.max(u_time[:-1]), num=200 ) outseq = outseq_single outseq[0, :, 1] = outseq_single[0,0,1] else: seq = sequence outseq = outsequence encoder = get_encoder(model, input_1, encoded) decoder = get_decoder(model, ENCODING_N_DEFAULT) encodings = encoder(seq) repeater = RepeatVector(outseq.shape[1])(encodings) merged = concatenate([repeater, outseq], axis=-1) decodings = decoder(merged) #decodings = model([seq, outseq]) prep = np.load(prep_fn) bandmin = prep['bandmin'] bandmax = prep['bandmax'] # rescale back to original ABSOLUTE mags decodings = decodings * ( bandmin - bandmax) - bandmin if sn_name is None: return decodings # convert back to apparent magnitues lcs = np.load(lc_file, allow_pickle=True)['lcs'] for lc in lcs: if lc.name == sn_name: redshift = lc.redshift k_correction = 2.5 * np.log10(1.+redshift) dist = cosmo.luminosity_distance([redshift]).value[0] # returns dist in Mpc fluxes = 10**((decodings - 26.3 - k_correction + 5. * np.log10(dist*1e5))/(-2.5)) return outseq[0,:,0], fluxes[0].numpy()