-
Notifications
You must be signed in to change notification settings - Fork 0
/
pm25_service.py
207 lines (186 loc) · 5.2 KB
/
pm25_service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
#!/usr/bin/env python3
#import serial
import time
import struct
import board
import math
import busio
import adafruit_pm25
# red 2
# black 3
# yellow 5
# Set by init(); when true, read_packet() returns emulate_read_packet() data
# instead of reading the sensor.
_emulate = False
# Serial port handle created by init(use_i2c=False) and closed by stop().
uart = None
# I2C bus handle created by init(use_i2c=True) and deinitialised by stop().
i2c = None
# Sensor driver object created by init(); read_packet() calls its read().
pm25 = None
# Only ever reassigned by read_packet() (buffer = buffer[32:]); nothing in
# this file appends to it.
buffer = []
# time.time() value used to pace reads: read_packet() stores the time of its
# latest sample, emulate_read_packet() stores the time of its first call.
# Stays 0.0 until one of them has run.
last_sample_time = 0.0
# PM2.5 statistics, overwritten by read_packet() / emulate_read_packet().
avg_1m_pm25 = 0
avg_15s_pm25 = 0
avg_delta_pm25 = 0
# PM10 counterparts. read_packet() computes same-named values but does not
# declare them global, so these module-level names keep their initial 0.
avg_1m_pm100 = 0
avg_15s_pm100 = 0
avg_delta_pm100 = 0
# Recent (value, sample_time) tuples appended by read_packet(), which trims
# each list to its six newest entries.
pm25_buffer = []
pm100_buffer = []
def not_a_result(msg):
    """Build a placeholder result tuple that carries only a status message.

    Returns a 4-tuple: three zeroed fields followed by ``msg`` unchanged.
    """
    zeroed_fields = (0,) * 3
    return zeroed_fields + (msg,)
def init(emulate=False, use_i2c=True):
    """Select emulation or open the sensor, then return the driver object.

    Args:
        emulate: Stored in the module-level ``_emulate`` flag. When true no
            hardware is opened and the existing ``pm25`` value is returned.
        use_i2c: Only consulted when not emulating. True opens the sensor on
            the board's SCL/SDA pins; False opens it on ``/dev/ttyS0``.

    Returns:
        The module-level ``pm25`` driver object.
    """
    global _emulate, uart, i2c, pm25

    _emulate = emulate
    if emulate:
        return pm25

    # NOTE(review): the trailing None passed to both driver constructors is
    # presumably the optional reset pin - confirm against adafruit_pm25.
    if use_i2c:
        import adafruit_pm25.i2c

        i2c = busio.I2C(board.SCL, board.SDA, frequency=100000)
        pm25 = adafruit_pm25.i2c.PM25_I2C(i2c, None)
        return pm25

    # `serial` is imported only on the UART path.
    import serial
    import adafruit_pm25.uart

    uart = serial.Serial("/dev/ttyS0", baudrate=9600, timeout=0.25)
    pm25 = adafruit_pm25.uart.PM25_UART(uart, None)
    return pm25
def stop():
    """Release whichever hardware handles init() opened.

    Deinitialises the module-level I2C bus and closes the module-level
    serial port when they are set. The handles themselves are left in place;
    nothing is reset to None.
    """
    bus = i2c
    if bus:
        bus.deinit()

    port = uart
    if port:
        port.close()
def emulate_read_packet():
    """Return a synthetic sensor packet without touching any hardware.

    The PM2.5 value follows ``(sin(t / 7) * 0.5 + 0.5) * 500`` where ``t`` is
    the number of seconds since the first emulated call, so it sweeps between
    0 and 500. PM10 fields mirror the PM2.5 values and the particle counts
    are fixed placeholders. Each call blocks for one second.

    The packet carries the same ``pm25_env`` / ``pm100_env`` keys as
    read_packet() so consumers can use one schema in both modes.

    Side effects: sets ``last_sample_time`` on the first call and overwrites
    the module-level ``avg_1m_pm25``, ``avg_15s_pm25`` and ``avg_delta_pm25``.

    Returns:
        dict with the emulated readings, ``time`` (time.time() at the start
        of the call) and ``status`` set to "OK".
    """
    global last_sample_time, avg_1m_pm25, avg_15s_pm25, avg_delta_pm25

    current_time = time.time()
    if last_sample_time == 0:
        # First call: remember when the emulated waveform started.
        last_sample_time = current_time

    wave = math.sin((current_time - last_sample_time) / 7.0) * 0.5 + 0.5
    time.sleep(1.0)

    # Named `level` rather than `pm25` so the module-level driver handle of
    # that name is not shadowed inside this function.
    level = wave * 500.0

    # No real averaging in emulation: the delta is against the previously
    # stored value and both "averages" are simply the latest value.
    avg_delta_pm25 = level - avg_1m_pm25
    avg_1m_pm25 = level
    avg_15s_pm25 = level

    return {
        "pm25": level,
        "pm25_env": level,
        "pm25_1m": avg_1m_pm25,
        "pm100": level,
        "pm100_env": level,
        "pm100_1m": avg_1m_pm25,
        "pm25_15s": avg_15s_pm25,
        "pm25_delta": avg_delta_pm25,
        "pm100_15s": avg_15s_pm25,
        "pm100_delta": avg_delta_pm25,
        "pm03_count": 10,
        "pm05_count": 20,
        "pm10_count": 30,
        "pm25_count": 40,
        "pm50_count": 0,
        "pm100_count": 0,
        "time": current_time,
        "status": "OK",
    }
