Skip to content

Commit

Permalink
Temporary update to let @dncwa test the current code
Browse files Browse the repository at this point in the history
  • Loading branch information
MetinSa committed May 22, 2024
1 parent 3bb58e5 commit d77432a
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 78 deletions.
5 changes: 3 additions & 2 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ theme:
- media: "(prefers-color-scheme: light)"
scheme: default
toggle:
icon: material/lightbulb
icon: material/weather-sunny
name: Switch to dark mode
- media: "(prefers-color-scheme: dark)"
scheme: slate
toggle:
icon: material/lightbulb-outline
icon: material/weather-night
name: Switch to light mode
icon:
logo: material/book-open-page-variant
Expand All @@ -30,6 +30,7 @@ theme:
- navigation.tabs
- navigation.instant
- navigation.top
- navigation.tabs.sticky
markdown_extensions:
- markdown_include.include:
base_path: docs
Expand Down
48 changes: 10 additions & 38 deletions tests/test_evaluate.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from __future__ import annotations

import multiprocessing

import numpy as np
import pytest
from astropy import coordinates as coords
Expand Down Expand Up @@ -52,7 +50,10 @@ def test_evaluate(
obs: units.Quantity | str,
) -> None:
"""Test that the evaluate function works for valid user input."""
assert model.evaluate(sky_coord, obspos=obs).size == sky_coord.size
emission = model.evaluate(sky_coord, obspos=obs)
assert emission.size == sky_coord.size
assert isinstance(emission, units.Quantity)
assert emission.unit == units.MJy / units.sr


def test_invalid_sky_coord() -> None:
Expand Down Expand Up @@ -145,23 +146,7 @@ def test_contains_duplicates() -> None:

def test_multiprocessing_nproc() -> None:
"""Test that the multiprocessing works with n_proc > 1."""
model_multi = Model(x=20 * units.micron, n_proc=4)
model = Model(x=20 * units.micron, n_proc=1)

lon = np.random.randint(low=0, high=360, size=10000)
lat = np.random.randint(low=-90, high=90, size=10000)
skycoord = SkyCoord(
lon,
lat,
unit=units.deg,
obstime=TEST_TIME,
)
emission_multi = model_multi.evaluate(skycoord)
emission = model.evaluate(skycoord)
assert_array_equal(emission_multi, emission)

model_multi = Model(x=75 * units.micron, n_proc=4, name="rrm-experimental")
model = Model(x=75 * units.micron, n_proc=1, name="rrm-experimental")
model = Model(x=20 * units.micron)

lon = np.random.randint(low=0, high=360, size=10000)
lat = np.random.randint(low=-90, high=90, size=10000)
Expand All @@ -171,15 +156,12 @@ def test_multiprocessing_nproc() -> None:
unit=units.deg,
obstime=TEST_TIME,
)
emission_multi = model_multi.evaluate(skycoord)
emission = model.evaluate(skycoord)

emission_multi = model.evaluate(skycoord, nprocesses=4)
emission = model.evaluate(skycoord, nprocesses=1)
assert_array_equal(emission_multi, emission)

model = Model(x=75 * units.micron, name="rrm-experimental")

def test_multiprocessing_pool() -> None:
"""Test that the multiprocessing works with n_proc > 1."""
model = Model(x=20 * units.micron, n_proc=1)
lon = np.random.randint(low=0, high=360, size=10000)
lat = np.random.randint(low=-90, high=90, size=10000)
skycoord = SkyCoord(
Expand All @@ -188,17 +170,7 @@ def test_multiprocessing_pool() -> None:
unit=units.deg,
obstime=TEST_TIME,
)
emission = model.evaluate(skycoord)

try:
from pytest_cov.embed import cleanup_on_sigterm
except ImportError:
pass
else:
cleanup_on_sigterm()

