diff --git a/ra2ce/analysis/analysis_factory.py b/ra2ce/analysis/analysis_factory.py index 2e259a9d1..8f462e3cd 100644 --- a/ra2ce/analysis/analysis_factory.py +++ b/ra2ce/analysis/analysis_factory.py @@ -35,10 +35,10 @@ from ra2ce.analysis.damages.damages import Damages from ra2ce.analysis.damages.effectiveness_measures import EffectivenessMeasures from ra2ce.analysis.losses.analysis_losses_protocol import AnalysisLossesProtocol -from ra2ce.analysis.losses.losses import Losses from ra2ce.analysis.losses.multi_link_isolated_locations import ( MultiLinkIsolatedLocations, ) +from ra2ce.analysis.losses.multi_link_losses import MultiLinkLosses from ra2ce.analysis.losses.multi_link_origin_closest_destination import ( MultiLinkOriginClosestDestination, ) @@ -52,6 +52,7 @@ from ra2ce.analysis.losses.optimal_route_origin_destination import ( OptimalRouteOriginDestination, ) +from ra2ce.analysis.losses.single_link_losses import SingleLinkLosses from ra2ce.analysis.losses.single_link_redundancy import SingleLinkRedundancy @@ -79,10 +80,13 @@ def get_damages_analysis( analysis_config=analysis_config, graph_file_hazard=analysis_config.graph_files.base_network_hazard, ) + if analysis.analysis == AnalysisDamagesEnum.DAMAGES: return Damages(_analysis_input) + if analysis.analysis == AnalysisDamagesEnum.EFFECTIVENESS_MEASURES: return EffectivenessMeasures(_analysis_input) + raise NotImplementedError(f"Analysis {analysis.analysis} not implemented") @staticmethod @@ -110,6 +114,7 @@ def get_losses_analysis( graph_file=analysis_config.graph_files.base_graph, ) return SingleLinkRedundancy(_analysis_input) + if analysis.analysis == AnalysisLossesEnum.MULTI_LINK_REDUNDANCY: _analysis_input = AnalysisInputWrapper.from_input( analysis=analysis, @@ -117,6 +122,7 @@ def get_losses_analysis( graph_file_hazard=analysis_config.graph_files.base_graph_hazard, ) return MultiLinkRedundancy(_analysis_input) + if analysis.analysis == AnalysisLossesEnum.OPTIMAL_ROUTE_ORIGIN_DESTINATION: 
_analysis_input = AnalysisInputWrapper.from_input( analysis=analysis, @@ -124,6 +130,7 @@ def get_losses_analysis( graph_file=analysis_config.graph_files.origins_destinations_graph, ) return OptimalRouteOriginDestination(_analysis_input) + if analysis.analysis == AnalysisLossesEnum.MULTI_LINK_ORIGIN_DESTINATION: _analysis_input = AnalysisInputWrapper.from_input( analysis=analysis, @@ -131,6 +138,7 @@ def get_losses_analysis( graph_file_hazard=analysis_config.graph_files.origins_destinations_graph_hazard, ) return MultiLinkOriginDestination(_analysis_input) + if ( analysis.analysis == AnalysisLossesEnum.OPTIMAL_ROUTE_ORIGIN_CLOSEST_DESTINATION @@ -141,6 +149,7 @@ def get_losses_analysis( graph_file_hazard=analysis_config.graph_files.origins_destinations_graph_hazard, ) return OptimalRouteOriginClosestDestination(analysis_input=_analysis_input) + if ( analysis.analysis == AnalysisLossesEnum.MULTI_LINK_ORIGIN_CLOSEST_DESTINATION @@ -152,6 +161,7 @@ def get_losses_analysis( graph_file_hazard=analysis_config.graph_files.origins_destinations_graph_hazard, ) return MultiLinkOriginClosestDestination(_analysis_input) + if analysis.analysis == AnalysisLossesEnum.SINGLE_LINK_LOSSES: _analysis_input = AnalysisInputWrapper.from_input( analysis=analysis, @@ -159,15 +169,16 @@ def get_losses_analysis( graph_file=analysis_config.graph_files.base_graph_hazard, graph_file_hazard=analysis_config.graph_files.base_graph_hazard, ) - return Losses(_analysis_input, analysis_config) + return SingleLinkLosses(_analysis_input, analysis_config) + if analysis.analysis == AnalysisLossesEnum.MULTI_LINK_LOSSES: _analysis_input = AnalysisInputWrapper.from_input( analysis=analysis, analysis_config=analysis_config, graph_file_hazard=analysis_config.graph_files.base_graph_hazard, ) + return MultiLinkLosses(_analysis_input, analysis_config) - return Losses(_analysis_input, analysis_config) if analysis.analysis == AnalysisLossesEnum.MULTI_LINK_ISOLATED_LOCATIONS: _analysis_input = 
AnalysisInputWrapper.from_input( analysis=analysis, @@ -175,4 +186,5 @@ def get_losses_analysis( graph_file_hazard=analysis_config.graph_files.base_graph_hazard, ) return MultiLinkIsolatedLocations(_analysis_input) + raise NotImplementedError(f"Analysis {analysis.analysis} not implemented") diff --git a/ra2ce/analysis/losses/losses.py b/ra2ce/analysis/losses/losses_base.py similarity index 96% rename from ra2ce/analysis/losses/losses.py rename to ra2ce/analysis/losses/losses_base.py index c8ffed11a..8107de74f 100644 --- a/ra2ce/analysis/losses/losses.py +++ b/ra2ce/analysis/losses/losses_base.py @@ -19,9 +19,9 @@ along with this program. If not, see . """ -import ast import logging import math +from abc import ABC, abstractmethod from ast import literal_eval from collections import defaultdict from pathlib import Path @@ -33,15 +33,9 @@ from ra2ce.analysis.analysis_config_data.analysis_config_data import ( AnalysisSectionLosses, ) -from ra2ce.analysis.analysis_config_data.enums.analysis_losses_enum import ( - AnalysisLossesEnum, -) -from ra2ce.analysis.analysis_config_data.enums.trip_purposes import TripPurposeEnum from ra2ce.analysis.analysis_config_wrapper import AnalysisConfigWrapper from ra2ce.analysis.analysis_input_wrapper import AnalysisInputWrapper from ra2ce.analysis.losses.analysis_losses_protocol import AnalysisLossesProtocol -from ra2ce.analysis.losses.multi_link_redundancy import MultiLinkRedundancy -from ra2ce.analysis.losses.single_link_redundancy import SingleLinkRedundancy from ra2ce.network.graph_files.graph_file import GraphFile from ra2ce.network.hazard.hazard_names import HazardNames from ra2ce.network.network_config_data.enums.aggregate_wl_enum import AggregateWlEnum @@ -71,7 +65,12 @@ def _load_df_from_csv( return _csv_dataframe -class Losses(AnalysisLossesProtocol): +class LossesBase(AnalysisLossesProtocol, ABC): + """ + This class is the base class for the Losses analyses, containing the common methods and attributes. 
+ Based on the analysis type a different criticality analysis is executed. + """ + analysis: AnalysisSectionLosses graph_file_hazard: GraphFile input_path: Path @@ -117,10 +116,10 @@ def __init__( ) self._check_validity_df() - self.input_path = self.analysis_input.input_path - self.static_path = self.analysis_input.static_path - self.output_path = self.analysis_input.output_path - self.hazard_names = self.analysis_input.hazard_names + self.input_path = analysis_input.input_path + self.static_path = analysis_input.static_path + self.output_path = analysis_input.output_path + self.hazard_names = analysis_input.hazard_names self.result = gpd.GeoDataFrame() @@ -620,11 +619,12 @@ def _get_link_types_heights_ranges(self) -> tuple[list[str], list[tuple]]: return list(_link_types), list(_hazard_intensity_ranges) + @abstractmethod + def _get_criticality_analysis(self) -> AnalysisLossesProtocol: + pass + def execute(self) -> gpd.GeoDataFrame: - if self.analysis.analysis == AnalysisLossesEnum.SINGLE_LINK_LOSSES: - criticality_analysis = SingleLinkRedundancy(self.analysis_input).execute() - elif self.analysis.analysis == AnalysisLossesEnum.MULTI_LINK_LOSSES: - criticality_analysis = MultiLinkRedundancy(self.analysis_input).execute() + criticality_analysis = self._get_criticality_analysis().execute() self._get_disrupted_criticality_analysis_results( criticality_analysis=criticality_analysis diff --git a/ra2ce/analysis/losses/multi_link_losses.py b/ra2ce/analysis/losses/multi_link_losses.py index 1d5005334..1a27c2fec 100644 --- a/ra2ce/analysis/losses/multi_link_losses.py +++ b/ra2ce/analysis/losses/multi_link_losses.py @@ -1,183 +1,18 @@ -from pathlib import Path - -import numpy as np -import pandas as pd -from geopandas import GeoDataFrame - -from ra2ce.analysis.analysis_config_data.analysis_config_data import ( - AnalysisSectionLosses, -) -from ra2ce.analysis.analysis_config_data.enums.loss_type_enum import LossTypeEnum -from ra2ce.analysis.analysis_input_wrapper import 
AnalysisInputWrapper -from ra2ce.analysis.losses.analysis_losses_protocol import AnalysisLossesProtocol +from ra2ce.analysis.losses.losses_base import LossesBase from ra2ce.analysis.losses.multi_link_redundancy import MultiLinkRedundancy -from ra2ce.network.graph_files.graph_file import GraphFile -from ra2ce.network.hazard.hazard_names import HazardNames - - -class MultiLinkLosses(AnalysisLossesProtocol): - analysis: AnalysisSectionLosses - graph_file_hazard: GraphFile - input_path: Path - static_path: Path - output_path: Path - hazard_names: HazardNames - _analysis_input: AnalysisInputWrapper - - def __init__( - self, - analysis_input: AnalysisInputWrapper, - ) -> None: - self.analysis = analysis_input.analysis - self.graph_file_hazard = analysis_input.graph_file_hazard - self.input_path = analysis_input.input_path - self.static_path = analysis_input.static_path - self.output_path = analysis_input.output_path - self.hazard_names = analysis_input.hazard_names - self._analysis_input = analysis_input - - def execute(self) -> GeoDataFrame: - """Calculates the multi-link redundancy losses of a NetworkX graph. - - The function removes all links of a variable that have a minimum value - of min_threshold. For each link it calculates the alternative path, if - any available. This function only removes one group at the time and saves the data from removing that group. - - Returns: - GeoDataFrame: The results of the analysis aggregated into a table. 
- """ - gdf = MultiLinkRedundancy(self._analysis_input).execute() - - losses_fn = self.static_path.joinpath("hazard", self.analysis.loss_per_distance) - losses_df = pd.read_excel(losses_fn, sheet_name="Sheet1") - - if self.analysis.loss_type == LossTypeEnum.CATEGORIZED: - disruption_fn = self.static_path.joinpath( - "hazard", self.analysis.disruption_per_category - ) - disruption_df = pd.read_excel(disruption_fn, sheet_name="Sheet1") - road_classes = [x for x in disruption_df.columns if "class" in x] - - results = [] - for hazard in self.hazard_names.names: - hazard_name = self.hazard_names.get_name(hazard) - - _gdf = gdf.loc[gdf["hazard"] == hazard_name].copy() - if ( - self.analysis.loss_type == LossTypeEnum.UNIFORM - ): # assume uniform threshold for disruption - for col in self.analysis.traffic_cols: - # detour_losses = traffic_per_day[veh/day] * detour_distance[meter] * cost_per_meter[USD/meter/vehicle] * duration_disruption[hour] / 24[hour/day] - _gdf.loc[_gdf["connected"] == 1, col + "_losses_detour"] = ( - _gdf[col] - * _gdf["diff_dist"] - * losses_df.loc[ - losses_df["traffic_class"] == col, "cost" - ].values[0] - * self.analysis.uniform_duration - / 24 - ) - # no_detour_losses = traffic_per_day[veh/day] * occupancy_per_vehicle[person/veh] * duration_disruption[hour] / 24[hour/day] * gdp_percapita_per_day [USD/person] - _gdf.loc[_gdf["connected"] == 0, col + "_losses_nodetour"] = ( - _gdf[col] - * losses_df.loc[ - losses_df["traffic_class"] == col, "occupancy" - ].values[0] - * self.analysis.gdp_percapita - * self.analysis.uniform_duration - / 24 - ) - _gdf["total_losses_" + hazard_name] = np.nansum( - _gdf[ - [ - x - for x in _gdf.columns - if ("losses" in x) and ("total" not in x) - ] - ], - axis=1, - ) - if ( - self.analysis.loss_type == LossTypeEnum.CATEGORIZED - ): # assume different disruption type depending on flood depth and road types - disruption_df["class_identifier"] = "" - _gdf["class_identifier"] = "" - for i, road_class in 
enumerate(road_classes): - disruption_df["class_identifier"] += disruption_df[road_class] - _gdf["class_identifier"] += _gdf[road_class[6:]] - if i < len(road_classes) - 1: - disruption_df["class_identifier"] += "_nextclass_" - _gdf["class_identifier"] += "_nextclass_" - all_road_categories = np.unique(_gdf["class_identifier"]) - _gdf["duration_disruption"] = 0 +class MultiLinkLosses(LossesBase): + """ + Calculates the multi-link redundancy losses of a NetworkX graph. - for lb in np.unique(disruption_df["lower_bound"]): - disruption_df_ = disruption_df.loc[ - disruption_df["lower_bound"] == lb - ] - ub = disruption_df_["upper_bound"].values[0] - if ub <= 0: - ub = 1e10 - for road_cat in all_road_categories: - _gdf.loc[ - ( - _gdf[ - hazard_name - + "_" - + self.analysis.aggregate_wl.config_value - ] - > lb - ) - & ( - _gdf[ - hazard_name - + "_" - + self.analysis.aggregate_wl.config_value - ] - <= ub - ) - & (_gdf["class_identifier"] == road_cat), - "duration_disruption", - ] = disruption_df_.loc[ - disruption_df_["class_identifier"] == road_cat, - "duration_disruption", - ].values[ - 0 - ] + The function removes all links of a variable that have a minimum value + of min_threshold. For each link it calculates the alternative path, if any available. + This function only removes one group at the time and saves the data from removing that group. 
- for col in self.analysis.traffic_cols: - # detour_losses = traffic_per_day[veh/day] * detour_distance[meter] * cost_per_meter[USD/meter/vehicle] * duration_disruption[hour] / 24[hour/day] - _gdf.loc[_gdf["connected"] == 1, col + "_losses_detour"] = ( - _gdf[col] - * _gdf["diff_dist"] - * losses_df.loc[ - losses_df["traffic_class"] == col, "cost" - ].values[0] - * _gdf["duration_disruption"] - / 24 - ) - # no_detour_losses = traffic_per_day[veh/day] * occupancy[person/veh] * gdp_percapita[USD/person] * duration_disruption[hour] / 24[hour/day] - _gdf.loc[_gdf["connected"] == 0, col + "_losses_nodetour"] = ( - _gdf[col] - * losses_df.loc[ - losses_df["traffic_class"] == col, "occupancy" - ].values[0] - * self.analysis.gdp_percapita - * _gdf["duration_disruption"] - / 24 - ) - _gdf["total_losses_" + hazard_name] = np.nansum( - _gdf[ - [ - x - for x in _gdf.columns - if ("losses" in x) and ("total" not in x) - ] - ], - axis=1, - ) - results.append(_gdf) + This class is based on the LossesBase abstract base class. + Don't override other methods than _get_criticality_analysis. + """ - return pd.concat(results, ignore_index=True) + def _get_criticality_analysis(self) -> MultiLinkRedundancy: + return MultiLinkRedundancy(self.analysis_input) diff --git a/ra2ce/analysis/losses/single_link_losses.py b/ra2ce/analysis/losses/single_link_losses.py index e69de29bb..3f6f4040c 100644 --- a/ra2ce/analysis/losses/single_link_losses.py +++ b/ra2ce/analysis/losses/single_link_losses.py @@ -0,0 +1,15 @@ +from ra2ce.analysis.losses.losses_base import LossesBase +from ra2ce.analysis.losses.single_link_redundancy import SingleLinkRedundancy + + +class SingleLinkLosses(LossesBase): + """ + Calculates the single-link redundancy losses of a NetworkX graph. + This is the function to analyse roads with a single link disruption and an alternative route. + + This class is based on the LossesBase abstract base class. + Don't override other methods than _get_criticality_analysis. 
+ """ + + def _get_criticality_analysis(self) -> SingleLinkRedundancy: + return SingleLinkRedundancy(self.analysis_input) diff --git a/tests/analysis/losses/test_losses.py b/tests/analysis/losses/test_losses.py index 353f63ec1..3ed3cc2c9 100644 --- a/tests/analysis/losses/test_losses.py +++ b/tests/analysis/losses/test_losses.py @@ -1,4 +1,5 @@ from pathlib import Path +from typing import Iterator import geopandas as gpd import pandas as pd @@ -9,26 +10,41 @@ AnalysisConfigData, AnalysisSectionLosses, ) -from ra2ce.analysis.analysis_config_data.enums.analysis_losses_enum import ( - AnalysisLossesEnum, -) from ra2ce.analysis.analysis_config_data.enums.trip_purposes import TripPurposeEnum from ra2ce.analysis.analysis_config_data.enums.weighing_enum import WeighingEnum from ra2ce.analysis.analysis_config_wrapper import AnalysisConfigWrapper from ra2ce.analysis.analysis_input_wrapper import AnalysisInputWrapper -from ra2ce.analysis.losses.losses import Losses +from ra2ce.analysis.losses.analysis_losses_protocol import AnalysisLossesProtocol +from ra2ce.analysis.losses.losses_base import LossesBase +from ra2ce.analysis.losses.multi_link_losses import MultiLinkLosses +from ra2ce.analysis.losses.single_link_losses import SingleLinkLosses from ra2ce.network.network_config_data.enums.part_of_day_enum import PartOfDayEnum from ra2ce.network.network_config_wrapper import NetworkConfigWrapper from tests import test_data class TestLosses: - def test_initialize_no_data(self): + @pytest.fixture( + params=[ + pytest.param(SingleLinkLosses, id="Single link losses analysis"), + pytest.param(MultiLinkLosses, id="Multi link losses analysis"), + ], + name="losses_analysis", + ) + def _get_losses_analysis( + self, request: pytest.FixtureRequest + ) -> Iterator[AnalysisLossesProtocol]: + _analysis_losses_type = request.param + assert issubclass(_analysis_losses_type, LossesBase) + assert issubclass(_analysis_losses_type, AnalysisLossesProtocol) + yield _analysis_losses_type + + def 
test_initialize_no_data(self, losses_analysis: type[AnalysisLossesProtocol]): # 1. Define test data _config_data = AnalysisConfigData() _network_config = NetworkConfigWrapper() - _valid_analysis_ini = test_data / "losses" / "analyses.ini" + _valid_analysis_ini = test_data.joinpath("losses", "analyses.ini") _config = AnalysisConfigWrapper.from_data_with_network( _valid_analysis_ini, _config_data, _network_config ) @@ -43,37 +59,43 @@ def test_initialize_no_data(self): graph_file_hazard=_config.graph_files.base_graph_hazard, ) - with pytest.raises(ValueError): - _losses = Losses(_analysis_input, _config) + # 2. Run test. + with pytest.raises(ValueError) as exc: + _losses = losses_analysis(_analysis_input, _config) + + # 3. Verify final expectations. + assert ( + str(exc.value) + == "traffic_intensities_file, resilience_curve_file, and values_of_time_file should be given" + ) - def test_initialize_with_data(self): + def test_initialize_with_data(self, losses_analysis: type[AnalysisLossesProtocol]): # 1. 
Define test data _config_data = AnalysisConfigData() _network_config = NetworkConfigWrapper() - _valid_analysis_ini = test_data / "losses" / "analyses.ini" + _valid_analysis_ini = test_data.joinpath("losses", "analyses.ini") _config = AnalysisConfigWrapper.from_data_with_network( _valid_analysis_ini, _config_data, _network_config ) # Add extra arguments to config_data - _config.config_data.input_path = test_data / "losses" / "csv_data_for_losses" + _config.config_data.input_path = test_data.joinpath( + "losses", "csv_data_for_losses" + ) _config_data.network.file_id = "link_id" _config_data.network.link_type_column = "link_type" _analysis = AnalysisSectionLosses( part_of_day=PartOfDayEnum.DAY, - resilience_curve_file=test_data - / "losses" - / "csv_data_for_losses" - / "resilience_curve.csv", - traffic_intensities_file=test_data - / "losses" - / "csv_data_for_losses" - / "traffic_intensities.csv", - values_of_time_file=test_data - / "losses" - / "csv_data_for_losses" - / "values_of_time.csv", + resilience_curve_file=test_data.joinpath( + "losses", "csv_data_for_losses", "resilience_curve.csv" + ), + traffic_intensities_file=test_data.joinpath( + "losses", "csv_data_for_losses", "traffic_intensities.csv" + ), + values_of_time_file=test_data.joinpath( + "losses", "csv_data_for_losses", "values_of_time.csv" + ), name="single_link_redundancy_losses_test", trip_purposes=[TripPurposeEnum.BUSINESS, TripPurposeEnum.COMMUTE], ) @@ -86,10 +108,11 @@ def test_initialize_with_data(self): ) # 2. Run test. - _losses = Losses(_analysis_input, _config) + _losses = losses_analysis(_analysis_input, _config) # 3. Verify final expectations. 
- assert isinstance(_losses, Losses) + assert isinstance(_losses, LossesBase) + assert isinstance(_losses, losses_analysis) @pytest.mark.parametrize( "part_of_day", @@ -131,14 +154,16 @@ def create_linestring(row): _config_data = AnalysisConfigData() _network_config = NetworkConfigWrapper() - _valid_analysis_ini = test_data / "losses" / "analyses.ini" + _valid_analysis_ini = test_data.joinpath("losses", "analyses.ini") _config = AnalysisConfigWrapper.from_data_with_network( _valid_analysis_ini, _config_data, _network_config ) _config_data.network.file_id = "link_id" _config_data.network.link_type_column = "link_type" - _config.config_data.input_path = test_data / "losses" / "csv_data_for_losses" + _config.config_data.input_path = test_data.joinpath( "losses", "csv_data_for_losses" ) _analysis = AnalysisSectionLosses( part_of_day=part_of_day, @@ -161,13 +186,14 @@ def create_linestring(row): graph_file_hazard=_config.graph_files.base_graph_hazard, ) - _losses = Losses(_analysis_input, _config) + _losses = SingleLinkLosses(_analysis_input, _config) _losses.criticality_analysis = pd.read_csv( - test_data - / "losses" - / "csv_data_for_losses" - / "single_link_redundancy_losses_test.csv", + test_data.joinpath( + "losses", + "csv_data_for_losses", + "single_link_redundancy_losses_test.csv", + ), sep=",", on_bad_lines="skip", ) @@ -197,7 +223,9 @@ def create_linestring(row): _result = _losses.calculate_vehicle_loss_hours() _expected_result = pd.read_csv( - test_data / "losses" / "csv_data_for_losses" / "results_test_calc_vlh.csv" + test_data.joinpath( + "losses", "csv_data_for_losses", "results_test_calc_vlh.csv" + ) ) # 3. Verify final expectations. 
diff --git a/tests/analysis/losses/test_losses_base.py b/tests/analysis/losses/test_losses_base.py new file mode 100644 index 000000000..0af69e417 --- /dev/null +++ b/tests/analysis/losses/test_losses_base.py @@ -0,0 +1,15 @@ +import pytest + +from ra2ce.analysis.losses.losses_base import LossesBase + + +class TestLossesBase: + def test_initialize_base_class_raises(self): + # 1. Run test. + with pytest.raises(TypeError) as exc: + _losses = LossesBase(None, None) + + # 2. Verify final expectations + assert str(exc.value).startswith( + f"Can't instantiate abstract class {LossesBase.__name__}" + )