Skip to content

Commit

Permalink
Implement the acknowledge mechanism in the dummy
Browse files Browse the repository at this point in the history
Also
- Move behavior-related tests into `test_dummy.py`. They make more sense
  there
- Adapt the ROS2 test for checking whether the driver is ready after
  startup. The driver should be operational after start by acknowledging
  internal errors.
  • Loading branch information
stefanscherzinger committed Aug 1, 2024
1 parent 80192ba commit 05817c3
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 32 deletions.
12 changes: 12 additions & 0 deletions schunk_egu_egk_gripper_dummy/src/dummy.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ def post(self, msg: dict) -> dict:
self.plc_output_buffer = bytearray(bytes.fromhex(msg["value"]))
else:
self.data[msg["inst"]] = [msg["value"]]

# Behavior
self.process_control_bits()
return {"result": 0}

def get_info(self, query: dict[str, str]) -> dict:
Expand Down Expand Up @@ -167,3 +170,12 @@ def get_control_bit(self, bit: int) -> int | bool:
return False
byte_index, bit_index = divmod(bit, 8)
return 1 if self.plc_output_buffer[byte_index] & (1 << bit_index) != 0 else 0

def process_control_bits(self) -> None:

# Acknowledge
if self.get_control_bit(2) == 1:
self.set_status_bit(bit=0, value=True)
self.set_status_bit(bit=7, value=False)
self.set_status_error("00")
self.set_status_diagnostics("00")
29 changes: 29 additions & 0 deletions schunk_egu_egk_gripper_dummy/tests/test_dummy.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from src.dummy import Dummy

# [1]: https://stb.cloud.schunk.com/media/IM0046706.PDF


def test_dummy_starts_a_background_thread():
dummy = Dummy()
Expand All @@ -26,3 +28,30 @@ def test_dummy_reads_configuration_on_startup():
assert dummy.enum is not None
assert dummy.data is not None
assert dummy.metadata is not None


def test_dummy_starts_in_error_state():
# See p. 24 in
# Booting and establishing operational readiness [1]
dummy = Dummy()
assert dummy.get_status_bit(0) == 0 # not ready for operation
assert dummy.get_status_bit(7) == 1 # there's an error
assert dummy.get_status_error() == "D9" # ERR_FAST_STOP
assert dummy.get_status_diagnostics() == "EF" # ERR_COMM_LOST


def test_dummy_is_ready_after_acknowledge():
dummy = Dummy()
control_double_word = "04000000"
set_position = "00000000"
set_speed = "00000000"
gripping_force = "00000000"
command = {
"inst": dummy.plc_output,
"value": control_double_word + set_position + set_speed + gripping_force,
}
dummy.post(command)
assert dummy.get_status_bit(0) == 1 # ready
assert dummy.get_status_bit(7) == 0 # no error
assert dummy.get_status_error() == "0"
assert dummy.get_status_diagnostics() == "0"
30 changes: 0 additions & 30 deletions schunk_egu_egk_gripper_dummy/tests/test_plc_communication.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from src.dummy import Dummy
import pytest

# [1]: https://stb.cloud.schunk.com/media/IM0046706.PDF

Expand Down Expand Up @@ -95,32 +94,3 @@ def test_dummy_rejects_reading_reserved_control_bits():
for bit in dummy.reserved_control_bits:
assert isinstance(dummy.get_control_bit(bit), bool) # call fails
assert not dummy.get_control_bit(bit)


def test_dummy_starts_in_error_state():
# See p. 24 in
# Booting and establishing operational readiness [1]
dummy = Dummy()
assert dummy.get_status_bit(0) == 0 # not ready for operation
assert dummy.get_status_bit(7) == 1 # there's an error
assert dummy.get_status_error() == "D9" # ERR_FAST_STOP
assert dummy.get_status_diagnostics() == "EF" # ERR_COMM_LOST


@pytest.mark.skip()
def test_dummy_is_ready_after_acknowledge():
dummy = Dummy()
control_double_word = "04000000"
set_position = "00000000"
set_speed = "00000000"
gripping_force = "00000000"
command = {
"inst": dummy.plc_output,
"value": control_double_word + set_position + set_speed + gripping_force,
}
dummy.post(command)

query = {"inst": dummy.plc_input, "count": 1}
data = dummy.get_data(query)[0]
assert data[0:2] == "11"
assert data[30:] == "00" # ERR_NONE
41 changes: 39 additions & 2 deletions schunk_egu_egk_gripper_tests/test/test_functionality.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,45 @@
import pytest
import rclpy
from rclpy.node import Node
from test.conftest import launch_description
from rclpy.action import ActionClient
from schunk_egu_egk_gripper_interfaces.action import ( # type: ignore[attr-defined]
MoveToAbsolutePosition,
)
from schunk_egu_egk_gripper_interfaces.srv import ( # type: ignore[attr-defined]
Acknowledge,
)
from test.helpers import get_current_state


@pytest.mark.launch(fixture=launch_description)
def test_driver_starts_in_not_ready_state(launch_context, isolated, gripper_dummy):
assert get_current_state(variable="ready_for_operation") is False
def test_driver_starts_in_ready_state(launch_context, isolated, gripper_dummy):
assert get_current_state(variable="ready_for_operation") is True


@pytest.mark.launch(fixture=launch_description)
def test_driver_is_ready_after_acknowledge(launch_context, isolated, gripper_dummy):
node = Node("test")
activate_srv = node.create_client(Acknowledge, "/acknowledge")
while not activate_srv.wait_for_service(1.0):
pass
future = activate_srv.call_async(Acknowledge.Request())
rclpy.spin_until_future_complete(node, future)
assert future.result().success is True


@pytest.mark.launch(fixture=launch_description)
@pytest.mark.skip()
def test_driver_moves_to_absolute_position(launch_context, isolated, gripper_dummy):
node = Node("move_test")
# activate_srv = node.create_client(Acknowledge, "/acknowledge")
# future = activate_srv.call_async(Acknowledge.Request())
# rclpy.spin_until_future_complete(node, future)

client = ActionClient(node, MoveToAbsolutePosition, "/move_to_absolute_position")
client.wait_for_server()
goal = MoveToAbsolutePosition.Goal()
future = client.send_goal_async(goal)
rclpy.spin_until_future_complete(node, future)
print("done :)")
assert False

0 comments on commit 05817c3

Please sign in to comment.