diff --git a/ra2ce/analysis/damages/damage/manual_damage_functions.py b/ra2ce/analysis/damages/damage/manual_damage_functions.py
deleted file mode 100644
index 7716af657..000000000
--- a/ra2ce/analysis/damages/damage/manual_damage_functions.py
+++ /dev/null
@@ -1,70 +0,0 @@
-"""
- GNU GENERAL PUBLIC LICENSE
- Version 3, 29 June 2007
-
- Risk Assessment and Adaptation for Critical Infrastructure (RA2CE).
- Copyright (C) 2023 Stichting Deltares
-
- This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program. If not, see .
-"""
-
-import logging
-from pathlib import Path
-
-from ra2ce.analysis.damages.damage.damage_function_road_type_lane import (
- DamageFunctionByRoadTypeByLane,
-)
-
-
-class ManualDamageFunctions:
- """ "
- This class keeps an overview of the manual damage functions
-
- Default behaviour is to find, load and apply all available functions
- At 22 sept 2022: only implemented workflow for DamageFunction_by_RoadType_by_Lane
- """
-
- def __init__(self):
- self.available = (
- {}
- ) # keys = name of the available functions; values = paths to the folder
- self.loaded = [] # List of DamageFunction objects (or child classes
-
- def find_damage_functions(self, folder: Path) -> None:
- """Find all available damage functions in the specified folder"""
- assert folder.exists(), "Folder {} does not contain damage functions".format(
- folder
- )
- for subfolder in folder.iterdir(): # Subfolders contain the damage curves
- if subfolder.is_dir():
- self.available[subfolder.stem] = subfolder
- logging.info(
- "Found {} manual damage curves: \n {}".format(
- len(self.available.keys()), list(self.available.keys())
- )
- )
- return None
-
- def load_damage_functions(self):
- """ "Load damage functions in Ra2Ce"""
- for name, damage_dir in self.available.items():
- damage_function = DamageFunctionByRoadTypeByLane(name=name)
- damage_function.from_input_folder(damage_dir)
- damage_function.set_prefix()
- self.loaded.append(damage_function)
- logging.info(
- "Damage function '{}' loaded from folder {}".format(
- damage_function.name, damage_dir
- )
- )
diff --git a/ra2ce/analysis/damages/damage_calculation/damage_network_base.py b/ra2ce/analysis/damages/damage_calculation/damage_network_base.py
index a326c956e..fef64f684 100644
--- a/ra2ce/analysis/damages/damage_calculation/damage_network_base.py
+++ b/ra2ce/analysis/damages/damage_calculation/damage_network_base.py
@@ -28,6 +28,9 @@
from scipy.interpolate import interp1d
from ra2ce.analysis.analysis_config_data.enums.damage_curve_enum import DamageCurveEnum
+from ra2ce.analysis.damages.damage_functions.manual_damage_functions import (
+ ManualDamageFunctions,
+)
from ra2ce.analysis.damages.damages_lookup import LookUp as lookup
from ra2ce.analysis.damages.damages_lookup import dataframe_lookup
from ra2ce.analysis.damages.damages_utils import (
@@ -56,7 +59,11 @@ def __init__(
# TODO: also create constructors of the children of this class
@abstractmethod
- def main(self, damage_function: DamageCurveEnum, manual_damage_functions):
+ def main(
+ self,
+ damage_function: DamageCurveEnum,
+ manual_damage_functions: ManualDamageFunctions,
+ ):
"""
Controller for doing the EAD calculation
@@ -169,29 +176,34 @@ def remove_unclassified_road_types_from_mask(self):
self._gdf_mask = df.loc[~(df["road_type"] == "none")]
### Damage handlers
- def calculate_damage_manual_functions(self, events, manual_damage_functions):
- """
- Arguments:
- *events* (list) : list of events (or return periods) to iterate over, these should match the hazard column names
- *manual_damage_functions* (RA2CE ManualDamageFunctions object) :
+ def calculate_damage_manual_functions(
+ self, events: list[str], manual_damage_functions: ManualDamageFunctions
+ ) -> None:
"""
+ Calculate the damage using the manual damage functions
+ Args:
+ events (list[str]): list of events (or return periods) to iterate over, these should match the hazard column names
+ manual_damage_functions (ManualDamageFunctions): The manual damage functions object
+ """
# Todo: Dirty fixes, these should be read from the init
hazard_prefix = "F"
# dataframe to carry out the damage calculation #todo: this is a bit dirty
df = self._gdf_mask
- assert manual_damage_functions is not None, "No damage functions were loaded"
+ assert (
+ len(manual_damage_functions.damage_functions) > 0
+ ), "No damage functions were loaded"
- for _loaded_func in manual_damage_functions.loaded:
+ for _damage_func in manual_damage_functions.damage_functions.values():
# Add max damage values to df
- df = _loaded_func.add_max_damage(df, _loaded_func.prefix)
+ df = _damage_func.add_max_damage(df, _damage_func.prefix)
for event in events:
# Add apply interpolator objects
event_prefix = event
- df = _loaded_func.calculate_damage(
- df, _loaded_func.prefix, hazard_prefix, event_prefix
+ df = _damage_func.calculate_damage(
+ df, _damage_func.prefix, hazard_prefix, event_prefix
)
# Only transfer the final results to the damage column
@@ -201,7 +213,7 @@ def calculate_damage_manual_functions(self, events, manual_damage_functions):
"Damage calculation with the manual damage functions was succesfull."
)
- def calculate_damage_HZ(self, events):
+ def calculate_damage_HZ(self, events: list[str]) -> None:
"""
Arguments:
*events* (list) = list of events (or return periods) to iterate over, these should match the hazard column names
@@ -265,7 +277,7 @@ def calculate_damage_HZ(self, events):
"calculate_damage_HZ(): Damage calculation with the Huizinga damage functions was successful"
)
- def calculate_damage_OSdaMage(self, events):
+ def calculate_damage_OSdaMage(self, events: list[str]) -> None:
"""Damage calculation with the OSdaMage functions"""
def interpolate_damage(row, representative_damage_percentage):
@@ -416,7 +428,7 @@ def interpolate_damage(row, representative_damage_percentage):
)
### Utils handlers
- def create_mask(self):
+ def create_mask(self) -> None:
"""
#Create a mask of only the dataframes with hazard data (to speed-up damage calculations)
effect: *self._gdf_mask* = mask of only the rows with hazard data
@@ -434,8 +446,6 @@ def create_mask(self):
column_names.remove("geometry")
self._gdf_mask = self._gdf_mask[column_names]
- def replace_none_with_nan(self):
- import numpy as np
-
+ def replace_none_with_nan(self) -> None:
dam_cols = [c for c in self.gdf.columns if c.startswith("dam_")]
self.gdf[dam_cols] = self.gdf[dam_cols].fillna(value=np.nan)
diff --git a/ra2ce/analysis/damages/damage_calculation/damage_network_return_periods.py b/ra2ce/analysis/damages/damage_calculation/damage_network_return_periods.py
index 183ce76f4..b027a62f9 100644
--- a/ra2ce/analysis/damages/damage_calculation/damage_network_return_periods.py
+++ b/ra2ce/analysis/damages/damage_calculation/damage_network_return_periods.py
@@ -67,9 +67,9 @@ def __init__(
@classmethod
def construct_from_csv(
- cls, path: Path, representative_damage_percentage: float, sep: str = ";"
+ cls, csv_path: Path, representative_damage_percentage: float, sep: str
):
- road_gdf = pd.read_csv(path, sep=sep)
+ road_gdf = pd.read_csv(csv_path, sep=sep)
val_cols = [
c for c in road_gdf.columns if c.startswith("F_")
] # Find everything starting with 'F'
diff --git a/ra2ce/analysis/damages/damage/__init__.py b/ra2ce/analysis/damages/damage_functions/__init__.py
similarity index 100%
rename from ra2ce/analysis/damages/damage/__init__.py
rename to ra2ce/analysis/damages/damage_functions/__init__.py
diff --git a/ra2ce/analysis/damages/damage/damage_fraction_uniform.py b/ra2ce/analysis/damages/damage_functions/damage_fraction_uniform.py
similarity index 74%
rename from ra2ce/analysis/damages/damage/damage_fraction_uniform.py
rename to ra2ce/analysis/damages/damage_functions/damage_fraction_uniform.py
index 18723d9b3..901df86c8 100644
--- a/ra2ce/analysis/damages/damage/damage_fraction_uniform.py
+++ b/ra2ce/analysis/damages/damage_functions/damage_fraction_uniform.py
@@ -19,36 +19,43 @@
along with this program. If not, see .
"""
+from __future__ import annotations
import logging
+from dataclasses import dataclass
from pathlib import Path
import pandas as pd
+from scipy.interpolate import interp1d
+@dataclass(kw_only=True)
class DamageFractionUniform:
"""
Uniform: assuming the same curve for
each road type and lane numbers and any other metadata
-
self.raw_data (pd.DataFrame) : Raw data from the csv file
self.data (pd.DataFrame) : index = hazard severity (e.g. flood depth); column 0 = damage fraction
-
"""
- def __init__(self, name=None, hazard_unit=None):
- self.name = name
- self.hazard_unit = hazard_unit
- self.interpolator = None
+ name: str
+ hazard_unit: str
+ data: pd.DataFrame
+ origin_path: Path
+ interpolator: interp1d = None
- def from_csv(self, path: Path, sep=",") -> None:
+ def __post_init__(self):
+ self._convert_hazard_severity_unit("m")
+
+ @classmethod
+ def from_csv(cls, csv_path: Path, sep: str) -> DamageFractionUniform:
"""Construct object from csv file. Damage curve name is inferred from filename
Arguments:
- *path* (Path) : Path to the csv file
+ *csv_path* (Path) : Path to the csv file
*sep* (str) : csv seperator
- *output_unit* (str) : desired output unit (default = 'm')
+ *output_unit* (str) : desired output unit
The CSV file should have the following structure:
- column 1: hazard severity
@@ -71,23 +78,25 @@ def from_csv(self, path: Path, sep=",") -> None:
"""
- self.name = path.stem
- self.raw_data = pd.read_csv(path, index_col=0, sep=sep)
- self.origin_path = path # to track the original path from which the object was constructed; maybe also date?
+ _name = csv_path.stem
+ _raw_data = pd.read_csv(csv_path, index_col=0, sep=sep)
+ _origin_path = csv_path # to track the original path from which the object was constructed; maybe also date?
# identify unit and drop from data
- self.hazard_unit = self.raw_data.index[0]
- self.data = self.raw_data.drop(
- self.hazard_unit
+ _hazard_unit = _raw_data.index[0]
+ _data = _raw_data.drop(
+ _hazard_unit
) # Todo: This could also be a series instead of DataFrame
# convert data to floats
- self.data = self.data.astype("float")
- self.data.index = self.data.index.astype("float")
+ _data = _data.astype("float")
+ _data.index = _data.index.astype("float")
- self.convert_hazard_severity_unit()
+ return cls(
+ name=_name, hazard_unit=_hazard_unit, data=_data, origin_path=_origin_path
+ )
- def convert_hazard_severity_unit(self, desired_unit="m") -> None:
+ def _convert_hazard_severity_unit(self, desired_unit: str) -> None:
"""Converts hazard severity values to a different unit
Arguments:
self.hazard_unit - implicit (string)
@@ -98,7 +107,7 @@ def convert_hazard_severity_unit(self, desired_unit="m") -> None:
"""
if desired_unit == self.hazard_unit:
logging.info(
- "Damage units are already in the desired format {}".format(desired_unit)
+ "Damage units are already in the desired format %s", desired_unit
)
return None
@@ -106,17 +115,19 @@ def convert_hazard_severity_unit(self, desired_unit="m") -> None:
scaling_factor = 1 / 100
self.data.index = self.data.index * scaling_factor
logging.info(
- "Hazard severity from {} data was scaled by a factor {}, to convert from {} to {}".format(
- self.origin_path, scaling_factor, self.hazard_unit, desired_unit
- )
+ "Hazard severity from %s data was scaled by a factor %s, to convert from %s to %s",
+ self.origin_path,
+ scaling_factor,
+ self.hazard_unit,
+ desired_unit,
)
self.damage_unit = desired_unit
return None
else:
logging.warning(
- "Hazard severity scaling from {} to {} is not supported".format(
- self.hazard_unit, desired_unit
- )
+ "Hazard severity scaling from %s to %s is not supported",
+ self.hazard_unit,
+ desired_unit,
)
return None
@@ -124,8 +135,6 @@ def create_interpolator(self):
"""Create interpolator object from loaded data
sets result to self.interpolator (Scipy interp1d)
"""
- from scipy.interpolate import interp1d
-
x_values = self.data.index.values
y_values = self.data.values[:, 0]
diff --git a/ra2ce/analysis/damages/damage/damage_function_road_type_lane.py b/ra2ce/analysis/damages/damage_functions/damage_function_road_type_lane.py
similarity index 72%
rename from ra2ce/analysis/damages/damage/damage_function_road_type_lane.py
rename to ra2ce/analysis/damages/damage_functions/damage_function_road_type_lane.py
index a6747730f..bd90fe428 100644
--- a/ra2ce/analysis/damages/damage/damage_function_road_type_lane.py
+++ b/ra2ce/analysis/damages/damage_functions/damage_function_road_type_lane.py
@@ -18,60 +18,47 @@
You should have received a copy of the GNU General Public License
along with this program. If not, see .
"""
+from __future__ import annotations
-import logging
+from dataclasses import dataclass
from pathlib import Path
import pandas as pd
-from ra2ce.analysis.damages.damage.damage_fraction_uniform import DamageFractionUniform
-from ra2ce.analysis.damages.damage.max_damage import MaxDamageByRoadTypeByLane
+from ra2ce.analysis.damages.damage_functions.damage_fraction_uniform import (
+ DamageFractionUniform,
+)
+from ra2ce.analysis.damages.damage_functions.max_damage import MaxDamage
+@dataclass(kw_only=True)
class DamageFunctionByRoadTypeByLane:
"""
A damage function that has different max damages per road type, but a uniform damage_fraction curve
-
The attributes need to be of the type:
- self.max_damage (MaxDamage_byRoadType_byLane)
- self.damage_fraction (DamageFractionHazardSeverityUniform)
-
+ self.max_damage (MaxDamage)
+ self.damage_fraction (DamageFractionUniform)
+ name (str)
"""
- def __init__(
- self,
- max_damage: MaxDamageByRoadTypeByLane = None,
- damage_fraction: DamageFractionUniform = None,
- name: str = "",
- hazard: str = "flood",
- type: str = "depth_damage",
- infra_type: str = "road",
- ):
- # Construct using the parent class __init__
- self.name = name
- self.hazard = hazard
- self.type = type
- self.infra_type = infra_type
- self.max_damage = max_damage # Should be a MaxDamage object
- self.damage_fraction = (
- damage_fraction # Should be a DamageFractionHazardSeverity object
- )
- self.prefix = None # Should be two characters long at maximum
-
- def set_prefix(self):
- self.prefix = self.name[0:2]
- logging.info(
- "The prefix: '{}' refers to curve name '{}' in the results".format(
- self.prefix, self.name
- )
- )
+ max_damage: MaxDamage
+ damage_fraction: DamageFractionUniform
+ name: str
+
+ @property
+ def prefix(self) -> str:
+ return self.name[0:2] if len(self.name) > 2 else self.name
- def from_input_folder(self, folder_path: Path):
+ @classmethod
+ def from_input_folder(
+ cls, name: str, folder_path: Path
+ ) -> DamageFunctionByRoadTypeByLane:
"""Construct a set of damage functions from csv files located in the folder_path
Arguments:
- *folder_path* (Pathlib Path) : path to folder where csv files can be found
+ name (str) : name of the damage function
+ folder_path (Pathlib Path) : path to folder where csv files can be found
"""
def find_unique_csv_file(folder_path: Path, part_of_filename: str) -> Path:
@@ -95,23 +82,20 @@ def find_unique_csv_file(folder_path: Path, part_of_filename: str) -> Path:
return result[0]
# Load the max_damage object
- max_damage = MaxDamageByRoadTypeByLane()
max_dam_path = find_unique_csv_file(folder_path, "max_damage")
- max_damage.from_csv(max_dam_path, sep=";")
-
- self.max_damage = max_damage
+ max_damage = MaxDamage.from_csv(max_dam_path, ";")
# Load the damage fraction function
# search in the folder for something *damage_fraction
- damage_fraction = DamageFractionUniform()
dam_fraction_path = find_unique_csv_file(folder_path, "hazard_severity")
- damage_fraction.from_csv(dam_fraction_path, sep=";")
- self.damage_fraction = damage_fraction
+ damage_fraction = DamageFractionUniform.from_csv(dam_fraction_path, sep=";")
damage_fraction.create_interpolator()
+ return cls(max_damage=max_damage, damage_fraction=damage_fraction, name=name)
+
# Todo: these two below functions are maybe better implemented at a lower level?
- def add_max_damage(self, df: pd.DataFrame, prefix: str = None):
+ def add_max_damage(self, df: pd.DataFrame, prefix: str) -> pd.DataFrame:
""" "Ads the max damage value to the dataframe"""
cols = df.columns
assert "road_type" in cols, "no column 'road type' in df"
diff --git a/ra2ce/analysis/damages/damage_functions/manual_damage_functions.py b/ra2ce/analysis/damages/damage_functions/manual_damage_functions.py
new file mode 100644
index 000000000..202a5a34d
--- /dev/null
+++ b/ra2ce/analysis/damages/damage_functions/manual_damage_functions.py
@@ -0,0 +1,41 @@
+"""
+ GNU GENERAL PUBLIC LICENSE
+ Version 3, 29 June 2007
+
+ Risk Assessment and Adaptation for Critical Infrastructure (RA2CE).
+ Copyright (C) 2023 Stichting Deltares
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see .
+"""
+
+from collections import defaultdict
+from dataclasses import dataclass, field
+
+from ra2ce.analysis.damages.damage_functions.damage_function_road_type_lane import (
+ DamageFunctionByRoadTypeByLane,
+)
+
+
+@dataclass(kw_only=True)
+class ManualDamageFunctions:
+ """ "
+ This class keeps an overview of the manual damage functions
+
+ Default behaviour is to find, load and apply all available functions
+ At 22 sept 2022: only implemented workflow for DamageFunction_by_RoadType_by_Lane
+ """
+
+ damage_functions: dict[str, DamageFunctionByRoadTypeByLane] = field(
+ default_factory=lambda: defaultdict(DamageFunctionByRoadTypeByLane)
+ )
diff --git a/ra2ce/analysis/damages/damage_functions/manual_damage_functions_reader.py b/ra2ce/analysis/damages/damage_functions/manual_damage_functions_reader.py
new file mode 100644
index 000000000..7db1e7cd0
--- /dev/null
+++ b/ra2ce/analysis/damages/damage_functions/manual_damage_functions_reader.py
@@ -0,0 +1,62 @@
+"""
+ GNU GENERAL PUBLIC LICENSE
+ Version 3, 29 June 2007
+
+ Risk Assessment and Adaptation for Critical Infrastructure (RA2CE).
+ Copyright (C) 2023 Stichting Deltares
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see .
+"""
+from pathlib import Path
+
+from ra2ce.analysis.damages.damage_functions.damage_function_road_type_lane import (
+ DamageFunctionByRoadTypeByLane,
+)
+from ra2ce.analysis.damages.damage_functions.manual_damage_functions import (
+ ManualDamageFunctions,
+)
+from ra2ce.common.io.readers.file_reader_protocol import FileReaderProtocol
+
+
+class ManualDamageFunctionsReader(FileReaderProtocol):
+ """
+ Reader class for the manual damage functions.
+ """
+
+ def read(self, file_path: Path) -> ManualDamageFunctions:
+ """
+ Read the manual damage functions from the given folder.
+ The folder should contain subfolders with the damage functions.
+ Each damage functions is constructed by reading the csv files for the max damage and damage fraction.
+
+ Args:
+ file_path (Path): Pathm to the folder containing the manual damage functions folders
+
+ Returns:
+ ManualDamageFunctions: The manual damage functions
+ """
+ # Find subfolders with the damage functions
+ _damage_function_folders = {
+ subfolder.stem: subfolder
+ for subfolder in file_path.iterdir()
+ if subfolder.is_dir()
+ }
+
+ # Read the damage functions from the subfolders
+ return ManualDamageFunctions(
+ damage_functions={
+ _name: DamageFunctionByRoadTypeByLane.from_input_folder(_name, _path)
+ for _name, _path in _damage_function_folders.items()
+ }
+ )
diff --git a/ra2ce/analysis/damages/damage/max_damage.py b/ra2ce/analysis/damages/damage_functions/max_damage.py
similarity index 60%
rename from ra2ce/analysis/damages/damage/max_damage.py
rename to ra2ce/analysis/damages/damage_functions/max_damage.py
index a39ef090c..41d3d717d 100644
--- a/ra2ce/analysis/damages/damage/max_damage.py
+++ b/ra2ce/analysis/damages/damage_functions/max_damage.py
@@ -18,34 +18,31 @@
You should have received a copy of the GNU General Public License
along with this program. If not, see .
"""
-
+from __future__ import annotations
import logging
+from dataclasses import dataclass
from pathlib import Path
import pandas as pd
-class MaxDamageByRoadTypeByLane:
+@dataclass(kw_only=True)
+class MaxDamage:
"""
Max damage per RoadType and per Lane
-
- Attributes:
- self.name (str) : Name of the damage curve
- self.data (pd.DataFrame) : columns contain number of lanes; rows contain the road types
-
- Optional attributes:
- self.origin_path (Path) : Path to the file from which the function was constructed
- self.raw_data : The raw data read from the input file
-
-
"""
- def __init__(self, name=None, damage_unit=None):
- self.name = name
- self.damage_unit = damage_unit
+ name: str
+ damage_unit: str
+ data: pd.DataFrame
+ origin_path: Path = None
+
+ def __post_init__(self):
+ self._convert_length_unit("euro/m")
- def from_csv(self, path: Path, sep=",") -> None:
+ @classmethod
+ def from_csv(cls, csv_path: Path, sep: str) -> MaxDamage:
"""Construct object from csv file. Damage curve name is inferred from filename
The first row describes the lane numbers per column and should have 'Road_type \ lanes' as index/first value.
@@ -54,36 +51,37 @@ def from_csv(self, path: Path, sep=",") -> None:
the rest of the rows contains the different road types as index/first value; and the costs as values
Arguments:
- *path* (Path) : Path to the csv file
+ *csv_* (Path) : Path to the csv file
*sep* (str) : csv seperator
"""
- default_output_unit = "euro/m"
- self.name = path.stem
- self.raw_data = pd.read_csv(path, index_col=r"Road_type \ lanes", sep=sep)
- self.origin_path = path # to track the original path from which the object was constructed; maybe also date?
+ _name = csv_path.stem
+ _raw_data = pd.read_csv(csv_path, index_col=r"Road_type \ lanes", sep=sep)
+ _origin_path = csv_path # to track the original path from which the object was constructed; maybe also date?
###Determine units
- units = self.raw_data.loc["unit", :].unique() # identify the unique units
+ units = _raw_data.loc["unit", :].unique() # identify the unique units
assert (
len(units) == 1
), "Columns in the max damage csv seem to have different units, ra2ce cannot handle this"
# case only one unique unit is identified
- self.damage_unit = units[
+ _damage_unit = units[
0
] # should have the structure 'x/y' , e.g. euro/m, dollar/yard
- self.data = self.raw_data.drop("unit")
- self.data = self.data.astype("float")
+ _data = _raw_data.drop("unit")
+ _data = _data.astype("float")
- # assume road types are in the rows; lane numbers in the columns
- self.road_types = list(self.data.index) # to method
# assumes that the columns containst the lanes
- self.data.columns = self.data.columns.astype("int")
+ _data.columns = _data.columns.astype("int")
- if self.damage_unit != default_output_unit:
- self.convert_length_unit() # convert the unit
+ return cls(
+ name=_name,
+ damage_unit=_damage_unit,
+ data=_data,
+ origin_path=_origin_path,
+ )
- def convert_length_unit(self, desired_unit="euro/m"):
+ def _convert_length_unit(self, desired_unit: str):
"""Converts max damage values to a different unit
Arguments:
self.damage_unit (implicit)
@@ -93,7 +91,6 @@ def convert_length_unit(self, desired_unit="euro/m"):
"""
if desired_unit == self.damage_unit:
- logging.info("Input damage units are already in the desired format")
return
original_length_unit = self.damage_unit.split("/")[1]
@@ -102,17 +99,19 @@ def convert_length_unit(self, desired_unit="euro/m"):
if original_length_unit != "km" or target_length_unit != "m":
# We currently only support from 'km' to 'm'
logging.warning(
- "Damage scaling from {} to {} is not supported".format(
- self.damage_unit, desired_unit
- )
+ "Damage scaling from %s to %s is not supported",
+ self.damage_unit,
+ desired_unit,
)
return
scaling_factor = 1 / 1000
self.data = self.data * scaling_factor
logging.info(
- "Damage data from {} was scaled by a factor {}, to convert from {} to {}".format(
- self.origin_path, scaling_factor, self.damage_unit, desired_unit
- )
+ "Damage data from %s was scaled by a factor %s, to convert from %s to %s",
+ self.origin_path,
+ scaling_factor,
+ self.damage_unit,
+ desired_unit,
)
self.damage_unit = desired_unit
diff --git a/ra2ce/analysis/damages/damages.py b/ra2ce/analysis/damages/damages.py
index c4a95bb87..7e321a0c2 100644
--- a/ra2ce/analysis/damages/damages.py
+++ b/ra2ce/analysis/damages/damages.py
@@ -20,11 +20,16 @@
from ra2ce.analysis.analysis_result.analysis_result import AnalysisResult
from ra2ce.analysis.analysis_result.analysis_result_wrapper import AnalysisResultWrapper
from ra2ce.analysis.damages.analysis_damages_protocol import AnalysisDamagesProtocol
-from ra2ce.analysis.damages.damage.manual_damage_functions import ManualDamageFunctions
from ra2ce.analysis.damages.damage_calculation import (
DamageNetworkEvents,
DamageNetworkReturnPeriods,
)
+from ra2ce.analysis.damages.damage_functions.manual_damage_functions import (
+ ManualDamageFunctions,
+)
+from ra2ce.analysis.damages.damage_functions.manual_damage_functions_reader import (
+ ManualDamageFunctionsReader,
+)
from ra2ce.analysis.damages.damages_result_wrapper import DamagesResultWrapper
from ra2ce.network.graph_files.network_file import NetworkFile
@@ -36,6 +41,7 @@ class Damages(AnalysisBase, AnalysisDamagesProtocol):
input_path: Path
output_path: Path
reference_base_graph_hazard: MultiGraph
+ manual_damage_functions: ManualDamageFunctions = None
def __init__(
self, analysis_input: AnalysisInputWrapper, base_graph_hazard: MultiGraph
@@ -46,6 +52,10 @@ def __init__(
self.input_path = analysis_input.input_path
self.output_path = analysis_input.output_path
self.reference_base_graph_hazard = base_graph_hazard
+ if self.analysis.damage_curve == DamageCurveEnum.MAN:
+ self.manual_damage_functions = ManualDamageFunctionsReader().read(
+ self.input_path.joinpath("damage_functions")
+ )
def execute(self) -> AnalysisResultWrapper:
def _rename_road_gdf_to_conventions(road_gdf_columns: list[str]) -> list[str]:
@@ -81,15 +91,6 @@ def _rename_road_gdf_to_conventions(road_gdf_columns: list[str]) -> list[str]:
# Read the desired damage function
damage_function = self.analysis.damage_curve
- # If you want to use manual damage functions, these need to be loaded first
- manual_damage_functions = None
- if self.analysis.damage_curve == DamageCurveEnum.MAN:
- manual_damage_functions = ManualDamageFunctions()
- manual_damage_functions.find_damage_functions(
- folder=(self.input_path.joinpath("damage_functions"))
- )
- manual_damage_functions.load_damage_functions()
-
# Choose between event or return period based analysis
if self.analysis.event_type == EventTypeEnum.EVENT:
event_gdf = DamageNetworkEvents(
@@ -97,7 +98,7 @@ def _rename_road_gdf_to_conventions(road_gdf_columns: list[str]) -> list[str]:
)
event_gdf.main(
damage_function=damage_function,
- manual_damage_functions=manual_damage_functions,
+ manual_damage_functions=self.manual_damage_functions,
)
return self.generate_result_wrapper(event_gdf.gdf)
@@ -108,7 +109,7 @@ def _rename_road_gdf_to_conventions(road_gdf_columns: list[str]) -> list[str]:
)
return_period_gdf.main(
damage_function=damage_function,
- manual_damage_functions=manual_damage_functions,
+ manual_damage_functions=self.manual_damage_functions,
)
if (
diff --git a/tests/analysis/damages/damage/test_max_damage.py b/tests/analysis/damages/damage/test_max_damage.py
deleted file mode 100644
index d70201c68..000000000
--- a/tests/analysis/damages/damage/test_max_damage.py
+++ /dev/null
@@ -1,43 +0,0 @@
-from ra2ce.analysis.damages.damage.max_damage import MaxDamageByRoadTypeByLane
-
-
-class TestMaxDamageByRoadTypeByLane:
- def test_init(self):
- # 1. Define test data.
- _name = "sth"
- _dmg_unit = "else"
-
- # 2. Run test.
- _damage = MaxDamageByRoadTypeByLane(_name, _dmg_unit)
-
- # 3. Verify final expectations.
- assert isinstance(_damage, MaxDamageByRoadTypeByLane)
- assert _damage.name == _name
- assert _damage.damage_unit == _dmg_unit
-
- def test_convert_length_unit_same_unit_does_nothing(self):
- # 1. Define test data.
- _name = "sth"
- _dmg_unit = "else"
- _damage = MaxDamageByRoadTypeByLane(_name, _dmg_unit)
-
- # 2. Run test.
- _damage.convert_length_unit(_dmg_unit)
-
- # 3. Verify expectations
- assert _damage.damage_unit == _dmg_unit
-
- def test_convert_length_unit_unsupported_does_nothing(self):
- # 1. Define test data.
- _name = "my_damage"
- _dmg_unit = "sth/km"
- _desired_unit = "else/miles"
- _damage = MaxDamageByRoadTypeByLane(_name, _dmg_unit)
- _damage.data = 42.0
-
- # 2. Run test.
- _damage.convert_length_unit(_desired_unit)
-
- # 3. Verify expectations
- assert _damage.damage_unit == _dmg_unit
- assert _damage.data == 42.0
diff --git a/tests/analysis/damages/damage/__init__.py b/tests/analysis/damages/damage_functions/__init__.py
similarity index 100%
rename from tests/analysis/damages/damage/__init__.py
rename to tests/analysis/damages/damage_functions/__init__.py
diff --git a/tests/analysis/damages/damage/test_damage_function_road_type_lane.py b/tests/analysis/damages/damage_functions/test_damage_function_road_type_lane.py
similarity index 75%
rename from tests/analysis/damages/damage/test_damage_function_road_type_lane.py
rename to tests/analysis/damages/damage_functions/test_damage_function_road_type_lane.py
index 4fdb8081e..2331e826e 100644
--- a/tests/analysis/damages/damage/test_damage_function_road_type_lane.py
+++ b/tests/analysis/damages/damage_functions/test_damage_function_road_type_lane.py
@@ -1,6 +1,6 @@
import pytest
-from ra2ce.analysis.damages.damage.damage_function_road_type_lane import (
+from ra2ce.analysis.damages.damage_functions.damage_function_road_type_lane import (
DamageFunctionByRoadTypeByLane,
)
from tests import test_data
@@ -9,14 +9,13 @@
class TestDamageFunctionByRoadTypeByLane:
def test_from_input_folder_without_damage_files_raises(self):
# 1. Define test data.
- _damage_function = DamageFunctionByRoadTypeByLane()
_damage_test_data = test_data / "damages" / "no_files"
if not _damage_test_data.exists():
_damage_test_data.mkdir(parents=True)
# 2. Run test.
with pytest.raises(ValueError) as exc_err:
- _damage_function.from_input_folder(_damage_test_data)
+ DamageFunctionByRoadTypeByLane.from_input_folder(None, _damage_test_data)
# 3. Verify final expectations.
assert str(exc_err.value) == "Did not find any damage file in {}".format(
@@ -25,11 +24,10 @@ def test_from_input_folder_without_damage_files_raises(self):
def test_from_input_folder_with_too_many_files_raises(self):
# 1. Define test data.
- _damage_function = DamageFunctionByRoadTypeByLane()
_damage_test_data = test_data / "damages" / "repeated_files"
# 2. Run test.
with pytest.raises(ValueError) as exc_err:
- _damage_function.from_input_folder(_damage_test_data)
+ DamageFunctionByRoadTypeByLane.from_input_folder(None, _damage_test_data)
# 3. Verify final expectations.
assert str(exc_err.value) == "Found more then one damage file in {}".format(
diff --git a/tests/analysis/damages/damage_functions/test_manual_damage_functions_reader.py b/tests/analysis/damages/damage_functions/test_manual_damage_functions_reader.py
new file mode 100644
index 000000000..76f5370bb
--- /dev/null
+++ b/tests/analysis/damages/damage_functions/test_manual_damage_functions_reader.py
@@ -0,0 +1,30 @@
+from ra2ce.analysis.damages.damage_functions.manual_damage_functions import (
+ ManualDamageFunctions,
+)
+from ra2ce.analysis.damages.damage_functions.manual_damage_functions_reader import (
+ ManualDamageFunctionsReader,
+)
+from ra2ce.common.io.readers.file_reader_protocol import FileReaderProtocol
+from tests import test_data
+
+
+class TestManualDamageFunctionsReader:
+ def test_initialize(self):
+ # 1. Run test
+ _reader = ManualDamageFunctionsReader()
+
+ # 2. Verify expections
+ assert isinstance(_reader, ManualDamageFunctionsReader)
+ assert isinstance(_reader, FileReaderProtocol)
+
+ def test_read_returns_manual_damage_functions(self):
+ # 1. Define test data
+ _path = test_data.joinpath("damages", "test_damage_functions")
+ assert _path.exists()
+
+ # 2. Execute test
+ _result = ManualDamageFunctionsReader().read(_path)
+
+ # 3. Verify expectations
+ assert isinstance(_result, ManualDamageFunctions)
+ assert len(_result.damage_functions) == 1
diff --git a/tests/analysis/damages/damage_functions/test_max_damage.py b/tests/analysis/damages/damage_functions/test_max_damage.py
new file mode 100644
index 000000000..8fb43b0de
--- /dev/null
+++ b/tests/analysis/damages/damage_functions/test_max_damage.py
@@ -0,0 +1,62 @@
+from ra2ce.analysis.damages.damage_functions.max_damage import MaxDamage
+
+
+class TestMaxDamage:
+ def test_initialize(self):
+ # 1. Define test data.
+ _name = "sth"
+ _dmg_unit = "else/m"
+
+ # 2. Run test.
+ _damage = MaxDamage(name=_name, damage_unit=_dmg_unit, data=42.0)
+
+ # 3. Verify final expectations.
+ assert isinstance(_damage, MaxDamage)
+ assert _damage.name == _name
+ assert _damage.damage_unit == _dmg_unit
+
+ # Mock to avoid execution of __post_init__ method
+ class MockMaxDamage(MaxDamage):
+ def __post_init__(self) -> None:
+ pass
+
+ def test__convert_length_unit_converts_to_m(self):
+ # 1. Define test data.
+ _name = "sth"
+ _dmg_unit = "else/km"
+ _desired_unit = "else/m"
+ _data = 42.0
+ _damage = self.MockMaxDamage(name=_name, damage_unit=_dmg_unit, data=_data)
+
+ # 2. Run test.
+ _damage._convert_length_unit(_desired_unit)
+
+ # 3. Verify expectations
+ assert _damage.damage_unit == _desired_unit
+ assert _damage.data == _data / 1000
+
+ def test__convert_length_unit_same_unit_does_nothing(self):
+ # 1. Define test data.
+ _name = "sth"
+ _dmg_unit = "else"
+ _damage = self.MockMaxDamage(name=_name, damage_unit=_dmg_unit, data=None)
+
+ # 2. Run test.
+ _damage._convert_length_unit(_dmg_unit)
+
+ # 3. Verify expectations
+ assert _damage.damage_unit == _dmg_unit
+
+ def test__convert_length_unit_unsupported_does_nothing(self):
+ # 1. Define test data.
+ _name = "my_damage"
+ _dmg_unit = "sth/km"
+ _desired_unit = "else/miles"
+ _damage = self.MockMaxDamage(name=_name, damage_unit=_dmg_unit, data=42.0)
+
+ # 2. Run test.
+ _damage._convert_length_unit(_desired_unit)
+
+ # 3. Verify expectations
+ assert _damage.damage_unit == _dmg_unit
+ assert _damage.data == 42.0
diff --git a/tests/analysis/damages/test_damages.py b/tests/analysis/damages/test_damages.py
index b65d22714..728d49bfc 100644
--- a/tests/analysis/damages/test_damages.py
+++ b/tests/analysis/damages/test_damages.py
@@ -7,13 +7,18 @@
from ra2ce.analysis.analysis_config_data.enums.risk_calculation_mode_enum import (
RiskCalculationModeEnum,
)
-from ra2ce.analysis.damages.damage.manual_damage_functions import ManualDamageFunctions
from ra2ce.analysis.damages.damage_calculation.damage_network_events import (
DamageNetworkEvents,
)
from ra2ce.analysis.damages.damage_calculation.damage_network_return_periods import (
DamageNetworkReturnPeriods,
)
+from ra2ce.analysis.damages.damage_functions.manual_damage_functions import (
+ ManualDamageFunctions,
+)
+from ra2ce.analysis.damages.damage_functions.manual_damage_functions_reader import (
+ ManualDamageFunctionsReader,
+)
from tests import test_data
damages_test_data = test_data / "damages"
@@ -212,13 +217,11 @@ def test_event_based_damage_calculation_osdamage_stylized(self):
)
def _load_manual_damage_function(self):
- manual_damage_functions = ManualDamageFunctions()
- manual_damage_functions.find_damage_functions(
- folder=damages_test_data / "test_damage_functions"
+ manual_damage_functions = ManualDamageFunctionsReader().read(
+ damages_test_data.joinpath("test_damage_functions")
)
- manual_damage_functions.load_damage_functions()
-
- fun0 = manual_damage_functions.loaded[0]
+ assert isinstance(manual_damage_functions, ManualDamageFunctions)
+ fun0 = list(manual_damage_functions.damage_functions.values())[0]
# Check some damage fractions
assert fun0.prefix == "te"
@@ -259,13 +262,11 @@ def test_event_based_damage_calculation_manual_stylized(self):
]
# LOAD DAMAGE FUNCTIONS
- manual_damage_functions = ManualDamageFunctions()
- manual_damage_functions.find_damage_functions(
- folder=damages_test_data / "test_damage_functions"
+ manual_damage_functions = ManualDamageFunctionsReader().read(
+ damages_test_data.joinpath("test_damage_functions")
)
- manual_damage_functions.load_damage_functions()
-
- fun0 = manual_damage_functions.loaded[0]
+ assert isinstance(manual_damage_functions, ManualDamageFunctions)
+ fun0 = list(manual_damage_functions.damage_functions.values())[0]
assert fun0.prefix == "te"
_representative_damage_percentage = 100