diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml new file mode 100644 index 00000000..0cc08080 --- /dev/null +++ b/.github/workflows/lint.yml @@ -0,0 +1,27 @@ +name: "Lint" +on: + pull_request: + types: [opened, synchronize, reopened, ready_for_review, labeled, unlabeled] + +jobs: + lint: + runs-on: ubuntu-latest + steps: + - name: Checkout Pace repository + uses: actions/checkout@v3.5.2 + with: + submodules: 'recursive' + - name: Step Python 3.8.12 + uses: actions/setup-python@v4.6.0 + with: + python-version: '3.8.12' + - name: Install OpenMPI for gt4py + run: | + sudo apt-get install libopenmpi-dev + - name: Install Python packages + run: | + python -m pip install --upgrade pip + pip install -r requirements_dev.txt -r requirements_lint.txt + - name: Run lint via pre-commit + run: | + pre-commit run --all-files diff --git a/.github/workflows/main_unit_tests.yml b/.github/workflows/main_unit_tests.yml new file mode 100644 index 00000000..5dbf4a1f --- /dev/null +++ b/.github/workflows/main_unit_tests.yml @@ -0,0 +1,27 @@ +name: "Main unit tests" +on: + pull_request: + types: [opened, synchronize, reopened, ready_for_review, labeled, unlabeled] + +jobs: + main_unit_tests: + runs-on: ubuntu-latest + steps: + - name: Checkout Pace repository + uses: actions/checkout@v3.5.2 + with: + submodules: 'recursive' + - name: Step Python 3.8.12 + uses: actions/setup-python@v4.6.0 + with: + python-version: '3.8.12' + - name: Install OpenMPI for gt4py + run: | + sudo apt-get install libopenmpi-dev + - name: Install Python packages + run: | + python -m pip install --upgrade pip + pip install -r requirements_dev.txt + - name: Run all main tests + run: | + pytest -x tests/main diff --git a/README.md b/README.md index 31980394..5884cee8 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,7 @@ Pace is an implementation of the FV3GFS / SHiELD atmospheric model developed by Full Sphinx documentation can be found at [https://ai2cm.github.io/pace/](https://ai2cm.github.io/pace/). **WARNING** This repo is under active development - supported features and procedures can change rapidly and without notice. + ## Quickstart - bare metal ### Build @@ -27,10 +28,13 @@ export BOOST_ROOT=BOOST/ROOT/boost_1_79_0 ``` When cloning Pace you will need to update the repository's submodules as well: + ```shell git clone --recursive https://github.com/ai2cm/pace.git ``` + or if you have already cloned the repository: + ``` git submodule update --init --recursive ``` @@ -43,6 +47,7 @@ source venv_name/bin/activate ``` Inside of your pace `venv` or conda environment pip install the Python requirements, GT4Py, and Pace: + ```shell pip3 install -r requirements_dev.txt -c constraints.txt ``` @@ -52,6 +57,7 @@ Shell scripts to install Pace on specific machines such as Gaea can be found in ### Run With the environment activated, you can run an example baroclinic test case with the following command: + ```shell mpirun -n 6 python3 -m pace.driver.run driver/examples/configs/baroclinic_c12.yaml @@ -61,20 +67,33 @@ mpirun -n 6 --oversubscribe python3 -m pace.driver.run driver/examples/configs/b After the run completes, you will see an output direcotry `output.zarr`. An example to visualize the output is provided in `driver/examples/plot_output.py`. See the [driver example](driver/examples/README.md) section for more details. +### Environment variable configuration + +- `PACE_CONSTANTS`: Pace is bundled with various constants (see _util/pace/util/constants.py_). + - `FV3DYCORE` NOAA's FV3 dynamical core constants (original port) + - `GFS` Constant as defined in NOAA GFS + - `GEOS` Constant as defined in GEOS v13 +- `PACE_FLOAT_PRECISION`: default precision of the field & scalars in the numerics. Default to 64. +- `PACE_LOGLEVEL`: logging level to display (DEBUG, INFO, WARNING, ERROR, CRITICAL). Default to INFO. + ## Quickstart - Docker + ### Build While it is possible to install and build pace bare-metal, we can ensure all system libraries are installed with the correct versions by using a Docker container to test and develop pace. First, you will need to update the git submodules so that any dependencies are cloned and at the correct version: + ```shell git submodule update --init --recursive ``` Then build the `pace` docker image at the top level. + ```shell make build ``` + ### Run ```shell @@ -94,7 +113,6 @@ This git repository is laid out as a mono-repo, containing multiple independent ![Graph of interdependencies of Pace modules, generated from dependences.dot](./dependencies.svg) - ## ML emulation An example of integration of an ML model replacing the microphysics parametrization is available on the `feature/microphysics-emulator` branch. diff --git a/driver/pace/driver/run.py b/driver/pace/driver/run.py index 2d9160cd..e6c39982 100644 --- a/driver/pace/driver/run.py +++ b/driver/pace/driver/run.py @@ -1,58 +1,15 @@ import dataclasses import gc -import logging from typing import Optional import click import yaml -from pace.util.mpi import MPI +from pace.util import pace_log, AVAILABLE_LOG_LEVELS from .driver import Driver, DriverConfig -logger = logging.getLogger(__name__) - - -log_levels = { - "info": logging.INFO, - "debug": logging.DEBUG, - "warning": logging.WARNING, - "error": logging.ERROR, - "critical": logging.CRITICAL, -} - - -def configure_logging(log_rank: Optional[int], log_level: str): - """ - Configure logging for the driver. - - Args: - log_rank: rank to log from, or 'all' to log to all ranks, - forced to 'all' if running without MPI - log_level: log level to use - """ - level = log_levels[log_level.lower()] - if MPI is None: - logging.basicConfig( - level=level, - format="%(asctime)s [%(levelname)s] %(name)s:%(message)s", - handlers=[logging.StreamHandler()], - datefmt="%Y-%m-%d %H:%M:%S", - ) - else: - if log_rank is None or int(log_rank) == MPI.COMM_WORLD.Get_rank(): - logging.basicConfig( - level=level, - format=( - f"%(asctime)s [%(levelname)s] (rank {MPI.COMM_WORLD.Get_rank()}) " - "%(name)s:%(message)s" - ), - handlers=[logging.StreamHandler()], - datefmt="%Y-%m-%d %H:%M:%S", - ) - - @click.command() @click.argument( "CONFIG_PATH", @@ -75,12 +32,15 @@ def command_line(config_path: str, log_rank: Optional[int], log_level: str): CONFIG_PATH is the path to a DriverConfig yaml file. """ - configure_logging(log_rank=log_rank, log_level=log_level) - logger.info("loading DriverConfig from yaml") + level = AVAILABLE_LOG_LEVELS[log_level.lower()] + pace_log.setLevel(level) + pace_log.info("loading DriverConfig from yaml") with open(config_path, "r") as f: config = yaml.safe_load(f) driver_config = DriverConfig.from_dict(config) - logging.info(f"DriverConfig loaded: {yaml.dump(dataclasses.asdict(driver_config))}") + pace_log.info( + f"DriverConfig loaded: {yaml.dump(dataclasses.asdict(driver_config))}" + ) main(driver_config=driver_config) diff --git a/dsl/pace/dsl/dace/utils.py b/dsl/pace/dsl/dace/utils.py index 47eb61ad..40ac3c12 100644 --- a/dsl/pace/dsl/dace/utils.py +++ b/dsl/pace/dsl/dace/utils.py @@ -28,7 +28,7 @@ def __init__(self, config: DaceConfig, label: str): @classmethod def log(cls, prefix: str, message: str): - pace_log.info(f"{prefix} {message}") + pace_log.debug(f"{prefix} {message}") @classmethod def default_prefix(cls, config: DaceConfig) -> str: diff --git a/fv3core/pace/fv3core/initialization/geos_wrapper.py b/fv3core/pace/fv3core/initialization/geos_wrapper.py index 4fc34052..2835e77e 100644 --- a/fv3core/pace/fv3core/initialization/geos_wrapper.py +++ b/fv3core/pace/fv3core/initialization/geos_wrapper.py @@ -132,9 +132,12 @@ def __init__( self._allocate_output_dir() pace_log.info( - "GEOS-Wrapper with: \n" + "Pace GEOS wrapper initialized: \n" f" dt : {self.dycore_state.bdt}\n" f" bridge : {self._fortran_mem_space} > {self._pace_mem_space}\n" + f" backend: {backend}\n" + f" orchestration: {self._is_orchestrated}\n" + f" sizer : {sizer.nx}x{sizer.ny}x{sizer.nz} (halo: {sizer.n_halo})" ) def _critical_path(self): @@ -173,7 +176,6 @@ def __call__( cyd: np.ndarray, diss_estd: np.ndarray, ) -> Tuple[Dict[str, np.ndarray], Dict[str, List[float]]]: - with self.perf_collector.timestep_timer.clock("numpy-to-dycore"): self.dycore_state = self._put_fortran_data_in_dycore( u, @@ -246,7 +248,6 @@ def _put_fortran_data_in_dycore( cyd: np.ndarray, diss_estd: np.ndarray, ) -> fv3core.DycoreState: - isc = self._grid_indexing.isc jsc = self._grid_indexing.jsc iec = self._grid_indexing.iec + 1 @@ -315,7 +316,6 @@ def _put_fortran_data_in_dycore( return state def _prep_outputs_for_geos(self) -> Dict[str, np.ndarray]: - output_dict = self.output_dict isc = self._grid_indexing.isc jsc = self._grid_indexing.jsc diff --git a/fv3core/pace/fv3core/stencils/d_sw.py b/fv3core/pace/fv3core/stencils/d_sw.py index cc602f4e..51c9ee6e 100644 --- a/fv3core/pace/fv3core/stencils/d_sw.py +++ b/fv3core/pace/fv3core/stencils/d_sw.py @@ -932,7 +932,7 @@ def make_quantity(): ) ) - if (self._d_con > 1.e-5) or (self._do_stochastic_ke_backscatter): + if (self._d_con > 1.0e-5) or (self._do_stochastic_ke_backscatter): self._accumulate_heat_source_and_dissipation_estimate_stencil = ( stencil_factory.from_dims_halo( func=accumulate_heat_source_and_dissipation_estimate, @@ -1254,11 +1254,11 @@ def __call__( self._column_namelist["d_con"], ) - if (self._d_con > 1.e-5) or (self._do_stochastic_ke_backscatter): + if (self._d_con > 1.0e-5) or (self._do_stochastic_ke_backscatter): self._accumulate_heat_source_and_dissipation_estimate_stencil( self._tmp_heat_s, heat_source, self._tmp_diss_e, diss_est ) - + self._update_u_and_v_stencil( self._tmp_ut, self._tmp_vt, diff --git a/fv3core/pace/fv3core/stencils/fv_dynamics.py b/fv3core/pace/fv3core/stencils/fv_dynamics.py index 3299e501..5f3de73a 100644 --- a/fv3core/pace/fv3core/stencils/fv_dynamics.py +++ b/fv3core/pace/fv3core/stencils/fv_dynamics.py @@ -7,7 +7,6 @@ import pace.dsl.gt4py_utils as utils import pace.fv3core.stencils.moist_cv as moist_cv import pace.util -import pace.util.constants as constants from pace.dsl.dace.orchestration import dace_inhibitor, orchestrate from pace.dsl.dace.wrapped_halo_exchange import WrappedHaloUpdater from pace.dsl.stencil import StencilFactory @@ -21,7 +20,7 @@ from pace.fv3core.stencils.neg_adj3 import AdjustNegativeTracerMixingRatio from pace.fv3core.stencils.remapping import LagrangianToEulerian from pace.stencils.c2l_ord import CubedToLatLon -from pace.util import X_DIM, Y_DIM, Z_INTERFACE_DIM, Timer +from pace.util import X_DIM, Y_DIM, Z_INTERFACE_DIM, Timer, constants from pace.util.grid import DampingCoefficients, GridData from pace.util.logging import pace_log from pace.util.mpi import MPI diff --git a/tests/main/fv3core/test_init_from_geos.py b/tests/main/fv3core/test_init_from_geos.py index 252fe7d2..16efe9e9 100644 --- a/tests/main/fv3core/test_init_from_geos.py +++ b/tests/main/fv3core/test_init_from_geos.py @@ -7,7 +7,6 @@ def test_geos_wrapper(): - namelist_dict = { "stencil_config": { "compilation_config": { @@ -82,7 +81,12 @@ def test_geos_wrapper(): comm = NullComm(rank=0, total_ranks=6, fill_value=0.0) backend = "numpy" - wrapper = fv3core.GeosDycoreWrapper(namelist, comm, backend) + wrapper = fv3core.GeosDycoreWrapper( + namelist=namelist, + comm=comm, + backend=backend, + bdt=namelist_dict["dt_atmos"], + ) nhalo = 3 shape_centered = ( namelist["nx_tile"] + 2 * nhalo, @@ -191,31 +195,33 @@ def test_geos_wrapper(): ) diss_estd = np.ones(shape_centered) - output_dict = wrapper( - u, - v, - w, - delz, - pt, - delp, - q, - ps, - pe, - pk, - peln, - pkz, - phis, - q_con, - omga, - ua, - va, - uc, - vc, - mfxd, - mfyd, - cxd, - cyd, - diss_estd, + timings = {} + output_dict, timings = wrapper( + timings=timings, + u=u, + v=v, + w=w, + delz=delz, + pt=pt, + delp=delp, + q=q, + ps=ps, + pe=pe, + pk=pk, + peln=peln, + pkz=pkz, + phis=phis, + q_con=q_con, + omga=omga, + ua=ua, + va=va, + uc=uc, + vc=vc, + mfxd=mfxd, + mfyd=mfyd, + cxd=cxd, + cyd=cyd, + diss_estd=diss_estd, ) assert isinstance(output_dict["u"], np.ndarray) diff --git a/util/HISTORY.md b/util/HISTORY.md index 26aac41c..0b0a42b6 100644 --- a/util/HISTORY.md +++ b/util/HISTORY.md @@ -7,6 +7,7 @@ latest - Added `dx_const`, `dy_const`, `deglat`, and `u_max` namelist settings for doubly-periodic grids - Added `dx_const`, `dy_const`, and `deglat` to grid generation code for doubly-periodic grids - Added f32 support to halo exchange data transformation +- Use one single logger, from logging.py v0.10.0 ------- diff --git a/util/pace/util/__init__.py b/util/pace/util/__init__.py index 4911f2cf..54f684d6 100644 --- a/util/pace/util/__init__.py +++ b/util/pace/util/__init__.py @@ -54,7 +54,7 @@ from .initialization import GridSizer, QuantityFactory, SubtileGridSizer from .io import read_state, write_state from .local_comm import LocalComm -from .logging import pace_log +from .logging import pace_log, AVAILABLE_LOG_LEVELS from .monitor import Monitor, NetCDFMonitor, ZarrMonitor from .mpi import MPIComm from .namelist import Namelist, NamelistDefaults diff --git a/util/pace/util/communicator.py b/util/pace/util/communicator.py index 0611fa98..938469bd 100644 --- a/util/pace/util/communicator.py +++ b/util/pace/util/communicator.py @@ -1,5 +1,4 @@ import abc -import logging from typing import List, Mapping, Optional, Sequence, Tuple, Union, cast import numpy as np @@ -15,8 +14,6 @@ from .utils import device_synchronize -logger = logging.getLogger("pace.util") - try: import cupy except ImportError: diff --git a/util/pace/util/local_comm.py b/util/pace/util/local_comm.py index a289296a..32fd0fb4 100644 --- a/util/pace/util/local_comm.py +++ b/util/pace/util/local_comm.py @@ -1,14 +1,11 @@ import copy -import logging from typing import Any from .comm import Comm +from .logging import pace_log from .utils import ensure_contiguous, safe_assign_array -logger = logging.getLogger("pace.util") - - class ConcurrencyError(Exception): """Exception to denote that a rank cannot proceed because it is waiting on a call from another rank.""" @@ -104,7 +101,7 @@ def bcast(self, value, root=0): "the bcast source" ) value = self._get_buffer("bcast", value) - logger.debug(f"bcast {value} to rank {self.rank}") + pace_log.debug(f"bcast {value} to rank {self.rank}") return value def Barrier(self): diff --git a/util/pace/util/logging.py b/util/pace/util/logging.py index 81b5a7b1..1f9142fe 100644 --- a/util/pace/util/logging.py +++ b/util/pace/util/logging.py @@ -1,15 +1,29 @@ import logging +import os import sys from mpi4py import MPI +LOGLEVEL = os.environ.get("PACE_LOGLEVEL", "INFO").upper() + +# Python log levels are hierarchical, therefore setting INFO +# means DEBUG and everything lower will be logged. +AVAILABLE_LOG_LEVELS = { + "info": logging.INFO, + "debug": logging.DEBUG, + "warning": logging.WARNING, + "error": logging.ERROR, + "critical": logging.CRITICAL, +} + + def _pace_logger(): name_log = logging.getLogger(__name__) - name_log.setLevel(logging.DEBUG) + name_log.setLevel(LOGLEVEL) handler = logging.StreamHandler(sys.stdout) - handler.setLevel(logging.DEBUG) + handler.setLevel(LOGLEVEL) formatter = logging.Formatter( fmt=( f"%(asctime)s|%(levelname)s|rank {MPI.COMM_WORLD.Get_rank()}|" diff --git a/util/pace/util/monitor/netcdf_monitor.py b/util/pace/util/monitor/netcdf_monitor.py index 18687216..0b39da60 100644 --- a/util/pace/util/monitor/netcdf_monitor.py +++ b/util/pace/util/monitor/netcdf_monitor.py @@ -1,4 +1,3 @@ -import logging import os from pathlib import Path from typing import Any, Dict, List, Optional, Set @@ -10,13 +9,11 @@ from .. import _xarray as xr from ..filesystem import get_fs +from ..logging import pace_log from ..quantity import Quantity from .convert import to_numpy -logger = logging.getLogger(__name__) - - class _TimeChunkedVariable: def __init__(self, initial: Quantity, time_chunk_size: int): self._data = np.zeros( @@ -46,7 +43,6 @@ def data(self) -> Quantity: class _ChunkedNetCDFWriter: - FILENAME_FORMAT = "state_{chunk:04d}_tile{tile}.nc" def __init__( @@ -62,7 +58,7 @@ def __init__( self._time_units: Optional[str] = None def append(self, state): - logger.debug("appending at time %d", self._i_time) + pace_log.debug("appending at time %d", self._i_time) state = {**state} # copy so we don't mutate the input time = state.pop("time", None) if self._chunked is None: @@ -75,7 +71,7 @@ def append(self, state): self._chunked[name].append(quantity) self._times.append(time) if (self._i_time + 1) % self._time_chunk_size == 0: - logger.debug("flushing on append at time %d", self._i_time) + pace_log.debug("flushing on append at time %d", self._i_time) self.flush() self._i_time += 1 diff --git a/util/pace/util/monitor/zarr_monitor.py b/util/pace/util/monitor/zarr_monitor.py index 12186799..5d7729b9 100644 --- a/util/pace/util/monitor/zarr_monitor.py +++ b/util/pace/util/monitor/zarr_monitor.py @@ -1,4 +1,3 @@ -import logging from datetime import datetime, timedelta from typing import List, Tuple, Union @@ -7,12 +6,11 @@ from .. import _xarray as xr from .. import constants, utils from .._optional_imports import cupy, zarr +from ..logging import pace_log from ..partitioner import Partitioner, subtile_slice from .convert import to_numpy -logger = logging.getLogger("pace.util") - __all__ = ["ZarrMonitor"] @@ -238,7 +236,7 @@ def append(self, quantity): ) from_slice = _get_from_slice(target_slice) - logger.debug( + pace_log.debug( f"assigning data from subtile slice {from_slice} to " f"target slice {target_slice}" ) @@ -310,7 +308,7 @@ def append(self, quantity): ) from_slice = _get_from_slice(target_slice) - logger.debug( + pace_log.debug( f"assigning data from subtile slice {from_slice} to " f"target slice {target_slice}" ) @@ -332,7 +330,6 @@ def append(self, quantity): class _ZarrTimeWriter(_ZarrVariableWriter): - _TIME_CHUNK_SIZE = 1024 def __init__(self, *args, **kwargs): diff --git a/util/pace/util/mpi.py b/util/pace/util/mpi.py index d03b7937..5acc2b00 100644 --- a/util/pace/util/mpi.py +++ b/util/pace/util/mpi.py @@ -2,16 +2,14 @@ from mpi4py import MPI except ImportError: MPI = None -import logging from typing import List, Optional, TypeVar, cast from .comm import Comm, Request +from .logging import pace_log T = TypeVar("T") -logger = logging.getLogger(__name__) - class MPIComm(Comm): def __init__(self): @@ -26,54 +24,56 @@ def Get_size(self) -> int: return self._comm.Get_size() def bcast(self, value: Optional[T], root=0) -> T: - logger.debug("bcast from root %s on rank %s", root, self._comm.Get_rank()) + pace_log.debug("bcast from root %s on rank %s", root, self._comm.Get_rank()) return self._comm.bcast(value, root=root) def barrier(self): - logger.debug("barrier on rank %s", self._comm.Get_rank()) + pace_log.debug("barrier on rank %s", self._comm.Get_rank()) self._comm.barrier() def Barrier(self): pass def Scatter(self, sendbuf, recvbuf, root=0, **kwargs): - logger.debug("Scatter on rank %s with root %s", self._comm.Get_rank(), root) + pace_log.debug("Scatter on rank %s with root %s", self._comm.Get_rank(), root) self._comm.Scatter(sendbuf, recvbuf, root=root, **kwargs) def Gather(self, sendbuf, recvbuf, root=0, **kwargs): - logger.debug("Gather on rank %s with root %s", self._comm.Get_rank(), root) + pace_log.debug("Gather on rank %s with root %s", self._comm.Get_rank(), root) self._comm.Gather(sendbuf, recvbuf, root=root, **kwargs) def allgather(self, sendobj: T) -> List[T]: - logger.debug("allgather on rank %s", self._comm.Get_rank()) + pace_log.debug("allgather on rank %s", self._comm.Get_rank()) return self._comm.allgather(sendobj) def Send(self, sendbuf, dest, tag: int = 0, **kwargs): - logger.debug("Send on rank %s with dest %s", self._comm.Get_rank(), dest) + pace_log.debug("Send on rank %s with dest %s", self._comm.Get_rank(), dest) self._comm.Send(sendbuf, dest, tag=tag, **kwargs) def sendrecv(self, sendbuf, dest, **kwargs): - logger.debug("sendrecv on rank %s with dest %s", self._comm.Get_rank(), dest) + pace_log.debug("sendrecv on rank %s with dest %s", self._comm.Get_rank(), dest) return self._comm.sendrecv(sendbuf, dest, **kwargs) def Isend(self, sendbuf, dest, tag: int = 0, **kwargs) -> Request: - logger.debug("Isend on rank %s with dest %s", self._comm.Get_rank(), dest) + pace_log.debug("Isend on rank %s with dest %s", self._comm.Get_rank(), dest) return self._comm.Isend(sendbuf, dest, tag=tag, **kwargs) def Recv(self, recvbuf, source, tag: int = 0, **kwargs): - logger.debug("Recv on rank %s with source %s", self._comm.Get_rank(), source) + pace_log.debug("Recv on rank %s with source %s", self._comm.Get_rank(), source) self._comm.Recv(recvbuf, source, tag=tag, **kwargs) def Irecv(self, recvbuf, source, tag: int = 0, **kwargs) -> Request: - logger.debug("Irecv on rank %s with source %s", self._comm.Get_rank(), source) + pace_log.debug("Irecv on rank %s with source %s", self._comm.Get_rank(), source) return self._comm.Irecv(recvbuf, source, tag=tag, **kwargs) def Split(self, color, key) -> "Comm": - logger.debug( + pace_log.debug( "Split on rank %s with color %s, key %s", self._comm.Get_rank(), color, key ) return self._comm.Split(color, key) def allreduce(self, sendobj: T, op=None) -> T: - logger.debug("allreduce on rank %s with operator %s", self._comm.Get_rank(), op) + pace_log.debug( + "allreduce on rank %s with operator %s", self._comm.Get_rank(), op + ) return self._comm.allreduce(sendobj, op)