+### General packages
+import os
+import pickle
+import time
+
+import numpy as np
+import qubic
+import yaml
+from pysimulators.interfaces.healpy import HealpixConvolutionGaussianOperator
+
+### Local packages
+from lib.Qacquisition import *
+from lib.Qcg import *
+from lib.Qcomponent_model import *
+from lib.Qfoldertools import *
+from lib.Qmap_plotter import *
+from lib.Qnoise import *
+from lib.Qspectra import *
+
+from .model.externaldata import *
+from .model.models import *
+from .model.planck_timeline import *
+
+
+def save_pkl(name, d):
+ """
+ Function that create a file called "name" to save the dictionary "d" using Pickle package
+
+ Parameters
+ ----------
+ name : string
+ file name
+ d : dict
+ dictionary saved using Pickle
+ """
+ with open(name, "wb") as handle:
+ pickle.dump(d, handle, protocol=pickle.HIGHEST_PROTOCOL)
+
+
+__all__ = ["PipelineFrequencyMapMaking", "PipelineEnd2End"]
+
+
+
+
[docs]
+
class PipelineFrequencyMapMaking:
+
"""
+
Instance to reconstruct frequency maps using QUBIC abilities.
+
+
Parameters
+
----------
+
comm :
+
MPI communicator
+
file : str
+
used to create the forlder for data saving
+
params : dict
+
dictionary containing all the simulation parameters
+
+
"""
+
+
def __init__(self, comm, file, params):
+
"""
+
Initialise PipelineFrequencyMapMaking
+
+
"""
+
+
self.mapmaking_time_0 = time.time()
+
+
self.params = params
+
+
### Fsub
+
self.fsub = int(self.params["QUBIC"]["nsub_out"] / self.params["QUBIC"]["nrec"])
+
+
self.file = file
+
self.plot_folder = "FMM/" + self.params["path_out"] + "png/"
+
+
self.externaldata = PipelineExternalData(file, self.params)
+
self.externaldata.run(fwhm=self.params["QUBIC"]["convolution_in"], noise=True)
+
+
self.externaldata_noise = PipelineExternalData(
+
file, self.params, noise_only=True
+
)
+
self.externaldata_noise.run(
+
fwhm=self.params["QUBIC"]["convolution_in"], noise=True
+
)
+
+
if comm.Get_rank() == 0:
+
if not os.path.isdir("FMM/" + self.params["path_out"] + "maps/"):
+
os.makedirs("FMM/" + self.params["path_out"] + "maps/")
+
if not os.path.isdir("FMM/" + self.params["path_out"] + "png/"):
+
os.makedirs("FMM/" + self.params["path_out"] + "png/")
+
+
self.job_id = os.environ.get("SLURM_JOB_ID")
+
self.center = qubic.equ2gal(
+
self.params["SKY"]["RA_center"], self.params["SKY"]["DEC_center"]
+
)
+
+
### MPI common arguments
+
self.comm = comm
+
self.size = self.comm.Get_size()
+
self.rank = self.comm.Get_rank()
+
+
### Sky
+
self.dict_in = self.get_dict(key="in")
+
self.dict_out = self.get_dict(key="out")
+
self.skyconfig = self.get_sky_config()
+
+
### Joint acquisition for TOD making
+
self.joint_tod = JointAcquisitionFrequencyMapMaking(
+
self.dict_in,
+
self.params["QUBIC"]["instrument"],
+
self.params["QUBIC"]["nsub_in"],
+
self.params["QUBIC"]["nsub_in"],
+
H=None,
+
)
+
+
### Joint acquisition
+
if self.params["QUBIC"]["nsub_in"] == self.params["QUBIC"]["nsub_out"]:
+
H = self.joint_tod.qubic.H
+
else:
+
H = None
+
+
self.joint = JointAcquisitionFrequencyMapMaking(
+
self.dict_out,
+
self.params["QUBIC"]["instrument"],
+
self.params["QUBIC"]["nrec"],
+
self.params["QUBIC"]["nsub_out"],
+
H=H,
+
)
+
+
self.planck_acquisition143 = PlanckAcquisition(143, self.joint.qubic.scene)
+
self.planck_acquisition217 = PlanckAcquisition(217, self.joint.qubic.scene)
+
self.nus_Q = self.get_averaged_nus()
+
+
### Coverage map
+
self.coverage = self.joint.qubic.subacqs[0].get_coverage()
+
covnorm = self.coverage / self.coverage.max()
+
self.seenpix = covnorm > self.params["SKY"]["coverage_cut"]
+
self.seenpix_qubic = covnorm > 0
+
self.fsky = self.seenpix.astype(float).sum() / self.seenpix.size
+
self.coverage_cut = self.coverage.copy()
+
self.coverage_cut[~self.seenpix] = 1
+
+
self.seenpix_for_plot = covnorm > 0
+
self.mask = np.ones(12 * self.params["SKY"]["nside"] ** 2)
+
self.mask[self.seenpix] = self.params["PLANCK"]["weight_planck"]
+
+
### Angular resolutions
+
self.fwhm_in, self.fwhm_out, self.fwhm_rec = self.get_convolution()
+
+
self.external_timeline = ExternalData2Timeline(
+
self.skyconfig,
+
self.joint.qubic.allnus,
+
self.params["QUBIC"]["nrec"],
+
nside=self.params["SKY"]["nside"],
+
corrected_bandpass=self.params["QUBIC"]["bandpass_correction"],
+
)
+
+
### Initial maps
+
self.m_nu_in = self.get_input_map()
+
+
### Define reconstructed and TOD operator
+
self.get_H()
+
+
### Inverse noise covariance matrix
+
self.invN = self.joint.get_invntt_operator(mask=self.mask)
+
+
### Noises
+
seed_noise_planck = self.get_random_value()
+
+
self.noise143 = (
+
self.planck_acquisition143.get_noise(seed_noise_planck)
+
* self.params["PLANCK"]["level_noise_planck"]
+
)
+
self.noise217 = (
+
self.planck_acquisition217.get_noise(seed_noise_planck + 1)
+
* self.params["PLANCK"]["level_noise_planck"]
+
)
+
+
if self.params["QUBIC"]["instrument"] == "DB":
+
qubic_noise = QubicDualBandNoise(
+
self.dict_out,
+
self.params["QUBIC"]["npointings"],
+
self.params["QUBIC"]["NOISE"]["detector_nep"],
+
)
+
elif self.params["QUBIC"]["instrument"] == "UWB":
+
qubic_noise = QubicWideBandNoise(
+
self.dict_out,
+
self.params["QUBIC"]["npointings"],
+
self.params["QUBIC"]["NOISE"]["detector_nep"],
+
)
+
+
self.noiseq = qubic_noise.total_noise(
+
self.params["QUBIC"]["NOISE"]["ndet"],
+
self.params["QUBIC"]["NOISE"]["npho150"],
+
self.params["QUBIC"]["NOISE"]["npho220"],
+
seed_noise=seed_noise_planck,
+
).ravel()
+
+
### Initialize plot instance
+
self.plots = PlotsFMM(self.seenpix)
+
+
+
[docs]
+
def get_components_fgb(self):
+
"""Components FGbuster
+
+
Method to build a dictionary containing all the wanted components to generate sky maps.
+
Based on FGBuster.
+
+
Returns
+
-------
+
dict_comps: dict
+
Dictionary containing the component instances.
+
+
"""
+
+
dict_comps = []
+
+
if self.params["CMB"]["cmb"]:
+
dict_comps += [CMB()]
+
+
if self.params["Foregrounds"]["Dust"]:
+
dict_comps += [Dust(nu0=150, beta_d=1.54, temp=20)]
+
+
if self.params["Foregrounds"]["Synchrotron"]:
+
dict_comps += [Synchrotron(nu0=150, beta_pl=-3)]
+
+
return dict_comps
+
+
+
+
[docs]
+
def get_random_value(self):
+
"""Random value
+
+
Method to build a random seed.
+
+
Returns
+
-------
+
seed: int
+
Random seed.
+
+
"""
+
+
np.random.seed(None)
+
if self.rank == 0:
+
seed = np.random.randint(10000000)
+
else:
+
seed = None
+
+
seed = self.comm.bcast(seed, root=0)
+
return seed
+
+
+
+
[docs]
+
def get_H(self):
+
"""Acquisition operator
+
+
Method to compute QUBIC acquisition operators.
+
+
"""
+
+
### Pointing matrix for TOD generation
+
self.H_in = self.joint_tod.get_operator(
+
fwhm=self.fwhm_in
+
) # , external_data=self.params['PLANCK']['external_data'])
+
+
### QUBIC Pointing matrix for TOD generation
+
self.H_in_qubic = self.joint_tod.qubic.get_operator(fwhm=self.fwhm_in)
+
+
### Pointing matrix for reconstruction
+
self.H_out_all_pix = self.joint.get_operator(fwhm=self.fwhm_out)
+
self.H_out = self.joint.get_operator(
+
fwhm=self.fwhm_out, seenpix=self.seenpix
+
) # , external_data=self.params['PLANCK']['external_data'])
+
+
+
+
[docs]
+
def get_averaged_nus(self):
+
"""Average frequency
+
+
Method to average QUBIC frequencies according to the number of reconstructed frequency maps.
+
+
Returns
+
-------
+
nus_ave: array_like
+
array containing the averaged QUBIC frequencies.
+
+
"""
+
+
nus_ave = []
+
for i in range(self.params["QUBIC"]["nrec"]):
+
nus_ave += [
+
np.mean(self.joint.qubic.allnus[i * self.fsub : (i + 1) * self.fsub])
+
]
+
+
return np.array(nus_ave)
+
+
+
+
[docs]
+
def get_sky_config(self):
+
"""Sky configuration.
+
+
Method that read 'params.yml' file and create dictionary containing sky emission model.
+
+
Returns
+
-------
+
dict_sky: dict
+
Sky config dictionary.
+
+
Notes
+
-----
+
Note that the key denote the emission and the value denote the sky model using PySM convention.
+
For CMB, seed denote the realization.
+
+
Example
+
-------
+
d = {'cmb':seed, 'dust':'d0', 'synchrotron':'s0'}
+
+
"""
+
+
dict_sky = {}
+
+
if self.params["CMB"]["cmb"]:
+
if self.params["CMB"]["seed"] == 0:
+
if self.rank == 0:
+
seed = np.random.randint(10000000)
+
else:
+
seed = None
+
seed = self.comm.bcast(seed, root=0)
+
else:
+
seed = self.params["CMB"]["seed"]
+
print(f"Seed of the CMB is {seed} for rank {self.rank}")
+
dict_sky["cmb"] = seed
+
+
for j in self.params["Foregrounds"]:
+
# print(j, self.params['Foregrounds'][j])
+
if j == "Dust":
+
if self.params["Foregrounds"][j]:
+
dict_sky["dust"] = "d0"
+
elif j == "Synchrotron":
+
if self.params["Foregrounds"][j]:
+
dict_sky["synchrotron"] = "s0"
+
+
return dict_sky
+
+
+
+
[docs]
+
def get_dict(self, key="in"):
+
"""QUBIC dictionary.
+
+
Method to modify the qubic dictionary.
+
+
Parameters
+
----------
+
key : str, optional
+
Can be "in" or "out".
+
It is used to build respectively the instances to generate the TODs or to reconstruct the sky maps,
+
by default "in".
+
+
Returns
+
-------
+
dict_qubic: dict
+
Modified QUBIC dictionary.
+
+
"""
+
+
args = {
+
"npointings": self.params["QUBIC"]["npointings"],
+
"nf_recon": self.params["QUBIC"]["nrec"],
+
"nf_sub": self.params["QUBIC"][f"nsub_{key}"],
+
"nside": self.params["SKY"]["nside"],
+
"MultiBand": True,
+
"period": 1,
+
"RA_center": self.params["SKY"]["RA_center"],
+
"DEC_center": self.params["SKY"]["DEC_center"],
+
"filter_nu": 150 * 1e9,
+
"noiseless": False,
+
"comm": self.comm,
+
"dtheta": self.params["QUBIC"]["dtheta"],
+
"nprocs_sampling": 1,
+
"nprocs_instrument": self.size,
+
"photon_noise": True,
+
"nhwp_angles": 3,
+
#'effective_duration':3,
+
"effective_duration150": 3,
+
"effective_duration220": 3,
+
"filter_relative_bandwidth": 0.25,
+
"type_instrument": "wide",
+
"TemperatureAtmosphere150": None,
+
"TemperatureAtmosphere220": None,
+
"EmissivityAtmosphere150": None,
+
"EmissivityAtmosphere220": None,
+
"detector_nep": float(self.params["QUBIC"]["NOISE"]["detector_nep"]),
+
"synthbeam_kmax": self.params["QUBIC"]["SYNTHBEAM"]["synthbeam_kmax"],
+
}
+
+
### Get the default dictionary
+
dictfilename = "dicts/pipeline_demo.dict"
+
dict_qubic = qubic.qubicdict.qubicDict()
+
dict_qubic.read_from_file(dictfilename)
+
+
for i in args.keys():
+
+
dict_qubic[str(i)] = args[i]
+
+
return dict_qubic
+
+
+
+
[docs]
+
def get_convolution(self):
+
"""QUBIC resolutions.
+
+
Method to define expected QUBIC angular resolutions (radians) as function of frequencies.
+
+
Returns
+
-------
+
fwhm_in: array_like
+
Intrinsic resolutions, used to build the simulated TOD.
+
fwhm_out: array_like
+
Output resolutions. If we don't apply convolutions during reconstruction, array of zeros.
+
fwhm_rec: array_like
+
Reconstructed resolutions. Egal the output resolutions if we apply convolutions during reconstructions, evaluate through analytic formula otherwise.
+
+
"""
+
+
### Define FWHMs
+
+
fwhm_in = None
+
fwhm_out = None
+
+
### FWHMs during map-making
+
if self.params["QUBIC"]["convolution_in"]:
+
fwhm_in = self.joint.qubic.allfwhm.copy()
+
if self.params["QUBIC"]["convolution_out"]:
+
fwhm_out = np.array([])
+
for irec in range(self.params["QUBIC"]["nrec"]):
+
fwhm_out = np.append(
+
fwhm_out,
+
np.sqrt(
+
self.joint.qubic.allfwhm[
+
irec * self.fsub : (irec + 1) * self.fsub
+
]
+
** 2
+
- np.min(
+
self.joint.qubic.allfwhm[
+
irec * self.fsub : (irec + 1) * self.fsub
+
]
+
)
+
** 2
+
),
+
)
+
+
### Define reconstructed FWHM depending on the user's choice
+
if (
+
self.params["QUBIC"]["convolution_in"]
+
and self.params["QUBIC"]["convolution_out"]
+
):
+
fwhm_rec = np.array([])
+
for irec in range(self.params["QUBIC"]["nrec"]):
+
fwhm_rec = np.append(
+
fwhm_rec,
+
np.min(
+
self.joint.qubic.allfwhm[
+
irec * self.fsub : (irec + 1) * self.fsub
+
]
+
),
+
)
+
+
elif (
+
self.params["QUBIC"]["convolution_in"]
+
and self.params["QUBIC"]["convolution_out"] is False
+
):
+
fwhm_rec = np.array([])
+
for irec in range(self.params["QUBIC"]["nrec"]):
+
fwhm_rec = np.append(
+
fwhm_rec,
+
np.mean(
+
self.joint.qubic.allfwhm[
+
irec * self.fsub : (irec + 1) * self.fsub
+
]
+
),
+
)
+
+
else:
+
fwhm_rec = np.array([])
+
for irec in range(self.params["QUBIC"]["nrec"]):
+
fwhm_rec = np.append(fwhm_rec, 0)
+
+
if self.rank == 0:
+
print(f"FWHM for TOD generation : {fwhm_in}")
+
print(f"FWHM for reconstruction : {fwhm_out}")
+
print(f"Final FWHM : {fwhm_rec}")
+
+
return fwhm_in, fwhm_out, fwhm_rec
+
+
+
+
+
+
+
[docs]
+
def get_tod(self, noise=False):
+
r"""Simulated TOD.
+
+
Method that compute observed TODs with :math:`\vec{TOD} = H \cdot \vec{s} + \vec{n}`, with H the QUBIC operator, :math:`\vec{s}` the sky signal and :math:`\vec{n}` the instrumental noise`.
+
+
Parameters
+
----------
+
noise : bool, optional
+
True if you want to simulate noise in your data, False otherwise, by default False.
+
+
Returns
+
-------
+
TOD: array_like
+
Simulated TOD :math:`(N_{rec}, 12 \times N^{2}_{side}, N_{stk})`.
+
+
"""
+
+
if noise:
+
factor = 0
+
else:
+
factor = 1
+
if self.params["QUBIC"]["instrument"] == "UWB":
+
if self.params["QUBIC"]["nrec"] != 1:
+
TOD_PLANCK = np.zeros(
+
(
+
self.params["QUBIC"]["nrec"],
+
12 * self.params["SKY"]["nside"] ** 2,
+
3,
+
)
+
)
+
for irec in range(int(self.params["QUBIC"]["nrec"] / 2)):
+
if self.params["QUBIC"]["convolution_in"]:
+
C = HealpixConvolutionGaussianOperator(
+
fwhm=np.min(
+
self.fwhm_in[irec * self.fsub : (irec + 1) * self.fsub]
+
)
+
)
+
+
else:
+
C = HealpixConvolutionGaussianOperator(fwhm=0)
+
+
TOD_PLANCK[irec] = C(
+
factor * self.external_timeline.maps[irec] + self.noise143
+
)
+
+
for irec in range(
+
int(self.params["QUBIC"]["nrec"] / 2), self.params["QUBIC"]["nrec"]
+
):
+
if self.params["QUBIC"]["convolution_in"]:
+
C = HealpixConvolutionGaussianOperator(
+
fwhm=np.min(
+
self.fwhm_in[irec * self.fsub : (irec + 1) * self.fsub]
+
)
+
)
+
+
else:
+
C = HealpixConvolutionGaussianOperator(fwhm=0)
+
+
TOD_PLANCK[irec] = C(
+
factor * self.external_timeline.maps[irec] + self.noise217
+
)
+
else:
+
TOD_PLANCK = np.zeros(
+
(
+
2 * self.params["QUBIC"]["nrec"],
+
12 * self.params["SKY"]["nside"] ** 2,
+
3,
+
)
+
)
+
+
if self.params["QUBIC"]["convolution_in"]:
+
C = HealpixConvolutionGaussianOperator(fwhm=self.fwhm_in[-1])
+
else:
+
C = HealpixConvolutionGaussianOperator(fwhm=0)
+
+
TOD_PLANCK[0] = C(
+
factor * self.external_timeline.maps[0] + self.noise143
+
)
+
TOD_PLANCK[1] = C(
+
factor * self.external_timeline.maps[0] + self.noise217
+
)
+
+
TOD_PLANCK = TOD_PLANCK.ravel()
+
TOD_QUBIC = (
+
self.H_in_qubic(factor * self.external_timeline.m_nu).ravel()
+
+ self.noiseq
+
)
+
if self.params["PLANCK"]["external_data"]:
+
TOD = np.r_[TOD_QUBIC, TOD_PLANCK]
+
else:
+
TOD = TOD_QUBIC
+
+
else:
+
+
sh_q = self.joint.qubic.ndets * self.joint.qubic.nsamples
+
TOD_QUBIC = (
+
self.H_in_qubic(factor * self.external_timeline.m_nu).ravel()
+
+ self.noiseq
+
)
+
if self.params["PLANCK"]["external_data"] == False:
+
TOD = TOD_QUBIC
+
else:
+
TOD_QUBIC150 = TOD_QUBIC[:sh_q].copy()
+
TOD_QUBIC220 = TOD_QUBIC[sh_q:].copy()
+
+
TOD = TOD_QUBIC150.copy()
+
TOD_PLANCK = np.zeros(
+
(
+
self.params["QUBIC"]["nrec"],
+
12 * self.params["SKY"]["nside"] ** 2,
+
3,
+
)
+
)
+
for irec in range(int(self.params["QUBIC"]["nrec"] / 2)):
+
if self.params["QUBIC"]["convolution_in"]:
+
C = HealpixConvolutionGaussianOperator(
+
fwhm=np.min(
+
self.fwhm_in[irec * self.fsub : (irec + 1) * self.fsub]
+
)
+
)
+
+
else:
+
C = HealpixConvolutionGaussianOperator(fwhm=0)
+
+
TOD = np.r_[
+
TOD,
+
C(
+
factor * self.external_timeline.maps[irec] + self.noise143
+
).ravel(),
+
]
+
+
TOD = np.r_[TOD, TOD_QUBIC220.copy()]
+
for irec in range(
+
int(self.params["QUBIC"]["nrec"] / 2), self.params["QUBIC"]["nrec"]
+
):
+
if self.params["QUBIC"]["convolution_in"]:
+
C = HealpixConvolutionGaussianOperator(
+
fwhm=np.min(
+
self.fwhm_in[irec * self.fsub : (irec + 1) * self.fsub]
+
)
+
)
+
+
else:
+
C = HealpixConvolutionGaussianOperator(fwhm=0)
+
+
TOD = np.r_[
+
TOD,
+
C(
+
factor * self.external_timeline.maps[irec] + self.noise217
+
).ravel(),
+
]
+
+
return TOD
+
+
+
def _barrier(self):
+
"""
+
Method to introduce comm._Barrier() function if MPI communicator is detected.
+
+
"""
+
if self.comm is None:
+
pass
+
else:
+
self.comm.Barrier()
+
+
def _print_message(self, message):
+
"""
+
Method to print message only on rank 0 if MPI communicator is detected. It display simple message if not.
+
+
"""
+
+
if self.comm is None:
+
print(message)
+
else:
+
if self.rank == 0:
+
print(message)
+
+
+
[docs]
+
def get_preconditioner(self):
+
"""PCG Preconditioner.
+
+
Computed using the formula: To be added.
+
+
Returns
+
-------
+
M: DiagonalOperator
+
Preconditioner for PCG algorithm.
+
"""
+
+
if self.params["PCG"]["preconditioner"]:
+
+
approx_hth = np.zeros(
+
(
+
self.params["QUBIC"]["nsub_out"],
+
12 * self.params["SKY"]["nside"] ** 2,
+
3,
+
)
+
)
+
conditioner = np.zeros(
+
(self.params["QUBIC"]["nrec"], 12 * self.params["SKY"]["nside"] ** 2, 3)
+
)
+
vec = np.ones(self.joint.qubic.H[0].shapein)
+
+
for i in range(self.params["QUBIC"]["nsub_out"]):
+
for j in range(self.params["QUBIC"]["nsub_out"]):
+
approx_hth[i] = (
+
self.joint.qubic.H[i].T
+
* self.joint.qubic.invn220
+
* self.joint.qubic.H[j](vec)
+
)
+
+
for irec in range(self.params["QUBIC"]["nrec"]):
+
imin = irec * self.fsub
+
imax = (irec + 1) * self.fsub
+
for istk in range(3):
+
conditioner[irec, self.seenpix, istk] = 1 / (
+
np.sum(approx_hth[imin:imax, self.seenpix, 0], axis=0)
+
)
+
+
conditioner[conditioner == np.inf] = 1
+
+
M = DiagonalOperator(conditioner[:, self.seenpix, :])
+
else:
+
M = None
+
return M
+
+
+
+
[docs]
+
def pcg(self, d, x0, seenpix):
+
r"""Preconditioned Conjugate Gradiant algorithm.
+
+
Solve the map-making equation iteratively : :math:`(H^T . N^{-1} . H) . x = H^T . N^{-1} . d`.
+
+
The PCG used for the minimization is intrinsequely parallelized (e.g see PyOperators).
+
+
Parameters
+
----------
+
d : array_like
+
Array containing the TODs generated previously :math:`(N_{rec}, 12 \times N^{2}_{side}, N_{stk})`.
+
x0 : array_like
+
Starting point of the PCG algorithm :math:`(N_{rec}, 12 \times N^{2}_{side}, N_{stk})`.
+
seenpix : array_like
+
Boolean array to define the pixels seen by QUBIC :math:`(N_{rec}, 12 \times N^{2}_{side}, N_{stk})`.
+
+
Returns
+
-------
+
solution: array_like
+
Reconstructed maps :math:`(N_{rec}, 12 \times N^{2}_{side}, N_{stk})`.
+
+
"""
+
+
### Update components when pixels outside the patch are fixed (assumed to be 0)
+
A = self.H_out.T * self.invN * self.H_out
+
+
x_planck = self.m_nu_in * (1 - seenpix[None, :, None])
+
b = self.H_out.T * self.invN * (d - self.H_out_all_pix(x_planck))
+
### Preconditionning
+
M = self.get_preconditioner()
+
+
if self.params["PCG"]["gif"]:
+
gif_folder = self.plot_folder + f"{self.job_id}/iter/"
+
else:
+
gif_folder = None
+
+
### PCG
+
solution_qubic_planck = pcg(
+
A=A,
+
b=b,
+
comm=self.comm,
+
x0=x0,
+
M=M,
+
tol=self.params["PCG"]["tol_pcg"],
+
disp=True,
+
maxiter=self.params["PCG"]["n_iter_pcg"],
+
gif_folder=gif_folder,
+
job_id=self.job_id,
+
seenpix=self.seenpix,
+
seenpix_plot=self.seenpix,
+
center=self.center,
+
reso=self.params["PCG"]["resolution_plot"],
+
fwhm_plot=self.params["PCG"]["fwhm_plot"],
+
input=self.m_nu_in,
+
)
+
+
if self.params["PCG"]["gif"]:
+
do_gif(gif_folder, "iter_", output="animation.gif")
+
+
self._barrier()
+
+
if self.params["QUBIC"]["nrec"] == 1:
+
solution_qubic_planck["x"]["x"] = np.array(
+
[solution_qubic_planck["x"]["x"]]
+
)
+
+
solution = np.ones(self.m_nu_in.shape) * hp.UNSEEN
+
solution[:, seenpix, :] = solution_qubic_planck["x"]["x"].copy()
+
+
return solution
+
+
+
def _save_data(self, name, d):
+
"""
+
Method to save data using pickle convention.
+
+
"""
+
+
with open(name, "wb") as handle:
+
pickle.dump(d, handle, protocol=pickle.HIGHEST_PROTOCOL)
+
+
+
[docs]
+
def run(self):
+
"""Run the FMM Pipeline.
+
+
Method to run the whole pipeline from TOD generation from sky reconstruction by reading `params.yml` file.
+
+
"""
+
+
self._print_message("\n=========== Map-Making ===========\n")
+
+
### Get simulated data
+
self.TOD = self.get_tod(noise=False)
+
self.n = self.get_tod(noise=True)
+
+
### Wait for all processes
+
self._barrier()
+
+
if self.params["QUBIC"]["convolution_in"]:
+
for i in range(self.params["QUBIC"]["nrec"]):
+
C = HealpixConvolutionGaussianOperator(fwhm=self.fwhm_rec[i])
+
self.m_nu_in[i] = C(self.m_nu_in[i])
+
+
x0 = self.m_nu_in.copy()
+
x0[:, self.seenpix, :] = 0
+
+
### Solve map-making equation
+
self.s_hat = self.pcg(self.TOD, x0=x0[:, self.seenpix, :], seenpix=self.seenpix)
+
+
### Wait for all processes
+
self._barrier()
+
+
### n = m_signalnoisy - m_signal
+
self.s_hat_noise = self.s_hat - self.m_nu_in
+
+
### Ensure that non seen pixels is 0 for spectrum computation
+
self.s_hat[:, ~self.seenpix, :] = 0
+
self.s_hat_noise[:, ~self.seenpix, :] = 0
+
+
### Plots and saving
+
if self.rank == 0:
+
+
self.external_maps = self.externaldata.maps.copy()
+
self.external_maps[:, ~self.seenpix, :] = 0
+
+
self.external_maps_noise = self.externaldata_noise.maps.copy()
+
self.external_maps_noise[:, ~self.seenpix, :] = 0
+
+
if len(self.externaldata.external_nus) != 0:
+
fwhm_ext = self.externaldata.fwhm_ext.copy()
+
self.s_hat = np.concatenate((self.s_hat, self.external_maps), axis=0)
+
self.s_hat_noise = np.concatenate(
+
(self.s_hat_noise, self.external_maps_noise), axis=0
+
)
+
self.nus_rec = np.array(
+
list(self.nus_Q) + list(self.externaldata.external_nus)
+
)
+
self.fwhm_rec = np.array(list(self.fwhm_rec) + list(fwhm_ext))
+
+
self.plots.plot_frequency_maps(
+
self.m_nu_in[: len(self.nus_Q)],
+
self.s_hat[: len(self.nus_Q)],
+
self.center,
+
reso=15,
+
nsig=3,
+
filename=self.plot_folder + f"/all_maps.png",
+
figsize=(10, 5),
+
)
+
# self.plots.plot_FMM(self.m_nu_in, self.s_hat, self.center, self.seenpix, self.nus_Q, job_id=self.job_id, istk=0, nsig=3, name='signal')
+
# self.plots.plot_FMM(self.m_nu_in, self.s_hat, self.center, self.seenpix, self.nus_Q, job_id=self.job_id, istk=1, nsig=3, name='signal')
+
# self.plots.plot_FMM(self.m_nu_in, self.s_hat, self.center, self.seenpix, self.nus_Q, job_id=self.job_id, istk=2, nsig=3, name='signal')
+
+
# self.plots.plot_FMM(self.m_nu_in*0, self.s_hat_noise, self.center, self.seenpix, self.nus_Q, job_id=self.job_id, istk=0, nsig=3, name='noise')
+
# self.plots.plot_FMM(self.m_nu_in*0, self.s_hat_noise, self.center, self.seenpix, self.nus_Q, job_id=self.job_id, istk=1, nsig=3, name='noise')
+
# self.plots.plot_FMM(self.m_nu_in*0, self.s_hat_noise, self.center, self.seenpix, self.nus_Q, job_id=self.job_id, istk=2, nsig=3, name='noise')
+
+
mapmaking_time = time.time() - self.mapmaking_time_0
+
if self.comm is None:
+
print(f"Map-making done in {mapmaking_time:.3f} s")
+
else:
+
if self.rank == 0:
+
print(f"Map-making done in {mapmaking_time:.3f} s")
+
+
dict_solution = {
+
"maps": self.s_hat,
+
"maps_noise": self.s_hat_noise,
+
"nus": self.nus_rec,
+
"coverage": self.coverage,
+
"center": self.center,
+
"maps_in": self.m_nu_in,
+
"parameters": self.params,
+
"fwhm_in": self.fwhm_in,
+
"fwhm_out": self.fwhm_out,
+
"fwhm_rec": self.fwhm_rec,
+
"duration": mapmaking_time,
+
}
+
+
self._save_data(self.file, dict_solution)
+
self._barrier()
+
+
+
+
+
+
[docs]
+
class PipelineEnd2End:
+
"""FMM Pipeline.
+
+
Wrapper for End-2-End pipeline. It added class one after the others by running method.run().
+
+
"""
+
+
def __init__(self, comm):
+
+
with open("FMM/params.yml", "r") as stream:
+
self.params = yaml.safe_load(stream)
+
+
self.comm = comm
+
self.job_id = os.environ.get("SLURM_JOB_ID")
+
+
self.folder = "FMM/" + self.params["path_out"] + "maps/"
+
self.file = self.folder + self.params["datafilename"] + f"_{self.job_id}.pkl"
+
self.file_spectrum = (
+
"FMM/"
+
+ self.params["path_out"]
+
+ "spectrum/"
+
+ "spectrum_"
+
+ self.params["datafilename"]
+
+ f"_{self.job_id}.pkl"
+
)
+
self.mapmaking = None
+
+
+
[docs]
+
def main(self, specific_file=None):
+
+
### Execute Frequency Map-Making
+
if self.params["Pipeline"]["mapmaking"]:
+
+
### Initialization
+
self.mapmaking = PipelineFrequencyMapMaking(
+
self.comm, self.file, self.params
+
)
+
+
### Run
+
self.mapmaking.run()
+
+
### Execute spectrum
+
if self.params["Pipeline"]["spectrum"]:
+
if self.comm.Get_rank() == 0:
+
create_folder_if_not_exists(
+
self.comm, "FMM/" + self.params["path_out"] + "spectrum/"
+
)
+
+
if self.mapmaking is not None:
+
self.spectrum = Spectra(self.file)
+
else:
+
self.spectrum = Spectra(specific_file)
+
+
### Signal
+
DlBB_maps = self.spectrum.run(maps=self.spectrum.maps)
+
+
### noise
+
DlBB_noise = self.spectrum.run(
+
maps=self.spectrum.dictionary["maps_noise"]
+
)
+
+
dict_solution = {
+
"nus": self.spectrum.dictionary["nus"],
+
"ell": self.spectrum.ell,
+
"Dls": DlBB_maps,
+
"Nl": DlBB_noise,
+
"parameters": self.params,
+
}
+
+
self._save_data(self.file_spectrum, dict_solution)
+
+
+