Skip to content

Commit

Permalink
Merge branch 'development' into feature/49-add-more-effects-on-vehicl…
Browse files Browse the repository at this point in the history
…eplayer

# Conflicts:
#	src/DataModel/Effects/HackingEffects/CleanHackedEffect.py
  • Loading branch information
iav-DaBi committed Nov 27, 2024
2 parents cf000e5 + 14f65a8 commit fb6f110
Show file tree
Hide file tree
Showing 54 changed files with 2,883 additions and 723 deletions.
2 changes: 2 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[flake8]
max-line-length = 120
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ jobs:
echo "$(pipenv --venv)/bin" >> $GITHUB_PATH
- uses: jakebailey/pyright-action@v2
with:
workdir: "src"
args: "--project pyrightconfig.json"

flake8-lint:
name: Check coding style via flake8
Expand Down
760 changes: 378 additions & 382 deletions Pipfile.lock

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions pyrightconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"include": ["src", "src/**"],
"exclude": ["**/__pycache__", "test", "test/**"],
"reportMissingImports": true,
"reportUnusedVariable": true,
"reportOptionalSubscript": true,
"reportOptionalMemberAccess": true,
"reportGeneralTypeIssues": true,
"reportUnboundVariable": true,
"reportPrivateUsage": true,
"typeCheckingMode": "basic",
"venvPath": "C:/Users/dbierkar/.virtualenvs/",
"venv": "IAVDistortion-WH4ls6rj",
"pythonVersion": "3.11",
}
4 changes: 2 additions & 2 deletions src/CyberSecurityManager/CyberSecurityManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ def activate_hacking_scenario_for_vehicle(self, uuid: str, scenario_id: str) ->

return

def get_active_hacking_scenarios(self) -> dict:
scenario_map = {}
def get_active_hacking_scenarios(self) -> dict[str, str]:
scenario_map: dict[str, str] = {}
for car in self._environment_manager.get_vehicle_list():
scenario_map.update({car.vehicle_id: car.hacking_scenario})
return scenario_map
15 changes: 13 additions & 2 deletions src/DataModel/Effects/HackingEffects/CleanHackedEffect.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,26 @@


class CleanHackedEffect(VehicleEffect):

def __init__(self, scenario: str):
super().__init__()
self._scenario = scenario

def identify(self) -> VehicleEffectIdentification:
return VehicleEffectIdentification.CLEAN_HACKED_EFFECTS

def effect_should_end(self, vehicle: 'Vehicle') -> bool:
def can_be_applied(self, vehicle: Vehicle) -> bool:
_ = vehicle
return True

def conflicts_with(self) -> list[VehicleEffectIdentification]:
return []

def effect_should_end(self, vehicle: Vehicle) -> bool:
# this should end immediately since it's a quick "remove all bad effects" effect
return True

def on_start(self, vehicle: 'Vehicle') -> bool:
def on_start(self, vehicle: Vehicle) -> bool:
# remove all hacked effects
for effect in vehicle.get_active_effects():
effect_type = effect.identify()
Expand All @@ -28,3 +36,6 @@ def on_start(self, vehicle: 'Vehicle') -> bool:
vehicle.remove_effect(effect)
vehicle.hacking_scenario = self._scenario
return True

def on_end(self, vehicle: 'Vehicle') -> None:
return
8 changes: 4 additions & 4 deletions src/DataModel/Effects/HackingEffects/HackedEffect.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from abc import ABC
from typing import List

from DataModel.Effects.VehicleEffect import VehicleEffect
from DataModel.Effects.VehicleEffectList import VehicleEffectIdentification
Expand All @@ -11,13 +10,14 @@ def __init__(self, scenario: str):
super().__init__()
self._scenario = scenario

def on_start(self, vehicle: 'Vehicle') -> None:
def on_start(self, vehicle: Vehicle) -> bool:
vehicle.hacking_scenario = self._scenario
return True

def effect_should_end(self, vehicle: 'Vehicle') -> bool:
def effect_should_end(self, vehicle: Vehicle) -> bool:
return False

def conflicts_with(self) -> List[VehicleEffectIdentification]:
def conflicts_with(self) -> list[VehicleEffectIdentification]:
return [VehicleEffectIdentification.HACKING_PROTECTION]

def remove_other_hacking_effects(self, vehicle: Vehicle):
Expand Down
7 changes: 6 additions & 1 deletion src/DataModel/Effects/HackingEffects/HackedNoDriving.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,15 @@ class HackedNoDriving(HackedEffect):
def identify(self) -> VehicleEffectIdentification:
return VehicleEffectIdentification.HACKED_NO_DRIVING

def on_start(self, vehicle: 'Vehicle') -> None:
def can_be_applied(self, vehicle: Vehicle) -> bool:
_ = vehicle
return True

def on_start(self, vehicle: 'Vehicle') -> bool:
super().on_start(vehicle)
self.remove_other_hacking_effects(vehicle)
vehicle.speed_factor = 0.0
return True

