diff --git a/ra2ce/analysis/adaptation/adaptation.py b/ra2ce/analysis/adaptation/adaptation.py index 858192df3..7c86593dd 100644 --- a/ra2ce/analysis/adaptation/adaptation.py +++ b/ra2ce/analysis/adaptation/adaptation.py @@ -55,6 +55,8 @@ def __init__( ): self.analysis = analysis_input.analysis self.graph_file_hazard = analysis_input.graph_file_hazard + self.input_path = analysis_input.input_path + self.output_path = analysis_input.output_path self.adaptation_collection = AdaptationOptionCollection.from_config( analysis_config ) @@ -91,12 +93,12 @@ def run_cost(self) -> GeoDataFrame: _option, _cost, ) in self.adaptation_collection.calculate_options_unit_cost().items(): - _cost_gdf[f"{_option.id}_cost"] = _orig_gdf.apply( + _cost_gdf[_option.cost_col] = _orig_gdf.apply( lambda x, cost=_cost: x["length"] * cost, axis=1 ) # Only calculate the cost for the impacted fraction of the links. if self.analysis.hazard_fraction_cost: - _cost_gdf[f"{_option.id}_cost"] *= _orig_gdf[_fraction_col] + _cost_gdf[_option.cost_col] *= _orig_gdf[_fraction_col] return _cost_gdf @@ -120,13 +122,28 @@ def calculate_bc_ratio( cost_gdf (GeoDataFrame): Gdf containing the cost of the adaptation options. Returns: - GeoDataFrame: Gdf containing the benefit-cost ratio of the adaptation options. + GeoDataFrame: Gdf containing the benefit-cost ratio of the adaptation options, + including the relevant attributes from the original graph (geometry). """ + + def copy_column(from_gdf: GeoDataFrame, col_name: str) -> None: + if not col_name in from_gdf.columns: + return + benefit_gdf.insert(loc=0, column=col_name, value=from_gdf[col_name]) + + # Copy relevant columns from the original graph + _orig_gdf = self.graph_file_hazard.get_graph() + benefit_gdf.set_geometry(_orig_gdf.geometry, inplace=True) + for _col in ["length", "highway", "infra_type", "link_id"]: + copy_column(_orig_gdf, _col) + for _option in self.adaptation_collection.adaptation_options: - benefit_gdf[f"{_option.id}_cost"] = cost_gdf[f"{_option.id}_cost"] - benefit_gdf[f"{_option.id}_bc_ratio"] = benefit_gdf[ - f"{_option.id}_benefit" - ].replace(float("nan"), 0) / benefit_gdf[f"{_option.id}_cost"].replace( + # Copy cost columns from the cost gdf + copy_column(cost_gdf, _option.cost_col) + + benefit_gdf[_option.bc_ratio_col] = benefit_gdf[ + _option.benefit_col + ].replace(float("nan"), 0) / benefit_gdf[_option.cost_col].replace( 0, float("nan") ) diff --git a/ra2ce/analysis/adaptation/adaptation_option.py b/ra2ce/analysis/adaptation/adaptation_option.py index 9deeb93cc..547fd45c8 100644 --- a/ra2ce/analysis/adaptation/adaptation_option.py +++ b/ra2ce/analysis/adaptation/adaptation_option.py @@ -23,7 +23,6 @@ from dataclasses import asdict, dataclass from geopandas import GeoDataFrame -from pandas import Series from ra2ce.analysis.adaptation.adaptation_option_analysis import ( AdaptationOptionAnalysis, @@ -51,6 +50,25 @@ class AdaptationOption: def __hash__(self) -> int: return hash(self.id) + @property + def cost_col(self) -> str: + return self._get_column_name("cost") + + @property + def impact_col(self) -> str: + return self._get_column_name("impact") + + @property + def benefit_col(self) -> str: + return self._get_column_name("benefit") + + @property + def bc_ratio_col(self) -> str: + return self._get_column_name("bc_ratio") + + def _get_column_name(self, col_type: str) -> str: + return f"{self.id}_{col_type}" + @classmethod def from_config( cls, @@ -135,18 +153,22 @@ def calculate_cost(year) -> float: return sum(calculate_cost(_year) for _year in range(0, round(time_horizon), 1)) - def calculate_impact(self, net_present_value_factor: float) -> Series: + def calculate_impact(self, net_present_value_factor: float) -> GeoDataFrame: """ Calculate the impact of the adaptation option. Returns: - Series: The impact of the adaptation option. + GeoDataFrame: The impact of the adaptation option. """ _result_gdf = GeoDataFrame() for _analysis in self.analyses: - _result_gdf[_analysis.analysis_type] = _analysis.execute( - self.analysis_config - ) + _result_gdf[ + f"{self.impact_col}_{_analysis.analysis_type.config_value}" + ] = _analysis.execute(self.analysis_config) # Calculate the impact (summing the results of the analyses) - return _result_gdf.sum(axis=1) * net_present_value_factor + _result_gdf[self.impact_col] = ( + _result_gdf.sum(axis=1) * net_present_value_factor + ) + + return _result_gdf diff --git a/ra2ce/analysis/adaptation/adaptation_option_analysis.py b/ra2ce/analysis/adaptation/adaptation_option_analysis.py index da412f2ac..bb531c51e 100644 --- a/ra2ce/analysis/adaptation/adaptation_option_analysis.py +++ b/ra2ce/analysis/adaptation/adaptation_option_analysis.py @@ -171,4 +171,5 @@ def execute(self, analysis_config: AnalysisConfigWrapper) -> Series: self.analysis_input, analysis_config ).execute() _result = _result_wrapper.get_single_result() + return self.get_result_column(_result) diff --git a/ra2ce/analysis/adaptation/adaptation_option_collection.py b/ra2ce/analysis/adaptation/adaptation_option_collection.py index 50c605751..790478104 100644 --- a/ra2ce/analysis/adaptation/adaptation_option_collection.py +++ b/ra2ce/analysis/adaptation/adaptation_option_collection.py @@ -132,18 +132,18 @@ def calculate_options_benefit(self) -> GeoDataFrame: _benefit_gdf = GeoDataFrame() # Calculate impact of reference option - _benefit_gdf[ - f"{self.reference_option.id}_impact" - ] = self.reference_option.calculate_impact(net_present_value_factor) + _impact_gdf = self.reference_option.calculate_impact(net_present_value_factor) + for _col in _impact_gdf.columns: + _benefit_gdf[_col] = _impact_gdf[_col] # Calculate impact and benefit of adaptation options for _option in self.adaptation_options: - _benefit_gdf[f"{_option.id}_impact"] = _option.calculate_impact( - net_present_value_factor - ) - _benefit_gdf[f"{_option.id}_benefit"] = ( - _benefit_gdf[f"{_option.id}_impact"] - - _benefit_gdf[f"{self.reference_option.id}_impact"] + _impact_gdf = _option.calculate_impact(net_present_value_factor) + for _col in _impact_gdf.columns: + _benefit_gdf[_col] = _impact_gdf[_col] + _benefit_gdf[_option.benefit_col] = ( + _benefit_gdf[_option.impact_col] + - _benefit_gdf[self.reference_option.impact_col] ) return _benefit_gdf diff --git a/ra2ce/analysis/analysis_collection.py b/ra2ce/analysis/analysis_collection.py index e2664ca48..3af1f3c6c 100644 --- a/ra2ce/analysis/analysis_collection.py +++ b/ra2ce/analysis/analysis_collection.py @@ -22,12 +22,10 @@ from __future__ import annotations from dataclasses import dataclass, field -from typing import Type from ra2ce.analysis.adaptation.adaptation import Adaptation from ra2ce.analysis.analysis_config_wrapper import AnalysisConfigWrapper from ra2ce.analysis.analysis_factory import AnalysisFactory -from ra2ce.analysis.analysis_protocol import AnalysisProtocol from ra2ce.analysis.damages.analysis_damages_protocol import AnalysisDamagesProtocol from ra2ce.analysis.losses.analysis_losses_protocol import AnalysisLossesProtocol diff --git a/tests/analysis/adaptation/test_adaptation.py b/tests/analysis/adaptation/test_adaptation.py index eae887aef..407e67344 100644 --- a/tests/analysis/adaptation/test_adaptation.py +++ b/tests/analysis/adaptation/test_adaptation.py @@ -3,14 +3,17 @@ import pytest from geopandas import GeoDataFrame +from shapely import Point from ra2ce.analysis.adaptation.adaptation import Adaptation +from ra2ce.analysis.adaptation.adaptation_option import AdaptationOption from ra2ce.analysis.adaptation.adaptation_option_collection import ( AdaptationOptionCollection, ) from ra2ce.analysis.analysis_base import AnalysisBase from ra2ce.analysis.analysis_config_wrapper import AnalysisConfigWrapper from ra2ce.analysis.analysis_input_wrapper import AnalysisInputWrapper +from ra2ce.network.graph_files.network_file import NetworkFile from tests.analysis.adaptation.conftest import AdaptationOptionCases @@ -47,7 +50,7 @@ def test_run_cost_returns_gdf( # 3. Verify expectations. assert isinstance(_result, GeoDataFrame) assert all( - f"{_option.id}_cost" in _result.columns + _option.cost_col in _result.columns for _option in _adaptation.adaptation_collection.adaptation_options ) for _option, _expected in AdaptationOptionCases.cases[1:]: @@ -85,15 +88,32 @@ def test_run_benefit_returns_gdf( @pytest.fixture(name="mocked_adaptation") def _get_mocked_adaptation_fixture(self) -> Iterator[Adaptation]: # Mock to avoid complex setup. - @dataclass - class MockAdaptationOption: - id: str - class MockAdaptation(Adaptation): + graph_file_hazard = NetworkFile( + graph=GeoDataFrame.from_dict( + data={ + "geometry": [Point(x, 0) for x in range(10)], + "link_id": range(10), + "highway": "residential", + "length": 1.0, + }, + geometry="geometry", + ) + ) adaptation_collection: AdaptationOptionCollection = ( AdaptationOptionCollection( all_options=[ - MockAdaptationOption(id=f"Option{x}") for x in range(2) + AdaptationOption( + id=f"Option{x}", + name=None, + construction_cost=None, + construction_interval=None, + maintenance_cost=None, + maintenance_interval=None, + analyses=None, + analysis_config=None, + ) + for x in range(2) ] ) ) @@ -103,34 +123,33 @@ def __init__(self): yield MockAdaptation() - def test_calculate_bc_ratio_returns_gdf( - self, - mocked_adaptation: Adaptation, - ): + def test_calculate_bc_ratio_returns_gdf(self, mocked_adaptation: Adaptation): # 1. Define test data. _nof_rows = 10 - _benefit_gdf = GeoDataFrame(range(_nof_rows)) - _cost_gdf = GeoDataFrame(range(_nof_rows)) + _benefit_gdf = GeoDataFrame(index=range(_nof_rows)) + _cost_gdf = GeoDataFrame(index=range(_nof_rows)) + for i, _option in enumerate( mocked_adaptation.adaptation_collection.adaptation_options ): - _benefit_gdf[f"{_option.id}_benefit"] = 4.0 + i - _cost_gdf[f"{_option.id}_cost"] = 1.0 + i + _benefit_gdf[_option.benefit_col] = 4.0 + i + _cost_gdf[_option.cost_col] = 1.0 + i # 2. Run test. _result = mocked_adaptation.calculate_bc_ratio(_benefit_gdf, _cost_gdf) # 3. Verify expectations. assert isinstance(_result, GeoDataFrame) + assert not _result.geometry.empty assert all( [ - f"{_option.id}_bc_ratio" in _result.columns + _option.bc_ratio_col in _result.columns for _option in mocked_adaptation.adaptation_collection.adaptation_options ] ) for i, _option in enumerate( mocked_adaptation.adaptation_collection.adaptation_options ): - assert _result[f"{_option.id}_bc_ratio"].sum(axis=0) == pytest.approx( + assert _result[_option.bc_ratio_col].sum(axis=0) == pytest.approx( _nof_rows * (4.0 + i) / (1.0 + i) ) diff --git a/tests/analysis/adaptation/test_adaptation_option.py b/tests/analysis/adaptation/test_adaptation_option.py index b5239d723..df16a2e2b 100644 --- a/tests/analysis/adaptation/test_adaptation_option.py +++ b/tests/analysis/adaptation/test_adaptation_option.py @@ -5,9 +5,15 @@ from pandas import Series from ra2ce.analysis.adaptation.adaptation_option import AdaptationOption +from ra2ce.analysis.adaptation.adaptation_option_analysis import ( + AdaptationOptionAnalysis, +) from ra2ce.analysis.analysis_config_data.analysis_config_data import ( AnalysisSectionAdaptation, ) +from ra2ce.analysis.analysis_config_data.enums.analysis_damages_enum import ( + AnalysisDamagesEnum, +) from ra2ce.analysis.analysis_config_data.enums.analysis_losses_enum import ( AnalysisLossesEnum, ) @@ -129,13 +135,8 @@ def test_calculate_unit_cost_returns_float( maint_interval: float, net_unit_cost: float, ): - # Mock to avoid complex setup. - @dataclass - class MockAdaptationOption(AdaptationOption): - id: str - # 1. Define test data. - _option = MockAdaptationOption( + _option = AdaptationOption( id="AnOption", name=None, construction_cost=constr_cost, @@ -155,12 +156,10 @@ class MockAdaptationOption(AdaptationOption): assert isinstance(_result, float) assert _result == pytest.approx(net_unit_cost) - def test_calculate_impact_returns_series(self) -> GeoDataFrame: + def test_calculate_impact_returns_gdf(self) -> GeoDataFrame: @dataclass # Mock to avoid the need to run the impact analysis. - class MockAdaptationOptionAnalysis: - analysis_type: str - result_col: str + class MockAdaptationOptionAnalysis(AdaptationOptionAnalysis): result: float def execute(self, _: AnalysisConfigWrapper) -> Series: @@ -170,11 +169,16 @@ def execute(self, _: AnalysisConfigWrapper) -> Series: _nof_rows = 10 _analyses = [ MockAdaptationOptionAnalysis( - analysis_type=f"Analysis_{i}", + analysis_type=_analysis_type, + analysis_class=None, + analysis_input=None, result_col=f"Result_{i}", result=(i + 1) * 1.0e6, ) - for i in range(2) + for i, _analysis_type in zip( + range(2), + [AnalysisDamagesEnum.DAMAGES, AnalysisLossesEnum.MULTI_LINK_LOSSES], + ) ] _id = "Option1" _option = AdaptationOption( @@ -192,7 +196,7 @@ def execute(self, _: AnalysisConfigWrapper) -> Series: _result = _option.calculate_impact(1.0) # 3. Verify expectations. - assert isinstance(_result, Series) - assert _result.sum() == pytest.approx( + assert isinstance(_result, GeoDataFrame) + assert _result[_option.impact_col].sum() == pytest.approx( _nof_rows * sum(x.result for x in _analyses) ) diff --git a/tests/analysis/adaptation/test_adaptation_option_collection.py b/tests/analysis/adaptation/test_adaptation_option_collection.py index 6b7887a45..e85642a4c 100644 --- a/tests/analysis/adaptation/test_adaptation_option_collection.py +++ b/tests/analysis/adaptation/test_adaptation_option_collection.py @@ -67,15 +67,25 @@ def test_calculate_options_unit_cost_returns_dict( assert isinstance(_result, dict) assert all(_option in _result for _option in _collection.adaptation_options) - def test_calculate_options_benefit_returns_series(self): + def test_calculate_options_benefit_returns_gdf(self): @dataclass class MockOption: # Mock to avoid the need to run the impact analysis. id: str impact: float + @property + def benefit_col(self) -> str: + return f"{self.id}_benefit" + + @property + def impact_col(self) -> str: + return f"{self.id}_impact" + def calculate_impact(self, _) -> Series: - return Series(self.impact, index=range(_nof_rows)) + _impact_gdf = GeoDataFrame(index=range(10)) + _impact_gdf[self.impact_col] = self.impact + return _impact_gdf # 1. Define test data. _nof_rows = 10 @@ -91,7 +101,7 @@ def calculate_impact(self, _) -> Series: # 3. Verify expectations. assert isinstance(_result, GeoDataFrame) assert all( - f"{_option.id}_benefit" in _result.columns + _option.benefit_col in _result.columns for _option in _collection.adaptation_options ) assert all( diff --git a/tests/runners/conftest.py b/tests/runners/conftest.py index ebb0039e3..8aebf322a 100644 --- a/tests/runners/conftest.py +++ b/tests/runners/conftest.py @@ -7,6 +7,8 @@ from ra2ce.analysis.analysis_config_data.enums.analysis_damages_enum import ( AnalysisDamagesEnum, ) +from ra2ce.analysis.analysis_config_data.enums.damage_curve_enum import DamageCurveEnum +from ra2ce.analysis.analysis_config_data.enums.event_type_enum import EventTypeEnum from ra2ce.analysis.analysis_config_wrapper import AnalysisConfigWrapper from ra2ce.configuration.config_wrapper import ConfigWrapper from ra2ce.network.network_config_wrapper import NetworkConfigWrapper @@ -45,7 +47,14 @@ def _get_dummy_ra2ce_input_with_damages( dummy_ra2ce_input: ConfigWrapper, ) -> ConfigWrapper: dummy_ra2ce_input.analysis_config.config_data.analyses = [ - AnalysisSectionDamages(analysis=AnalysisDamagesEnum.DAMAGES) + AnalysisSectionDamages( + analysis=AnalysisDamagesEnum.DAMAGES, + name="Damages", + event_type=EventTypeEnum.EVENT, + damage_curve=DamageCurveEnum.MAN, + save_csv=True, + save_gpkg=True, + ) ] dummy_ra2ce_input.network_config.config_data.hazard.hazard_map = "A value" return dummy_ra2ce_input diff --git a/tests/runners/test_adaptation_analysis_runner.py b/tests/runners/test_adaptation_analysis_runner.py index c55d8b2dd..f8107f87c 100644 --- a/tests/runners/test_adaptation_analysis_runner.py +++ b/tests/runners/test_adaptation_analysis_runner.py @@ -1,9 +1,21 @@ +from pathlib import Path +from shutil import copytree, rmtree + +from geopandas import read_file +from geopandas.testing import assert_geodataframe_equal + +from ra2ce.analysis.analysis_collection import AnalysisCollection from ra2ce.analysis.analysis_config_data.analysis_config_data import ( AnalysisSectionAdaptation, AnalysisSectionAdaptationOption, ) +from ra2ce.analysis.analysis_config_data.enums.analysis_enum import AnalysisEnum +from ra2ce.analysis.analysis_factory import AnalysisFactory +from ra2ce.analysis.analysis_result.analysis_result_wrapper import AnalysisResultWrapper from ra2ce.configuration.config_wrapper import ConfigWrapper +from ra2ce.network.graph_files.graph_files_collection import GraphFilesCollection from ra2ce.runners.adaptation_analysis_runner import AdaptationAnalysisRunner +from tests import test_data class TestAdaptationAnalysisRunner: @@ -55,3 +67,91 @@ def test_given_valid_damages_and_adaptation_input_configuration_can_run( # 3. Verify expectation assert _result is True + + def test_adapatation_can_run_and_export_result( + self, + damages_ra2ce_input: ConfigWrapper, + test_result_param_case: Path, + ): + # 1. Define test data. + assert damages_ra2ce_input.analysis_config.config_data.adaptation is None + + _root_path = test_result_param_case + damages_ra2ce_input.analysis_config.config_data.root_path = _root_path + damages_ra2ce_input.analysis_config.config_data.input_path = ( + _root_path.joinpath("input") + ) + damages_ra2ce_input.analysis_config.config_data.static_path = ( + _root_path.joinpath("static") + ) + damages_ra2ce_input.analysis_config.config_data.output_path = ( + _root_path.joinpath("output") + ) + + # Add adaptation analysis to the configuration + _adaptation_config = AnalysisSectionAdaptation( + analysis=AnalysisEnum.ADAPTATION, + name="Adaptation", + adaptation_options=[ + AnalysisSectionAdaptationOption(id="AO0"), + ], + save_csv=True, + save_gpkg=True, + ) + damages_ra2ce_input.analysis_config.config_data.analyses.append( + _adaptation_config + ) + + # Copy input files for the adaptation analysis + if _root_path.exists(): + rmtree(_root_path) + damages_ra2ce_input.analysis_config.config_data.output_path.mkdir(parents=True) + for _option in _adaptation_config.adaptation_options: + _ao_path = ( + damages_ra2ce_input.analysis_config.config_data.input_path.joinpath( + _option.id + ) + ) + copytree(test_data.joinpath("adaptation", "input"), _ao_path) + copytree( + test_data.joinpath("adaptation", "static"), + damages_ra2ce_input.analysis_config.config_data.static_path, + ) + + # Read graph/network files + damages_ra2ce_input.analysis_config.graph_files = ( + GraphFilesCollection.set_files( + damages_ra2ce_input.analysis_config.config_data.static_path.joinpath( + "output_graph" + ), + ) + ) + + _analysis_collection = AnalysisCollection( + damages_analyses=None, + losses_analyses=None, + adaptation_analysis=AnalysisFactory.get_adaptation_analysis( + damages_ra2ce_input.analysis_config.config_data.adaptation, + damages_ra2ce_input.analysis_config, + ), + ) + + # 2. Run test. + _result = AdaptationAnalysisRunner().run(_analysis_collection) + + # 3. Verify expectation + assert isinstance(_result, list) + assert len(_result) == 1 + + _result_wrapper = _result[0] + assert isinstance(_result_wrapper, AnalysisResultWrapper) + assert _result_wrapper.is_valid_result() == True + + _analysis_result = _result_wrapper.results_collection[0] + _output_gdf = _analysis_result.base_export_path.with_suffix(".gpkg") + assert _output_gdf.exists() + assert _analysis_result.base_export_path.with_suffix(".csv").exists() + + # Check the output geodataframe content + _gdf = read_file(_output_gdf) + assert_geodataframe_equal(_gdf, _analysis_result.analysis_result)