Skip to content

Commit

Permalink
Merge commit '1d0e071'
Browse files Browse the repository at this point in the history
  • Loading branch information
Carsopre committed Dec 9, 2024
2 parents 10e94c2 + 1d0e071 commit 83d5967
Show file tree
Hide file tree
Showing 10 changed files with 389 additions and 114 deletions.
45 changes: 29 additions & 16 deletions ra2ce/analysis/adaptation/adaptation.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,22 +68,30 @@ def execute(self) -> GeoDataFrame:
Returns:
GeoDataFrame: The result of the adaptation analysis.
"""
return self.generate_result_wrapper(self.calculate_bc_ratio())
_cost_gdf = self.run_cost()
_benefit_gdf = self.run_benefit()

_benefit_gdf = self.calculate_bc_ratio(_benefit_gdf, _cost_gdf)

return _benefit_gdf

def run_cost(self) -> GeoDataFrame:
"""
Calculate the unit cost for all adaptation options.
Calculate the link cost for all adaptation options.
Returns:
GeoDataFrame: The result of the cost calculation.
Returns:
GeoDataFrame: The result of the cost calculation.
"""
_cost_gdf = deepcopy(self.graph_file.get_graph())
_orig_gdf = self.graph_file.get_graph()

_cost_gdf = GeoDataFrame()
for (
_option,
_cost,
) in self.adaptation_collection.calculate_options_unit_cost().items():
_cost_gdf[f"{_option.id}_cost"] = _cost_gdf.apply(
_cost_gdf[f"{_option.id}_cost"] = _orig_gdf.apply(
lambda x, cost=_cost: x["length"] * cost, axis=1
)

Expand All @@ -96,22 +104,27 @@ def run_benefit(self) -> GeoDataFrame:
Returns:
GeoDataFrame: The result of the benefit calculation.
"""
_benefit_gdf = deepcopy(self.graph_file.get_graph())

return self.adaptation_collection.calculation_options_impact(_benefit_gdf)
return self.adaptation_collection.calculate_options_benefit()

def calculate_bc_ratio(self) -> GeoDataFrame:
def calculate_bc_ratio(
self, benefit_gdf: GeoDataFrame, cost_gdf: GeoDataFrame
) -> GeoDataFrame:
"""
Calculate the benefit-cost ratio for all adaptation options.
Args:
benefit_gdf (GeoDataFrame): Gdf containing the benefit of the adaptation options.
cost_gdf (GeoDataFrame): Gdf containing the cost of the adaptation options.
Returns:
GeoDataFrame: The result of the benefit-cost ratio calculation.
GeoDataFrame: Gdf containing the benefit-cost ratio of the adaptation options.
"""
_cost_gdf = self.run_cost()
_benefit_gdf = self.run_benefit()

# TODO: apply economic discounting
# TODO: calculate B/C ratio
# TODO: apply overlay
for _option in self.adaptation_collection.adaptation_options:
benefit_gdf[f"{_option.id}_cost"] = cost_gdf[f"{_option.id}_cost"]
benefit_gdf[f"{_option.id}_bc_ratio"] = benefit_gdf[
f"{_option.id}_benefit"
].replace(float("nan"), 0) / benefit_gdf[f"{_option.id}_cost"].replace(
0, float("nan")
)

return _cost_gdf
return benefit_gdf
76 changes: 33 additions & 43 deletions ra2ce/analysis/adaptation/adaptation_option.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from dataclasses import asdict, dataclass

from geopandas import GeoDataFrame
from pandas import Series

