diff --git a/ra2ce/analysis/indirect/losses.py b/ra2ce/analysis/indirect/losses.py index b5e7aa760..a09fbc08a 100644 --- a/ra2ce/analysis/indirect/losses.py +++ b/ra2ce/analysis/indirect/losses.py @@ -49,10 +49,10 @@ def _load_df_from_csv( - csv_path: Path, - columns_to_interpret: list[str], - index: Optional[str | None], - sep: str = ",", + csv_path: Path, + columns_to_interpret: list[str], + index: Optional[str | None], + sep: str = ",", ) -> pd.DataFrame: if csv_path is None or not csv_path.exists(): logging.warning("No `csv` file found at {}.".format(csv_path)) @@ -80,9 +80,9 @@ class Losses(AnalysisIndirectProtocol): hazard_names: HazardNames def __init__( - self, - analysis_input: AnalysisInputWrapper, - analysis_config: AnalysisConfigWrapper, + self, + analysis_input: AnalysisInputWrapper, + analysis_config: AnalysisConfigWrapper, ) -> None: self.analysis_input = analysis_input self.analysis_config = analysis_config @@ -100,7 +100,7 @@ def __init__( self.duration_event: float = self.analysis.duration_event self.hours_per_day: float = self.analysis.hours_per_day self.production_loss_per_capita_per_hour = ( - self.analysis.production_loss_per_capita_per_day / self.hours_per_day + self.analysis.production_loss_per_capita_per_day / self.hours_per_day ) self._check_validity_analysis_files() self.intensities = _load_df_from_csv( @@ -126,9 +126,9 @@ def __init__( def _check_validity_analysis_files(self): if ( - self.analysis.traffic_intensities_file is None - or self.analysis.resilience_curve_file is None - or self.analysis.values_of_time_file is None + self.analysis.traffic_intensities_file is None + or self.analysis.resilience_curve_file is None + or self.analysis.values_of_time_file is None ): raise ValueError( f"traffic_intensities_file, resilience_curve_file, and values_of_time_file should be given" @@ -141,7 +141,7 @@ def _check_validity_df(self): """ _required_values_of_time_keys = ["trip_types", "value_of_time", "occupants"] if not all( - key in self.values_of_time.columns for key in _required_values_of_time_keys + key in self.values_of_time.columns for key in _required_values_of_time_keys ): raise ValueError( f"Missing required columns in values_of_time: {_required_values_of_time_keys}" @@ -153,16 +153,16 @@ def _check_validity_df(self): "functionality_loss_ratio", ] if len(self.resilience_curve) > 0 and not all( - key in self.resilience_curve.columns - for key in _required_resilience_curve_keys + key in self.resilience_curve.columns + for key in _required_resilience_curve_keys ): raise ValueError( f"Missing required columns in resilience_curve: {_required_resilience_curve_keys}" ) if ( - self.link_id not in self.intensities.columns - and self.link_id not in self.intensities.index.name + self.link_id not in self.intensities.columns + and self.link_id not in self.intensities.index.name ): raise Exception( f"""traffic_intensities_file and input graph do not have the same link_id. @@ -187,7 +187,7 @@ def _get_vot_intensity_per_trip_purpose(self) -> dict[str, pd.DataFrame]: occupancy_var_name = f"occupants_{trip_purpose}" partofday_trip_purpose_name = f"{self.part_of_day}_{trip_purpose}" partofday_trip_purpose_intensity_name = ( - "intensity_" + partofday_trip_purpose_name + "intensity_" + partofday_trip_purpose_name ) # read and set the vot's _vot_dict[vot_var_name] = self.values_of_time.loc[ @@ -200,13 +200,13 @@ def _get_vot_intensity_per_trip_purpose(self) -> dict[str, pd.DataFrame]: ].item() # read and set the intensities _vot_dict[partofday_trip_purpose_intensity_name] = ( - self.intensities_simplified_graph[partofday_trip_purpose_name] - / self.hours_per_day + self.intensities_simplified_graph[partofday_trip_purpose_name] + / self.hours_per_day ) return dict(_vot_dict) def _get_disrupted_criticality_analysis_results( - self, criticality_analysis: gpd.GeoDataFrame + self, criticality_analysis: gpd.GeoDataFrame ): criticality_analysis.reset_index(inplace=True) @@ -221,19 +221,19 @@ def _get_disrupted_criticality_analysis_results( if self.analysis.aggregate_wl == AggregateWlEnum.NONE: self.criticality_analysis = criticality_analysis[ criticality_analysis["EV1_ma"] > self.analysis.threshold - ] + ] elif self.analysis.aggregate_wl == AggregateWlEnum.MAX: self.criticality_analysis = criticality_analysis[ criticality_analysis["EV1_max"] > self.analysis.threshold - ] + ] elif self.analysis.aggregate_wl == AggregateWlEnum.MEAN: self.criticality_analysis = criticality_analysis[ criticality_analysis["EV1_mean"] > self.analysis.threshold - ] + ] elif self.analysis.aggregate_wl == AggregateWlEnum.MIN: self.criticality_analysis = criticality_analysis[ criticality_analysis["EV1_min"] > self.analysis.threshold - ] + ] self.criticality_analysis_non_disrupted = criticality_analysis[ ~criticality_analysis.index.isin(self.criticality_analysis.index) @@ -303,7 +303,9 @@ def _get_range(height: float) -> str: return f"{x}-{y}" raise ValueError(f"No matching range found for height {height}") - def _create_result(vlh: gpd.GeoDataFrame, connectivity_attribute: str) -> gpd.GeoDataFrame: + def _create_result( + vlh: gpd.GeoDataFrame, connectivity_attribute: str + ) -> gpd.GeoDataFrame: """ Args: vlh: calculated vehicle_loss_hours GeoDataFrame. For single_link_losses it only includes the @@ -314,11 +316,19 @@ def _create_result(vlh: gpd.GeoDataFrame, connectivity_attribute: str) -> gpd.Ge Multi_link_losses this is not necessary because of the underlying multi_link_redundancy analysis. """ - columns_without_index = [col for col in self.criticality_analysis_non_disrupted.columns if - col not in ["level_0"]] + columns_without_index = [ + col + for col in self.criticality_analysis_non_disrupted.columns + if col not in ["level_0"] + ] # Get the vlh_columns from vehicle_loss_hours that vlh calculations are filled in. vlh_columns = list( - set(vlh.columns) - set(self.criticality_analysis_non_disrupted[columns_without_index].columns) + set(vlh.columns) + - set( + self.criticality_analysis_non_disrupted[ + columns_without_index + ].columns + ) ) vlh[vlh_columns] = vlh[vlh_columns].fillna(0) @@ -331,14 +341,10 @@ def _create_result(vlh: gpd.GeoDataFrame, connectivity_attribute: str) -> gpd.Ge result = result.reset_index() # Fill 0 for the vlh_columns of vlh and self.criticality_analysis_non_disrupted - result.loc[ - result.index.difference(vlh.index), vlh_columns - ] = result.loc[ + result.loc[result.index.difference(vlh.index), vlh_columns] = result.loc[ result.index.difference(vlh.index), vlh_columns - ].fillna( - 0 - ) - for col in ['index', 'level_0']: + ].fillna(0) + for col in ["index", "level_0"]: if col in result.columns: result = result.drop(col, axis=1) @@ -382,16 +388,19 @@ def _create_result(vlh: gpd.GeoDataFrame, connectivity_attribute: str) -> gpd.Ge # find the link_type and the hazard intensity connectivity_attribute = None if any( - col in self.criticality_analysis.columns for col in ["detour", "connected"] + col in self.criticality_analysis.columns for col in ["detour", "connected"] ): connectivity_attribute = ( "detour" if "detour" in self.criticality_analysis.columns else "connected" ) - vlh_additional_columns = self.criticality_analysis.columns.difference(vehicle_loss_hours_df.columns).tolist() + vlh_additional_columns = self.criticality_analysis.columns.difference( + vehicle_loss_hours_df.columns + ).tolist() vehicle_loss_hours_df = pd.merge( - vehicle_loss_hours_df, self.criticality_analysis[vlh_additional_columns], + vehicle_loss_hours_df, + self.criticality_analysis[vlh_additional_columns], left_on=self.link_id, right_index=True, ) @@ -431,12 +440,16 @@ def _create_result(vlh: gpd.GeoDataFrame, connectivity_attribute: str) -> gpd.Ge performance_row[-1]["v"], performance_key, ) - if (math.isnan(row_performance_change) and row_connectivity == 0) or row_performance_change == 0: + if ( + math.isnan(row_performance_change) and row_connectivity == 0 + ) or row_performance_change == 0: self._calculate_production_loss_per_capita( vehicle_loss_hours, vlh_row, event ) - elif (not(math.isnan(row_performance_change) and math.isnan(row_connectivity)) and - ((u, v, k) == row_u_v_k)): + elif not ( + math.isnan(row_performance_change) + and math.isnan(row_connectivity) + ) and ((u, v, k) == row_u_v_k): self._populate_vehicle_loss_hour( vehicle_loss_hours, row_hazard_range, @@ -445,14 +458,16 @@ def _create_result(vlh: gpd.GeoDataFrame, connectivity_attribute: str) -> gpd.Ge event, ) - vehicle_loss_hours_result = _create_result(vehicle_loss_hours, connectivity_attribute) + vehicle_loss_hours_result = _create_result( + vehicle_loss_hours, connectivity_attribute + ) return vehicle_loss_hours_result def _calculate_production_loss_per_capita( - self, - vehicle_loss_hours: gpd.GeoDataFrame, - vlh_row: pd.Series, - hazard_col_name: str, + self, + vehicle_loss_hours: gpd.GeoDataFrame, + vlh_row: pd.Series, + hazard_col_name: str, ): """ In cases where there is no alternative route in the event of disruption of the road, we propose to use a @@ -479,10 +494,10 @@ def _calculate_production_loss_per_capita( self.vot_intensity_per_trip_collection[f"occupants_{trip_type}"] ) vlh_trip_type_event_series = ( - self.duration_event - * intensity_trip_type - * occupancy_trip_type - * self.production_loss_per_capita_per_hour + self.duration_event + * intensity_trip_type + * occupancy_trip_type + * self.production_loss_per_capita_per_hour ) vlh_trip_type_event = vlh_trip_type_event_series.squeeze() vehicle_loss_hours.loc[ @@ -494,12 +509,12 @@ def _calculate_production_loss_per_capita( ] = vlh_total def _populate_vehicle_loss_hour( - self, - vehicle_loss_hours: gpd.GeoDataFrame, - row_hazard_range: str, - vlh_row: pd.Series, - performance_change: float, - hazard_col_name: str, + self, + vehicle_loss_hours: gpd.GeoDataFrame, + row_hazard_range: str, + vlh_row: pd.Series, + performance_change: float, + hazard_col_name: str, ): vlh_total = 0 @@ -510,14 +525,14 @@ def _populate_vehicle_loss_hour( row_relevant_curve = self.resilience_curve[ self.resilience_curve["link_type_hazard_intensity"] == link_type_hazard_range - ] + ] disruption = ( ( - row_relevant_curve["duration_steps"].apply(pd.Series) - * (row_relevant_curve["functionality_loss_ratio"]).apply( - pd.Series - ) + row_relevant_curve["duration_steps"].apply(pd.Series) + * (row_relevant_curve["functionality_loss_ratio"]).apply( + pd.Series + ) ).sum(axis=1) ).squeeze() if disruption > max_disruption: @@ -530,14 +545,18 @@ def _populate_vehicle_loss_hour( relevant_curve = self.resilience_curve[ self.resilience_curve["link_type_hazard_intensity"] == link_type_hazard_range - ] + ] if relevant_curve.size == 0: raise Exception( f"""{link_type_hazard_range} was not found in the introduced resilience_curve""" ) divisor = 100 - if all(ratio <= 1 for ratio_tuple in relevant_curve["functionality_loss_ratio"] for ratio in ratio_tuple): + if all( + ratio <= 1 + for ratio_tuple in relevant_curve["functionality_loss_ratio"] + for ratio in ratio_tuple + ): divisor = 1 duration_steps: list = relevant_curve["duration_steps"].item() @@ -556,11 +575,14 @@ def _populate_vehicle_loss_hour( ) vlh_trip_type_event_series = sum( - (intensity_trip_type - * duration - * loss_ratio - * performance_change - * vot_trip_type) / divisor + ( + intensity_trip_type + * duration + * loss_ratio + * performance_change + * vot_trip_type + ) + / divisor for duration, loss_ratio in zip( duration_steps, functionality_loss_ratios ) diff --git a/ra2ce/analysis/indirect/multi_link_redundancy.py b/ra2ce/analysis/indirect/multi_link_redundancy.py index b8a7f29d4..2b6d9a41e 100644 --- a/ra2ce/analysis/indirect/multi_link_redundancy.py +++ b/ra2ce/analysis/indirect/multi_link_redundancy.py @@ -29,8 +29,8 @@ class MultiLinkRedundancy(AnalysisIndirectProtocol): hazard_names: HazardNames def __init__( - self, - analysis_input: AnalysisInputWrapper, + self, + analysis_input: AnalysisInputWrapper, ) -> None: self.analysis = analysis_input.analysis self.graph_file_hazard = analysis_input.graph_file_hazard @@ -39,21 +39,22 @@ def __init__( self.output_path = analysis_input.output_path self.hazard_names = analysis_input.hazard_names - def _update_time(self, df_calculated: pd.DataFrame, gdf_graph: gpd.GeoDataFrame) \ - -> tuple[pd.DataFrame, gpd.GeoDataFrame]: + def _update_time( + self, df_calculated: pd.DataFrame, gdf_graph: gpd.GeoDataFrame + ) -> tuple[pd.DataFrame, gpd.GeoDataFrame]: """ updates the time column with the calculated dataframe and updates the rest of the gdf_graph if time is None. """ if ( - WeighingEnum.TIME.config_value in gdf_graph.columns - and WeighingEnum.TIME.config_value in df_calculated.columns + WeighingEnum.TIME.config_value in gdf_graph.columns + and WeighingEnum.TIME.config_value in df_calculated.columns ): df_calculated = df_calculated.drop(columns=[WeighingEnum.TIME.config_value]) return df_calculated, gdf_graph if ( - WeighingEnum.TIME.config_value not in gdf_graph.columns - and WeighingEnum.TIME.config_value not in df_calculated.columns + WeighingEnum.TIME.config_value not in gdf_graph.columns + and WeighingEnum.TIME.config_value not in df_calculated.columns ): return df_calculated, gdf_graph @@ -65,12 +66,12 @@ def _update_time(self, df_calculated: pd.DataFrame, gdf_graph: gpd.GeoDataFrame) row_avgspeed = row.get("avgspeed", None) row_length = row.get("length", None) if ( - pd.isna(row[WeighingEnum.TIME.config_value]) - and row_avgspeed - and row_length + pd.isna(row[WeighingEnum.TIME.config_value]) + and row_avgspeed + and row_length ): gdf_graph.at[i, WeighingEnum.TIME.config_value] = ( - row_length * 1e-3 / row_avgspeed + row_length * 1e-3 / row_avgspeed ) else: gdf_graph.at[i, WeighingEnum.TIME.config_value] = row.get( @@ -91,10 +92,10 @@ def execute(self) -> gpd.GeoDataFrame: def _is_not_none(value): return ( - value is not None - and value is not pd.NA - and not pd.isna(value) - and not np.isnan(value) + value is not None + and value is not pd.NA + and not pd.isna(value) + and not np.isnan(value) ) results = [] @@ -112,8 +113,8 @@ def _is_not_none(value): edges_remove = [] for e in _graph.edges.data(keys=True): if (hazard_name in e[-1]) and ( - ("bridge" not in e[-1]) - or ("bridge" in e[-1] and e[-1]["bridge"] != "yes") + ("bridge" not in e[-1]) + or ("bridge" in e[-1] and e[-1]["bridge"] != "yes") ): edges_remove.append(e) edges_remove = [e for e in edges_remove if (e[-1][hazard_name] is not None)] @@ -121,14 +122,14 @@ def _is_not_none(value): e for e in edges_remove if (hazard_name in e[-1]) - and ( - _is_not_none(e[-1][hazard_name]) - and (e[-1][hazard_name] > float(self.analysis.threshold)) - and ( - ("bridge" not in e[-1]) - or ("bridge" in e[-1] and e[-1]["bridge"] != "yes") - ) - ) + and ( + _is_not_none(e[-1][hazard_name]) + and (e[-1][hazard_name] > float(self.analysis.threshold)) + and ( + ("bridge" not in e[-1]) + or ("bridge" in e[-1] and e[-1]["bridge"] != "yes") + ) + ) ] _graph.remove_edges_from(edges_remove)