diff --git a/scripts/gcorr/README b/scripts/gcorr/README index b36b1e14..90077d28 100644 --- a/scripts/gcorr/README +++ b/scripts/gcorr/README @@ -2,15 +2,16 @@ Code package for calculating correction factors to g_ell using pre-generated simulation maps for each single mask. This procedure assumes you are running this code on a cluster-- it is extremely slow to do otherwise. -There are three scripts used: +There is one main script that calls two functions from the xfaster.gcorr_tools +library: -1. run_xfaster.py ---- This calls XFaster either to run or submit jobs. -The only part that you'll need to touch is at the beginning-- opts and -submit_opts, to match the options you use for data/for your cluster. -2. compute_gcorr.py ---- A script that computes gcorr.npz file from all the -bandpower.npz files. You'll never need to touch or run this script. -3. iterate.py ---- A iteration script, that calls script 1 and 2 to get a -new gcorr.npz file each time. This is the main script you'll run. +1. xfaster_gcorr ---- This function calls XFaster to run or submit jobs for + gcorr runs. +2. process_gcorr ---- A function that computes the gcorr correction from the + ensemble of bandpowers, updates the running total, and backs up the necessary + files from each iteration. +3. iterate_gcorr.py ---- A iteration script, that calls function 1 and 2 to get + a new gcorr.npz file each time. This is the main script you'll run. There is also a config file with options specific to computing gcorr. @@ -18,63 +19,54 @@ There is also a config file with options specific to computing gcorr. Gcorr run procedure: 1. Edit the gcorr config file to suit your purposes. An example is given. -Required fields are: - * null -- must be true for null tests and false for signal runs - * map_tags -- comma-separated list of map tags - * data_subset -- the globabble data_subset argument to xfaster_run, - but without map tags. So, "full", "chunk*", etc. - * output_root -- the parent directory where your gcorr XFaster runs will - be written - * nsim -- the number of simulations to use to compute gcorr - * [xfaster_opts] -- this is where you'll put any string-type options - that will be directly input to xfaster_run + Required fields are: + * null -- must be true for null tests and false for signal runs + * map_tags -- comma-separated list of map tags + * data_subset -- the globabble data_subset argument to xfaster_run, + but without map tags. So, "full", "chunk*", etc. + * output_root -- the parent directory where your gcorr XFaster runs will + be written + * nsim -- the number of simulations to use to compute gcorr + * [xfaster_opts] -- this is where you'll put any options + that will be directly input to xfaster_run + * [submit_opts] -- this is where you'll put any options + that will be directly input to xfaster_submit, + in addition to those in [xfaster_opts] -2. Edit the beginning of run_xfaster.py for non-string input options - to xfaster_run (opts dictionary) or xfaster_submit (submit_opts). - Here you might change things like lmin, lmax, bin size, etc. and - omp_threads. +2. Run iterate_gcorr.py once to get the full set of XFaster output files in the + output directory. Since we haven't computed gcorr yet, this will set + apply_gcorr=False. Make sure to use as many OMP threads as possible since + this is the step where the sims_xcorr file, which benefits the most from + extra threads, is computed. Your command should look like this: -3. Run run_xfaster.py once to get the full set of XFaster output files. - Since we haven't computed gcorr yet, you must use --no-gcorr. Make sure - to use as many OMP threads as possible since this is the step where the - sims_xcorr file, which benefits the most from extra threads, is computed. - Your command should look like this: - python run_xfaster.py --gcorr-config path-to-my-gcorr-config.ini --no-gcorr + python iterate_gcorr.py path-to-my-gcorr-config.ini -4. Run iterate.py until convergence is reached. In practice, you will do: - iterate.py --gcorr-config path-to-my-gcorr-config.ini - then wait for it to finish. Then look at the correction-to-the-correction - that it both prints and plots (it should converge to 1s for TT, EE, BB), - and if it hasn't converged, up+enter (redo) the same command you just did. - In much more detail, here's what the code does: - 1. If this is the first iteration, copy the whole output directory into - one next to it with tag _iter. This is the directory that will now be - updated with new transfer functions and bandpowers on each iteration. - In the code, it's called rundir. - 2. Make a plot directory in that _iter directory-- look here for new plots - of the total gcorr and the correction-to-gcorr each iteration. - 3. For the first iteration, initialize a starting guess for gcorr as all - ones. This total gcorr is saved as gcorr__total.npz in the original - (reference) output directory. - 4. If not the first iteration, load up the correction-to-gcorr computed - in the previous iteration. Multiply it by the total gcorr, and save that - to the reference directory as gcorr_total. Also save the previous - iteration's total gcorr as gcorr__prev.npz. - 5. Plot gcorr total and the correction to gcorr total. Save in rundir/plots. - 6. Clear out rundir bandpowers/transfer functions/logs. - 7. Call run_xfaster.py for the 0th sim seed while also reloading gcorr. - This does a couple things-- saves the new gcorr in the masks_xcorr file, - so later seeds will use the right thing. And recompute the transfer - function, which doesn't depend on the sim_index, so is only necessary to - do once. - 8. After the transfer functions are all on disk, submit individual jobs for - all the other seeds, just doing the bandpowers step for those. - 9. Monitor the queue, checking every 10 seconds for jobs still running. - 10. Once they're all done, run compute_gcal.py, which saves a - correction-to-gcorr as gcorr_corr_.npz in the rundir. - 11. Print out the values of the correction-to-gcorr. - 12. Exit. +3. Run iterate_gcorr.py until convergence is reached. In practice, you will run + the command above and wait for it to finish. If you include the `--max-iters` + option with a non-zero value, the code will try to determine whether + convergence or max_iters has been reached and stop on its own. Otherwise, + you can look at the correction-to-the-correction that it both prints and + plots (it should converge to 1s for TT, EE, BB), and if it hasn't converged, + up+enter (redo) the same command you just did. In much more detail, here's + what the code does: -5. After convergence is reached, copy the gcorr_total file from the refdir - to the mask directory, labeling it mask_map__gcorr.npz for signal or - mask_map__gcorr_null.npz for null. + 1. Call xfaster_gcorr for the 0th sim seed while also reloading gcorr (if + this is not the first iteration). This does a couple things-- saves the + new gcorr in the masks_xcorr file, so later seeds will use the right + thing. And recompute the transfer function, which doesn't depend on the + sim_index, so is only necessary to do once. + 2. After the transfer functions are all on disk, submit individual jobs for + all the other seeds, just doing the bandpowers step for those. + 3. Once they're all done, run compute_gcal, and save a correction-to-gcorr + as gcorr_corr__iter.npz in the rundir. + 4. If not the first iteration, load up the correction-to-gcorr computed for + this iteration. Multiply it by the total gcorr, and save that to the + output directory as gcorr_total__iter.npz. + 5. Plot gcorr total and the correction to gcorr total. Save in rundir/plots. + 6. Clear out rundir bandpowers/transfer functions/logs. + 7. Exit. + +4. After convergence is reached, copy the gcorr_total file for the last + iteration from the rundir to the mask directory, labeling it + mask_map__gcorr.npz for signal or mask_map__gcorr_null.npz for + null. diff --git a/scripts/gcorr/compute_gcal.py b/scripts/gcorr/compute_gcal.py deleted file mode 100644 index 4171e5be..00000000 --- a/scripts/gcorr/compute_gcal.py +++ /dev/null @@ -1,145 +0,0 @@ -""" -A script for computing g_corr factor from simulation bandpowers. -""" -import os -import glob -import numpy as np -import scipy.optimize as opt -import argparse as ap -from configparser import ConfigParser -import xfaster as xf -from xfaster import parse_tools as pt - -P = ap.ArgumentParser() -P.add_argument("--gcorr-config", help="The config file for gcorr computation") -P.add_argument("--output-tag", help="Which map tag") -P.add_argument("-r", "--root", default="xfaster_gcal", help="XFaster outputs directory") -P.add_argument( - "--fit-hist", - action="store_true", - help="Fit histogram for gcorr, rather than using the distribution variance", -) -args = P.parse_args() - - -assert os.path.exists(args.gcorr_config), "Missing config file {}".format( - args.gcorr_config -) -g_cfg = ConfigParser() -g_cfg.read(args.gcorr_config) -null = g_cfg.getboolean("gcorr_opts", "null") -if null: - # no sample variance used for null tests - fish_name = "invfish_nosampvar" -else: - fish_name = "inv_fish" - -output_tag = args.output_tag -output_root = os.path.join(g_cfg["gcorr_opts"]["output_root"], args.root) - -specs = ["tt", "ee", "bb", "te", "eb", "tb"] - -nsim = g_cfg.getint("gcorr_opts", "nsim") - - -# use gauss model for null bandpowers -def gauss(qb, amp, width, offset): - # width = 0.5*1/sig**2 - # offset = mean - return amp * np.exp(-width * (qb - offset) ** 2) - - -# use lognormal model for signal bandpowers -def lognorm(qb, amp, width, offset): - return gauss(np.log(qb), amp, width, offset) - - -# Compute the correction factor -if output_root is None: - output_root = os.getcwd() -if output_tag is not None: - output_root = os.path.join(output_root, output_tag) - output_tag = "_{}".format(output_tag) -else: - output_tag = "" - -file_glob = os.path.join(output_root, "bandpowers_sim*{}.npz".format(output_tag)) -files = sorted(glob.glob(file_glob)) -if not len(files): - raise OSError("No bandpowers files found in {}".format(output_root)) - -out = {"data_version": 1} -inv_fishes = None -qbs = {} - -for spec in specs: - qbs[spec] = None - -for filename in files: - bp = xf.load_and_parse(filename) - inv_fish = bp[fish_name] - bad = np.where(np.diag(inv_fish) < 0)[0] - if len(bad): - # this happens rarely and we won't use those sims - print("Found negative fisher values in {}: {}".format(filename, bad)) - continue - - if inv_fishes is None: - inv_fishes = np.diag(inv_fish) - else: - inv_fishes = np.vstack([inv_fishes, np.diag(inv_fish)]) - - for spec in specs: - if qbs[spec] is None: - qbs[spec] = bp["qb"]["cmb_{}".format(spec)] - else: - qbs[spec] = np.vstack([qbs[spec], bp["qb"]["cmb_{}".format(spec)]]) - -# Get average XF-estimated variance -xf_var_mean = np.mean(inv_fishes, axis=0) -xf_var = pt.arr_to_dict(xf_var_mean, bp["qb"]) - -out["bin_def"] = bp["bin_def"] -nbins = len(out["bin_def"]["cmb_tt"]) -out["gcorr"] = {} - -for spec in specs: - stag = "cmb_{}".format(spec) - out["gcorr"][spec] = np.ones(nbins) - - if not args.fit_hist: - fit_variance = np.var(qbs[spec], axis=0) - out["gcorr"][spec] = xf_var[stag] / fit_variance - continue - - for b0 in np.arange(nbins): - hist, bins = np.histogram( - np.asarray(qbs[spec])[:, b0], density=True, bins=int(nsim / 10.0) - ) - bc = (bins[:-1] + bins[1:]) / 2.0 - - # Gauss Fisher-based params - A0 = np.max(hist) - sig0 = np.sqrt(xf_var[stag][b0]) - mu0 = np.mean(qbs[spec][b0]) - - if spec in ["eb", "tb"] or null: - func = gauss - else: - func = lognorm - sig0 /= mu0 - mu0 = np.log(mu0) - - # Initial parameter guesses - p0 = [A0, 1.0 / sig0**2 / 2.0, mu0] - - try: - popth, pcovh = opt.curve_fit(func, bc, hist, p0=p0, maxfev=int(1e9)) - # gcorr is XF Fisher variance over fit variance - out["gcorr"][spec][b0] = popth[1] / p0[1] - except RuntimeError: - print("No hist fits found") - -outfile = os.path.join(output_root, "gcorr_corr{}.npz".format(output_tag)) -np.savez_compressed(outfile, **out) -print("New gcorr correction computed (should converge to 1): ", out["gcorr"]) diff --git a/scripts/gcorr/gcorr_config_null.ini b/scripts/gcorr/gcorr_config_null.ini index 99b18b06..a1394296 100644 --- a/scripts/gcorr/gcorr_config_null.ini +++ b/scripts/gcorr/gcorr_config_null.ini @@ -17,4 +17,20 @@ signal_type = synfast # Noise type needed for nulls noise_type = gaussian mask_type = rectangle -verbose = debug +likelihood = false +residual_fit = false +foreground_fit = false +tbeb = true +bin_width = 25 +lmin = 2 +lmax = 500 +verbose = info + +# Options for submitting to a cluster +[submit_opts] +nodes = 1 +ppn = 1 +mem = 6 +omp_threads = 10 +wallt = 4 +num_jobs = 10 diff --git a/scripts/gcorr/gcorr_config_signal.ini b/scripts/gcorr/gcorr_config_signal.ini index 6b7674a5..73025f64 100644 --- a/scripts/gcorr/gcorr_config_signal.ini +++ b/scripts/gcorr/gcorr_config_signal.ini @@ -6,7 +6,6 @@ data_subset = full output_root = ../../example/gcorr_run nsim = 100 - # Options we can directly pass to XFaster [xfaster_opts] config = ../../example/config_example.ini @@ -16,4 +15,20 @@ signal_type = synfast # noise type ignored for signal gcorr noise_type = gaussian mask_type = rectangle -verbose = debug \ No newline at end of file +likelihood = false +residual_fit = false +foreground_fit = false +tbeb = true +bin_width = 25 +lmin = 2 +lmax = 500 +verbose = info + +# Options for submitting to a cluster +[submit_opts] +nodes = 1 +ppn = 1 +mem = 6 +omp_threads = 10 +wallt = 4 +num_jobs = 10 diff --git a/scripts/gcorr/iterate.py b/scripts/gcorr/iterate.py deleted file mode 100644 index f43b165f..00000000 --- a/scripts/gcorr/iterate.py +++ /dev/null @@ -1,223 +0,0 @@ -""" -A iteration script used to update g_corr factors. -""" -import os -import numpy as np -import argparse as ap -from configparser import ConfigParser -import time -import copy -import glob -import shutil -from matplotlib import use - -use("agg") -import matplotlib.pyplot as plt -import xfaster as xf -from xfaster import parse_tools as pt - -# Set this option to False to keep corrections to gcorr from being -# too large on each iteration. Try if things aren't converging. -allow_extreme = False - -P = ap.ArgumentParser() -P.add_argument("--gcorr-config", help="The config file for gcorr computation") -P.add_argument( - "--force-restart", - action="store_true", - default=False, - help="Force restarting from iteration 0-- remakes iteration dir", -) - -args = P.parse_args() - -assert os.path.exists(args.gcorr_config), "Missing config file {}".format( - args.gcorr_config -) -g_cfg = ConfigParser() -g_cfg.read(args.gcorr_config) -tags = g_cfg["gcorr_opts"]["map_tags"].split(",") - -specs = ["tt", "ee", "bb", "te", "eb", "tb"] - -run_name = "xfaster_gcal" -run_name_iter = run_name + "_iter" -# ref dir will contain the total gcorr file -ref_dir = os.path.join(g_cfg["gcorr_opts"]["output_root"], run_name) -# run dir will be where all the iteration happens to update the reference -rundir = ref_dir + "_iter" - -# if rundir doesn't exist or force_restart, we start from scratch -if not os.path.exists(rundir) or args.force_restart: - iternum = 0 - - if os.path.exists(rundir): - shutil.rmtree(rundir) - for gfile in glob.glob(os.path.join(ref_dir, "*", "gcorr*.npz")): - os.remove(gfile) - shutil.copytree(ref_dir, rundir) - - # make plots directory - os.mkdir(os.path.join(rundir, "plots")) - - # Since this is the first iteration, we also make a gcorr file of - # all ones. - for tag in tags: - bp_file = glob.glob(os.path.join(ref_dir, tag, "bandpowers*.npz"))[0] - bp = xf.load_and_parse(bp_file) - gcorr_data = {"bin_def": bp["bin_def"], "gcorr": {}, "data_version": 1} - for spec in specs: - stag = "cmb_{}".format(spec) - gcorr_data["gcorr"][spec] = np.ones_like(bp["qb"][stag]) - np.savez_compressed( - os.path.join(ref_dir, tag, "gcorr_{}_total.npz".format(tag)), **gcorr_data - ) - -else: - # check plots directory to find what iteration we're at - plots = glob.glob(os.path.join(rundir, "plots", "*.png")) - plot_inds = sorted([int(x.split("iter")[-1].split(".")[0]) for x in plots]) - last_iter = plot_inds[-1] - iternum = int(last_iter) + 1 -print("Starting iteration {}".format(iternum)) - - -for tag in tags: - ref_file = os.path.join(ref_dir, "{0}/gcorr_{0}_total.npz".format(tag)) - rundirf = os.path.join(rundir, tag) - - # Remove transfer functions and bandpowers - os.system("rm -rf {}/bandpowers*".format(rundirf)) - os.system("rm -rf {}/transfer*".format(rundirf)) - os.system("rm -rf {}/ERROR*".format(rundirf)) - os.system("rm -rf {}/logs".format(rundirf)) - - first = False - - # Get gcorr from reference folder, if it's there - assert os.path.exists(ref_file), "Missing total gcorr file {}".format(ref_file) - gcorr = xf.load_and_parse(ref_file) - print("loaded {}".format(ref_file)) - - # Get gcorr_correction from iter folder -- this is the multiplicative - # change to gcorr-- should converge to 1s - try: - fp = os.path.join(rundirf, "gcorr_corr_{}.npz".format(tag)) - gcorr_corr = xf.load_and_parse(fp) - print("got correction to gcorr {}".format(fp)) - except IOError: - gcorr_corr = copy.deepcopy(gcorr) - gcorr_corr["gcorr"] = pt.arr_to_dict( - np.ones_like(pt.dict_to_arr(gcorr["gcorr"], flatten=True)), - gcorr["gcorr"], - ) - first = True - print("Didn't get gcorr correction file in iter folder. Starting from ones.") - - np.savez_compressed(ref_file.replace("_total.npz", "_prev.npz"), **gcorr) - - gcorr["gcorr"] = pt.arr_to_dict( - pt.dict_to_arr(gcorr["gcorr"], flatten=True) - * pt.dict_to_arr(gcorr_corr["gcorr"], flatten=True), - gcorr["gcorr"], - ) - - fig_tot, ax_tot = plt.subplots(2, 3) - fig_corr, ax_corr = plt.subplots(2, 3) - ax_tot = ax_tot.flatten() - ax_corr = ax_corr.flatten() - for i, (k, v) in enumerate(gcorr["gcorr"].items()): - v[0] = 0.5 - if k in ["te", "eb", "tb"]: - # We don't compute gcorr for off-diagonals - v[:] = 1 - if not allow_extreme: - # Don't update gcorr if correction is extreme - v[v < 0.05] /= gcorr_corr["gcorr"][k][v < 0.05] - v[v > 5] /= gcorr_corr["gcorr"][k][v > 5] - for v0, val in enumerate(v): - if val > 1.2: - if v0 != 0: - v[v0] = v[v0 - 1] - else: - v[v0] = 1.2 - if val < 0.2: - if v0 != 0: - v[v0] = v[v0 - 1] - else: - v[v0] = 0.2 - - ax_tot[i].plot(v) - ax_tot[i].set_title("{} total gcorr".format(k)) - ax_corr[i].plot(gcorr_corr["gcorr"][k]) - ax_corr[i].set_title("{} gcorr corr".format(k)) - - print(gcorr["gcorr"]) - fig_tot.savefig( - os.path.join( - rundir, - "plots", - "gcorr_tot_{}_iter{}.png".format(tag, iternum), - ) - ) - fig_corr.savefig( - os.path.join( - rundir, - "plots", - "gcorr_corr_{}_iter{}.png".format(tag, iternum), - ) - ) - - np.savez_compressed(ref_file, **gcorr) - -# Submit a first job that reloads gcorr and computes the transfer function -# that will be used by all the other seeds -print("Sumitting first job") -cmd = "python run_xfaster.py --gcorr-config {g} --omp 1 --check-point transfer --reload-gcorr -o {o} > /dev/null".format( - g=args.gcorr_config, o=run_name_iter -) -print(cmd) -os.system(cmd) - -transfer_exists = {} -for tag in tags: - transfer_exists[tag] = False -while not np.all(list(transfer_exists.values())): - # wait until transfer functions are done to submit rest of jobs - for tag in tags: - rundirf = os.path.join(rundir, tag) - transfer_files = glob.glob( - os.path.join(rundirf, "transfer_all*{}.npz".format(tag)) - ) - transfer_exists[tag] = bool(len(transfer_files)) - - print("transfer exists: ", transfer_exists) - time.sleep(15) - -# Once transfer function is done, all other seeds can run -print("Submitting jobs for all seeds") -cmd = "python run_xfaster.py --gcorr-config {g} --omp 1 --check-point bandpowers -o {o} -f 1 -n {n} > /dev/null".format( - g=args.gcorr_config, o=run_name_iter, n=int(g_cfg["gcorr_opts"]["nsim"]) - 1 -) -print(cmd) -os.system(cmd) - -print("Waiting for jobs to complete...") -while os.system("squeue -u {} | grep xfast > /dev/null".format(os.getenv("USER"))) == 0: - print("Number of jobs left +1:") - os.system("squeue -u {} | wc -l".format(os.getenv("USER"))) - time.sleep(10) - -print("Computing new gcorr factors") -for tag in tags: - cmd = "python compute_gcal.py --gcorr-config {g} -r {r} --output-tag {t}".format( - g=args.gcorr_config, r=run_name_iter, t=tag - ) - print(cmd) - os.system(cmd) - -for tag in tags: - bp_files = glob.glob("{}/{}/bandpowers*".format(rundir, tag)) - error_files = glob.glob("{}/{}/ERROR*".format(rundir, tag)) - print("{} completed: {}".format(tag, len(bp_files))) - print("{} error: {}".format(tag, len(error_files))) diff --git a/scripts/gcorr/iterate_gcorr.py b/scripts/gcorr/iterate_gcorr.py new file mode 100644 index 00000000..6034cf3a --- /dev/null +++ b/scripts/gcorr/iterate_gcorr.py @@ -0,0 +1,176 @@ +""" +A iteration script used to update g_corr factors. Can either be run in series +or submit jobs to run in parallel. +""" +import os +import argparse as ap +import subprocess as sp +from xfaster import gcorr_tools as gt +from xfaster.batch_tools import batch_sub +from matplotlib import use + +use("agg") + +P = ap.ArgumentParser() +P.add_argument("config", help="The config file for gcorr computation") +P.add_argument( + "-s", + "--submit", + action="store_true", + help="Submit jobs, instead of running serially on current session", +) +P.add_argument( + "-t", + "--map-tags", + nargs="+", + help="Subset of map tags to iterate over", +) +P.add_argument( + "-i", + "--iternums", + type=int, + nargs="+", + help="Iteration number to start with for each tag in --map-tags. If one " + "number is given, use the same index for all tags. All files for iterations " + "above this number will be removed. If not supplied, iterations will increment " + "to the next index.", +) +P.add_argument( + "--keep-iters", + action="store_true", + help="Store outputs from each iteration in a separate directory", +) +P.add_argument( + "--max-iters", + default=0, + type=int, + help="Maximum number of iterations to run. If 0, run once and exit.", +) +P.add_argument( + "-c", + "--converge-criteria", + type=float, + default=0.01, + help="Maximum fractional change in gcorr that indicates " + "convergence and stops iteration", +) +P.add_argument( + "--allow-extreme", + action="store_true", + help="Do not clip gcorr corrections that are too large at each " + "iteration. Try this if iterations are not converging.", +) +P.add_argument( + "--gcorr-fit-hist", + action="store_true", + help="Fit bandpower histogram to a lognorm distribution to compute gcorr", +) +P.add_argument( + "-a", + "--analyze-only", + action="store_true", + help="Compute and store gcorr files for the current iteration. Typically used " + "internally by the script, and should not be called by the user.", +) + +args = P.parse_args() + +# load configuration +g_cfg = gt.get_gcorr_config(args.config) + +tags = g_cfg["gcorr_opts"]["map_tags"] +if args.map_tags: + tags = [t for t in args.map_tags if t in tags] + +iternums = {t: None for t in tags} +if args.iternums: + if len(args.iternums) == 1: + args.iternums = args.iternums * len(tags) + for t, i in zip(tags, args.iternums): + iternums[t] = i + +null = g_cfg["gcorr_opts"].get("null", False) +nsim = g_cfg["gcorr_opts"]["nsim"] +rundir = g_cfg["gcorr_opts"]["output_root"] +xf_submit_opts = g_cfg.get("submit_opts", {}) +submit_opts = xf_submit_opts.copy() +submit_opts.pop("num_jobs") + +# sim ensemble options +run_opts = dict( + data_subset=g_cfg["gcorr_opts"]["data_subset"], + data_subset2=g_cfg["gcorr_opts"].get("data_subset2", None), + output_root=rundir, + null=null, + num_sims=nsim, + **g_cfg["xfaster_opts"], +) +if args.submit: + run_opts.update(submit=True, **xf_submit_opts) + +# gcorr analysis options +gcorr_opts = dict( + output_root=rundir, + null=null, + num_sims=nsim, + gcorr_fit_hist=args.gcorr_fit_hist, + allow_extreme=args.allow_extreme, + keep_iters=args.keep_iters, + converge_criteria=args.converge_criteria, + max_iters=args.max_iters, +) + +if args.analyze_only: + assert len(tags) == 1, "Analyze one tag at a time" + if gt.process_gcorr(output_tag=tags[0], iternum=iternums[tags[0]], **gcorr_opts): + raise RuntimeError("Stopping iterations") + raise SystemExit + +# build command for this script +cmd = ["python", os.path.abspath(__file__), os.path.abspath(args.config)] +for k in [ + "allow_extreme", + "gcorr_fit_hist", + "keep_iters", +]: + if getattr(args, k): + cmd += ["--{}".format(k.replace("_", "-"))] +for k in ["converge_criteria", "max_iters"]: + cmd += ["--{}".format(k.replace("_", "-")), str(getattr(args, k))] + +if args.max_iters == 0: + args.max_iters = 1 + +# run +for tag in tags: + # setup for next iteration + iternum0 = gt.get_next_iter( + output_root=rundir, output_tag=tag, iternum=iternums[tag] + ) + if iternum0 > args.max_iters: + raise ValueError( + "Tag {} iteration {} > max {}".format(tag, iternum0, args.max_iters) + ) + gcorr_job = None + + for iternum in range(iternum0, args.max_iters): + print("Starting {} iteration {}".format(tag, iternum)) + + # compute ensemble bandpowers + if args.submit: + run_opts["dep_afterok"] = gcorr_job + bp_jobs = gt.xfaster_gcorr(output_tag=tag, iternum=iternum, **run_opts) + + # compute gcorr + if args.submit: + # submit analysis job + gcorr_job = batch_sub( + cmd + ["-a", "-t", tag, "-i", str(iternum)], + name="gcorr_{}".format(tag), + workdir=os.path.abspath(os.path.join(rundir, tag, "logs")), + dep_afterok=bp_jobs, + **submit_opts, + ) + else: + if gt.process_gcorr(output_tag=tag, iternum=iternum, **gcorr_opts): + break diff --git a/scripts/gcorr/run_xfaster.py b/scripts/gcorr/run_xfaster.py deleted file mode 100644 index f8b2fa45..00000000 --- a/scripts/gcorr/run_xfaster.py +++ /dev/null @@ -1,109 +0,0 @@ -""" -A script to run XFaster for gcorr calculation. Called by iterate.py. -""" -import os -import xfaster as xf -import argparse as ap -from configparser import ConfigParser - -# Change XFaster options here to suit your purposes -opts = dict( - likelihood=False, - residual_fit=False, - foreground_fit=False, - # change options below for your purposes - tbeb=True, - bin_width=25, - lmin=2, - lmax=500, -) - -# Change submit options here to fit your system -submit_opts = dict(nodes=1, ppn=1, mem=6, omp_threads=10, wallt=4) - -P = ap.ArgumentParser() -P.add_argument("--gcorr-config", help="The config file for gcorr computation") -P.add_argument("-f", "--first", default=0, type=int, help="First sim index to run") -P.add_argument("-n", "--num", default=1, type=int, help="Number of sims to run") -P.add_argument( - "-o", "--output", default="xfaster_gcal", help="Name of output subdirectory" -) -P.add_argument( - "--no-gcorr", - dest="gcorr", - default=True, - action="store_false", - help="Don't apply a g-gcorrection", -) -P.add_argument( - "--reload-gcorr", default=False, action="store_true", help="Reload the gcorr factor" -) -P.add_argument("--check-point", default="bandpowers", help="XFaster checkpoint") -P.add_argument( - "--no-submit", dest="submit", action="store_false", help="Don't submit, run locally" -) -P.add_argument( - "--omp", - default=None, - type=int, - help="Number of omp threads, if submit. Overwrites value in config file", -) - -args = P.parse_args() - -# start by loading up gcorr config file and parsing it -assert os.path.exists(args.gcorr_config), "Missing config file {}".format( - args.gcorr_config -) -g_cfg = ConfigParser() -g_cfg.read(args.gcorr_config) - -# set all user-specific xfaster opts -for k, v in g_cfg["xfaster_opts"].items(): - opts[k] = v -null = g_cfg.getboolean("gcorr_opts", "null") -tags = g_cfg["gcorr_opts"]["map_tags"].split(",") - -# null tests should use noise sims. signal shouldn't. -if null: - opts["noise_type"] = g_cfg["xfaster_opts"]["noise_type"] - opts["sim_data_components"] = ["signal", "noise"] -else: - opts["noise_type"] = None - opts["sim_data_components"] = ["signal"] - -opts["output_root"] = os.path.join(g_cfg["gcorr_opts"]["output_root"], args.output) - -# update opts with command line args -opts["apply_gcorr"] = args.gcorr -opts["reload_gcorr"] = args.reload_gcorr -opts["checkpoint"] = args.check_point - -seeds = list(range(args.first, args.first + args.num)) - -for tag in tags: - opts["sim_data"] = True - opts["output_tag"] = tag - opts["gcorr_file"] = os.path.abspath( - os.path.join( - g_cfg["gcorr_opts"]["output_root"], - "xfaster_gcal", - tag, - "gcorr_{}_total.npz".format(tag), - ) - ) - opts["data_subset"] = os.path.join( - g_cfg["gcorr_opts"]["data_subset"], "*{}".format(tag) - ) - if args.omp is not None: - submit_opts["omp_threads"] = args.omp - - if args.submit: - opts.update(**submit_opts) - - for s in seeds: - opts["sim_index_default"] = s - if args.submit: - xf.xfaster_submit(**opts) - else: - xf.xfaster_run(**opts) diff --git a/xfaster/gcorr_tools.py b/xfaster/gcorr_tools.py new file mode 100644 index 00000000..6427d9b4 --- /dev/null +++ b/xfaster/gcorr_tools.py @@ -0,0 +1,631 @@ +import os +import glob +import shutil +import copy +import time +import numpy as np +from . import parse_tools as pt +import subprocess as sp + + +def get_gcorr_config(filename): + """ + Return a dictionary of options for running the gcorr iteration scripts. + + Arguments + --------- + filename : str or dict + Filename to load. + + Returns + ------- + cfg : dict + """ + if isinstance(filename, dict): + return filename + + assert os.path.exists(filename), "Missing config file {}".format(filename) + + from configparser import ConfigParser + from ast import literal_eval + + cfg = ConfigParser() + cfg.read(filename) + + out = {} + for sec, sec_items in cfg.items(): + if not len(sec_items): + continue + out[sec] = {} + for k, v in sec_items.items(): + if k in ["map_tags"]: + v = [x.strip() for x in v.split(",")] + else: + try: + v = literal_eval(v) + except: + pass + if isinstance(v, str) and v.lower() in cfg.BOOLEAN_STATES: + v = cfg.getboolean(sec, k) + + out[sec][k] = v + + return out + + +def get_next_iter(output_root="xfaster_gcal", output_tag=None, iternum=None): + """ + Find the index for the next iteration. + + Arguments + --------- + output_root : str + Output root where the data product will be stored. + output_tag : str + Map tag to analyze + iternum : int + Override iteration number. If supplied, all gcorr files for this + iteration and higher are removed from disk. + + Returns + ------- + iternum : int + Next iteration number + """ + + if not os.path.exists(output_root): + return 0 + + if output_tag is None: + tag = "" + else: + output_root = os.path.join(output_root, output_tag) + tag = "_{}".format(output_tag) + + pattern = os.path.join(output_root, "gcorr_total{}_iter*.npz".format(tag)) + if iternum is None: + return len(glob.glob(pattern)) + + # clear out iteration files from previous runs that will be ovewritten + gfiles = glob.glob(os.path.join(output_root, "gcorr*{}_iter*.npz".format(tag))) + for f in gfiles: + if int(f.split("iter")[1].split(".")[0]) >= iternum: + os.remove(f) + iterdirs = glob.glob(os.path.join(output_root, "iter*")) + for d in iterdirs: + if int(os.path.basename(d).replace("iter", "")) >= iternum: + shutil.rmtree(d) + for f in ["*bandpowers_sim", "transfer"]: + sp.check_call("rm -rf {}/{}*{}.npz".format(output_root, f, tag), shell=True) + + # ensure that iterations are contiguous + if len(glob.glob(pattern)) != iternum: + raise ValueError("Invalid iteration number") + + return iternum + + +def xfaster_gcorr( + output_root="xfaster_gcal", + output_tag=None, + data_subset="full", + data_subset2=None, + null=False, + iternum=None, + sim_index=0, + num_sims=1, + num_jobs=1, + submit=False, + **opts, +): + """ + Run XFaster for the gcorr calculation. + + Arguments + --------- + output_root : str + Output root where the data product will be stored. + output_tag : str + Map tag to analyze + data_subset : str + Data subset directory from which to load data maps. The glob pattern + for the `data_subset` XFaster option is built from this string for each + map tag as `/*_`. + null : bool + If True, this is a null test run. + iternum : int + Iteration number for which to compute bandpowers. + sim_index : int + First sim index to run + num_sims : int + Number of sims to run + num_jobs : int + Number of jobs to split each sim ensemble into + submit : bool + If True, submit jobs to a cluster. + Requires submit_opts section to be present in the config. + opts : + Remaining options are passed directly to `xfaster_run` or + `xfaster_submit`. + """ + if iternum is None: + iternum = get_next_iter(output_root, output_tag) + + print( + "{} XFaster jobs for {} iteration {}".format( + "Submitting" if submit else "Running", output_tag, iternum + ) + ) + + if null: + assert opts.get("noise_type") is not None, "Missing noise_type" + assert ( + opts.get("data_root2") is not None or data_subset2 is not None + ), "Missing data_root2 or data_subset2 for null gcorr" + opts["sim_data_components"] = ["signal", "noise"] + else: + opts["noise_type"] = None + opts["data_root2"] = None + opts["sim_data_components"] = ["signal"] + + opts["output_root"] = output_root + opts["output_tag"] = output_tag + opts["apply_gcorr"] = iternum > 0 + opts["reload_gcorr"] = iternum > 0 + opts["checkpoint"] = "transfer" + opts["sim_data"] = True + opts["save_sim_data"] = True + opts["qb_only"] = True + opts["sim_index_default"] = sim_index + opts["num_sims"] = num_sims + + if output_tag is None: + tag = "" + groot = output_root + else: + tag = "_{}".format(output_tag) + groot = os.path.join(output_root, output_tag) + + opts["data_subset"] = os.path.join(data_subset, "*{}".format(tag)) + if null and data_subset2 is not None: + opts["data_subset2"] = os.path.join(data_subset2, "*{}".format(tag)) + + if iternum > 0: + gfile = os.path.join( + groot, "gcorr_total{}_iter{:03d}.npz".format(tag, iternum - 1) + ) + opts["gcorr_file"] = gfile + if not submit: + assert os.path.exists(gfile), "Missing gcorr file {}".format(gfile) + + from .xfaster_exec import xfaster_submit, xfaster_run + + if submit: + jobs = [] + + # submit first job to compute transfer function + opts["sim_index_default"] = sim_index + opts["num_sims"] = 1 if num_jobs > 1 else num_sims + jobs.extend(xfaster_submit(**opts)) + + # submit remaining jobs to compute bandpowers for the rest of the + # ensemble + if num_sims > 1 and num_jobs > 1: + opts["checkpoint"] = "bandpowers" + opts["dep_afterok"] = [jobs[0]] + opts["reload_gcorr"] = False + + idxs = np.array_split(np.arange(sim_index, sim_index + num_sims), num_jobs) + for idx in idxs: + opts["sim_index_default"] = idx[0] + opts["num_sims"] = len(idx) + jobs.extend(xfaster_submit(**opts)) + + return jobs + else: + try: + xfaster_run(**opts) + except RuntimeError: + pass + + +def compute_gcal( + output_root="xfaster_gcal", output_tag=None, null=False, num_sims=1, fit_hist=False +): + """ + Compute gcorr calibration + + Arguments + --------- + output_root : str + Output root where the data product will be stored. + output_tag : str + Map tag to analyze + null : bool + If True, this is a null test dataset. + num_sims : int + Number of sim bandpowers to expect in the ensemble. + fit_hist : bool + If True, fit the bandpower histogram to a lognorm distribution to + compute the calibration factor. Otherwise, uses the simple variance of + the distribution. + """ + if null: + # no sample variance used for null tests + fish_name = "invfish_nosampvar" + else: + fish_name = "inv_fish" + + # use gauss model for null bandpowers + def gauss(qb, amp, width, offset): + # width = 0.5*1/sig**2 + # offset = mean + return amp * np.exp(-width * (qb - offset) ** 2) + + # use lognormal model for signal bandpowers + def lognorm(qb, amp, width, offset): + return gauss(np.log(qb), amp, width, offset) + + # Compute the correction factor + if output_tag is not None: + output_root = os.path.join(output_root, output_tag) + output_tag = "_{}".format(output_tag) + else: + output_tag = "" + + file_glob = os.path.join(output_root, "bandpowers_sim*{}.npz".format(output_tag)) + files = sorted(glob.glob(file_glob)) + efiles = glob.glob( + os.path.join(output_root, "ERROR_" + os.path.basename(file_glob)) + ) + nf = len(files) + len(efiles) + if nf != num_sims: + raise OSError( + "Found {} bandpowers files in {}, expected {}".format( + nf, output_root, num_sims + ) + ) + + out = {"data_version": 1} + inv_fishes = None + qbs = {} + + for filename in files: + bp = pt.load_and_parse(filename) + inv_fish = np.diag(bp[fish_name]) + check = pt.arr_to_dict(inv_fish < 0, bp["qb"]) + bad = False + for k, v in check.items(): + if not k.startswith("cmb_"): + continue + spec = k.split("_")[1] + # ignore negative fisher values in off-diagonals + if spec in ["te", "eb", "tb"]: + continue + # ignore negative fisher values in first bin + if np.any(v[1:]): + bad = True + break + if bad: + # this happens rarely and we won't use those sims + check = np.where(pt.dict_to_arr(check, flatten=True))[0] + print("Found negative fisher values in {}: {}".format(filename, check)) + continue + + if inv_fishes is None: + inv_fishes = inv_fish + else: + inv_fishes = np.vstack([inv_fishes, inv_fish]) + + for stag, qb1 in bp["qb"].items(): + if not stag.startswith("cmb_"): + continue + spec = stag.split("_")[1] + if spec not in qbs: + qbs[spec] = qb1 + else: + qbs[spec] = np.vstack([qbs[spec], qb1]) + + # Get average XF-estimated variance + xf_var_mean = np.mean(inv_fishes, axis=0) + xf_var = pt.arr_to_dict(xf_var_mean, bp["qb"]) + + out["bin_def"] = bp["bin_def"] + out["gcorr"] = {} + + import scipy.optimize as opt + + for spec in qbs: + stag = "cmb_{}".format(spec) + nbins = len(out["bin_def"][stag]) + out["gcorr"][spec] = np.ones(nbins) + + if not fit_hist: + fit_variance = np.var(qbs[spec], axis=0) + out["gcorr"][spec] = xf_var[stag] / fit_variance + continue + + for b0 in np.arange(nbins): + hist, bins = np.histogram( + np.asarray(qbs[spec])[:, b0], density=True, bins=int(nsim / 10.0) + ) + bc = (bins[:-1] + bins[1:]) / 2.0 + + # Gauss Fisher-based params + A0 = np.max(hist) + sig0 = np.sqrt(xf_var[stag][b0]) + mu0 = np.mean(qbs[spec][b0]) + + if spec in ["eb", "tb"] or null: + func = gauss + else: + func = lognorm + sig0 /= mu0 + mu0 = np.log(mu0) + + # Initial parameter guesses + p0 = [A0, 1.0 / sig0**2 / 2.0, mu0] + + try: + popth, pcovh = opt.curve_fit(func, bc, hist, p0=p0, maxfev=int(1e9)) + # gcorr is XF Fisher variance over fit variance + out["gcorr"][spec][b0] = popth[1] / p0[1] + except RuntimeError as e: + print( + "Error computing gcorr for {} bin {}: {}".format(spec, b0, str(e)) + ) + out["gcorr"][spec][b0] = np.nan + + return out + + +def apply_gcal(gcorr, gtot=None, allow_extreme=False): + """ + Apply gcorr correction to the previous iteration's gcorr file. + + Arguments + --------- + gcorr : dict + Correction-to-gcorr to apply. + gtot : dict + Cumulative gcorr from previous iteration. + allow_extreme : bool + Do not clip gcorr corrections that are too large at each iteration. Try + this if iterations are not converging. + """ + + if gtot is not None: + gtot["gcorr"] = pt.arr_to_dict( + pt.dict_to_arr(gtot["gcorr"], flatten=True) + * pt.dict_to_arr(gcorr["gcorr"], flatten=True), + gtot["gcorr"], + ) + else: + gtot = copy.deepcopy(gcorr) + + for k, v in gtot["gcorr"].items(): + if v[0] < 0.1: + v[0] = 0.1 + if k in ["te", "eb", "tb"]: + # We don't compute gcorr for off-diagonals + v[:] = 1 + if not allow_extreme: + # Don't update gcorr if correction is extreme + v[v < 0.05] /= gcorr["gcorr"][k][v < 0.05] + v[v > 5] /= gcorr["gcorr"][k][v > 5] + for v0, val in enumerate(v): + if val > 2: + if v0 != 0: + v[v0] = v[v0 - 1] + else: + v[v0] = 2 + if val < 0.1: + if v0 != 0: + v[v0] = v[v0 - 1] + else: + v[v0] = 0.1 + + return gtot + + +def plot_gcorr(output_root="xfaster_gcal", output_tag=None): + """ + Plot the gcorr data through the current iteration. + + Arguments + --------- + output_root : str + Output root where the data product will be stored. + output_tag : str + Map tag to analyze + """ + plotdir = os.path.join(output_root, "plots") + if not os.path.exists(plotdir): + os.mkdir(plotdir) + + if output_tag is not None: + output_root = os.path.join(output_root, output_tag) + tag = "_{}".format(output_tag) + else: + tag = "" + + pattern = os.path.join(output_root, "gcorr_total{}_iter*.npz".format(tag)) + files = sorted(glob.glob(pattern)) + assert len(files) > 0, "Missing gcorr files" + + # clear plots from later iterations + pfiles = glob.glob(os.path.join(plotdir, "gcorr{}_iter*.png".format(tag))) + for f in pfiles: + if int(f.split("iter")[-1].split(".")[0]) >= len(files): + os.remove(f) + + import matplotlib.pyplot as plt + from matplotlib import colormaps, cm + + fig = axs = None + try: + colors = colormaps["viridis"].resampled(len(files)).colors + except AttributeError: + colors = cm.get_cmap("viridis", len(files)).colors + + for f, c in zip(files, colors): + gdata = pt.load_and_parse(f)["gcorr"] + cdata = pt.load_and_parse(f.replace("total", "corr"))["gcorr"] + + if fig is None: + pol = "ee" in gdata + fig, axs = plt.subplots(2, 3 if pol else 1, sharex=True, sharey="row") + axs[1, 1].set_xlabel("Bin Number") + axs[0, 0].set_ylabel("{} G Total".format(output_tag)) + axs[1, 0].set_ylabel("{} G Correction".format(output_tag)) + + for spec, ax1 in zip(["tt", "ee", "bb"], axs.T): + if spec not in gdata: + continue + ax1[0].plot(gdata[spec], color=c) + ax1[0].set_title(spec.upper()) + ax1[1].plot(cdata[spec], color=c) + + filename = os.path.join( + plotdir, "gcorr{}_iter{:03d}.png".format(tag, len(files) - 1) + ) + fig.tight_layout() + fig.savefig(filename, bbox_inches="tight") + + +def process_gcorr( + output_root="xfaster_gcal", + output_tag=None, + null=False, + iternum=None, + num_sims=1, + gcorr_fit_hist=False, + allow_extreme=False, + keep_iters=False, + converge_criteria=0.01, + max_iters=0, +): + """ + Run gcorr analysis on a complete ensemble of sim bandpowers. This function + runs ``compute_gcal`` to calculate a correction-to-gcorr, stored in the + running directory as ``gcorr_corr__iter.npz``. It then runs + ``apply_gcal`` to increment the running total gcorr, stored in + ``gcorr_total__iter.npz``. Plots of the correction and total + files for the current iteration are stored in the ``/plots`` + directory. The current iteration is determined by counting the number of + ``gcorr_total`` files. + + Arguments + --------- + Arguments + --------- + output_root : str + Output root where the data product will be stored. + output_tag : str + Map tag to analyze + null : bool + If True, this is a null test dataset. + iternum : int + Iteration number to process. + num_sims : int + Number of sim bandpowers to expect in the ensemble. + gcorr_fit_hist : bool + If True, fit the bandpower histogram to a lognorm distribution to + compute the calibration factor. Otherwise, uses the simple variance of + the distribution. + allow_extreme : bool + Do not clip gcorr corrections that are too large at each iteration. Try + this if iterations are not converging. + keep_iters : bool + If True, store the transfer function and bandpowers outputs from each + iteration in a separate sub-directory, rather than deleting these files. + converge_criteria : float + Maximum fractional change in gcorr that indicates convergence and stops + iteration. + max_iters : int + Maximum number of iterations to run. If 0, run once and return. + + Returns + ------- + retval : bool + True if convergence or iteration index is reached, False otherwise. + """ + if iternum is None: + iternum = get_next_iter(output_root, output_tag) + + print("Processing gcorr for {} iteration {}".format(output_tag, iternum)) + + # Cleanup output directories + if output_tag is not None: + froot = os.path.join(output_root, output_tag) + ftag = "_{}".format(output_tag) + else: + froot = output_root + ftag = "" + + # Compute gcorr correction from bandpower distribution + gcorr = compute_gcal( + output_root=output_root, + output_tag=output_tag, + num_sims=num_sims, + null=null, + fit_hist=gcorr_fit_hist, + ) + cfile = os.path.join(froot, "gcorr_corr{}_iter{:03d}.npz".format(ftag, iternum)) + pt.save(cfile, **gcorr) + + # Apply correction to cumulative gcorr + if iternum > 0: + gfile = os.path.join( + froot, "gcorr_total{}_iter{:03d}.npz".format(ftag, iternum - 1) + ) + gtot = pt.load_and_parse(gfile) + else: + gtot = None + gtot = apply_gcal(gcorr, gtot, allow_extreme=allow_extreme) + gfile = os.path.join(froot, "gcorr_total{}_iter{:03d}.npz".format(ftag, iternum)) + pt.save(gfile, **gtot) + + # Plot stuff + plot_gcorr(output_root, output_tag) + + bp_files = glob.glob(os.path.join(froot, "bandpowers_sim*{}.npz".format(ftag))) + error_files = glob.glob( + os.path.join(froot, "ERROR_bandpowers_sim*{}.npz".format(ftag)) + ) + print("{} iter {} completed: {}".format(output_tag, iternum, len(bp_files))) + print("{} iter {} error: {}".format(output_tag, iternum, len(error_files))) + + flist = ["*bandpowers_sim", "transfer"] + flist = ["{}/{}*{}.npz".format(froot, f, ftag) for f in flist] + + if keep_iters: + # Keep iteration output files + iterdir = os.path.join(froot, "iter{:03d}".format(iternum)) + os.mkdir(iterdir) + for f in flist: + sp.check_call("rsync -a {} {}/".format(f, iterdir), shell=True) + + # Remove transfer functions and bandpowers from run directory + for f in flist: + sp.check_call("rm -rf {}".format(f), shell=True) + + print("{} iter {} gcorr correction: {}".format(output_tag, iternum, gcorr["gcorr"])) + print("{} iter {} total gcorr: {}".format(output_tag, iternum, gtot["gcorr"])) + + gcc = gcorr["gcorr"] + v = np.array([gcc[spec][1:] for spec in ["tt", "ee", "bb"] if spec in gcc]) + gmax = np.max(np.abs(v - 1)) + print("Maximum fractional gcorr correction (among TT/EE/BB bins 1+):", gmax) + if gmax <= converge_criteria: + # Stop iterating + print("{} gcorr converged after {} iterations".format(output_tag, iternum)) + return True + + if iternum >= max_iters: + # Stop iterating + print("{} gcorr not converged after {} iterations".format(output_tag, iternum)) + return True + + # Trigger another iteration + return False