diff --git a/ra2ce/analysis/adaptation/adaptation.py b/ra2ce/analysis/adaptation/adaptation.py index 7c86593d..7f52fcd2 100644 --- a/ra2ce/analysis/adaptation/adaptation.py +++ b/ra2ce/analysis/adaptation/adaptation.py @@ -18,9 +18,11 @@ You should have received a copy of the GNU General Public License along with this program. If not, see . """ +import math from pathlib import Path from geopandas import GeoDataFrame +from pandas import DataFrame from ra2ce.analysis.adaptation.adaptation_option_collection import ( AdaptationOptionCollection, @@ -69,82 +71,107 @@ def execute(self) -> AnalysisResultWrapper: Returns: AnalysisResultWrapper: The result of the adaptation analysis. """ - _cost_gdf = self.run_cost() - _benefit_gdf = self.run_benefit() + _cost_df = self.run_cost() + _benefit_df = self.run_benefit() return self.generate_result_wrapper( - self.calculate_bc_ratio(_benefit_gdf, _cost_gdf) + self.calculate_bc_ratio(_benefit_df, _cost_df) ) - def run_cost(self) -> GeoDataFrame: + def run_cost(self) -> DataFrame: """ Calculate the link cost for all adaptation options. The unit cost is multiplied by the length of the link. If the hazard fraction cost is enabled, the cost is multiplied by the fraction of the link that is impacted. Returns: - GeoDataFrame: The result of the cost calculation. + DataFrame: The result of the cost calculation. """ _orig_gdf = self.graph_file_hazard.get_graph() _fraction_col = _orig_gdf.filter(regex="EV.*_fr").columns[0] - _cost_gdf = GeoDataFrame() + _cost_df = _orig_gdf[["link_id"]].copy() for ( _option, _cost, ) in self.adaptation_collection.calculate_options_unit_cost().items(): - _cost_gdf[_option.cost_col] = _orig_gdf.apply( + _cost_df[_option.cost_col] = _orig_gdf.apply( lambda x, cost=_cost: x["length"] * cost, axis=1 ) # Only calculate the cost for the impacted fraction of the links. 
if self.analysis.hazard_fraction_cost: - _cost_gdf[_option.cost_col] *= _orig_gdf[_fraction_col] + _cost_df[_option.cost_col] *= _orig_gdf[_fraction_col] - return _cost_gdf + return _cost_df - def run_benefit(self) -> GeoDataFrame: + def run_benefit(self) -> DataFrame: """ Calculate the benefit for all adaptation options. Returns: - GeoDataFrame: The result of the benefit calculation. + DataFrame: The result of the benefit calculation. """ return self.adaptation_collection.calculate_options_benefit() def calculate_bc_ratio( - self, benefit_gdf: GeoDataFrame, cost_gdf: GeoDataFrame + self, benefit_df: DataFrame, cost_df: DataFrame ) -> GeoDataFrame: """ Calculate the benefit-cost ratio for all adaptation options. Args: - benefit_gdf (GeoDataFrame): Gdf containing the benefit of the adaptation options. - cost_gdf (GeoDataFrame): Gdf containing the cost of the adaptation options. + benefit_df (DataFrame): Df containing the benefit of the adaptation options. + cost_df (DataFrame): Df containing the cost of the adaptation options. Returns: GeoDataFrame: Gdf containing the benefit-cost ratio of the adaptation options, including the relevant attributes from the original graph (geometry). 
""" - def copy_column(from_gdf: GeoDataFrame, col_name: str) -> None: - if not col_name in from_gdf.columns: - return - benefit_gdf.insert(loc=0, column=col_name, value=from_gdf[col_name]) + def merge_columns( + left_df: DataFrame, right_df: DataFrame, columns: list[str] + ) -> DataFrame: + # Merge 2 dataframes base on link_id + _id_col = "link_id" + + # Add temporary key as the link_id to merge on contains inconsistent types (list[int] and int) + _merge_col = "temp_key" + + left_df[_merge_col] = left_df[_id_col].apply(lambda x: str(x)) + # Not all columns are present in both dataframes, so only merge the relevant columns + _columns = [_col for _col in columns if _col in left_df.columns] + if not _columns: + return right_df + + right_df[_merge_col] = right_df[_id_col].apply(lambda x: str(x)) + # Not each dataframe has the same entries in the link_id column, so use an outer merge + _merged_df = right_df.merge( + left_df[[_merge_col] + _columns], + on=_merge_col, + how="outer", + ).fillna(math.nan) + + return _merged_df.drop(columns=[_merge_col]) # Copy relevant columns from the original graph _orig_gdf = self.graph_file_hazard.get_graph() - benefit_gdf.set_geometry(_orig_gdf.geometry, inplace=True) - for _col in ["length", "highway", "infra_type", "link_id"]: - copy_column(_orig_gdf, _col) + _bc_ratio_gdf = _orig_gdf[["link_id"]] + _bc_ratio_gdf = merge_columns( + _orig_gdf, _bc_ratio_gdf, ["geometry", "infra_type", "highway", "length"] + ) for _option in self.adaptation_collection.adaptation_options: - # Copy cost columns from the cost gdf - copy_column(cost_gdf, _option.cost_col) + # Copy benefit and cost column from the benefit and cost gdf + _bc_ratio_gdf = merge_columns( + benefit_df, _bc_ratio_gdf, [_option.benefit_col] + ) + _bc_ratio_gdf = merge_columns(cost_df, _bc_ratio_gdf, [_option.cost_col]) - benefit_gdf[_option.bc_ratio_col] = benefit_gdf[ + # Calculate BC-ratio + _bc_ratio_gdf[_option.bc_ratio_col] = _bc_ratio_gdf[ _option.benefit_col - 
].replace(float("nan"), 0) / benefit_gdf[_option.cost_col].replace( + ].replace(float("nan"), 0) / _bc_ratio_gdf[_option.cost_col].replace( 0, float("nan") ) - return benefit_gdf + return GeoDataFrame(_bc_ratio_gdf).set_geometry("geometry") diff --git a/ra2ce/analysis/adaptation/adaptation_option.py b/ra2ce/analysis/adaptation/adaptation_option.py index 547fd45c..0ab7698a 100644 --- a/ra2ce/analysis/adaptation/adaptation_option.py +++ b/ra2ce/analysis/adaptation/adaptation_option.py @@ -20,9 +20,12 @@ """ from __future__ import annotations +import math +from collections import defaultdict from dataclasses import asdict, dataclass +from functools import reduce -from geopandas import GeoDataFrame +from pandas import DataFrame, merge from ra2ce.analysis.adaptation.adaptation_option_analysis import ( AdaptationOptionAnalysis, @@ -33,6 +36,9 @@ from ra2ce.analysis.analysis_config_data.enums.analysis_damages_enum import ( AnalysisDamagesEnum, ) +from ra2ce.analysis.analysis_config_data.enums.analysis_losses_enum import ( + AnalysisLossesEnum, +) from ra2ce.analysis.analysis_config_wrapper import AnalysisConfigWrapper @@ -55,8 +61,12 @@ def cost_col(self) -> str: return self._get_column_name("cost") @property - def impact_col(self) -> str: - return self._get_column_name("impact") + def net_impact_col(self) -> str: + return self._get_column_name("net_impact") + + @property + def event_impact_col(self) -> str: + return self._get_column_name("event_impact") @property def benefit_col(self) -> str: @@ -153,22 +163,58 @@ def calculate_cost(year) -> float: return sum(calculate_cost(_year) for _year in range(0, round(time_horizon), 1)) - def calculate_impact(self, net_present_value_factor: float) -> GeoDataFrame: + def calculate_impact(self, net_present_value_factor: float) -> DataFrame: """ Calculate the impact of the adaptation option. + Args: + net_present_value_factor (float): The net present value factor to apply to the event impact. 
+ Returns: - GeoDataFrame: The impact of the adaptation option. + DataFrame: The impact (event and net) of the adaptation option per link. """ - _result_gdf = GeoDataFrame() + + def merge_results( + results_dict: dict[AnalysisDamagesEnum | AnalysisLossesEnum, DataFrame] + ) -> DataFrame: + # Merge all result dataframes based on link_id + _id_col = "link_id" + + # Add temporary key as the id column to merge on contains inconsistent types (list[int] and int) + _merge_col = "temp_key" + + for i, _result in enumerate(results_dict.values()): + _result[_merge_col] = _result[_id_col].apply(lambda x: str(x)) + # Drop id column if not the first result to avoid duplicate columns + if i > 0: + _result.drop(columns=[_id_col], inplace=True) + + # Not each dataframe has the same entries in the link_id column, so use an outer merge + _merged_df = reduce( + lambda left, right: merge(left, right, on=[_merge_col], how="outer"), + results_dict.values(), + ).fillna(math.nan) + + return _merged_df.drop(columns=[_merge_col]) + + # Get all results from the analyses + _results: dict[ + AnalysisDamagesEnum | AnalysisLossesEnum, DataFrame + ] = defaultdict(DataFrame) for _analysis in self.analyses: - _result_gdf[ - f"{self.impact_col}_{_analysis.analysis_type.config_value}" - ] = _analysis.execute(self.analysis_config) + _results[_analysis.analysis_type] = _analysis.execute(self.analysis_config) + _result_df = merge_results(_results) + + # Add option ID to result column names (skip ID column) + _result_df.rename( + columns={x: self._get_column_name(x) for x in _result_df.columns[1:]}, + inplace=True, + ) - # Calculate the impact (summing the results of the analyses) - _result_gdf[self.impact_col] = ( - _result_gdf.sum(axis=1) * net_present_value_factor + # Calculate the impact (summing the results of the analysis results per link) + _result_df[self.event_impact_col] = _result_df.filter(regex=self.id).sum(axis=1) + _result_df[self.net_impact_col] = ( + _result_df[self.event_impact_col] * 
net_present_value_factor ) - return _result_gdf + return _result_df diff --git a/ra2ce/analysis/adaptation/adaptation_option_analysis.py b/ra2ce/analysis/adaptation/adaptation_option_analysis.py index bb531c51..1725ca23 100644 --- a/ra2ce/analysis/adaptation/adaptation_option_analysis.py +++ b/ra2ce/analysis/adaptation/adaptation_option_analysis.py @@ -21,12 +21,12 @@ from __future__ import annotations import logging +import math from copy import deepcopy from dataclasses import dataclass from geopandas import GeoDataFrame -from numpy import nan -from pandas import Series +from pandas import DataFrame from ra2ce.analysis.analysis_config_data.enums.analysis_damages_enum import ( AnalysisDamagesEnum, @@ -37,6 +37,7 @@ from ra2ce.analysis.analysis_config_wrapper import AnalysisConfigWrapper from ra2ce.analysis.analysis_input_wrapper import AnalysisInputWrapper from ra2ce.analysis.damages.damages import Damages +from ra2ce.analysis.damages.damages_result_wrapper import DamagesResultWrapper from ra2ce.analysis.losses.losses_base import LossesBase from ra2ce.analysis.losses.multi_link_losses import MultiLinkLosses from ra2ce.analysis.losses.single_link_losses import SingleLinkLosses @@ -47,12 +48,13 @@ class AdaptationOptionAnalysis: analysis_type: AnalysisDamagesEnum | AnalysisLossesEnum analysis_class: type[Damages | LossesBase] analysis_input: AnalysisInputWrapper + id_col: str result_col: str @staticmethod def get_analysis_info( analysis_type: AnalysisDamagesEnum | AnalysisLossesEnum, - ) -> tuple[type[Damages | LossesBase], str]: + ) -> tuple[type[Damages | LossesBase], str, str]: """ Get the analysis class and the result column for the given analysis. @@ -63,15 +65,15 @@ def get_analysis_info( NotImplementedError: The analysis type is not implemented. Returns: - tuple[type[Damages | LossesBase], str]: The analysis class and the regex to find the result column. 
+ tuple[type[Damages | LossesBase], str, str]: The analysis class, the name of the column containing the id and the regex to find the result column. """ if analysis_type == AnalysisDamagesEnum.DAMAGES: # Columnname should start with "dam_" and should not end with "_segments" - return (Damages, "(?!.*_segments$)^dam_.*") + return (Damages, "link_id", "(?!.*_segments$)^dam_.*") elif analysis_type == AnalysisLossesEnum.SINGLE_LINK_LOSSES: - return (SingleLinkLosses, "^vlh_.*_total$") + return (SingleLinkLosses, "link_id", "^vlh_.*_total$") elif analysis_type == AnalysisLossesEnum.MULTI_LINK_LOSSES: - return (MultiLinkLosses, "^vlh_.*_total$") + return (MultiLinkLosses, "link_id", "^vlh_.*_total$") raise NotImplementedError(f"Analysis {analysis_type} not supported") @classmethod @@ -119,37 +121,41 @@ def from_config( ) # Create output object - _analysis_class, _result_col = cls.get_analysis_info(analysis_type) + _analysis_class, _id_col, _result_col = cls.get_analysis_info(analysis_type) return cls( analysis_type=analysis_type, analysis_class=_analysis_class, analysis_input=_analysis_input, + id_col=_id_col, result_col=_result_col, ) - def get_result_column(self, gdf: GeoDataFrame) -> Series: + def get_result_columns(self, result_gdf: GeoDataFrame) -> DataFrame: """ Get a column from the dataframe based on the provided regex. Args: gdf (GeoDataFrame): The dataframe to search in. - regex (str): Regex to match the column. Returns: - Series: The relevant column. + DataFrame: The relevant columns. """ - _result_col = gdf.filter(regex=self.result_col) - if _result_col.empty: + _result_cols = result_gdf.filter(regex=self.result_col).columns + if _result_cols.empty: logging.warning( "No column found in dataframe matching the regex %s for analaysis %s. 
Returning NaN.", self.result_col, - self.analysis_type, + self.analysis_type.config_value, ) - return Series(nan, index=gdf.index) - return _result_col.iloc[:, 0] + return result_gdf[[self.id_col]].assign( + **{self.analysis_type.config_value: math.nan} + ) + return result_gdf[[self.id_col, _result_cols[0]]].rename( + columns={_result_cols[0]: self.analysis_type.config_value} + ) - def execute(self, analysis_config: AnalysisConfigWrapper) -> Series: + def execute(self, analysis_config: AnalysisConfigWrapper) -> DataFrame: """ Execute the analysis. @@ -157,19 +163,19 @@ def execute(self, analysis_config: AnalysisConfigWrapper) -> Series: analysis_config (AnalysisConfigWrapper): The config for the analysis. Returns: - Series: The relevant result column of the analysis. + DataFrame: The relevant result columns of the analysis. """ if self.analysis_class == Damages: _result_wrapper = self.analysis_class( self.analysis_input, analysis_config.graph_files.base_graph_hazard.get_graph(), ).execute() - # Take the link based result - _result = _result_wrapper.results_collection[1].analysis_result + assert isinstance(_result_wrapper, DamagesResultWrapper) + _result_gdf = _result_wrapper.link_based_result.analysis_result else: _result_wrapper = self.analysis_class( self.analysis_input, analysis_config ).execute() - _result = _result_wrapper.get_single_result() + _result_gdf = _result_wrapper.get_single_result() - return self.get_result_column(_result) + return self.get_result_columns(_result_gdf) diff --git a/ra2ce/analysis/adaptation/adaptation_option_collection.py b/ra2ce/analysis/adaptation/adaptation_option_collection.py index 79047810..9f822436 100644 --- a/ra2ce/analysis/adaptation/adaptation_option_collection.py +++ b/ra2ce/analysis/adaptation/adaptation_option_collection.py @@ -23,7 +23,7 @@ from dataclasses import dataclass, field import numpy as np -from geopandas import GeoDataFrame +from pandas import DataFrame from ra2ce.analysis.adaptation.adaptation_option import 
AdaptationOption from ra2ce.analysis.analysis_config_wrapper import AnalysisConfigWrapper @@ -120,7 +120,7 @@ def calculate_options_unit_cost(self) -> dict[AdaptationOption, float]: for _option in self.adaptation_options } - def calculate_options_benefit(self) -> GeoDataFrame: + def calculate_options_benefit(self) -> DataFrame: """ Calculate the benefit of all adaptation options. The benefit is calculated by subtracting the impact of the reference option from the impact of the adaptation option. @@ -129,21 +129,20 @@ def calculate_options_benefit(self) -> GeoDataFrame: GeoDataFrame: The calculated impact of all adaptation options. """ net_present_value_factor = self.get_net_present_value_factor() - _benefit_gdf = GeoDataFrame() # Calculate impact of reference option - _impact_gdf = self.reference_option.calculate_impact(net_present_value_factor) - for _col in _impact_gdf.columns: - _benefit_gdf[_col] = _impact_gdf[_col] + _benefit_df = self.reference_option.calculate_impact(net_present_value_factor) # Calculate impact and benefit of adaptation options for _option in self.adaptation_options: _impact_gdf = _option.calculate_impact(net_present_value_factor) - for _col in _impact_gdf.columns: - _benefit_gdf[_col] = _impact_gdf[_col] - _benefit_gdf[_option.benefit_col] = ( - _benefit_gdf[_option.impact_col] - - _benefit_gdf[self.reference_option.impact_col] + # Copy columns except the id column + _result_cols = _impact_gdf.filter(regex="^(?!link_id)").columns + _benefit_df[_result_cols] = _impact_gdf[_result_cols] + # Benefit = reference impact - adaptation impact + _benefit_df[_option.benefit_col] = ( + _benefit_df[self.reference_option.net_impact_col] + - _benefit_df[_option.net_impact_col] ) - return _benefit_gdf + return _benefit_df diff --git a/tests/analysis/adaptation/test_adaptation.py b/tests/analysis/adaptation/test_adaptation.py index 407e6734..637884a0 100644 --- a/tests/analysis/adaptation/test_adaptation.py +++ 
b/tests/analysis/adaptation/test_adaptation.py @@ -1,8 +1,8 @@ -from dataclasses import dataclass from typing import Iterator import pytest from geopandas import GeoDataFrame +from pandas import DataFrame from shapely import Point from ra2ce.analysis.adaptation.adaptation import Adaptation @@ -33,7 +33,7 @@ def test_initialize( assert isinstance(_adaptation, Adaptation) assert isinstance(_adaptation, AnalysisBase) - def test_run_cost_returns_gdf( + def test_run_cost_returns_df( self, valid_adaptation_config_with_input: tuple[ AnalysisInputWrapper, AnalysisConfigWrapper @@ -48,7 +48,7 @@ def test_run_cost_returns_gdf( _result = _adaptation.run_cost() # 3. Verify expectations. - assert isinstance(_result, GeoDataFrame) + assert isinstance(_result, DataFrame) assert all( _option.cost_col in _result.columns for _option in _adaptation.adaptation_collection.adaptation_options @@ -58,7 +58,7 @@ def test_run_cost_returns_gdf( _expected[0] ) - def test_run_benefit_returns_gdf( + def test_run_benefit_returns_df( self, valid_adaptation_config_with_input: tuple[ AnalysisInputWrapper, AnalysisConfigWrapper @@ -73,7 +73,7 @@ def test_run_benefit_returns_gdf( _result = _adaptation.run_benefit() # 3. Verify expectations. - assert isinstance(_result, GeoDataFrame) + assert isinstance(_result, DataFrame) assert all( [ f"{_option.id}_benefit" in _result.columns @@ -92,8 +92,8 @@ class MockAdaptation(Adaptation): graph_file_hazard = NetworkFile( graph=GeoDataFrame.from_dict( data={ - "geometry": [Point(x, 0) for x in range(10)], "link_id": range(10), + "geometry": [Point(x, 0) for x in range(10)], "highway": "residential", "length": 1.0, }, @@ -125,18 +125,19 @@ def __init__(self): def test_calculate_bc_ratio_returns_gdf(self, mocked_adaptation: Adaptation): # 1. Define test data. 
+ _id_col = "link_id" _nof_rows = 10 - _benefit_gdf = GeoDataFrame(index=range(_nof_rows)) - _cost_gdf = GeoDataFrame(index=range(_nof_rows)) + _benefit_df = DataFrame.from_dict({_id_col: range(_nof_rows)}) + _cost_df = DataFrame.from_dict({_id_col: range(_nof_rows)}) for i, _option in enumerate( mocked_adaptation.adaptation_collection.adaptation_options ): - _benefit_gdf[_option.benefit_col] = 4.0 + i - _cost_gdf[_option.cost_col] = 1.0 + i + _benefit_df[_option.benefit_col] = 4.0 + i + _cost_df[_option.cost_col] = 1.0 + i # 2. Run test. - _result = mocked_adaptation.calculate_bc_ratio(_benefit_gdf, _cost_gdf) + _result = mocked_adaptation.calculate_bc_ratio(_benefit_df, _cost_df) # 3. Verify expectations. assert isinstance(_result, GeoDataFrame) @@ -153,3 +154,28 @@ def test_calculate_bc_ratio_returns_gdf(self, mocked_adaptation: Adaptation): assert _result[_option.bc_ratio_col].sum(axis=0) == pytest.approx( _nof_rows * (4.0 + i) / (1.0 + i) ) + + def test_calculate_bc_ratio_matches_on_link_id(self, mocked_adaptation: Adaptation): + # 1. Define test data. + _id_col = "link_id" + _custom_id = [5, 6, 7, 8, 9, 0, 1, 2, 3, 4] + _benefit_df = DataFrame.from_dict( + { + _id_col: _custom_id, + "Option1_benefit": [i + 1 for i in _custom_id], + } + ) + _cost_df = DataFrame.from_dict( + { + _id_col: list(reversed(_custom_id)), + "Option1_cost": [i + 1 for i in reversed(_custom_id)], + } + ) + + # 2. Run test. + _benefit_df = mocked_adaptation.calculate_bc_ratio(_benefit_df, _cost_df) + + # 3. Verify expectations. 
+ assert _benefit_df[ + mocked_adaptation.adaptation_collection.adaptation_options[0].bc_ratio_col + ].sum(axis=0) == pytest.approx(10.0) diff --git a/tests/analysis/adaptation/test_adaptation_option.py b/tests/analysis/adaptation/test_adaptation_option.py index df16a2e2..7c5eb29d 100644 --- a/tests/analysis/adaptation/test_adaptation_option.py +++ b/tests/analysis/adaptation/test_adaptation_option.py @@ -2,7 +2,7 @@ import pytest from geopandas import GeoDataFrame -from pandas import Series +from pandas import DataFrame, Series from ra2ce.analysis.adaptation.adaptation_option import AdaptationOption from ra2ce.analysis.adaptation.adaptation_option_analysis import ( @@ -156,22 +156,29 @@ def test_calculate_unit_cost_returns_float( assert isinstance(_result, float) assert _result == pytest.approx(net_unit_cost) - def test_calculate_impact_returns_gdf(self) -> GeoDataFrame: + def test_calculate_impact_returns_df(self): @dataclass # Mock to avoid the need to run the impact analysis. class MockAdaptationOptionAnalysis(AdaptationOptionAnalysis): result: float - def execute(self, _: AnalysisConfigWrapper) -> Series: - return Series(self.result, index=range(_nof_rows)) + def execute(self, _: AnalysisConfigWrapper) -> DataFrame: + return GeoDataFrame.from_dict( + { + _id_col: range(10), + self.analysis_type: self.result, + } + ) # 1. Define test data. + _id_col = "link_id" _nof_rows = 10 _analyses = [ MockAdaptationOptionAnalysis( analysis_type=_analysis_type, analysis_class=None, analysis_input=None, + id_col=_id_col, result_col=f"Result_{i}", result=(i + 1) * 1.0e6, ) @@ -193,10 +200,10 @@ def execute(self, _: AnalysisConfigWrapper) -> Series: ) # 2. Run test. - _result = _option.calculate_impact(1.0) + _result = _option.calculate_impact(1.1) # 3. Verify expectations. 
- assert isinstance(_result, GeoDataFrame) - assert _result[_option.impact_col].sum() == pytest.approx( + assert isinstance(_result, DataFrame) + assert _result[_option.event_impact_col].sum() == pytest.approx( _nof_rows * sum(x.result for x in _analyses) ) diff --git a/tests/analysis/adaptation/test_adaptation_option_analysis.py b/tests/analysis/adaptation/test_adaptation_option_analysis.py index ddc67867..0f224d94 100644 --- a/tests/analysis/adaptation/test_adaptation_option_analysis.py +++ b/tests/analysis/adaptation/test_adaptation_option_analysis.py @@ -2,7 +2,7 @@ import pytest from geopandas import GeoDataFrame -from pandas import Series +from pandas import DataFrame from ra2ce.analysis.adaptation.adaptation_option_analysis import ( AdaptationOptionAnalysis, @@ -40,7 +40,7 @@ class TestAnalysisOptionAnalysis: ), ], ) - def test_get_analysis_returns_tuple( + def test_get_analysis_info_returns_tuple( self, analysis_type: AnalysisDamagesEnum | AnalysisLossesEnum, expected_analysis: type[Damages | LossesBase], @@ -51,9 +51,9 @@ def test_get_analysis_returns_tuple( # 3. Verify expectations. assert isinstance(_result, tuple) assert _result[0] == expected_analysis - assert isinstance(_result[1], str) + assert all(isinstance(x, str) for x in _result[1:]) - def test_get_analysis_raises_not_supported_error(self): + def test_get_analysis_info_raises_not_supported_error(self): # 1. Define test data. _analysis_type = "not supported" @@ -90,27 +90,30 @@ def test_get_analysis_raises_not_supported_error(self): ), ], ) - def test_get_result_column_based_on_regex( + def test_get_result_columns_based_on_regex( self, analysis_type: AnalysisDamagesEnum | AnalysisLossesEnum, col_name: str, match: bool, ): # 1. Define test data. 
- _gdf = GeoDataFrame.from_dict({col_name: range(10)}) - _result_col = AdaptationOptionAnalysis.get_analysis_info(analysis_type)[1] + _id_col = "link_id" + _gdf = GeoDataFrame.from_dict({_id_col: range(10), col_name: range(10)}) + _result_col = AdaptationOptionAnalysis.get_analysis_info(analysis_type)[2] _adaption_option_analysis = AdaptationOptionAnalysis( analysis_type=analysis_type, analysis_class=None, analysis_input=None, + id_col=_id_col, result_col=_result_col, ) # 2. Run test. - _result = _adaption_option_analysis.get_result_column(_gdf) + _result = _adaption_option_analysis.get_result_columns(_gdf) # 3. Verify expectations. - assert (_result.sum() > 0) == match + assert analysis_type.config_value in _result.columns + assert (_result[analysis_type.config_value].sum() == pytest.approx(45)) == match @pytest.mark.parametrize( "analysis_type, expected_analysis", @@ -150,7 +153,7 @@ def test_from_config_returns_object( assert isinstance(_result, AdaptationOptionAnalysis) assert _result.analysis_class == expected_analysis - def test_execute_returns_series(self): + def test_execute_returns_result_df(self): class MockAnalysis(AnalysisBase, AnalysisProtocol): analysis: AnalysisConfigData.ANALYSIS_SECTION = None output_path: Path = None @@ -161,16 +164,22 @@ def __init__(self, *args) -> None: def execute(self): return self.generate_result_wrapper( GeoDataFrame.from_dict( - {_col_name: range(10), "other_column": range(1, 11, 1)} + { + _id_col: range(10), + "result_column": range(1, 11, 1), + "other_column": range(2, 12, 1), + } ) ) # 1. Define test data. - _col_name = "result_column" + _id_col = "link_id" + _analysis_type = AnalysisDamagesEnum.DAMAGES _analysis = AdaptationOptionAnalysis( - analysis_type=AnalysisDamagesEnum.DAMAGES, + analysis_type=_analysis_type, analysis_class=MockAnalysis, analysis_input=None, + id_col=_id_col, result_col="result.*", ) @@ -178,5 +187,5 @@ def execute(self): _result = _analysis.execute(None) # 3. Verify expectations. 
- assert isinstance(_result, Series) - assert _result.sum() == 45 + assert isinstance(_result, DataFrame) + assert _result[_analysis_type.config_value].sum() == pytest.approx(55) diff --git a/tests/analysis/adaptation/test_adaptation_option_collection.py b/tests/analysis/adaptation/test_adaptation_option_collection.py index e85642a4..9056f1a9 100644 --- a/tests/analysis/adaptation/test_adaptation_option_collection.py +++ b/tests/analysis/adaptation/test_adaptation_option_collection.py @@ -1,8 +1,7 @@ from dataclasses import dataclass import pytest -from geopandas import GeoDataFrame -from pandas import Series +from pandas import DataFrame from ra2ce.analysis.adaptation.adaptation_option import AdaptationOption from ra2ce.analysis.adaptation.adaptation_option_collection import ( @@ -67,46 +66,51 @@ def test_calculate_options_unit_cost_returns_dict( assert isinstance(_result, dict) assert all(_option in _result for _option in _collection.adaptation_options) - def test_calculate_options_benefit_returns_gdf(self): + def test_calculate_options_benefit_returns_df(self): @dataclass - class MockOption: + class MockOption(AdaptationOption): # Mock to avoid the need to run the impact analysis. - id: str - impact: float + impact: float = 0.0 - @property - def benefit_col(self) -> str: - return f"{self.id}_benefit" - - @property - def impact_col(self) -> str: - return f"{self.id}_impact" - - def calculate_impact(self, _) -> Series: - _impact_gdf = GeoDataFrame(index=range(10)) - _impact_gdf[self.impact_col] = self.impact + def calculate_impact(self, _) -> DataFrame: + _impact_gdf = DataFrame.from_dict({_id_col: range(10)}) + _impact_gdf[self.net_impact_col] = self.impact return _impact_gdf # 1. Define test data. 
+ _id_col = "link_id" _nof_rows = 10 - _reference_benefit = 3.0e6 - _options = {f"Option{i}": _reference_benefit + (i * 1.0e6) for i in range(3)} + _reference_benefit = 5.0e6 + _options = {f"Option{i}": _reference_benefit - (i * 1.0e6) for i in range(3)} _collection = AdaptationOptionCollection( - all_options=[MockOption(id=x, impact=y) for x, y in _options.items()] + all_options=[ + MockOption( + id=x, + name=None, + construction_cost=None, + construction_interval=None, + maintenance_cost=None, + maintenance_interval=None, + analyses=None, + analysis_config=None, + impact=y, + ) + for x, y in _options.items() + ] ) # 2. Run test. _result = _collection.calculate_options_benefit() # 3. Verify expectations. - assert isinstance(_result, GeoDataFrame) + assert isinstance(_result, DataFrame) assert all( _option.benefit_col in _result.columns for _option in _collection.adaptation_options ) assert all( _result[f"{_id}_benefit"].sum(axis=0) - == pytest.approx(_nof_rows * (_impact - _reference_benefit)) + == pytest.approx(_nof_rows * (_reference_benefit - _impact)) for _id, _impact in _options.items() if _id != "Option0" ) diff --git a/tests/runners/test_adaptation_analysis_runner.py b/tests/runners/test_adaptation_analysis_runner.py index f8107f87..8bc435c9 100644 --- a/tests/runners/test_adaptation_analysis_runner.py +++ b/tests/runners/test_adaptation_analysis_runner.py @@ -152,6 +152,9 @@ def test_adapatation_can_run_and_export_result( assert _output_gdf.exists() assert _analysis_result.base_export_path.with_suffix(".csv").exists() - # Check the output geodataframe content + # Check the output geodataframe content (columns might have different order) _gdf = read_file(_output_gdf) - assert_geodataframe_equal(_gdf, _analysis_result.analysis_result) + assert _gdf.shape == _analysis_result.analysis_result.shape + assert all( + _col in _gdf.columns for _col in _analysis_result.analysis_result.columns + )