-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add boilerplate code for a web server
This will mimic the real gripper's web server.
- Loading branch information
1 parent
23e9ad4
commit 98ecfa4
Showing
6 changed files
with
121 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
# Schunk EGU/EGK Dummy | ||
A minimalist protocol simulator for system tests. | ||
|
||
## Install | ||
Use a virtual environment and | ||
```bash | ||
pip install fastapi uvicorn | ||
``` | ||
|
||
## Getting started | ||
1. In a new terminal, start the dummy | ||
```bash | ||
uvicorn main:server --port 8000 --reload | ||
``` | ||
|
||
## Run tests locally | ||
|
||
Inside a virtual environment (.venv) | ||
|
||
```bash | ||
pip install pytest coverage | ||
``` | ||
|
||
And then inside this repository | ||
```bash | ||
coverage run -m pytest test/ | ||
coverage report | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
from src.dummy import Dummy | ||
|
||
from fastapi import FastAPI | ||
from fastapi.middleware.cors import CORSMiddleware | ||
from typing import Optional | ||
from pydantic import BaseModel | ||
|
||
# Components | ||
dummy = Dummy() | ||
dummy.start() | ||
server = FastAPI() | ||
client = ["http://localhost:8001"] | ||
|
||
|
||
server.add_middleware( | ||
CORSMiddleware, | ||
allow_origins=client, | ||
allow_credentials=True, | ||
allow_methods=["*"], | ||
allow_headers=[""], | ||
) | ||
|
||
|
||
class Message(BaseModel): | ||
message: str | ||
optional: Optional[str] = None | ||
|
||
|
||
@server.put("/") | ||
async def put(msg: Message): | ||
dummy.process(msg.message) | ||
return True | ||
|
||
|
||
@server.get("/") | ||
async def get(): | ||
return "Dummy ready" |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
from threading import Thread | ||
import time | ||
|
||
|
||
class Dummy(object): | ||
def __init__(self): | ||
self.thread = Thread(target=self._run) | ||
self.running = False | ||
self.done = False | ||
|
||
def start(self) -> None: | ||
if self.running: | ||
return | ||
self.thread.start() | ||
self.running = True | ||
|
||
def stop(self) -> None: | ||
self.done = True | ||
self.thread.join() | ||
self.running = False | ||
|
||
def _run(self): | ||
while not self.done: | ||
time.sleep(1) | ||
print("Done") | ||
|
||
def process(self, msg: str) -> bool: | ||
print(msg) | ||
return True |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
from src.dummy import Dummy | ||
|
||
|
||
def test_dummy_starts_a_background_thread(): | ||
dummy = Dummy() | ||
assert not dummy.running | ||
dummy.start() | ||
assert dummy.running | ||
dummy.stop() | ||
assert not dummy.running | ||
|
||
|
||
def test_dummy_survives_repeated_starts_and_stops(): | ||
dummy = Dummy() | ||
for _ in range(3): | ||
dummy.start() | ||
assert dummy.running | ||
|
||
for _ in range(3): | ||
dummy.stop() | ||
assert not dummy.running | ||
|
||
|
||
def test_dummy_processes_messages(): | ||
dummy = Dummy() | ||
msg = "Hello simulator!" | ||
assert dummy.process(msg) |