diff --git a/ra2ce/analysis/adaptation/adaptation.py b/ra2ce/analysis/adaptation/adaptation.py index 344546a4a..a69b5d393 100644 --- a/ra2ce/analysis/adaptation/adaptation.py +++ b/ra2ce/analysis/adaptation/adaptation.py @@ -68,22 +68,30 @@ def execute(self) -> GeoDataFrame: Returns: GeoDataFrame: The result of the adaptation analysis. """ - return self.generate_result_wrapper(self.calculate_bc_ratio()) + _cost_gdf = self.run_cost() + _benefit_gdf = self.run_benefit() + + _benefit_gdf = self.calculate_bc_ratio(_benefit_gdf, _cost_gdf) + + return _benefit_gdf def run_cost(self) -> GeoDataFrame: """ - Calculate the unit cost for all adaptation options. + Calculate the link cost for all adaptation options. + Returns: + GeoDataFrame: The result of the cost calculation. Returns: GeoDataFrame: The result of the cost calculation. """ - _cost_gdf = deepcopy(self.graph_file.get_graph()) + _orig_gdf = self.graph_file.get_graph() + _cost_gdf = GeoDataFrame() for ( _option, _cost, ) in self.adaptation_collection.calculate_options_unit_cost().items(): - _cost_gdf[f"{_option.id}_cost"] = _cost_gdf.apply( + _cost_gdf[f"{_option.id}_cost"] = _orig_gdf.apply( lambda x, cost=_cost: x["length"] * cost, axis=1 ) @@ -96,22 +104,27 @@ def run_benefit(self) -> GeoDataFrame: Returns: GeoDataFrame: The result of the benefit calculation. """ - _benefit_gdf = deepcopy(self.graph_file.get_graph()) - - return self.adaptation_collection.calculation_options_impact(_benefit_gdf) + return self.adaptation_collection.calculate_options_benefit() - def calculate_bc_ratio(self) -> GeoDataFrame: + def calculate_bc_ratio( + self, benefit_gdf: GeoDataFrame, cost_gdf: GeoDataFrame + ) -> GeoDataFrame: """ Calculate the benefit-cost ratio for all adaptation options. + Args: + benefit_gdf (GeoDataFrame): Gdf containing the benefit of the adaptation options. + cost_gdf (GeoDataFrame): Gdf containing the cost of the adaptation options. + Returns: - GeoDataFrame: The result of the benefit-cost ratio calculation. + GeoDataFrame: Gdf containing the benefit-cost ratio of the adaptation options. """ - _cost_gdf = self.run_cost() - _benefit_gdf = self.run_benefit() - - # TODO: apply economic discounting - # TODO: calculate B/C ratio - # TODO: apply overlay + for _option in self.adaptation_collection.adaptation_options: + benefit_gdf[f"{_option.id}_cost"] = cost_gdf[f"{_option.id}_cost"] + benefit_gdf[f"{_option.id}_bc_ratio"] = benefit_gdf[ + f"{_option.id}_benefit" + ].replace(float("nan"), 0) / benefit_gdf[f"{_option.id}_cost"].replace( + 0, float("nan") + ) - return _cost_gdf + return benefit_gdf diff --git a/ra2ce/analysis/adaptation/adaptation_option.py b/ra2ce/analysis/adaptation/adaptation_option.py index 607b8b40c..20c81420c 100644 --- a/ra2ce/analysis/adaptation/adaptation_option.py +++ b/ra2ce/analysis/adaptation/adaptation_option.py @@ -23,6 +23,7 @@ from dataclasses import asdict, dataclass from geopandas import GeoDataFrame +from pandas import Series from ra2ce.analysis.adaptation.adaptation_option_analysis import ( AdaptationOptionAnalysis, @@ -108,53 +109,42 @@ def calculate_unit_cost(self, time_horizon: float, discount_rate: float) -> floa float: The net present value unit cost of the adaptation option. """ - def calc_years(from_year: float, to_year: float, interval: float) -> range: - return range( - round(from_year), - round(min(to_year, time_horizon)), - round(interval), - ) - - def calc_cost(cost: float, year: float) -> float: - return cost * (1 - discount_rate) ** year - - _constr_years = calc_years( - 0, - time_horizon, - self.construction_interval, - ) - _lifetime_cost = 0.0 - for _constr_year in _constr_years: - # Calculate the present value of the construction cost - _lifetime_cost += calc_cost(self.construction_cost, _constr_year) - - # Calculate the present value of the maintenance cost - _maint_years = calc_years( - _constr_year + self.maintenance_interval, - _constr_year + self.construction_interval, - self.maintenance_interval, - ) - for _maint_year in _maint_years: - _lifetime_cost += calc_cost(self.maintenance_cost, _maint_year) - - return _lifetime_cost - - def calculate_impact(self, benefit_graph: GeoDataFrame) -> GeoDataFrame: + def is_constr_year(year: float) -> bool: + if self.construction_interval == 0: + return False + return (round(year) % round(self.construction_interval)) == 0 + + def is_maint_year(year: float) -> bool: + if self.maintenance_interval == 0: + return False + if self.construction_interval > 0: + # Take year relative to last construction year + year = round(year) % round(self.construction_interval) + return (year % round(self.maintenance_interval)) == 0 + + def calculate_cost(year) -> float: + if is_constr_year(year): + _cost = self.construction_cost + elif is_maint_year(year): + _cost = self.maintenance_cost + else: + return 0.0 + return _cost / (1 + discount_rate) ** year + + return sum(calculate_cost(_year) for _year in range(0, round(time_horizon), 1)) + + def calculate_impact(self, net_present_value_factor: float) -> Series: """ Calculate the impact of the adaptation option. Returns: - float: The impact of the adaptation option. + Series: The impact of the adaptation option. """ + _result_gdf = GeoDataFrame() for _analysis in self.analyses: - _result_wrapper = _analysis.execute(self.analysis_config) - # Assumes a single result. - _analysis_result = _result_wrapper.results_collection[0].analysis_result - _col = _analysis_result.filter(regex=_analysis.result_col).columns[0] - benefit_graph[f"{self.id}_{_col}"] = _analysis_result[_col] - - # Calculate the impact (summing the damages and losses values) - _option_cols = benefit_graph.filter(regex=f"{self.id}_").columns - benefit_graph[f"{self.id}_impact"] = benefit_graph[_option_cols].sum(axis=1) + _result_gdf[_analysis.analysis_type] = _analysis.execute( + self.analysis_config + ) - return benefit_graph + # Calculate the impact (summing the results of the analyses) + return _result_gdf.sum(axis=1) * net_present_value_factor diff --git a/ra2ce/analysis/adaptation/adaptation_option_analysis.py b/ra2ce/analysis/adaptation/adaptation_option_analysis.py index 5218b4d1e..417c44a7a 100644 --- a/ra2ce/analysis/adaptation/adaptation_option_analysis.py +++ b/ra2ce/analysis/adaptation/adaptation_option_analysis.py @@ -23,7 +23,7 @@ from copy import deepcopy from dataclasses import dataclass -from geopandas import GeoDataFrame +from pandas import Series from ra2ce.analysis.analysis_config_data.enums.analysis_damages_enum import ( AnalysisDamagesEnum, @@ -33,9 +33,6 @@ ) from ra2ce.analysis.analysis_config_wrapper import AnalysisConfigWrapper from ra2ce.analysis.analysis_input_wrapper import AnalysisInputWrapper -from ra2ce.analysis.analysis_result.analysis_result_wrapper_protocol import ( - AnalysisResultWrapperProtocol, -) from ra2ce.analysis.damages.damages import Damages from ra2ce.analysis.losses.losses_base import LossesBase from ra2ce.analysis.losses.multi_link_losses import MultiLinkLosses @@ -44,12 +41,13 @@ @dataclass class AdaptationOptionAnalysis: + analysis_type: AnalysisDamagesEnum | AnalysisLossesEnum analysis_class: type[Damages | LossesBase] analysis_input: AnalysisInputWrapper result_col: str @staticmethod - def get_analysis( + def get_analysis_info( analysis_type: AnalysisDamagesEnum | AnalysisLossesEnum, ) -> tuple[type[Damages | LossesBase], str]: """ @@ -62,10 +60,10 @@ def get_analysis( NotImplementedError: The analysis type is not implemented. Returns: - tuple[type[Damages | LossesBase], str]: The analysis class and the result column. + tuple[type[Damages | LossesBase], str]: The analysis class and the regex to find the result column. """ if analysis_type == AnalysisDamagesEnum.DAMAGES: - return (Damages, "dam_") + return (Damages, "dam_.*") elif analysis_type == AnalysisLossesEnum.SINGLE_LINK_LOSSES: return (SingleLinkLosses, "vlh_.*_total") elif analysis_type == AnalysisLossesEnum.MULTI_LINK_LOSSES: @@ -117,17 +115,16 @@ def from_config( ) # Create output object - _analysis_class, _result_col = cls.get_analysis(analysis_type) + _analysis_class, _result_col = cls.get_analysis_info(analysis_type) return cls( + analysis_type=analysis_type, analysis_class=_analysis_class, analysis_input=_analysis_input, result_col=_result_col, ) - def execute( - self, analysis_config: AnalysisConfigWrapper - ) -> AnalysisResultWrapperProtocol: + def execute(self, analysis_config: AnalysisConfigWrapper) -> Series: """ Execute the analysis. @@ -135,10 +132,18 @@ def execute( analysis_config (AnalysisConfigWrapper): The config for the analysis. Returns: - DataFrame: The results of the analysis. + Series: The relevant result column of the analysis. """ if self.analysis_class == Damages: - return self.analysis_class( - self.analysis_input, analysis_config.graph_files.base_graph_hazard.graph + _result_wrapper = self.analysis_class( + self.analysis_input, + analysis_config.graph_files.base_graph_hazard.get_graph(), + ).execute() + # Take the link based result + _result = _result_wrapper.results_collection[1].analysis_result + else: + _result_wrapper = self.analysis_class( + self.analysis_input, analysis_config ).execute() - return self.analysis_class(self.analysis_input, analysis_config).execute() + _result = _result_wrapper.get_single_result() + return _result.filter(regex=self.result_col).iloc[:, 0] diff --git a/ra2ce/analysis/adaptation/adaptation_option_collection.py b/ra2ce/analysis/adaptation/adaptation_option_collection.py index 27b7c09fd..50c605751 100644 --- a/ra2ce/analysis/adaptation/adaptation_option_collection.py +++ b/ra2ce/analysis/adaptation/adaptation_option_collection.py @@ -22,6 +22,7 @@ from dataclasses import dataclass, field +import numpy as np from geopandas import GeoDataFrame from ra2ce.analysis.adaptation.adaptation_option import AdaptationOption @@ -91,6 +92,19 @@ def from_config( return _collection + def get_net_present_value_factor(self) -> float: + """ + Calculate the net present value factor for the entire time horizon. To be multiplied to the event impact to + obtain the net present value. + """ + _years_array = np.arange(0, self.time_horizon) + _frequency_per_year = ( + self.initial_frequency + _years_array * self.climate_factor + ) + _discount = (1 + self.discount_rate) ** _years_array + _ratio = _frequency_per_year / _discount + return _ratio.sum() + def calculate_options_unit_cost(self) -> dict[AdaptationOption, float]: """ Calculate the unit cost for all adaptation options. @@ -106,17 +120,30 @@ def calculate_options_unit_cost(self) -> dict[AdaptationOption, float]: for _option in self.adaptation_options } - def calculation_options_impact(self, benefit_graph: GeoDataFrame) -> GeoDataFrame: + def calculate_options_benefit(self) -> GeoDataFrame: """ - Calculate the impact of all adaptation options (including the reference option). - - Args: - benefit_graph (GeoDataFrame): The graph to which the impact of the adaptation options will be added. + Calculate the benefit of all adaptation options. + The benefit is calculated by subtracting the impact of the reference option from the impact of the adaptation option. Returns: - NetworkFile: The calculated impact of all adaptation options. + GeoDataFrame: The calculated impact of all adaptation options. """ - for _option in self.all_options: - benefit_graph = _option.calculate_impact(benefit_graph) + net_present_value_factor = self.get_net_present_value_factor() + _benefit_gdf = GeoDataFrame() + + # Calculate impact of reference option + _benefit_gdf[ + f"{self.reference_option.id}_impact" + ] = self.reference_option.calculate_impact(net_present_value_factor) + + # Calculate impact and benefit of adaptation options + for _option in self.adaptation_options: + _benefit_gdf[f"{_option.id}_impact"] = _option.calculate_impact( + net_present_value_factor + ) + _benefit_gdf[f"{_option.id}_benefit"] = ( + _benefit_gdf[f"{_option.id}_impact"] + - _benefit_gdf[f"{self.reference_option.id}_impact"] + ) - return benefit_graph + return _benefit_gdf diff --git a/ra2ce/analysis/analysis_factory.py b/ra2ce/analysis/analysis_factory.py index c9ad15caf..946b20098 100644 --- a/ra2ce/analysis/analysis_factory.py +++ b/ra2ce/analysis/analysis_factory.py @@ -88,7 +88,8 @@ def get_damages_analysis( if analysis.analysis == AnalysisDamagesEnum.DAMAGES: return Damages( - _analysis_input, analysis_config.graph_files.base_graph_hazard.graph + _analysis_input, + analysis_config.graph_files.base_graph_hazard.get_graph(), ) raise NotImplementedError(f"Analysis {analysis.analysis} not implemented") diff --git a/tests/analysis/adaptation/conftest.py b/tests/analysis/adaptation/conftest.py index ff312a607..39e4eaffd 100644 --- a/tests/analysis/adaptation/conftest.py +++ b/tests/analysis/adaptation/conftest.py @@ -64,10 +64,10 @@ class AdaptationOptionCases: maintenance_interval=3.0, ), ] - unit_cost: list[float] = [0.0, 2693.684211, 5231.908660] - total_cost: list[float] = [0.0, 97411702.122141, 189201512.873560] - cases: list[tuple[AnalysisSectionAdaptationOption, float, float]] = list( - zip(config_cases, unit_cost, total_cost) + total_cost: list[float] = [0.0, 97800589.027952, 189253296.099491] + total_benefit: list[float] = [0.0, 0.0, 0.0] + cases: list[tuple[AnalysisSectionAdaptationOption, tuple[float, float]]] = list( + zip(config_cases, zip(total_cost, total_benefit)) ) @@ -91,6 +91,7 @@ def _get_valid_adaptation_config_fixture( def get_losses_section(analysis: AnalysisLossesEnum) -> AnalysisSectionLosses: return AnalysisSectionLosses( analysis=analysis, + name="Losses", event_type=EventTypeEnum.EVENT, weighing=WeighingEnum.TIME, threshold=0, @@ -147,6 +148,7 @@ def get_losses_section(analysis: AnalysisLossesEnum) -> AnalysisSectionLosses: # - damages _damages_section = AnalysisSectionDamages( analysis=AnalysisDamagesEnum.DAMAGES, + name="Damages", event_type=EventTypeEnum.EVENT, damage_curve=DamageCurveEnum.MAN, save_gpkg=True, @@ -162,10 +164,13 @@ def get_losses_section(analysis: AnalysisLossesEnum) -> AnalysisSectionLosses: # - adaptation _adaptation_section = AnalysisSectionAdaptation( analysis=AnalysisEnum.ADAPTATION, + name="Adaptation", losses_analysis=AnalysisLossesEnum.MULTI_LINK_LOSSES, adaptation_options=AdaptationOptionCases.config_cases, discount_rate=0.025, time_horizon=20, + climate_factor=0.00036842, + initial_frequency=0.01, ) _analysis_data = AnalysisConfigData( diff --git a/tests/analysis/adaptation/test_adaptation.py b/tests/analysis/adaptation/test_adaptation.py index 96c72c269..e0a49aca2 100644 --- a/tests/analysis/adaptation/test_adaptation.py +++ b/tests/analysis/adaptation/test_adaptation.py @@ -1,7 +1,13 @@ +from dataclasses import dataclass +from typing import Iterator + import pytest from geopandas import GeoDataFrame from ra2ce.analysis.adaptation.adaptation import Adaptation +from ra2ce.analysis.adaptation.adaptation_option_collection import ( + AdaptationOptionCollection, +) from ra2ce.analysis.analysis_base import AnalysisBase from ra2ce.analysis.analysis_config_wrapper import AnalysisConfigWrapper from ra2ce.analysis.analysis_input_wrapper import AnalysisInputWrapper @@ -28,17 +34,17 @@ def test_run_cost_returns_gdf( _adaptation = Adaptation(valid_adaptation_config[0], valid_adaptation_config[1]) # 2. Run test. - _cost_gdf = _adaptation.run_cost() + _result = _adaptation.run_cost() # 3. Verify expectations. - assert isinstance(_cost_gdf, GeoDataFrame) + assert isinstance(_result, GeoDataFrame) assert all( - f"{_option.id}_cost" in _cost_gdf.columns + f"{_option.id}_cost" in _result.columns for _option in _adaptation.adaptation_collection.adaptation_options ) - for _option, _, _total_cost in AdaptationOptionCases.cases[1:]: - assert _cost_gdf[f"{_option.id}_cost"].sum(axis=0) == pytest.approx( - _total_cost + for _option, _expected in AdaptationOptionCases.cases[1:]: + assert _result[f"{_option.id}_cost"].sum(axis=0) == pytest.approx( + _expected[0] ) def test_run_benefit_returns_gdf( @@ -55,7 +61,64 @@ def test_run_benefit_returns_gdf( assert isinstance(_result, GeoDataFrame) assert all( [ - f"{_option.id}_impact" in _result.columns - for _option in _adaptation.adaptation_collection.all_options + f"{_option.id}_benefit" in _result.columns + for _option in _adaptation.adaptation_collection.adaptation_options + ] + ) + for _option, _expected in AdaptationOptionCases.cases[1:]: + assert _result[f"{_option.id}_benefit"].sum(axis=0) == pytest.approx( + _expected[1] + ) + + @pytest.fixture(name="mocked_adaptation") + def _get_mocked_adaptation_fixture(self) -> Iterator[Adaptation]: + # Mock to avoid complex setup. + @dataclass + class MockAdaptationOption: + id: str + + class MockAdaptation(Adaptation): + adaptation_collection: AdaptationOptionCollection = ( + AdaptationOptionCollection( + all_options=[ + MockAdaptationOption(id=f"Option{x}") for x in range(2) + ] + ) + ) + + def __init__(self): + pass + + yield MockAdaptation() + + def test_calculate_bc_ratio_returns_gdf( + self, + mocked_adaptation: Adaptation, + ): + # 1. Define test data. + _nof_rows = 10 + _benefit_gdf = GeoDataFrame(range(_nof_rows)) + _cost_gdf = GeoDataFrame(range(_nof_rows)) + for i, _option in enumerate( + mocked_adaptation.adaptation_collection.adaptation_options + ): + _benefit_gdf[f"{_option.id}_benefit"] = 4.0 + i + _cost_gdf[f"{_option.id}_cost"] = 1.0 + i + + # 2. Run test. + _result = mocked_adaptation.calculate_bc_ratio(_benefit_gdf, _cost_gdf) + + # 3. Verify expectations. + assert isinstance(_result, GeoDataFrame) + assert all( + [ + f"{_option.id}_bc_ratio" in _result.columns + for _option in mocked_adaptation.adaptation_collection.adaptation_options ] ) + for i, _option in enumerate( + mocked_adaptation.adaptation_collection.adaptation_options + ): + assert _result[f"{_option.id}_bc_ratio"].sum(axis=0) == pytest.approx( + _nof_rows * (4.0 + i) / (1.0 + i) + ) diff --git a/tests/analysis/adaptation/test_adaptation_option.py b/tests/analysis/adaptation/test_adaptation_option.py index f79089930..49e78ab26 100644 --- a/tests/analysis/adaptation/test_adaptation_option.py +++ b/tests/analysis/adaptation/test_adaptation_option.py @@ -1,4 +1,8 @@ +from dataclasses import dataclass + import pytest +from geopandas import GeoDataFrame +from pandas import Series from ra2ce.analysis.adaptation.adaptation_option import AdaptationOption from ra2ce.analysis.analysis_config_data.analysis_config_data import ( @@ -12,7 +16,6 @@ from ra2ce.analysis.damages.damages import Damages from ra2ce.analysis.losses.multi_link_losses import MultiLinkLosses from ra2ce.analysis.losses.single_link_losses import SingleLinkLosses -from tests.analysis.adaptation.conftest import AdaptationOptionCases class TestAdaptationOption: @@ -65,25 +68,98 @@ def test_from_config_no_damages_losses_raises(self): ) @pytest.mark.parametrize( - "adaptation_option", - AdaptationOptionCases.cases, + "constr_cost, constr_interval, maint_cost, maint_interval, net_unit_cost", + [ + pytest.param(0.0, 0.0, 0.0, 0.0, 0.0, id="Zero costs and intervals"), + pytest.param( + 1000.0, 10.0, 200.0, 3.0, 2704.437935, id="Cheap constr, exp maint" + ), + pytest.param( + 5000.0, 100.0, 50.0, 3.0, 5233.340599, id="Exp constr, cheap maint" + ), + pytest.param( + 0.0, 0.0, 1100.0, 1.0, 17576.780477, id="Zero constr cost and interval" + ), + pytest.param( + 1000.0, 100.0, 0.0, 0.0, 1000.0, id="Zero maint cost and interval" + ), + pytest.param( + 1000.0, 8.0, 100.0, 2.0, 3053.742434, id="Coinciding intervals" + ), + ], ) def test_calculate_unit_cost_returns_float( self, - valid_adaptation_config: tuple[AnalysisInputWrapper, AnalysisConfigWrapper], - adaptation_option: tuple[AnalysisSectionAdaptation, float, float], + constr_cost: float, + constr_interval: float, + maint_cost: float, + maint_interval: float, + net_unit_cost: float, ): + # Mock to avoid complex setup. + @dataclass + class MockAdaptationOption(AdaptationOption): + id: str + # 1. Define test data. - _option = AdaptationOption.from_config( - analysis_config=valid_adaptation_config[1], - adaptation_option=adaptation_option[0], + _option = MockAdaptationOption( + id="AnOption", + name=None, + construction_cost=constr_cost, + construction_interval=constr_interval, + maintenance_cost=maint_cost, + maintenance_interval=maint_interval, + analyses=[], + analysis_config=None, + ) + _time_horizon = 20.0 + _discount_rate = 0.025 + + # 2. Run test. + _result = _option.calculate_unit_cost(_time_horizon, _discount_rate) + + # 3. Verify expectations. + assert isinstance(_result, float) + assert _result == pytest.approx(net_unit_cost) + + def test_calculate_impact_returns_series(self) -> GeoDataFrame: + @dataclass + # Mock to avoid the need to run the impact analysis. + class MockAdaptationOptionAnalysis: + analysis_type: str + result_col: str + result: float + + def execute(self, _: AnalysisConfigWrapper) -> Series: + return Series(self.result, index=range(_nof_rows)) + + # 1. Define test data. + _nof_rows = 10 + _analyses = [ + MockAdaptationOptionAnalysis( + analysis_type=f"Analysis_{i}", + result_col=f"Result_{i}", + result=(i + 1) * 1.0e6, + ) + for i in range(2) + ] + _id = "Option1" + _option = AdaptationOption( + id=_id, + name=None, + construction_cost=None, + construction_interval=None, + maintenance_cost=None, + maintenance_interval=None, + analyses=_analyses, + analysis_config=None, ) - _time_horizon = valid_adaptation_config[1].config_data.adaptation.time_horizon - _discount_rate = valid_adaptation_config[1].config_data.adaptation.discount_rate # 2. Run test. - _cost = _option.calculate_unit_cost(_time_horizon, _discount_rate) + _result = _option.calculate_impact(1.0) # 3. Verify expectations. - assert isinstance(_cost, float) - assert _cost == pytest.approx(adaptation_option[1]) + assert isinstance(_result, Series) + assert _result.sum() == pytest.approx( + _nof_rows * sum(x.result for x in _analyses) + ) diff --git a/tests/analysis/adaptation/test_adaptation_option_analysis.py b/tests/analysis/adaptation/test_adaptation_option_analysis.py index eb44677d9..e3149a464 100644 --- a/tests/analysis/adaptation/test_adaptation_option_analysis.py +++ b/tests/analysis/adaptation/test_adaptation_option_analysis.py @@ -1,8 +1,14 @@ +from pathlib import Path + import pytest +from geopandas import GeoDataFrame +from pandas import Series from ra2ce.analysis.adaptation.adaptation_option_analysis import ( AdaptationOptionAnalysis, ) +from ra2ce.analysis.analysis_base import AnalysisBase +from ra2ce.analysis.analysis_config_data.analysis_config_data import AnalysisConfigData from ra2ce.analysis.analysis_config_data.enums.analysis_damages_enum import ( AnalysisDamagesEnum, ) @@ -11,6 +17,7 @@ ) from ra2ce.analysis.analysis_config_wrapper import AnalysisConfigWrapper from ra2ce.analysis.analysis_input_wrapper import AnalysisInputWrapper +from ra2ce.analysis.analysis_protocol import AnalysisProtocol from ra2ce.analysis.damages.damages import Damages from ra2ce.analysis.losses.losses_base import LossesBase from ra2ce.analysis.losses.multi_link_losses import MultiLinkLosses @@ -40,7 +47,7 @@ def test_get_analysis_returns_tuple( expected_analysis: type[Damages | LossesBase], ): # 1./2. Define test data./Run test. - _result = AdaptationOptionAnalysis.get_analysis(analysis_type) + _result = AdaptationOptionAnalysis.get_analysis_info(analysis_type) # 3. Verify expectations. assert isinstance(_result, tuple) @@ -53,7 +60,7 @@ def test_get_analysis_raises_not_implemented_error(self): # 2. Run test. with pytest.raises(NotImplementedError) as exc: - AdaptationOptionAnalysis.get_analysis(_analysis_type) + AdaptationOptionAnalysis.get_analysis_info(_analysis_type) # 3. Verify expectations. assert exc.match(f"Analysis {_analysis_type} not implemented") @@ -95,3 +102,34 @@ def test_from_config_returns_object( # 3. Verify expectations. assert isinstance(_result, AdaptationOptionAnalysis) assert _result.analysis_class == expected_analysis + + def test_execute_returns_series(self): + class MockAnalysis(AnalysisBase, AnalysisProtocol): + analysis: AnalysisConfigData.ANALYSIS_SECTION = None + output_path: Path = None + + def __init__(self, *args) -> None: + pass + + def execute(self): + return self.generate_result_wrapper( + GeoDataFrame.from_dict( + {_col_name: range(10), "other_column": range(1, 11, 1)} + ) + ) + + # 1. Define test data. + _col_name = "result_column" + _analysis = AdaptationOptionAnalysis( + analysis_type=AnalysisDamagesEnum.DAMAGES, + analysis_class=MockAnalysis, + analysis_input=None, + result_col="result.*", + ) + + # 2. Run test. + _result = _analysis.execute(None) + + # 3. Verify expectations. + assert isinstance(_result, Series) + assert _result.sum() == 45 diff --git a/tests/analysis/adaptation/test_adaptation_option_collection.py b/tests/analysis/adaptation/test_adaptation_option_collection.py index b6aa381dc..905295b58 100644 --- a/tests/analysis/adaptation/test_adaptation_option_collection.py +++ b/tests/analysis/adaptation/test_adaptation_option_collection.py @@ -1,4 +1,8 @@ +from dataclasses import dataclass + import pytest +from geopandas import GeoDataFrame +from pandas import Series from ra2ce.analysis.adaptation.adaptation_option import AdaptationOption from ra2ce.analysis.adaptation.adaptation_option_collection import ( @@ -16,7 +20,7 @@ def test_initialize(self): # 3. Verify expectations. assert isinstance(_collection, AdaptationOptionCollection) - def test_from_config( + def test_from_config_returns_object( self, valid_adaptation_config: tuple[AnalysisInputWrapper, AnalysisConfigWrapper], ): @@ -29,7 +33,10 @@ def test_from_config( assert isinstance(_collection.reference_option, AdaptationOption) assert _collection.reference_option.id == "AO0" - assert len(_collection.adaptation_options) == 2 + assert len(_collection.all_options) == len( + valid_adaptation_config[1].config_data.adaptation.adaptation_options + ) + assert all( isinstance(x, AdaptationOption) for x in _collection.adaptation_options ) @@ -47,7 +54,7 @@ def test_from_config_no_adaptation_raises(self): # 3. Verify expectations. assert _exc.match("No adaptation section found in the analysis config data.") - def test_calculate_options_unit_cost( + def test_calculate_options_unit_cost_returns_dict( self, valid_adaptation_config: tuple[AnalysisInputWrapper, AnalysisConfigWrapper], ): @@ -60,3 +67,53 @@ def test_calculate_options_unit_cost( # 3. Verify expectations. assert isinstance(_result, dict) assert all(_option in _result for _option in _collection.adaptation_options) + + def test_calculate_options_benefit_returns_series(self): + @dataclass + class MockOption: + # Mock to avoid the need to run the impact analysis. + id: str + impact: float + + def calculate_impact(self, _) -> Series: + return Series(self.impact, index=range(_nof_rows)) + + # 1. Define test data. + _nof_rows = 10 + _reference_benefit = 3.0e6 + _options = {f"Option{i}": _reference_benefit + (i * 1.0e6) for i in range(3)} + _collection = AdaptationOptionCollection( + all_options=[MockOption(id=x, impact=y) for x, y in _options.items()] + ) + + # 2. Run test. + _result = _collection.calculate_options_benefit() + + # 3. Verify expectations. + assert isinstance(_result, GeoDataFrame) + assert all( + f"{_option.id}_benefit" in _result.columns + for _option in _collection.adaptation_options + ) + assert all( + _result[f"{_id}_benefit"].sum(axis=0) + == pytest.approx(_nof_rows * (_impact - _reference_benefit)) + for _id, _impact in _options.items() + if _id != "Option0" + ) + + def test_calculate_correct_get_net_present_value_factor( + self, + valid_adaptation_config: tuple[AnalysisInputWrapper, AnalysisConfigWrapper], + ): + # 1. Define test data. + _config_wrapper = valid_adaptation_config[1] + assert isinstance(_config_wrapper, AnalysisConfigWrapper) + _collection = AdaptationOptionCollection.from_config(_config_wrapper) + + # 2. Run test. + _result = _collection.get_net_present_value_factor() + + # 3. Verify expectations. + assert isinstance(_result, float) + assert _result == pytest.approx(0.2109011023, rel=1e-9)