def on_end(self, vehicle: 'Vehicle') -> None:
vehicle.speed_factor = 1
9 changes: 7 additions & 2 deletions src/DataModel/Effects/HackingEffects/HackedNoSafetyModule.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,17 @@ class HackedNoSafetyModule(HackedEffect):
def identify(self) -> VehicleEffectIdentification:
return VehicleEffectIdentification.HACKED_NO_SAFETY_MODULE

def on_start(self, vehicle: 'Vehicle') -> None:
def can_be_applied(self, vehicle: Vehicle) -> bool:
_ = vehicle
return True

def on_start(self, vehicle: Vehicle) -> bool:
super().on_start(vehicle)
self.remove_other_hacking_effects(vehicle)
vehicle.speed_factor = 1.5
vehicle.set_safemode(False)
return True

def on_end(self, vehicle: 'Vehicle') -> None:
def on_end(self, vehicle: Vehicle) -> None:
vehicle.speed_factor = 1
vehicle.set_safemode(True)
8 changes: 7 additions & 1 deletion src/DataModel/Effects/HackingEffects/HackedReducedSpeed.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,16 @@ class HackedReducedSpeed(HackedEffect):
def identify(self) -> VehicleEffectIdentification:
return VehicleEffectIdentification.HACKED_REDUCED_SPEED

def on_start(self, vehicle: 'Vehicle') -> None:
def can_be_applied(self, vehicle: Vehicle) -> bool:
_ = vehicle
return True

def on_start(self, vehicle: Vehicle) -> bool:
super().on_start(vehicle)
self.remove_other_hacking_effects(vehicle)
vehicle.speed_factor = 0.3
return True

def on_end(self, vehicle: 'Vehicle') -> None:
vehicle.speed_factor = 1
return
20 changes: 15 additions & 5 deletions src/DataModel/Effects/HackingProtection.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,39 @@


class HackingProtection(VehicleEffect):

def __init__(self):
super().__init__()
self._end_time = None
self._config_handler = ConfigurationHandler()

def identify(self) -> VehicleEffectIdentification:
return VehicleEffectIdentification.HACKING_PROTECTION

def can_be_applied(self, vehicle: Vehicle) -> bool:
_ = vehicle
return True

def conflicts_with(self) -> list[VehicleEffectIdentification]:
return []

def on_start(self, vehicle: 'Vehicle') -> bool:
super().on_start(vehicle)
clean_effect = CleanHackedEffect("0")
vehicle.apply_effect(clean_effect)
vehicle.remove_effect(clean_effect)

try:
# TODO: Implement general config objects and handle default values there!
duration = self._config_handler.get_configuration()['hacking_protection']['portection_duration_s']
duration = int(self._config_handler.get_configuration()['hacking_protection']['portection_duration_s'])
except KeyError:
duration = 15

self._end_time = datetime.now() + timedelta(seconds=duration)

return True

def identify(self) -> VehicleEffectIdentification:
return VehicleEffectIdentification.HACKING_PROTECTION
def on_end(self, vehicle: 'Vehicle') -> None:
return

def effect_should_end(self, vehicle: 'Vehicle') -> bool:
if self._end_time is None:
Expand Down
13 changes: 8 additions & 5 deletions src/DataModel/Effects/VehicleEffect.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,26 @@ def identify(self) -> VehicleEffectIdentification:
"""
raise NotImplementedError

@abstractmethod
def can_be_applied(self, vehicle: 'Vehicle') -> bool:
"""
Returns whether the effect can be applied to a Vehicle
"""
_ = vehicle
return True
raise NotImplementedError

@abstractmethod
def on_start(self, vehicle: 'Vehicle') -> bool:
"""
Runs when added to a vehicle. Can also be used to start a background task
"""
pass
raise NotImplementedError

@abstractmethod
def on_end(self, vehicle: 'Vehicle') -> None:
"""
Runs when removed from a vehicle. Should be used for cleaning up
"""
pass
raise NotImplementedError

@abstractmethod
def effect_should_end(self, vehicle: 'Vehicle') -> bool:
Expand All @@ -49,8 +51,9 @@ def effect_should_end(self, vehicle: 'Vehicle') -> bool:
"""
raise NotImplementedError

@abstractmethod
def conflicts_with(self) -> List[VehicleEffectIdentification]:
"""
Returns a list of other effects that prevent this effect from being applied
"""
return []
raise NotImplementedError
28 changes: 15 additions & 13 deletions src/DataModel/InitializationCar.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio
from asyncio import Event
from typing import List

from LocationService.Track import TrackPiece
from LocationService.TrackPieces import StartPieceAfterLine, TrackBuilder, StartPieceBeforeLine, StraightPiece, \
Expand All @@ -10,8 +9,8 @@
_START_PIECE_ID_AFTER_LINE: int = 33
_START_PIECE_ID_BEFORE_LINE: int = 34

_STRAIGHT_PIECE_IDS: List[int] = [36, 39, 40]
_CURVE_PIECE_IDS: List[int] = [17, 18, 20, 23]
_STRAIGHT_PIECE_IDS: list[int] = [36, 39, 40]
_CURVE_PIECE_IDS: list[int] = [17, 18, 20, 23]


