[NASA:Update] Distributed DaCe cache (rework) (#16)
* Initialize GeosDycoreWrapper with bdt (timestep)

* Use GEOS version of constants

* 1. Add qcld to the list of tracers being advected
  2. Made GEOS-specific changes to thresholds in saturation adjustment

* Accumulate diss_est

* Allow GEOS_WRAPPER to process device data

* Add clear to collector for 3rd party use. GEOS passes timings down to caller

* Make kernel analysis run a copy stencil to compute local bandwidth
  Parametrize tool with backend, output format

* Move constants to an env var
  Add saturation adjustment threshold to const

* Remove unused if leading to empty code block

* Add guard for bdt==0

* Fix theoretical timings

* Fixed a bug where pkz was being calculated twice, and the second calc was wrong

* Downgrade DaCe to 0.14.0 pending array aliasing fix

* Read cache_root in default dace backend

* Document faulty behavior with GT_CACHE_DIR_NAME

* Fix bad requirements syntax

* Check for the string value of CONST_VERSION directly instead of enum

* Protect constant selection more rigorously.
  Clean abort when an unknown constant is given

* Log constants selection

* Refactor NQ to constants.py

* Replace all loggers with pace_log
Introduce PACE_LOGLEVEL to control the log level from outside

* Code guidelines clean up

* Devops/GitHub actions on (#15)

* Add openmpi to the image

* Fix unit tests (remove dxa, dya; rely on halo exchange)

* Distributed compilation on orchestrated backend for NxN layouts (#14)

* Adapt orchestration distribute compile for NxN layout

* Add a more descriptive string-based postfix for cache naming
Identify the code path for all cases
Consistent reload post-compile
Create a central place for all cache-generation logic
No more original layout check required

* Add a test on caches relocatability

* Deactivate relocatability test due to Python crash
Logged as issue 16


* Raise for 1,X and X,1 layouts, which require a new descriptor

* Added ak, bk for 137 levels in eta.py

* Add floating point precision to GEOS bridge init

* Log info GEOS bridge (#18)

* Add floating point precision to GEOS bridge init

* Update geos/develop to grab NOAA PR9 results (#21)

* Verbose choice of block/grid size


* GEOS integration (#9)

* Initialize GeosDycoreWrapper with bdt (timestep)

* Use GEOS version of constants

* 1. Add qcld to the list of tracers being advected
2. Made GEOS-specific changes to thresholds in saturation adjustment

* Accumulate diss_est

* Allow GEOS_WRAPPER to process device data

* Add clear to collector for 3rd party use. GEOS pass down timings to caller

* Make kernel analysis run a copy stencil to compute local bandwidth
Parametrize tool with backend, output format

* Move constants to an env var
Add saturation adjustment threshold to const

* Remove unused if leading to empty code block

* Restrict dace to 0.14.1 due to a parsing bug

* Add guard for bdt==0
Fix bad merge for bdt with GEOS_Wrapper

* Remove unused code

* Fix theoretical timings

* Fixed a bug where pkz was being calculated twice, and the second calc was wrong

* Downgrade DaCe to 0.14.0 pending array aliasing fix

* Set default cache path for orchestrated DaCe to respect GT_CACHE_* env

* Remove previous per stencil override of default_build_folder

* Revert "Set default cache path for orchestrated DaCe to respect GT_CACHE_* env"

* Revert "Remove previous per stencil override of default_build_folder"

* Read cache_root in default dace backend

* Document faulty behavior with GT_CACHE_DIR_NAME

* Fix bad requirements syntax

* Check for the string value of CONST_VERSION directly instead of enum

* Protect constant selection more rigorously.
Clean abort when an unknown constant is given

* Log constants selection

* Refactor NQ to constants.py

* Fix or explain inlined import

* Verbose runtime error on bad dt_atmos

* Verbose warm up

* Re-initialize heat_source and diss_est each call, add do_skeb check to accumulation

---------

Co-authored-by: Purnendu Chakraborty <[email protected]>
Co-authored-by: Oliver Elbert <[email protected]>

---------

Co-authored-by: Rusty Benson <[email protected]>
Co-authored-by: Oliver Elbert <[email protected]>
Co-authored-by: Purnendu Chakraborty <[email protected]>
Co-authored-by: Oliver Elbert <[email protected]>

* [NOAA:Update] Bring back #15 & doubly periodic domain (#25)

* Feature/dp driver (#13)

* initial commit

* adding test config

* adding the rest of driver and util code

* updating history.md

* move u_max to dycore config

* uncomment assert

* added comment explaining the copy of grid type to dycore config

* Turn main unit test & lint on PR, logger clean up [NASA:Update] (#15)

* Initialize GeosDycoreWrapper with bdt (timestep)

* Use GEOS version of constants

* 1. Add qcld to the list of tracers being advected
2. Made GEOS-specific changes to thresholds in saturation adjustment

* Accumulate diss_est

* Allow GEOS_WRAPPER to process device data

* Add clear to collector for 3rd party use. GEOS pass down timings to caller

* Make kernel analysis run a copy stencil to compute local bandwidth
Parametrize tool with backend, output format

* Move constants to an env var
Add saturation adjustment threshold to const

* Restrict dace to 0.14.1 due to a parsing bug

* Add guard for bdt==0

* Fix theoretical timings

* Fixed a bug where pkz was being calculated twice, and the second calc was wrong

* Downgrade DaCe to 0.14.0 pending array aliasing fix

* Set default cache path for orchestrated DaCe to respect GT_CACHE_* env

* Remove previous per stencil override of default_build_folder

* Revert "Set default cache path for orchestrated DaCe to respect GT_CACHE_* env"

* Read cache_root in default dace backend

* Document faulty behavior with GT_CACHE_DIR_NAME

* Check for the string value of CONST_VERSION directly instead of enum

* Protect constant selection more rigorously.
Clean abort when an unknown constant is given

* Log constants selection

* Refactor NQ to constants.py

* Introduce PACE_LOGLEVEL to control log level from outside

* Devops/GitHub actions on (#15)

* Update python to available 3.8.12

* Fix unit tests (remove dxa, dya rely on halo ex)

* Update HISTORY.md

* Adapt log_level in driver.run

* Verbose the PACE_CONSTANTS

* Doc log level hierarchical nature

---------

Co-authored-by: Purnendu Chakraborty <[email protected]>
Co-authored-by: Purnendu Chakraborty <[email protected]>

* Fix non-deterministic temporaries by using `zeros` everywhere instead of `empty`

* Update dsl/pace/dsl/caches/codepath.py

Co-authored-by: Oliver Elbert <[email protected]>

* Refactor the test to work around the reload bug
---------

Co-authored-by: Purnendu Chakraborty <[email protected]>
Co-authored-by: Purnendu Chakraborty <[email protected]>
Co-authored-by: Rusty Benson <[email protected]>
Co-authored-by: Oliver Elbert <[email protected]>
Co-authored-by: Oliver Elbert <[email protected]>
6 people committed Sep 11, 2023
1 parent d8ebc39 commit b1ef6b5
Showing 29 changed files with 817 additions and 213 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/main_unit_tests.yml
@@ -15,9 +15,9 @@ jobs:
uses: actions/[email protected]
with:
python-version: '3.8.12'
- name: Install OpenMPI for gt4py
- name: Install OpenMPI & Boost for gt4py
run: |
sudo apt-get install libopenmpi-dev
sudo apt-get install libopenmpi-dev libboost1.74-dev
- name: Install Python packages
run: |
python -m pip install --upgrade pip
2 changes: 1 addition & 1 deletion driver/pace/driver/driver.py
@@ -275,7 +275,7 @@ def from_dict(cls, kwargs: Dict[str, Any]) -> "DriverConfig":
)
grid_type = kwargs["grid_config"].config.grid_type
# Copy grid_type to the DycoreConfig if it's not the default value
if grid_type != 0:
if grid_type != 0:
kwargs["dycore_config"].grid_type = grid_type

if (
4 changes: 1 addition & 3 deletions driver/pace/driver/grid.py
@@ -105,7 +105,6 @@ def get_grid(
quantity_factory: QuantityFactory,
communicator: CubedSphereCommunicator,
) -> Tuple[DampingCoefficients, DriverGridData, GridData]:

metric_terms = MetricTerms(
quantity_factory=quantity_factory,
communicator=communicator,
@@ -184,8 +183,7 @@ def get_grid(
quantity_factory: QuantityFactory,
communicator: CubedSphereCommunicator,
) -> Tuple[DampingCoefficients, DriverGridData, GridData]:

backend = quantity_factory.empty(
backend = quantity_factory.zeros(
dims=[pace.util.X_DIM, pace.util.Y_DIM], units="unknown"
).gt4py_backend

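The `empty` → `zeros` change above (mirrored below in initialization.py) is what the "Fix non-deterministic temporaries" item in the commit message refers to: arrays created with `empty` keep whatever bytes were already in the allocation. A minimal NumPy-only sketch of the difference, independent of pace:

# Minimal illustration (NumPy only, not pace code): "empty" leaves memory
# uninitialized, so its contents are arbitrary and may differ between runs,
# while "zeros" is deterministic.
import numpy as np

a = np.empty(4)  # arbitrary values: whatever was left in the allocation
b = np.zeros(4)  # always [0., 0., 0., 0.]
print(a)
print(b)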
5 changes: 1 addition & 4 deletions driver/pace/driver/initialization.py
@@ -154,7 +154,6 @@ def get_driver_state(
driver_grid_data: pace.util.grid.DriverGridData,
grid_data: pace.util.grid.GridData,
) -> DriverState:

dycore_state = tc_init.init_tc_state(
grid_data=grid_data,
quantity_factory=quantity_factory,
@@ -323,7 +322,7 @@ def get_driver_state(
driver_grid_data: pace.util.grid.DriverGridData,
grid_data: pace.util.grid.GridData,
) -> DriverState:
backend = quantity_factory.empty(
backend = quantity_factory.zeros(
dims=[pace.util.X_DIM, pace.util.Y_DIM], units="unknown"
).gt4py_backend

@@ -348,7 +347,6 @@ def _initialize_dycore_state(
communicator: pace.util.CubedSphereCommunicator,
backend: str,
) -> fv3core.DycoreState:

grid = self._get_serialized_grid(communicator=communicator, backend=backend)

ser = self._serializer(communicator)
@@ -401,7 +399,6 @@ def get_driver_state(
driver_grid_data: pace.util.grid.DriverGridData,
grid_data: pace.util.grid.GridData,
) -> DriverState:

return DriverState(
dycore_state=self.dycore_state,
physics_state=self.physics_state,
2 changes: 1 addition & 1 deletion driver/pace/driver/run.py
@@ -5,7 +5,7 @@
import click
import yaml

from pace.util import pace_log, AVAILABLE_LOG_LEVELS
from pace.util import AVAILABLE_LOG_LEVELS, pace_log

from .driver import Driver, DriverConfig

46 changes: 46 additions & 0 deletions dsl/pace/dsl/caches/cache_location.py
@@ -0,0 +1,46 @@
from pace.dsl.caches.codepath import FV3CodePath
from pace.util import CubedSpherePartitioner


def identify_code_path(
rank: int,
partitioner: CubedSpherePartitioner,
) -> FV3CodePath:
if partitioner.layout == (1, 1) or partitioner.layout == [1, 1]:
return FV3CodePath.All
elif partitioner.layout[0] == 1 or partitioner.layout[1] == 1:
raise NotImplementedError(
f"Build for layout {partitioner.layout} is not handled"
)
else:
if partitioner.tile.on_tile_bottom(rank):
if partitioner.tile.on_tile_left(rank):
return FV3CodePath.BottomLeft
if partitioner.tile.on_tile_right(rank):
return FV3CodePath.BottomRight
else:
return FV3CodePath.Bottom
if partitioner.tile.on_tile_top(rank):
if partitioner.tile.on_tile_left(rank):
return FV3CodePath.TopLeft
if partitioner.tile.on_tile_right(rank):
return FV3CodePath.TopRight
else:
return FV3CodePath.Top
else:
if partitioner.tile.on_tile_left(rank):
return FV3CodePath.Left
if partitioner.tile.on_tile_right(rank):
return FV3CodePath.Right
else:
return FV3CodePath.Center


def get_cache_fullpath(code_path: FV3CodePath) -> str:
from gt4py.cartesian import config as gt_config

return f"{gt_config.cache_settings['root_path']}/.gt_cache_{code_path}"


def get_cache_directory(code_path: FV3CodePath) -> str:
return f".gt_cache_{code_path}"
33 changes: 33 additions & 0 deletions dsl/pace/dsl/caches/codepath.py
@@ -0,0 +1,33 @@
import enum


class FV3CodePath(enum.Enum):
"""Enum listing all possible code paths on a cube sphere.
For any layout the cube sphere has up to 9 different code paths depending on
the positioning of the rank on the tile and which of the edge/corner cases
it has to handle, as well as the possibility for all boundary computations in
the 1x1 layout case.
Since the framework inlines code to optimize, we _cannot_ pre-suppose which code
being kept and/or ejected. This enum serves as the ground truth to map rank to
the proper generated code.
"""

All = "FV3_A"
BottomLeft = "FV3_BL"
Left = "FV3_L"
TopLeft = "FV3_TL"
Top = "FV3_T"
TopRight = "FV3_TR"
Right = "FV3_R"
BottomRight = "FV3_BR"
Bottom = "FV3_B"
Center = "FV3_C"

def __str__(self):
return self.value

def __repr__(self):
return self.value

def __format__(self, format_spec: str) -> str:
return self.value
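
Taken together, the two new modules above (cache_location.py and codepath.py) map a rank's position on its tile to a dedicated cache directory. A small usage sketch, assuming a working pace/gt4py installation; the 3x3 layout and the partitioner constructors below are illustrative assumptions, not part of this diff:

# Illustrative sketch only: assumes pace and gt4py are installed, and that
# pace.util exposes TilePartitioner/CubedSpherePartitioner with the
# constructors used here (an assumption, not shown in this diff).
from pace.dsl.caches.cache_location import get_cache_fullpath, identify_code_path
from pace.dsl.caches.codepath import FV3CodePath
from pace.util import CubedSpherePartitioner, TilePartitioner

partitioner = CubedSpherePartitioner(TilePartitioner((3, 3)))

# Ranks 0..8 cover one tile of a 3x3 layout; corners, edges and the center
# each resolve to their own code path, and therefore their own cache.
for rank in range(9):
    code_path = identify_code_path(rank, partitioner)
    print(rank, code_path, get_cache_fullpath(code_path))

# The enum formats to its short string value, so a bottom-left rank maps to
# a cache directory named ".gt_cache_FV3_BL".
assert f".gt_cache_{FV3CodePath.BottomLeft}" == ".gt_cache_FV3_BL"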
136 changes: 28 additions & 108 deletions dsl/pace/dsl/dace/build.py
@@ -1,78 +1,23 @@
from typing import List, Optional, Tuple
from warnings import warn

from dace.sdfg import SDFG

import pace.util
from pace.dsl.caches.cache_location import get_cache_directory, get_cache_fullpath
from pace.dsl.dace.dace_config import DaceConfig, DaCeOrchestration


################################################
# Distributed compilation


def determine_compiling_ranks(config: DaceConfig) -> bool:
is_compiling = False
rank = config.my_rank
size = config.rank_size

if int(size / 6) == 0:
is_compiling = True
elif rank % int(size / 6) == rank:
is_compiling = True

return is_compiling


def unblock_waiting_tiles(comm, sdfg_path: str) -> None:
if comm and comm.Get_size() > 1:
for tile in range(1, 6):
tilesize = comm.Get_size() / 6
comm.send(sdfg_path, dest=tile * tilesize + comm.Get_rank())


def get_target_rank(rank: int, partitioner: pace.util.CubedSpherePartitioner):
"""From my rank & the current partitioner we determine which
rank we should read from.
For all layout >= 3,3 this presumes build has been done on a
3,3 layout."""
if partitioner.layout == (1, 1):
return 0
if partitioner.layout == (2, 2):
if partitioner.tile.on_tile_bottom(rank):
if partitioner.tile.on_tile_left(rank):
return 0 # "00"
if partitioner.tile.on_tile_right(rank):
return 1 # "10"
if partitioner.tile.on_tile_top(rank):
if partitioner.tile.on_tile_left(rank):
return 2 # "01"
if partitioner.tile.on_tile_right(rank):
return 3 # "11"
else:
if partitioner.tile.on_tile_bottom(rank):
if partitioner.tile.on_tile_left(rank):
return 0 # "00"
if partitioner.tile.on_tile_right(rank):
return 2 # "20"
else:
return 1 # "10"
if partitioner.tile.on_tile_top(rank):
if partitioner.tile.on_tile_left(rank):
return 6 # "02"
if partitioner.tile.on_tile_right(rank):
return 8 # "22"
else:
return 7 # "12"
else:
if partitioner.tile.on_tile_left(rank):
return 3 # "01"
if partitioner.tile.on_tile_right(rank):
return 5 # "21"
else:
return 4 # "11"


def build_info_filepath() -> str:
return "build_info.txt"

@@ -101,7 +46,10 @@ def write_build_info(


def get_sdfg_path(
daceprog_name: str, config: DaceConfig, sdfg_file_path: Optional[str] = None
daceprog_name: str,
config: DaceConfig,
sdfg_file_path: Optional[str] = None,
override_run_only=False,
) -> Optional[str]:
"""Build an SDFG path from the qualified program name or it's direct path to .sdfg
@@ -113,7 +61,7 @@

# TODO: check DaceConfig for cache.strategy == name
# Guarding against bad usage of this function
if config.get_orchestrate() != DaCeOrchestration.Run:
if not override_run_only and config.get_orchestrate() != DaCeOrchestration.Run:
return None

# Case of a .sdfg file given by the user to be compiled
@@ -125,19 +73,8 @@
return sdfg_file_path

# Case of loading a precompiled .so - lookup using GT_CACHE
from gt4py.cartesian import config as gt_config

if config.rank_size > 1:
rank = config.my_rank
rank_str = f"_{config.target_rank:06d}"
else:
rank = 0
rank_str = f"_{rank:06d}"

sdfg_dir_path = (
f"{gt_config.cache_settings['root_path']}"
f"/.gt_cache{rank_str}/dacecache/{daceprog_name}"
)
cache_fullpath = get_cache_fullpath(config.code_path)
sdfg_dir_path = f"{cache_fullpath}/dacecache/{daceprog_name}"
if not os.path.isdir(sdfg_dir_path):
raise RuntimeError(f"Precompiled SDFG is missing at {sdfg_dir_path}")

@@ -153,23 +90,8 @@
raise RuntimeError(
f"SDFG build for {build_backend}, {config._backend} has been asked"
)
# Check layout
build_layout = ast.literal_eval(build_info_file.readline())
can_read = True
if config.layout == (1, 1) and config.layout != build_layout:
can_read = False
elif config.layout == (2, 2) and config.layout != build_layout:
can_read = False
elif (
build_layout != (1, 1) and build_layout != (2, 2) and build_layout != (3, 3)
):
can_read = False
if not can_read:
warn(
f"SDFG build for layout {build_layout}, "
f"cannot be run with current layout {config.layout}, bad layout?"
)
# Check resolution per tile
build_layout = ast.literal_eval(build_info_file.readline())
build_resolution = ast.literal_eval(build_info_file.readline())
if (config.tile_resolution[0] / config.layout[0]) != (
build_resolution[0] / build_layout[0]
Expand All @@ -179,7 +101,7 @@ def get_sdfg_path(
f"cannot be run with current resolution {config.tile_resolution}"
)

print(f"[DaCe Config] Rank {rank} loading SDFG {sdfg_dir_path}")
print(f"[DaCe Config] Rank {config.my_rank} loading SDFG {sdfg_dir_path}")

return sdfg_dir_path

@@ -189,33 +111,31 @@ def set_distributed_caches(config: "DaceConfig"):

# Execute specific initialization per orchestration state
orchestration_mode = config.get_orchestrate()
if orchestration_mode == DaCeOrchestration.Python:
return

# Check that we have all the files we need to early out in case
# of issues.
if orchestration_mode == DaCeOrchestration.Run:
import os

from gt4py.cartesian import config as gt_config

# Check our cache exist
if config.rank_size > 1:
rank = config.my_rank
target_rank_str = f"_{config.target_rank:06d}"
else:
rank = 0
target_rank_str = f"_{rank:06d}"
cache_filepath = (
f"{gt_config.cache_settings['root_path']}/.gt_cache{target_rank_str}"
)
if not os.path.exists(cache_filepath):
cache_directory = get_cache_fullpath(config.code_path)
if not os.path.exists(cache_directory):
raise RuntimeError(
f"{orchestration_mode} error: Could not find caches for rank "
f"{rank} at {cache_filepath}"
f"{config.my_rank} at {cache_directory}"
)

# All, good set this rank cache to the source cache
gt_config.cache_settings["dir_name"] = f".gt_cache{target_rank_str}"
print(
f"[{orchestration_mode}] Rank {rank} "
f"reading cache {gt_config.cache_settings['dir_name']}"
)
# Set read/write caches to the target rank
from gt4py.cartesian import config as gt_config

if config.do_compile:
verb = "reading/writing"
else:
verb = "reading"

gt_config.cache_settings["dir_name"] = get_cache_directory(config.code_path)
pace.util.pace_log.critical(
f"[{orchestration_mode}] Rank {config.my_rank} "
f"{verb} cache {gt_config.cache_settings['dir_name']}"
)
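
With the rework above, get_sdfg_path and set_distributed_caches look up precompiled artifacts under a code-path-named cache rather than a per-rank .gt_cache_NNNNNN directory. A sketch of the resulting SDFG lookup path, reusing the gt4py cache_settings referenced in the diff; the program name below is hypothetical:

# Sketch only: reconstructs the directory get_sdfg_path expects, assuming
# gt4py is installed. "fv_dynamics" is a hypothetical orchestrated program
# name (not taken from this diff); FV3_C is the code path of a center rank.
from gt4py.cartesian import config as gt_config

daceprog_name = "fv_dynamics"
code_path = "FV3_C"

sdfg_dir_path = (
    f"{gt_config.cache_settings['root_path']}"
    f"/.gt_cache_{code_path}/dacecache/{daceprog_name}"
)
print(sdfg_dir_path)  # e.g. <root_path>/.gt_cache_FV3_C/dacecache/fv_dynamics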