# Minimum spacing between sensor reads, in seconds. Polling faster than this
# may return repeated data from the device.
_MIN_SAMPLE_INTERVAL = 2.3
# Number of most-recent samples kept for the short-window average
# (6 samples at >= 2.3 s spacing covers roughly 14 s).
_WINDOW_SIZE = 6


def _push_and_average(window, value, stamp):
    """Append (value, stamp) to window, trim to _WINDOW_SIZE, return the mean value."""
    window.append((value, stamp))
    # In-place trim keeps the same list object; a no-op while the window is
    # not yet over-full.
    del window[:-_WINDOW_SIZE]
    return sum(sample for sample, _ in window) / len(window)


def read_packet():
    """Read one measurement from the sensor and return it with short-window stats.

    In emulation mode this simply returns emulate_read_packet(). Otherwise it
    waits until at least _MIN_SAMPLE_INTERVAL seconds have passed since the
    previous sample, reads the driver (retrying for as long as it raises
    RuntimeError), and updates the rolling PM2.5 / PM10 windows.

    The sample timestamp is taken after the throttling wait, so the returned
    ``time`` and the stored ``last_sample_time`` reflect when the sensor was
    actually polled.

    Side effects: updates the module-level ``last_sample_time``,
    ``pm25_buffer``, ``pm100_buffer``, ``avg_1m_pm25``, ``avg_15s_pm25``,
    ``avg_delta_pm25`` and ``buffer``.

    Returns:
        dict with the standard/environmental PM2.5 and PM10 readings, the six
        particle counts, the window averages (``*_15s``), the difference
        between the latest reading and its window average (``*_delta``),
        ``time`` and ``status`` set to "OK".

    Raises:
        KeyError: if the driver's reading lacks one of the expected fields.
    """
    global buffer, last_sample_time
    global avg_1m_pm25, avg_15s_pm25, avg_delta_pm25

    if _emulate:
        return emulate_read_packet()

    # Throttle. The wait is clamped so a backwards wall-clock step cannot
    # turn it into an arbitrarily long sleep.
    wait = _MIN_SAMPLE_INTERVAL - (time.time() - last_sample_time)
    if wait > 0:
        time.sleep(min(wait, _MIN_SAMPLE_INTERVAL))
    sample_time = time.time()

    # Deliberate best-effort: keep retrying until the driver hands back data.
    aqdata = None
    while not aqdata:
        try:
            aqdata = pm25.read()
        except RuntimeError:
            continue

    # Pull every field out before touching module state, so a malformed
    # reading leaves the windows and timestamp untouched.
    pm25_std = aqdata["pm25 standard"]
    pm100_std = aqdata["pm100 standard"]
    packet = {
        "pm25": pm25_std,
        "pm25_env": aqdata["pm25 env"],
        "pm100": pm100_std,
        "pm100_env": aqdata["pm100 env"],
        "pm03_count": aqdata["particles 03um"],
        "pm05_count": aqdata["particles 05um"],
        "pm10_count": aqdata["particles 10um"],
        "pm25_count": aqdata["particles 25um"],
        "pm50_count": aqdata["particles 50um"],
        "pm100_count": aqdata["particles 100um"],
    }

    last_sample_time = sample_time

    avg_15s_pm25 = _push_and_average(pm25_buffer, pm25_std, sample_time)
    # The window holds at most _WINDOW_SIZE samples, so the "1m" figure is the
    # same mean as the short-window one.
    avg_1m_pm25 = avg_15s_pm25
    avg_delta_pm25 = pm25_std - avg_15s_pm25

    # The PM10 statistics are local: only the PM2.5 ones are published as
    # module-level globals.
    avg_15s_pm100 = _push_and_average(pm100_buffer, pm100_std, sample_time)
    avg_delta_pm100 = pm100_std - avg_15s_pm100

    # Legacy trim of the module-level raw buffer. Nothing in this file appends
    # to it, so this is presumably a leftover from an earlier serial parser.
    buffer = buffer[32:]

    packet.update({
        "pm25_15s": avg_15s_pm25,
        "pm25_delta": avg_delta_pm25,
        "pm100_15s": avg_15s_pm100,
        "pm100_delta": avg_delta_pm100,
        "time": sample_time,
        "status": "OK",
    })
    return packet
if __name__ == '__main__':
    # Smoke test: open the sensor with default settings and print eleven
    # packets, pausing one second before each read.
    try:
        init()
        for _ in range(11):
            time.sleep(1)
            print(str(read_packet()))
    finally:
        # Release the bus / serial port even if a read fails or the run is
        # interrupted.
        stop()