from ra2ce.analysis.adaptation.adaptation_option_analysis import (
AdaptationOptionAnalysis,
Expand Down Expand Up @@ -108,53 +109,42 @@ def calculate_unit_cost(self, time_horizon: float, discount_rate: float) -> floa
float: The net present value unit cost of the adaptation option.
"""

def calc_years(from_year: float, to_year: float, interval: float) -> range:
return range(
round(from_year),
round(min(to_year, time_horizon)),
round(interval),
)

def calc_cost(cost: float, year: float) -> float:
return cost * (1 - discount_rate) ** year

_constr_years = calc_years(
0,
time_horizon,
self.construction_interval,
)
_lifetime_cost = 0.0
for _constr_year in _constr_years:
# Calculate the present value of the construction cost
_lifetime_cost += calc_cost(self.construction_cost, _constr_year)

# Calculate the present value of the maintenance cost
_maint_years = calc_years(
_constr_year + self.maintenance_interval,
_constr_year + self.construction_interval,
self.maintenance_interval,
)
for _maint_year in _maint_years:
_lifetime_cost += calc_cost(self.maintenance_cost, _maint_year)

return _lifetime_cost

def calculate_impact(self, benefit_graph: GeoDataFrame) -> GeoDataFrame:
def is_constr_year(year: float) -> bool:
if self.construction_interval == 0:
return False
return (round(year) % round(self.construction_interval)) == 0

def is_maint_year(year: float) -> bool:
if self.maintenance_interval == 0:
return False
if self.construction_interval > 0:
# Take year relative to last construction year
year = round(year) % round(self.construction_interval)
return (year % round(self.maintenance_interval)) == 0

def calculate_cost(year) -> float:
if is_constr_year(year):
_cost = self.construction_cost
elif is_maint_year(year):
_cost = self.maintenance_cost
else:
return 0.0
return _cost / (1 + discount_rate) ** year

return sum(calculate_cost(_year) for _year in range(0, round(time_horizon), 1))

def calculate_impact(self, net_present_value_factor: float) -> Series:
"""
Calculate the impact of the adaptation option.
Returns:
float: The impact of the adaptation option.
Series: The impact of the adaptation option.
"""
_result_gdf = GeoDataFrame()
for _analysis in self.analyses:
_result_wrapper = _analysis.execute(self.analysis_config)
# Assumes a single result.
_analysis_result = _result_wrapper.results_collection[0].analysis_result
_col = _analysis_result.filter(regex=_analysis.result_col).columns[0]
benefit_graph[f"{self.id}_{_col}"] = _analysis_result[_col]

# Calculate the impact (summing the damages and losses values)
_option_cols = benefit_graph.filter(regex=f"{self.id}_").columns
benefit_graph[f"{self.id}_impact"] = benefit_graph[_option_cols].sum(axis=1)
_result_gdf[_analysis.analysis_type] = _analysis.execute(
self.analysis_config
)

return benefit_graph
# Calculate the impact (summing the results of the analyses)
return _result_gdf.sum(axis=1) * net_present_value_factor
35 changes: 20 additions & 15 deletions ra2ce/analysis/adaptation/adaptation_option_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from copy import deepcopy
from dataclasses import dataclass

from geopandas import GeoDataFrame
from pandas import Series

from ra2ce.analysis.analysis_config_data.enums.analysis_damages_enum import (
AnalysisDamagesEnum,
Expand All @@ -33,9 +33,6 @@
)
from ra2ce.analysis.analysis_config_wrapper import AnalysisConfigWrapper
from ra2ce.analysis.analysis_input_wrapper import AnalysisInputWrapper
from ra2ce.analysis.analysis_result.analysis_result_wrapper_protocol import (
AnalysisResultWrapperProtocol,
)
from ra2ce.analysis.damages.damages import Damages
from ra2ce.analysis.losses.losses_base import LossesBase
from ra2ce.analysis.losses.multi_link_losses import MultiLinkLosses
Expand All @@ -44,12 +41,13 @@

@dataclass
class AdaptationOptionAnalysis:
analysis_type: AnalysisDamagesEnum | AnalysisLossesEnum
analysis_class: type[Damages | LossesBase]
analysis_input: AnalysisInputWrapper
result_col: str

@staticmethod
def get_analysis(
def get_analysis_info(
analysis_type: AnalysisDamagesEnum | AnalysisLossesEnum,
) -> tuple[type[Damages | LossesBase], str]:
"""
Expand All @@ -62,10 +60,10 @@ def get_analysis(
NotImplementedError: The analysis type is not implemented.
Returns:
tuple[type[Damages | LossesBase], str]: The analysis class and the result column.
tuple[type[Damages | LossesBase], str]: The analysis class and the regex to find the result column.
"""
if analysis_type == AnalysisDamagesEnum.DAMAGES:
return (Damages, "dam_")
return (Damages, "dam_.*")
elif analysis_type == AnalysisLossesEnum.SINGLE_LINK_LOSSES:
return (SingleLinkLosses, "vlh_.*_total")
elif analysis_type == AnalysisLossesEnum.MULTI_LINK_LOSSES:
Expand Down Expand Up @@ -117,28 +115,35 @@ def from_config(
)

# Create output object
_analysis_class, _result_col = cls.get_analysis(analysis_type)
_analysis_class, _result_col = cls.get_analysis_info(analysis_type)

return cls(
analysis_type=analysis_type,
analysis_class=_analysis_class,
analysis_input=_analysis_input,
result_col=_result_col,
)

def execute(
self, analysis_config: AnalysisConfigWrapper
) -> AnalysisResultWrapperProtocol:
def execute(self, analysis_config: AnalysisConfigWrapper) -> Series:
"""
Execute the analysis.
Args:
analysis_config (AnalysisConfigWrapper): The config for the analysis.
Returns:
DataFrame: The results of the analysis.
Series: The relevant result column of the analysis.
"""
if self.analysis_class == Damages:
return self.analysis_class(
self.analysis_input, analysis_config.graph_files.base_graph_hazard.graph
_result_wrapper = self.analysis_class(
self.analysis_input,
analysis_config.graph_files.base_graph_hazard.get_graph(),
).execute()
# Take the link based result
_result = _result_wrapper.results_collection[1].analysis_result
else:
_result_wrapper = self.analysis_class(
self.analysis_input, analysis_config
).execute()
return self.analysis_class(self.analysis_input, analysis_config).execute()
_result = _result_wrapper.get_single_result()
return _result.filter(regex=self.result_col).iloc[:, 0]
45 changes: 36 additions & 9 deletions ra2ce/analysis/adaptation/adaptation_option_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from dataclasses import dataclass, field

import numpy as np
from geopandas import GeoDataFrame

from ra2ce.analysis.adaptation.adaptation_option import AdaptationOption
Expand Down Expand Up @@ -91,6 +92,19 @@ def from_config(

return _collection

def get_net_present_value_factor(self) -> float:
"""
Calculate the net present value factor for the entire time horizon. To be multiplied to the event impact to
obtain the net present value.
"""
_years_array = np.arange(0, self.time_horizon)
_frequency_per_year = (
self.initial_frequency + _years_array * self.climate_factor
)
_discount = (1 + self.discount_rate) ** _years_array
_ratio = _frequency_per_year / _discount
return _ratio.sum()

def calculate_options_unit_cost(self) -> dict[AdaptationOption, float]:
"""
Calculate the unit cost for all adaptation options.
Expand All @@ -106,17 +120,30 @@ def calculate_options_unit_cost(self) -> dict[AdaptationOption, float]:
for _option in self.adaptation_options
}

def calculation_options_impact(self, benefit_graph: GeoDataFrame) -> GeoDataFrame:
def calculate_options_benefit(self) -> GeoDataFrame:
"""
Calculate the impact of all adaptation options (including the reference option).
Args:
benefit_graph (GeoDataFrame): The graph to which the impact of the adaptation options will be added.
Calculate the benefit of all adaptation options.
The benefit is calculated by subtracting the impact of the reference option from the impact of the adaptation option.
Returns:
NetworkFile: The calculated impact of all adaptation options.
GeoDataFrame: The calculated impact of all adaptation options.
"""
for _option in self.all_options:
benefit_graph = _option.calculate_impact(benefit_graph)
net_present_value_factor = self.get_net_present_value_factor()
_benefit_gdf = GeoDataFrame()

# Calculate impact of reference option
_benefit_gdf[
f"{self.reference_option.id}_impact"
] = self.reference_option.calculate_impact(net_present_value_factor)

# Calculate impact and benefit of adaptation options
for _option in self.adaptation_options:
_benefit_gdf[f"{_option.id}_impact"] = _option.calculate_impact(
net_present_value_factor
)
_benefit_gdf[f"{_option.id}_benefit"] = (
_benefit_gdf[f"{_option.id}_impact"]
- _benefit_gdf[f"{self.reference_option.id}_impact"]
)

return benefit_graph
return _benefit_gdf
3 changes: 2 additions & 1 deletion ra2ce/analysis/analysis_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ def get_damages_analysis(

if analysis.analysis == AnalysisDamagesEnum.DAMAGES:
return Damages(
_analysis_input, analysis_config.graph_files.base_graph_hazard.graph
_analysis_input,
analysis_config.graph_files.base_graph_hazard.get_graph(),
)

raise NotImplementedError(f"Analysis {analysis.analysis} not implemented")
Expand Down
13 changes: 9 additions & 4 deletions tests/analysis/adaptation/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ class AdaptationOptionCases:
maintenance_interval=3.0,
),
]
unit_cost: list[float] = [0.0, 2693.684211, 5231.908660]
total_cost: list[float] = [0.0, 97411702.122141, 189201512.873560]
cases: list[tuple[AnalysisSectionAdaptationOption, float, float]] = list(
zip(config_cases, unit_cost, total_cost)
total_cost: list[float] = [0.0, 97800589.027952, 189253296.099491]
total_benefit: list[float] = [0.0, 0.0, 0.0]
cases: list[tuple[AnalysisSectionAdaptationOption, tuple[float, float]]] = list(
zip(config_cases, zip(total_cost, total_benefit))
)


Expand All @@ -91,6 +91,7 @@ def _get_valid_adaptation_config_fixture(
def get_losses_section(analysis: AnalysisLossesEnum) -> AnalysisSectionLosses:
return AnalysisSectionLosses(
analysis=analysis,
name="Losses",
event_type=EventTypeEnum.EVENT,
weighing=WeighingEnum.TIME,
threshold=0,
Expand Down Expand Up @@ -147,6 +148,7 @@ def get_losses_section(analysis: AnalysisLossesEnum) -> AnalysisSectionLosses:
# - damages
_damages_section = AnalysisSectionDamages(
analysis=AnalysisDamagesEnum.DAMAGES,
name="Damages",
event_type=EventTypeEnum.EVENT,
damage_curve=DamageCurveEnum.MAN,
save_gpkg=True,
Expand All @@ -162,10 +164,13 @@ def get_losses_section(analysis: AnalysisLossesEnum) -> AnalysisSectionLosses:
# - adaptation
_adaptation_section = AnalysisSectionAdaptation(
analysis=AnalysisEnum.ADAPTATION,
name="Adaptation",
losses_analysis=AnalysisLossesEnum.MULTI_LINK_LOSSES,
adaptation_options=AdaptationOptionCases.config_cases,
discount_rate=0.025,
time_horizon=20,
climate_factor=0.00036842,
initial_frequency=0.01,
)

_analysis_data = AnalysisConfigData(
Expand Down
Loading

0 comments on commit 83d5967

Please sign in to comment.