def _raw_location_to_normalized_location(piece: int, location: int) -> int:
Expand Down Expand Up @@ -40,7 +39,7 @@ class ScannedPiece:
"""
def __init__(self, piece_id: int, location: int):
self._piece_id: int = piece_id
self._locations: List[int] = list()
self._locations: list[int] = list()
self._locations.append(_raw_location_to_normalized_location(piece_id, location))

def get_id(self) -> int | None:
Expand Down Expand Up @@ -82,10 +81,13 @@ def is_location_counting_downwards(self) -> bool | None:
return None
return self._locations[0] > self._locations[1]

def __eq__(self, other):
def __eq__(self, other: object) -> bool:
"""
Checks whether this scanned piece is equal to another scanned piece
"""
if not isinstance(other, self.__class__):
return False

return type(self) == type(other) and \
self.get_id() == other.get_id() and \
(self.is_location_counting_downwards() == other.is_location_counting_downwards()) # noqa: E721
Expand All @@ -105,11 +107,11 @@ class InitializationCar:
"""
def __init__(self, controller: AnkiController):
self._controller: AnkiController = controller
self._piece_ids: List[ScannedPiece] = list()
self._piece_ids: list[ScannedPiece] = list()
self._finished_scanning_event: Event = Event()
self._new_piece: bool = True

async def run(self) -> List[TrackPiece] | None:
async def run(self) -> list[TrackPiece] | None:
"""
Start the scanning
"""
Expand All @@ -127,8 +129,8 @@ async def run(self) -> List[TrackPiece] | None:
await asyncio.sleep(1)
self._controller.change_speed_to(40)

old_scan: List[ScannedPiece] = []
new_scan: List[ScannedPiece] = await self._scan_for_track_ids()
old_scan: list[ScannedPiece] = []
new_scan: list[ScannedPiece] = await self._scan_for_track_ids()

while old_scan != new_scan:
old_scan = new_scan
Expand All @@ -140,7 +142,7 @@ async def run(self) -> List[TrackPiece] | None:
self._controller.change_speed_to(0)
return self._convert_collected_data_to_pieces(new_scan)

async def _scan_for_track_ids(self) -> List[ScannedPiece]:
async def _scan_for_track_ids(self) -> list[ScannedPiece]:
"""
Drives a scanning round and returns the IDs after that
"""
Expand All @@ -151,7 +153,7 @@ async def _scan_for_track_ids(self) -> List[ScannedPiece]:

return self._piece_ids.copy()

def _receive_location(self, value_tuple) -> None:
def _receive_location(self, value_tuple: tuple[int, int, int, int, int]) -> None:
"""
Callback for when a location event is sent
"""
Expand All @@ -177,7 +179,7 @@ def _receive_location(self, value_tuple) -> None:

return

def _receive_transition(self, value_tuple) -> None:
def _receive_transition(self, value_tuple: tuple[int]) -> None:
"""
Callback for when a transition event is sent
"""
Expand Down Expand Up @@ -206,7 +208,7 @@ def _convert_collected_data_to_pieces(self, scanned_pieces: list[ScannedPiece])
# get piece constants from the track builder
constants = TrackBuilder()

converted_list: List[TrackPiece] = list()
converted_list: list[TrackPiece] = list()
for piece in scanned_pieces:
if piece.get_id() == _START_PIECE_ID_AFTER_LINE:
converted_list.append(StartPieceAfterLine(constants.START_PIECE_AFTER_LINE_LENGTH,
Expand Down
6 changes: 3 additions & 3 deletions src/DataModel/PhysicalCar.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def __init__(self,
vehicle_id: str,
controller: AnkiController,
location_service: PhysicalLocationService,
disable_item_removal=False) -> None:
disable_item_removal: bool = False) -> None:
super().__init__(vehicle_id, location_service, disable_item_removal)
self._controller: AnkiController | None = controller
self._location_service: PhysicalLocationService = location_service
Expand Down Expand Up @@ -54,15 +54,15 @@ async def initiate_connection(self, uuid: str) -> bool:
else:
return False

def _receive_location(self, value_tuple) -> None:
def _receive_location(self, value_tuple: tuple[int, int, int, int, int]) -> None:
location, piece, offset, speed, _ = value_tuple
offset = clamp(offset, -66.5, 66.5)
self._current_driving_speed = speed if self._requested_speed != 0 else 0
self._offset_from_center = offset
self._location_service.notify_location_event(piece, location, offset, self._current_driving_speed)
self._on_driving_data_change()

def _receive_transition(self, value_tuple) -> None:
def _receive_transition(self, value_tuple: tuple[int, int, int, int]) -> None:
super()._receive_transition(value_tuple)
_, _, offset, _ = value_tuple
offset = clamp(offset, -66.5, 66.5)
Expand Down
Loading

0 comments on commit fb6f110

Please sign in to comment.