"""This module contains additional functions for the package"""
from pathlib import Path
from astropy.modeling import models
from astropy.coordinates import angular_separation
from astropy.table import Table
from astropy.cosmology import WMAP9 as cosmo
from astropy.coordinates import SkyCoord, Distance, ICRS
from scipy.spatial import cKDTree
import astropy.units as u
import seaborn as sb
import matplotlib.pyplot as plt
import matplotlib
import numpy as np
import pandas as pd
data_path = Path(__file__).parent.joinpath()
sb.set_style('white')
matplotlib.rcParams['font.size'] = 12
matplotlib.rcParams['figure.figsize'] = (10, 10)
[docs]
def bandpass(wav, flux, inputs, plot=True, fig=None, ax=None):
"""
Function to convolve response functions
Parameters
----------
wav : numpy.ndarray
wavelenth in angstrom
flux: numpy.ndarray
flux normalized to [0,1]
plot: bool,
If true shows plots with input and convolved response functions
fig : matplotlib.pyplot.figure
User defined figure
ax : matplotlib.pyplot.axes
User defined axes
Returns
-------
fig, ax, data, params
data : tuple,
(wavelenth array, flux_array, convolved flux array)
params: tuple,
(effective wavelength, integrated flux, Effective Width)
"""
lambda_ = wav
flux_AB = flux
if plot:
if fig is None or ax is None:
fig, ax = plt.subplots(1, 1, figsize=(12, 8))
ax.plot(lambda_, flux_AB/flux_AB.max(), label=r'$F(\lambda)$',
alpha=0.7)
R_eff = 1
for i in inputs:
file_name = i.split(',')[0]
n = float(i.split(',')[1])
f_max = float(i.split(',')[2])
filt_dat = np.loadtxt(file_name)
wav = filt_dat[:, 0]
flux = filt_dat[:, 1]
if np.amax(flux) > 1:
flux /= f_max
indices = np.where((wav > lambda_[0]) & (wav < lambda_[-1]))
wav_new = wav[indices]
flux_new = flux[indices]
wav_new = np.concatenate([[lambda_[0]], [wav_new[0] - 1], wav_new,
[wav_new[-1] + 1], [lambda_[-1]]])
flux_new = np.concatenate([[0], [0], flux_new, [0], [0]])
flux_out = np.interp(lambda_, wav_new, flux_new)
R_eff *= flux_out**n
if plot:
ax.plot(lambda_, flux_out/flux_out.max(),
label=f"{file_name.split('/')[-1][:-4]}x{n}", alpha=0.7)
# Wavelength units
conv_flux = R_eff*flux_AB
int_flux = np.trapezoid(lambda_*conv_flux, lambda_)/np.trapezoid(lambda_*R_eff,
lambda_)
W_eff = np.trapezoid(R_eff, lambda_)/R_eff.max()
lambda_phot = np.trapezoid(lambda_**2*conv_flux,
lambda_)/np.trapezoid(lambda_*conv_flux, lambda_)
c1 = lambda_ >= lambda_phot-W_eff/2
c2 = lambda_ <= lambda_phot+W_eff/2
R_sq = np.where((c1 & c2), 1, 0)
flux_ratio = np.trapezoid(R_eff, lambda_)/np.trapezoid(R_sq, lambda_)
if plot:
ax.plot(lambda_, R_sq,
label="Square Filter", alpha=0.7)
# Frequency units
R_eff_Jy = R_eff*lambda_**2*3.34e4
flux_AB = flux_AB*lambda_**2*3.34e4
nu = 3e18/lambda_
conv_flux_Jy = R_eff_Jy*flux_AB
int_flux_Jy = np.trapezoid(nu*conv_flux_Jy, nu)/np.trapezoid(nu*R_eff_Jy, nu)
# Comparing to a square filter with same width
data = lambda_, conv_flux, R_eff
params = lambda_phot, int_flux, int_flux_Jy, W_eff, flux_ratio
if plot:
ax.plot(lambda_, conv_flux/conv_flux.max(), label='Convloved Flux',
linewidth=5)
y = np.linspace(0, 1)
x = y*0 + lambda_phot
label = r'$\lambda_{phot} = $' + f'{round(lambda_phot,3)}' + r' $\AA$'
ax.plot(x, y, '--', color='black', label=label)
ax.set_xlabel(r'$\AA$')
ax.set_ylabel(r'Normalized Flux')
fig.suptitle('Bandpass', fontsize=20, y=0.95)
ax.legend()
return fig, ax, data, params
[docs]
def generate_psf(npix, params, function='Gaussian'):
"""
Function for generating user defined PSF
npix : int,
number of pixels along one axis for pixel array
sigma: float,
standard deviation of the PSF in pixels
function: str,
type of PSF function
Returns
-------
numpy.ndarray
"""
x = np.linspace(0, npix - 1, npix)
y = x
yy, xx = np.meshgrid(x, y)
if function == 'Gaussian':
sigma_x = params['sigma_x']
sigma_y = params['sigma_y']
psf = models.Gaussian2D(1, npix//2, npix//2,
sigma_x, sigma_y)(xx, yy)
psf /= psf.sum()
elif function == 'Moffat':
gamma = params['gamma']
alpha = params['alpha']
psf = models.Moffat2D(1, npix//2, npix//2,
gamma, alpha)(xx, yy)
psf /= psf.sum()
return psf
[docs]
def Xmatch(df1, df2, r=1):
"""Function for crossmatching two
catalogs using RAs and Decs"""
if isinstance(df1, Table):
df1 = df1.to_pandas()
if isinstance(df2, Table):
df2 = df2.to_pandas()
# define the catalogs
matched = []
for i, row in df1.iterrows():
dist = angular_separation(row['ra'], row['dec'],
df2['ra'].values, df2['dec'].values)
dist = dist*u.radian
dist = dist.to(u.arcsec).value
if dist.min() <= r:
index = np.where(dist == dist.min())[0][0]
matched.append([i, index, dist.min()])
else:
matched.append([i, None, None])
return matched
[docs]
def redshift_corr(df):
"""
Function for redshift correction of input data
Parameters
----------
df: astropy.table.Table or pandas.DataFrame
table with columns 'wav', 'flux', 'z1' and 'z2'
wav: numpy.ndarray
wavelength in Angstrom
flux: numpy.ndarray
flux in ergs/s/cm2/A
z1: float,
Original redshift to star
z2: float,
New redshift to star
Returns
-------
astropy.table.Table or pandas.DataFrame
"""
if 'z1' in df.keys() and 'z2' in df.keys():
red_corr = df['z2'].value.reshape(-1, 1)-df['z1'].value.reshape(-1, 1)
df['wav'] = df['wav'].value*(1 + red_corr)
d1 = cosmo.luminosity_distance(df['z1']).value
d2 = cosmo.luminosity_distance(df['z2']).value
flux_corr = (d1/d2)**2
elif 'd1' in df.keys() and 'd2' in df.keys():
flux_corr = (df['d1'].value/df['d2'].value)**2
df['flux'] = df['flux'].value*flux_corr.reshape(-1, 1)
return df
[docs]
def spectra_to_mags_df(df, inputs):
"""
Function to convert spectra to magnitude using telescope response functions
Using Astropy Table
Parameters
----------
df: astropy.table.Table
table with columns 'wav', 'flux', 'd1' and 'd2' or 'z1' or 'z2'
wav: numpy.ndarray
wavelength in Angstrom
flux: numpy.ndarray
flux in ergs/s/cm2/A
d1: float,
Original distance to star.
d2: float
New distance to star
z1: float,
Original redshift to star
z2: float,
New redshift to star
inputs: list,
list of path to response functions
Returns
-------
astropy.table.Table
"""
mags = []
c1 = ('z1' in df.keys() and 'z2' in df.keys())
c2 = ('d1' in df.keys() and 'd2' in df.keys())
if c1 or c2:
df = redshift_corr(df)
else:
KeyError("z1 and z2 or d1 and d2")
fluxes = df['flux'].value
wavs = df['wav'].value
for wav, flux in zip(wavs, fluxes):
out = bandpass(wav, flux, inputs=inputs,
plot=False)
params = out[3]
int_flux_Jy = params[2]
ABmag = -2.5*np.log10(int_flux_Jy/3631)
if ABmag == np.nan:
ABmag = 100
mags.append(ABmag)
df['mag'] = mags
return df
[docs]
def spectra_to_mags(wav, flux, inputs, z1=None, z2=None, d1=None, d2=None):
"""
Function to convert spectra to magnitude using telescope response functions
Using Astropy Table
Parameters
----------
wav: list of numpy.ndarray
wavelength in Angstrom
flux: list of numpy.ndarray
flux in ergs/s/cm2/A
inputs: list,
list of path to response functions
d1: float,
Original distance to star.
d2: float
New distance to star
z1: float,
Original redshift to star
z2: float,
New redshift to star
Returns
-------
ABmag: numpy.ndarray
magnitude of stars from spectra
"""
c1 = z1 is not None and z2 is not None
c2 = d1 is not None and d2 is not None
if c1:
wav = wav*(1 + z1 - z2)
d1 = cosmo.luminosity_distance(z1).value
d2 = cosmo.luminosity_distance(z2).value
flux_corr = (d1/d2)**2
elif c2:
flux_corr = (d1/d2)**2
flux *= flux_corr
out = bandpass(wav, flux, inputs=inputs,
plot=False)
params = out[3]
int_flux_Jy = params[2]
ABmag = -2.5*np.log10(int_flux_Jy/3631)
if ABmag == np.nan:
ABmag = 100
return ABmag
[docs]
def calc_mos_size(L, B, PA, lw):
"""
Function to calculate minimum size of a Multi-object Spectrometer (MOS)
detector based on it's orientation and shape
Parameters
----------
L: float,
Length of MOS Field of View (FoV).
B: float,
Breadth of MOS FoV.
PA: float,
Orientation of MOS FoV with respect to sky plane. (degrees)
lw: int,
Number of pixels in dispersion direction.
Returns
-------
x_size, y_size: tuple
(int,int)
"""
PA *= np.pi/180
x_size = lw + B*np.sin(PA) + L*np.cos(PA)
y_size = B*np.cos(PA) + L*np.sin(PA)
x_size = int(np.round(x_size, 0))
y_size = int(np.round(y_size, 0))
return x_size, y_size
[docs]
def count_sources_within_radius(catalog, radius):
"""
Function for counting the number of source around each stat
within a given radius using KDTree
Parameters
----------
catalog: (np.ndarray, np.ndarray),
source coordinates
radius: float,
radius of circle to count sources within
Returns
-------
counts: list,
number density of sources around each star
"""
tree = cKDTree(catalog)
counts = []
for star in catalog:
indices = tree.query_ball_point(star, radius)
count = len(indices) - 1 # Exclude the star itself from the count
counts.append(count)
return counts
[docs]
def select_mos(df, cen_x, cen_y, radius=10, min_sep=10, ny=1000):
"""
Function to select sources from a given database such that overlap of
spectra is minimum
Parameters
----------
df1: astropy.table.Table,
table with columns 'ra' and 'dec'
ra: float,
Right Ascension in degrees
dec: float,
Declnation in degrees
cen_x, float,
Reference x position in pixel coordinates
cen_y: float,
Reference x position in pixel coordinates
radius: float,
Radius of circle to count stars within.
min_sep: float,
minimum separation between stars in the spatial axis,
ny: int,
length of spatial axis in pixels
Returns
-------
mos_df: astropy.table.Table,
table containing list of selected sources
res_df: astropy.table.Table,
table containing list of remaining sources
"""
catalog = df[['x', 'y']].values
result = count_sources_within_radius(catalog, radius)
df['n_density'] = result
top = np.flip(np.arange(cen_y, 0, -min_sep))
bottom = np.arange(cen_y + min_sep, ny, min_sep)
y_pos = np.concatenate([top, bottom])
mos_df = []
df['x_sep'] = abs(df['x'] - cen_x)
for i in y_pos:
p = df[(df['y'] > i-2) & (df['y'] < i+2)]
p = p[p['n_density'] == 0]
if len(p) > 0:
p = p[p['x_sep'] == p['x_sep'].min()]
mos_df.append(p.values[0])
mos_df = pd.DataFrame(mos_df, columns=df.keys())
res_df = mos_df[0:0]
for id in df['objid']:
if id not in mos_df['objid'].values:
t = df[df['objid'] == id]
res_df = pd.concat([res_df, t])
return mos_df, res_df, cen_x, cen_y