Source code for blip.config

import os
from dataclasses import dataclass
from enum import Enum
import configparser

import numpy as np
import pickle

from blip.src.makeLISAdata import (
    get_simulation_tf_grid,
    get_data_tf_grid,
    get_simulation_length,
    get_data_length,
)
from blip.src.submodel import SubmodelKind, SubmodelSpec
from blip.src.utils import catch_duplicates


[docs] @dataclass class Option: "Configuration option." name: str desc: str default: str | None = None required: bool = False
# The configuration file is an .ini file with three sections: `params`, `run_params` and `inj`. # Here is what they look like. SECTION_PARAMS = [ Option("fmin", desc="Minimum frequency.", required=True), Option("fmax", desc="Maximum frequency.", required=True), Option( "duration", desc="Duration in seconds, should be compatiable with fmin.", required=True, ), Option( "seglen", desc=""" Segment length for the analysis STFT fft in seconds, should be compatiable with fmin. Note by SB: Looks like this needs to be at least a factor of 10 larger than 1/fmin to give consistant recoveries.""", default="1e5", ), Option( "tsplice", desc=""" Splice length for the simulation in seconds, should be compatiable with fmin. """, default="2.5e4", ), Option( "fs", desc=""" Sample rate in Hz, should be compatible with fmax. Note by SB: Looks like this needs to be atleast a factor of 4 higher than fmax (as opposed to the usual factor of 2) to give consistent recoveries.""", default="0.25", ), Option("Shfile", desc="[TODO -- check depreciation]", default="LISA_2017_PSD_M.npy"), Option("load_data", desc="Whether to load data from `datafile`.", default="false"), Option( "datatype", desc="Doppler or strain data. MLDC data is doppler, autogenerated data is strain. Can be doppler or strain.", default="strain", ), Option("mldc", desc="FIXME: doesn't do anything!", default="0"), Option( "datafile", desc=""" Input data file. Only used if `load_data` is true. The data format is specified by `datafileformat`.""", default="None", ), Option( "datafileformat", desc="Format for external input data. Can be 'mldc' or 'ldc'.", default="ldc", ), Option( "datadomain", desc="How to interpret LDC data. Can be 'time' or 'freq'.", default="time", ), Option( "ldc_dataset", desc="HDF5 path to array with TDI series in LDC input.", default="obs/tdi", ), Option("fref", desc="TODO", default="0.001"), Option( "model", desc=""" Recovery Model description write out as spectralmodel1_spatialmodel1+spectralmodel2_spatialmodel2+... exception, noise is just 'noise' with no spatial component handle duplicates as (e.g.) noise+powerlaw_isgwb-1+powerlaw_isgwb-2 spectral models available: noise, powerlaw, brokenpowerlaw, truncatedpowerlaw spatial models available: isgwb, sph, fixedgalaxy, hotpixel""", required=True, ), Option( "fixedvals", desc=""" Recovery model fixed values fixedvals allows you to fix the value of specific parameters in the recovery model These parameters will be set to the value provided and not sampled over Currently only implemented for the fixedgalaxy and hotpixel spatial models""", default="None", ), Option( "alias", desc=""" Model/Injection aliasing optionally specify which models should be associated with which injections any models/injections left unspecified will be matched by name as default and if no match is found will not be treated as associated (i.e., truevals won't be provided) notation is alias={'model_name':'injection_name'}""", default="{}", ), Option( "tdi_lev", desc="Level of the tdi. Can be michelson, xyz or aet.", default="xyz" ), Option( "lisa_config", desc="LISA configuration. Can be stationary or orbiting.", default="orbiting", ), Option( "nside", desc="Healpix nside (skymap resolution).", required=True ), # why is this always required? It's always used and the choice matters for memory and accuracy Option("model_basis", desc=""" Skymap representation. Can be 'pixel' (Criswell+25 pixelated skymaps) or 'sph' (Banagiri+21 spherical harmonics). """, default="pixel"), Option( "tstart", desc="Start time in seconds. Determines starting orbital position.", default="0.0", ), Option("lmax", desc="spherical harmonic lmax for the b_lms (a_lmax/2)"), Option("faster_geometry", desc="Enable new response module", default="true"), ] SECTION_INJ = [ Option("doInj", desc="Whether to generate data (injections).", required=True), Option( "loadInj", desc="""Whether to load a previously-generated injection from `injdir`.""", default="false", ), Option( "inj_only", desc="If true, BLIP will perform only the injection and data generation process, then exit.", default="false", ), Option("injdir", desc="If loadInj, path to injection directory."), Option( "injection", desc=""" Injection model description Same syntax as for the Recovery Model Additional option: "population" generates spectral/spatial distributions from a DWD population""", ), Option("inj_basis", desc=""" Skymap representation for the injection. Can be 'pixel' (Criswell+25 pixelated skymaps) or 'sph' (Banagiri+21 spherical harmonics). """, default="pixel"), Option( "truevals", desc=""" Dictionary of injection 'ground truth'. If doing an sph injection, blms should be given as complex quantities in a comma seperated list in healpix order b00 can only be 1. b_{l, -m} = (-1)**m b_lm blms = b00, b10, b11 for blmax = 1, alamx = 2 blms = b00, b10, b20, b11, b21, b22 for blmax = 2, almax = 4 blms = b00, b10, b20, b30, b11, b21, b31, b22, b32, b33 for blmax = 3, almax = 6 e.g. 'blms':[1.0, 0.75, 0.5, 0.7j, 0.7-0.3j, 1.1j]""", ), Option( "parallel_inj", desc="If true, BLIP will multithread data simulation on a component-by-component basis.", default="0", ), Option( "inj_nthread", desc=""" How many processes to use for injection parallelization. No benefit from N_threads > N_components. Defaults to N_components.""", ), Option("response_nthread", desc="[TODO -- update or depreciate]", default="1"), Option( "inj_lmax", desc="Injection blmax. If not specified, will match to the analysis lmax.", ), Option( "population_params", desc=""" Population injection Population Params Dict Dictionary with the settings for each population. Top-level keys should correspond to inj (+eventually, model)""", default="None", ), ] SECTION_RUN_PARAMS = [ Option( "sampler", desc="""Sampler to use. Supported: dynesty, numpyro.""", required=True, ), Option("out_dir", desc="Output directory.", required=True), Option( "doPreProc", desc=""" If true, force generation of fake data again rather than using precalculated data in the data_spectrum file""", default="0", ), Option("input_spectrum", desc="Must be .npz file", default="data_spectrum.npz"), Option( "projection", desc="Projection used for making skymaps. Can be 'E' (ecliptic) or 'G' (galactic).", default="E", ), Option( "FixSeed", desc="Whether to seed the sampler with the `seed` parameter.", default="false", ), Option( "seed", desc="Seed for the sampler. Only used if `FixSeed` is true. Must be a 32-bit integer.", ), Option( "Nthreads", desc=""" For multiprocessing, number of threads Warning: most often, multithreading the dynesty sampler does not result in significant gains as its proposal process must be serial.""", default="1", ), Option( "N_GPU", desc=""" GPUs (numpyro only) Number of available GPU devices If one GPU is available, Nthreads > 1 will *vectorize* chains on a single GPU. If more than one GPU is available, BLIP will parallelize across GPUs, setting Nthreads=N_GPU""", default="0", ), Option( "colormap", desc="matplotlib colormap used for the skymaps.", default="magma" ), Option("nlive", desc="Number of live points for dynesty or emcee.", default="800"), Option( "sample_method", desc=""" Sampling method (dynesty only) see https://dynesty.readthedocs.io/en/latest/quickstart.html?highlight=method#sampling-options rslice recommended for MW foreground analyses""", default="rwalk", ), Option( "Nburn", desc="Length of burn-in phase. Affects numpyro.", default="1000", ), Option( "Nsamples", desc="Number of samples. Affects emcee and numpyro.", default="1000" ), Option( "show_progress", desc="If false, disable numpyro progress bar. Disabling improves performance, especially when multithreading.", default="1", ), Option( "checkpoint", desc=""" Checkpointing and restarting (dynesty/numpyro only). If `checkpoint` is true, the sampler will save its current state every so often. For dynesty this is every `checkpoint_interval` seconds. For numpyro the behavior changes depending on `checkpoint_at` and `checkpoint_interval`. To resume an interrupted checkpointed run, run blip on the copy of the params file in the output directory with 'resume' as an additional argument: run_blip [path/to/out_dir/params_file] resume""", default="false", ), Option( "checkpoint_at", desc=""" When to checkpoint (numpyro only). Can be 'end' (only saves sampler state at the very end of the run), 'warmup' (saves after warmup phase and at end), or 'interval' (saves after warmup, at end, and after every checkpoint_interval number of samples; need to also specify checkpoint_interval). Generally not worth checkpointing while sampling for large models/datasets, as the recompliation and GPU off/onloading time will exceed the sampling time.""", default="end", ), Option( "checkpoint_interval", desc="Checkpointing interval in seconds (dynesty) or samples (numpyro).", ), Option( "additional_samples", desc=""" With (checkpointed) numpyro runs, you can get additional samples after the run has concluded by setting additional_samples in the run directory's param file and doing: run_blip [paramsfile] resume""", default="0", ) ] # A few options have fallback values that are set dynamically depending on # the values of other options; these don't have defaults. DEFAULTS_SECTION_PARAMS = { opt.name: opt.default for opt in SECTION_PARAMS if opt.default } DEFAULTS_SECTION_INJ = {opt.name: opt.default for opt in SECTION_INJ if opt.default} DEFAULTS_SECTION_RUN_PARAMS = { opt.name: opt.default for opt in SECTION_RUN_PARAMS if opt.default } REQUIRED_SECTION_PARAMS = [opt.name for opt in SECTION_PARAMS if opt.required] REQUIRED_SECTION_INJ = [opt.name for opt in SECTION_INJ if opt.required] REQUIRED_SECTION_RUN_PARAMS = [opt.name for opt in SECTION_RUN_PARAMS if opt.required] ALL_LOWER_PARAMS = [opt.name.lower() for opt in SECTION_PARAMS] ALL_LOWER_INJ = [opt.name.lower() for opt in SECTION_INJ] ALL_LOWER_RUN_PARAMS = [opt.name.lower() for opt in SECTION_RUN_PARAMS]
[docs] def parse_config(paramsfile: str, resume: bool): """Parse a configuration file `paramsfile`. Args: `paramsfile`: the config filename. `resume`: a command-line flag to run_blip. It affects whether the configuration file is valid because it only makes sense to resume checkpointed runs if they have fixed seeds. Returns: a tuple of dictionaries `(params, inj, misc)` containing a *valid* configuration where all parameters have been converted from `str` to the appropriate types. For invalid input configurations: raises `TypeError`, `ValueError`, `KeyError`, `AssertionError`, or `configparse.NoOptionError`, `FileNotFoundError`. """ if not os.path.isfile(paramsfile): raise FileNotFoundError(paramsfile) # These are the returned dictionaries, containing a *valid* configuration # with options that already have the right types. params = {} # basically combines `params` and `run_params` sections inj = {} # basically receives options from the `inj` section misc = {} # receives a few run parameters and the generated seed # Preserve original paramsfile after parsing, for the sake of having a copy. with open(paramsfile) as f: misc["paramsfile_name"] = os.path.basename(paramsfile) misc["paramsfile_text"] = f.read() config = configparser.ConfigParser() config.read_dict( { "params": DEFAULTS_SECTION_PARAMS, "inj": DEFAULTS_SECTION_INJ, "run_params": DEFAULTS_SECTION_RUN_PARAMS, } ) config.read(paramsfile) config_params = config["params"] config_inj = config["inj"] config_run_params = config["run_params"] # is everything here? req_params_missing = [ optname for optname in REQUIRED_SECTION_PARAMS if (optname not in config_params) ] req_inj_missing = [ optname for optname in REQUIRED_SECTION_INJ if (optname not in config_inj) ] req_run_params_missing = [ optname for optname in REQUIRED_SECTION_RUN_PARAMS if (optname not in config_run_params) ] if req_params_missing or req_inj_missing or req_run_params_missing: raise TypeError( f""" Some required configuration options are missing: In section params: {req_params_missing} In section inj: {req_inj_missing} In section run_params: {req_run_params_missing} """ ) # do we recognize all the options in the input? unknown_params = [ optname for optname in config_params if (optname not in ALL_LOWER_PARAMS) ] unknown_inj = [optname for optname in config_inj if (optname not in ALL_LOWER_INJ)] unknown_run_params = [ optname for optname in config_run_params if (optname not in ALL_LOWER_RUN_PARAMS) ] if unknown_params or unknown_inj or unknown_run_params: raise TypeError( f""" Unrecognized options: In section params: {unknown_params} In section inj: {unknown_inj} In section run_params: {unknown_run_params} """ ) # Params Dict params["fmin"] = float(config_params["fmin"]) params["fmax"] = float(config_params["fmax"]) params["dur"] = float(config_params["duration"]) params["seglen"] = float(config_params["seglen"]) assert ( params["seglen"] >= 10 / params["fmin"] ), "Segment length incompatible with fmin" params["tsplice"] = float(config_params["tsplice"]) assert ( params["tsplice"] > 1 / params["fmin"] ), "Splice length incompatible with fmin" params["fs"] = float(config_params["fs"]) assert params["fs"] >= 4 * params["fmax"], "Sampling rate fs incompatible with fmax" params["Shfile"] = config_params["Shfile"] params["load_data"] = config_params.getboolean("load_data") params["datatype"] = config_params["datatype"] params["datafile"] = config_params["datafile"] if params["load_data"]: if not os.path.isfile(params["datafile"]): raise FileNotFoundError(params["datafile"]) params["datafileformat"] = config_params["datafileformat"] assert params["datafileformat"] in ["mldc", "ldc"], "Invalid input file format." params["datadomain"] = config_params["datadomain"] assert params["datadomain"] in ["time", "freq"], "Invalid input data domain." params["ldc_dataset"] = config_params["ldc_dataset"] ## default fref is 1mHz ## until we update the prior structure, using other reference frequencies may lead to unintended behavior in the prior bounds ## in the new prior structure, the default astrophysical prior bounds should scale with fref params["fref"] = float(config_params["fref"]) if params["fref"] != 0.001: print( "Warning: reference frequency set to a value other than 1 mHz (fref={} Hz).".format( params["fref"] ) ) print( "This may lead to unexpected prior behavior, as the prior bounds are based on a reference frequency of 1 mHz!" ) params["faster_geometry"] = config_params.getboolean("faster_geometry") # params["model"] will be a list[SubmodelAnalysisSpec] and depends on the injections # because of aliasing. We postpone that parsing until the injections have been parsed. params["model_raw"] = str(config_params["model"]) ## precompute time-frequency alignment params["nsegs"], params["Nperseg"] = get_data_tf_grid( params["dur"], params["seglen"], params["fs"] ) ## make sure alignment is possible for `tser2fser()` # FIXME BLIP should just trim the data for alignment Ndata_from_duration = int((params["dur"]) * params["fs"]) Ndata_from_alignment = get_data_length( params["dur"], params["seglen"], params["fs"] ) assert ( Ndata_from_duration % params["Nperseg"] == 0 ), f"""Data duration misaligned with segment length {Ndata_from_duration = }, {Ndata_from_alignment = } duration = {params['dur']}, nsegs = {params['nsegs']} seglen = {params['seglen']}, Nperseg = {params['Nperseg']} {Ndata_from_duration % params['Nperseg'] = }""" ## get the model fixed values, passed as a dict fixedvals = eval(str(config_params["fixedvals"])) # TODO check that the fixedvals make sense params["fixedvals"] = {} if fixedvals is not None: ## enforce that the keys of the fixedvals dict correspond to the desired models ## at this point we will only make sure that all the keys are in the model string submodel_names = params["model_raw"].split("+") if not np.all([key in submodel_names for key in fixedvals.keys()]): raise ValueError( "Fixedvals dictionary has an invalid key. Fixedvals must be provided as a nested dictionary with top-level keys corresponding to the models specified in the 'model' parameter." ) ## build the fixedvals dict ## some quantities we want to use as log values, so convert them log_list = ["Np", "Na", "omega0", "fbreak", "fcut", "fscale"] for submodel_name in fixedvals.keys(): params["fixedvals"][submodel_name] = {} for name in fixedvals[submodel_name].keys(): if name in log_list: new_name = "log_" + name params["fixedvals"][submodel_name][new_name] = np.log10( fixedvals[submodel_name][name] ) else: params["fixedvals"][submodel_name][name] = fixedvals[submodel_name][ name ] params["alias"] = eval(config_params["alias"]) if not isinstance(params["alias"], dict): raise ValueError("alias must be dict") for k, v in params["alias"].items(): if not isinstance(k, str): raise ValueError("alias keys must be strings") if not isinstance(v, str): raise ValueError("alias values must be strings") params["tdi_lev"] = config_params["tdi_lev"] params["lisa_config"] = config_params["lisa_config"] if params["faster_geometry"] and params["lisa_config"] != "orbiting": raise ValueError("faster_geometry requires lisa_config = 'orbiting'") params["nside"] = int(config_params["nside"]) params["model_basis"] = config_params["model_basis"] params["tstart"] = float(config_params["tstart"]) ## precompute injection TF grid params ( params["tsplice"], params["nsplice"], params["tsegmid"], params["Npersplice"], ) = get_simulation_tf_grid( params["dur"], params["tstart"], params["fs"], params["tsplice"] ) ## see if we need to initialize the spherical harmonic subroutines sph_check = [ sublist.split("-")[0].split("_")[-1] for sublist in params["model_raw"].split("+") ] # Injection Dict inj["doInj"] = config_inj.getboolean("doInj") inj["loadInj"] = config_inj.getboolean("loadInj") inj["inj_only"] = config_inj.getboolean("inj_only") if inj["doInj"]: ## first see if we are loading the injection if inj["loadInj"]: if inj["inj_only"]: raise ValueError( "Both loadInj and inj_only flags are set to True. This won't accomplish anything..." ) inj["injdir"] = config_inj["injdir"] # FIXME why are we pickling configuration dictionaries? # they can be saved as plain .ini files by configparser! ## get the already-generated injection dict with open(inj["injdir"] + "/config.pickle", "rb") as paramfile: ## things are loaded from the pickle file in the same order they are put in loaded_params = pickle.load(paramfile) loaded_inj = pickle.load(paramfile) ## for this to work, all params that impact the data/response times/frequencies/types can't change ## but you can change e.g., recovery model, sampler, etc. required_immutable = [ "fmin", "fmax", "dur", "seglen", "fs", "nside", "tstart", "lisa_config", "tdi_lev", "datatype", ] requirements_violated = [ requirement for requirement in required_immutable if params[requirement] != loaded_params[requirement] ] if len(requirements_violated) > 0: raise ValueError( "Loaded injection is incompatible with specified configuration due to mismatches in the following config settings: {}".format( requirements_violated ) ) ## update the injection dictionary with the loaded one inj |= loaded_inj ## reset the top-level injection flags to their original state inj["loadInj"] = True inj["inj_only"] = False ## otherwise make a new one else: ## make sure the simulation will generate enough data # FIXME this should just be impossible i.e. BLIP should always generate # enough data Nsim_from_alignement = get_simulation_length( params["dur"], params["tstart"], params["fs"], params["tsplice"] ) assert ( Ndata_from_duration <= Nsim_from_alignement ), f"""Simulation will be too short compared to data tsplice = {params['tsplice']}, nsplice = {params['nsplice']}, tsegmid = {params['tsegmid']}, Npersplice = {params['Npersplice']}""" truevals_raw = eval(config_inj["truevals"]) if not isinstance(truevals_raw, dict): raise ValueError("truevals must be dictionary") for k, v in truevals_raw.items(): if not isinstance(k, str): raise ValueError("truevals keys must be strings") if not isinstance(v, dict): raise ValueError("truevals values must be dictionaries") # inj["truevals"] will be built later with a little pre-processing and renaming. inj["truevals_raw"] = truevals_raw inj["truevals"] = preprocess_truevals(truevals_raw) inj["injection_raw"] = config_inj["injection"] inj["injection"] = parse_model_spec( inj["injection_raw"], is_injection=True, truevals_all=inj["truevals"], ) inj["inj_basis"] = config_inj["inj_basis"] ## enforce that the keys of the truevals dict correspond to the injected models ## at this point we will only make sure that all the keys are in the injection string ## (not all injections will have truevals; e.g., population injections) inj_component_names = inj["injection_raw"].split("+") if not np.all( [key in inj_component_names for key in inj["truevals"].keys()] ): raise ValueError( "Truevals dictionary has an invalid key. Truevals must be provided as a nested dictionary with top-level keys corresponding to the injections specified in the 'injection' parameter." ) ## injection per-component multithreading inj["parallel_inj"] = config_inj.getboolean("parallel_inj") ## if parallel_inj is True but there is only one component, set parallel_inj to False # if len(inj_component_names) == 1 and inj['parallel_inj']: # inj['parallel_inj'] = 0 if inj["parallel_inj"]: inj["inj_nthread"] = int( config_inj.get("inj_nthread", fallback=len(inj_component_names)) ) inj["response_nthread"] = int(config_inj["response_nthread"]) ## give preference to response multi-threading if inj["inj_nthread"] > 1 and inj["response_nthread"] > 1: print( "Warning: you have set both inj_nthread and response_nthread > 1." ) print( "The Multiprocessing package does not allow workers to spawn additional workers." ) print( "Giving precedence to response multiprocessing, setting inj_nthread=1." ) inj["inj_nthread"] = 1 elif inj["inj_nthread"] == 1 and inj["response_nthread"] == 1: inj["parallel_inj"] = 0 else: inj["inj_nthread"] = 1 ## add injections to the spherical harmonic check if needed sph_check = sph_check + [ sublist.split("-")[0].split("_")[-1] for sublist in inj["injection_raw"].split("+") ] elif not params["load_data"]: raise ValueError("Either inj.doInj or params.load_data has to be true") # now that we know the injections and aliases, we can parse the analysis models _truevals_all = inj.get("truevals", {}) _fixedvals_all = params.get("fixedvals", {}) _injection_specs = inj.get("injection", []) params["model"] = parse_model_spec( params["model_raw"], is_injection=False, truevals_all=_truevals_all, fixedvals_all=_fixedvals_all, alias_all=params["alias"], injection_specs=_injection_specs, ) ## pop out to set sph flags params["sph_flag"] = "sph" in sph_check # or ('hierarchical' in sph_check) ## set sph flag to false if both inj and model basis are pixel if params["sph_flag"]: params["lmax"] = int(config_params["lmax"]) elif params["model_basis"] == "sph" or inj.get("inj_basis", "sph") == "sph": params["sph_flag"] = True params["lmax"] = int(config_params["lmax"]) ## some final flag, injection parameter setting if we aren't loading the Injection directly if inj["doInj"] and not inj["loadInj"]: ## similarly, set inj sph flag to False if we're doing pixel basis injections if inj["inj_basis"] == "pixel" and not ("sph" in sph_check): inj["sph_flag"] = False ## but if we're also explicitly doing a sph injection, set it to true elif inj["inj_basis"] == "pixel" and ("sph" in sph_check): inj["sph_flag"] = True else: inj["sph_flag"] = np.any( [(item not in ["noise", "isgwb"]) for item in sph_check] ) ## set pop flag if spatial and/or spectral injection is a population inj["pop_flag"] = ("population" in sph_check) or ( "population" in [ sublist.split("-")[0].split("_")[0] for sublist in inj["injection_raw"].split("+") ] ) if inj["sph_flag"]: try: inj["inj_lmax"] = int(config_inj["inj_lmax"]) except (configparser.NoOptionError, KeyError) as err: if params["sph_flag"]: print( "Performing a spherical harmonic basis injection and inj_lmax has not been specified. Injection and recovery will use same lmax (lmax={}).".format( params["lmax"] ) ) inj["inj_lmax"] = params["lmax"] else: print( "You are trying to do a spherical harmonic injection, but have not specified lmax." ) if "lmax" in params.keys(): print( "Warning: using analysis lmax parameter for inj_lmax, but you are not performing a spherical harmonic analysis." ) inj["inj_lmax"] = params["lmax"] else: raise err ## NB -- will have to change this structure to allow pop-based recovery models. But it's a good start. if inj["doInj"] and inj["pop_flag"]: inj["popdict"] = eval(str(config_inj["population_params"])) ## make sure every injection population component has a corresponding entry inj_pop_component_names = [ cmn for cmn in inj_component_names if "population" in cmn ] for cmn in inj_pop_component_names: if cmn not in inj["popdict"].keys(): raise KeyError( "Population injection '{}' does not have a corresponding entry in the population_params dict.".format( cmn ) ) ## step through the (possibly multiple) populations and tweak formatting for the delimiters ## also enforce the required keys and substitute defaults if optional setting isn't given required_keys = ["popfile", "columns", "delimiter"] pop_defaults = {"snr_cut": 7, "name": None, "coldict": None} for key in inj["popdict"].keys(): ## make sure it corresponds to an injection if key not in inj_component_names: raise KeyError( "Population '{}' not in injection. Top-level keys for the population_params dict must correspond to an injection.".format( key ) ) ## enforce required keys for rk in required_keys: if rk not in inj["popdict"][key].keys(): raise KeyError( "population_params dict missing required key: '{}'".format( rk ) ) ## set defaults for dk in pop_defaults.keys(): if dk not in inj["popdict"][key].keys(): print( "No value found for populations parameter '{}' in population_params dict for injection component '{}'. Setting to default ({}).".format( dk, key, pop_defaults[dk] ) ) inj["popdict"][key][dk] = pop_defaults[dk] ## formatting if inj["popdict"][key]["delimiter"] == "space": inj["popdict"][key]["delimiter"] = " " elif inj["popdict"][key]["delimiter"] == "tab": inj["popdict"][key]["delimiter"] == "\t" # some run parameters params["out_dir"] = config_run_params["out_dir"] params["doPreProc"] = config_run_params.getboolean("doPreProc") params["input_spectrum"] = config_run_params["input_spectrum"] params["projection"] = str(config_run_params["projection"]) params["FixSeed"] = config_run_params.getboolean("FixSeed") if params["FixSeed"]: params["seed"] = int(config_run_params["seed"]) misc["nthread"] = int(config_run_params["Nthreads"]) misc["N_GPU"] = int(config_run_params["N_GPU"]) params["colormap"] = config_run_params["colormap"] ## sampler selection params["sampler"] = config_run_params["sampler"] ## only numpyro has GPU support if misc["N_GPU"] > 0 and params["sampler"] not in ["numpyro", "numpyro_nested"]: raise ValueError( "Only numpyro supports GPU acceleration but N_GPU ({}) > 0 and sampler is {}.".format( misc["N_GPU"], params["sampler"] ) ) misc["nlive"] = int(config_run_params["nlive"]) ## sampler setup and late-time imports to reduce dependencies ## dynesty if params["sampler"] == "dynesty": params["sample_method"] = config_run_params["sample_method"] ## numpyro elif params["sampler"] == "numpyro": params["show_progress"] = config_run_params.getboolean("show_progress") params["Nburn"] = int(config_run_params["Nburn"]) params["Nsamples"] = int(config_run_params["Nsamples"]) else: raise ValueError( "Unknown sampler. Supported samplers: 'dynesty' and 'numpyro'." ) # checkpointing (dynesty+numpyro only for now) if params["sampler"] == "dynesty" or params["sampler"] == "numpyro": params["checkpoint"] = config_run_params.getboolean("checkpoint") ## numpyro's checkpoint_interval is in number of samples, vs. seconds for dynesty if params["sampler"] == "numpyro": params["checkpoint_at"] = config_run_params["checkpoint_at"] if params["checkpoint_at"] == "interval": params["checkpoint_interval"] = int( config_run_params.get("checkpoint_interval", fallback=100) ) params["additional_samples"] = int(config_run_params["additional_samples"]) if params["additional_samples"] == 0: params["additional_samples"] = None else: params["checkpoint_interval"] = int( config_run_params.get("checkpoint_interval", fallback=3600) ) # Fix random seed if params["FixSeed"]: from blip.tools.SetRandomState import SetRandomState as setrs seed = params["seed"] misc["randst"] = setrs(seed) else: if params["checkpoint"]: raise TypeError( "Checkpointing without a fixed seed is not supported. Set 'FixSeed' to true and specify 'seed'." ) if resume: raise TypeError( "Resuming from a checkpoint requires re-generation of data, so the random seed MUST be fixed." ) misc["randst"] = None return params, inj, misc
[docs] def parse_model_spec( specs: str, *, is_injection: bool, truevals_all: dict, fixedvals_all: dict | None = None, alias_all: dict | None = None, injection_specs: list[SubmodelSpec] | None = None, ) -> list[SubmodelSpec]: """Parse a model specifier string into its component submodels. Parameters ---------- specs : str model specifier string, with submodels separated by "+" and no spaces. is_injection : bool if True, parse this as an injection model. This will ignore the parameters alias_all and injection_specs, since the injections cannot be aliased to anything. truevals_all : dict the truevals dictionary in the config params, specifying injected values. Not checked for correctness (this is for now the job of submodel.__init__). fixedvals_all : dict | None, optional the fixedvals dictionary in the config params, specifying parameters to hold fixed in likelihood evaluation. Ignored if is_injection=True. alias_all : dict | None, optional the alias dictionary in the config params. Ignored if is_injection=True. injection_specs : list[SubmodelSpec] | None, optional the already-parsed specifiers for the injection submodels. Required to correctly parse the analysis model. Ignored if is_injection=True. Returns ------- list[SubmodelSpec] The specifications to build the submodels. Raises ------ ValueError If the model specifier string contains invalid syntax. """ raw_names = specs.split("+") raw_names = catch_duplicates(raw_names) submodel_specs = [] for raw_name in raw_names: if raw_names.count(raw_name) > 1: raise ValueError(f"Duplicate model '{raw_name}'") invalid_err = ValueError(f"Invalid injection submodel specifier '{raw_name}'") ## remove the duplicate identifier if needed (powerlaw_isgwb-3 -> powerlaw_isgwb) name_split = raw_name.split("-") if len(name_split) > 2: raise invalid_err name = name_split[0] if len(name_split) == 2: count = f"({name_split[1]})" else: count = "" is_noise = name in ["noise", "fixednoise"] if is_noise and count != "": raise invalid_err is_population = name == "population" if is_noise: kind = SubmodelKind.NOISE spectral, spatial = None, None elif is_population: kind = SubmodelKind.POPULATION spectral, spatial = "population", "population" else: kind = SubmodelKind.SPECTRAL_SPATIAL name_split = name.split("_") if len(name_split) != 2: raise invalid_err spectral, spatial = name_split truevals = {} if raw_name in truevals_all: truevals: dict = truevals_all[raw_name] if is_injection: fixedvals = {} alias = None else: fixedvals = {} if raw_name in fixedvals_all: fixedvals: dict = fixedvals_all[raw_name] alias = None truevals = {} if raw_name in alias_all: alias: str = alias_all[raw_name] for injspec in injection_specs: if injspec.raw_name == alias: truevals = injspec.truevals break spec = SubmodelSpec( name=name, kind=kind, is_injection=is_injection, spectral=spectral, spatial=spatial, count=count, raw_name=raw_name, truevals=truevals, fixedvals=fixedvals, alias=alias, ) submodel_specs.append(spec) catch_duplicates(raw_names) return submodel_specs
[docs] def preprocess_truevals(tv_raw): ## some quantities we want to use as log values, so convert them truevals = {} log_list = ["Np", "Na", "omega0", "fbreak", "fcut", "fscale"] for component_name in tv_raw.keys(): truevals[component_name] = {} for name in tv_raw[component_name].keys(): if name in log_list: new_name = "log_" + name truevals[component_name][new_name] = np.log10( tv_raw[component_name][name] ) else: truevals[component_name][name] = tv_raw[component_name][name] return truevals