diff --git a/selfdrive/car/modules/ALCA_module.py b/selfdrive/car/modules/ALCA_module.py index fcbbd21378c9ef..63e5dd333dc5e0 100644 --- a/selfdrive/car/modules/ALCA_module.py +++ b/selfdrive/car/modules/ALCA_module.py @@ -130,7 +130,7 @@ def stop_ALCA(self, CS, isDone): self.send_status(CS) - def update(self,enabled,CS,actuators,alcaStateData,frame): + def update(self,enabled,CS,actuators,alcaStateData,frame,blinker): cl_min_v = CS.CL_MIN_V cl_max_a = CS.CL_MAX_A self.frame = frame @@ -146,23 +146,13 @@ def update(self,enabled,CS,actuators,alcaStateData,frame): self.laneChange_cancelled = False # Basic highway lane change logic - turn_signal_needed = 0 # send 1 for left, 2 for right 0 for not needed - - if CS.turn_signal_stalk_state == 0 and \ - (self.laneChange_enabled == 4): - self.debug_alca("ALCA reset: resetting 4 -> 1") - self.laneChange_enabled =1 - self.laneChange_counter =0 - self.laneChange_direction =0 - CS.UE.custom_alert_message(-1,"",0) - - if CS.turn_signal_stalk_state == 0 and \ - (self.laneChange_enabled > 1): - # no blinkers on but we are still changing lane, so we need to send blinker command - if self.laneChange_direction == -1: - turn_signal_needed = 1 - elif self.laneChange_direction == 1: - turn_signal_needed = 2 + + if self.laneChange_enabled == 4 and frame > blinker.override_frame_end: + self.debug_alca("ALCA reset: resetting 4 -> 1") + self.laneChange_enabled = 1 + self.laneChange_counter = 0 + self.laneChange_direction = 0 + CS.UE.custom_alert_message(-1, "", 0) if (CS.cstm_btns.get_button_status("alca") > 0) and self.alcaEnabled and (self.laneChange_enabled == 1): if ((CS.v_ego < cl_min_v) or (abs(actuators.steerAngle) >= cl_max_a) or \ @@ -180,11 +170,12 @@ def update(self,enabled,CS,actuators,alcaStateData,frame): self.stop_ALCA(CS, False) return 0, False - if self.alcaEnabled and enabled and CS.prev_turn_signal_stalk_state == 0 and CS.turn_signal_stalk_state > 0 and \ + if self.alcaEnabled and enabled and blinker.tap_direction != 0 and \ (CS.v_ego >= cl_min_v) and (abs(actuators.steerAngle) < cl_max_a) and (self.laneChange_enabled == 1): # start blinker, speed and angle is within limits, let's go - laneChange_direction = -1 if CS.turn_signal_stalk_state == 1 else 1 # left -1, right 1 - self.debug_alca("ALCA blinker on detected") + laneChange_direction = -1 if blinker.tap_direction == 1 else 1 # left -1, right 1 + blinker.override_direction = blinker.tap_direction + self.debug_alca("ALCA blinker tap detected") CS.UE.custom_alert_message(2,"Auto Lane Change Engaged!",100) self.debug_alca("ALCA engaged") @@ -240,9 +231,11 @@ def update(self,enabled,CS,actuators,alcaStateData,frame): self.laneChange_counter = 0 self.stop_ALCA(CS, True) return 0, False + else: + blinker.override_frame_end = max(blinker.override_frame_end, frame + 25) self.send_status(CS) - return turn_signal_needed, self.laneChange_enabled > 1 + return self.laneChange_enabled > 1 class ALCAModelParser(): def __init__(self): diff --git a/selfdrive/car/tesla/HSO_module.py b/selfdrive/car/tesla/HSO_module.py index 002b0adb463e89..d774a1a88f8231 100644 --- a/selfdrive/car/tesla/HSO_module.py +++ b/selfdrive/car/tesla/HSO_module.py @@ -12,8 +12,8 @@ def update_stat(self, CC, CS, enabled, actuators, frame): # if steering but not by ALCA if CS.steer_override > 0: self.frame_humanSteered = frame - elif CC.ALCA.laneChange_enabled <= 1: - if CS.turn_signal_stalk_state > 0 or frame <= (CC.blinker_on_frame_start + int(100 * CS.hsoNumbPeriod)): # stalk locked or blinker within numbPeriod + elif CC.ALCA.laneChange_enabled <= 1 and frame > CC.blinker.blinker_on_frame_start + CC.blinker.tap_duration_frames: + if CS.turn_signal_stalk_state > 0 or frame <= (CC.blinker.blinker_on_frame_start + int(100 * CS.hsoNumbPeriod)): # stalk locked or blinker within numbPeriod self.frame_humanSteered = frame elif frame - self.frame_humanSteered < 50: # Need more human testing of handoff timing # Find steering difference between visiond model and human (no need to do every frame if we run out of CPU): diff --git a/selfdrive/car/tesla/blinker_module.py b/selfdrive/car/tesla/blinker_module.py new file mode 100644 index 00000000000000..f837691dc45b83 --- /dev/null +++ b/selfdrive/car/tesla/blinker_module.py @@ -0,0 +1,28 @@ +class Blinker: + + def __init__(self): + self.tap_duration_frames = 55 # stalk signal for less than 550ms means it was tapped + self.tap_direction = 0 # tap direction lasts for one frame (when it was let go) + self.blinker_on_frame_start = 0 + self.override_frame_end = 0 + self.override_direction = 0 + + def update_state(self, CS, frame): + self.tap_direction = 0 + if CS.turn_signal_stalk_state > 0 and CS.prev_turn_signal_stalk_state == 0: + if self.override_frame_end == 0: + self.blinker_on_frame_start = frame + elif CS.turn_signal_stalk_state == 0 and CS.prev_turn_signal_stalk_state > 0: # turn signal stalk just turned off + if frame - self.blinker_on_frame_start <= self.tap_duration_frames: + self.tap_direction = CS.prev_turn_signal_stalk_state + if CS.tapBlinkerExtension > 0 and self.override_frame_end == 0: + blink_duration_frames = 58 # one blink takes ~580ms + total_blinks = 3 + CS.tapBlinkerExtension # 3 blinks are minimum and controlled by the car + self.override_frame_end = self.blinker_on_frame_start + blink_duration_frames * total_blinks + self.override_direction = CS.prev_turn_signal_stalk_state + + if 0 < self.override_frame_end < frame or (self.override_direction > 0 and (CS.turn_signal_stalk_state > 0 or self.override_frame_end == 0)): + self.override_frame_end = 0 + self.override_direction = 0 + if self.blinker_on_frame_start > 0 and CS.turn_signal_stalk_state == 0 and not CS.turn_signal_blinking and self.override_frame_end == 0: + self.blinker_on_frame_start = 0 diff --git a/selfdrive/car/tesla/carcontroller.py b/selfdrive/car/tesla/carcontroller.py index 28fcafd0676417..f5541ccf73fffb 100644 --- a/selfdrive/car/tesla/carcontroller.py +++ b/selfdrive/car/tesla/carcontroller.py @@ -5,6 +5,7 @@ from common.numpy_fast import clip, interp from common import realtime from selfdrive.car.tesla import teslacan +from selfdrive.car.tesla.blinker_module import Blinker from selfdrive.car.tesla.speed_utils.fleet_speed import FleetSpeed from selfdrive.car.tesla.values import AH, CM from selfdrive.can.packer import CANPacker @@ -88,6 +89,7 @@ def __init__(self, dbc_name): self.epas_disabled = True self.last_angle = 0. self.last_accel = 0. + self.blinker = Blinker() self.ALCA = ALCAController(self,True,True) # Enabled and SteerByAngle both True self.ACC = ACCController(self) self.PCC = PCCController(self) @@ -190,10 +192,6 @@ def __init__(self, dbc_name): self.ldwStatus = 0 self.prev_ldwStatus = 0 - self.blinker_on_frame_start = 0 - self.blinker_extension_frame_end = 0 - self.blinker_extension_blinker_type = 0 - self.radarVin_idx = 0 self.LDW_ENABLE_SPEED = 16 self.should_ldw = False @@ -352,7 +350,9 @@ def update(self, enabled, CS, frame, actuators, \ # Update statuses for custom buttons every 0.1 sec. if (frame % 10 == 0): self.ALCA.update_status((CS.cstm_btns.get_button_status("alca") > 0) and ((CS.enableALCA and not CS.hasTeslaIcIntegration) or (CS.hasTeslaIcIntegration and CS.alcaEnabled))) - + + self.blinker.update_state(CS, frame) + # update PCC module info pedal_can_sends = self.PCC.update_stat(CS, frame) if self.PCC.pcc_available: @@ -372,10 +372,7 @@ def update(self, enabled, CS, frame, actuators, \ CS.v_cruise_pcm = self.PCC.pedal_speed_kph * speed_uom_kph else: CS.v_cruise_pcm = max(0., CS.v_ego * CV.MS_TO_KPH) * speed_uom_kph - # Get the turn signal from ALCA. - turn_signal_needed, self.alca_enabled = self.ALCA.update(enabled, CS, actuators, self.alcaStateData, frame) - if turn_signal_needed == 0: - turn_signal_needed = self._should_extend_blinker(CS, frame) + self.alca_enabled = self.ALCA.update(enabled, CS, actuators, self.alcaStateData, frame, self.blinker) self.should_ldw = self._should_ldw(CS, frame) apply_angle = -actuators.steerAngle # Tesla is reversed vs OP. # Update HSO module info. @@ -507,8 +504,8 @@ def update(self, enabled, CS, frame, actuators, \ op_status = 0x01 if self.opState == 5: op_status = 0x03 - if turn_signal_needed > 0: - alca_state = 0x08 + turn_signal_needed + if self.blinker.override_direction > 0: + alca_state = 0x08 + self.blinker.override_direction elif (self.lLine > 1) and (self.rLine > 1): alca_state = 0x08 elif (self.lLine > 1): @@ -608,7 +605,7 @@ def update(self, enabled, CS, frame, actuators, \ adaptive_cruise = 1 if (not self.PCC.pcc_available and self.ACC.adaptive) or self.PCC.pcc_available else 0 can_sends.append(teslacan.create_fake_DAS_msg(speed_control_enabled,speed_override,self.DAS_206_apUnavailable, collision_warning, op_status, \ acc_speed_kph, \ - turn_signal_needed,forward_collision_warning, adaptive_cruise, hands_on_state, \ + self.blinker.override_direction,forward_collision_warning, adaptive_cruise, hands_on_state, \ cc_state, 1 if self.PCC.pcc_available else 0, alca_state, \ CS.v_cruise_pcm, CS.speedLimitToIc, #speed_limit_to_car, @@ -640,7 +637,7 @@ def update(self, enabled, CS, frame, actuators, \ if cruise_btn: cruise_msg = teslacan.create_cruise_adjust_msg( spdCtrlLvr_stat=cruise_btn, - turnIndLvr_Stat= 0, #turn_signal_needed, + turnIndLvr_Stat= 0, real_steering_wheel_stalk=CS.steering_wheel_stalk) # Send this CAN msg first because it is racing against the real stalk. can_sends.insert(0, cruise_msg) @@ -822,26 +819,6 @@ def handleTrafficEvents(self, trafficEventsMsgs): def speedUnits(self, fromMetersPerSecond): return fromMetersPerSecond * (CV.MS_TO_KPH if self.isMetric else CV.MS_TO_MPH) + 0.5 - def _should_extend_blinker(self, CS, frame): - if CS.tapBlinkerExtension <= 0: - return 0 - if CS.turn_signal_stalk_state > 0 and CS.prev_turn_signal_stalk_state == 0 and self.blinker_extension_frame_end == 0: - self.blinker_on_frame_start = frame - elif CS.turn_signal_stalk_state == 0 and CS.prev_turn_signal_stalk_state > 0: # turn signal stalk just turned off - is_blinker_tap = frame - self.blinker_on_frame_start < 55 # stalk signal for less than 550ms means it was tapped - if is_blinker_tap: - blink_duration_frames = 58 # one blink takes ~580ms - self.blinker_extension_frame_end = self.blinker_on_frame_start + blink_duration_frames * (3 + CS.tapBlinkerExtension) - self.blinker_extension_blinker_type = CS.prev_turn_signal_stalk_state - - if 0 < self.blinker_extension_frame_end < frame: - self.blinker_extension_frame_end = 0 - self.blinker_extension_blinker_type = 0 - if self.blinker_on_frame_start > 0 and CS.turn_signal_stalk_state == 0 and not CS.turn_signal_blinking and self.blinker_extension_frame_end == 0: - self.blinker_on_frame_start = 0 - - return self.blinker_extension_blinker_type - def _should_ldw(self, CS, frame): if not CS.enableLdw: return False