import os
from dataclasses import dataclass
from enum import Enum
import configparser
import numpy as np
import pickle
from blip.src.makeLISAdata import (
get_simulation_tf_grid,
get_data_tf_grid,
get_simulation_length,
get_data_length,
)
from blip.src.submodel import SubmodelKind, SubmodelSpec
from blip.src.utils import catch_duplicates
[docs]
@dataclass
class Option:
"Configuration option."
name: str
desc: str
default: str | None = None
required: bool = False
# The configuration file is an .ini file with three sections: `params`, `run_params` and `inj`.
# Here is what they look like.
SECTION_PARAMS = [
Option("fmin", desc="Minimum frequency.", required=True),
Option("fmax", desc="Maximum frequency.", required=True),
Option(
"duration",
desc="Duration in seconds, should be compatiable with fmin.",
required=True,
),
Option(
"seglen",
desc="""
Segment length for the analysis STFT fft in seconds, should be compatiable with fmin.
Note by SB: Looks like this needs to be at least a factor of 10 larger
than 1/fmin to give consistant recoveries.""",
default="1e5",
),
Option(
"tsplice",
desc="""
Splice length for the simulation in seconds, should be compatiable with fmin.
""",
default="2.5e4",
),
Option(
"fs",
desc="""
Sample rate in Hz, should be compatible with fmax.
Note by SB: Looks like this needs to be atleast a factor of 4
higher than fmax (as opposed to the usual factor of 2) to give
consistent recoveries.""",
default="0.25",
),
Option("Shfile", desc="[TODO -- check depreciation]", default="LISA_2017_PSD_M.npy"),
Option("load_data", desc="Whether to load data from `datafile`.", default="false"),
Option(
"datatype",
desc="Doppler or strain data. MLDC data is doppler, autogenerated data is strain. Can be doppler or strain.",
default="strain",
),
Option("mldc", desc="FIXME: doesn't do anything!", default="0"),
Option(
"datafile",
desc="""
Input data file. Only used if `load_data` is true.
The data format is specified by `datafileformat`.""",
default="None",
),
Option(
"datafileformat",
desc="Format for external input data. Can be 'mldc' or 'ldc'.",
default="ldc",
),
Option(
"datadomain",
desc="How to interpret LDC data. Can be 'time' or 'freq'.",
default="time",
),
Option(
"ldc_dataset",
desc="HDF5 path to array with TDI series in LDC input.",
default="obs/tdi",
),
Option("fref", desc="TODO", default="0.001"),
Option(
"model",
desc="""
Recovery Model description
write out as spectralmodel1_spatialmodel1+spectralmodel2_spatialmodel2+...
exception, noise is just 'noise' with no spatial component
handle duplicates as (e.g.) noise+powerlaw_isgwb-1+powerlaw_isgwb-2
spectral models available: noise, powerlaw, brokenpowerlaw, truncatedpowerlaw
spatial models available: isgwb, sph, fixedgalaxy, hotpixel""",
required=True,
),
Option(
"fixedvals",
desc="""
Recovery model fixed values
fixedvals allows you to fix the value of specific parameters in the recovery model
These parameters will be set to the value provided and not sampled over
Currently only implemented for the fixedgalaxy and hotpixel spatial models""",
default="None",
),
Option(
"alias",
desc="""
Model/Injection aliasing
optionally specify which models should be associated with which injections
any models/injections left unspecified will be matched by name as default
and if no match is found will not be treated as associated (i.e., truevals won't be provided)
notation is alias={'model_name':'injection_name'}""",
default="{}",
),
Option(
"tdi_lev", desc="Level of the tdi. Can be michelson, xyz or aet.", default="xyz"
),
Option(
"lisa_config",
desc="LISA configuration. Can be stationary or orbiting.",
default="orbiting",
),
Option(
"nside", desc="Healpix nside (skymap resolution).", required=True
), # why is this always required? It's always used and the choice matters for memory and accuracy
Option("model_basis",
desc="""
Skymap representation. Can be 'pixel' (Criswell+25 pixelated skymaps)
or 'sph' (Banagiri+21 spherical harmonics).
""",
default="pixel"),
Option(
"tstart",
desc="Start time in seconds. Determines starting orbital position.",
default="0.0",
),
Option("lmax", desc="spherical harmonic lmax for the b_lms (a_lmax/2)"),
Option("faster_geometry", desc="Enable new response module", default="true"),
]
SECTION_INJ = [
Option("doInj", desc="Whether to generate data (injections).", required=True),
Option(
"loadInj",
desc="""Whether to load a previously-generated injection from `injdir`.""",
default="false",
),
Option(
"inj_only",
desc="If true, BLIP will perform only the injection and data generation process, then exit.",
default="false",
),
Option("injdir", desc="If loadInj, path to injection directory."),
Option(
"injection",
desc="""
Injection model description
Same syntax as for the Recovery Model
Additional option: "population" generates spectral/spatial distributions from a DWD population""",
),
Option("inj_basis", desc="""
Skymap representation for the injection. Can be 'pixel' (Criswell+25 pixelated skymaps)
or 'sph' (Banagiri+21 spherical harmonics).
""",
default="pixel"),
Option(
"truevals",
desc="""
Dictionary of injection 'ground truth'. If doing an sph injection, blms should be given as
complex quantities in a comma seperated list in healpix order
b00 can only be 1. b_{l, -m} = (-1)**m b_lm
blms = b00, b10, b11 for blmax = 1, alamx = 2
blms = b00, b10, b20, b11, b21, b22 for blmax = 2, almax = 4
blms = b00, b10, b20, b30, b11, b21, b31, b22, b32, b33 for blmax = 3, almax = 6
e.g. 'blms':[1.0, 0.75, 0.5, 0.7j, 0.7-0.3j, 1.1j]""",
),
Option(
"parallel_inj",
desc="If true, BLIP will multithread data simulation on a component-by-component basis.",
default="0",
),
Option(
"inj_nthread",
desc="""
How many processes to use for injection parallelization.
No benefit from N_threads > N_components. Defaults to N_components.""",
),
Option("response_nthread", desc="[TODO -- update or depreciate]", default="1"),
Option(
"inj_lmax",
desc="Injection blmax. If not specified, will match to the analysis lmax.",
),
Option(
"population_params",
desc="""
Population injection
Population Params Dict
Dictionary with the settings for each population.
Top-level keys should correspond to inj (+eventually, model)""",
default="None",
),
]
SECTION_RUN_PARAMS = [
Option(
"sampler",
desc="""Sampler to use. Supported: dynesty, numpyro.""",
required=True,
),
Option("out_dir", desc="Output directory.", required=True),
Option(
"doPreProc",
desc="""
If true, force generation of fake data again rather than
using precalculated data in the data_spectrum file""",
default="0",
),
Option("input_spectrum", desc="Must be .npz file", default="data_spectrum.npz"),
Option(
"projection",
desc="Projection used for making skymaps. Can be 'E' (ecliptic) or 'G' (galactic).",
default="E",
),
Option(
"FixSeed",
desc="Whether to seed the sampler with the `seed` parameter.",
default="false",
),
Option(
"seed",
desc="Seed for the sampler. Only used if `FixSeed` is true. Must be a 32-bit integer.",
),
Option(
"Nthreads",
desc="""
For multiprocessing, number of threads
Warning: most often, multithreading the dynesty sampler does not
result in significant gains as its proposal process must be serial.""",
default="1",
),
Option(
"N_GPU",
desc="""
GPUs (numpyro only)
Number of available GPU devices
If one GPU is available, Nthreads > 1 will *vectorize* chains on a single GPU.
If more than one GPU is available, BLIP will parallelize across GPUs, setting Nthreads=N_GPU""",
default="0",
),
Option(
"colormap", desc="matplotlib colormap used for the skymaps.", default="magma"
),
Option("nlive", desc="Number of live points for dynesty or emcee.", default="800"),
Option(
"sample_method",
desc="""
Sampling method (dynesty only)
see https://dynesty.readthedocs.io/en/latest/quickstart.html?highlight=method#sampling-options
rslice recommended for MW foreground analyses""",
default="rwalk",
),
Option(
"Nburn",
desc="Length of burn-in phase. Affects numpyro.",
default="1000",
),
Option(
"Nsamples", desc="Number of samples. Affects emcee and numpyro.", default="1000"
),
Option(
"show_progress",
desc="If false, disable numpyro progress bar. Disabling improves performance, especially when multithreading.",
default="1",
),
Option(
"checkpoint",
desc="""
Checkpointing and restarting (dynesty/numpyro only).
If `checkpoint` is true, the sampler will save its current state every so often.
For dynesty this is every `checkpoint_interval` seconds.
For numpyro the behavior changes depending on `checkpoint_at` and `checkpoint_interval`.
To resume an interrupted checkpointed run, run blip on the copy of the params file in the
output directory with 'resume' as an additional argument:
run_blip [path/to/out_dir/params_file] resume""",
default="false",
),
Option(
"checkpoint_at",
desc="""
When to checkpoint (numpyro only).
Can be 'end' (only saves sampler state at the very end of the run),
'warmup' (saves after warmup phase and at end), or
'interval' (saves after warmup, at end, and after every checkpoint_interval
number of samples; need to also specify checkpoint_interval).
Generally not worth checkpointing while sampling for large models/datasets,
as the recompliation and GPU off/onloading time will exceed the sampling time.""",
default="end",
),
Option(
"checkpoint_interval",
desc="Checkpointing interval in seconds (dynesty) or samples (numpyro).",
),
Option(
"additional_samples",
desc="""
With (checkpointed) numpyro runs, you can get additional samples after the run
has concluded by setting additional_samples in the run directory's param file and doing:
run_blip [paramsfile] resume""",
default="0",
)
]
# A few options have fallback values that are set dynamically depending on
# the values of other options; these don't have defaults.
DEFAULTS_SECTION_PARAMS = {
opt.name: opt.default for opt in SECTION_PARAMS if opt.default
}
DEFAULTS_SECTION_INJ = {opt.name: opt.default for opt in SECTION_INJ if opt.default}
DEFAULTS_SECTION_RUN_PARAMS = {
opt.name: opt.default for opt in SECTION_RUN_PARAMS if opt.default
}
REQUIRED_SECTION_PARAMS = [opt.name for opt in SECTION_PARAMS if opt.required]
REQUIRED_SECTION_INJ = [opt.name for opt in SECTION_INJ if opt.required]
REQUIRED_SECTION_RUN_PARAMS = [opt.name for opt in SECTION_RUN_PARAMS if opt.required]
ALL_LOWER_PARAMS = [opt.name.lower() for opt in SECTION_PARAMS]
ALL_LOWER_INJ = [opt.name.lower() for opt in SECTION_INJ]
ALL_LOWER_RUN_PARAMS = [opt.name.lower() for opt in SECTION_RUN_PARAMS]
[docs]
def parse_config(paramsfile: str, resume: bool):
"""Parse a configuration file `paramsfile`.
Args:
`paramsfile`: the config filename.
`resume`: a command-line flag to run_blip. It affects whether the
configuration file is valid because it only makes sense to
resume checkpointed runs if they have fixed seeds.
Returns:
a tuple of dictionaries `(params, inj, misc)` containing a
*valid* configuration where all parameters have been converted from
`str` to the appropriate types.
For invalid input configurations: raises `TypeError`, `ValueError`, `KeyError`,
`AssertionError`, or `configparse.NoOptionError`, `FileNotFoundError`.
"""
if not os.path.isfile(paramsfile):
raise FileNotFoundError(paramsfile)
# These are the returned dictionaries, containing a *valid* configuration
# with options that already have the right types.
params = {} # basically combines `params` and `run_params` sections
inj = {} # basically receives options from the `inj` section
misc = {} # receives a few run parameters and the generated seed
# Preserve original paramsfile after parsing, for the sake of having a copy.
with open(paramsfile) as f:
misc["paramsfile_name"] = os.path.basename(paramsfile)
misc["paramsfile_text"] = f.read()
config = configparser.ConfigParser()
config.read_dict(
{
"params": DEFAULTS_SECTION_PARAMS,
"inj": DEFAULTS_SECTION_INJ,
"run_params": DEFAULTS_SECTION_RUN_PARAMS,
}
)
config.read(paramsfile)
config_params = config["params"]
config_inj = config["inj"]
config_run_params = config["run_params"]
# is everything here?
req_params_missing = [
optname for optname in REQUIRED_SECTION_PARAMS if (optname not in config_params)
]
req_inj_missing = [
optname for optname in REQUIRED_SECTION_INJ if (optname not in config_inj)
]
req_run_params_missing = [
optname
for optname in REQUIRED_SECTION_RUN_PARAMS
if (optname not in config_run_params)
]
if req_params_missing or req_inj_missing or req_run_params_missing:
raise TypeError(
f"""
Some required configuration options are missing:
In section params: {req_params_missing}
In section inj: {req_inj_missing}
In section run_params: {req_run_params_missing}
"""
)
# do we recognize all the options in the input?
unknown_params = [
optname for optname in config_params if (optname not in ALL_LOWER_PARAMS)
]
unknown_inj = [optname for optname in config_inj if (optname not in ALL_LOWER_INJ)]
unknown_run_params = [
optname
for optname in config_run_params
if (optname not in ALL_LOWER_RUN_PARAMS)
]
if unknown_params or unknown_inj or unknown_run_params:
raise TypeError(
f"""
Unrecognized options:
In section params: {unknown_params}
In section inj: {unknown_inj}
In section run_params: {unknown_run_params}
"""
)
# Params Dict
params["fmin"] = float(config_params["fmin"])
params["fmax"] = float(config_params["fmax"])
params["dur"] = float(config_params["duration"])
params["seglen"] = float(config_params["seglen"])
assert (
params["seglen"] >= 10 / params["fmin"]
), "Segment length incompatible with fmin"
params["tsplice"] = float(config_params["tsplice"])
assert (
params["tsplice"] > 1 / params["fmin"]
), "Splice length incompatible with fmin"
params["fs"] = float(config_params["fs"])
assert params["fs"] >= 4 * params["fmax"], "Sampling rate fs incompatible with fmax"
params["Shfile"] = config_params["Shfile"]
params["load_data"] = config_params.getboolean("load_data")
params["datatype"] = config_params["datatype"]
params["datafile"] = config_params["datafile"]
if params["load_data"]:
if not os.path.isfile(params["datafile"]):
raise FileNotFoundError(params["datafile"])
params["datafileformat"] = config_params["datafileformat"]
assert params["datafileformat"] in ["mldc", "ldc"], "Invalid input file format."
params["datadomain"] = config_params["datadomain"]
assert params["datadomain"] in ["time", "freq"], "Invalid input data domain."
params["ldc_dataset"] = config_params["ldc_dataset"]
## default fref is 1mHz
## until we update the prior structure, using other reference frequencies may lead to unintended behavior in the prior bounds
## in the new prior structure, the default astrophysical prior bounds should scale with fref
params["fref"] = float(config_params["fref"])
if params["fref"] != 0.001:
print(
"Warning: reference frequency set to a value other than 1 mHz (fref={} Hz).".format(
params["fref"]
)
)
print(
"This may lead to unexpected prior behavior, as the prior bounds are based on a reference frequency of 1 mHz!"
)
params["faster_geometry"] = config_params.getboolean("faster_geometry")
# params["model"] will be a list[SubmodelAnalysisSpec] and depends on the injections
# because of aliasing. We postpone that parsing until the injections have been parsed.
params["model_raw"] = str(config_params["model"])
## precompute time-frequency alignment
params["nsegs"], params["Nperseg"] = get_data_tf_grid(
params["dur"], params["seglen"], params["fs"]
)
## make sure alignment is possible for `tser2fser()`
# FIXME BLIP should just trim the data for alignment
Ndata_from_duration = int((params["dur"]) * params["fs"])
Ndata_from_alignment = get_data_length(
params["dur"], params["seglen"], params["fs"]
)
assert (
Ndata_from_duration % params["Nperseg"] == 0
), f"""Data duration misaligned with segment length
{Ndata_from_duration = }, {Ndata_from_alignment = }
duration = {params['dur']}, nsegs = {params['nsegs']}
seglen = {params['seglen']}, Nperseg = {params['Nperseg']}
{Ndata_from_duration % params['Nperseg'] = }"""
## get the model fixed values, passed as a dict
fixedvals = eval(str(config_params["fixedvals"]))
# TODO check that the fixedvals make sense
params["fixedvals"] = {}
if fixedvals is not None:
## enforce that the keys of the fixedvals dict correspond to the desired models
## at this point we will only make sure that all the keys are in the model string
submodel_names = params["model_raw"].split("+")
if not np.all([key in submodel_names for key in fixedvals.keys()]):
raise ValueError(
"Fixedvals dictionary has an invalid key. Fixedvals must be provided as a nested dictionary with top-level keys corresponding to the models specified in the 'model' parameter."
)
## build the fixedvals dict
## some quantities we want to use as log values, so convert them
log_list = ["Np", "Na", "omega0", "fbreak", "fcut", "fscale"]
for submodel_name in fixedvals.keys():
params["fixedvals"][submodel_name] = {}
for name in fixedvals[submodel_name].keys():
if name in log_list:
new_name = "log_" + name
params["fixedvals"][submodel_name][new_name] = np.log10(
fixedvals[submodel_name][name]
)
else:
params["fixedvals"][submodel_name][name] = fixedvals[submodel_name][
name
]
params["alias"] = eval(config_params["alias"])
if not isinstance(params["alias"], dict):
raise ValueError("alias must be dict")
for k, v in params["alias"].items():
if not isinstance(k, str):
raise ValueError("alias keys must be strings")
if not isinstance(v, str):
raise ValueError("alias values must be strings")
params["tdi_lev"] = config_params["tdi_lev"]
params["lisa_config"] = config_params["lisa_config"]
if params["faster_geometry"] and params["lisa_config"] != "orbiting":
raise ValueError("faster_geometry requires lisa_config = 'orbiting'")
params["nside"] = int(config_params["nside"])
params["model_basis"] = config_params["model_basis"]
params["tstart"] = float(config_params["tstart"])
## precompute injection TF grid params
(
params["tsplice"],
params["nsplice"],
params["tsegmid"],
params["Npersplice"],
) = get_simulation_tf_grid(
params["dur"], params["tstart"], params["fs"], params["tsplice"]
)
## see if we need to initialize the spherical harmonic subroutines
sph_check = [
sublist.split("-")[0].split("_")[-1]
for sublist in params["model_raw"].split("+")
]
# Injection Dict
inj["doInj"] = config_inj.getboolean("doInj")
inj["loadInj"] = config_inj.getboolean("loadInj")
inj["inj_only"] = config_inj.getboolean("inj_only")
if inj["doInj"]:
## first see if we are loading the injection
if inj["loadInj"]:
if inj["inj_only"]:
raise ValueError(
"Both loadInj and inj_only flags are set to True. This won't accomplish anything..."
)
inj["injdir"] = config_inj["injdir"]
# FIXME why are we pickling configuration dictionaries?
# they can be saved as plain .ini files by configparser!
## get the already-generated injection dict
with open(inj["injdir"] + "/config.pickle", "rb") as paramfile:
## things are loaded from the pickle file in the same order they are put in
loaded_params = pickle.load(paramfile)
loaded_inj = pickle.load(paramfile)
## for this to work, all params that impact the data/response times/frequencies/types can't change
## but you can change e.g., recovery model, sampler, etc.
required_immutable = [
"fmin",
"fmax",
"dur",
"seglen",
"fs",
"nside",
"tstart",
"lisa_config",
"tdi_lev",
"datatype",
]
requirements_violated = [
requirement
for requirement in required_immutable
if params[requirement] != loaded_params[requirement]
]
if len(requirements_violated) > 0:
raise ValueError(
"Loaded injection is incompatible with specified configuration due to mismatches in the following config settings: {}".format(
requirements_violated
)
)
## update the injection dictionary with the loaded one
inj |= loaded_inj
## reset the top-level injection flags to their original state
inj["loadInj"] = True
inj["inj_only"] = False
## otherwise make a new one
else:
## make sure the simulation will generate enough data
# FIXME this should just be impossible i.e. BLIP should always generate
# enough data
Nsim_from_alignement = get_simulation_length(
params["dur"], params["tstart"], params["fs"], params["tsplice"]
)
assert (
Ndata_from_duration <= Nsim_from_alignement
), f"""Simulation will be too short compared to data
tsplice = {params['tsplice']}, nsplice = {params['nsplice']},
tsegmid = {params['tsegmid']}, Npersplice = {params['Npersplice']}"""
truevals_raw = eval(config_inj["truevals"])
if not isinstance(truevals_raw, dict):
raise ValueError("truevals must be dictionary")
for k, v in truevals_raw.items():
if not isinstance(k, str):
raise ValueError("truevals keys must be strings")
if not isinstance(v, dict):
raise ValueError("truevals values must be dictionaries")
# inj["truevals"] will be built later with a little pre-processing and renaming.
inj["truevals_raw"] = truevals_raw
inj["truevals"] = preprocess_truevals(truevals_raw)
inj["injection_raw"] = config_inj["injection"]
inj["injection"] = parse_model_spec(
inj["injection_raw"],
is_injection=True,
truevals_all=inj["truevals"],
)
inj["inj_basis"] = config_inj["inj_basis"]
## enforce that the keys of the truevals dict correspond to the injected models
## at this point we will only make sure that all the keys are in the injection string
## (not all injections will have truevals; e.g., population injections)
inj_component_names = inj["injection_raw"].split("+")
if not np.all(
[key in inj_component_names for key in inj["truevals"].keys()]
):
raise ValueError(
"Truevals dictionary has an invalid key. Truevals must be provided as a nested dictionary with top-level keys corresponding to the injections specified in the 'injection' parameter."
)
## injection per-component multithreading
inj["parallel_inj"] = config_inj.getboolean("parallel_inj")
## if parallel_inj is True but there is only one component, set parallel_inj to False
# if len(inj_component_names) == 1 and inj['parallel_inj']:
# inj['parallel_inj'] = 0
if inj["parallel_inj"]:
inj["inj_nthread"] = int(
config_inj.get("inj_nthread", fallback=len(inj_component_names))
)
inj["response_nthread"] = int(config_inj["response_nthread"])
## give preference to response multi-threading
if inj["inj_nthread"] > 1 and inj["response_nthread"] > 1:
print(
"Warning: you have set both inj_nthread and response_nthread > 1."
)
print(
"The Multiprocessing package does not allow workers to spawn additional workers."
)
print(
"Giving precedence to response multiprocessing, setting inj_nthread=1."
)
inj["inj_nthread"] = 1
elif inj["inj_nthread"] == 1 and inj["response_nthread"] == 1:
inj["parallel_inj"] = 0
else:
inj["inj_nthread"] = 1
## add injections to the spherical harmonic check if needed
sph_check = sph_check + [
sublist.split("-")[0].split("_")[-1]
for sublist in inj["injection_raw"].split("+")
]
elif not params["load_data"]:
raise ValueError("Either inj.doInj or params.load_data has to be true")
# now that we know the injections and aliases, we can parse the analysis models
_truevals_all = inj.get("truevals", {})
_fixedvals_all = params.get("fixedvals", {})
_injection_specs = inj.get("injection", [])
params["model"] = parse_model_spec(
params["model_raw"],
is_injection=False,
truevals_all=_truevals_all,
fixedvals_all=_fixedvals_all,
alias_all=params["alias"],
injection_specs=_injection_specs,
)
## pop out to set sph flags
params["sph_flag"] = "sph" in sph_check # or ('hierarchical' in sph_check)
## set sph flag to false if both inj and model basis are pixel
if params["sph_flag"]:
params["lmax"] = int(config_params["lmax"])
elif params["model_basis"] == "sph" or inj.get("inj_basis", "sph") == "sph":
params["sph_flag"] = True
params["lmax"] = int(config_params["lmax"])
## some final flag, injection parameter setting if we aren't loading the Injection directly
if inj["doInj"] and not inj["loadInj"]:
## similarly, set inj sph flag to False if we're doing pixel basis injections
if inj["inj_basis"] == "pixel" and not ("sph" in sph_check):
inj["sph_flag"] = False
## but if we're also explicitly doing a sph injection, set it to true
elif inj["inj_basis"] == "pixel" and ("sph" in sph_check):
inj["sph_flag"] = True
else:
inj["sph_flag"] = np.any(
[(item not in ["noise", "isgwb"]) for item in sph_check]
)
## set pop flag if spatial and/or spectral injection is a population
inj["pop_flag"] = ("population" in sph_check) or (
"population"
in [
sublist.split("-")[0].split("_")[0]
for sublist in inj["injection_raw"].split("+")
]
)
if inj["sph_flag"]:
try:
inj["inj_lmax"] = int(config_inj["inj_lmax"])
except (configparser.NoOptionError, KeyError) as err:
if params["sph_flag"]:
print(
"Performing a spherical harmonic basis injection and inj_lmax has not been specified. Injection and recovery will use same lmax (lmax={}).".format(
params["lmax"]
)
)
inj["inj_lmax"] = params["lmax"]
else:
print(
"You are trying to do a spherical harmonic injection, but have not specified lmax."
)
if "lmax" in params.keys():
print(
"Warning: using analysis lmax parameter for inj_lmax, but you are not performing a spherical harmonic analysis."
)
inj["inj_lmax"] = params["lmax"]
else:
raise err
## NB -- will have to change this structure to allow pop-based recovery models. But it's a good start.
if inj["doInj"] and inj["pop_flag"]:
inj["popdict"] = eval(str(config_inj["population_params"]))
## make sure every injection population component has a corresponding entry
inj_pop_component_names = [
cmn for cmn in inj_component_names if "population" in cmn
]
for cmn in inj_pop_component_names:
if cmn not in inj["popdict"].keys():
raise KeyError(
"Population injection '{}' does not have a corresponding entry in the population_params dict.".format(
cmn
)
)
## step through the (possibly multiple) populations and tweak formatting for the delimiters
## also enforce the required keys and substitute defaults if optional setting isn't given
required_keys = ["popfile", "columns", "delimiter"]
pop_defaults = {"snr_cut": 7, "name": None, "coldict": None}
for key in inj["popdict"].keys():
## make sure it corresponds to an injection
if key not in inj_component_names:
raise KeyError(
"Population '{}' not in injection. Top-level keys for the population_params dict must correspond to an injection.".format(
key
)
)
## enforce required keys
for rk in required_keys:
if rk not in inj["popdict"][key].keys():
raise KeyError(
"population_params dict missing required key: '{}'".format(
rk
)
)
## set defaults
for dk in pop_defaults.keys():
if dk not in inj["popdict"][key].keys():
print(
"No value found for populations parameter '{}' in population_params dict for injection component '{}'. Setting to default ({}).".format(
dk, key, pop_defaults[dk]
)
)
inj["popdict"][key][dk] = pop_defaults[dk]
## formatting
if inj["popdict"][key]["delimiter"] == "space":
inj["popdict"][key]["delimiter"] = " "
elif inj["popdict"][key]["delimiter"] == "tab":
inj["popdict"][key]["delimiter"] == "\t"
# some run parameters
params["out_dir"] = config_run_params["out_dir"]
params["doPreProc"] = config_run_params.getboolean("doPreProc")
params["input_spectrum"] = config_run_params["input_spectrum"]
params["projection"] = str(config_run_params["projection"])
params["FixSeed"] = config_run_params.getboolean("FixSeed")
if params["FixSeed"]:
params["seed"] = int(config_run_params["seed"])
misc["nthread"] = int(config_run_params["Nthreads"])
misc["N_GPU"] = int(config_run_params["N_GPU"])
params["colormap"] = config_run_params["colormap"]
## sampler selection
params["sampler"] = config_run_params["sampler"]
## only numpyro has GPU support
if misc["N_GPU"] > 0 and params["sampler"] not in ["numpyro", "numpyro_nested"]:
raise ValueError(
"Only numpyro supports GPU acceleration but N_GPU ({}) > 0 and sampler is {}.".format(
misc["N_GPU"], params["sampler"]
)
)
misc["nlive"] = int(config_run_params["nlive"])
## sampler setup and late-time imports to reduce dependencies
## dynesty
if params["sampler"] == "dynesty":
params["sample_method"] = config_run_params["sample_method"]
## numpyro
elif params["sampler"] == "numpyro":
params["show_progress"] = config_run_params.getboolean("show_progress")
params["Nburn"] = int(config_run_params["Nburn"])
params["Nsamples"] = int(config_run_params["Nsamples"])
else:
raise ValueError(
"Unknown sampler. Supported samplers: 'dynesty' and 'numpyro'."
)
# checkpointing (dynesty+numpyro only for now)
if params["sampler"] == "dynesty" or params["sampler"] == "numpyro":
params["checkpoint"] = config_run_params.getboolean("checkpoint")
## numpyro's checkpoint_interval is in number of samples, vs. seconds for dynesty
if params["sampler"] == "numpyro":
params["checkpoint_at"] = config_run_params["checkpoint_at"]
if params["checkpoint_at"] == "interval":
params["checkpoint_interval"] = int(
config_run_params.get("checkpoint_interval", fallback=100)
)
params["additional_samples"] = int(config_run_params["additional_samples"])
if params["additional_samples"] == 0:
params["additional_samples"] = None
else:
params["checkpoint_interval"] = int(
config_run_params.get("checkpoint_interval", fallback=3600)
)
# Fix random seed
if params["FixSeed"]:
from blip.tools.SetRandomState import SetRandomState as setrs
seed = params["seed"]
misc["randst"] = setrs(seed)
else:
if params["checkpoint"]:
raise TypeError(
"Checkpointing without a fixed seed is not supported. Set 'FixSeed' to true and specify 'seed'."
)
if resume:
raise TypeError(
"Resuming from a checkpoint requires re-generation of data, so the random seed MUST be fixed."
)
misc["randst"] = None
return params, inj, misc
[docs]
def parse_model_spec(
specs: str,
*,
is_injection: bool,
truevals_all: dict,
fixedvals_all: dict | None = None,
alias_all: dict | None = None,
injection_specs: list[SubmodelSpec] | None = None,
) -> list[SubmodelSpec]:
"""Parse a model specifier string into its component submodels.
Parameters
----------
specs : str
model specifier string, with submodels separated by "+" and no spaces.
is_injection : bool
if True, parse this as an injection model. This will ignore the parameters alias_all and injection_specs, since
the injections cannot be aliased to anything.
truevals_all : dict
the truevals dictionary in the config params, specifying injected values. Not checked for correctness (this is
for now the job of submodel.__init__).
fixedvals_all : dict | None, optional
the fixedvals dictionary in the config params, specifying parameters to hold fixed in likelihood evaluation.
Ignored if is_injection=True.
alias_all : dict | None, optional
the alias dictionary in the config params. Ignored if is_injection=True.
injection_specs : list[SubmodelSpec] | None, optional
the already-parsed specifiers for the injection submodels. Required to correctly parse the analysis model.
Ignored if is_injection=True.
Returns
-------
list[SubmodelSpec]
The specifications to build the submodels.
Raises
------
ValueError
If the model specifier string contains invalid syntax.
"""
raw_names = specs.split("+")
raw_names = catch_duplicates(raw_names)
submodel_specs = []
for raw_name in raw_names:
if raw_names.count(raw_name) > 1:
raise ValueError(f"Duplicate model '{raw_name}'")
invalid_err = ValueError(f"Invalid injection submodel specifier '{raw_name}'")
## remove the duplicate identifier if needed (powerlaw_isgwb-3 -> powerlaw_isgwb)
name_split = raw_name.split("-")
if len(name_split) > 2:
raise invalid_err
name = name_split[0]
if len(name_split) == 2:
count = f"({name_split[1]})"
else:
count = ""
is_noise = name in ["noise", "fixednoise"]
if is_noise and count != "":
raise invalid_err
is_population = name == "population"
if is_noise:
kind = SubmodelKind.NOISE
spectral, spatial = None, None
elif is_population:
kind = SubmodelKind.POPULATION
spectral, spatial = "population", "population"
else:
kind = SubmodelKind.SPECTRAL_SPATIAL
name_split = name.split("_")
if len(name_split) != 2:
raise invalid_err
spectral, spatial = name_split
truevals = {}
if raw_name in truevals_all:
truevals: dict = truevals_all[raw_name]
if is_injection:
fixedvals = {}
alias = None
else:
fixedvals = {}
if raw_name in fixedvals_all:
fixedvals: dict = fixedvals_all[raw_name]
alias = None
truevals = {}
if raw_name in alias_all:
alias: str = alias_all[raw_name]
for injspec in injection_specs:
if injspec.raw_name == alias:
truevals = injspec.truevals
break
spec = SubmodelSpec(
name=name,
kind=kind,
is_injection=is_injection,
spectral=spectral,
spatial=spatial,
count=count,
raw_name=raw_name,
truevals=truevals,
fixedvals=fixedvals,
alias=alias,
)
submodel_specs.append(spec)
catch_duplicates(raw_names)
return submodel_specs
[docs]
def preprocess_truevals(tv_raw):
## some quantities we want to use as log values, so convert them
truevals = {}
log_list = ["Np", "Na", "omega0", "fbreak", "fcut", "fscale"]
for component_name in tv_raw.keys():
truevals[component_name] = {}
for name in tv_raw[component_name].keys():
if name in log_list:
new_name = "log_" + name
truevals[component_name][new_name] = np.log10(
tv_raw[component_name][name]
)
else:
truevals[component_name][name] = tv_raw[component_name][name]
return truevals