import numpy as np
import jax.numpy as jnp
import legwork as lw
import pandas as pd
from blip.src.instrNoise import instrNoise
from blip.src.geometry import geometry
from blip.src.sph_geometry import sph_geometry
from scipy.interpolate import interp1d as intrp
import matplotlib.pyplot as plt
import healpy as hp
from astropy import units as u
from astropy import coordinates as cc
from astropy.coordinates import SkyCoord
from scipy.signal import medfilt
[docs]
class Population():
'''
Class for handling binary populations. Includes methods for loading, parsing, calculating physical quantities,
and preparing data for use in some of the makeLISAdata.py signal simulation methods.
'''
def __init__(self, params, inj, frange, popdict, seed=None, map_only=False):
'''
Produces a population object with an attached skymap and spectrum.
Note that we don't carry around the entire set of binaries, just the overall population-level data.
This is a fast way to accurately approximate the strain PSD of the unresolved DWD population in frequency-domain,
but does result in some smoothing of the spectrum, so sharp features may be reduced in the simulation process.
Also note that each bin is phase-averaged, which will underestimate the variance in a small fraction of bins.
NB -- should implement a full time-domain approach when time allows, as it will be more precise. This is a good (and efficient) approximation, though.
Arguments
-------------------
params (dict) : params dict
inj (dict) : injection dict
frange (array of floats) : injection splice fft frequencies
popdict (str) : the populations dict corresponding to the desired population (allows for multiple populations)
map_only (bool) : Whether to only compute the skymap. Used for models that rely on a fixed population skymap.
'''
self.params = params
self.inj = inj
self.frange = frange
self.popdict = popdict
## load the population
if self.popdict['coldict'] is None:
pop = self.load_population(self.popdict['popfile'],self.params['fmin'],self.params['fmax'],
names=self.popdict['columns'],sep=self.popdict['delimiter'],
seed=seed)
else:
pop = self.load_population(self.popdict['popfile'],self.params['fmin'],self.params['fmax'],
names=self.popdict['columns'],sep=self.popdict['delimiter'],
coldict=self.popdict['coldict'],seed=seed)
## get the skymap
df = self.frange[1] - self.frange[0]
self.skymap = self.pop2map(pop,self.params['nside'],df*u.Hz,self.params['fmin'],self.params['fmax'])
## also compute the spherical harmonic transform if the injection is using the spherical harmonic basis
if self.inj['inj_basis']=='sph':
self.sph_skymap = skymap_pix2sph(self.skymap,self.inj['inj_lmax'])
## spectrum
if not map_only:
## PSD at injection frequency binning
self.PSD = self.pop2spec(pop,self.frange,self.params['seglen']*u.s,
SNR_cut=self.popdict['snr_cut'],return_median=False,plot=True,saveto=params['out_dir'])
## PSD at data frequencies
self.fftfreqs = np.fft.rfftfreq(int(self.params['fs']*self.params['seglen']),1/self.params['fs'])[1:]
self.PSD_true = self.pop2spec(pop,self.fftfreqs,self.params['seglen']*u.s,return_median=False,plot=False)[np.logical_and(self.fftfreqs >= self.params['fmin'] , self.fftfreqs <= self.params['fmax'])]
self.frange_true = self.fftfreqs[np.logical_and(self.fftfreqs >= self.params['fmin'] , self.fftfreqs <= self.params['fmax'])]
self.Sgw = self.PSD
## reweight to match what we are injecting at data frequencies
self.Sgw_true = self.PSD_true #* (self.params['seglen']/self.params['tsplice'])
[docs]
def rebin_PSD(self,fs_new):
'''
Function to correctly interpolate the population spectrum to new frequencies without violating conservation of energy
'''
delta_f_old = self.delta_f
delta_f_new = fs_new[1] - fs_new[0]
return (delta_f_new/delta_f_old)*self.PSD_interp(fs_new)
[docs]
def Sgw_wrapper(self,frange,spoof_arg=None):
'''
This is a wrapper function to allow the population spectrum to play well with some of the generic Injection-handling code.
Evaluated at the injection frequencies.
'''
if hasattr(frange,"__len__"):
return self.Sgw
else:
return self.Sgw[np.argmin(np.abs(self.frange - 1e-3))]
[docs]
def Sgw_wrapper_true(self,frange,spoof_arg=None):
'''
This is a wrapper function to allow the population spectrum to play well with some of the generic Injection-handling code.
Evaluated at the data frequencies.
'''
if hasattr(frange,"__len__"):
return self.Sgw_true
else:
return self.Sgw_true[np.argmin(np.abs(self.frange_true - 1e-3))]
[docs]
def omegaf_wrapper(self,fs,spoof_arg=None):
'''
This is a wrapper function to allow the pupulation spectrum to play well with some of the generic Injection-handling code.
'''
H0 = 2.2*10**(-18)
omegaf = self.Sgw_wrapper(fs)/((3/(4*(fs)**3))*(H0/np.pi)**2)
return omegaf
[docs]
@staticmethod
def load_population(popfile,fmin,fmax,coldict={'f':'f','h':'h','lat':'lat','long':'long'},unitdict={'f':u.Hz,'lat':u.rad,'long':u.rad},
sep=' ',seed=None,**read_csv_kwargs):
# Would also be good to have an option for giving binary parameters and computing the strain here?
'''
Function to load a population file and store relevant data. For now this assumes columns with labels ['f','h',lat','long'].
Assumes LEGWORK definition of binary strain (see https://arxiv.org/abs/2111.08717)
Arguments:
popfile (str) : '/path/to/binary/population/data/file.csv'
fmin (float) : Minimum analysis frequency
fmax (float) : Maximum analysis frequency
coldict (dict) : Dictionary explaining which columns correspond to which quantities (allows for users to specify different column names)
unitdict (dict) : Dictionary specifying units of each column.
sep (str) : File delimiter. Overwritten if sep or delimiter is specified in **read_csv_kwargs. Default is ' '.
**read_csv_kwargs : Optional keyword arguments to be passed to pd.read_csv()
Returns:
fs (array) : Binary frequencies
hs (array) : Binary strain amplitudes
lats (array) : Binary latitudes in degrees
longs (array) : Binary longitudes in degrees
'''
## handle conflicting kwargs
if 'sep' in read_csv_kwargs.keys():
sep = read_csv_kwargs['sep']
del read_csv_kwargs['sep']
elif 'delimiter' in read_csv_kwargs.keys():
sep = read_csv_kwargs['delimiter']
del read_csv_kwargs['delimiter']
## load
dwds = pd.read_csv(popfile,sep=sep,**read_csv_kwargs)
## unit conversions and assignments as needed
fs = (dwds[coldict['f']].to_numpy()*unitdict['f']).to(u.Hz).value
hs = dwds[coldict['h']].to_numpy()
lats = (dwds[coldict['lat']].to_numpy()*unitdict['lat']).to(u.deg).value
longs = (dwds[coldict['long']].to_numpy()*unitdict['long']).to(u.deg).value
## inclination handling
if 'inc' in coldict.keys():
cos_incs = np.cos(dwds[coldict['inc']].to_numpy()) ## assumed radians
elif 'cosi' in coldict.keys():
cos_incs = dwds[coldict['cosi']].to_numpy()
else:
print("No inclinations provided. Randomly drawing from cosi ~ U(-1,1)")
if seed is None:
print("Warning: No random seed was provided to the Population object, inclinations will not be reproducible.")
rng = np.random.default_rng(seed)
cos_incs = 2*rng.random(size=len(fs)) - 1
## filter to frequency band
# f_filter = (fs >= fmin) & (fs <= fmax)
## generate pop dict
pop = {'fs':fs,'hs':hs,'lats':lats,'longs':longs,'cos_incs':cos_incs}
return pop
[docs]
@staticmethod
def get_binary_psd(hs,cos_incs,df):
'''
Function to calculate PSD of catalogue binaries. Assumed monochromatic.
We assume a definition of amplitude such that A = h0, so
h(t) = h+(t) + hx(t),
h+(t) = (1+cos^2(i)) * A * cos(2*omega*t + phi0),
and
hx(t) = 2 * cosi * A * sin(2omega*t + phi0).
The strain power is then
<h(t)^2> = (1+cos^2(i))^2 * A^2 + 4 * cos^2(i) * A^2
which for optimal inclination (i=0, face-on) yields
<h(t)^2> = 8A^2.
The PSD contribution from the monochromatic binary at
frequency resolution df = 1/Tobs is then
PSD = (1/df) * < h(t)^2 >
which in the optimal inclination case is
PSD = 8 * (1/df) * A^2.
Note that by combining the + and x contributions prior to convolution with the LISA
response functions, we implicitly assume that the overall population produces an
unpolarized stochastic signal. This is statistically true for any stochastic signal produced
by a population of binaries with uniformly distributed inclinations, but the assumption may
break down in some cases.
Also note that there is an alternate definition for the amplitude A,
such that A = 2h0. If used here, this will result in an erroneous factor of 4 in the PSD.
Arguments:
hs (1D array of floats) : Binary strains.
cos_incs (1D array of floats) : cosine of binary inclinations
df (astropy Quantity, frequency units) : Binning frequency resolution.
Returns:
binary_psds (1D array of floats): Monochromatic PSDs for each binary
'''
h2s = (1+cos_incs**2)**2 * hs**2 + 4 * cos_incs**2 * hs**2
binary_psds = h2s/df
return binary_psds
[docs]
@classmethod
def get_snr(cls,fs,hs,cos_incs,t_obs,noise_PSD='default'):
## need to update this to take either legwork or local noise PSD
'''
Function to get SNRs of catalogue binaries, given their frequencies/strains, observation time, and a detector PSD.
Assumes monochromatic systems.
Arguments:
fs (1D array of floats) : Binary frequencies. Assumed monochromatic.
hs (1D array of floats) : Binary strains.
t_obs (astropy Quantity, time units) : Observation time.
PSD (varies) : If 'default', uses the Legwork LISA PSD without confusion noise.
If an array, used as the detector PSD at each frequency value in fs
Returns:
SNRs (array of floats) : Binary SNRs.
'''
## assuming monochromatic systems, get SNRs and filter any DWDs with SNR>7. Again per Thiele+22:
if noise_PSD=='default':
noise_PSD = lw.psd.lisa_psd(fs,t_obs=t_obs,confusion_noise='robson19')
elif noise_PSD=='no_fg':
noise_PSD = lw.psd.lisa_psd(fs,t_obs=t_obs,confusion_noise=None)
## we want the SNRs for the resolved binaries at the full frequency resolution
SNRs = cls.get_binary_psd(hs,cos_incs,1/t_obs)/(4*noise_PSD)
return SNRs
[docs]
@staticmethod
def filter_by_snr(data,SNRs,SNR_cut=7,get_type='unresolved'):
'''
Function to filter DWD data by SNR. Can return either unresolved (SNR < SNR_cut) or resolved (SNR > SNR_cut) binaries.
Arguments:
data (1D array of floats) : Binary population data of your choice (or list thereof), corresponding to the given SNRs
SNRs (1D array of floats) : SNR value for each system corresponding to data
SNR_cut (float) : Value of SNR that delineates resolved and unresolved binaries. Default is SNR = 7.
get_type (str) : Whether to return the resolved or unresolved binaries. Default is unresolved.
Returns:
data_filt : Filtered arrays of frequencies and strains.
'''
if type(data) is not list:
data = [data]
if get_type=='unresolved':
data_filt = [data_i[SNRs<SNR_cut] for data_i in data]
if len(data_filt) == 1:
data_filt = data_filt[0]
return data_filt
elif get_type=='resolved':
data_filt = [data_i[SNRs>=SNR_cut] for data_i in data]
if len(data_filt) == 1:
data_filt = data_filt[0]
return data_filt
else:
print("Invalid specification of get_type; can be 'resolved' or 'unresolved'.")
raise
[docs]
@classmethod
def gen_summed_spectrum(cls,fs,hs,cos_incs,frange,t_obs,plot=False,saveto=None,return_median=False):
'''
Function to calculate the foreground spectrum arising from a set of monochromatic strains and associated frequencies.
Binaries passed to this function are assumed to all contribute to the foreground.
Arguments:
fs (1D array of floats) : Binary frequencies. Assumed monochromatic.
hs (1D array of floats) : Binary strain amplitudes.
cos_incs (1D array of floats) : Cosine of the binary inclinations
t_obs (astropy Quantity, time units) : Observation time.
frange (1D array of floats) : Frequencies at which to calculate binned PSD
Returns:
fg_PSD_binned (array of floats) : Resulting PSD of unresolved binary background/foreground for all f in frange
'''
## get BLIP frequency bins
bin_width = frange[1] - frange[0]
bin_widths = bin_width
bins = np.append(frange - bin_width/2,frange[-1]+bin_width/2)
## get strain squared power
PSDs_unres = cls.get_binary_psd(hs,cos_incs,bin_width)
## check minimum frequency resolution
## set minimum bin width to delta_f = 1/T_obs
## for now fix to LISA 4yr duration
min_bin_width = (1/(t_obs)).to(u.Hz)
if np.any(bin_widths*u.Hz<min_bin_width):
print("Warning: frequency resolution exceeds the maximum allowed by t_obs.")
## bin
fg_hist_binned, edges = np.histogram(fs,bins=bins,weights=PSDs_unres)
## np.histogram computes p((1/dfbin)*h^2|f) x N_unres x dfbin
## PSD should be p(h^2|f) x N_unres / dfbin
fg_PSD_binned = fg_hist_binned #/ (bin_width)
## get running median if needed
if plot or return_median:
runmed_binned = medfilt(fg_PSD_binned,kernel_size=3)
## make plots if desired
## note that in BLIP proper, this is called for every segment, and is then ifft'd.
## The true FG spectrum will be the result of splicing these segments together in time domain and taking another fft. Do not expect these to be representative of your expectations of the FG.
if plot:
plt.figure()
det_PSD = lw.psd.lisa_psd(frange*u.Hz,t_obs=4*u.yr,confusion_noise=None,approximate_R=True)
response_lw = lw.psd.approximate_response_function(frange,fstar=1e-3)
det_PSD_robson = lw.psd.lisa_psd(frange*u.Hz,t_obs=4*u.yr,confusion_noise='robson19',approximate_R=True)
plt.plot(frange,det_PSD,color='black',ls='--',label='Detector PSD')
plt.plot(frange,det_PSD_robson,color='black',label='Detector PSD (R19)')
plt.plot(frange,response_lw*fg_PSD_binned,color='slategray',alpha=0.5,label='Foreground')
plt.plot(frange,response_lw*runmed_binned,color='teal',alpha=0.5,label='FG Running Median')
plt.plot(frange,response_lw*runmed_binned*(1/u.Hz)+det_PSD,color='mediumorchid',alpha=0.5,label='FG + Det. PSD')
plt.legend(loc='upper right')
plt.xscale('log')
plt.yscale('log')
plt.ylim(1e-43,1e-34)
# plt.xlim(1e-4,1e-2)
plt.xlabel('Frquency [Hz]')
plt.ylabel('GW Power Spectral Density [Hz$^{-1}$]')
if saveto is None:
saveto = '.'
plt.savefig(saveto + '/population_injection.png', dpi=150)
plt.close()
## zoom zoom
plt.figure()
plt.plot(frange,det_PSD,color='black',ls='--',label='Detector PSD')
plt.plot(frange,det_PSD_robson,color='black',label='Detector PSD (R19)')
plt.plot(frange,response_lw*fg_PSD_binned,color='slategray',alpha=0.5,label='Foreground')
plt.plot(frange,response_lw*runmed_binned,color='teal',alpha=0.5,label='FG Running Median')
plt.plot(frange,response_lw*runmed_binned*(1/u.Hz)+det_PSD,color='mediumorchid',alpha=0.5,label='FG + Det. PSD')
plt.legend(loc='upper right')
plt.xscale('log')
plt.yscale('log')
plt.ylim(1e-40,1e-35)
plt.xlim(2e-4,4e-3)
plt.xlabel('Frquency [Hz]')
plt.ylabel('GW Power Spectral Density [Hz$^{-1}$]')
plt.savefig(saveto + '/population_injection_zoom.png', dpi=150)
plt.close()
if return_median:
spectrum = fg_PSD_binned *u.Hz*u.s
median_spectrum = runmed_binned *u.Hz*u.s
return spectrum, median_spectrum
else:
spectrum = fg_PSD_binned *u.Hz*u.s
return spectrum
[docs]
@staticmethod
def gen_summed_map(lats,longs,PSDs,nside,return_log=False):
'''
Function to get a skymap from a collection of binary sky coordinates and (monochromatic) PSDs.
Note that this function will process all binaries given to it; SNR filtering must be done beforehand.
Arguments:
lats, longs (arrays of floats) : Latitudes and longitudes of catalogue binaries.
IMPORTANT: Must be given in ecliptic coordinates and units of degrees!
PSDs (array of floats) : Corresponding catalogue binary PSDs (assumed monochromatic)
nside (int) : Healpix nside to use for skymap. Must be power of 2 < 2**32.
return_log (bool) : If True, also return the log skymap, with slight zero-buffering.
Returns:
skymap (array of floats) : Healpix skymap of GW power on the sky
logskymap (array of floats) : Healpix skymap of log GW power on the sky
'''
## convert sky coordinates to healpy pixels
pix_idx = hp.ang2pix(nside,longs,lats,lonlat=True)
## sum power from all binaries in same pixel
skymap = np.bincount(pix_idx,weights=PSDs.value,minlength=hp.nside2npix(nside))
if return_log:
## set any zero pixels to a very small number to avoid problems with taking the log
skymap[skymap<=0] = 1e-80
## get log
logskymap = np.log10(skymap)
return skymap, logskymap
else:
return skymap
[docs]
@classmethod
def pop2spec(cls,pop,frange,t_obs,SNR_cut=7,plot=False,return_median=False,saveto=None):
'''
Function to calculate the foreground spectrum arising from a population catalogue of unresolved DWD binaries.
Arguments:
popfile (str) : '/path/to/binary/population/data/file.csv'
frange (1D array of floats) : Frequencies at which to calculate binned PSD
t_obs (astropy Quantity, time units) : Observation time.
SNR_cut (float) : SNR above which a binary will be assumed to be individually resolveable and subtracted. Default SNR=7.
return_median (bool) : If True, also return a running median of the spectrum (Useful for smoothing, plotting). Default False.
Returns:
fg_PSD (array of floats) : Resulting PSD of unresolved binary background/foreground for all f in frange
'''
fs, hs, cos_incs = pop['fs'], pop['hs'], pop['cos_incs']
## note, for now we are fixing t_obs=4yr for the purpose of determining which systems are unresolved!!
snrs = cls.get_snr(fs*u.Hz,hs,cos_incs,(4*u.yr).to(u.s))
fs_unres, hs_unres, cos_incs_unres = cls.filter_by_snr([fs,hs,cos_incs],snrs,SNR_cut=SNR_cut)
PSD = cls.gen_summed_spectrum(fs_unres,hs_unres,cos_incs_unres,frange,t_obs,return_median=return_median,plot=plot,saveto=saveto)
if return_median:
return PSD[0].value, PSD[1].value
else:
return PSD.value
[docs]
@classmethod
def pop2map(cls,pop,nside,df,fmin,fmax,SNR_cut=7):
'''
Function to get a skymap from a catalogue of binaries.
Arguments:
lats, longs (arrays of floats) : Latitudes and longitudes of catalogue binaries.
IMPORTANT: Must be given in ecliptic coordinates and units of degrees!
PSDs (array of floats) : Corresponding catalogue binary PSDs (assumed monochromatic)
t_obs (astropy Quantity, time units) : Observation time.
nside (int) : Healpix nside to use for skymap. Must be power of 2 < 2**32.
Returns:
skymap (array of floats) : Healpix skymap of GW power on the sky
logskymap (array of floats) : Healpix skymap of log GW power on the sky
'''
fs, hs, lats, longs, cos_incs = pop['fs'], pop['hs'], pop['lats'], pop['longs'], pop['cos_incs']
## note, for now we are fixing t_obs=4yr for the purpose of determining which systems are unresolved!!
snrs = cls.get_snr(fs*u.Hz,hs,cos_incs,(4*u.yr).to(u.s))
hs_unres, cos_incs_unres, lats_unres, longs_unres = cls.filter_by_snr([hs,cos_incs,lats,longs],snrs,SNR_cut=SNR_cut)
psds = cls.get_binary_psd(hs_unres,cos_incs_unres,df)
skymap = cls.gen_summed_map(lats_unres,longs_unres,psds,nside)
return skymap
[docs]
@classmethod
def file2spec(cls,popfile,frange,t_obs,SNR_cut=7,plot=False,return_median=False,**read_csv_kwargs):
'''
Wrapper function to calculate the foreground spectrum directly from a population catalogue file.
Arguments:
popfile (str) : '/path/to/binary/population/data/file.csv'
frange (1D array of floats) : Frequencies at which to calculate binned PSD
t_obs (astropy Quantity, time units) : Observation time.
Returns:
fg_PSD (array of floats) : Resulting PSD of unresolved binary background/foreground for all f in frange
'''
pop = cls.load_population(popfile,frange.min(),frange.max(),**read_csv_kwargs)
return cls.pop2spec(pop,frange,t_obs,SNR_cut=SNR_cut,plot=plot,return_median=return_median)
[docs]
@classmethod
def file2map(cls,popfile,nside,df,fmin,fmax,SNR_cut=7,**read_csv_kwargs):
'''
Wrapper function to get a skymap directly from a population catalogue file.
Arguments:
lats, longs (arrays of floats) : Latitudes and longitudes of catalogue binaries.
IMPORTANT: Must be given in ecliptic coordinates and units of degrees!
PSDs (array of floats) : Corresponding catalogue binary PSDs (assumed monochromatic)
t_obs (astropy Quantity, time units) : Observation time.
nside (int) : Healpix nside to use for skymap. Must be power of 2 < 2**32.
Returns:
skymap (array of floats) : Healpix skymap of GW power on the sky
logskymap (array of floats) : Healpix skymap of log GW power on the sky
'''
pop = cls.load_population(popfile,fmin,fmax,**read_csv_kwargs)
return cls.pop2map(pop,nside,df,fmin,fmax,SNR_cut=SNR_cut)
##################################################
## Analytic Astrophysical Spatial Distributions ##
##################################################
[docs]
class Galaxy_Model():
'''
Class to support parameterized inference of the Galactic white dwarf binary spatial distribution.
'''
def __init__(self,nside,grid_spec='interval',grid_res=0.33,gal_rad=16,gal_height=8,max_rh=4,max_zh=2,fix_rh=None):
'''
Function to initialize a grid on which to generate simple parameterized density models of the galactic DWD distribution.
Arguments:
nside (int) : Healpy nside (pixel resolution).
grid_spec (str) : Determines the nature of grid_res (below). Can be 'interval' or 'npoints'.
If 'interval', grid_res is the dx=dy=dz grid interval in kpc.
If 'npoints', grid_res is the number of number of points along x and y.
(Note that the number of points along z will be scaled to keep dx=dy=dz if gal_rad and gal_height are different.)
grid_res (float) : Grid resolution as defined above. If grid_spec='npoints', type must be int.
gal_rad (float) : Max galactic radius of the grid in kpc. Grid will be definded on -gal_rad <= x,y <= +gal_rad.
gal_height (float) : Max galactic height of the grid in kpc. Grid will be definded on -gal_height <= z <= +gal_height.
max_rh (float) : Maximum prior value of the Galaxy model's radial scale height (rh). Used to create a mask for response function calculations.
max_zh (float) : As max_rh, but for the vertical scale height (zh).
fix_rh (float) : Value of the radial scale height to fix for the model (if None, rh is treated as a parameter.)
'''
self.nside = nside
## for binning
self.length = hp.nside2npix(self.nside)
## create grid *in cartesian coordinates*
## size of density grid gives enough padding around the galactic plane without becoming needlessly large
## set to 4x max default radial/vertical scale height, respectively (corresponds to "edge" density ~1/10 of central density)
## distances in kpc
if grid_spec=='interval':
resolution = grid_res
print("Generating grid with dx = dy = dz = {:0.2f} kpc".format(resolution))
xs = jnp.arange(-gal_rad,gal_rad,resolution)
ys = jnp.arange(-gal_rad,gal_rad,resolution)
zs = jnp.arange(-gal_height,gal_height,resolution)
elif grid_spec=='npoints':
if type(grid_res) is not int:
raise TypeError("If grid_spec is 'npoints', grid_res must be an integer.")
resolution = gal_rad*2 / grid_res
print("Generating grid with dx = dy = dz = {:0.2f} kpc".format(resolution))
xs = jnp.linspace(-gal_rad,gal_rad,grid_res)
ys = jnp.linspace(-gal_rad,gal_rad,grid_res)
zs = jnp.arange(-gal_height,gal_height,resolution)
## generate meshgrid
x, y, z = jnp.meshgrid(xs,ys,zs)
self.z = z
self.r = jnp.sqrt(x**2 + y**2)
## Use astropy.coordinates to transform from galactocentric frame to galactic (solar system barycenter) frame.
gc = cc.SkyCoord(x=x*u.kpc,y=y*u.kpc,z=z*u.kpc, frame='galactocentric')
SSBc = gc.transform_to(cc.BarycentricMeanEcliptic)
## 1/D^2 with filtering to avoid nearby, presumeably resolved, DWDs
self.dist_adj = (jnp.array(SSBc.distance)>2)*(jnp.array(SSBc.distance))**-2
## make pixel grid
self.pixels = hp.ang2pix(self.nside,np.array(SSBc.lon),np.array(SSBc.lat),lonlat=True).flatten()
## set global (fixed) MW model parameters
self.rho_c = 1 # some fiducial central density
self.r_cut = 2.1 #kpc
self.r0 = 0.075 #kpc
self.alpha = 1.8
self.q = 0.5
## compute the bulge density (independent of rh,zh)
rp = jnp.sqrt(self.r**2 + (self.z/self.q)**2)
self.bulge_density = self.rho_c*(jnp.exp(-(rp/self.r_cut)**2)/(1+rp/self.r0)**self.alpha)
if fix_rh is not None:
self.rh = fix_rh
self.disk_density_radial_prefactor = self.rho_c*jnp.exp(-self.r/self.rh)
self.max_skymap = self.mw_mapmaker_2par(fix_rh+0.1,max_zh+0.1)
else:
## create skymap with maximum allowed spatial extent (plus some buffer)
self.max_skymap = self.mw_mapmaker_2par(max_rh+0.1,max_zh+0.1)
[docs]
def mw_mapmaker_1par(self,zh):
'''
Generate a galactic white dwarf binary foreground modeled after Breivik et al. (2020), consisting of a bulge + disk. The default values are those given in Breivik+20; the model itself traces back to electromagnetic studies of the Galaxy by McMillan et al. (2011), Bissantz and Gerhard (2002), and Binney et al. (1997).
zh is the vertical scale height in kpc.
The distribution is azimuthally symmetric in the galactocentric frame.
Designed for speed, as it is intended for use during sampling. Relies on pre-computed galaxy grid that is produced as part of Galaxy_Model() initialization.
Returns
---------
skymap : float
Healpy GW power skymap of the Milky Way white dwarf binary distribution.
'''
## Calculate density distribution
disk_density = self.disk_density_radial_prefactor*jnp.exp(-jnp.abs(self.z)/zh)
summed_density = disk_density + self.bulge_density
## use stored grid to convert density to power and filter nearby resolved DWDs
unresolved_powers = summed_density*self.dist_adj
## Bin
skymap = jnp.bincount(self.pixels,weights=unresolved_powers.flatten(),length=self.length)
return skymap
[docs]
def mw_mapmaker_2par(self,rh,zh):
'''
Generate a galactic white dwarf binary foreground modeled after Breivik et al. (2020), consisting of a bulge + disk. The default values are those given in Breivik+20; the model itself traces back to electromagnetic studies of the Galaxy by McMillan et al. (2011), Bissantz and Gerhard (2002), and Binney et al. (1997).
rh is the radial scale height in kpc, zh is the vertical scale height in kpc.
The distribution is azimuthally symmetric in the galactocentric frame.
Designed for speed, as it is intended for use during sampling. Relies on pre-computed galaxy grid that is produced as part of Galaxy_Model() initialization.
Returns
---------
skymap : float
Healpy GW power skymap of the Milky Way white dwarf binary distribution.
'''
## Calculate density distribution
disk_density = self.rho_c*jnp.exp(-self.r/rh)*jnp.exp(-jnp.abs(self.z)/zh)
summed_density = disk_density + self.bulge_density
## use stored grid to convert density to power and filter nearby resolved DWDs
unresolved_powers = summed_density*self.dist_adj
## Bin
skymap = jnp.bincount(self.pixels,weights=unresolved_powers.flatten(),length=self.length)
return skymap
# FIXME there is no log_DWD_FG_map being returned
[docs]
def generate_galactic_foreground(rh,zh,nside):
'''
Generate a galactic white dwarf binary foreground modeled after Breivik et al. (2020), consisting of a bulge + disk. The default values are those given in Breivik+20; the model itself traces back to electromagnetic studies of the Galaxy by McMillan et al. (2011), Bissantz and Gerhard (2002), and Binney et al. (1997).
rh is the radial scale height in kpc, zh is the vertical scale height in kpc.
Thin disk has rh=2.9kpc, zh=0.3kpc; Thick disk has rh=3.31kpc, zh=0.9kpc. Defaults to thin disk.
The distribution is azimuthally symmetric in the galactocentric frame.
Arguments
----------
rh (float) : MW DWD population radial scale height
zh (float) : MW DWD population vertical scale height
nside (int) : healpix nside (skymap resolution)
Returns
---------
astro_map : float
Healpy GW power skymap of the DWD galactic foreground.
log_DWD_FG_map : float
Healpy log GW power skymap. For plotting purposes.
'''
## set grid density
grid_fill = 200
## create grid *in cartesian coordinates*
## size of density grid gives enough padding around the galactic plane without becoming needlessly large
## distances in kpc
gal_rad = 20
xs = np.linspace(-gal_rad,gal_rad,grid_fill)
ys = np.linspace(-gal_rad,gal_rad,grid_fill)
zs = np.linspace(-5,5,grid_fill)
x, y, z = np.meshgrid(xs,ys,zs)
r = np.sqrt(x**2 + y**2)
## Calculate density distribution
rho_c = 1 # some fiducial central density (?? not sure what to use for this)
r_cut = 2.1 #kpc
r0 = 0.075 #kpc
alpha = 1.8
q = 0.5
disk_density = rho_c*np.exp(-r/rh)*np.exp(-np.abs(z)/zh)
rp = np.sqrt(r**2 + (z/q)**2)
bulge_density = rho_c*(np.exp(-(rp/r_cut)**2)/(1+rp/r0)**alpha)
DWD_density = disk_density + bulge_density
## Use astropy.coordinates to transform from galactocentric frame to eclipticc (solar system barycenter) frame.
gc = cc.SkyCoord(x=x*u.kpc,y=y*u.kpc,z=z*u.kpc, frame='galactocentric')
SSBc = gc.transform_to(cc.BarycentricMeanEcliptic)
## Calculate GW power
DWD_powers = DWD_density*(np.array(SSBc.distance))**-2
## Filter nearby grid points (cut out 2kpc sphere)
## This is a temporary soln. Later, we will want to do something more subtle, sampling a DWD pop from
## the density distribution and filtering out resolveable SNR>80 binaries
DWD_unresolved_powers = DWD_powers*(np.array(SSBc.distance) > 2)
## Transform to healpix basis
## resolution is 2x analysis resolution
# import pdb; pdb.set_trace()
pixels = hp.ang2pix(nside,np.array(SSBc.lon),np.array(SSBc.lat),lonlat=True)
## Create skymap
## Bin
astro_map = np.bincount(pixels.flatten(),weights=DWD_unresolved_powers.flatten(),minlength=hp.nside2npix(nside))
return astro_map
[docs]
def generate_galactic_bulge(nside,rho_c=1,r_cut=2.1, r0=0.075,alpha=1.8,q=0.5):
'''
Generate the bulge component of a galactic white dwarf binary foreground modeled after the Breivik et al. (2020) bulge+disk spatial model. The default values are those given in Breivik+20; the model itself traces back to electromagnetic studies of the Galaxy by McMillan et al. (2011), Bissantz and Gerhard (2002), and Binney et al. (1997).
The distribution is azimuthally symmetric in the galactocentric frame.
Arguments
----------
nside (int) : healpix nside (skymap resolution)
rho_c (float) : A fiducial central density for the galaxy (the normalization is not important for BLIP simulations, as the skymap will be normalized such that the sky integral is one.) Default 1.
r_cut (float) : The cutoff radius of the bulge. Default 2.1 kpc.
r0 (float) : Scale radius of the bulge. Default 0.075 kpc.
alpha (float) Bulge density power law index. Default 1.8.
q (float) : Bulge axis ratio. Default 0.5.
Returns
---------
astro_map : float
Healpy GW power skymap of the Galactic bulge contribution to the foreground.
log_DWD_FG_map : float
Healpy log GW power skymap. For plotting purposes.
'''
## set grid density
grid_fill = 200
## create grid *in cartesian coordinates*
## size of density grid gives enough padding around the galactic plane without becoming needlessly large
## distances in kpc
gal_rad = 5
xs = np.linspace(-gal_rad,gal_rad,grid_fill)
ys = np.linspace(-gal_rad,gal_rad,grid_fill)
zs = np.linspace(-5,5,grid_fill)
## filter to only bulge range while keeping same 3D grid resolution
xs = xs[(xs>-5)*(xs<5)]
ys = ys[(ys>-5)*(ys<5)]
x, y, z = np.meshgrid(xs,ys,zs)
r = np.sqrt(x**2 + y**2)
## Calculate density distribution
# disk_density = rho_c*np.exp(-r/rh)*np.exp(-np.abs(z)/zh)
rp = np.sqrt(r**2 + (z/q)**2)
bulge_density = rho_c*(np.exp(-(rp/r_cut)**2)/(1+rp/r0)**alpha)
DWD_density = bulge_density
## Use astropy.coordinates to transform from galactocentric frame to eclipticc (solar system barycenter) frame.
gc = cc.SkyCoord(x=x*u.kpc,y=y*u.kpc,z=z*u.kpc, frame='galactocentric')
SSBc = gc.transform_to(cc.BarycentricMeanEcliptic)
## Calculate GW power
DWD_powers = DWD_density*(np.array(SSBc.distance))**-2
## Filter nearby grid points (cut out 2kpc sphere)
## This is a temporary soln. Later, we will want to do something more subtle, sampling a DWD pop from
## the density distribution and filtering out resolveable SNR>80 binaries
DWD_unresolved_powers = DWD_powers*(np.array(SSBc.distance) > 2)
## Transform to healpix basis
pixels = hp.ang2pix(nside,np.array(SSBc.lon),np.array(SSBc.lat),lonlat=True)
## Create skymap
## Bin
astro_map = np.bincount(pixels.flatten(),weights=DWD_unresolved_powers.flatten(),minlength=hp.nside2npix(nside))
return astro_map
[docs]
def generate_galactic_disk(rh,zh,nside):
'''
Generate the disk component of a galactic white dwarf binary foreground modeled after the Breivik et al. (2020) bulge+disk spatial model. The default values are those given in Breivik+20; the model itself traces back to electromagnetic studies of the Galaxy by McMillan et al. (2011), Bissantz and Gerhard (2002), and Binney et al. (1997).
The distribution is azimuthally symmetric in the galactocentric frame.
Arguments
----------
rh (float) : MW DWD population radial disk scale height
zh (float) : MW DWD population vertical disk scale height
nside (int) : healpix nside (skymap resolution)
Returns
---------
astro_map : float
Healpy GW power skymap of the Galactic disk contribution to the foreground.
log_DWD_FG_map : float
Healpy log GW power skymap. For plotting purposes.
'''
## set grid density
grid_fill = 200
## create grid *in cartesian coordinates*
## size of density grid gives enough padding around the galactic plane without becoming needlessly large
## distances in kpc
gal_rad = 20
xs = np.linspace(-gal_rad,gal_rad,grid_fill)
ys = np.linspace(-gal_rad,gal_rad,grid_fill)
zs = np.linspace(-5,5,grid_fill)
x, y, z = np.meshgrid(xs,ys,zs)
r = np.sqrt(x**2 + y**2)
## Calculate density distribution
rho_c = 1
disk_density = rho_c*np.exp(-r/rh)*np.exp(-np.abs(z)/zh)
DWD_density = disk_density
## Use astropy.coordinates to transform from galactocentric frame to eclipticc (solar system barycenter) frame.
gc = cc.SkyCoord(x=x*u.kpc,y=y*u.kpc,z=z*u.kpc, frame='galactocentric')
SSBc = gc.transform_to(cc.BarycentricMeanEcliptic)
## Calculate GW power
DWD_powers = DWD_density*(np.array(SSBc.distance))**-2
## Filter nearby grid points (cut out 2kpc sphere)
## This is a temporary soln. Later, we will want to do something more subtle, sampling a DWD pop from
## the density distribution and filtering out resolveable SNR>80 binaries
DWD_unresolved_powers = DWD_powers*(np.array(SSBc.distance) > 2)
## Transform to healpix basis
pixels = hp.ang2pix(nside,np.array(SSBc.lon),np.array(SSBc.lat),lonlat=True)
## Create skymap
## Bin
astro_map = np.bincount(pixels.flatten(),weights=DWD_unresolved_powers.flatten(),minlength=hp.nside2npix(nside))
return astro_map
[docs]
def generate_sdg(nside,ra=80.21496, dec=-69.37772, D=50, r=2.1462, N=2169264):
'''
Generates the stochastic DWD signal from a a generic toy model spherical dwarf galaxy (SDG). Default values are for the LMC.
Arguments
---------
ra, dec : float
Right ascension and declination.
D : float
Distance to SDG in kpc.
r : float
radius of SDG in kpc
N : int
Number of DWD systems in the SDG
Returns
---------
skymap : float
Healpy GW power skymap of the stochastic DWD signal.
'''
## ===== ipynb compute_density function ========================================
## all below is only for galaxy model creation
## set grid density
grid_fill = 200
# sdg radius: (default is the LMC)
sdg_r = r*u.kpc
# default coordinates give the position of the center of the LMC in ICRS coordinates:
sdg_icrs = SkyCoord(ra=ra*u.degree, dec=dec*u.degree, distance=D*u.kpc)
# transform to galactocentric coordinates:
sdg_galcen = sdg_icrs.transform_to(cc.Galactocentric)
# convert to cartesian coordinates with the origin at the galactic center
x_sdg = sdg_galcen.cartesian.x
y_sdg = sdg_galcen.cartesian.y
z_sdg = sdg_galcen.cartesian.z
## create grid *in cartesian coordinates*
## distances in kpc
xs = np.linspace(x_sdg-sdg_r,x_sdg+sdg_r,grid_fill)
ys = np.linspace(y_sdg-sdg_r,y_sdg+sdg_r,grid_fill)
zs = np.linspace(z_sdg-sdg_r,z_sdg+sdg_r,grid_fill)
x, y, z = np.meshgrid(xs,ys,zs)
## creating a uniform spherical density, and zero density beyond that
# rs = distance from any point to the center of the sdg
rs = np.sqrt((x-x_sdg)**2+(y-y_sdg)**2+(z-z_sdg)**2)
DWD_density = (rs<=sdg_r) * N / (0.524*grid_fill**3)
# 0.524 is the filling factor of a sphere in a cube
# this gives us the number density for points only within the sphere of the sdg, instead of the entire cube
## Use astropy.coordinates to transform from galactocentric frame to galactic (solar system barycenter) frame.
gc = cc.SkyCoord(x=x,y=y,z=z, frame='galactocentric')
#cc.Galactocentric(x=x,y=y,z=z)
SSBc = gc.transform_to(cc.Galactic)
## Calculate GW power
## density will be total power divided by the points that we're simulating
## assuming all grid points will contribute an equal amount of power
DWD_powers = DWD_density*(np.array(SSBc.distance))**-2
## Filter nearby grid points (cut out 2kpc sphere)
## This is a temporary soln. Later, we will want to do something more subtle, sampling a DWD pop from
## the density distribution and filtering out resolveable SNR>80 binaries
DWD_unresolved_powers = DWD_powers*(np.array(SSBc.distance) > 2)
## will need to generate DWD_unresolved_powers for sdg
## Transform to healpix basis
## resolution is 2x analysis resolution
## setting resolution, taking coordinates from before and transforming to longlat
## replace np.array ... with sdg coordinates
pixels = hp.ang2pix(nside,np.array(SSBc.l),np.array(SSBc.b),lonlat=True)
## Create skymap
## Bin
astro_mapG = np.bincount(pixels.flatten(),weights=DWD_unresolved_powers.flatten(),minlength=hp.nside2npix(nside))
## below isn't in the jupyter notebook?
## Transform into the ecliptic
rGE = hp.rotator.Rotator(coord=['G','E'])
astro_map = rGE.rotate_map_pixel(astro_mapG)
## returning healpix skymaps
return astro_map
[docs]
def generate_point_source(ang_coord1,ang_coord2,nside,convention='healpy',pad=True):
'''
Generates a point source skymap.
Arguments
---------
ang_coord1, ang_coord2 : float
angular coordinates of the point source in radians. Either theta, phi or ra, dec (see convention variable)
nside : int
Healpy nside (skymap resolution)
convention : str
Angle specification convention. Can be 'healpy' (Healpy polar theta, aziumuthal phi) or 'radec' (standard astronomical RA/DEC). Default is theta/phi.
pad : bool
Whether to allow a small amount of power to artifically bleed into adjacent pixels to avoid numerical error issues later on. Only needed for single-pixel case.
Returns
---------
astro_map (array of floats) : healpy skymap
'''
if convention=='healpy':
theta, phi = ang_coord1, ang_coord2
elif convention=='radec':
ra, dec = ang_coord1, ang_coord2
theta, phi = np.pi/2 - np.deg2rad(dec), np.deg2rad(ra)
else:
raise ValueError("Unknown specification of angular coordinate convention. Can be 'healpy' (Healpy theta/phi) or 'radec' (RA/DEC).")
astro_map = np.zeros(hp.nside2npix(nside))
ps_id = hp.ang2pix(nside, theta, phi)
astro_map[ps_id] = 1
if pad:
neighbours = hp.pixelfunc.get_all_neighbours(nside,ps_id)
astro_map[neighbours] = 1e-10
astro_map = astro_map/np.sum(astro_map)
return astro_map
[docs]
def generate_two_point_source(theta_1,phi_1,theta_2,phi_2,nside):
'''
Generates a two-point-source skymap.
Depreciation note: Keeping until the angular resolution study is finished, then will depreciate in favor of generate_point_sources() (below)/
Arguments
---------
theta_1, phi_1 : float
angular coordinates of the 1st point source in radians
theta_2, phi_2 : float
angular coordinates of the 2nd point source in radians
Returns
---------
astro_map (array of floats) : healpy skymap
'''
astro_map = np.zeros(hp.nside2npix(nside))
ps_idx = [hp.ang2pix(nside, theta_1, phi_1),
hp.ang2pix(nside, theta_2, phi_2)]
astro_map[ps_idx] = 0.5
return astro_map
[docs]
def generate_point_sources(coord_list,nside,convention='healpy'):
'''
Generates a skymap with a flexible number of point sources.
Arguments
---------
coord_list : list of tuples
List of (ang_coord1,ang_coord2) tuples, one tuple per source. Each tuple gives angular coordinates of their respective point sourc as either (theta, phi) or (ra, dec) (see convention variable).
nside : int
Healpy nside (skymap resolution)
convention : str
Angle specification convention. Can be 'healpy' (Healpy polar theta, aziumuthal phi) or 'radec' (standard astronomical RA/DEC). Default is theta/phi.
Returns
---------
astro_map (array of floats) : healpy skymap
'''
astro_map = np.zeros(hp.nside2npix(nside))
## add sources
for source_coord in coord_list:
source_map = generate_point_source(source_coord[0],source_coord[1],nside,convention=convention,pad=False)
astro_map += source_map
## normalise to 1
astro_map = astro_map/np.sum(astro_map)
return astro_map
[docs]
def skymap_pix2sph(skymap, blmax):
'''
Transform a pixel-basis skymap into the b_lm spherical harmonic basis
Returns
---------
astro_blms : float
Spherical harmonic healpy expansion of the galactic foreground
'''
## Take square root of powers
sqrt_map = np.sqrt(skymap)
## Generate blms of power (alms of sqrt(power))
astro_blms = hp.sphtfunc.map2alm(sqrt_map, lmax=blmax)
# Normalize such that b00 = 1
astro_blms = astro_blms/(astro_blms[0])
return astro_blms