Skip to content

Commit

Permalink
Use fastapi's background task mechanism for gripper motion
Browse files Browse the repository at this point in the history
This seems the cleanest approach for now for giving immediate results to
post requests and having the motion run in the background.
Also adapt the unit tests accordingly.
  • Loading branch information
stefanscherzinger committed Aug 2, 2024
1 parent b8f34e8 commit c06385b
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 20 deletions.
12 changes: 10 additions & 2 deletions schunk_egu_egk_gripper_dummy/schunk_egu_egk_gripper_dummy/main.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from src.dummy import Dummy

from fastapi import FastAPI, Request, Form
from fastapi import FastAPI, Request, Form, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from typing import Optional
import string

# Components
dummy = Dummy()
Expand All @@ -25,9 +26,16 @@ async def post(
value: str = Form(...),
elem: Optional[int] = Form(None),
callback: Optional[str] = Form(None),
background_tasks: BackgroundTasks = None,
):
msg = {"inst": inst, "value": value, "elem": elem, "callback": callback}
return dummy.post(msg)

if msg["inst"] not in dummy.data:
return {"result": 1}
if not all(digit in string.hexdigits for digit in str(msg["value"])):
return {"result": 1}
background_tasks.add_task(dummy.post, msg)
return {"result": 0}


@server.get("/adi/{path}")
Expand Down
12 changes: 2 additions & 10 deletions schunk_egu_egk_gripper_dummy/src/dummy.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import os
from pathlib import Path
import json
import string
import struct


Expand All @@ -17,14 +16,14 @@ def __init__(self):
self.data = None
self.plc_input = "0x0040"
self.plc_output = "0x0048"
self.actual_position = "0x0230"
self.actual_speed = "0x0238"
self.error_byte = 12
self.diagnostics_byte = 15
self.valid_status_bits = list(range(0, 10)) + [11, 12, 13, 14, 16, 17, 31]
self.valid_control_bits = list(range(0, 10)) + [11, 12, 13, 14, 16, 30, 31]
self.reserved_status_bits = [10, 15] + list(range(18, 31))
self.reserved_control_bits = [10, 15] + list(range(17, 30))
self.actual_position = "0x0230"
self.actual_speed = "0x0238"

enum_config = os.path.join(
Path(__file__).resolve().parents[1], "config/enum.json"
Expand Down Expand Up @@ -62,13 +61,6 @@ def _run(self) -> None:
print("Done")

def post(self, msg: dict) -> dict:
if "inst" not in msg and "value" not in msg:
return {"result": 1}
if msg["inst"] not in self.data:
return {"result": 1}
if not all(digit in string.hexdigits for digit in msg["value"]):
return {"result": 1}

if msg["inst"] == self.plc_output:
self.plc_output_buffer = bytearray(bytes.fromhex(msg["value"]))
else:
Expand Down
6 changes: 2 additions & 4 deletions schunk_egu_egk_gripper_dummy/tests/test_plc_communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,19 +128,17 @@ def test_dummy_supports_reading_target_speed():

def test_dummy_supports_writing_actual_position():
dummy = Dummy()
inst = "0x0230" # <actual_pos>
pos = 0.123
dummy.set_actual_position(pos)
read_pos = dummy.data[inst][0]
read_pos = dummy.data[dummy.actual_position][0]
read_pos = struct.unpack("f", bytes.fromhex(read_pos))[0]
assert pytest.approx(read_pos) == pos


def test_dummy_supports_writing_actual_speed():
dummy = Dummy()
inst = "0x0238" # <actual_vel>
speed = 66.5
dummy.set_actual_speed(speed)
read_speed = dummy.data[inst][0]
read_speed = dummy.data[dummy.actual_speed][0]
read_speed = struct.unpack("f", bytes.fromhex(read_speed))[0]
assert pytest.approx(read_speed) == speed
11 changes: 7 additions & 4 deletions schunk_egu_egk_gripper_dummy/tests/test_requests.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from src.dummy import Dummy
from schunk_egu_egk_gripper_dummy.main import server
from fastapi.testclient import TestClient


def test_dummy_responds_correctly_to_info_requests():
Expand Down Expand Up @@ -80,18 +82,19 @@ def test_dummy_stores_post_requests():


def test_dummy_rejects_invalid_post_requests():
dummy = Dummy()
client = TestClient(server)

valid_data = "AABBCCDD"
valid_inst = "0x0238"
data = {"inst": valid_inst, "value": valid_data}
assert dummy.post(data) == {"result": 0}
assert client.post("/adi/update.json", data=data).json() == {"result": 0}

invalid_data = "hello:)"
valid_inst = "0x0238"
data = {"inst": valid_inst, "value": invalid_data}
assert dummy.post(data) == {"result": 1}
assert client.post("/adi/update.json", data=data).json() == {"result": 1}

valid_data = "AABBCCDD"
invalid_inst = "0x9999"
data = {"inst": invalid_inst, "value": valid_data}
assert dummy.post(data) == {"result": 1}
assert client.post("/adi/update.json", data=data).json() == {"result": 1}

0 comments on commit c06385b

Please sign in to comment.