Skip to content

Commit

Permalink
feat: 646 write adaptation to file (#650)
Browse files Browse the repository at this point in the history
  • Loading branch information
ArdtK authored Dec 12, 2024
1 parent 9eb97c3 commit 5351a6a
Show file tree
Hide file tree
Showing 10 changed files with 239 additions and 59 deletions.
31 changes: 24 additions & 7 deletions ra2ce/analysis/adaptation/adaptation.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ def __init__(
):
self.analysis = analysis_input.analysis
self.graph_file_hazard = analysis_input.graph_file_hazard
self.input_path = analysis_input.input_path
self.output_path = analysis_input.output_path
self.adaptation_collection = AdaptationOptionCollection.from_config(
analysis_config
)
Expand Down Expand Up @@ -91,12 +93,12 @@ def run_cost(self) -> GeoDataFrame:
_option,
_cost,
) in self.adaptation_collection.calculate_options_unit_cost().items():
_cost_gdf[f"{_option.id}_cost"] = _orig_gdf.apply(
_cost_gdf[_option.cost_col] = _orig_gdf.apply(
lambda x, cost=_cost: x["length"] * cost, axis=1
)
# Only calculate the cost for the impacted fraction of the links.
if self.analysis.hazard_fraction_cost:
_cost_gdf[f"{_option.id}_cost"] *= _orig_gdf[_fraction_col]
_cost_gdf[_option.cost_col] *= _orig_gdf[_fraction_col]

return _cost_gdf

Expand All @@ -120,13 +122,28 @@ def calculate_bc_ratio(
cost_gdf (GeoDataFrame): Gdf containing the cost of the adaptation options.
Returns:
GeoDataFrame: Gdf containing the benefit-cost ratio of the adaptation options.
GeoDataFrame: Gdf containing the benefit-cost ratio of the adaptation options,
including the relevant attributes from the original graph (geometry).
"""

def copy_column(from_gdf: GeoDataFrame, col_name: str) -> None:
if not col_name in from_gdf.columns:
return
benefit_gdf.insert(loc=0, column=col_name, value=from_gdf[col_name])

# Copy relevant columns from the original graph
_orig_gdf = self.graph_file_hazard.get_graph()
benefit_gdf.set_geometry(_orig_gdf.geometry, inplace=True)
for _col in ["length", "highway", "infra_type", "link_id"]:
copy_column(_orig_gdf, _col)

for _option in self.adaptation_collection.adaptation_options:
benefit_gdf[f"{_option.id}_cost"] = cost_gdf[f"{_option.id}_cost"]
benefit_gdf[f"{_option.id}_bc_ratio"] = benefit_gdf[
f"{_option.id}_benefit"
].replace(float("nan"), 0) / benefit_gdf[f"{_option.id}_cost"].replace(
# Copy cost columns from the cost gdf
copy_column(cost_gdf, _option.cost_col)

benefit_gdf[_option.bc_ratio_col] = benefit_gdf[
_option.benefit_col
].replace(float("nan"), 0) / benefit_gdf[_option.cost_col].replace(
0, float("nan")
)

Expand Down
36 changes: 29 additions & 7 deletions ra2ce/analysis/adaptation/adaptation_option.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from dataclasses import asdict, dataclass

from geopandas import GeoDataFrame
from pandas import Series

from ra2ce.analysis.adaptation.adaptation_option_analysis import (
AdaptationOptionAnalysis,
Expand Down Expand Up @@ -51,6 +50,25 @@ class AdaptationOption:
def __hash__(self) -> int:
return hash(self.id)

@property
def cost_col(self) -> str:
return self._get_column_name("cost")

@property
def impact_col(self) -> str:
return self._get_column_name("impact")

@property
def benefit_col(self) -> str:
return self._get_column_name("benefit")

@property
def bc_ratio_col(self) -> str:
return self._get_column_name("bc_ratio")

def _get_column_name(self, col_type: str) -> str:
return f"{self.id}_{col_type}"

@classmethod
def from_config(
cls,
Expand Down Expand Up @@ -135,18 +153,22 @@ def calculate_cost(year) -> float:

return sum(calculate_cost(_year) for _year in range(0, round(time_horizon), 1))

def calculate_impact(self, net_present_value_factor: float) -> Series:
def calculate_impact(self, net_present_value_factor: float) -> GeoDataFrame:
"""
Calculate the impact of the adaptation option.
Returns:
Series: The impact of the adaptation option.
GeoDataFrame: The impact of the adaptation option.
"""
_result_gdf = GeoDataFrame()
for _analysis in self.analyses:
_result_gdf[_analysis.analysis_type] = _analysis.execute(
self.analysis_config
)
_result_gdf[
f"{self.impact_col}_{_analysis.analysis_type.config_value}"
] = _analysis.execute(self.analysis_config)

# Calculate the impact (summing the results of the analyses)
return _result_gdf.sum(axis=1) * net_present_value_factor
_result_gdf[self.impact_col] = (
_result_gdf.sum(axis=1) * net_present_value_factor
)

return _result_gdf
1 change: 1 addition & 0 deletions ra2ce/analysis/adaptation/adaptation_option_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,4 +171,5 @@ def execute(self, analysis_config: AnalysisConfigWrapper) -> Series:
self.analysis_input, analysis_config
).execute()
_result = _result_wrapper.get_single_result()

return self.get_result_column(_result)
18 changes: 9 additions & 9 deletions ra2ce/analysis/adaptation/adaptation_option_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,18 +132,18 @@ def calculate_options_benefit(self) -> GeoDataFrame:
_benefit_gdf = GeoDataFrame()

# Calculate impact of reference option
_benefit_gdf[
f"{self.reference_option.id}_impact"
] = self.reference_option.calculate_impact(net_present_value_factor)
_impact_gdf = self.reference_option.calculate_impact(net_present_value_factor)
for _col in _impact_gdf.columns:
_benefit_gdf[_col] = _impact_gdf[_col]

# Calculate impact and benefit of adaptation options
for _option in self.adaptation_options:
_benefit_gdf[f"{_option.id}_impact"] = _option.calculate_impact(
net_present_value_factor
)
_benefit_gdf[f"{_option.id}_benefit"] = (
_benefit_gdf[f"{_option.id}_impact"]
- _benefit_gdf[f"{self.reference_option.id}_impact"]
_impact_gdf = _option.calculate_impact(net_present_value_factor)
for _col in _impact_gdf.columns:
_benefit_gdf[_col] = _impact_gdf[_col]
_benefit_gdf[_option.benefit_col] = (
_benefit_gdf[_option.impact_col]
- _benefit_gdf[self.reference_option.impact_col]
)

return _benefit_gdf
2 changes: 0 additions & 2 deletions ra2ce/analysis/analysis_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Type

from ra2ce.analysis.adaptation.adaptation import Adaptation
from ra2ce.analysis.analysis_config_wrapper import AnalysisConfigWrapper
from ra2ce.analysis.analysis_factory import AnalysisFactory
from ra2ce.analysis.analysis_protocol import AnalysisProtocol
from ra2ce.analysis.damages.analysis_damages_protocol import AnalysisDamagesProtocol
from ra2ce.analysis.losses.analysis_losses_protocol import AnalysisLossesProtocol

Expand Down
51 changes: 35 additions & 16 deletions tests/analysis/adaptation/test_adaptation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@

import pytest
from geopandas import GeoDataFrame
from shapely import Point

from ra2ce.analysis.adaptation.adaptation import Adaptation
from ra2ce.analysis.adaptation.adaptation_option import AdaptationOption
from ra2ce.analysis.adaptation.adaptation_option_collection import (
AdaptationOptionCollection,
)
from ra2ce.analysis.analysis_base import AnalysisBase
from ra2ce.analysis.analysis_config_wrapper import AnalysisConfigWrapper
from ra2ce.analysis.analysis_input_wrapper import AnalysisInputWrapper
from ra2ce.network.graph_files.network_file import NetworkFile
from tests.analysis.adaptation.conftest import AdaptationOptionCases


Expand Down Expand Up @@ -47,7 +50,7 @@ def test_run_cost_returns_gdf(
# 3. Verify expectations.
assert isinstance(_result, GeoDataFrame)
assert all(
f"{_option.id}_cost" in _result.columns
_option.cost_col in _result.columns
for _option in _adaptation.adaptation_collection.adaptation_options
)
for _option, _expected in AdaptationOptionCases.cases[1:]:
Expand Down Expand Up @@ -85,15 +88,32 @@ def test_run_benefit_returns_gdf(
@pytest.fixture(name="mocked_adaptation")
def _get_mocked_adaptation_fixture(self) -> Iterator[Adaptation]:
# Mock to avoid complex setup.
@dataclass
class MockAdaptationOption:
id: str

class MockAdaptation(Adaptation):
graph_file_hazard = NetworkFile(
graph=GeoDataFrame.from_dict(
data={
"geometry": [Point(x, 0) for x in range(10)],
"link_id": range(10),
"highway": "residential",
"length": 1.0,
},
geometry="geometry",
)
)
adaptation_collection: AdaptationOptionCollection = (
AdaptationOptionCollection(
all_options=[
MockAdaptationOption(id=f"Option{x}") for x in range(2)
AdaptationOption(
id=f"Option{x}",
name=None,
construction_cost=None,
construction_interval=None,
maintenance_cost=None,
maintenance_interval=None,
analyses=None,
analysis_config=None,
)
for x in range(2)
]
)
)
Expand All @@ -103,34 +123,33 @@ def __init__(self):

yield MockAdaptation()

def test_calculate_bc_ratio_returns_gdf(
self,
mocked_adaptation: Adaptation,
):
def test_calculate_bc_ratio_returns_gdf(self, mocked_adaptation: Adaptation):
# 1. Define test data.
_nof_rows = 10
_benefit_gdf = GeoDataFrame(range(_nof_rows))
_cost_gdf = GeoDataFrame(range(_nof_rows))
_benefit_gdf = GeoDataFrame(index=range(_nof_rows))
_cost_gdf = GeoDataFrame(index=range(_nof_rows))

for i, _option in enumerate(
mocked_adaptation.adaptation_collection.adaptation_options
):
_benefit_gdf[f"{_option.id}_benefit"] = 4.0 + i
_cost_gdf[f"{_option.id}_cost"] = 1.0 + i
_benefit_gdf[_option.benefit_col] = 4.0 + i
_cost_gdf[_option.cost_col] = 1.0 + i

# 2. Run test.
_result = mocked_adaptation.calculate_bc_ratio(_benefit_gdf, _cost_gdf)

# 3. Verify expectations.
assert isinstance(_result, GeoDataFrame)
assert not _result.geometry.empty
assert all(
[
f"{_option.id}_bc_ratio" in _result.columns
_option.bc_ratio_col in _result.columns
for _option in mocked_adaptation.adaptation_collection.adaptation_options
]
)
for i, _option in enumerate(
mocked_adaptation.adaptation_collection.adaptation_options
):
assert _result[f"{_option.id}_bc_ratio"].sum(axis=0) == pytest.approx(
assert _result[_option.bc_ratio_col].sum(axis=0) == pytest.approx(
_nof_rows * (4.0 + i) / (1.0 + i)
)
32 changes: 18 additions & 14 deletions tests/analysis/adaptation/test_adaptation_option.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@
from pandas import Series

from ra2ce.analysis.adaptation.adaptation_option import AdaptationOption
from ra2ce.analysis.adaptation.adaptation_option_analysis import (
AdaptationOptionAnalysis,
)
from ra2ce.analysis.analysis_config_data.analysis_config_data import (
AnalysisSectionAdaptation,
)
from ra2ce.analysis.analysis_config_data.enums.analysis_damages_enum import (
AnalysisDamagesEnum,
)
from ra2ce.analysis.analysis_config_data.enums.analysis_losses_enum import (
AnalysisLossesEnum,
)
Expand Down Expand Up @@ -129,13 +135,8 @@ def test_calculate_unit_cost_returns_float(
maint_interval: float,
net_unit_cost: float,
):
# Mock to avoid complex setup.
@dataclass
class MockAdaptationOption(AdaptationOption):
id: str

# 1. Define test data.
_option = MockAdaptationOption(
_option = AdaptationOption(
id="AnOption",
name=None,
construction_cost=constr_cost,
Expand All @@ -155,12 +156,10 @@ class MockAdaptationOption(AdaptationOption):
assert isinstance(_result, float)
assert _result == pytest.approx(net_unit_cost)

def test_calculate_impact_returns_series(self) -> GeoDataFrame:
def test_calculate_impact_returns_gdf(self) -> GeoDataFrame:
@dataclass
# Mock to avoid the need to run the impact analysis.
class MockAdaptationOptionAnalysis:
analysis_type: str
result_col: str
class MockAdaptationOptionAnalysis(AdaptationOptionAnalysis):
result: float

def execute(self, _: AnalysisConfigWrapper) -> Series:
Expand All @@ -170,11 +169,16 @@ def execute(self, _: AnalysisConfigWrapper) -> Series:
_nof_rows = 10
_analyses = [
MockAdaptationOptionAnalysis(
analysis_type=f"Analysis_{i}",
analysis_type=_analysis_type,
analysis_class=None,
analysis_input=None,
result_col=f"Result_{i}",
result=(i + 1) * 1.0e6,
)
for i in range(2)
for i, _analysis_type in zip(
range(2),
[AnalysisDamagesEnum.DAMAGES, AnalysisLossesEnum.MULTI_LINK_LOSSES],
)
]
_id = "Option1"
_option = AdaptationOption(
Expand All @@ -192,7 +196,7 @@ def execute(self, _: AnalysisConfigWrapper) -> Series:
_result = _option.calculate_impact(1.0)

# 3. Verify expectations.
assert isinstance(_result, Series)
assert _result.sum() == pytest.approx(
assert isinstance(_result, GeoDataFrame)
assert _result[_option.impact_col].sum() == pytest.approx(
_nof_rows * sum(x.result for x in _analyses)
)
16 changes: 13 additions & 3 deletions tests/analysis/adaptation/test_adaptation_option_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,25 @@ def test_calculate_options_unit_cost_returns_dict(
assert isinstance(_result, dict)
assert all(_option in _result for _option in _collection.adaptation_options)

def test_calculate_options_benefit_returns_series(self):
def test_calculate_options_benefit_returns_gdf(self):
@dataclass
class MockOption:
# Mock to avoid the need to run the impact analysis.
id: str
impact: float

@property
def benefit_col(self) -> str:
return f"{self.id}_benefit"

@property
def impact_col(self) -> str:
return f"{self.id}_impact"

def calculate_impact(self, _) -> Series:
return Series(self.impact, index=range(_nof_rows))
_impact_gdf = GeoDataFrame(index=range(10))
_impact_gdf[self.impact_col] = self.impact
return _impact_gdf

# 1. Define test data.
_nof_rows = 10
Expand All @@ -91,7 +101,7 @@ def calculate_impact(self, _) -> Series:
# 3. Verify expectations.
assert isinstance(_result, GeoDataFrame)
assert all(
f"{_option.id}_benefit" in _result.columns
_option.benefit_col in _result.columns
for _option in _collection.adaptation_options
)
assert all(
Expand Down
Loading

0 comments on commit 5351a6a

Please sign in to comment.