Skip to content

Commit

Permalink
feat: 638 adaptation adaptation analysis should run with only damages…
Browse files Browse the repository at this point in the history
… or only losses (#642)

* chore: allow adaptation to run with only a damages or losses analysis

* tests: split conftest

* chore: adjust readme

* tests: fix/improve fixture for not reading graphfiles

* chore: process review comments
  • Loading branch information
ArdtK authored Dec 10, 2024
1 parent 180038f commit 37ab5eb
Show file tree
Hide file tree
Showing 9 changed files with 221 additions and 75 deletions.
10 changes: 7 additions & 3 deletions ra2ce/analysis/adaptation/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@ These properties are configured in `AnalysisConfigData.AnalysisSectionAdaptation
The benefit of a certain adaptation option (`AdapatationOption`) is calculated by comparing the impact of an option with the impact of the reference option.

### Impact
The impact of an option is calculated by determining the damages and losses that are caused by a certain hazard. To calculate the damages, the `Damages` analysis is run on the network.
The losses are calculated by running either the `SingleLinkLosses` or `MultiLinkLosses` analysis on the network.
The impact of an option is calculated by determining the damages and/or losses that are caused by a certain hazard.
Which losses analysis is run is determined by `AnalysisConfigData.AnalysisSectionAdaptation.losses_analysis`.

The configuration of the damages and the losses analyses are derived from their standard configuration in the section `AnalysisSectionDamages` and `AnalysisSectionLosses`, which are stored in `AdaptationOptionAnalysis` for a specific option.
One of these needs to be configured.

The net present impact is calculated by summing the damages and the losses per link, taking into account the `initial_frequency` corrected by a `climate_factor`, and the `time_horizon` and `discount_rate`.
In `AdaptationOptionAnalysis.get_analysis_info` it can be found which analysis to run.
Note this logic resembles the logic in `AnalysisFactory`, which can't be used due to circular dependencies.
Here also a regex expression is given to find the right column in the analysis result.

The net present impact is calculated by summing the damages and/or the losses per link, taking into account the `initial_frequency` corrected by a `climate_factor`, and the `time_horizon` and `discount_rate`.

## Cost calculation
The cost of an adaptation is calculated per link in the network by multiplying the unit cost of an adaptation with the length of the link.
Expand Down
3 changes: 1 addition & 2 deletions ra2ce/analysis/adaptation/adaptation.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
class Adaptation(AnalysisBase, AnalysisDamagesProtocol):
"""
Execute the adaptation analysis.
For each adaptation option a damages and losses analysis is executed.
For each adaptation option a damages and/or losses analysis is executed.
"""

analysis: AnalysisSectionAdaptation
Expand All @@ -48,7 +48,6 @@ class Adaptation(AnalysisBase, AnalysisDamagesProtocol):
output_path: Path
adaptation_collection: AdaptationOptionCollection

# TODO: add the proper protocol for the adaptation analysis.
def __init__(
self,
analysis_input: AnalysisInputWrapper,
Expand Down
12 changes: 7 additions & 5 deletions ra2ce/analysis/adaptation/adaptation_option.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,23 +72,25 @@ def from_config(
"""
if (
not analysis_config.config_data.damages_list
or not analysis_config.config_data.losses_list
and not analysis_config.config_data.losses_list
):
raise ValueError(
"Damages and losses sections are required to create an adaptation option."
"Damages and/or losses sections are required to create an adaptation option."
)

# Create input for the analyses
# Create input for the damages and losses analyses (if present in config)
_config_analyses = [x.analysis for x in analysis_config.config_data.analyses]
_analyses = [
AdaptationOptionAnalysis.from_config(
analysis_config=analysis_config,
analysis_type=_analysis,
analysis_type=_analysis_type,
option_id=adaptation_option.id,
)
for _analysis in [
for _analysis_type in [
AnalysisDamagesEnum.DAMAGES,
analysis_config.config_data.adaptation.losses_analysis,
]
if _analysis_type in _config_analyses
]

return cls(
Expand Down
35 changes: 30 additions & 5 deletions ra2ce/analysis/adaptation/adaptation_option_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
"""
from __future__ import annotations

import logging
from copy import deepcopy
from dataclasses import dataclass

from geopandas import GeoDataFrame
from numpy import nan
from pandas import Series

from ra2ce.analysis.analysis_config_data.enums.analysis_damages_enum import (
Expand Down Expand Up @@ -63,12 +66,13 @@ def get_analysis_info(
tuple[type[Damages | LossesBase], str]: The analysis class and the regex to find the result column.
"""
if analysis_type == AnalysisDamagesEnum.DAMAGES:
return (Damages, "dam_.*")
# Columnname should start with "dam_" and should not end with "_segments"
return (Damages, "(?!.*_segments$)^dam_.*")
elif analysis_type == AnalysisLossesEnum.SINGLE_LINK_LOSSES:
return (SingleLinkLosses, "vlh_.*_total")
return (SingleLinkLosses, "^vlh_.*_total$")
elif analysis_type == AnalysisLossesEnum.MULTI_LINK_LOSSES:
return (MultiLinkLosses, "vlh_.*_total")
raise NotImplementedError(f"Analysis {analysis_type} not implemented")
return (MultiLinkLosses, "^vlh_.*_total$")
raise NotImplementedError(f"Analysis {analysis_type} not supported")

@classmethod
def from_config(
Expand Down Expand Up @@ -124,6 +128,27 @@ def from_config(
result_col=_result_col,
)

def get_result_column(self, gdf: GeoDataFrame) -> Series:
"""
Get a column from the dataframe based on the provided regex.
Args:
gdf (GeoDataFrame): The dataframe to search in.
regex (str): Regex to match the column.
Returns:
Series: The relevant column.
"""
_result_col = gdf.filter(regex=self.result_col)
if _result_col.empty:
logging.warning(
"No column found in dataframe matching the regex %s for analaysis %s. Returning NaN.",
self.result_col,
self.analysis_type,
)
return Series(nan, index=gdf.index)
return _result_col.iloc[:, 0]

def execute(self, analysis_config: AnalysisConfigWrapper) -> Series:
"""
Execute the analysis.
Expand All @@ -146,4 +171,4 @@ def execute(self, analysis_config: AnalysisConfigWrapper) -> Series:
self.analysis_input, analysis_config
).execute()
_result = _result_wrapper.get_single_result()
return _result.filter(regex=self.result_col).iloc[:, 0]
return self.get_result_column(_result)
81 changes: 54 additions & 27 deletions tests/analysis/adaptation/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,18 @@ class AdaptationOptionCases:

@pytest.fixture(name="valid_adaptation_config")
def _get_valid_adaptation_config_fixture(
request: pytest.FixtureRequest,
valid_analysis_ini: Path,
) -> Iterator[tuple[AnalysisInputWrapper, AnalysisConfigWrapper]]:
test_result_param_case: Path,
) -> Iterator[AnalysisConfigWrapper]:
"""
Create valid input and config for the adaptation analysis.
Create valid config for the adaptation analysis.
Args:
request (pytest.FixtureRequest): Pytest fixture request.
valid_analysis_ini (Path): Path to a valid analysis ini file.
test_result_param_case (Path): Path to a valid folder for the test results.
Yields:
Iterator[tuple[AnalysisInputWrapper, AnalysisConfigWrapper]]:
Tuple with the input and config for the adaptation analysis.
Iterator[AnalysisConfigWrapper]: The config for the adaptation analysis.
"""

def get_losses_section(analysis: AnalysisLossesEnum) -> AnalysisSectionLosses:
Expand All @@ -111,25 +110,12 @@ def get_losses_section(analysis: AnalysisLossesEnum) -> AnalysisSectionLosses:
save_csv=True,
)

_root_path = test_results.joinpath(request.node.name)
# Define the paths
_root_path = test_result_param_case
_input_path = _root_path.joinpath("input")
_static_path = _root_path.joinpath("static")
_output_path = _root_path.joinpath("output")

# Create the input files
if _root_path.exists():
rmtree(_root_path)

_input_path.mkdir(parents=True)
for _option in AdaptationOptionCases.config_cases:
_ao_path = _input_path.joinpath(_option.id)
copytree(test_data.joinpath("adaptation", "input"), _ao_path)
copytree(
test_data.joinpath("adaptation", "static"),
_ao_path.joinpath("multi_link_losses", "static"),
)
copytree(test_data.joinpath("adaptation", "static"), _static_path)

# Create the config

# - network
Expand All @@ -139,7 +125,7 @@ def get_losses_section(analysis: AnalysisLossesEnum) -> AnalysisSectionLosses:
link_type_column="highway",
)
_network_config_data = NetworkConfigData(
static_path=test_results.joinpath(request.node.name, "static"),
static_path=test_results.joinpath(_static_path),
hazard=_hazard_section,
network=_network_section,
)
Expand Down Expand Up @@ -192,11 +178,52 @@ def get_losses_section(analysis: AnalysisLossesEnum) -> AnalysisSectionLosses:
valid_analysis_ini, _analysis_data, _network_config
)

yield _analysis_config


@pytest.fixture(name="valid_adaptation_config_with_input")
def _get_valid_adaptation_config_with_input_fixture(
valid_adaptation_config: AnalysisConfigWrapper,
) -> Iterator[tuple[AnalysisInputWrapper, AnalysisConfigWrapper]]:
"""
Create valid adaptation config with analysis input and files.
Args:
valid_adaptation_config (AnalysisConfigWrapper): Valid adaptation config.
Yields:
Iterator[tuple[AnalysisInputWrapper, AnalysisConfigWrapper]]:
The adaptation input and config.
"""
# Create the input files
_root_path = valid_adaptation_config.config_data.root_path
if _root_path.exists():
rmtree(_root_path)

_input_path = valid_adaptation_config.config_data.input_path
_input_path.mkdir(parents=True)
for _option in AdaptationOptionCases.config_cases:
_ao_path = _input_path.joinpath(_option.id)
copytree(test_data.joinpath("adaptation", "input"), _ao_path)
copytree(
test_data.joinpath("adaptation", "static"),
_ao_path.joinpath("multi_link_losses", "static"),
)
copytree(
test_data.joinpath("adaptation", "static"),
valid_adaptation_config.config_data.static_path,
)

# Read graph/network files
valid_adaptation_config.graph_files = NetworkConfigWrapper.read_graphs_from_config(
valid_adaptation_config.config_data.static_path.joinpath("output_graph")
)

_analysis_input = AnalysisInputWrapper.from_input(
analysis=_analysis_config.config_data.adaptation,
analysis_config=_analysis_config,
graph_file=_analysis_config.graph_files.base_network,
graph_file_hazard=_analysis_config.graph_files.base_network_hazard,
analysis=valid_adaptation_config.config_data.adaptation,
analysis_config=valid_adaptation_config,
graph_file=valid_adaptation_config.graph_files.base_network,
graph_file_hazard=valid_adaptation_config.graph_files.base_network_hazard,
)

yield (_analysis_input, _analysis_config)
yield (_analysis_input, valid_adaptation_config)
24 changes: 18 additions & 6 deletions tests/analysis/adaptation/test_adaptation.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,29 @@
class TestAdaptation:
def test_initialize(
self,
valid_adaptation_config: tuple[AnalysisInputWrapper, AnalysisConfigWrapper],
valid_adaptation_config_with_input: tuple[
AnalysisInputWrapper, AnalysisConfigWrapper
],
):
# 1./2. Define test data./Run test.
_adaptation = Adaptation(valid_adaptation_config[0], valid_adaptation_config[1])
_adaptation = Adaptation(
valid_adaptation_config_with_input[0], valid_adaptation_config_with_input[1]
)

# 3. Verify expectations.
assert isinstance(_adaptation, Adaptation)
assert isinstance(_adaptation, AnalysisBase)

def test_run_cost_returns_gdf(
self,
valid_adaptation_config: tuple[AnalysisInputWrapper, AnalysisConfigWrapper],
valid_adaptation_config_with_input: tuple[
AnalysisInputWrapper, AnalysisConfigWrapper
],
):
# 1. Define test data.
_adaptation = Adaptation(valid_adaptation_config[0], valid_adaptation_config[1])
_adaptation = Adaptation(
valid_adaptation_config_with_input[0], valid_adaptation_config_with_input[1]
)

# 2. Run test.
_result = _adaptation.run_cost()
Expand All @@ -49,10 +57,14 @@ def test_run_cost_returns_gdf(

def test_run_benefit_returns_gdf(
self,
valid_adaptation_config: tuple[AnalysisInputWrapper, AnalysisConfigWrapper],
valid_adaptation_config_with_input: tuple[
AnalysisInputWrapper, AnalysisConfigWrapper
],
):
# 1. Define test data.
_adaptation = Adaptation(valid_adaptation_config[0], valid_adaptation_config[1])
_adaptation = Adaptation(
valid_adaptation_config_with_input[0], valid_adaptation_config_with_input[1]
)

# 2. Run test.
_result = _adaptation.run_benefit()
Expand Down
55 changes: 44 additions & 11 deletions tests/analysis/adaptation/test_adaptation_option.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,63 @@ class TestAdaptationOption:
)
def test_from_config_returns_object_with_2_analyses(
self,
valid_adaptation_config: tuple[AnalysisInputWrapper, AnalysisConfigWrapper],
valid_adaptation_config: AnalysisConfigWrapper,
losses_analysis_type: AnalysisLossesEnum,
losses_analysis: type[SingleLinkLosses | MultiLinkLosses],
):
# 1. Define test data.
_config_data = valid_adaptation_config[1].config_data
_config_data = valid_adaptation_config.config_data
assert _config_data.adaptation
_config_data.adaptation.losses_analysis = losses_analysis_type

_config_option = _config_data.adaptation.adaptation_options[0]

# 2. Run test.
_result = AdaptationOption.from_config(
analysis_config=valid_adaptation_config,
adaptation_option=_config_option,
)

# 3. Verify expectations.
assert isinstance(_result, AdaptationOption)
assert _result.id == _config_option.id
assert len(_result.analyses) == 2
assert Damages in [x.analysis_class for x in _result.analyses]
assert losses_analysis in [x.analysis_class for x in _result.analyses]

@pytest.mark.parametrize(
"keep_analyses",
[
pytest.param("damages_list", id="Only damages"),
pytest.param("losses_list", id="Only losses"),
],
)
def test_from_config_only_damages_or_losses_returns_object_with_1_analysis(
self,
valid_adaptation_config: AnalysisConfigWrapper,
keep_analyses: str,
):
# 1. Define test data.
_config_data = valid_adaptation_config.config_data
assert _config_data.adaptation
# Keep the given analyses and the adaptation.
_keep_list = getattr(_config_data, keep_analyses)
_config_data.analyses = _keep_list + [_config_data.adaptation]

_config_option = _config_data.adaptation.adaptation_options[0]

# 2. Run test.
_option = AdaptationOption.from_config(
analysis_config=valid_adaptation_config[1],
_result = AdaptationOption.from_config(
analysis_config=valid_adaptation_config,
adaptation_option=_config_option,
)

# 3. Verify expectations.
assert isinstance(_option, AdaptationOption)
assert _option.id == _config_option.id
assert len(_option.analyses) == 2
assert Damages in [x.analysis_class for x in _option.analyses]
assert losses_analysis in [x.analysis_class for x in _option.analyses]
assert isinstance(_result, AdaptationOption)
assert _result.id == _config_option.id
assert len(_result.analyses) == 1

def test_from_config_no_damages_losses_raises(self):
def test_from_config_no_damages_and_no_losses_raises(self):
# 1. Define test data.
_config = AnalysisConfigWrapper()

Expand All @@ -64,7 +97,7 @@ def test_from_config_no_damages_losses_raises(self):

# 3. Verify expectations.
assert _exc.match(
"Damages and losses sections are required to create an adaptation option."
"Damages and/or losses sections are required to create an adaptation option."
)

@pytest.mark.parametrize(
Expand Down
Loading

0 comments on commit 37ab5eb

Please sign in to comment.