Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: fix adaptation bugs #668

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 53 additions & 26 deletions ra2ce/analysis/adaptation/adaptation.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import math
from pathlib import Path

from geopandas import GeoDataFrame
from pandas import DataFrame

from ra2ce.analysis.adaptation.adaptation_option_collection import (
AdaptationOptionCollection,
Expand Down Expand Up @@ -69,82 +71,107 @@ def execute(self) -> AnalysisResultWrapper:
Returns:
AnalysisResultWrapper: The result of the adaptation analysis.
"""
_cost_gdf = self.run_cost()
_benefit_gdf = self.run_benefit()
_cost_df = self.run_cost()
_benefit_df = self.run_benefit()

return self.generate_result_wrapper(
self.calculate_bc_ratio(_benefit_gdf, _cost_gdf)
self.calculate_bc_ratio(_benefit_df, _cost_df)
)

def run_cost(self) -> GeoDataFrame:
def run_cost(self) -> DataFrame:
"""
Calculate the link cost for all adaptation options.
The unit cost is multiplied by the length of the link.
If the hazard fraction cost is enabled, the cost is multiplied by the fraction of the link that is impacted.

Returns:
GeoDataFrame: The result of the cost calculation.
DataFrame: The result of the cost calculation.
"""
_orig_gdf = self.graph_file_hazard.get_graph()
_fraction_col = _orig_gdf.filter(regex="EV.*_fr").columns[0]

_cost_gdf = GeoDataFrame()
_cost_df = _orig_gdf[["link_id"]].copy()
for (
_option,
_cost,
) in self.adaptation_collection.calculate_options_unit_cost().items():
_cost_gdf[_option.cost_col] = _orig_gdf.apply(
_cost_df[_option.cost_col] = _orig_gdf.apply(
lambda x, cost=_cost: x["length"] * cost, axis=1
)
# Only calculate the cost for the impacted fraction of the links.
if self.analysis.hazard_fraction_cost:
_cost_gdf[_option.cost_col] *= _orig_gdf[_fraction_col]
_cost_df[_option.cost_col] *= _orig_gdf[_fraction_col]

return _cost_gdf
return _cost_df

def run_benefit(self) -> GeoDataFrame:
def run_benefit(self) -> DataFrame:
"""
Calculate the benefit for all adaptation options.

Returns:
GeoDataFrame: The result of the benefit calculation.
DataFrame: The result of the benefit calculation.
"""
return self.adaptation_collection.calculate_options_benefit()

def calculate_bc_ratio(
self, benefit_gdf: GeoDataFrame, cost_gdf: GeoDataFrame
self, benefit_df: DataFrame, cost_df: DataFrame
) -> GeoDataFrame:
"""
Calculate the benefit-cost ratio for all adaptation options.

Args:
benefit_gdf (GeoDataFrame): Gdf containing the benefit of the adaptation options.
cost_gdf (GeoDataFrame): Gdf containing the cost of the adaptation options.
benefit_df (DataFrame): Df containing the benefit of the adaptation options.
cost_df (DataFrame): Df containing the cost of the adaptation options.

Returns:
GeoDataFrame: Gdf containing the benefit-cost ratio of the adaptation options,
including the relevant attributes from the original graph (geometry).
"""

def copy_column(from_gdf: GeoDataFrame, col_name: str) -> None:
if not col_name in from_gdf.columns:
return
benefit_gdf.insert(loc=0, column=col_name, value=from_gdf[col_name])
def merge_columns(
left_df: DataFrame, right_df: DataFrame, columns: list[str]
) -> DataFrame:
# Merge 2 dataframes based on link_id
_id_col = "link_id"

# Add temporary key as the link_id to merge on contains inconsistent types (list[int] and int)
_merge_col = "temp_key"

left_df[_merge_col] = left_df[_id_col].apply(lambda x: str(x))
# Not all columns are present in both dataframes, so only merge the relevant columns
_columns = [_col for _col in columns if _col in left_df.columns]
if not _columns:
return right_df

right_df[_merge_col] = right_df[_id_col].apply(lambda x: str(x))
# Not each dataframe has the same entries in the link_id column, so use an outer merge
_merged_df = right_df.merge(
left_df[[_merge_col] + _columns],
on=_merge_col,
how="outer",
).fillna(math.nan)

return _merged_df.drop(columns=[_merge_col])

# Copy relevant columns from the original graph
_orig_gdf = self.graph_file_hazard.get_graph()
benefit_gdf.set_geometry(_orig_gdf.geometry, inplace=True)
for _col in ["length", "highway", "infra_type", "link_id"]:
copy_column(_orig_gdf, _col)
_bc_ratio_gdf = _orig_gdf[["link_id"]]
_bc_ratio_gdf = merge_columns(
_orig_gdf, _bc_ratio_gdf, ["geometry", "infra_type", "highway", "length"]
)

for _option in self.adaptation_collection.adaptation_options:
# Copy cost columns from the cost gdf
copy_column(cost_gdf, _option.cost_col)
# Copy benefit and cost column from the benefit and cost gdf
_bc_ratio_gdf = merge_columns(
benefit_df, _bc_ratio_gdf, [_option.benefit_col]
)
_bc_ratio_gdf = merge_columns(cost_df, _bc_ratio_gdf, [_option.cost_col])

benefit_gdf[_option.bc_ratio_col] = benefit_gdf[
# Calculate BC-ratio
_bc_ratio_gdf[_option.bc_ratio_col] = _bc_ratio_gdf[
_option.benefit_col
].replace(float("nan"), 0) / benefit_gdf[_option.cost_col].replace(
].replace(float("nan"), 0) / _bc_ratio_gdf[_option.cost_col].replace(
0, float("nan")
)

return benefit_gdf
return GeoDataFrame(_bc_ratio_gdf).set_geometry("geometry")
72 changes: 59 additions & 13 deletions ra2ce/analysis/adaptation/adaptation_option.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
"""
from __future__ import annotations

import math
from collections import defaultdict
from dataclasses import asdict, dataclass
from functools import reduce

from geopandas import GeoDataFrame
from pandas import DataFrame, merge

from ra2ce.analysis.adaptation.adaptation_option_analysis import (
AdaptationOptionAnalysis,
Expand All @@ -33,6 +36,9 @@
from ra2ce.analysis.analysis_config_data.enums.analysis_damages_enum import (
AnalysisDamagesEnum,
)
from ra2ce.analysis.analysis_config_data.enums.analysis_losses_enum import (
AnalysisLossesEnum,
)
from ra2ce.analysis.analysis_config_wrapper import AnalysisConfigWrapper


Expand All @@ -55,8 +61,12 @@ def cost_col(self) -> str:
return self._get_column_name("cost")

@property
def impact_col(self) -> str:
return self._get_column_name("impact")
def net_impact_col(self) -> str:
return self._get_column_name("net_impact")

@property
def event_impact_col(self) -> str:
return self._get_column_name("event_impact")

@property
def benefit_col(self) -> str:
Expand Down Expand Up @@ -153,22 +163,58 @@ def calculate_cost(year) -> float:

return sum(calculate_cost(_year) for _year in range(0, round(time_horizon), 1))

def calculate_impact(self, net_present_value_factor: float) -> GeoDataFrame:
def calculate_impact(self, net_present_value_factor: float) -> DataFrame:
"""
Calculate the impact of the adaptation option.

Args:
net_present_value_factor (float): The net present value factor to apply to the event impact.

Returns:
GeoDataFrame: The impact of the adaptation option.
DataFrame: The impact (event and net) of the adaptation option per link.
"""
_result_gdf = GeoDataFrame()

def merge_results(
results_dict: dict[AnalysisDamagesEnum | AnalysisLossesEnum, DataFrame]
) -> DataFrame:
# Merge all result dataframes based on link_id
_id_col = "link_id"

# Add temporary key as the id column to merge on contains inconsistent types (list[int] and int)
_merge_col = "temp_key"

for i, _result in enumerate(results_dict.values()):
_result[_merge_col] = _result[_id_col].apply(lambda x: str(x))
# Drop id column if not the first result to avoid duplicate columns
if i > 0:
_result.drop(columns=[_id_col], inplace=True)

# Not each dataframe has the same entries in the link_id column, so use an outer merge
_merged_df = reduce(
lambda left, right: merge(left, right, on=[_merge_col], how="outer"),
results_dict.values(),
).fillna(math.nan)

return _merged_df.drop(columns=[_merge_col])

# Get all results from the analyses
_results: dict[
AnalysisDamagesEnum | AnalysisLossesEnum, DataFrame
] = defaultdict(DataFrame)
for _analysis in self.analyses:
_result_gdf[
f"{self.impact_col}_{_analysis.analysis_type.config_value}"
] = _analysis.execute(self.analysis_config)
_results[_analysis.analysis_type] = _analysis.execute(self.analysis_config)
_result_df = merge_results(_results)

# Add option ID to result column names (skip ID column)
_result_df.rename(
columns={x: self._get_column_name(x) for x in _result_df.columns[1:]},
inplace=True,
)

# Calculate the impact (summing the results of the analyses)
_result_gdf[self.impact_col] = (
_result_gdf.sum(axis=1) * net_present_value_factor
# Calculate the impact (summing the results of the analysis results per link)
_result_df[self.event_impact_col] = _result_df.filter(regex=self.id).sum(axis=1)
_result_df[self.net_impact_col] = (
_result_df[self.event_impact_col] * net_present_value_factor
)

return _result_gdf
return _result_df
50 changes: 28 additions & 22 deletions ra2ce/analysis/adaptation/adaptation_option_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
from __future__ import annotations

import logging
import math
from copy import deepcopy
from dataclasses import dataclass

from geopandas import GeoDataFrame
from numpy import nan
from pandas import Series
from pandas import DataFrame

from ra2ce.analysis.analysis_config_data.enums.analysis_damages_enum import (
AnalysisDamagesEnum,
Expand All @@ -37,6 +37,7 @@
from ra2ce.analysis.analysis_config_wrapper import AnalysisConfigWrapper
from ra2ce.analysis.analysis_input_wrapper import AnalysisInputWrapper
from ra2ce.analysis.damages.damages import Damages
from ra2ce.analysis.damages.damages_result_wrapper import DamagesResultWrapper
from ra2ce.analysis.losses.losses_base import LossesBase
from ra2ce.analysis.losses.multi_link_losses import MultiLinkLosses
from ra2ce.analysis.losses.single_link_losses import SingleLinkLosses
Expand All @@ -47,12 +48,13 @@ class AdaptationOptionAnalysis:
analysis_type: AnalysisDamagesEnum | AnalysisLossesEnum
analysis_class: type[Damages | LossesBase]
analysis_input: AnalysisInputWrapper
id_col: str
result_col: str

@staticmethod
def get_analysis_info(
analysis_type: AnalysisDamagesEnum | AnalysisLossesEnum,
) -> tuple[type[Damages | LossesBase], str]:
) -> tuple[type[Damages | LossesBase], str, str]:
"""
Get the analysis class and the result column for the given analysis.

Expand All @@ -63,15 +65,15 @@ def get_analysis_info(
NotImplementedError: The analysis type is not implemented.

Returns:
tuple[type[Damages | LossesBase], str]: The analysis class and the regex to find the result column.
tuple[type[Damages | LossesBase], str, str]: The analysis class, the name of the column containing the id and the regex to find the result column.
"""
if analysis_type == AnalysisDamagesEnum.DAMAGES:
# Columnname should start with "dam_" and should not end with "_segments"
return (Damages, "(?!.*_segments$)^dam_.*")
return (Damages, "link_id", "(?!.*_segments$)^dam_.*")
elif analysis_type == AnalysisLossesEnum.SINGLE_LINK_LOSSES:
return (SingleLinkLosses, "^vlh_.*_total$")
return (SingleLinkLosses, "link_id", "^vlh_.*_total$")
elif analysis_type == AnalysisLossesEnum.MULTI_LINK_LOSSES:
return (MultiLinkLosses, "^vlh_.*_total$")
return (MultiLinkLosses, "link_id", "^vlh_.*_total$")
raise NotImplementedError(f"Analysis {analysis_type} not supported")

@classmethod
Expand Down Expand Up @@ -119,57 +121,61 @@ def from_config(
)

# Create output object
_analysis_class, _result_col = cls.get_analysis_info(analysis_type)
_analysis_class, _id_col, _result_col = cls.get_analysis_info(analysis_type)

return cls(
analysis_type=analysis_type,
analysis_class=_analysis_class,
analysis_input=_analysis_input,
id_col=_id_col,
result_col=_result_col,
)

def get_result_column(self, gdf: GeoDataFrame) -> Series:
def get_result_columns(self, result_gdf: GeoDataFrame) -> DataFrame:
"""
Get the relevant columns from the dataframe based on the stored result-column regex.
 
Args:
result_gdf (GeoDataFrame): The dataframe to search in.

Returns:
Series: The relevant column.
DataFrame: The relevant columns.
"""
_result_col = gdf.filter(regex=self.result_col)
if _result_col.empty:
_result_cols = result_gdf.filter(regex=self.result_col).columns
if _result_cols.empty:
logging.warning(
"No column found in dataframe matching the regex %s for analysis %s. Returning NaN.",
self.result_col,
self.analysis_type,
self.analysis_type.config_value,
)
return Series(nan, index=gdf.index)
return _result_col.iloc[:, 0]
return result_gdf[[self.id_col]].assign(
**{self.analysis_type.config_value: math.nan}
)
return result_gdf[[self.id_col, _result_cols[0]]].rename(
columns={_result_cols[0]: self.analysis_type.config_value}
)

def execute(self, analysis_config: AnalysisConfigWrapper) -> Series:
def execute(self, analysis_config: AnalysisConfigWrapper) -> DataFrame:
"""
Execute the analysis.

Args:
analysis_config (AnalysisConfigWrapper): The config for the analysis.

Returns:
Series: The relevant result column of the analysis.
DataFrame: The relevant result columns of the analysis.
"""
if self.analysis_class == Damages:
_result_wrapper = self.analysis_class(
self.analysis_input,
analysis_config.graph_files.base_graph_hazard.get_graph(),
).execute()
# Take the link based result
_result = _result_wrapper.results_collection[1].analysis_result
assert isinstance(_result_wrapper, DamagesResultWrapper)
_result_gdf = _result_wrapper.link_based_result.analysis_result
else:
_result_wrapper = self.analysis_class(
self.analysis_input, analysis_config
).execute()
_result = _result_wrapper.get_single_result()
_result_gdf = _result_wrapper.get_single_result()

return self.get_result_column(_result)
return self.get_result_columns(_result_gdf)
Loading