diff --git a/ra2ce/analysis/adaptation/adaptation.py b/ra2ce/analysis/adaptation/adaptation.py index 9a1f38b35..716ed72fe 100644 --- a/ra2ce/analysis/adaptation/adaptation.py +++ b/ra2ce/analysis/adaptation/adaptation.py @@ -18,6 +18,7 @@ You should have received a copy of the GNU General Public License along with this program. If not, see . """ +from copy import deepcopy from pathlib import Path from geopandas import GeoDataFrame @@ -40,7 +41,7 @@ class Adaptation(AnalysisDamagesProtocol): graph_file_hazard: NetworkFile input_path: Path output_path: Path - _adaptation_options: AdaptationOptionCollection + adaptation_collection: AdaptationOptionCollection # TODO: add the proper protocol for the adaptation analysis. def __init__( @@ -51,35 +52,40 @@ def __init__( self.graph_file_hazard = analysis_input.graph_file_hazard self.input_path = analysis_input.input_path self.output_path = analysis_input.output_path - self._adaptation_options = AdaptationOptionCollection.from_config( + self.adaptation_collection = AdaptationOptionCollection.from_config( analysis_config ) - def execute(self) -> GeoDataFrame | None: + def execute(self) -> GeoDataFrame: """ Run the adaptation analysis. """ return self.calculate_bc_ratio() - def run_cost(self) -> GeoDataFrame | None: + def run_cost(self) -> GeoDataFrame: """ Calculate the cost for all adaptation options. """ # Open the network without hazard data - road_gdf = self.graph_file.get_graph() + _cost_gdf = deepcopy(self.graph_file.get_graph()) + for ( + _option, + _cost, + ) in self.adaptation_collection.calculate_option_cost().items(): + _cost_gdf[f"costs_{_option.id}"] = _cost - return 0.0 + return _cost_gdf - def run_benefit(self) -> GeoDataFrame | None: + def run_benefit(self) -> GeoDataFrame: """ Calculate the benefit for all adaptation options """ - return 0.0 + return None - def calculate_bc_ratio(self) -> GeoDataFrame | None: + def calculate_bc_ratio(self) -> GeoDataFrame: """ Calculate the benefit-cost ratio for all adaptation options """ - self.run_cost() - self.run_benefit() - return 0.0 + _cost_gdf = self.run_cost() + _benefit_gdf = self.run_benefit() + return None diff --git a/ra2ce/analysis/adaptation/adaptation_option.py b/ra2ce/analysis/adaptation/adaptation_option.py index bcaea9d9f..2a8c34b96 100644 --- a/ra2ce/analysis/adaptation/adaptation_option.py +++ b/ra2ce/analysis/adaptation/adaptation_option.py @@ -39,20 +39,41 @@ class AdaptationOption: construction_interval: float maintenance_cost: float maintenance_interval: float + damages_root: Path damages_config: AnalysisSectionDamages + losses_root: Path losses_config: AnalysisSectionLosses + def __hash__(self) -> int: + return hash(self.id) + + @property + def input_path(self) -> Path: + return self.damages_root.joinpath("input") + + @property + def static_path(self) -> Path: + return self.damages_root.joinpath("static") + + @property + def output_path(self) -> Path: + return self.damages_root.joinpath("output") + @classmethod def from_config( cls, + root_path: Path, adaptation_option: AnalysisSectionAdaptationOption, damages_section: AnalysisSectionDamages, losses_section: AnalysisSectionLosses, ) -> AdaptationOption: # Adjust path to the input files - def extend_path(analysis: str, input_path: Path | None) -> Path | None: + def extend_path(analysis: str, input_path: Path) -> Path: if not input_path: return None + # Input is directory: add stuff at the end + if not (input_path.suffix): + return input_path.joinpath("input", adaptation_option.id, analysis) return input_path.parent.joinpath( "input", adaptation_option.id, analysis, input_path.name ) @@ -62,8 +83,10 @@ def extend_path(analysis: str, input_path: Path | None) -> Path | None: "Damages and losses sections are required to create an adaptation option." ) + _damages_root = extend_path("damages", root_path) _damages_section = deepcopy(damages_section) + _losses_root = extend_path("losses", root_path) _losses_section = deepcopy(losses_section) _losses_section.resilience_curves_file = extend_path( "losses", losses_section.resilience_curves_file @@ -77,7 +100,9 @@ def extend_path(analysis: str, input_path: Path | None) -> Path | None: return cls( **asdict(adaptation_option), + damages_root=_damages_root, damages_config=_damages_section, + losses_root=_losses_root, losses_config=_losses_section, ) diff --git a/ra2ce/analysis/adaptation/adaptation_option_collection.py b/ra2ce/analysis/adaptation/adaptation_option_collection.py index 9066e3ae8..d12b313f3 100644 --- a/ra2ce/analysis/adaptation/adaptation_option_collection.py +++ b/ra2ce/analysis/adaptation/adaptation_option_collection.py @@ -76,6 +76,7 @@ def from_config( for _config_option in analysis_config_data.adaptation.adaptation_options: _collection.all_options.append( AdaptationOption.from_config( + analysis_config_data.root_path, _config_option, _damages_analysis, _losses_analysis, @@ -83,3 +84,15 @@ def from_config( ) return _collection + + def calculate_option_cost(self) -> dict[AdaptationOption, float]: + """ + Calculate the cost for all adaptation options. + """ + return { + _option: _option.calculate_cost( + self.time_horizon, + self.discount_rate, + ) + for _option in self.adaptation_options + } diff --git a/ra2ce/analysis/analysis_factory.py b/ra2ce/analysis/analysis_factory.py index 8a7e320b3..ebb22f468 100644 --- a/ra2ce/analysis/analysis_factory.py +++ b/ra2ce/analysis/analysis_factory.py @@ -82,6 +82,7 @@ def get_damages_analysis( _analysis_input = AnalysisInputWrapper.from_input( analysis=analysis, analysis_config=analysis_config, + graph_file=analysis_config.graph_files.base_network, graph_file_hazard=analysis_config.graph_files.base_network_hazard, ) diff --git a/tests/analysis/adaptation/conftest.py b/tests/analysis/adaptation/conftest.py index 8d7961dd3..fd9ee0e4d 100644 --- a/tests/analysis/adaptation/conftest.py +++ b/tests/analysis/adaptation/conftest.py @@ -17,6 +17,13 @@ from ra2ce.analysis.analysis_config_data.enums.analysis_losses_enum import ( AnalysisLossesEnum, ) +from ra2ce.analysis.analysis_config_data.enums.damage_curve_enum import DamageCurveEnum +from ra2ce.analysis.analysis_config_data.enums.event_type_enum import EventTypeEnum +from ra2ce.analysis.analysis_config_data.enums.traffic_period_enum import ( + TrafficPeriodEnum, +) +from ra2ce.analysis.analysis_config_data.enums.trip_purpose_enum import TripPurposeEnum +from ra2ce.analysis.analysis_config_data.enums.weighing_enum import WeighingEnum from tests import test_data, test_results @@ -51,6 +58,33 @@ class AdaptationOptionCases: def _get_valid_adaptation_config_fixture( request: pytest.FixtureRequest, ) -> Iterator[AnalysisConfigData]: + def get_losses_section(analysis: AnalysisLossesEnum) -> AnalysisSectionLosses: + return AnalysisSectionLosses( + analysis=analysis, + event_type=EventTypeEnum.EVENT, + weighing=WeighingEnum.TIME, + threshold=0.5, + production_loss_per_capita_per_hour=42, + traffic_period=TrafficPeriodEnum.DAY, + trip_purposes=[ + TripPurposeEnum.BUSINESS, + TripPurposeEnum.COMMUTE, + TripPurposeEnum.FREIGHT, + TripPurposeEnum.OTHER, + ], + resilience_curves_file=_root_path.joinpath( + "damage_functions", "resilience_curves.csv" + ), + traffic_intensities_file=_root_path.joinpath( + "damage_functions", "traffic_intensities.csv" + ), + values_of_time_file=_root_path.joinpath( + "damage_functions", "values_of_time.csv" + ), + save_gpkg=True, + save_csv=True, + ) + _root_path = test_results.joinpath(request.node.name) _input_path = _root_path.joinpath("input") _static_path = _root_path.joinpath("static") @@ -70,24 +104,22 @@ def _get_valid_adaptation_config_fixture( # - damages _damages_section = AnalysisSectionDamages( analysis=AnalysisDamagesEnum.DAMAGES, + event_type=EventTypeEnum.EVENT, + damage_curve=DamageCurveEnum.MAN, + save_gpkg=True, + save_csv=True, ) # - losses - _losses_section = AnalysisSectionLosses( - analysis=AnalysisLossesEnum.SINGLE_LINK_LOSSES, - resilience_curves_file=_root_path.joinpath( - "damage_functions", "resilience_curves.csv" - ), - traffic_intensities_file=_root_path.joinpath( - "damage_functions", "traffic_intensities.csv" - ), - values_of_time_file=_root_path.joinpath( - "damage_functions", "values_of_time.csv" - ), + _single_link_losses_section = get_losses_section( + AnalysisLossesEnum.SINGLE_LINK_LOSSES + ) + _multi_link_losses_section = get_losses_section( + AnalysisLossesEnum.MULTI_LINK_LOSSES ) # - adaptation _adaptation_section = AnalysisSectionAdaptation( analysis=AnalysisEnum.ADAPTATION, - losses_analysis=AnalysisLossesEnum.SINGLE_LINK_LOSSES, + losses_analysis=AnalysisLossesEnum.MULTI_LINK_LOSSES, adaptation_options=AdaptationOptionCases.config_cases, discount_rate=0.025, time_horizon=20, @@ -98,5 +130,10 @@ def _get_valid_adaptation_config_fixture( input_path=_input_path, static_path=_static_path, output_path=_output_path, - analyses=[_damages_section, _losses_section, _adaptation_section], + analyses=[ + _damages_section, + _single_link_losses_section, + _multi_link_losses_section, + _adaptation_section, + ], ) diff --git a/tests/analysis/adaptation/test_adaptation.py b/tests/analysis/adaptation/test_adaptation.py index 238b9f6c9..a5c6df601 100644 --- a/tests/analysis/adaptation/test_adaptation.py +++ b/tests/analysis/adaptation/test_adaptation.py @@ -5,17 +5,12 @@ from geopandas import GeoDataFrame from ra2ce.analysis.adaptation.adaptation import Adaptation -from ra2ce.analysis.adaptation.adaptation_option import AdaptationOption -from ra2ce.analysis.analysis_config_data.analysis_config_data import ( - AnalysisConfigData, - AnalysisSectionAdaptation, -) +from ra2ce.analysis.analysis_config_data.analysis_config_data import AnalysisConfigData from ra2ce.analysis.analysis_config_wrapper import AnalysisConfigWrapper from ra2ce.analysis.analysis_input_wrapper import AnalysisInputWrapper from ra2ce.network.network_config_data.network_config_data import NetworkConfigData from ra2ce.network.network_config_wrapper import NetworkConfigWrapper -from tests import test_data, test_results -from tests.analysis.adaptation.conftest import AdaptationOptionCases +from tests import test_results class TestAdaptation: @@ -39,24 +34,39 @@ def _get_valid_adaptation_input_fixture( _analysis_input = AnalysisInputWrapper.from_input( analysis=valid_adaptation_config.adaptation, analysis_config=_config, - graph_file=_config.graph_files.base_graph_hazard, - graph_file_hazard=_config.graph_files.base_graph_hazard, + graph_file=_config.graph_files.base_network, + graph_file_hazard=_config.graph_files.base_network_hazard, ) yield _analysis_input - @pytest.fixture(name="acceptance_adaptation_option") - def test_initialize(self, valid_adaptation_input: AnalysisInputWrapper): + def test_initialize( + self, + valid_adaptation_input: AnalysisInputWrapper, + valid_adaptation_config: AnalysisConfigData, + ): # 1./2. Define test data./Run test. - _adaptation = Adaptation(valid_adaptation_input) + _adaptation = Adaptation(valid_adaptation_input, valid_adaptation_config) # 3. Verify expectations. assert isinstance(_adaptation, Adaptation) - def test_execute_cost( + def test_run_cost( self, - request: pytest.FixtureRequest, - valid_analysis_ini: Path, + valid_adaptation_input: AnalysisInputWrapper, valid_adaptation_config: AnalysisConfigData, ): - pass + # 1. Define test data. + _adaptation = Adaptation(valid_adaptation_input, valid_adaptation_config) + + # 2. Run test. + _cost_gdf = _adaptation.run_cost() + + # 3. Verify expectations. + assert isinstance(_cost_gdf, GeoDataFrame) + assert all( + [ + f"costs_{_option.id}" in _cost_gdf.columns + for _option in _adaptation.adaptation_collection.adaptation_options + ] + ) diff --git a/tests/analysis/adaptation/test_adaptation_option.py b/tests/analysis/adaptation/test_adaptation_option.py index 869e3c1ce..0ee56be56 100644 --- a/tests/analysis/adaptation/test_adaptation_option.py +++ b/tests/analysis/adaptation/test_adaptation_option.py @@ -11,12 +11,19 @@ from ra2ce.analysis.analysis_config_data.enums.analysis_losses_enum import ( AnalysisLossesEnum, ) -from ra2ce.analysis.analysis_input_wrapper import AnalysisInputWrapper from tests.analysis.adaptation.conftest import AdaptationOptionCases class TestAdaptationOption: - def test_from_config(self, valid_adaptation_config: AnalysisConfigData): + @pytest.mark.parametrize( + "losses_analysis", + [AnalysisLossesEnum.SINGLE_LINK_LOSSES, AnalysisLossesEnum.MULTI_LINK_LOSSES], + ) + def test_from_config( + self, + valid_adaptation_config: AnalysisConfigData, + losses_analysis: AnalysisLossesEnum, + ): # 1. Define test data. _orig_path = valid_adaptation_config.losses_list[0].resilience_curves_file _expected_path = _orig_path.parent.joinpath( @@ -28,20 +35,19 @@ def test_from_config(self, valid_adaptation_config: AnalysisConfigData): # 2. Run test. _option = AdaptationOption.from_config( + root_path=valid_adaptation_config.root_path, adaptation_option=valid_adaptation_config.adaptation.adaptation_options[0], damages_section=valid_adaptation_config.get_analysis( AnalysisDamagesEnum.DAMAGES ), - losses_section=valid_adaptation_config.get_analysis( - AnalysisLossesEnum.SINGLE_LINK_LOSSES - ), + losses_section=valid_adaptation_config.get_analysis(losses_analysis), ) # 3. Verify expectations. assert isinstance(_option, AdaptationOption) assert _option.id == "AO0" assert _option.damages_config.analysis == AnalysisDamagesEnum.DAMAGES - assert _option.losses_config.analysis == AnalysisLossesEnum.SINGLE_LINK_LOSSES + assert _option.losses_config.analysis == losses_analysis assert _option.losses_config.resilience_curves_file == _expected_path def test_from_config_no_damages_losses_raises(self): @@ -51,6 +57,7 @@ def test_from_config_no_damages_losses_raises(self): # 2. Run test. with pytest.raises(ValueError) as _exc: AdaptationOption.from_config( + root_path=_config.root_path, adaptation_option=AnalysisSectionAdaptation(), damages_section=None, losses_section=None, @@ -72,9 +79,10 @@ def test_calculate_option_cost( ): # 1. Define test data. _option = AdaptationOption.from_config( - adaptation_option[0], - valid_adaptation_config.damages_list[0], - valid_adaptation_config.losses_list[0], + root_path=valid_adaptation_config.root_path, + adaptation_option=adaptation_option[0], + damages_section=valid_adaptation_config.damages_list[0], + losses_section=valid_adaptation_config.losses_list[0], ) _time_horizon = valid_adaptation_config.adaptation.time_horizon _discount_rate = valid_adaptation_config.adaptation.discount_rate diff --git a/tests/test_data/adaptation/input/damages/damage_functions/hazard_severity_damage_fraction.csv b/tests/test_data/adaptation/input/damages/input/damage_functions/all_road_types/hazard_severity_damage_fraction.csv similarity index 100% rename from tests/test_data/adaptation/input/damages/damage_functions/hazard_severity_damage_fraction.csv rename to tests/test_data/adaptation/input/damages/input/damage_functions/all_road_types/hazard_severity_damage_fraction.csv diff --git a/tests/test_data/adaptation/input/damages/damage_functions/max_damage_road_types.csv b/tests/test_data/adaptation/input/damages/input/damage_functions/all_road_types/max_damage_road_types.csv similarity index 100% rename from tests/test_data/adaptation/input/damages/damage_functions/max_damage_road_types.csv rename to tests/test_data/adaptation/input/damages/input/damage_functions/all_road_types/max_damage_road_types.csv diff --git a/tests/test_data/adaptation/input/losses/resilience_curve.csv b/tests/test_data/adaptation/input/losses/input/resilience_curve.csv similarity index 100% rename from tests/test_data/adaptation/input/losses/resilience_curve.csv rename to tests/test_data/adaptation/input/losses/input/resilience_curve.csv diff --git a/tests/test_data/adaptation/input/losses/traffic_intensities.csv b/tests/test_data/adaptation/input/losses/input/traffic_intensities.csv similarity index 100% rename from tests/test_data/adaptation/input/losses/traffic_intensities.csv rename to tests/test_data/adaptation/input/losses/input/traffic_intensities.csv diff --git a/tests/test_data/adaptation/input/losses/values_of_time.csv b/tests/test_data/adaptation/input/losses/input/values_of_time.csv similarity index 100% rename from tests/test_data/adaptation/input/losses/values_of_time.csv rename to tests/test_data/adaptation/input/losses/input/values_of_time.csv