diff --git a/om3utils/MOM6InputParser.py b/om3utils/MOM6InputParser.py new file mode 100644 index 0000000..57c95f9 --- /dev/null +++ b/om3utils/MOM6InputParser.py @@ -0,0 +1,102 @@ +import re + + +class MOM6InputParser(object): + header_pattern = re.compile(r"^! === (.+) ===") + block_list = [ + "KPP%", + "%KPP", + "CVMix_CONVECTION%", + "%CVMix_CONVECTION", + "CVMIX_DDIFF%", + "%CVMIX_DDIFF", + ] + + def __init__(self): + self.param_dict = {} + self.commt_dict = {} + self.current_var = None + self.current_value = [] + self.current_comment = [] + self.block_pattern = re.compile(r"|".join(re.escape(i) for i in self.block_list)) + + def read_input(self, MOM_input_read_path): + with open(MOM_input_read_path, "r") as f: + self.lines = f.readlines() + + def parse_lines(self): + for line in self.lines: + if self.header_pattern.match(line): + self._save_current_param() + self._start_new_header() + continue + + if line.strip().startswith("!"): + self._append_comments(line) + continue + + if "=" in line: + self._save_current_param() + self._parse_params(line) + + elif self.block_pattern.search(line): # block_pattern + self._save_current_param() + self._parse_params_block(line) + + # last parameter + self._save_current_param() + + def _save_current_param(self): + # save parameters, and associated values and comments + if self.current_var: + var_name = self.current_var + value = "".join(self.current_value).strip() + comment = "\n".join(self.current_comment).strip() + self.param_dict[var_name] = value + self.commt_dict[var_name] = comment + + def _start_new_header(self): + self.current_var = None + self.current_value = [] + self.current_comment = [] + + def _parse_params(self, line): + param, value = line.split("=", 1) + param = param.strip() + # separate value and inline comment + tmp_value = value.split("!")[0].strip() # value + tmp_commt = value.split("!")[1].strip() if "!" in value else "" # inline comment + self.current_var = param + self.current_value = [tmp_value] + self.current_comment = [tmp_commt] + + def _parse_params_block(self, line): + self.current_var = line.strip() + self.current_value = [""] + self.current_comment = [""] + + def _append_comments(self, line): + if self.current_var: + self.current_comment.append(line.strip()) + + def writefile_MOM_input(self, MOM_input_write_path, total_width=32): + """ + Write the updated MOM_input to file + """ + with open(MOM_input_write_path, "w") as f: + f.write("! This file was written by the script xxx \n") + f.write("! and records the non-default parameters used at run-time.\n") + f.write("\n") + for var, value in self.param_dict.items(): + comment = self.commt_dict.get(var, "") + if comment: + comment_lines = comment.split("\n") + param_str = f"{var} = {value}" + f.write(f"{param_str:<{total_width}} ! {comment_lines[0].strip()}\n") + for comment_line in comment_lines[1:]: + f.write(f"{'':<{total_width}} {comment_line.strip()}\n") + elif var in self.block_list: + f.write(f"{var}\n") + else: + f.write(f"{var} = {value}\n") + f.write("\n") diff --git a/pyproject.toml b/pyproject.toml index b6e3998..280fb9c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,6 +37,7 @@ devel = [ test = [ "pytest", "pytest-cov", + "pytest-mock", ] [tool.pytest.ini_options] diff --git a/tests/test_MOM6InputParser.py b/tests/test_MOM6InputParser.py new file mode 100644 index 0000000..58badf1 --- /dev/null +++ b/tests/test_MOM6InputParser.py @@ -0,0 +1,89 @@ +import pytest + +# from test_utils import MockFile +from om3utils.MOM6InputParser import MOM6InputParser +from unittest.mock import mock_open, call + + +@pytest.fixture() +def mom6_input(): + return [ + "! === HEADER 1 ===\n", + "REGRIDDING_COORDINATE_MODE = ZSTAR ! default = 'LAYER''\n", + " ! Coordinate mode for vertical regridding. Choose among the following\n", + "! === HEADER 2 ===\n", + "KPP%\n", + "N_SMOOTH = 4 ! default = 0\n", + " ! The number of times the 1-1-4-1-1 Laplacian filter is applied on OBL depth.\n", + "%KPP\n", + "DT = 1800.0\n", + "BOOL = True\n", + ] + + +@pytest.fixture() +def param_output(): + return { + "REGRIDDING_COORDINATE_MODE": "ZSTAR", + "KPP%": "", + "N_SMOOTH": "4", + "%KPP": "", + "DT": "1800.0", + "BOOL": "True", + } + + +@pytest.fixture() +def commt_output(): + return { + "REGRIDDING_COORDINATE_MODE": "default = 'LAYER''\n! Coordinate mode for vertical regridding. Choose among the following", + "KPP%": "", + "N_SMOOTH": "default = 0\n! The number of times the 1-1-4-1-1 Laplacian filter is applied on OBL depth.", + "%KPP": "", + "DT": "", + "BOOL": "", + } + + +def test_read_mom6_input(mocker, mom6_input): + mocker.patch("builtins.open", mocker.mock_open(read_data="".join(mom6_input))) + parser = MOM6InputParser() + parser.read_input("tmp_path") + assert parser.lines == mom6_input + + +def test_param_commt_output(mom6_input, param_output, commt_output): + parser = MOM6InputParser() + parser.lines = mom6_input + parser.parse_lines() + assert parser.param_dict == param_output + assert parser.commt_dict == commt_output + + +def test_write_mom6_input(mocker, mom6_input): + mock_file = mock_open() + mocker.patch("builtins.open", mock_file) + parser = MOM6InputParser() + parser.lines = mom6_input + parser.parse_lines() + parser.writefile_MOM_input("tmp_path") # write to the mock_file + + expected_calls = [ + call("! This file was written by the script xxx \n"), + call("! and records the non-default parameters used at run-time.\n"), + call("\n"), + call("REGRIDDING_COORDINATE_MODE = ZSTAR ! default = 'LAYER''\n"), + call( + " ! Coordinate mode for vertical regridding. Choose among the following\n" + ), + call("KPP%\n"), + call("N_SMOOTH = 4 ! default = 0\n"), + call( + " ! The number of times the 1-1-4-1-1 Laplacian filter is applied on OBL depth.\n" + ), + call("%KPP\n"), + call("DT = 1800.0\n"), + call("BOOL = True\n"), + ] + + mock_file().write.assert_has_calls(expected_calls, any_order=False)