Skip to content

Commit

Permalink
Blinker tapping related improvements (commaai#144)
Browse files Browse the repository at this point in the history
* fix blinker extension could not be cancelled via second tap
* fix blinker extension didn't work when aborting ALCA
* ALCA now only triggers by tapping, so you can turn or make a manual lane change without warnings
  • Loading branch information
neon-dev authored Jan 15, 2020
1 parent ced7bfd commit a058729
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 57 deletions.
37 changes: 15 additions & 22 deletions selfdrive/car/modules/ALCA_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def stop_ALCA(self, CS, isDone):
self.send_status(CS)


def update(self,enabled,CS,actuators,alcaStateData,frame):
def update(self,enabled,CS,actuators,alcaStateData,frame,blinker):
cl_min_v = CS.CL_MIN_V
cl_max_a = CS.CL_MAX_A
self.frame = frame
Expand All @@ -146,23 +146,13 @@ def update(self,enabled,CS,actuators,alcaStateData,frame):
self.laneChange_cancelled = False

# Basic highway lane change logic
turn_signal_needed = 0 # send 1 for left, 2 for right 0 for not needed

if CS.turn_signal_stalk_state == 0 and \
(self.laneChange_enabled == 4):
self.debug_alca("ALCA reset: resetting 4 -> 1")
self.laneChange_enabled =1
self.laneChange_counter =0
self.laneChange_direction =0
CS.UE.custom_alert_message(-1,"",0)

if CS.turn_signal_stalk_state == 0 and \
(self.laneChange_enabled > 1):
# no blinkers on but we are still changing lane, so we need to send blinker command
if self.laneChange_direction == -1:
turn_signal_needed = 1
elif self.laneChange_direction == 1:
turn_signal_needed = 2

if self.laneChange_enabled == 4 and frame > blinker.override_frame_end:
self.debug_alca("ALCA reset: resetting 4 -> 1")
self.laneChange_enabled = 1
self.laneChange_counter = 0
self.laneChange_direction = 0
CS.UE.custom_alert_message(-1, "", 0)

if (CS.cstm_btns.get_button_status("alca") > 0) and self.alcaEnabled and (self.laneChange_enabled == 1):
if ((CS.v_ego < cl_min_v) or (abs(actuators.steerAngle) >= cl_max_a) or \
Expand All @@ -180,11 +170,12 @@ def update(self,enabled,CS,actuators,alcaStateData,frame):
self.stop_ALCA(CS, False)
return 0, False

if self.alcaEnabled and enabled and CS.prev_turn_signal_stalk_state == 0 and CS.turn_signal_stalk_state > 0 and \
if self.alcaEnabled and enabled and blinker.tap_direction != 0 and \
(CS.v_ego >= cl_min_v) and (abs(actuators.steerAngle) < cl_max_a) and (self.laneChange_enabled == 1):
# start blinker, speed and angle is within limits, let's go
laneChange_direction = -1 if CS.turn_signal_stalk_state == 1 else 1 # left -1, right 1
self.debug_alca("ALCA blinker on detected")
laneChange_direction = -1 if blinker.tap_direction == 1 else 1 # left -1, right 1
blinker.override_direction = blinker.tap_direction
self.debug_alca("ALCA blinker tap detected")

CS.UE.custom_alert_message(2,"Auto Lane Change Engaged!",100)
self.debug_alca("ALCA engaged")
Expand Down Expand Up @@ -240,9 +231,11 @@ def update(self,enabled,CS,actuators,alcaStateData,frame):
self.laneChange_counter = 0
self.stop_ALCA(CS, True)
return 0, False
else:
blinker.override_frame_end = max(blinker.override_frame_end, frame + 25)

self.send_status(CS)
return turn_signal_needed, self.laneChange_enabled > 1
return self.laneChange_enabled > 1

class ALCAModelParser():
def __init__(self):
Expand Down
4 changes: 2 additions & 2 deletions selfdrive/car/tesla/HSO_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ def update_stat(self, CC, CS, enabled, actuators, frame):
# if steering but not by ALCA
if CS.steer_override > 0:
self.frame_humanSteered = frame
elif CC.ALCA.laneChange_enabled <= 1:
if CS.turn_signal_stalk_state > 0 or frame <= (CC.blinker_on_frame_start + int(100 * CS.hsoNumbPeriod)): # stalk locked or blinker within numbPeriod
elif CC.ALCA.laneChange_enabled <= 1 and frame > CC.blinker.blinker_on_frame_start + CC.blinker.tap_duration_frames:
if CS.turn_signal_stalk_state > 0 or frame <= (CC.blinker.blinker_on_frame_start + int(100 * CS.hsoNumbPeriod)): # stalk locked or blinker within numbPeriod
self.frame_humanSteered = frame
elif frame - self.frame_humanSteered < 50: # Need more human testing of handoff timing
# Find steering difference between visiond model and human (no need to do every frame if we run out of CPU):
Expand Down
28 changes: 28 additions & 0 deletions selfdrive/car/tesla/blinker_module.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
class Blinker:

def __init__(self):
self.tap_duration_frames = 55 # stalk signal for less than 550ms means it was tapped
self.tap_direction = 0 # tap direction lasts for one frame (when it was let go)
self.blinker_on_frame_start = 0
self.override_frame_end = 0
self.override_direction = 0

def update_state(self, CS, frame):
self.tap_direction = 0
if CS.turn_signal_stalk_state > 0 and CS.prev_turn_signal_stalk_state == 0:
if self.override_frame_end == 0:
self.blinker_on_frame_start = frame
elif CS.turn_signal_stalk_state == 0 and CS.prev_turn_signal_stalk_state > 0: # turn signal stalk just turned off
if frame - self.blinker_on_frame_start <= self.tap_duration_frames:
self.tap_direction = CS.prev_turn_signal_stalk_state
if CS.tapBlinkerExtension > 0 and self.override_frame_end == 0:
blink_duration_frames = 58 # one blink takes ~580ms
total_blinks = 3 + CS.tapBlinkerExtension # 3 blinks are minimum and controlled by the car
self.override_frame_end = self.blinker_on_frame_start + blink_duration_frames * total_blinks
self.override_direction = CS.prev_turn_signal_stalk_state

if 0 < self.override_frame_end < frame or (self.override_direction > 0 and (CS.turn_signal_stalk_state > 0 or self.override_frame_end == 0)):
self.override_frame_end = 0
self.override_direction = 0
if self.blinker_on_frame_start > 0 and CS.turn_signal_stalk_state == 0 and not CS.turn_signal_blinking and self.override_frame_end == 0:
self.blinker_on_frame_start = 0
43 changes: 10 additions & 33 deletions selfdrive/car/tesla/carcontroller.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from common.numpy_fast import clip, interp
from common import realtime
from selfdrive.car.tesla import teslacan
from selfdrive.car.tesla.blinker_module import Blinker
from selfdrive.car.tesla.speed_utils.fleet_speed import FleetSpeed
from selfdrive.car.tesla.values import AH, CM
from selfdrive.can.packer import CANPacker
Expand Down Expand Up @@ -88,6 +89,7 @@ def __init__(self, dbc_name):
self.epas_disabled = True
self.last_angle = 0.
self.last_accel = 0.
self.blinker = Blinker()
self.ALCA = ALCAController(self,True,True) # Enabled and SteerByAngle both True
self.ACC = ACCController(self)
self.PCC = PCCController(self)
Expand Down Expand Up @@ -190,10 +192,6 @@ def __init__(self, dbc_name):
self.ldwStatus = 0
self.prev_ldwStatus = 0

self.blinker_on_frame_start = 0
self.blinker_extension_frame_end = 0
self.blinker_extension_blinker_type = 0

self.radarVin_idx = 0
self.LDW_ENABLE_SPEED = 16
self.should_ldw = False
Expand Down Expand Up @@ -352,7 +350,9 @@ def update(self, enabled, CS, frame, actuators, \
# Update statuses for custom buttons every 0.1 sec.
if (frame % 10 == 0):
self.ALCA.update_status((CS.cstm_btns.get_button_status("alca") > 0) and ((CS.enableALCA and not CS.hasTeslaIcIntegration) or (CS.hasTeslaIcIntegration and CS.alcaEnabled)))


self.blinker.update_state(CS, frame)

# update PCC module info
pedal_can_sends = self.PCC.update_stat(CS, frame)
if self.PCC.pcc_available:
Expand All @@ -372,10 +372,7 @@ def update(self, enabled, CS, frame, actuators, \
CS.v_cruise_pcm = self.PCC.pedal_speed_kph * speed_uom_kph
else:
CS.v_cruise_pcm = max(0., CS.v_ego * CV.MS_TO_KPH) * speed_uom_kph
# Get the turn signal from ALCA.
turn_signal_needed, self.alca_enabled = self.ALCA.update(enabled, CS, actuators, self.alcaStateData, frame)
if turn_signal_needed == 0:
turn_signal_needed = self._should_extend_blinker(CS, frame)
self.alca_enabled = self.ALCA.update(enabled, CS, actuators, self.alcaStateData, frame, self.blinker)
self.should_ldw = self._should_ldw(CS, frame)
apply_angle = -actuators.steerAngle # Tesla is reversed vs OP.
# Update HSO module info.
Expand Down Expand Up @@ -507,8 +504,8 @@ def update(self, enabled, CS, frame, actuators, \
op_status = 0x01
if self.opState == 5:
op_status = 0x03
if turn_signal_needed > 0:
alca_state = 0x08 + turn_signal_needed
if self.blinker.override_direction > 0:
alca_state = 0x08 + self.blinker.override_direction
elif (self.lLine > 1) and (self.rLine > 1):
alca_state = 0x08
elif (self.lLine > 1):
Expand Down Expand Up @@ -608,7 +605,7 @@ def update(self, enabled, CS, frame, actuators, \
adaptive_cruise = 1 if (not self.PCC.pcc_available and self.ACC.adaptive) or self.PCC.pcc_available else 0
can_sends.append(teslacan.create_fake_DAS_msg(speed_control_enabled,speed_override,self.DAS_206_apUnavailable, collision_warning, op_status, \
acc_speed_kph, \
turn_signal_needed,forward_collision_warning, adaptive_cruise, hands_on_state, \
self.blinker.override_direction,forward_collision_warning, adaptive_cruise, hands_on_state, \
cc_state, 1 if self.PCC.pcc_available else 0, alca_state, \
CS.v_cruise_pcm,
CS.speedLimitToIc, #speed_limit_to_car,
Expand Down Expand Up @@ -640,7 +637,7 @@ def update(self, enabled, CS, frame, actuators, \
if cruise_btn:
cruise_msg = teslacan.create_cruise_adjust_msg(
spdCtrlLvr_stat=cruise_btn,
turnIndLvr_Stat= 0, #turn_signal_needed,
turnIndLvr_Stat= 0,
real_steering_wheel_stalk=CS.steering_wheel_stalk)
# Send this CAN msg first because it is racing against the real stalk.
can_sends.insert(0, cruise_msg)
Expand Down Expand Up @@ -822,26 +819,6 @@ def handleTrafficEvents(self, trafficEventsMsgs):
def speedUnits(self, fromMetersPerSecond):
return fromMetersPerSecond * (CV.MS_TO_KPH if self.isMetric else CV.MS_TO_MPH) + 0.5

def _should_extend_blinker(self, CS, frame):
if CS.tapBlinkerExtension <= 0:
return 0
if CS.turn_signal_stalk_state > 0 and CS.prev_turn_signal_stalk_state == 0 and self.blinker_extension_frame_end == 0:
self.blinker_on_frame_start = frame
elif CS.turn_signal_stalk_state == 0 and CS.prev_turn_signal_stalk_state > 0: # turn signal stalk just turned off
is_blinker_tap = frame - self.blinker_on_frame_start < 55 # stalk signal for less than 550ms means it was tapped
if is_blinker_tap:
blink_duration_frames = 58 # one blink takes ~580ms
self.blinker_extension_frame_end = self.blinker_on_frame_start + blink_duration_frames * (3 + CS.tapBlinkerExtension)
self.blinker_extension_blinker_type = CS.prev_turn_signal_stalk_state

if 0 < self.blinker_extension_frame_end < frame:
self.blinker_extension_frame_end = 0
self.blinker_extension_blinker_type = 0
if self.blinker_on_frame_start > 0 and CS.turn_signal_stalk_state == 0 and not CS.turn_signal_blinking and self.blinker_extension_frame_end == 0:
self.blinker_on_frame_start = 0

return self.blinker_extension_blinker_type

def _should_ldw(self, CS, frame):
if not CS.enableLdw:
return False
Expand Down

0 comments on commit a058729

Please sign in to comment.