"""This module contains classes that can be utilized for visualizing and
analyzing the simulated images and spectra"""
from photutils import aperture as aper
from matplotlib import colors as col
import matplotlib.pyplot as plt
from matplotlib.ticker import AutoMinorLocator
from photutils.detection import DAOStarFinder
from photutils.psf import DAOPhotPSFPhotometry, FittableImageModel
from astropy.io import fits
from astropy.io.fits import CompImageHDU
from astropy.modeling.fitting import LevMarLSQFitter
from astropy.stats import sigma_clipped_stats
from .utils import Xmatch
import numpy as np
[docs]class Analyzer(object):
def __init__(self):
"""
A class to visualize and analyze the simulated image
Parameters
----------
Returns
-------
None.
"""
def __call__(self, df=None, wcs=None, data=None,
photometry=None, detect_sources=False, fwhm=3, sigma=13,
ZP=None):
"""
Performs sim simulation and sim Photometry
Imager.call()
do_photometry : Bool, Default : True
Do Aperture Photometry
"""
self.photometry_type = photometry
if photometry == 'Aper':
self.aper_photometry(data, wcs, df, fwhm, sigma, detect_sources,
ZP)
elif photometry == 'PSF':
self.psf_photometry(data, wcs, df, fwhm, sigma, ZP)
[docs] def aper_photometry(self, data, wcs, df, fwhm, sigma, detect, ZP):
"""
Function to perform Aperture photometry
Parameters
----------
data: np.ndarray,
image to perform photometry on
wcs: astropy.wcs.WCS
WCS object of the image
df: pandas.DataFrame,
Source catalog of source in the image from simulation for
reference
fwhm : float, pixels
During aperture photometry,
fwhm corresponds to FWHM circular aperture for
aperture photometry
During PSF photometry,
fwhm corresponds FWHM kernel to use for PSF photometry
sigma: float,
The numbers of standard deviations above which source has to be
detected
detect: bool,
If true, DARStarFinder is used to detect sources for aperture
photometry
if false, input catalog is used for getting positions
of sources for aperture photometry
ZP : float,
zero point of the telescope.
Returns
-------
phot_table: astropy.table.Table
table containing photometry of the souces
Columns
'x-centeroid'
'y-centeroid'
'sky'
'flux'
'mag_in'
'mag_out'
'mag_err'
'SNR'
"""
# if detect flag is set to True, detect sources in the image
if detect:
# Calculate the mean, median and standard deviation of the data
mean, median, std = sigma_clipped_stats(data, sigma=sigma)
# Create DAOStarFinder object to detect sources
daofind = DAOStarFinder(fwhm=fwhm, threshold=sigma*std)
# Detect sources in the image
sources = daofind(data-median)
# Get the source positions
positions = np.transpose((sources['xcentroid'],
sources['ycentroid']))
else:
# create SkyCoord object from ra and dec values in the dataframe
coords = np.array([df['ra'], df['dec']])
# convert the sky coordinates to pixel coordinates
pix = wcs.world_to_pixel_values(coords.T)
positions = np.array(pix)
# create circular aperture object
self.aps = aper.CircularAperture(positions, r=2*fwhm)
# count number of pixels within the aperture
ap_pix = np.count_nonzero(self.aps.to_mask()[0])
# create circular annulus object
self.bags = aper.CircularAnnulus(positions, r_in=3*fwhm,
r_out=5*fwhm)
# count number of pixels within the annulus
bag_pix = np.count_nonzero(self.bags.to_mask()[0])
# perform aperture photometry on the data
phot_table = aper.aperture_photometry(data, [self.aps, self.bags])
# calculate sky flux
phot_table['sky_flux'] = phot_table['aperture_sum_1']*(ap_pix/bag_pix)
# calculate source flux
phot_table['flux'] = phot_table['aperture_sum_0'].value - \
phot_table['sky_flux'].value
# calculate error on the source flux
phot_table['flux_err'] = np.sqrt(phot_table['flux'].value +
phot_table['sky_flux'].value)
# calculate signal to noise ratio
phot_table['SNR'] = phot_table['flux']/phot_table['flux_err']
if not detect:
phot_table['ra'] = df['ra'].values
phot_table['dec'] = df['dec'].values
phot_table['mag_in'] = df['mag'].values
else:
coords = np.array(wcs.pixel_to_world_values(positions))
phot_table['ra'] = coords[:, 0]
phot_table['dec'] = coords[:, 1]
matched = Xmatch(phot_table, df, 3*fwhm)
mag_in = []
sep = []
for i, j, k in matched:
if j is not None:
mag_in.append(df['mag'].values[j])
sep.append(k)
else:
mag_in.append(np.nan)
sep.append(np.nan)
phot_table['mag_in'] = mag_in
phot_table['sep'] = sep
phot_table['mag_out'] = -2.5*np.log10(phot_table['flux']) + ZP
phot_table['mag_err'] = 1.082/phot_table['SNR']
coords = np.array(wcs.pixel_to_world_values(positions))
phot_table['ra'] = coords[:, 0]
phot_table['dec'] = coords[:, 1]
self.phot_table = phot_table
def psf_photometry(self, data, wcs, df, fwhm, sigma, ZP):
"""
Function to perform PSF photometry
Parameters
----------
data: np.ndarray,
image to perform photometry on
wcs: astropy.wcs.WCS
WCS object of the image
df: pandas.DataFrame,
Source catalog of source in the image from simulation for
reference
fwhm : float, pixels
During aperture photometry,
fwhm corresponds to FWHM circular aperture for
aperture photometry
During PSF photometry,
fwhm corresponds FWHM kernel to use for PSF photometry
sigma: float,
The numbers of standard deviations above which source
has to be detected
detect: bool,
If true, DARStarFinder is used to detect sources
for aperture photometry
if false, input catalog is used for getting positions
of sources for aperture photometry
ZP : float,
zero point of the telescope.
Returns
-------
phot_table: astropy.table.Table
table containing photometry of the souces
Columns
'x-centeroid'
'y-centeroid'
'sky'
'flux'
'mag_in'
'mag_out'
'mag_err'
'SNR'
"""
mean, median, std = sigma_clipped_stats(data, sigma=3)
psf_model = FittableImageModel(self.psf)
self.psf_model = psf_model
photometry = DAOPhotPSFPhotometry(crit_separation=2,
threshold=mean + sigma*std,
fwhm=fwhm,
aperture_radius=3,
psf_model=psf_model,
fitter=LevMarLSQFitter(),
fitshape=(11, 11),
niters=3)
phot_table = photometry(image=data)
positions = np.array([phot_table['x_fit'], phot_table['y_fit']]).T
coords = np.array(wcs.pixel_to_world_values(positions))
phot_table['ra'] = coords[:, 0]
phot_table['dec'] = coords[:, 1]
phot_table['SNR'] = phot_table['flux_fit']/phot_table['flux_unc']
phot_table['mag_out'] = -2.5*np.log10(phot_table['flux_fit'])
phot_table['mag_out'] += ZP
phot_table['mag_err'] = 1.082/phot_table['SNR']
matched = Xmatch(phot_table, df, 3*fwhm)
mag_in = []
sep = []
for i, j, k in matched:
if j is not None:
mag_in.append(df['mag'].values[j])
sep.append(k)
else:
mag_in.append(np.nan)
sep.append(np.nan)
phot_table['mag_in'] = mag_in
phot_table['sep'] = sep
self.phot_table = phot_table
[docs] def show_field(self, figsize=(10, 10), marker='.', cmap='jet'):
"""
Function for creating a scatter plot of sources within the FoV
Parameters
----------
figsize : tuple,
Figure size
Returns
-------
fig, ax
"""
scale = self.n_x/self.n_y
figsize = figsize[0]*scale, figsize[1]
# Cropping Dataframe based on FoV
left = (self.n_x_sim - self.n_x)//2
right = left + self.n_x
df = self.sim_df
x_min_cut = (df['x'] > left)
x_max_cut = (df['x'] < right)
df = df[x_min_cut & x_max_cut]
bottom = (self.n_y_sim - self.n_y)//2
top = bottom + self.n_y
y_min_cut = (df['y'] > bottom)
y_max_cut = (df['y'] < top)
fov_x = (self.n_x*self.pixel_scale)/3600
fov_y = (self.n_y*self.pixel_scale)/3600
fov_x = np.round(fov_x, 4)
fov_y = np.round(fov_y, 4)
df = df[y_min_cut & y_max_cut]
fig, ax = plt.subplots(1, 1, figsize=figsize)
if hasattr(self, 'spec_bins'):
x = df['x']
y = df['y']
c = None
cmap = None
color = None
if 'mag' in df.keys():
c = df['mag']
cmap = 'jet'
color = None
img = ax.scatter(x, y, c=c, color=color, cmap=cmap,
marker=marker)
cb = plt.colorbar(img, fraction=1/scale, pad=0.01)
cb.set_label('mag (ABmag)')
ax.set_xlabel('x (pix)')
ax.set_ylabel('y (pix)')
if hasattr(self, 'L'):
B = self.B
L = self.L
lw = len(self.spec_bins)
PA = self.PA
delta = np.arctan(B/L)
d = np.sqrt(L**2 + B**2)/2
om = PA - delta
x_corr = self.n_x_sim//2 - d*np.cos(om) - lw/2
y_corr = self.n_y_sim//2 - d*np.sin(om) - B*np.cos(PA)
start = B*np.sin(PA) + lw/2 + x_corr
end = start + L*np.cos(PA)
x = np.linspace(start, end, 100)
y = np.tan(PA)*(x - B*np.sin(PA) - lw/2 - x_corr) + y_corr
ax.plot(x, y, color='red')
start = lw/2 + x_corr
end = start + L*np.cos(PA)
x = np.linspace(start, end, 100)
y = np.tan(PA)*(x - lw/2 - x_corr) + B*np.cos(PA) + y_corr
ax.plot(x, y, color='red')
start = y_corr
end = start + B*np.cos(PA)
y = np.linspace(start, end, 100)
x = -np.tan(PA)*(y - B*np.cos(PA) - y_corr) + lw/2 + x_corr
ax.plot(x, y, color='red')
start = y_corr + L*np.sin(PA)
end = start + B*np.cos(PA)
y = np.linspace(start, end, 100)
x = -np.tan(PA)*(y - B*np.cos(PA) - L*np.sin(PA) - y_corr)\
+ L*np.cos(PA) + lw/2 + x_corr
ax.plot(x, y, color='red')
ax.set_xlim(0, self.n_x_sim)
ax.set_ylim(0, self.n_y_sim)
else:
x = df['ra']
y = df['dec']
c = df['mag']
img = ax.scatter(x, y, c=c, marker=marker, cmap=cmap)
cb = plt.colorbar(img)
cb.set_label('mag (ABmag)')
ax.invert_xaxis()
ax.set_xlabel('RA (Degrees)')
ax.set_ylabel('Dec (Degrees)')
ax.invert_xaxis()
ax.set_xlim(self.ra+fov_x/2, self.ra-fov_x/2)
ax.set_ylim(self.dec-fov_y/2, self.dec+fov_y/2)
ax.xaxis.set_minor_locator(AutoMinorLocator())
ax.yaxis.set_minor_locator(AutoMinorLocator())
ax.tick_params(which='both', width=2, direction="in",
top=True, right=True,
bottom=True, left=True)
title1 = f'Requested Center : {self.name} {len(df)} sources'
title2 = f'Fov(RA) : {fov_x} (deg) | Fov(Dec) : {fov_y} (deg)'
ax.set_title(f"""{title1}
{title2}""")
return fig, ax
[docs] def show_image(self, source='Digital', fig=None, ax=None, cmap='jet',
figsize=(15, 10), download=False, show_wcs=True,
overlay_apertures=False):
"""
Function for plotting the simulated field image
Source: str,
Choose from
'Digital' : Final digial image
'Charge' : electrons, Light(Source + sky) +
Dark Current + Noises
'Source' : Source + Sky + Noises
'Sky' : Sky + shot_noise
'DC' : Dark Current + DNFP
'QE' : Quantum efficiency fluctuation
across
detector
'Bias' : Charge offset
'PRNU' : Photon Response Non-Uniformity
'DNFP' : Dark Noise Fixed Pattern
'QN' : Quantization Noise
fig : matplotlib.pyplot.figure
User defined figure
ax : matplotlib.pyplot.axes
User defined axes
cmap : str,
matplotlib.pyplot colormap
figsize : tuple
download : bool
show_wcs : bool
If true adds WCS projection to the image
Returns
-------
Image
fig, ax
"""
if hasattr(self, 'digital'):
if fig is None or ax is None:
fig = plt.figure(figsize=figsize)
if show_wcs:
ax = fig.add_subplot(projection=self.wcs)
else:
ax = fig.add_subplot()
norm = None
if source == 'Digital':
data = self.digital
norm = col.LogNorm()
elif source == 'Charge':
data = self.charge
norm = col.LogNorm()
elif source == 'Source':
data = self.light_array
norm = col.LogNorm()
elif source == 'Sky':
data = self.sky_photons
elif source == 'DC':
data = self.DC_array
elif source == 'Bias':
data = self.bias_array
elif source == 'PRNU':
data = self.PRNU_array
elif source == 'DNFP':
norm = col.LogNorm()
data = self.DNFP_array
elif source == 'QN':
data = self.QN_array
else:
print("Invalid Input")
return None
if data.min() < 0:
print('Negative values in image. Increase Bias')
data += data.min()
img = ax.imshow(data, cmap=cmap, norm=norm)
ax.grid(False)
cb = plt.colorbar(img, ax=ax)
cb.set_label('DN')
ax.set_title(f'{source} \nRequested center : {self.name}')
ax.grid(False)
if overlay_apertures and self.photometry_type == "Aper":
for aperture in self.aps:
if aperture is not None:
aperture.plot(ax=ax, color='red', lw=1.5)
for aperture in self.bags:
if aperture is not None:
aperture.plot(ax=ax, color='yellow', lw=1.5)
if hasattr(self, 'L'):
B = self.B
L = self.L
lw = len(self.spec_bins)
PA = self.PA
delta = np.arctan(B/L)
d = np.sqrt(L**2 + B**2)/2
om = PA - delta
x_corr = self.n_x//2 - d*np.cos(om) - lw/2
y_corr = self.n_y//2 - d*np.sin(om) - B*np.cos(PA)
start = B*np.sin(PA) + lw/2 + x_corr
end = start + L*np.cos(PA)
x = np.linspace(start, end, 100)
y = np.tan(PA)*(x - B*np.sin(PA) - lw/2 - x_corr) + y_corr
ax.plot(x, y, color='red')
start = lw/2 + x_corr
end = start + L*np.cos(PA)
x = np.linspace(start, end, 100)
y = np.tan(PA)*(x - lw/2 - x_corr) + B*np.cos(PA) + y_corr
ax.plot(x, y, color='red')
start = y_corr
end = start + B*np.cos(PA)
y = np.linspace(start, end, 100)
x = -np.tan(PA)*(y - B*np.cos(PA) - y_corr) + lw/2 + x_corr
ax.plot(x, y, color='red')
start = y_corr + L*np.sin(PA)
end = start + B*np.cos(PA)
y = np.linspace(start, end, 100)
x = -np.tan(PA)*(y - B*np.cos(PA) - L*np.sin(PA) - y_corr)\
+ L*np.cos(PA) + lw/2 + x_corr
ax.plot(x, y, color='red')
if download:
fig.savefig(f"{source}.png", format='png')
return fig, ax
else:
print("Run Simulation")
[docs] def show_hist(self, source='Digital', bins=None,
fig=None, ax=None, figsize=(15, 8)):
"""
Function for plotting histogram of various stages of simulation
Parameters
----------
Source: str,
Choose from
'Digital' : Final digial image
'Charge' : electrons, Light(Source + sky)
+ Dark Current + Noises
'Source' : Source + Sky + Noises
'Sky' : Sky + shot_noise
'DC' : Dark Current + DNFP
'QE' : Quantum efficiency fluctuation across
detector
'Bias' : Charge offset
'PRNU' : Photon Response Non-Uniformity
'DNFP' : Dark Noise Fixed Pattern
'QN' : Quantization Noise
bins : numpy.array,
bins for making histogram
fig : matplotlib.pyplot.figure
User defined figure
ax : matplotlib.pyplot.axes
User defined axes
figsize : tuple
"""
if hasattr(self, 'digital'):
if fig is None or ax is None:
fig, ax = plt.subplots(1, 1, figsize=figsize)
if source == 'Digital':
data = self.digital.ravel()
elif source == 'Charge':
data = self.charge.ravel()
elif source == 'Source':
data = self.light_array
elif source == 'Sky':
data = self.sky_photons.ravel()
elif source == 'DC':
data = self.DC_array.ravel()
elif source == 'Bias':
data = self.bias_array.ravel()
elif source == 'PRNU':
data = self.PRNU_array.ravel()
elif source == 'DNFP':
data = self.DNFP_array.ravel()
elif source == 'QN':
data = self.QN_array.ravel()
if bins is None:
bins = np.linspace(data.min(), data.max(), 20)
ax.hist(data, bins=bins)
ax.set_title(f'{source} histogram')
ax.set_ylabel('Count')
ax.set_yscale('log')
return fig, ax
else:
print("Run Simulation")
[docs] def getImage(self, source='Digital'):
"""
Function of retrieving image array at different
stages of simulation.
Parameters
----------
Source: str,
Choose from
'Digital' : Final digial image
'Charge' : electrons, Light(Source + sky)
+ Dark Current + Noises
'Source' : Source + Sky + Noises
'Sky' : Sky + shot_noise
'DC' : Dark Current + DNFP
'QE' : Quantum efficiency fluctuation across
detector
'Bias' : Charge offset
'PRNU' : Photon Response Non-Uniformity
'DNFP' : Dark Noise Fixed Pattern
'QN' : Quantization Noise
"""
if hasattr(self, 'digital'):
if source == 'Digital':
data = self.digital
elif source == 'Charge':
data = self.charge
elif source == 'Sky':
data = self.sky_photoelec
elif source == 'DC':
data = self.DC_array
elif source == 'QE':
data = self.qe_array
elif source == 'Bias':
data = (self.bias_array + self.DC_array)
elif source == 'PRNU':
data = self.PRNU_array
elif source == 'DNFP':
data = self.DNFP_array
elif source == 'QN':
data = self.QN_array
else:
data = 0
return data
else:
print("Run Simulation")
[docs] def writeto(self, name, source='Digital', user_source=None,
with_dark_flat=False):
"""
Function for downloading a fits file of simulated field image
Parameters
----------
name : str
filename, Example : simulation.fits
Source: str,
Choose from
'Digital' : Final digial image
'Charge' : electrons, Light(Source + sky)
+ Dark Current + Noises
'Source' : Source + Sky + Noises
'Sky' : Sky + shot_noise
'DC' : Dark Current + DNFP
'Bias' : Charge offset
'PRNU' : Photon Response Non-Uniformity
'DNFP' : Dark Noise Fixed Pattern
'QN' : Quantization Noise
user_source : numpy.ndarray
2D numpy array user wants to save as FITS
with_dark_flat : bool
True : Output fits will provide dark
and flat frames
"""
if hasattr(self, 'digital'):
c1 = user_source is not None
c2 = isinstance(user_source, np.ndarray)
if c1 and c2:
data = user_source
elif source == 'Digital':
data = self.digital
elif source == 'Charge':
data = self.charge
elif source == 'Source':
data = self.light_array
elif source == 'Sky':
data = self.sky_photoelec
elif source == 'DC':
data = self.DC_array
elif source == 'Bias':
data = self.bias_array
elif source == 'PRNU':
data = self.PRNU_array
elif source == 'DNFP':
data = self.DNFP_array
elif source == 'QN':
data = self.QN_array
else:
print(f"{source} is not a valid source")
hdu = fits.PrimaryHDU(data, header=self.header)
hdu.wcs = self.wcs
hdus = [hdu]
if with_dark_flat:
dark = self.dark_frame/self.exp_time
header = self.header
header['Frame'] = 'Dark'
hdu = fits.ImageHDU(dark, header=header)
hdus.append(hdu)
flat = self.flat_frame
header['Frame'] = 'Flat'
hdu = fits.ImageHDU(flat, header=header)
hdus.append(hdu)
hdul = fits.HDUList(hdus)
hdul.writeto(f'{name}', overwrite=True, checksum=True)
else:
print("Run Simulation")
[docs] def writecomp(self, name):
hdu = CompImageHDU(data=self.digital, header=self.header)
hdus = [fits.PrimaryHDU(), hdu]
hdul = fits.HDUList(hdus)
hdul.writeto(name, overwrite=True, checksum=True)