-
Notifications
You must be signed in to change notification settings - Fork 0
/
main_host.py
152 lines (115 loc) · 4.22 KB
/
main_host.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
"""
Example message for calling the setTargetTemperature in-event
TOPIC:"/Oven/in_events"
MESSAGE:"{
"event": "setTargetTemperature",
"value": "10.0"
}"
"""
import sys
import time
import os
import json
from yakindu.rx import Observer
import timer_service as m_TimerService
import Oven as m_Oven
import statechart as m_Statechart
import Gui as m_Gui
import AWS as m_AWS
class Main:
CLIENT_ID = "oven"
aws = None
statechart = None
oven = None
running = True
def __init__(self):
self.load()
self.last_update = current_milli_time()
self.running = True
while(self.running):
dt = (current_milli_time() - self.last_update)/1000
self.last_update = current_milli_time()
self.running = self.update(dt)
self.exit()
def load(self):
# Initialize Oven
self.oven = m_Oven.Oven()
# Initialize Statechart
self.statechart = m_Statechart.Statechart()
# Initialize Timerservice (required for Cyclebased statecharts)
self.statechart.timer_service = m_TimerService.TimerService()
# Initialize and set Callback of statechart in-events
callback = Callback(self.statechart, self.oven)
self.statechart.operation_callback = callback
# Initialize Gui
self.gui = m_Gui.Gui(self)
# Initialize AWS
self.aws = m_AWS.AWS(self)
# Set statechart observer for out-events
class Reached_Temperature_Observer( Observer ):
def __init__(self, aws):
self.aws = aws
def next(self, value=None):
if self.aws is not None:
self.aws.publish("reachedTemperature", None)
else:
print("Could not publish '" + "reachedTemperature" + "' because not connection was established.")
reached_temperature_observer = Reached_Temperature_Observer(self.aws)
self.statechart.ready_observable.subscribe(reached_temperature_observer)
class Cooled_Down_Observer( Observer ):
def __init__(self, aws):
self.aws = aws
def next(self, value=None):
if self.aws is not None:
self.aws.publish("cooledDown", None)
else:
print("Could not publish '" + "cooledDown" + "' because not connection was established.")
cooled_down_observer = Cooled_Down_Observer(self.aws)
self.statechart.off_observable.subscribe(cooled_down_observer)
# Enter statechart (required!)
self.statechart.enter()
# update all required objects
def update(self, dt):
self.statechart.run_cycle()
self.oven.update(dt)
keep_running = self.gui.update(dt)
return keep_running
# exit function and cleanup
def exit(self):
self.aws.disconnect()
self.gui.close()
os._exit(0)
def getTemp(self):
return self.oven.temperature
def raise_set_target_temp(self, value, shouldPublish):
self.statechart.raise_set_target_temp(value)
self.gui.setNewTemp(value)
if shouldPublish:
time.sleep(0.1)
self.aws.publish("setTargetTemperature", value)
def customCallback(self, client, userdata, message):
dictionary = json.loads(message.payload)
if dictionary["event"] and dictionary["value"]:
if dictionary["event"] == "setTargetTemperature":
value = float(dictionary["value"])
self.raise_set_target_temp(value, False)
print('Received: "' + str(json.loads(message.payload)) + '" from topic "' + "'" + (message.topic) + "'")
# Class for defining callbacks for statechart out-events
class Callback:
statechart = None
oven = None
def __init__(self, statechart, oven):
self.statechart = statechart
self.oven = oven
def heat_on(self):
self.oven.heatOn()
def heat_off(self):
self.oven.heatOff()
def get_temp(self):
return self.oven.getTemp()
# helper function to get current milliseconds
def current_milli_time():
return round(time.time() * 1000)
# main entry point
if __name__ == "__main__":
Main()