with multiprocessing.Pool(processes=4) as pool:
model_multi = Model(x=20 * units.micron, pool=pool)
emission_multi = model_multi.evaluate(skycoord)
emission_multi = model.evaluate(skycoord, nprocesses=4)
emission = model.evaluate(skycoord, nprocesses=1)

assert_array_equal(emission_multi, emission)
3 changes: 0 additions & 3 deletions zodipy/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from zodipy import comps, source_params
from zodipy.model_registry import model_registry
from zodipy.number_density import grid_number_density
from zodipy.zodipy import Model
Expand All @@ -7,6 +6,4 @@
"Model",
"grid_number_density",
"model_registry",
"comps",
"source_params",
)
58 changes: 23 additions & 35 deletions zodipy/zodipy.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from zodipy.unpack_model import get_model_to_dicts_callable
from zodipy.zodiacal_component import ComponentLabel

_platform_start_method = "fork" if "windows" not in platform.system().lower() else None
_PLATFORM_METHOD = "fork" if "windows" not in platform.system().lower() else None


class Model:
Expand All @@ -35,8 +35,6 @@ def __init__(
gauss_quad_degree: int = 50,
extrapolate: bool = False,
ephemeris: str = "builtin",
n_proc: int = 1,
pool: multiprocessing.pool.Pool | None = None,
) -> None:
"""Initialize the Zodipy interface.
Expand All @@ -57,10 +55,6 @@ def __init__(
positions of the observer and the Earth. Defaults to 'builtin'. See the
[Astropy documentation](https://docs.astropy.org/en/stable/coordinates/solarsystem.html)
for available ephemerides.
n_proc: Number of cores to use. If `n_proc >= 1`, the line-of-sight integrals are
parallelized using the `multiprocessing` module. Defaults to 1.
pool: Custom multiprocessing pool to use. If `None`, a new pool is created if
`n_proc >= 1`. Defaults to `None`.
"""
try:
Expand Down Expand Up @@ -104,20 +98,14 @@ def __init__(
self._ephemeris = ephemeris
self._gauss_points_and_weights = np.polynomial.legendre.leggauss(gauss_quad_degree)

if pool is None:
self._process_pool = multiprocessing.get_context(_platform_start_method).Pool(n_proc)
self._n_proc = n_proc
else:
self._process_pool = pool
self._n_proc = pool._processes # type: ignore

def evaluate(
self,
skycoord: coords.SkyCoord,
*,
obspos: units.Quantity | str = "earth",
return_comps: bool = False,
contains_duplicates: bool = False,
nprocesses: int = 1,
) -> units.Quantity[units.MJy / units.sr]:
"""Return the simulated zodiacal light.
Expand All @@ -139,6 +127,8 @@ def evaluate(
contains_duplicates: If True, the input coordinates are filtered and only unique
pointing is used to calculate the emission. The output is then mapped back to the
original coordinates resulting in the same output shape. Defaults to False.
nprocesses: Number of cores to use. If `nprocesses >= 1`, the line-of-sight integrals
are parallelized using the `multiprocessing` module. Defaults to 1.
Returns:
emission: Simulated zodiacal light in units of 'MJy/sr'.
Expand All @@ -153,7 +143,7 @@ def evaluate(
msg = "The `obstime` attribute of the `SkyCoord` object must be set."
raise ValueError(msg)

n_coords_in = skycoord.size
ncoords_in = skycoord.size
if contains_duplicates:
_, index, inverse = np.unique(
cast(
Expand All @@ -165,7 +155,7 @@ def evaluate(
axis=1,
)
skycoord = cast(coords.SkyCoord, skycoord[index]) # filter out identical coordinates
n_coords = skycoord.size
ncoords = skycoord.size
earth_xyz = get_earthpos_xyz(obstime, self._ephemeris)
obs_xyz = get_obspos_xyz(obstime, obspos, earth_xyz, self._ephemeris)

Expand All @@ -185,7 +175,7 @@ def evaluate(

# Return a dict of partial functions corresponding to the number density each zodiacal
# component in the interplanetary dust model.
density_partials = populate_number_density_with_model(
density_callables = populate_number_density_with_model(
comps=self._interplanetary_dust_model.comps,
dynamic_params={"X_earth": earth_xyz[:, np.newaxis, np.newaxis]},
)
Expand All @@ -199,29 +189,27 @@ def evaluate(
**self._common_parameters,
)

distribute_to_cores = n_coords > self._n_proc and self._n_proc > 1
distribute_to_cores = ncoords > nprocesses and nprocesses > 1
if distribute_to_cores:
unit_vector_chunks = np.array_split(skycoord_xyz, self._n_proc, axis=-1)
integrated_comp_emission = np.zeros((self._interplanetary_dust_model.ncomps, n_coords))
with self._process_pool as pool:
unit_vector_chunks = np.array_split(skycoord_xyz, nprocesses, axis=-1)
integrated_comp_emission = np.zeros((self._interplanetary_dust_model.ncomps, ncoords))
with multiprocessing.get_context(_PLATFORM_METHOD).Pool(nprocesses) as pool:
for idx, comp_label in enumerate(self._interplanetary_dust_model.comps.keys()):
stop_chunks = np.array_split(stop[comp_label], self._n_proc, axis=-1)
stop_chunks = np.array_split(stop[comp_label], nprocesses, axis=-1)
if start[comp_label].size == 1:
start_chunks = [start[comp_label]] * self._n_proc
start_chunks = [start[comp_label]] * nprocesses
else:
start_chunks = np.array_split(start[comp_label], self._n_proc, axis=-1)
start_chunks = np.array_split(start[comp_label], nprocesses, axis=-1)
comp_integrands = [
functools.partial(
common_integrand,
u_los=np.expand_dims(unit_vector, axis=-1),
u_los=np.expand_dims(vec, axis=-1),
start=np.expand_dims(start, axis=-1),
stop=np.expand_dims(stop, axis=-1),
get_density_function=density_partials[comp_label],
get_density_function=density_callables[comp_label],
**self._comp_parameters[comp_label],
)
for unit_vector, start, stop in zip(
unit_vector_chunks, start_chunks, stop_chunks
)
for vec, start, stop in zip(unit_vector_chunks, start_chunks, stop_chunks)
]

proc_chunks = [
Expand All @@ -231,22 +219,21 @@ def evaluate(
)
for comp_integrand in comp_integrands
]

integrated_comp_emission[idx] += (
np.concatenate([result.get() for result in proc_chunks])
* 0.5
* (stop[comp_label] - start[comp_label])
)

else:
integrated_comp_emission = np.zeros((self._interplanetary_dust_model.ncomps, n_coords))
integrated_comp_emission = np.zeros((self._interplanetary_dust_model.ncomps, ncoords))
for idx, comp_label in enumerate(self._interplanetary_dust_model.comps.keys()):
comp_integrand = functools.partial(
common_integrand,
u_los=np.expand_dims(skycoord_xyz, axis=-1),
start=np.expand_dims(start[comp_label], axis=-1),
stop=np.expand_dims(stop[comp_label], axis=-1),
get_density_function=density_partials[comp_label],
get_density_function=density_callables[comp_label],
**self._comp_parameters[comp_label],
)

Expand All @@ -257,11 +244,12 @@ def evaluate(
)

if contains_duplicates:
emission = np.zeros((self._interplanetary_dust_model.ncomps, n_coords_in))
emission = integrated_comp_emission[:, inverse] << (units.MJy / units.sr)
emission = np.zeros((self._interplanetary_dust_model.ncomps, ncoords_in))
emission = integrated_comp_emission[:, inverse]
else:
emission = integrated_comp_emission << (units.MJy / units.sr)
emission = integrated_comp_emission

emission <<= units.MJy / units.sr
return emission if return_comps else emission.sum(axis=0)

def get_parameters(self) -> dict:
Expand Down

0 comments on commit d77432a

Please sign in to comment.