Source code for src.superphot_plus.import_utils

"""This script provides functions for importing and manipulating ZTF 
data from the Alerce API."""

import csv
import os

import numpy as np

from superphot_plus.surveys.surveys import Survey
from superphot_plus.utils import convert_mags_to_flux


[docs]def import_lc(filename, survey=Survey.ZTF(), clip_lightcurve=True): """Imports a single file, but only the points from a single survey. Parameters ---------- filename : str Path to the input CSV file. survey : Survey, optional Assumes light curve data was taken by this survey. Defaults to ZTF. Returns ------- tuple Tuple containing the imported light curve data. """ ra = None dec = None if not os.path.exists(filename): # pragma: no cover return [None] * 6 with open(filename, "r", encoding="utf-8") as csv_f: csvreader = csv.reader(csv_f, delimiter=",") row_intro = next(csvreader) ra_idx = row_intro.index("ra") dec_idx = row_intro.index("dec") b_idx = row_intro.index("fid") f_idx = row_intro.index("magpsf") ferr_idx = row_intro.index("sigmapsf") flux = [] flux_err = [] mjd = [] bands = [] for row in csvreader: if ra is None: ra = float(row[ra_idx]) dec = float(row[dec_idx]) try: ext_dict = survey.get_extinctions(ra, dec) except: # pragma: no cover return [None] * 6 if int(row[b_idx]) == 2: flux.append(float(row[f_idx]) - ext_dict["r"]) bands.append("r") elif int(row[b_idx]) == 1: flux.append(float(row[f_idx]) - ext_dict["g"]) bands.append("g") else: # pragma: no cover continue mjd.append(float(row[1])) flux_err.append(float(row[ferr_idx])) sort_idx = np.argsort(np.array(mjd)) t = np.array(mjd)[sort_idx].astype(float) m = np.array(flux)[sort_idx].astype(float) merr = np.array(flux_err)[sort_idx].astype(float) b = np.array(bands)[sort_idx] t = t[merr != np.nan] m = m[merr != np.nan] b = b[merr != np.nan] merr = merr[merr != np.nan] f, ferr = convert_mags_to_flux(m, merr, 26.3) if clip_lightcurve: t, f, ferr, b = clip_lightcurve_end(t, f, ferr, b) snr = np.abs(f / ferr) for band in survey.wavelengths: if len(snr[(snr > 3.0) & (b == band)]) < 5: # pragma: no cover return [None] * 6 if (np.max(f[b == band]) - np.min(f[b == band])) < 3.0 * np.mean(ferr[b == band]): # pragma: no cover print("SKIPPED BECAUSE AMP") return [None] * 6 return t, f, ferr, b, ra, dec
[docs]def clip_lightcurve_end(times, fluxes, fluxerrs, bands): """Clips end of lightcurve with approximately 0 slope. Checks from back to max of lightcurve. Parameters ---------- times : np.ndarray Time values of the light curve. fluxes : np.ndarray Flux values of the light curve. fluxerrs : np.ndarray Flux error values of the light curve. bands : np.ndarray Band information of the light curve. Returns ------- tuple Tuple containing the clipped light curve data. """ t_clip, flux_clip, ferr_clip, b_clip = [], [], [], [] for b in np.unique(bands): idx_b = bands == b t_b, f_b, ferr_b = times[idx_b], fluxes[idx_b], fluxerrs[idx_b] end_i = len(t_b) - np.argmax(f_b) num_to_cut = 0 if np.argmax(f_b) == len(f_b) - 1: t_clip.extend(t_b) flux_clip.extend(f_b) ferr_clip.extend(ferr_b) b_clip.extend([b] * len(f_b)) continue m_cutoff = 0.2 * np.abs((f_b[-1] - np.amax(f_b)) / (t_b[-1] - t_b[np.argmax(f_b)])) for i in range(2, end_i): cut_idx = -1 * i m = (f_b[cut_idx] - f_b[-1]) / (t_b[cut_idx] - t_b[-1]) if np.abs(m) < m_cutoff: num_to_cut = i if num_to_cut > 0: t_clip.extend(t_b[:-num_to_cut]) flux_clip.extend(f_b[:-num_to_cut]) ferr_clip.extend(ferr_b[:-num_to_cut]) b_clip.extend([b] * len(f_b[:-num_to_cut])) else: t_clip.extend(t_b) flux_clip.extend(f_b) ferr_clip.extend(ferr_b) b_clip.extend([b] * len(f_b)) return np.array(t_clip), np.array(flux_clip), np.array(ferr_clip), np.array(b_clip)
[docs]def add_to_new_csv(name, label, redshift, output_csv): """Add row to CSV of included files for training. Parameters ---------- name : str Name in the new row. label : str Label in the new row. redshift : float Redshift value in the new row. output_csv : str The output CSV file path. """ with open(output_csv, "a", encoding="utf-8") as csv_file: writer = csv.writer(csv_file, delimiter=",") writer.writerow([name, label, redshift])