-
Notifications
You must be signed in to change notification settings - Fork 0
/
microwave.py
136 lines (109 loc) · 4.94 KB
/
microwave.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
from typing import Literal, TypedDict
from loguru import logger
from redis import asyncio as aioredis
# The only two values the derived microwave state can take.
State = Literal["ON", "OFF"]


class MicrowaveDict(TypedDict):
    """Snapshot of the microwave: its two stored values plus the derived state."""

    # Current power level (non-negative integer).
    power: int
    # Current counter value (non-negative integer).
    counter: int
    # "ON" or "OFF".
    state: State
# Name of the Redis channel that `Microwave.notify_subscribers` publishes its messages to.
CHANNEL_NAME = "microwave_updates"
class Microwave:
    """
    Microwave whose values are stored in Redis.

    `power` and `counter` are integers >= 0 (`power` is additionally capped at
    `MAX_POWER`).
    `state` can be "ON" or "OFF" and is derived from the current `power` and
    `counter` (it is not stored in redis).
    """

    # Redis keys holding the two stored values.
    POWER_KEY = "microwave_power"
    COUNTER_KEY = "microwave_counter"
    # Upper bound enforced when adjusting power (the counter has no upper bound).
    MAX_POWER = 100
    # How many times an adjustment is attempted before giving up.
    MAX_RETRIES = 10

    def __init__(self, redis_client=None):
        """Create a microwave bound to a Redis client.

        Args:
            redis_client: Async Redis client to use. When omitted, the shared
                client exported by the project's `redis_connection` module is
                used, exactly as before this parameter existed.
        """
        if redis_client is None:
            # Deferred import: the shared client is only looked up when the
            # caller did not supply one.
            from redis_connection import redis as redis_client
        self.redis = redis_client

    @staticmethod
    def _parse_level(raw, name: str) -> int:
        """Convert a raw Redis reply into a non-negative int (missing/empty -> 0)."""
        level = int(raw or 0)
        # Every write in this class clamps at 0, so a negative value means the
        # key was modified by something else.
        assert level >= 0, f"stored {name} must be >= 0, got {level}"
        return level

    async def get_power(self) -> int:
        """Return the stored power, or 0 when the key is not set."""
        return self._parse_level(await self.redis.get(self.POWER_KEY), "power")

    async def get_counter(self) -> int:
        """Return the stored counter, or 0 when the key is not set."""
        return self._parse_level(await self.redis.get(self.COUNTER_KEY), "counter")

    async def get_microwave_data(self) -> "MicrowaveDict":
        """Get all microwave data.

        Both keys are read through a single transactional pipeline, and the
        state is computed from them: "ON" when either value is positive,
        otherwise "OFF".
        """
        async with self.redis.pipeline(transaction=True) as pipe:
            # Commands are only queued here; `execute()` sends them together.
            pipe.get(self.POWER_KEY)
            pipe.get(self.COUNTER_KEY)
            raw_power, raw_counter = await pipe.execute()

        power = self._parse_level(raw_power, "power")
        counter = self._parse_level(raw_counter, "counter")
        state: State = "ON" if counter > 0 or power > 0 else "OFF"
        snapshot: MicrowaveDict = {"power": power, "counter": counter, "state": state}
        return snapshot

    async def get_state(self) -> "State":
        """State is derived from current power and counter."""
        snapshot = await self.get_microwave_data()
        return snapshot["state"]

    async def _adjust(self, key: str, name: str, increment: int, maximum=None) -> None:
        """Add `increment` to the integer at `key`, clamped to [0, maximum].

        The read-modify-write runs under WATCH/MULTI so that a concurrent
        change to `key` aborts the write (`WatchError`) and the whole attempt
        is retried, up to `MAX_RETRIES` times. `incrby` is not used because
        the result must never go below 0.

        Args:
            key: Redis key to update.
            name: Lower-case value name ("power" / "counter") used in messages.
            increment: Amount to add; may be negative.
            maximum: Optional upper bound; `None` means unbounded.

        Raises:
            RuntimeError: If every attempt was aborted by a concurrent change.
        """
        last_conflict = None
        for _ in range(self.MAX_RETRIES):
            try:
                async with self.redis.pipeline(transaction=True) as pipe:
                    await pipe.watch(key)
                    # After watch() the pipeline executes reads immediately.
                    current = self._parse_level(await pipe.get(key), name)
                    updated = max(0, current + increment)
                    if maximum is not None:
                        updated = min(updated, maximum)
                    # Back to buffered mode: the SET is only applied by
                    # execute(), and only if `key` is still unchanged.
                    pipe.multi()
                    pipe.set(key, updated)
                    results = await pipe.execute()
            except aioredis.WatchError as conflict:
                last_conflict = conflict
                continue

            assert all(results), f"redis rejected the {name} update: {results!r}"
            await self.notify_subscribers(
                f"{name.capitalize()} adjusted. Old {name}: {current} New {name}: {updated}"
            )
            return

        logger.error(f"Could not adjust {name}")
        raise RuntimeError(f"Could not adjust {name}") from last_conflict

    async def adjust_power(self, increment: int = 10) -> None:
        """Adjust power by increment (increment can be negative, but value after can't be negative so `incrby` is not suitable) (also power can't be higher than 100)"""
        await self._adjust(self.POWER_KEY, "power", increment, maximum=self.MAX_POWER)

    async def adjust_counter(self, increment: int = 10) -> None:
        """Adjust counter by increment (increment can be negative, but value after can't be negative so `incrby` is not suitable)"""
        await self._adjust(self.COUNTER_KEY, "counter", increment)

    async def cancel(self) -> None:
        """Cancel microwave, set `power` and `counter` to 0, which will also set state to 'OFF'"""
        async with self.redis.pipeline(transaction=True) as pipe:
            pipe.set(self.POWER_KEY, 0)
            pipe.set(self.COUNTER_KEY, 0)
            results = await pipe.execute()
        assert all(results), f"redis rejected the cancel: {results!r}"
        await self.notify_subscribers("Microwave canceled. Power and Counter set to 0.")

    async def notify_subscribers(self, message: str) -> None:
        """Notify subscribers of changes by publishing `message` on `CHANNEL_NAME`."""
        # The reply of publish() is logged as the number of subscribers reached.
        receivers = await self.redis.publish(CHANNEL_NAME, message)
        logger.info(f"Published message: '{message}' to '{receivers}' subscribers")