Source code for blip.src.submodel

from enum import Enum
from dataclasses import dataclass

import blip.src.astro as astro
from blip.src.astro import Population
from blip.src.clebschGordan import clebschGordan
from blip.src.fast_geometry import fast_geometry
from blip.src.geometry import geometry
from blip.src.instrNoise import instrNoise
from blip.src.utils import get_robson19_shape_pars_from_tobs


import healpy as hp
import jax.numpy as jnp
import numpy as np


[docs] class SubmodelKind(Enum): """ Kind of submodel specifier string. Most are 'spectral_spatial'. There are only two exceptions: - 'population', which is shorthand for 'population_population'; - 'noise' and 'fixednoise', because there is no spatial model for noise. """ NOISE = 1 """'noise' or 'fixednoise' submodel.""" POPULATION = 2 """'population' submodel, shorthand for 'population_population'.""" SPECTRAL_SPATIAL = 3 """General spectral_spatial submodel specifier. Example: 'powerlaw_fixedgalaxy'."""
[docs] @dataclass class SubmodelSpec: """ Parsed (hence valid) specification of a submodel. Use this in submodel.__init__(). To get a specification from a string, use blip.config.parse_model_spec(). """ name: str """Submodel name without duplicate count. Example: 'population-1' -> 'population'.""" kind: SubmodelKind """Kind of specifier string.""" is_injection: bool """True if the submodel is intended to be used as part of an injection. False if it is intended for analysis.""" spectral: str | None """Spectral model.""" spatial: str | None """Spatial model.""" count: str """Duplicate count enclosed by parenthesis. Example: 'powerlaw_isgwb-2' -> '(2)'.""" raw_name: str """Complete submodel name. Used as unique id.""" truevals: dict """True parameter values. For analysis submodels, this is derived from the aliased injection submodel.""" # only for analysis submodels fixedvals: dict # empty dict if missing """Fixed values of parameters. Only for analysis (not injection) submodels.""" alias: str | None """Raw name of aliased injection submodel. Only used in analysis submodels."""
[docs] class submodel(fast_geometry,clebschGordan,instrNoise): ''' Modular class that can represent either an injection or an analysis model. Will have different attributes depending on use case. Includes all information required to generate an injection or a likelihood/prior. New models (injection or analysis) should be added here. ''' def __init__(self,params,inj,spec: SubmodelSpec,fs,f0,tsegmid,injection=False,suffix='',parallel_response=False): ''' Each submodel should be defined as "[spectral]_[spatial]", save for the noise model, which is just "noise". e.g., "powerlaw_isgwb" defines a submodel with an isotropic spatial distribution and a power law spectrum. Resulting objects has different attributes depending on if it is to be used as an Injection component or part of our unified multi-signal Model. Arguments ------------ params, inj (dict) : params and inj config dictionaries as generated in run_blip.py spec (SubmodelSpec) : submodel specification, as parsed by config.parse_spec_analysis or parse_spec_injection fs, f0 (array) : frequency array and its LISA-characteristic-frequency-scaled counterpart (f0=fs/(2*fstar)) tsegmid (array) : array of time segment midpoints injection (bool) : If True, generate the submodel as an injection component, rather than a Model submodel. suffix (str) : String to append to parameter names, etc., to differentiate between duplicate submodels. parallel_response (bool) : If True, employ multiprocessing for the response calculations. Default False. Returns ------------ submodel (object) : submodel with all needed attributes to serve as an Injection component or Model submodel as desired. ''' ## preliminaries self.params = params self.inj = inj self.armlength = 2.5e9 ## armlength in meters self.fs = fs self.f0 = f0 self.tsegmid = tsegmid self.time_dim = tsegmid.size self.injection = injection if 'lmax' in params.keys() and not injection: clebschGordan.__init__(self,params['lmax']) elif 'inj_lmax' in inj.keys() and injection: clebschGordan.__init__(self,inj['inj_lmax']) elif 'inj_lmax' not in inj.keys() and 'lmax' in params.keys() and injection: clebschGordan.__init__(self,params['lmax']) self.name = spec.raw_name self.kind = spec.kind self.name_split = (spec.name, spec.count) self.spectral_model_name = spec.spectral self.spatial_model_name = spec.spatial self.injvals = spec.truevals self.truevals = {} # truevals keys will be processed and renamed case-by-case self.fixedvals = spec.fixedvals self.alias = spec.alias ## set some preliminary flags; these may get overwritten later self.has_map = False self.parameterized_map = False submodel_count = spec.count ## plot kwargs dict to allow for case-by-case exceptions to our usual plotting approach ## e.g., the population spectra look real weird as dotted lines. self.plot_kwargs = {} self.parameters = [] self.spectral_parameters = [] self.spatial_parameters = [] if spec.kind == SubmodelKind.NOISE and spec.name == "noise": self.spectral_parameters = [r'$\log_{10} (N_p)$'+suffix, r'$\log_{10} (N_a)$'+suffix] self.spatial_parameters = [] self.parameters = self.spectral_parameters self.Npar = 2 ## for plotting self.fancyname = "Instrumental Noise" self.color = 'dimgrey' self.has_map = False # Figure out which instrumental noise spectra to use if self.params['tdi_lev']=='aet': self.instr_noise_spectrum = self.aet_noise_spectrum if injection: self.gen_noise_spectrum = self.gen_aet_noise elif self.params['tdi_lev']=='xyz': self.instr_noise_spectrum = self.xyz_noise_spectrum if injection: self.gen_noise_spectrum = self.gen_xyz_noise elif self.params['tdi_lev']=='michelson': self.instr_noise_spectrum = self.mich_noise_spectrum if injection: self.gen_noise_spectrum = self.gen_michelson_noise else: raise ValueError("Unknown specification of 'tdi_lev'; can be 'michelson', 'xyz', or 'aet'.") if not injection: ## prior transform self.prior = self.instr_noise_prior ## covariance calculation self.cov = self.compute_cov_noise else: ## truevals self.truevals[r'$\log_{10} (N_p)$'] = self.injvals['log_Np'] self.truevals[r'$\log_{10} (N_a)$'] = self.injvals['log_Na'] ## save the frozen noise spectra self.frozen_spectra = self.instr_noise_spectrum(self.fs,self.f0,Np=10**self.injvals['log_Np'],Na=10**self.injvals['log_Na']) return elif spec.kind == SubmodelKind.NOISE and spec.name == "fixednoise": self.spectral_parameters = [] self.spatial_parameters = [] self.parameters = [] self.Npar = 0 ## for plotting self.fancyname = "Known Instrumental Noise" self.color = 'dimgrey' self.has_map = False self.fixedspec = True # Figure out which instrumental noise spectra to use if self.params['tdi_lev']=='aet': self.instr_noise_spectrum = self.aet_noise_spectrum if injection: self.gen_noise_spectrum = self.gen_aet_noise elif self.params['tdi_lev']=='xyz': self.instr_noise_spectrum = self.xyz_noise_spectrum if injection: self.gen_noise_spectrum = self.gen_xyz_noise elif self.params['tdi_lev']=='michelson': self.instr_noise_spectrum = self.mich_noise_spectrum if injection: self.gen_noise_spectrum = self.gen_michelson_noise else: raise ValueError("Unknown specification of 'tdi_lev'; can be 'michelson', 'xyz', or 'aet'.") if not injection: ## prior transform self.prior = self.fixed_model_wrapper_prior ## covariance calculation self.cov_fixed = self.compute_cov_noise([self.fixedvals['log_Np'],self.fixedvals['log_Na']]) self.cov = self.compute_cov_fixed else: raise ValueError("Fixed submodels are not supported for injections. Use the corresponding unfixed submodel.") return ################################################### ### BUILD NEW MODELS HERE ### ################################################### ## assignment of spectrum if self.spectral_model_name == 'powerlaw': self.spectral_parameters = self.spectral_parameters + [r'$\alpha$', r'$\log_{10} (\Omega_{\rm ref})$'] self.omegaf = self.powerlaw_spectrum self.fancyname = "Power Law"+submodel_count if not injection: self.spectral_prior = self.powerlaw_prior else: self.truevals[r'$\alpha$'] = self.injvals['alpha'] self.truevals[r'$\log_{10} (\Omega_{\rm ref})$'] = self.injvals['log_omega0'] elif self.spectral_model_name == 'twothirdspowerlaw': ## it may be worth implementing a more general fixed powerlaw model ## but this suffices for investigating the effects of the stellar-origin binary background self.spectral_parameters = self.spectral_parameters + [r'$\log_{10} (\Omega_{\rm ref})$'] self.omegaf = self.twothirdspowerlaw_spectrum self.fancyname = r'$\alpha=2/3$'+" Power Law"+submodel_count if not injection: self.spectral_prior = self.fixedpowerlaw_prior else: self.truevals[r'$\log_{10} (\Omega_{\rm ref})$'] = self.injvals['log_omega0'] elif self.spectral_model_name == 'fixedalphapowerlaw': if injection: raise ValueError("Fixed-value submodels are not supported for injections. Please use the 'powerlaw' submodel instead.") ## ensure alpha value is provided if 'alpha' not in self.fixedvals.keys(): raise ValueError("The 'fixedalphapowerlaw' submodel requires the following parameters to be provided to the fixedvals dict: alpha.") self.spectral_parameters = self.spectral_parameters + [r'$\log_{10} (\Omega_{\rm ref})$'] self.omegaf = self.fixedpowerlaw_spectrum self.fancyname = r'$\alpha='+'{}$'.format(self.fixedvals['alpha'])+" Power Law"+submodel_count self.spectral_prior = self.fixedpowerlaw_prior elif self.spectral_model_name == 'brokenpowerlaw': self.spectral_parameters = self.spectral_parameters + [r'$\alpha_1$',r'$\log_{10} (\Omega_{\rm ref})$',r'$\alpha_2$',r'$\log_{10} (f_{\rm break})$'] self.omegaf = self.broken_powerlaw_spectrum self.fancyname = "Broken Power Law"+submodel_count if not injection: self.spectral_prior = self.broken_powerlaw_prior else: self.truevals[r'$\alpha_1$'] = self.injvals['alpha1'] self.truevals[r'$\log_{10} (\Omega_{\rm ref})$'] = self.injvals['log_omega0'] self.truevals[r'$\alpha_2$'] = self.injvals['alpha2'] self.truevals[r'$\log_{10} (f_{\rm break})$'] = self.injvals['log_fbreak'] elif self.spectral_model_name == 'fixedalpha1brokenpowerlaw': self.spectral_parameters = self.spectral_parameters + [r'$\log_{10} (\Omega_{\rm ref})$',r'$\alpha_2$',r'$\log_{10} (f_{\rm break})$',r'$\delta$'] self.omegaf = self.broken_powerlaw_fixed_a1_spectrum self.fancyname = "Broken Power Law"+submodel_count if not injection: self.spectral_prior = self.broken_powerlaw_fixed_a1_prior if 'alpha_1' not in self.fixedvals.keys(): raise KeyError("Fixed-alpha_1 broken power law spectral model selected, but no low-frequeny slope parameter (alpha_1) was provided to the fixedvals dict.") else: self.fixedvals[r'$\alpha_1$'] = self.injvals['alpha1'] self.truevals[r'$\log_{10} (\Omega_{\rm ref})$'] = self.injvals['log_omega0'] self.truevals[r'$\alpha_2$'] = self.injvals['alpha2'] self.truevals[r'$\log_{10} (f_{\rm break})$'] = self.injvals['log_fbreak'] self.truevals[r'$\delta$'] = self.injvals['delta'] elif self.spectral_model_name == 'truncatedpowerlaw': self.spectral_parameters = self.spectral_parameters + [r'$\alpha$', r'$\log_{10} (\Omega_{\rm ref})$', r'$\log_{10} (f_{\mathrm{cut}})$'] self.omegaf = self.truncated_powerlaw_3par_spectrum self.fancyname = "Truncated Power Law"+submodel_count if not injection: if 'log_fscale' not in self.fixedvals.keys(): print("Warning: Truncated power law spectral model selected, but no scaling parameter (fscale) was provided to the fixedvals dict. Defaulting to fscale=4e-4 Hz.") self.fixedvals['log_fscale'] = np.log10(4e-4) self.spectral_prior = self.truncated_powerlaw_3par_prior else: self.truevals[r'$\alpha$'] = self.injvals['alpha'] self.truevals[r'$\log_{10} (\Omega_{\rm ref})$'] = self.injvals['log_omega0'] self.truevals[r'$\log_{10} (f_{\mathrm{cut}})$'] = self.injvals['log_fcut'] self.truevals[r'$\log_{10} (f_{\mathrm{scale}})$'] = np.log10(4e-4) ## this is a bit hacky but oh well. Solves an issue that comes up if you use the 3par TPL for an injection. self.fixedvals = {'log_fscale':np.log10(4e-4)} elif self.spectral_model_name == 'truncatedpowerlaw4par': self.spectral_parameters = self.spectral_parameters + [r'$\alpha$', r'$\log_{10} (\Omega_{\rm ref})$', r'$\log_{10} (f_{\mathrm{cut}})$',r'$\log_{10} (f_{\mathrm{scale}})$'] self.omegaf = self.truncated_powerlaw_4par_spectrum self.fancyname = "4-Parameter Truncated Power Law"+submodel_count if not injection: self.spectral_prior = self.truncated_powerlaw_4par_prior else: self.truevals[r'$\alpha$'] = self.injvals['alpha'] self.truevals[r'$\log_{10} (\Omega_{\rm ref})$'] = self.injvals['log_omega0'] self.truevals[r'$\log_{10} (f_{\mathrm{cut}})$'] = self.injvals['log_fcut'] self.truevals[r'$\log_{10} (f_{\mathrm{scale}})$'] = self.injvals['log_fscale'] elif self.spectral_model_name == 'fixedtruncatedpowerlaw': self.fixed_spec = True self.spectral_parameters = self.spectral_parameters self.omegaf = self.fixed_truncated_powerlaw_spectrum self.fancyname = "Fixed Truncated Power Law"+submodel_count if not injection: self.spectral_prior = self.fixed_model_wrapper_prior self.fixed_args = [self.fixedvals['alpha'],self.fixedvals['log_omega0'],self.fixedvals['log_fcut'],self.fixedvals['log_fscale']] else: raise ValueError("Fixed submodels are not supported for injections. Use the corresponding unfixed submodel.") elif self.spectral_model_name == 'mwspec': ## this is a spectral model tailored to analyses of the MW foreground # it is a truncated power law with alpha = 2/3 and fscale = 4e-4 # and astrophysically-motivated prior bounds self.spectral_parameters = self.spectral_parameters + [r'$\log_{10} (\Omega_{\rm ref})$', r'$\log_{10} (f_{\mathrm{cut}})$',r'$\log_{10} (f_{\mathrm{scale}})$'] self.omegaf = self.truncated_powerlaw_fixedalpha_spectrum self.fancyname = "MW Foreground"+submodel_count if not injection: if 'alpha' not in self.fixedvals.keys(): print("Warning: No low-frequency slope (alpha) specified for MWspec spectral model. Defaulting to alpha=2/3.") self.fixedvals['alpha'] = 2/3 self.spectral_prior = self.mwspec_prior else: raise ValueError("mwspec is an inference-only spectral submodel. Use the truncatedpowerlaw submodel for injections.") elif self.spectral_model_name == 'mwspec3par': ## this is a more flexible spectral model tailored to analyses of the MW foreground # it is a 3-parameter truncated power law with astrophysically-motivated prior bounds # fscale parameter is fixed self.spectral_parameters = self.spectral_parameters + [r'$\alpha$', r'$\log_{10} (\Omega_{\rm ref})$', r'$\log_{10} (f_{\mathrm{cut}})$'] self.omegaf = self.truncated_powerlaw_3par_spectrum self.fancyname = "MW Foreground"+submodel_count if not injection: if 'log_fscale' not in self.fixedvals.keys(): print("Warning: 3-parameter MWspec (truncated power law + astrophysical priors) spectral model selected, but no scaling parameter (fscale) was provided to the fixedvals dict. Defaulting to astrophysically-motivated fscale=1.25 mHz.") self.fixedvals['log_fscale'] = -2.907 self.spectral_prior = self.mwspec3par_prior else: raise ValueError("mwspec is an inference-only spectral submodel. Use the truncatedpowerlaw submodel for injections.") elif self.spectral_model_name == 'mwspec4par': ## this is a more flexible spectral model tailored to analyses of the MW foreground # it is a 4-parameter truncated power law with astrophysically-motivated prior bounds self.spectral_parameters = self.spectral_parameters + [r'$\alpha$', r'$\log_{10} (\Omega_{\rm ref})$', r'$\log_{10} (f_{\mathrm{cut}})$',r'$\log_{10} (f_{\mathrm{scale}})$'] self.omegaf = self.truncated_powerlaw_4par_spectrum self.fancyname = "MW Foreground"+submodel_count if not injection: self.spectral_prior = self.mwspec4par_prior else: raise ValueError("mwspec is an inference-only spectral submodel. Use the truncatedpowerlaw submodel for injections.") elif self.spectral_model_name == 'bdspec': # A spectral model for either the MW bulge or the MW disk. self.spectral_parameters = self.spectral_parameters + [r'$\log_{10} (\Omega_{\rm ref})$', r'$\log_{10} (f_{\mathrm{cut}})$',r'$\log_{10} (f_{\mathrm{scale}})$'] self.omegaf = self.truncated_powerlaw_fixedalpha_spectrum self.fancyname = "MW component"+submodel_count if not injection: if 'alpha' not in self.fixedvals.keys(): print("Warning: No low-frequency slope (alpha) specified for bdspec spectral model. Defaulting to alpha=0.5.") self.fixedvals['alpha'] = 0.5 self.spectral_prior = self.bdspec_prior else: raise ValueError("bdspec is an inference-only spectral submodel. Use the truncatedpowerlaw submodel for injections.") elif self.spectral_model_name == 'robson19foreground': ## implementation of the Robson+19 analytic foreground model. ## this is a variation of the tanh-truncated foreground, but with ## additional, time-dependent shape parameters due to subtraction of resolved systems ## for the BLIP implementation, it has been recast into Omega_GW space self.spectral_parameters = self.spectral_parameters + [r'$\log_{10}A$'] self.omegaf = self.robson19_foreground_spectrum self.fancyname = "MW Foreground"+submodel_count if not injection: self.spectral_prior = self.robson19foreground_prior if 'T_obs' not in self.fixedvals.keys(): shape_fixedvals = get_robson19_shape_pars_from_tobs(self.params['dur']/3.154e7) self.fixedvals |= shape_fixedvals else: shape_fixedvals = get_robson19_shape_pars_from_tobs(self.fixedvals['T_obs']) self.fixedvals |= shape_fixedvals else: ## define truevals if 'T_obs' not in self.injvals.keys(): raise ValueError("When simulated data with the Robson+19 foreground spectral model, you must specify T_obs as a trueval.") else: shape_fixedvals = get_robson19_shape_pars_from_tobs(self.injvals['T_obs']) self.truevals |= shape_fixedvals self.truevals[r'$\log_{10}A$'] = jnp.log10(self.injvals['A']) self.fixedvals = self.truevals elif self.spectral_model_name == 'robson19foregroundvaried': ## implementation of the Robson+19 analytic foreground model. ## this is a variation of the tanh-truncated foreground, but with ## additional, time-dependent shape parameters due to subtraction of resolved systems ## for the BLIP implementation, it has been recast into Omega_GW space self.spectral_parameters = self.spectral_parameters + [r'$\alpha$',r'$\log_{10}A$',r'$\log_{10}f_{\rm knee}$'] self.omegaf = self.robson19_foreground_varied_spectrum self.fancyname = "MW Foreground"+submodel_count if not injection: self.spectral_prior = self.robson19foregroundvaried_prior if 'T_obs' not in self.fixedvals.keys(): shape_fixedvals = get_robson19_shape_pars_from_tobs(self.params['dur']/3.154e7) self.fixedvals |= shape_fixedvals else: shape_fixedvals = get_robson19_shape_pars_from_tobs(self.fixedvals['T_obs']) self.fixedvals |= shape_fixedvals else: ## define truevals if 'T_obs' not in self.truevals.keys(): raise ValueError("When simulated data with the Robson+19 foreground spectral model, you must specify T_obs as a trueval.") else: shape_fixedvals = get_robson19_shape_pars_from_tobs(self.truevals['T_obs']) self.truevals |= shape_fixedvals self.truevals[r'$\alpha$'] = self.injvals['alpha'] self.truevals[r'$A$'] = self.injvals['A'] if 'fknee' in self.injvals.keys(): self.truevals[r'$\log_{10}f_{\rm knee}$'] = jnp.log10(self.injvals['fknee']) elif 'log_fknee' in self.injvals.keys(): self.truevals[r'$\log_{10}f_{\rm knee}$'] = self.injvals['log_fknee'] self.truevals[r'$\log_{10}A$'] = jnp.log10(self.injvals['A']) self.fixedvals = self.truevals elif self.spectral_model_name == 'lmcspec': ## this is a spectral model tailored to analyses of the LMC SGWB # it is a broken power law with alpha_1 = 2/3 # and astrophysically-motivated prior bounds self.spectral_parameters = self.spectral_parameters + [r'$\log_{10} (\Omega_{\rm ref})$',r'$\alpha_2$',r'$\log_{10} (f_{\rm break})$']#,r'$\delta$'] self.omegaf = self.broken_powerlaw_fixed_a1delta_spectrum self.fancyname = "LMC Spectrum"+submodel_count if not injection: self.fixedvals['alpha_1'] = 2/3 self.spectral_prior = self.lmcspecbplad_prior else: raise ValueError("lmcspec is an inference-only spectral submodel. Use the truncatedpowerlaw submodel for injections.") elif self.spectral_model_name == 'lmcspecv2': ## this is a spectral model tailored to analyses of the LMC SGWB # it is a broken power law with both alphas free # and astrophysically-motivated prior bounds self.spectral_parameters = self.spectral_parameters + [r'$\alpha_1$',r'$\log_{10} (\Omega_{\rm ref})$',r'$\alpha_2$',r'$\log_{10} (f_{\rm break})$'] self.omegaf = self.broken_powerlaw_spectrum self.fancyname = "LMC Spectrum"+submodel_count if not injection: self.spectral_prior = self.lmcspecfbpl_prior else: raise ValueError("lmcspecv2 is an inference-only spectral submodel. Use the truncatedpowerlaw submodel for injections.") elif self.spectral_model_name == 'sobbhspec': ## spectral model tailored to analyses of the SOBBH ISGWB ## a fixed alpha=2/3 power law ## with astrophysical priors self.spectral_parameters = self.spectral_parameters + [r'$\log_{10} (\Omega_{\rm ref})$'] self.omegaf = self.twothirdspowerlaw_spectrum self.fancyname = "SOBBH Power law"+submodel_count if not injection: self.spectral_prior = self.sobbh_powerlaw_prior else: raise ValueError("sobbhspec is an inference-only spectral submodel. Use the powerlaw submodel for injections.") elif self.spectral_model_name == 'lowpowerlaw': ## spectral model to search for a low-amplitude power law ## this is the power law model with a constrained upper bound on its amplitude prior ## useful when performing spectral separation of an e.g., cosmological background ## from the higher-amplitude SOBBH background if not injection: if hasattr(self,"fixedvals") and 'alpha' in self.fixedvals: self.fancyname = r'$\alpha='+'{}$'.format(self.fixedvals['alpha'])+" Low-Amplitude Power Law"+submodel_count self.omegaf = self.fixedpowerlaw_spectrum self.spectral_parameters = self.spectral_parameters + [r'$\log_{10} (\Omega_{\rm ref})$'] self.spectral_prior = self.flatlowpowerlaw_prior else: self.fancyname = "Low-Amplitude Power Law"+submodel_count self.omegaf = self.powerlaw_spectrum self.spectral_parameters = self.spectral_parameters + [r'$\alpha$', r'$\log_{10} (\Omega_{\rm ref})$'] self.spectral_prior = self.lowpowerlaw_prior else: raise ValueError("lowpowerlaw is an inference-only spectral submodel. Use the powerlaw submodel for injections.") elif self.spectral_model_name == 'population': if not injection: raise ValueError("Populations are injection-only.") popdict = self.inj['popdict'][spec.raw_name] if popdict['name'] is not None: self.fancyname = popdict['name'] else: self.fancyname = "DWD Population"+submodel_count self.population = Population(self.params,self.inj,self.fs,popdict,seed=params['seed']) self.compute_Sgw = self.population.Sgw_wrapper self.omegaf = self.population.omegaf_wrapper self.ispop = True self.plot_kwargs |= {'ls':'-','lw':0.75,'alpha':0.6} else: ValueError("Unsupported spectrum type. Check your spelling or add a new spectrum model!") ## assignment of response and spatial methods response_kwargs = {} ## This is the isotropic spatial model, and has no additional parameters. if self.spatial_model_name == 'isgwb': ## plotting stuff self.fancyname = "Isotropic "+self.fancyname self.subscript = r"_{\mathrm{I}}" self.color='darkorange' self.has_map = False self.fullsky = True if not injection: ## prior transform self.prior = self.isotropic_prior self.cov = self.compute_cov_isgwb else: ## Tell the submodel how to handle the injection response matrix when it's computed later on self.convolve_inj_response_mat = self.wrapper_convolve_inj_response_mat ## This is the spherical harmonic spatial model. It is the workhorse of the spherical harmonic anisotropic analysis. ## It can also be used to perform arbitrary injections in the spherical harmonic basis via direct specification of the blms. elif self.spatial_model_name == 'sph': if injection: if self.inj['inj_basis'] == 'pixel': print("Warning: the injection basis has been specified as the pixel basis (inj_basis=pixel), but this is a spherical harmonic injection. \ Spherical harmonic injections must use the spherical harmonic basis (inj_basis=sph).\ Proceeding with the spherical harmonic basis for this component; other components will continue to use the pixel basis.") self.lmax = self.inj['inj_lmax'] else: self.lmax = self.params['lmax'] ## almax is twice the blmax self.almax = 2*self.lmax response_kwargs['set_almax'] = self.almax ## plotting stuff self.fancyname = "Anisotropic "+self.fancyname self.subscript = r"_{\mathrm{A}}" self.color = 'teal' self.has_map = True self.fullsky = True self.basis = 'sph' # add the blms blm_parameters = gen_blm_parameters(self.lmax) ## save the blm start index for the prior, then add the blms to the parameter list self.blm_start = len(self.spectral_parameters) self.spatial_parameters = self.spatial_parameters + blm_parameters if not injection: self.fixed_map = False self.parameterized_map = True ## set sph indices self.blm_m0_idx = [] self.blm_amp_idx = [] self.blm_phase_idx = [] cnt = 0 for lval in range(1, self.lmax + 1): for mval in range(lval + 1): if mval == 0: self.blm_m0_idx.append(cnt) cnt = cnt + 1 else: ## amplitude, phase self.blm_amp_idx.append(cnt) self.blm_phase_idx.append(cnt+1) cnt = cnt + 2 ## set prior, cov self.prior = self.sph_prior self.cov = self.compute_cov_asgwb else: ## get blm truevals val_list = self.blms_2_blm_params(self.injvals['blms']) for param, val in zip(blm_parameters,val_list): self.truevals[param] = val ## get alms self.alms_inj = np.array(self.compute_skymap_alms(val_list).tolist()) ## get sph basis skymap self.sph_skymap = hp.alm2map(self.alms_inj[0:hp.Alm.getsize(self.almax)],self.params['nside']) ## Tell the submodel how to handle the injection response matrix when it's computed later on self.convolve_inj_response_mat = self.sph_convolve_inj_response_mat ## Handle all the static (non-inferred) astrophysical spatial distributions together due to their similarities elif self.spatial_model_name in ['galaxy','mwdisk','mwbulge','dwarfgalaxy','lmc','pointsource','twopoints','pointsources','population','fixedgalaxy','fixedlmc','fixeddisk','fixedbulge','hotpixel','pixiso','popmap']: ## some of the astrophysical spatial models are injection-only. if self.spatial_model_name in ['galaxy','mwdisk','mwbulge','dwarfgalaxy','lmc','pointsource','twopoints','population'] and not injection: raise ValueError("This model is injection-only.") self.has_map = True if (injection and inj['inj_basis']=='pixel') or (not injection and params['model_basis']=='pixel'): basis = 'pixel' self.fullsky = False else: basis = 'sph' self.fullsky = True self.basis = basis ## set lmax for sph case & define responses if basis == 'sph': ## almax is twice the blmax self.lmax = self.inj['inj_lmax'] self.almax = 2*self.lmax response_kwargs['set_almax'] = self.almax ## model-specific quantities if self.spatial_model_name == 'galaxy': ## store the high-level MW truevals for the hierarchical analysis self.truevals[r'$r_{\mathrm{h}}$'] = self.injvals['rh'] self.truevals[r'$z_{\mathrm{h}}$'] = self.injvals['zh'] ## plotting stuff self.fancyname = "Galactic Foreground" self.subscript = r"_{\mathrm{G}}" self.color = 'mediumorchid' ## generate skymap self.skymap = astro.generate_galactic_foreground(self.injvals['rh'],self.injvals['zh'],self.params['nside']) ## mask to only the first four scale heights mask = self.skymap > (1/np.e**4)*np.max(self.skymap) self.skymap = self.skymap * mask elif self.spatial_model_name == 'mwdisk': ## store the high-level MW truevals for the hierarchical analysis self.truevals[r'$r_{\mathrm{h}}$'] = self.injvals['rh'] self.truevals[r'$z_{\mathrm{h}}$'] = self.injvals['zh'] ## plotting stuff self.fancyname = "Galactic Disk" self.subscript = r"_{\mathrm{MWD}}" self.color = 'mediumorchid' ## generate skymap self.skymap = astro.generate_galactic_disk(self.injvals['rh'],self.injvals['zh'],self.params['nside']) ## mask to only the first four scale heights mask = self.skymap > (1/np.e**4)*np.max(self.skymap) self.skymap = self.skymap * mask elif self.spatial_model_name == 'mwbulge': ## plotting stuff self.fancyname = "Galactic Bulge" self.subscript = r"_{\mathrm{MWB}}" self.color = 'teal' ## generate skymap self.skymap = astro.generate_galactic_bulge(self.params['nside']) ## mask to only the first four scale heights mask = self.skymap > (1/np.e**4)*np.max(self.skymap) self.skymap = self.skymap * mask elif self.spatial_model_name == 'lmc': ## plotting stuff self.fancyname = "LMC" self.subscript = r"_{\mathrm{LMC}}" self.color = 'darkmagenta' ## generate skymap self.skymap = astro.generate_sdg(self.params['nside']) ## sdg defaults are for the LMC elif self.spatial_model_name == 'dwarfgalaxy': ## plotting stuff self.fancyname = "Dwarf Galaxy"+submodel_count self.subscript = r"_{\mathrm{DG}}" self.color = 'maroon' ## generate skymap self.skymap = astro.generate_sdg(self.params['nside'],ra=self.injvals['sdg_RA'], dec=self.injvals['sdg_DEC'], D=self.injvals['sdg_dist'], r=self.injvals['sdg_rad'], N=self.injvals['sdg_N']) elif self.spatial_model_name == 'pointsource': ## plotting stuff self.fancyname = "Point Source"+submodel_count self.subscript = r"_{\mathrm{1P}}" self.color = 'forestgreen' ## generate skymap ## some flexibility, can be defined in either (RA,DEC) or (theta,phi) if 'ra' in self.injvals.keys() and 'dec' in self.injvals.keys(): coord1, coord2 = self.injvals['ra'], self.injvals['dec'] convention = 'radec' elif 'theta' in self.injvals.keys() and 'phi' in self.injvals.keys(): coord1, coord2 = self.injvals['theta'], self.injvals['phi'] convention = 'healpy' else: raise ValueError("Using pointsource spatial model but either no coordinates were provided to the truevals dict or invalid notation was used.") self.skymap = astro.generate_point_source(coord1,coord2,self.params['nside'],convention=convention) elif self.spatial_model_name == 'pointsources': ## plotting stuff self.fancyname = "Multiple Point Sources"+submodel_count self.subscript = r"_{\mathrm{NP}}" self.color = 'forestgreen' ## generate skymap ## some flexibility, can be defined in either (RA,DEC) or (theta,phi) if 'radec_list' in self.injvals.keys(): coord_list = self.injvals['radec_list'] convention = 'radec' elif 'thetaphi_list' in self.injvals.keys(): coord_list = self.injvals['thetaphi_list'] convention = 'healpy' else: raise ValueError("Using pointsources spatial model but either no coordinates were provided to the truevals dict or invalid notation was used.") self.skymap = astro.generate_point_sources(coord_list,self.params['nside'],convention=convention) elif self.spatial_model_name == 'twopoints': ## revisit this when I have duplicates sorted, maybe unnecessary (could just have 2x point source injection components) ## plotting stuff self.fancyname = "Two Point Sources"+submodel_count self.subscript = r"_{\mathrm{2P}}" self.color = 'gold' ## generate skymap self.skymap = astro.generate_two_point_source(self.injvals['theta_1'],self.injvals['phi_1'],self.injvals['theta_2'],self.injvals['phi_2'],self.params['nside']) elif self.spatial_model_name == 'population': ## flag the fact that we have a population skymap self.skypop = True ## plotting stuff self.subscript = r"_{\mathrm{P}}" self.color = 'midnightblue' if self.spectral_model_name != 'population': ## generate population if still needed popdict = self.inj['popdict'][spec.raw_name] if popdict['name'] is not None: self.fancyname = popdict['name'] else: self.fancyname = "DWD Population"+submodel_count self.population = Population(self.params,self.inj,self.fs,popdict,seed=params['seed']) self.skymap = self.population.skymap ## inference models elif self.spatial_model_name == 'fixedgalaxy': ## get the fixed values if 'rh' in self.fixedvals.keys(): rh = self.fixedvals['rh'] else: print("Warning: Using fixedgalaxy spatial model but no 'rh' fixed value was provided. Defaulting to Breivik+2020 thin disk galaxy (rh = 2.9 kpc.)") rh = 2.9 if 'zh' in self.fixedvals.keys(): zh = self.fixedvals['zh'] else: print("Warning: Using fixedgalaxy spatial model but no 'zh' fixed value was provided. Defaulting to Breivik+2020 thin disk galaxy (zh = 0.3 kpc).") zh = 0.3 ## plotting stuff self.fancyname = "Galactic Foreground" self.subscript = r"_{\mathrm{G}}" self.color = 'mediumorchid' ## generate skymap self.skymap = astro.generate_galactic_foreground(rh,zh,self.params['nside']) ## mask to only the first four scale heights mask = self.skymap > (1/np.e**4)*np.max(self.skymap) self.skymap = self.skymap * mask self.fixed_map = True elif self.spatial_model_name == 'fixeddisk': ## get the fixed values if 'rh' in self.fixedvals.keys(): rh = self.fixedvals['rh'] else: print("Warning: Using fixeddisk spatial model but no 'rh' fixed value was provided. Defaulting to Breivik+2020 thin disk galaxy (rh = 2.9 kpc.)") rh = 2.9 if 'zh' in self.fixedvals.keys(): zh = self.fixedvals['zh'] else: print("Warning: Using fixeddisk spatial model but no 'zh' fixed value was provided. Defaulting to Breivik+2020 thin disk galaxy (zh = 0.3 kpc).") zh = 0.3 ## plotting stuff self.fancyname = "Galactic Disk" self.subscript = r"_{\mathrm{MWD}}" self.color = 'mediumorchid' ## generate skymap self.skymap = astro.generate_galactic_disk(rh,zh,self.params['nside']) ## mask to only the first four scale heights mask = self.skymap > (1/np.e**4)*np.max(self.skymap) self.skymap = self.skymap * mask self.fixed_map = True elif self.spatial_model_name == 'fixedbulge': print("Using fixedbulge spatial model with the default bulge values given in Criswell+25a.") ## plotting stuff self.fancyname = "Galactic Bulge" self.subscript = r"_{\mathrm{MWB}}" self.color = 'teal' ## generate skymap self.skymap = astro.generate_galactic_bulge(self.params['nside']) ## mask to only the first four scale heights mask = self.skymap > (1/np.e**4)*np.max(self.skymap) self.skymap = self.skymap * mask self.fixed_map = True elif self.spatial_model_name == 'fixedlmc': ## plotting stuff self.fancyname = "LMC" self.subscript = r"_{\mathrm{LMC}}" self.color = 'darkmagenta' ## generate skymap self.skymap = astro.generate_sdg(self.params['nside']) ## sdg defaults are for the LMC ## mask to only the first four scale heights mask = self.skymap > (1/np.e**4)*np.max(self.skymap) self.skymap = self.skymap * mask self.fixed_map = True elif self.spatial_model_name == 'hotpixel': ## get the fixed values ## some flexibility, can be defined in either (RA,DEC) or (theta,phi) if 'ra' in self.fixedvals.keys() and 'dec' in self.fixedvals.keys(): coord1, coord2 = self.fixedvals['ra'], self.fixedvals['dec'] convention = 'radec' elif 'theta' in self.fixedvals.keys() and 'phi' in self.fixedvals.keys(): coord1, coord2 = self.fixedvals['theta'], self.fixedvals['phi'] convention = 'healpy' else: raise ValueError("Using hotpixel spatial model but either no coordinates were provided to the fixedvals dict or invalid notation was used.") ## plotting stuff self.fancyname = "Point Source" self.subscript = r"_{\mathrm{1P}}" self.color = 'forestgreen' self.skymap = astro.generate_point_source(coord1,coord2,self.params['nside'],convention=convention,pad=True) self.fixed_map = True elif self.spatial_model_name == 'pixiso': self.fancyname = "Pixel Isotropic" self.subscript = r"_{\mathrm{PI}}" self.color = 'forestgreen' self.skymap = np.ones(hp.nside2npix(self.params['nside'])) self.fixed_map = True elif self.spatial_model_name == 'popmap': self.fancyname = "Population Skymap" self.subscript = r"_{\mathrm{PM}}" self.color = 'mediumorchid' popkey = self.fixedvals['pop_id'] popdict = self.inj['popdict'][popkey] if popdict['name'] is not None: self.fancyname = popdict['name'] else: self.fancyname = "DWD Population"+submodel_count self.population = Population(self.params,self.inj,self.fs,popdict,map_only=True) self.skymap = self.population.skymap self.fixed_map = True else: raise ValueError("Astrophysical submodel type not found. Did you add a new model to the list at the top of this section?") ## set skymap if basis == 'pixel': response_kwargs['skymap_inj'] = self.skymap #/(np.sum(self.skymap)*hp.nside2pixarea(self.params['nside'])) ## process skymap, indicate how to compute the response functions later if not injection: if basis == 'sph': self.process_astro_skymap_model(self.skymap) self.prior = self.fixedsky_prior self.cov = self.compute_cov_fixed_asgwb elif basis=='pixel': self.prior = self.fixedsky_prior self.cov = self.compute_cov_fixed_asgwb else: raise TypeError("Basis was not defined, or was incorrectly defined.") else: if basis == 'sph': self.process_astro_skymap_injection(self.skymap) ## Tell the submodel how to handle the injection response matrix when it's computed later on self.convolve_inj_response_mat = self.sph_convolve_inj_response_mat elif basis == 'pixel': ## Tell the submodel how to handle the injection response matrix when it's computed later on self.convolve_inj_response_mat = self.wrapper_convolve_inj_response_mat else: raise TypeError("Basis was not defined, or was incorrectly defined.") ## Parameterized astrophysical spatial distributions. ## Distinct from the fixedsky/injection-only models as we need spatial inference infrastructure ## pixel-basis only elif self.spatial_model_name in ['1parametermw','2parametermw']: ## enforce pixel basis if params["model_basis"] != "pixel": raise ValueError("Parameterized astrophysical spatial submodels are only supported in the pixel basis. (You have set basis={}.)".format(params["model_basis"])) self.basis = "pixel" ## calculate pixel area self.dOmega = hp.pixelfunc.nside2pixarea(self.params['nside']) ## set starting index for spatial model parameters self.spatial_start = len(self.spectral_parameters) ## we won't convolve this response function with anything ahead of time, so set the wrapper self.convolve_inj_response_mat = self.wrapper_convolve_inj_response_mat ## 2-parameter Milky Way model if self.spatial_model_name == '1parametermw': ## model to infer the Milky Way spatial distribution, using a simplified 1-parameter model of the Galaxy ## only infers the vertical scale height z_h ## plotting stuff self.fancyname = "1-Parameter Milky Way" self.subscript = r"_{\mathrm{G}}" self.color = 'mediumorchid' self.has_map = True self.fixed_map = False self.parameterized_map = True ## Initialize the galaxy grid self.galaxy = astro.Galaxy_Model(self.params['nside'],grid_res=0.1, gal_rad=14,gal_height=6,max_rh=3.5,max_zh=1.5,fix_rh=self.fixedvals['rh']) self.max_sky_extent = self.galaxy.max_skymap ## Set the parameterized spatial model function self.compute_skymap = self.galaxy.mw_mapmaker_1par ## mask maps to maximum allowed spatial extent self.mask = self.galaxy.max_skymap > (1/np.e**4)*np.max(self.galaxy.max_skymap) self.masked_skymap = self.galaxy.max_skymap * self.mask self.mask_idx = np.flatnonzero(self.mask) ## ensure normalization self.masked_skymap = self.masked_skymap/(np.sum(self.masked_skymap)*self.dOmega) ## alias as needed for response function calculations self.skymap = self.masked_skymap ## set response kwargs response_kwargs['masked_skymap'] = self.masked_skymap self.spatial_parameters = [r'$z_{\mathrm{h}}$'] self.prior = self.mw1parameter_prior self.cov = self.compute_cov_parameterized_asgwb elif self.spatial_model_name == '2parametermw': ## model to infer the Milky Way spatial distribution, using a basic 2-parameter model of the Galaxy ## plotting stuff self.fancyname = "2-Parameter Milky Way" self.subscript = r"_{\mathrm{G}}" self.color = 'mediumorchid' self.has_map = True self.fixed_map = False self.parameterized_map = True ## Initialize the galaxy grid self.galaxy = astro.Galaxy_Model(self.params['nside'],gal_rad=14,gal_height=6,max_rh=3.5,max_zh=1.5) self.max_sky_extent = self.galaxy.max_skymap ## Set the parameterized spatial model function self.compute_skymap = self.galaxy.mw_mapmaker_2par ## mask maps to maximum allowed spatial extent self.mask = self.galaxy.max_skymap > (1/np.e**4)*np.max(self.galaxy.max_skymap) self.masked_skymap = self.galaxy.max_skymap * self.mask self.mask_idx = np.flatnonzero(self.mask) ## ensure normalization self.masked_skymap = self.masked_skymap/(np.sum(self.masked_skymap)*self.dOmega) ## alias as needed for response function calculations self.skymap = self.masked_skymap ## set response kwargs response_kwargs['masked_skymap'] = self.masked_skymap self.spatial_parameters = [r'$r_{\mathrm{h}}$',r'$z_{\mathrm{h}}$'] self.prior = self.mw2parameter_prior self.cov = self.compute_cov_parameterized_asgwb else: raise ValueError("Parameterized astrophysical spatial submodel type not found. Did you add a new model to the list at the top of this section?") else: raise ValueError("Invalid specification of spatial model name ('{}').".format(self.spatial_model_name)) ############################# ## FOR TESTING ## ############################# # np.save(self.params['out_dir']+'/response_'+spec.raw_name+'.npy',self.response_mat) # print("Saving response array to "+self.params['out_dir']+'/response_'+spec.raw_name+'.npy') ## store final parameter list and count self.parameters = self.parameters + self.spectral_parameters + self.spatial_parameters if not injection: self.Npar = len(self.parameters) ## store response kwargs for use elsewhere as needed self.response_kwargs = response_kwargs ## add suffix to parameter names and trueval keys, if desired ## (we need this in the multi-model or duplicate model case) if suffix != '': if injection: updated_truevals = {suffix+' '+parameter:self.truevals[parameter] for parameter in self.parameters} self.truevals = updated_truevals updated_spectral_parameters = [suffix+' '+parameter for parameter in self.spectral_parameters] updated_spatial_parameters = [suffix+' '+parameter for parameter in self.spatial_parameters] updated_parameters = updated_spectral_parameters+updated_spatial_parameters if len(updated_parameters) != len(self.parameters): raise ValueError("If you've added a new variety of parameters above, you'll need to update this bit of code too!") self.spectral_parameters = updated_spectral_parameters self.spatial_parameters = updated_spatial_parameters self.parameters = updated_parameters return ############################# ## Spectral Functions ## #############################
[docs] def powerlaw_spectrum(self,fs,alpha,log_omega0): ''' Function to calculate a simple power law spectrum. Arguments ----------- fs (array of floats) : frequencies at which to evaluate the spectrum alpha (float) : slope of the power law log_omega0 (float) : power law amplitude in units of log dimensionless GW energy density at f_ref Returns ----------- spectrum (array of floats) : the resulting power law spectrum ''' return 10**(log_omega0)*(fs/self.params['fref'])**alpha
[docs] def twothirdspowerlaw_spectrum(self,fs,log_omega0): ''' Function to calculate a simple power law spectrum, fixed to the alpha=2/3 prediction for the stellar origin binary background. Arguments ----------- fs (array of floats) : frequencies at which to evaluate the spectrum log_omega0 (float) : power law amplitude in units of log dimensionless GW energy density at f_ref Returns ----------- spectrum (array of floats) : the resulting power law spectrum ''' return 10**(log_omega0)*(fs/self.params['fref'])**(2/3)
[docs] def fixedpowerlaw_spectrum(self,fs,log_omega0): ''' Function to calculate a simple power law spectrum, fixed to the alpha=2/3 prediction for the stellar origin binary background. Arguments ----------- fs (array of floats) : frequencies at which to evaluate the spectrum log_omega0 (float) : power law amplitude in units of log dimensionless GW energy density at f_ref Returns ----------- spectrum (array of floats) : the resulting power law spectrum ''' return 10**(log_omega0)*(fs/self.params['fref'])**(self.fixedvals['alpha'])
[docs] def broken_powerlaw_spectrum(self,fs,alpha_1,log_omega0,alpha_2,log_fbreak): ''' Function to calculate a broken power law spectrum. Arguments ----------- fs (array of floats) : frequencies at which to evaluate the spectrum alpha_1 (float) : slope of the first power law log_omega0 (float) : power law amplitude of the first power law in units of log dimensionless GW energy density at f_ref alpha_2 (float) : slope of the second power law log_fbreak (float) : log of the break frequency ("knee") in Hz Returns ----------- spectrum (array of floats) : the resulting broken power law spectrum ''' delta = 0.1 fbreak = 10**log_fbreak norm = (fbreak/self.params['fref'])**alpha_1 ## this normalizes the broken powerlaw such that its first leg matches the equivalent standard power law return norm * (10**log_omega0)*(fs/fbreak)**(alpha_1) * (1+(fs/fbreak)**(1/delta))**((alpha_1-alpha_2)*delta)
[docs] def broken_powerlaw_fixed_a1_spectrum(self,fs,log_omega0,alpha_2,log_fbreak,delta): ''' Function to calculate a broken power law spectrum, with a fixed low-frequency slope and variable turnover scale. Arguments ----------- fs (array of floats) : frequencies at which to evaluate the spectrum log_omega0 (float) : power law amplitude of the first power law in units of log dimensionless GW energy density at f_ref alpha_2 (float) : slope of the second power law log_fbreak (float) : log of the break frequency ("knee") in Hz Returns ----------- spectrum (array of floats) : the resulting broken power law spectrum ''' fbreak = 10**log_fbreak norm = (fbreak/self.params['fref'])**self.fixedvals['alpha_1'] ## this normalizes the broken powerlaw such that its first leg matches the equivalent standard power law return norm * (10**log_omega0)*(fs/fbreak)**(self.fixedvals['alpha_1']) * ((1+(fs/fbreak)**(1/delta)))**((self.fixedvals['alpha_1']-alpha_2)*delta)
[docs] def broken_powerlaw_fixed_a1delta_spectrum(self,fs,log_omega0,alpha_2,log_fbreak): ''' Function to calculate a broken power law spectrum, with a fixed low-frequency slope and turnover scale. Arguments ----------- fs (array of floats) : frequencies at which to evaluate the spectrum log_omega0 (float) : power law amplitude of the first power law in units of log dimensionless GW energy density at f_ref alpha_2 (float) : slope of the second power law log_fbreak (float) : log of the break frequency ("knee") in Hz Returns ----------- spectrum (array of floats) : the resulting broken power law spectrum ''' delta = 0.1 fbreak = 10**log_fbreak norm = (fbreak/self.params['fref'])**self.fixedvals['alpha_1'] ## this normalizes the broken powerlaw such that its first leg matches the equivalent standard power law return norm * (10**log_omega0)*(fs/fbreak)**(self.fixedvals['alpha_1']) * ((1+(fs/fbreak)**(1/delta)))**((self.fixedvals['alpha_1']-alpha_2)*delta)
[docs] def truncated_powerlaw_4par_spectrum(self,fs,alpha,log_omega0,log_fcut,log_fscale): ''' Function to calculate a tanh-truncated power law spectrum. Arguments ----------- fs (array of floats) : frequencies at which to evaluate the spectrum alpha (float) : slope of the power law log_omega0 (float) : power law amplitude of the power law in units of log dimensionless GW energy density at f_ref (if left un-truncated) log_fcut (float) : log of the cut frequency ("knee") in Hz log_fscale : log of the cutoff scale factor in Hz Returns ----------- spectrum (array of floats) : the resulting truncated power law spectrum ''' fcut = 10**log_fcut fscale = 10**log_fscale return 0.5 * (10**log_omega0)*(fs/self.params['fref'])**(alpha) * (1+jnp.tanh((fcut-fs)/fscale))
[docs] def truncated_powerlaw_3par_spectrum(self,fs,alpha,log_omega0,log_fcut): ''' Function to calculate a tanh-truncated power law spectrum with a set truncation scale. Arguments ----------- fs (array of floats) : frequencies at which to evaluate the spectrum alpha (float) : slope of the power law log_omega0 (float) : power law amplitude of the power law in units of log dimensionless GW energy density at f_ref (if left un-truncated) log_fcut (float) : log of the cut frequency ("knee") in Hz Returns ----------- spectrum (array of floats) : the resulting truncated power law spectrum ''' fcut = 10**log_fcut fscale = 10**self.fixedvals['log_fscale'] return 0.5 * (10**log_omega0)*(fs/self.params['fref'])**(alpha) * (1+jnp.tanh((fcut-fs)/fscale))
[docs] def truncated_powerlaw_fixedalpha_spectrum(self,fs,log_omega0,log_fcut,log_fscale): ''' Function to calculate a tanh-truncated power law spectrum with a set low-f slope. Arguments ----------- fs (array of floats) : frequencies at which to evaluate the spectrum log_omega0 (float) : power law amplitude of the power law in units of log dimensionless GW energy density at f_ref (if left un-truncated) log_fcut (float) : log of the cut frequency ("knee") in Hz log_fscale : log of the cutoff scale factor in Hz Returns ----------- spectrum (array of floats) : the resulting truncated power law spectrum ''' fcut = 10**log_fcut fscale = 10**log_fscale return 0.5 * (10**log_omega0)*(fs/self.params['fref'])**(self.fixedvals['alpha']) * (1+jnp.tanh((fcut-fs)/fscale))
[docs] def truncated_powerlaw_2par_spectrum(self,fs,log_omega0,log_fcut): ''' Function to calculate a tanh-truncated power law spectrum with a set truncation scale and low-f slope. Arguments ----------- fs (array of floats) : frequencies at which to evaluate the spectrum log_omega0 (float) : power law amplitude of the power law in units of log dimensionless GW energy density at f_ref (if left un-truncated) log_fcut (float) : log of the cut frequency ("knee") in Hz Returns ----------- spectrum (array of floats) : the resulting truncated power law spectrum ''' fcut = 10**log_fcut fscale = 10**self.fixedvals['log_fscale'] return 0.5 * (10**log_omega0)*(fs/self.params['fref'])**(self.fixedvals['alpha']) * (1+jnp.tanh((fcut-fs)/fscale))
[docs] def robson19_foreground_spectrum(self,fs,logA): ''' Function to calculate an analytical spectrum for the Galactic foreground of the form given in Robson et al. (2019) (arXiv:1803.01944) NOTE: this is given in terms of PSD amplitude A, as opposed to the usual units used in BLIP (dimensionless GW energy density) Arguments ----------- fs (array of floats) : frequencies at which to evaluate the spectrum A (float) : power law amplitude of the power law in units of **PSD** at f_ref (if left un-truncated) Returns ----------- spectrum (array of floats) : the resulting analytical foreground spectrum ''' alpha_shape = self.fixedvals[r'$\alpha_{\rm shape}$'] beta_shape = self.fixedvals[r'$\beta$'] kappa = self.fixedvals[r'$\kappa$'] gamma = self.fixedvals[r'$\gamma$'] log_fknee = jnp.log10(self.fixedvals[r'$f_{\rm knee}$']) Sgw = 10**logA * (fs/self.params['fref'])**(-7/3) * jnp.exp(-fs**alpha_shape + beta_shape*fs*jnp.sin(kappa*fs)) * (1 + jnp.tanh(gamma*(10**log_fknee - fs))) ## defined in terms of Sgw, so need to convert to be in terms of Omegaf return self.compute_Omega0_from_Sgw(fs,Sgw)
[docs] def robson19_foreground_varied_spectrum(self,fs,alpha,logA,log_fknee): ''' Function to calculate an analytical spectrum for the Galactic foreground of the form given in Robson et al. (2019) (arXiv:1803.01944) This version also varies the slope alpha and break ("knee") frequency f_knee. NOTE: this is given in terms of PSD amplitude A, as opposed to the usual units used in BLIP (dimensionless GW energy density) Arguments ----------- fs (array of floats) : frequencies at which to evaluate the spectrum A (float) : power law amplitude of the power law in units of **PSD** at f_ref (if left un-truncated) alpha (float) : power law slope above the truncation log_fknee (float) : Log10 of the break ("knee") frequency. Returns ----------- spectrum (array of floats) : the resulting analytical foreground spectrum ''' alpha_shape = self.fixedvals[r'$\alpha_{\rm shape}$'] beta_shape = self.fixedvals[r'$\beta$'] kappa = self.fixedvals[r'$\kappa$'] gamma = self.fixedvals[r'$\gamma$'] Sgw = 10**logA * (fs/self.params['fref'])**(alpha) * jnp.exp(-fs**alpha_shape + beta_shape*fs*jnp.sin(kappa*fs)) * (1 + jnp.tanh(gamma*(10**log_fknee - fs))) ## defined in terms of Sgw, so need to convert to be in terms of Omegaf return self.compute_Omega0_from_Sgw(fs,Sgw)
[docs] def fixed_truncated_powerlaw_spectrum(self,fs): ''' Function to calculate a tanh-truncated power law spectrum with all parameters fixed. Arguments ----------- fs (array of floats) : frequencies at which to evaluate the spectrum Returns ----------- spectrum (array of floats) : the resulting truncated power law spectrum ''' fcut = 10**self.fixedvals['log_fcut'] fscale = 10**self.fixedvals['log_fscale'] return 0.5 * (10**self.fixedvals['log_omega0'])*(fs/self.params['fref'])**(self.fixedvals['alpha']) * (1+jnp.tanh((fcut-fs)/fscale))
[docs] def compute_Sgw(self,fs,omegaf_args): # pylint: disable=method-hidden ''' Wrapper function to generically calculate the associated stochastic gravitational wave PSD (S_gw) for a spectral model given in terms of the dimensionless GW energy density Omega(f) Arguments ----------- fs (array of floats) : frequencies at which to evaluate the spectrum omegaf_args (list) : list of arguments for the relevant Omega(f) function Returns ----------- Sgw (array of floats) : the resulting GW PSD ''' H0 = 2.2*10**(-18) ## Hubble constant, = 67.88 km/s/Mpc Omegaf = self.omegaf(fs,*omegaf_args) Sgw = Omegaf*(3/(4*fs**3))*(H0/jnp.pi)**2 return Sgw
[docs] def compute_Omega0_from_Sgw(self,fs,Sgw): ''' Wrapper function to generically calculate the associated stochastic gravitational wave dimensionless GW energy density Omega(f) for a spectral model given in terms of the PSD (S_gw) Arguments ----------- fs (array of floats) : frequencies at which to evaluate the spectrum sgw_args (list) : list of arguments for the relevant Omega(f) function Returns ----------- Sgw (array of floats) : the resulting GW PSD ''' H0 = 2.2*10**(-18) ## Hubble constant, = 67.88 km/s/Mpc Omegaf = Sgw*((3/(4*fs**3))*(H0/jnp.pi)**2)**-1 return Omegaf
############################# ## Priors ## #############################
[docs] def isotropic_prior(self,theta): ''' Isotropic prior transform. Just serves as a wrapper for the spectral prior, as no additional foofaraw is necessary. Arguments ----------- theta : float A list or numpy array containing samples from a unit cube. Returns --------- theta : float theta with each element rescaled for the spectral parameters. ''' return self.spectral_prior(theta)
[docs] def fixedsky_prior(self,theta): ''' Fixed sky prior transform. Just serves as a wrapper for the spectral prior, as no additional foofaraw is necessary. Arguments ----------- theta : float A list or numpy array containing samples from a unit cube. Returns --------- theta : float theta with each element rescaled for the spectral parameters. ''' return self.spectral_prior(theta)
[docs] def sph_prior(self,theta): ''' Spherical harmonic anisotropic prior transform. Combines a generic spectral prior function with the spherical harmonic priors for the desired lmax. Arguments ----------- theta : float A list or numpy array containing samples from a unit cube. Returns --------- theta : float theta with each element rescaled for both the spectral and spatial parameters. ''' ## spectral prior takes everything up to spectral_theta = self.spectral_prior(theta[:self.blm_start]) # Calculate lmax from the size of theta blm arrays. The shape is # given by size = (lmax + 1)**2 - 1. The '-1' is because b00 is # an independent parameter # lmax = jnp.sqrt( len(theta[self.blm_start:]) + 1 ) - 1 # lmax = self.lmax # ## theta indices for m == 0 blms # blm_m0_idx = self.blm_m0_idx + self.blm_start # ## theta indices for m != 0 *amplitude* parameters # blm_amp_idx = self.blm_amp_idx + self.blm_start # ## theta indices for m != 0 *phase* parameters # blm_phase_idx = self.blm_phase_idx + self.blm_start sph_base = jnp.zeros(len(theta[self.blm_start:])) # sph_theta = [0 for ii in theta[self.blm_start:]] for ii in self.blm_m0_idx: sph_base = sph_base.at[ii].set(6*theta[ii+self.blm_start] - 3) for ii in self.blm_amp_idx: sph_base = sph_base.at[ii].set(3*theta[ii+self.blm_start]) for ii in self.blm_phase_idx: # sph_base = sph_base.at[ii].set(2*jnp.pi*theta[ii+self.blm_start] - jnp.pi) sph_base = sph_base.at[ii].set(jnp.remainder(2*jnp.pi*theta[ii+self.blm_start],2*jnp.pi) - jnp.pi) sph_theta = [draw for draw in sph_base] ## removing the lmax safety check to be compatible with JAX/jit. # if lmax.is_integer(): # lmax = int(lmax) # else: # raise ValueError('Illegitimate theta size passed to the spherical harmonic prior') # lmax = int(lmax) # The rest of the priors define the blm parameter space # sph_theta = [] # # ## counter for the rest of theta # cnt = self.blm_start # # for lval in range(1, lmax + 1): # for mval in range(lval + 1): # # if mval == 0: # sph_theta.append(6*theta[cnt] - 3) # cnt = cnt + 1 # else: # ## prior on amplitude, phase # sph_theta.append(3* theta[cnt]) # sph_theta.append(2*jnp.pi*theta[cnt+1] - jnp.pi) # cnt = cnt + 2 return spectral_theta+sph_theta
[docs] def mw1parameter_prior(self,theta): ''' Hierarchical anisotropic prior transform. Combines a generic spectral prior function with the hierarchical astrophysical prior. Arguments ----------- theta : float A list or numpy array containing samples from a unit cube. Returns --------- theta : float theta with each element rescaled for both the spectral and spatial parameters. ''' spectral_theta = self.spectral_prior(theta[:self.spatial_start]) zh = 1.45*theta[self.spatial_start] + 0.05 mw_theta = [zh] return spectral_theta+mw_theta
[docs] def mw2parameter_prior(self,theta): ''' Hierarchical anisotropic prior transform. Combines a generic spectral prior function with the hierarchical astrophysical prior. Arguments ----------- theta : float A list or numpy array containing samples from a unit cube. Returns --------- theta : float theta with each element rescaled for both the spectral and spatial parameters. ''' spectral_theta = self.spectral_prior(theta[:self.spatial_start]) rh = 2*theta[self.spatial_start] + 2 zh = 1.45*theta[self.spatial_start+1] + 0.05 mw_theta = [rh,zh] return spectral_theta+mw_theta
[docs] def instr_noise_prior(self,theta): ''' Prior function for only instrumental noise Parameters ----------- theta : float A list or numpy array containing samples from a unit cube. Returns --------- theta : float theta with each element rescaled. The elements are interpreted as alpha, omega_ref, Np and Na ''' # Unpack: Theta is defined in the unit cube log_Np, log_Na = theta # Transform to actual priors log_Np = -5*log_Np - 39 log_Na = -5*log_Na - 46 return [log_Np, log_Na]
[docs] def powerlaw_prior(self,theta): ''' Prior function for an isotropic stochastic backgound analysis. Parameters ----------- theta : float A list or numpy array containing samples from a unit cube. Returns --------- theta : float theta with each element rescaled. The elements are interpreted as alpha and log(Omega0) ''' # Unpack: Theta is defined in the unit cube # Transform to actual priors alpha = 10*theta[0] - 5 log_omega0 = -30*theta[1] + 9 return [alpha, log_omega0]
[docs] def fixedpowerlaw_prior(self,theta): ''' Prior function for a power law with fixed slope. Parameters ----------- theta : float A list or numpy array containing samples from a unit cube. Returns --------- theta : float theta with each element rescaled. The elements are interpreted as alpha and log(Omega0) ''' # Unpack: Theta is defined in the unit cube # Transform to actual priors log_omega0 = -30*theta[0] + 9 return [log_omega0]
[docs] def sobbh_powerlaw_prior(self,theta): ''' Prior function for a power law with fixed slope, with astrophysical prior bounds tailored to the expected SOBBH ISGWB amplitude (see, e.g., Babak+2023) Parameters ----------- theta : float A list or numpy array containing samples from a unit cube. Returns --------- theta : float theta with each element rescaled. The elements are interpreted as alpha and log(Omega0) ''' # Unpack: Theta is defined in the unit cube # Transform to actual priors log_omega0 = -1.5*theta[0] - 11 return [log_omega0]
[docs] def lowpowerlaw_prior(self,theta): ''' Prior function for an isotropic stochastic backgound analysis. Imposes an upper constraint on the prior to aid in spectral separation. Parameters ----------- theta : float A list or numpy array containing samples from a unit cube. Returns --------- theta : float theta with each element rescaled. The elements are interpreted as alpha and log(Omega0) ''' # Unpack: Theta is defined in the unit cube # Transform to actual priors alpha = 6*theta[0] - 3 log_omega0 = -11*theta[1] - 12.6 return [alpha, log_omega0]
[docs] def flatlowpowerlaw_prior(self,theta): ''' Prior function for an isotropic stochastic backgound analysis. Imposes an upper constraint on the prior to aid in spectral separation. Parameters ----------- theta : float A list or numpy array containing samples from a unit cube. Returns --------- theta : float theta with each element rescaled. The elements are interpreted as alpha and log(Omega0) ''' # Unpack: Theta is defined in the unit cube # Transform to actual priors log_omega0 = -11*theta[0] - 12.6 return [log_omega0]
[docs] def broken_powerlaw_prior(self,theta): ''' Prior function for a stochastic signal search with a broken power law spectral model. Parameters ----------- theta : float A list or numpy array containing samples from a unit cube. Returns --------- theta : float theta with each element rescaled. The elements are interpreted as alpha_1, log(Omega_ref), alpha_2, and log(f_break). ''' # Unpack: Theta is defined in the unit cube # Transform to actual priors alpha_1 = 10*theta[0] - 4 log_omega0 = -22*theta[1] alpha_2 = 40*theta[2] log_fbreak = -2*theta[3] - 2 return [alpha_1, log_omega0, alpha_2, log_fbreak]
[docs] def broken_powerlaw_fixed_a1_prior(self,theta): ''' Prior function for a stochastic signal search with a 4-parameter broken power law spectral model. Fixed low-frequency slope. Parameters ----------- theta : float A list or numpy array containing samples from a unit cube. Returns --------- theta : float theta with each element rescaled. The elements are interpreted as alpha, log(Omega_ref), log(f_cut), log(f_scale) ''' # Unpack: Theta is defined in the unit cube # Transform to actual priors log_omega0 = -22*theta[0] alpha_2 = 4*theta[1] + self.fixedvals['alpha_1'] ## must be greater than alpha_1 log_fbreak = -2*theta[2] - 2 delta = 0.99*theta[3] + 0.01 return [log_omega0, alpha_2, log_fbreak, delta]
[docs] def broken_powerlaw_fixed_a1delta_prior(self,theta): ''' Prior function for a stochastic signal search with a 4-parameter broken power law spectral model. Fixed low-frequency slope. Parameters ----------- theta : float A list or numpy array containing samples from a unit cube. Returns --------- theta : float theta with each element rescaled. The elements are interpreted as alpha, log(Omega_ref), log(f_cut), log(f_scale) ''' # Unpack: Theta is defined in the unit cube # Transform to actual priors log_omega0 = -22*theta[0] alpha_2 = 4*theta[1] #+ self.fixedvals['alpha_1'] ## must be greater than alpha_1 log_fbreak = -2*theta[2] - 2 return [log_omega0, alpha_2, log_fbreak]
[docs] def truncated_powerlaw_4par_prior(self,theta): ''' Prior function for a stochastic signal search with a 4-parameter truncated power law spectral model. Parameters ----------- theta : float A list or numpy array containing samples from a unit cube. Returns --------- theta : float theta with each element rescaled. The elements are interpreted as alpha, log(Omega_ref), log(f_cut), and log(f_scale) ''' # Unpack: Theta is defined in the unit cube # Transform to actual priors alpha = 10*theta[0] - 5 log_omega0 = -22*theta[1] + 5 log_fcut = -2*theta[2] - 2 log_fscale = -2*theta[3] - 2 return [alpha, log_omega0, log_fcut, log_fscale]
[docs] def truncated_powerlaw_3par_prior(self,theta): ''' Prior function for a stochastic signal search with a 3-parameter truncated power law spectral model. Parameters ----------- theta : float A list or numpy array containing samples from a unit cube. Returns --------- theta : float theta with each element rescaled. The elements are interpreted as alpha, log(Omega_ref), and log(f_cut) ''' # Unpack: Theta is defined in the unit cube # Transform to actual priors alpha = 10*theta[0] - 5 log_omega0 = -22*theta[1] + 5 log_fcut = -2*theta[2] - 2 return [alpha, log_omega0, log_fcut]
[docs] def truncated_powerlaw_2par_prior(self,theta): ''' Prior function for a stochastic signal search with a 2-parameter truncated power law spectral model. Parameters ----------- theta : float A list or numpy array containing samples from a unit cube. Returns --------- theta : float theta with each element rescaled. The elements are interpreted as log(Omega_ref) and log(f_cut) ''' # Unpack: Theta is defined in the unit cube # Transform to actual priors log_omega0 = -22*theta[0] + 5 log_fcut = -2*theta[1] - 2 return [log_omega0, log_fcut]
[docs] def mwspec_prior(self,theta): ''' Prior function for a stochastic signal search with a 2-parameter truncated power law spectral model. Bounds are astrophysically-motivated and tailored to expectations of the MW foreground. Parameters ----------- theta : float A list or numpy array containing samples from a unit cube. Returns --------- theta : float theta with each element rescaled. The elements are interpreted as alpha, log(Omega_ref), and log(f_cut) ''' # Unpack: Theta is defined in the unit cube # Transform to actual priors log_omega0 = -3*theta[0] - 7 log_fcut = -0.7*theta[1] - 2.4 log_fscale = -2*theta[2] - 2 return [log_omega0, log_fcut, log_fscale]
[docs] def mwspec3par_prior(self,theta): ''' Prior function for a stochastic signal search with a 3-parameter truncated power law spectral model. Bounds are astrophysically-motivated and tailored to expectations of the MW foreground. Parameters ----------- theta : float A list or numpy array containing samples from a unit cube. Returns --------- theta : float theta with each element rescaled. The elements are interpreted as alpha, log(Omega_ref), log(f_cut), and log(f_scale) ''' # Unpack: Theta is defined in the unit cube # Transform to actual priors alpha = 2*theta[0] log_omega0 = -3*theta[1] - 7 log_fcut = -0.7*theta[2] - 2.4 return [alpha, log_omega0, log_fcut]
[docs] def mwspec4par_prior(self,theta): ''' Prior function for a stochastic signal search with a 4-parameter truncated power law spectral model. Bounds are astrophysically-motivated and tailored to expectations of the MW foreground. Parameters ----------- theta : float A list or numpy array containing samples from a unit cube. Returns --------- theta : float theta with each element rescaled. The elements are interpreted as alpha, log(Omega_ref), log(f_cut), and log(f_scale) ''' # Unpack: Theta is defined in the unit cube # Transform to actual priors alpha = 2*theta[0] log_omega0 = -3*theta[1] - 7 log_fcut = -0.7*theta[2] - 2.4 log_fscale = -2*theta[3] - 2 return [alpha, log_omega0, log_fcut, log_fscale]
[docs] def bdspec_prior(self,theta): """Prior for spectrum of bulge or disk of MW (bdspec). Parameters ---------- theta : array (3,) samples from unit cube Returns ------- theta rescaled samples in order: log(Omega_ref), log(f_cut), and log(f_scale) """ assert len(theta) == 3 log_omega0 = -3*theta[0] - 8 log_fcut = -0.7*theta[1] - 2.4 log_fscale = -2*theta[2] - 2 return [log_omega0, log_fcut, log_fscale]
[docs] def robson19foreground_prior(self,theta): logA = -20*theta[0] - 30 return [logA]
[docs] def robson19foregroundvaried_prior(self,theta): alpha = 3*theta[0] - 3 logA = -20*theta[1] - 30 log_fknee = -0.5*theta[2] - 2.75 return [alpha,logA,log_fknee]
[docs] def lmcspec_prior(self,theta): ''' Prior function for a stochastic signal search with a 3-parameter truncated power law spectral model. Parameters ----------- theta : float A list or numpy array containing samples from a unit cube. Returns --------- theta : float theta with each element rescaled. The elements are interpreted as alpha, log(Omega_ref), log(f_cut), log(f_scale) ''' # Unpack: Theta is defined in the unit cube # Transform to actual priors log_omega0 = -3*theta[0] - 9 log_fcut = -1*theta[1] - 2 log_fscale = -1*theta[2] - 3 return [log_omega0, log_fcut, log_fscale]
[docs] def lmcspecbpl_prior(self,theta): ''' Prior function for a stochastic signal search with a 4-parameter broken power law spectral model. Tailored for the LMC DWD SGWB spectrum. Parameters ----------- theta : float A list or numpy array containing samples from a unit cube. Returns --------- theta : float theta with each element rescaled. The elements are interpreted as alpha, log(Omega_ref), log(f_cut), log(f_scale) ''' # Unpack: Theta is defined in the unit cube # Transform to actual priors log_omega0 = -3*theta[0] - 9 alpha_2 = 4*theta[1] #+ self.fixedvals['alpha_1'] ## must be greater than alpha_1 log_fbreak = -1*theta[2] - 2 delta = 0.99*theta[3] + 0.01 return [log_omega0, alpha_2, log_fbreak, delta]
[docs] def lmcspecbplad_prior(self,theta): ''' Prior function for a stochastic signal search with a 3-parameter broken power law spectral model. Tailored for the LMC DWD SGWB spectrum. Parameters ----------- theta : float A list or numpy array containing samples from a unit cube. Returns --------- theta : float theta with each element rescaled. The elements are interpreted as alpha, log(Omega_ref), log(f_cut), log(f_scale) ''' # Unpack: Theta is defined in the unit cube # Transform to actual priors log_omega0 = -3*theta[0] - 9 alpha_2 = 4*theta[1] #+ self.fixedvals['alpha_1'] ## must be greater than alpha_1 log_fbreak = -1*theta[2] - 2 return [log_omega0, alpha_2, log_fbreak]
[docs] def lmcspecfbpl_prior(self,theta): ''' Prior function for a stochastic signal search with a 4-parameter broken power law spectral model. Tailored for the LMC DWD SGWB spectrum. In contrast to lmcspecbpl, this variant fixes the smoothing parameter delta and allows alpha_1 to vary. Parameters ----------- theta : float A list or numpy array containing samples from a unit cube. Returns --------- theta : float theta with each element rescaled. The elements are interpreted as alpha, log(Omega_ref), log(f_cut), log(f_scale) ''' # Unpack: Theta is defined in the unit cube # Transform to actual priors alpha_1 = 2*theta[0] log_omega0 = -3*theta[1] - 9 alpha_2 = 4*theta[2] log_fbreak = -1*theta[2] - 2 return [alpha_1,log_omega0,alpha_2,log_fbreak]
[docs] def fixed_model_wrapper_prior(self,theta): ''' Wrapper function to allow fixed "models" to function within Model.prior Parameters ----------- theta : float Shoudl always be None or []. Returns --------- theta : float empty list ''' return []
############################# ## Covariance Calculations ## #############################
[docs] def compute_cov_noise(self,theta): ''' Computes the noise covariance for a given draw of log_Np, log_Na Arguments ---------- theta (float) : A list or numpy array containing samples from a unit cube. Returns ---------- cov_noise (array) : The corresponding 3 x 3 x frequency x time covariance matrix for the detector noise submodel. ''' ## unpack priors log_Np, log_Na = theta Np, Na = 10**(log_Np), 10**(log_Na) ## Modelled Noise PSD cov_noise = self.instr_noise_spectrum(self.fs,self.f0, Np, Na) ## repeat C_Noise to have the same time-dimension as everything else cov_noise = jnp.repeat(cov_noise[:, :, :, jnp.newaxis], self.time_dim, axis=3) return cov_noise
[docs] def compute_cov_isgwb(self,theta): ''' Computes the covariance matrix contribution from a generic isotropic stochastic GW signal. Arguments ---------- theta (float) : A list or numpy array containing samples from a unit cube. Returns ---------- cov_sgwb (array) : The corresponding 3 x 3 x frequency x time covariance matrix for an isotropic SGWB submodel. ''' ## Signal PSD Sgw = self.compute_Sgw(self.fs,theta) ## The noise spectrum of the GW signal. Written down here as a full ## covariance matrix axross all the channels. cov_sgwb = Sgw[None, None, :, None]*self.response_mat return cov_sgwb
[docs] def compute_cov_asgwb(self,theta): ''' Computes the covariance matrix contribution from a generic anisotropic stochastic GW signal. Arguments ---------- theta (float) : A list or numpy array containing samples from a unit cube. Returns ---------- cov_sgwb (array) : The corresponding 3 x 3 x frequency x time covariance matrix for an anisotropic SGWB submodel. ''' ## Signal PSD Sgw = self.compute_Sgw(self.fs,theta[:self.blm_start]) ## get skymap and integrate over alms summ_response_mat = self.compute_summed_response(self.compute_skymap_alms(theta[self.blm_start:])) ## The noise spectrum of the GW signal. Written down here as a full ## covariance matrix axross all the channels. cov_sgwb = Sgw[None, None, :, None]*summ_response_mat return cov_sgwb
[docs] def compute_cov_fixed_asgwb(self,theta): ''' Computes the covariance matrix contribution from an anisotropic stochastic GW signal with a known (assumed) sky distribution. Arguments ---------- theta (float) : A list or numpy array containing samples from a unit cube. Returns ---------- cov_sgwb (array) : The corresponding 3 x 3 x frequency x time covariance matrix for an anisotropic SGWB submodel. ''' ## Signal PSD Sgw = self.compute_Sgw(self.fs,theta) ## The noise spectrum of the GW signal. Written down here as a full ## covariance matrix axross all the channels. ## the response has been preconvolved with the assumed sky distribution cov_sgwb = Sgw[None, None, :, None]*self.response_mat return cov_sgwb
[docs] def compute_cov_parameterized_asgwb(self,theta): ''' Computes the covariance matrix contribution from a explicitly parameterized (i.e. not a generic spherical harmonic model), pixel-basis anisotropic stochastic GW signal. Arguments ---------- theta (float) : A list or numpy array containing samples from a unit cube. Returns ---------- cov_sgwb (array) : The corresponding 3 x 3 x frequency x time covariance matrix for an anisotropic SGWB submodel. ''' ## Signal PSD Sgw = self.compute_Sgw(self.fs,theta[:self.spatial_start]) ## get skymap and integrate over alms summ_response_mat = self.compute_summed_pixel_response(self.mask_and_norm_pixel_skymap(self.compute_skymap(*theta[self.spatial_start:]))) ## The noise spectrum of the GW signal. Written down here as a full ## covariance matrix axross all the channels. cov_sgwb = Sgw[None, None, :, None]*summ_response_mat return cov_sgwb
[docs] def compute_cov_fixedspec_parameterized_asgwb(self,theta): ''' Computes the covariance matrix contribution from a explicitly parameterized (i.e. not a generic spherical harmonic model), pixel-basis anisotropic stochastic GW signal. Assumes a fixed spectral model. Only compatible with fixedspec spectral models. Arguments ---------- theta (float) : A list or numpy array containing samples from a unit cube. Returns ---------- cov_sgwb (array) : The corresponding 3 x 3 x frequency x time covariance matrix for an anisotropic SGWB submodel. ''' ## Signal PSD Sgw = self.fixed_Sgw ## get skymap and integrate over alms summ_response_mat = self.compute_summed_pixel_response(self.mask_and_norm_pixel_skymap(self.compute_skymap(*theta[self.spatial_start:]))) ## The noise spectrum of the GW signal. Written down here as a full ## covariance matrix axross all the channels. cov_sgwb = Sgw[None, None, :, None]*summ_response_mat return cov_sgwb
[docs] def compute_cov_fixed(self,dummy_theta): ''' Wrapper to allow for "models" that are fixed a priori. Arguments ---------- dummy_theta (NoneType) : Should always be None; meant to allow for wrapper to integrate with Model.Likelihood Returns ---------- cov_fixed (array) : The precomputed 3 x 3 x frequency x time covariance matrix for the fixed model. ''' return self.cov_fixed
########################################## ## Skymap and Response Calculations ## ##########################################
[docs] def wrapper_convolve_inj_response_mat(self,fdata_flag=False): ''' A wrapper function for the ISGWB and pixel basis cases, the skymaps are convolved implicitly when calculating the response. Arguments ----------- fdata_flag (bool) : Whether to compute the convolution for injection frequencies (False, default) or data frequencies (True). Returns ----------- (none) ''' # create a wrapper b/c isotropic and anisotropic injection responses are handled differently w.r.t. skymap convolution if not fdata_flag: self.inj_response_mat = self.response_mat self.summ_response_mat = self.response_mat else: self.fdata_response_mat = self.unconvolved_fdata_response_mat return
[docs] def sph_convolve_inj_response_mat(self,fdata_flag=False): ''' Function to convolve the sph response matrix with an injected spherical harmonic skymap. Arguments ----------- fdata_flag (bool) : Whether to compute the convolution for injection frequencies (False, default) or data frequencies (True). Returns ----------- (none) ''' if not fdata_flag: ## get response integrated over the Ylms self.summ_response_mat = self.compute_summed_response(self.alms_inj) ## create a wrapper b/c isotropic and anisotropic injection responses are different self.inj_response_mat = self.summ_response_mat else: self.fdata_response_mat = jnp.einsum('ijklm,m', self.unconvolved_fdata_response_mat, self.alms_inj) return
[docs] def compute_skymap_alms(self,blm_params): ''' Function to compute the anisotropic skymap a_lms from the blm parameters. Arguments ---------- blm_params (array of complex floats) : the blm parameters Returns ---------- alm_vals (array of complex floats) : the corresponding alms ''' ## Spatial distribution blm_vals = self.blm_params_2_blms(blm_params) alm_vals = self.blm_2_alm(blm_vals) ## normalize and return return alm_vals/(alm_vals[0] * jnp.sqrt(4*jnp.pi))
[docs] def compute_summed_response(self,alms): ''' Function to compute the integrated, skymap-convolved anisotropic response Arguments ---------- alms (array of complex floats) : the spherical harmonic alms Returns ---------- summ_response_mat (array) : the sky/alm-integrated response (3 x 3 x frequency x time) ''' return jnp.einsum('ijklm,m', self.response_mat, alms)
[docs] def compute_summed_pixel_response(self,pixelmap): ''' Function to compute the integrated, skymap-convolved anisotropic response for an arbitrary skymap in the pixel basis. Arguments ---------- pixelmap (healpy array) : the pixel-basis skymap Returns ---------- summ_response_mat (array) : the sky-integrated response (3 x 3 x frequency x time) ''' ## sacrifice einsum efficiency for memory usage here ## due to the extreme memory requirement of the pixel-basis unconvolved anisotropic responses convolved_response = jnp.zeros(self.response_mat.shape[:-1]) for ii in range(len(pixelmap)): convolved_response = convolved_response + self.response_mat[:,:,:,:,ii]*pixelmap[ii] return self.dOmega*convolved_response
# return (self.dOmega)*jnp.einsum('ijklm,m', self.response_mat, pixelmap)
[docs] def process_astro_skymap_injection(self,skymap): ''' Function that takes in an astrophysical pixel skymap and: - calculates all associated sph quantities - computes corresponding blm parameter truevals - convolves with response Arguments ----------- skymap (healpy array) : pixel-basis astrophysical skymap ''' ## transform to blms self.astro_blms = astro.skymap_pix2sph(skymap,self.lmax) ## get corresponding truevals inj_blms = self.blms_2_blm_params(self.astro_blms) blm_parameters = gen_blm_parameters(self.lmax) for param, val in zip(blm_parameters,inj_blms): self.truevals[param] = val self.alms_inj = np.array(self.blm_2_alm(self.astro_blms)) self.alms_inj = self.alms_inj/(self.alms_inj[0] * np.sqrt(4*np.pi)) self.sph_skymap = hp.alm2map(self.alms_inj[0:hp.Alm.getsize(self.almax)],self.params['nside']) ## get response integrated over the Ylms # self.summ_response_mat = self.compute_summed_response(self.alms_inj) # ## create a wrapper b/c isotropic and anisotropic injection responses are different # self.inj_response_mat = self.summ_response_mat return
[docs] def process_astro_skymap_model(self,skymap): ''' Function that takes in an astrophysical pixel skymap and: - calculates all associated sph quantities - convolves with response - sets sample-time response to be the map-convolved This is intended for use with models that assume a fixed spatial distribution (e.g., fixedgalaxy, hotpixel). Arguments ----------- skymap (healpy array) : pixel-basis astrophysical skymap ''' ## transform to blms self.astro_blms = astro.skymap_pix2sph(skymap,self.lmax) ## and then to alms self.astro_alms = np.array(self.blm_2_alm(self.astro_blms)) self.astro_alms = self.astro_alms/(self.astro_alms[0] * np.sqrt(4*np.pi)) self.sph_skymap = hp.alm2map(self.astro_alms[0:hp.Alm.getsize(self.almax)],self.params['nside']) ## get response integrated over the Ylms self.summ_response_mat = self.compute_summed_response(self.astro_alms) ## backup the unconvolved response matrix and set the default response to the skymap-convolved one self.unconvolved_response_mat = self.response_mat self.response_mat = self.summ_response_mat return
[docs] def mask_and_norm_pixel_skymap(self,skymap): ''' Function that takes in a modeled astrophysical skymap and: - masks to the pixels where we have computed the response - normalizes the masked map such that the sky integral = 1 Arguments ----------- skymap (healpy array) : pixel-basis astrophysical skymap ''' masked_map = skymap[self.mask_idx] return masked_map/(np.sum(masked_map)*self.dOmega)
# def process_astro_skymap_pixel_model(self,skymap): # ''' # # Function that takes in an astrophysical pixel skymap and: # - convolves with response # - sets sample-time response to be the map-convolved # # This is intended for use with models that assume a fixed spatial distribution (e.g., fixedgalaxy, hotpixel). # # Arguments # ----------- # skymap (healpy array) : pixel-basis astrophysical skymap # # ''' # ## transform to blms ## self.astro_blms = astro.skymap_pix2sph(skymap,self.lmax) # ## and then to alms ## self.astro_alms = np.array(self.blm_2_alm(self.astro_blms)) ## self.astro_alms = self.astro_alms/(self.astro_alms[0] * np.sqrt(4*np.pi)) ## self.sph_skymap = hp.alm2map(self.astro_alms[0:hp.Alm.getsize(self.almax)],self.params['nside']) # ## get response integrated over the Ylms # self.summ_response_mat = self.compute_summed_pixel_response(skymap) # ## backup the unconvolved response matrix and set the default response to the skymap-convolved one # self.unconvolved_response_mat = self.response_mat # self.response_mat = self.summ_response_mat # # return
[docs] def recompute_response(self,f0=None,tsegmid=None): ''' Function to recompute the LISA response matrices if needed. When we save the Injection object, we delete the LISA response of each injection, as to do otherwise takes up egregious amounts of disk space. This allows us to recompute them identically as desired. Arguments ------------- f0 (array) : LISA-characteristic-frequency-scaled frequency array at which to compute the response (f0=fs/(2*fstar)) tsegmid (array) : array of time segment midpoints at which to compute the response Returns -------------- response_mat (array) : The associated response for this submodel. ''' ## allow for respecification of frequency/time grid, but avoid needless computation of extant response matrices fsame = True tsame = True if f0 is not None: if f0.shape != self.f0.shape: fsame = False elif not np.all(f0==self.f0): fsame = False else: f0 = self.f0 if tsegmid is not None: if tsegmid.shape != self.tsegmid.shape: tsame = False elif not np.all(tsegmid==self.tsegmid): tsame = False else: tsegmid = self.tsegmid tf_same = tsame and fsame ## if we're using the same frequencies and times, first check to see if there's already a response connected to the submodel: if tf_same and hasattr(self,'response_mat'): print("Attempted to recompute response matrix, but there is already an attached response matrix at these times and frequencies. Returning the original...") return self.response_mat else: return self.response(f0,tsegmid,**self.response_kwargs)
[docs] def gen_blm_parameters(blmax): ''' Function to make the blm parameter name strings for all blms of a given lmax, in the correct order. Arguments ----------- blmax (int) : lmax for the blms Returns ----------- blm_parameters (list of str) : Ordered list of blm parameter name strings ''' blm_parameters = [] for lval in range(1, blmax + 1): for mval in range(lval + 1): if mval == 0: blm_parameters.append(r'$b_{' + str(lval) + str(mval) + '}$' ) else: blm_parameters.append(r'$|b_{' + str(lval) + str(mval) + '}|$' ) blm_parameters.append(r'$\phi_{' + str(lval) + str(mval) + '}$' ) return blm_parameters