add type annotations and args to docstrings (#115)
* add type annotations and args to docstrings

* update CHANGELOG.md
danielfromearth authored May 24, 2024
1 parent 619b1b6 commit 9e4a089
Showing 5 changed files with 84 additions and 43 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
### Changed
- Updated python libraries
- [issue #114](https://github.com/podaac/concise/issues/114): add type annotations
### Deprecated
### Removed
- Removed CMR testing. Now done in [concise-autotest](https://github.com/podaac/concise-autotest) repo
30 changes: 20 additions & 10 deletions podaac/merger/merge.py
@@ -1,5 +1,5 @@
"""Main module containing merge implementation"""

from pathlib import Path
from time import perf_counter
from logging import getLogger
from os import cpu_count
@@ -11,9 +11,9 @@
from podaac.merger.preprocess_worker import run_preprocess


def is_file_empty(parent_group):
def is_file_empty(parent_group: nc.Dataset | nc.Group) -> bool:
"""
Function to test if a all variable size in a dataset is 0
Function to test if any variable size in a dataset is zero
"""

for var in parent_group.variables.values():
@@ -24,17 +24,23 @@ def is_file_empty(parent_group):
return True
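For context, here is a minimal sketch of how the annotated helper might read after this change. The body is reconstructed from the fragments visible in the hunk above (the recursion into child groups is an assumption) and may differ from the repository's actual implementation.

import netCDF4 as nc

def is_file_empty(parent_group: nc.Dataset | nc.Group) -> bool:
    """Return True only if every variable reachable from parent_group has size 0."""
    for var in parent_group.variables.values():
        if var.size != 0:
            return False
    # Assumed: recurse into subgroups so nested variables are also checked.
    for child_group in parent_group.groups.values():
        if not is_file_empty(child_group):
            return False
    return True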


def merge_netcdf_files(original_input_files, output_file, granule_urls, logger=getLogger(__name__), perf_stats=None, process_count=None): # pylint: disable=too-many-locals
def merge_netcdf_files(original_input_files: list[Path], # pylint: disable=too-many-locals
output_file: str,
granule_urls,
logger=getLogger(__name__),
perf_stats: dict = None,
process_count: int = None):
"""
Main entrypoint to merge implementation. Merges n >= 2 granules together as a single
granule. Named in reference to original Java implementation.
granule. Named in reference to the original Java implementation.
Parameters
----------
input_files: list
list of string paths to NetCDF4 files to merge
original_input_files: list
list of Paths to NetCDF4 files to merge
output_file: str
output path for merged product
granule_urls
logger: logger
logger object
perf_stats: dict
@@ -113,7 +119,7 @@ def merge_netcdf_files(original_input_files, output_file, granule_urls, logger=g
logger.info('Done!')


def clean_metadata(metadata):
def clean_metadata(metadata: dict) -> None:
"""
Prepares metadata dictionary for insertion by removing inconsistent entries
and performing escaping of attribute names
@@ -141,9 +147,13 @@ def clean_metadata(metadata):
del metadata[key]


def init_dataset(merged_dataset, groups, var_info, max_dims, input_files):
def init_dataset(merged_dataset: nc.Dataset,
groups: list[str],
var_info: dict,
max_dims: dict,
input_files: list[Path]) -> None:
"""
Initialize the dataset utilizing data gathered from preprocessing
Initialize the dataset using data gathered from preprocessing
Parameters
----------
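Before moving on to merge_worker.py, a hypothetical call site illustrating the annotated merge_netcdf_files signature shown above (file names and URLs are invented; perf_stats and process_count keep their defaults):

from pathlib import Path
from podaac.merger.merge import merge_netcdf_files

# Hypothetical granules to be concatenated into a single output file.
input_files = [Path('granule_1.nc'), Path('granule_2.nc')]
granule_urls = [
    'https://example.com/granule_1.nc',
    'https://example.com/granule_2.nc',
]

merge_netcdf_files(
    original_input_files=input_files,
    output_file='merged.nc',
    granule_urls=granule_urls,
)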
48 changes: 33 additions & 15 deletions podaac/merger/merge_worker.py
@@ -1,19 +1,20 @@
"""Preprocessing methods and the utilities to automagically run them in single-thread/multiprocess modes"""

import logging
import math
import multiprocessing
from multiprocessing.shared_memory import SharedMemory
import queue
import time
import os
import shutil
import multiprocessing
from multiprocessing.shared_memory import SharedMemory
from pathlib import Path
import netCDF4 as nc
import numpy as np

from podaac.merger.path_utils import resolve_dim, resolve_group


def shared_memory_size():
def shared_memory_size() -> int:
"""
try to get the shared memory space size by reading the /dev/shm on linux machines
"""
@@ -26,16 +27,16 @@ def shared_memory_size():
return int(default_memory_size)
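One plausible way to implement the probe that this docstring describes, assuming the /dev/shm mount and the fallback constant (both are assumptions, not necessarily the repository's exact code):

import shutil

DEFAULT_MEMORY_SIZE = 60 * 1024 * 1024  # hypothetical fallback, in bytes

def shared_memory_size() -> int:
    # On Linux the POSIX shared memory filesystem is mounted at /dev/shm,
    # so its total size bounds what SharedMemory segments can hold.
    try:
        return shutil.disk_usage('/dev/shm').total
    except FileNotFoundError:
        return int(DEFAULT_MEMORY_SIZE)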


def max_var_memory(file_list, var_info, max_dims):
"""
function to get the maximum shared memory that will be used for variables
def max_var_memory(file_list: list[Path], var_info: dict, max_dims) -> int:
"""Function to get the maximum shared memory that will be used for variables
Parameters
----------
file_list : list
List of file paths to be processed
var_info : dict
Dictionary of variable paths and associated VariableInfo
max_dims
"""

max_var_mem = 0
@@ -57,7 +58,12 @@ def max_var_memory(file_list, var_info, max_dims):
return max_var_mem


def run_merge(merged_dataset, file_list, var_info, max_dims, process_count, logger):
def run_merge(merged_dataset: nc.Dataset,
file_list: list[Path],
var_info: dict,
max_dims: dict,
process_count: int,
logger: logging.Logger):
"""
Automagically run merging in an optimized mode determined by the environment
@@ -73,6 +79,7 @@ def run_merge(merged_dataset, file_list, var_info, max_dims, process_count, logg
Dictionary of dimension paths and maximum dimensions found during preprocessing
process_count : int
Number of worker processes to run (expected >= 1)
logger
"""

if process_count == 1:
@@ -91,7 +98,11 @@ def run_merge(merged_dataset, file_list, var_info, max_dims, process_count, logg
_run_single_core(merged_dataset, file_list, var_info, max_dims, logger)
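A sketch of the dispatch that the run_merge docstring describes. Only the single-core/multi-core split is taken from the visible diff; the shared-memory guard is an assumption built from max_var_memory and shared_memory_size defined in this module.

def run_merge_sketch(merged_dataset, file_list, var_info, max_dims,
                     process_count, logger):
    # Trivial case: the caller asked for a single worker.
    if process_count == 1:
        _run_single_core(merged_dataset, file_list, var_info, max_dims, logger)
        return
    # Hypothetical guard: fall back to single-core when the largest variable
    # would not fit into the available shared memory.
    if max_var_memory(file_list, var_info, max_dims) > shared_memory_size():
        _run_single_core(merged_dataset, file_list, var_info, max_dims, logger)
    else:
        _run_multi_core(merged_dataset, file_list, var_info, max_dims,
                        process_count, logger)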


def _run_single_core(merged_dataset, file_list, var_info, max_dims, logger):
def _run_single_core(merged_dataset: nc.Dataset,
file_list: list[Path],
var_info: dict,
max_dims: dict,
logger: logging.Logger):
"""
Run the variable merge in the current thread/single-core mode
@@ -105,6 +116,7 @@ def _run_single_core(merged_dataset, file_list, var_info, max_dims, logger):
Dictionary of variable paths and associated VariableInfo
max_dims : dict
Dictionary of dimension paths and maximum dimensions found during preprocessing
logger
"""

logger.info("Running single core ......")
@@ -129,7 +141,12 @@ def _run_single_core(merged_dataset, file_list, var_info, max_dims, logger):
merged_var[i] = resized


def _run_multi_core(merged_dataset, file_list, var_info, max_dims, process_count, logger): # pylint: disable=too-many-locals
def _run_multi_core(merged_dataset: nc.Dataset, # pylint: disable=too-many-locals
file_list: list[Path],
var_info: dict,
max_dims: dict,
process_count: int,
logger: logging.Logger):
"""
Run the variable merge in multi-core mode. This method creates (process_count - 1)
read processes which read data from an origin granule, resize it, then queue it
@@ -150,6 +167,7 @@ def _run_multi_core(merged_dataset, file_list, var_info, max_dims, process_count
Dictionary of dimension paths and maximum dimensions found during preprocessing
process_count : int
Number of worker processes to run (expected >= 2)
logger
"""

logger.info("Running multicore ......")
@@ -266,7 +284,7 @@ def _run_worker(in_queue, out_queue, max_dims, var_info, memory_limit, lock):
shared_mem.close()


def _check_exit(processes):
def _check_exit(processes: list):
"""
Ensure that all processes have exited without error by checking their exitcode
if they're no longer running. Processes that have exited properly are removed
@@ -286,7 +304,7 @@ def _check_exit(processes):
raise RuntimeError(f'Merging failed - exit code: {process.exitcode}')


def resize_var(var, var_info, max_dims):
def resize_var(var: nc.Variable, var_info, max_dims: dict) -> np.ndarray:
"""
Resizes a variable's data to the maximum dimensions found in preprocessing.
This method will never downscale a variable and only performs bottom and
@@ -296,8 +314,8 @@ def resize_var(var, var_info, max_dims):
----------
var : nc.Variable
variable to be resized
group_path : str
group path to this variable
var_info
contains a group path to this variable
max_dims : dict
dictionary of maximum dimensions found during preprocessing
@@ -310,7 +328,7 @@ def resize_var(var, var_info, max_dims):
if var.ndim == 0:
return var[:]

# generate ordered array of new widths
# generate an ordered array of new widths
dims = [resolve_dim(max_dims, var_info.group_path, dim.name) - dim.size for dim in var.get_dims()]
widths = [[0, dim] for dim in dims]

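The widths list built in the hunk above matches numpy's pad_width format; a toy example of that padding step (values are invented, not repository code):

import numpy as np

data = np.arange(6).reshape(2, 3)     # stand-in for var[:]
dims = [4 - 2, 5 - 3]                 # target size minus current size, per axis
widths = [[0, d] for d in dims]       # pad only at the end of each axis
resized = np.pad(data, widths, mode='constant', constant_values=-1)
assert resized.shape == (4, 5)        # never downscales, only grows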
9 changes: 5 additions & 4 deletions podaac/merger/path_utils.py
@@ -2,9 +2,10 @@
Utilities used throughout the merging implementation to simplify group path resolution
and generation
"""
import netCDF4 as nc


def get_group_path(group, resource):
def get_group_path(group: nc.Group, resource: str) -> str:
"""
Generates a Unix-like path from a group and resource to be accessed
@@ -27,7 +28,7 @@ def get_group_path(group, resource):
return group.path + '/' + resource


def resolve_group(dataset, path):
def resolve_group(dataset: nc.Dataset, path: str):
"""
Resolves a group path into two components: the group and the resource's name
@@ -50,10 +51,10 @@ def resolve_group(dataset, path):
if len(components[0]) > 0:
group = dataset[components[0]]

return (group, components[1])
return group, components[1]


def resolve_dim(dims, group_path, dim_name):
def resolve_dim(dims: dict, group_path: str, dim_name: str):
"""
Attempt to resolve dim name starting from top-most group going down to the root group
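A hypothetical round trip through the two helpers above, using an in-memory netCDF4 dataset (not taken from the repository's tests; the resolve_group internals are only partially visible in this diff):

import netCDF4 as nc
from podaac.merger.path_utils import get_group_path, resolve_group

ds = nc.Dataset('example.nc', 'w', diskless=True)
grp = ds.createGroup('sensor')
grp.createDimension('time', 10)

path = get_group_path(grp, 'time')      # '/sensor/time'
group, name = resolve_group(ds, path)   # (ds['sensor'], 'time')
assert group.dimensions[name].size == 10
ds.close()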
