-
Notifications
You must be signed in to change notification settings - Fork 15
/
mwt_io.py
252 lines (210 loc) · 7.95 KB
/
mwt_io.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
"""Handle input and output of near-shore wave tracking analysis."""
from __future__ import division
import os
import csv
import json
import numpy as np
import cv2
# Every output file is written into this folder. The path is relative, so it
# resolves against the working directory of the running process.
OUTPUT_DIR = "output"
# Names of the files written inside OUTPUT_DIR:
#   - the per-frame wave log, as CSV or as JSON (see write_log)
#   - the plain-text report of recognized waves (see write_report)
#   - the tracking visualization video (see create_video_writer)
WAVE_LOG_CSVFILE = "wave_log.csv"
WAVE_LOG_JSONFILE = "wave_log.json"
RECOGNIZED_WAVE_REPORT_FILE = "recognized_waves.txt"
TRACKED_WAVE_FILE = "tracked_waves.mp4"
def create_video_writer(input_video):
    """Create the video writer used for the tracking visualization.

    Builds an OpenCV VideoWriter that uses the "mp4v" codec and copies the
    frame width, frame height and frame rate reported by the input video.
    The writer targets OUTPUT_DIR/TRACKED_WAVE_FILE; the output directory is
    created first if it does not exist yet.

    Args:
      input_video: video read into the program using opencv methods; it must
        answer get() for the cv2.CAP_PROP_FRAME_WIDTH, CAP_PROP_FRAME_HEIGHT
        and CAP_PROP_FPS properties (presumably a cv2.VideoCapture --
        confirm against the caller)

    Returns:
      out: cv2 VideoWriter object to which individual color frames can be
        written
    """
    # Grab the video stats the writer has to match. The frame size is handed
    # to OpenCV as a pair of ints.
    frame_width = int(input_video.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(input_video.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = input_video.get(cv2.CAP_PROP_FPS)

    # Create the output directory if necessary. exist_ok avoids the race
    # between checking for the directory and creating it.
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # Define the codec and build the VideoWriter object.
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    output_path = os.path.join(OUTPUT_DIR, TRACKED_WAVE_FILE)
    out = cv2.VideoWriter(
        output_path,
        fourcc,
        fps,
        (frame_width, frame_height),
        isColor=True,
    )
    return out
def write_log(log, output_format="csv"):
    """Write the wave log to a file in the output directory.

    Takes a list of lists of wave attributes representing each tracked wave's
    statistics in every frame of the video for which the wave was present.
    For "csv", a header row is written followed by one row per item. For
    "json", each item becomes an object keyed by the same headers and the
    file holds a single array of those objects, one object per line.

    NOTE(review): when the log is empty the status message says that no log
    is written, yet the file is still created (headers only for CSV, "[]"
    for JSON). That behavior is kept as-is -- confirm which one is intended.

    Args:
      log: a list of lists of wave statistics; each inner list must hold at
        least as many values as there are headers, in header order
      output_format: either "csv" for CSV or "json" for JSON

    Raises:
      ValueError: if output_format is neither "csv" nor "json"
      IndexError: in JSON mode, if a row holds fewer values than there are
        headers
    """
    # Reject unknown formats up front instead of failing later on an
    # unbound output path.
    if output_format not in ("csv", "json"):
        raise ValueError(
            f"Unsupported output_format {output_format!r}; "
            "expected 'csv' or 'json'."
        )

    is_csv = output_format == "csv"
    log_filename = WAVE_LOG_CSVFILE if is_csv else WAVE_LOG_JSONFILE

    # Print status update.
    if not log:
        print("No waves or sections detected. No log written.")
    else:
        print("Writing wave log to", log_filename)

    # Declare headers here.
    log_headers = [
        "frame_num",
        "wave_id",
        "inst_mass",
        "max_mass",
        "inst_displacement",
        "max_displacement",
        "frame_birth",
        "frame_death",
        "recognized",
        "centroid",
    ]

    # Create the output directory if necessary (race-free).
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    output_path = os.path.join(OUTPUT_DIR, log_filename)

    # The csv module wants newline="" so that it controls the line endings
    # itself; otherwise blank rows appear on platforms that translate "\n".
    # JSON output keeps the default newline handling.
    newline = "" if is_csv else None
    with open(output_path, "w", newline=newline) as outfile:
        if is_csv:
            writer = csv.writer(outfile, delimiter=",")
            writer.writerow(log_headers)
            writer.writerows(log)
        else:
            # Index by position so that a short row raises IndexError
            # rather than silently dropping fields.
            records = [
                json.dumps(
                    {header: row[i] for i, header in enumerate(log_headers)}
                )
                for row in log
            ]
            outfile.write("[")
            outfile.write(",\n".join(records))
            if records:
                outfile.write("\n")
            outfile.write("]")
def write_report(waves, performance):
    """Write a simple plain-text report of the recognized waves.

    The report starts with one line stating the program performance and then
    holds one line per wave listing its name, birth, death, max displacement
    and max mass. It is written to OUTPUT_DIR/RECOGNIZED_WAVE_REPORT_FILE;
    the output directory is created first if it does not exist yet.

    NOTE(review): when there are no waves the status message says that no
    report is written, yet the file is still created with the performance
    line only. That behavior is kept as-is -- confirm which one is intended.

    Args:
      waves: a list of wave objects exposing the attributes name, birth,
        death, max_displacement and max_mass
      performance: value reported as the program speed in frames per second
    """
    # Provide user feedback here.
    if not waves:
        print("No waves found. No report written.")
    else:
        print("Writing analysis report to", RECOGNIZED_WAVE_REPORT_FILE)

    # Create the output directory if necessary (race-free).
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    report_path = os.path.join(OUTPUT_DIR, RECOGNIZED_WAVE_REPORT_FILE)

    with open(report_path, "w") as text_file:
        text_file.write(
            f"Program performance: {performance} frames per second.\n"
        )
        for number, wave in enumerate(waves, start=1):
            # One line per wave. The separator after the death value keeps
            # it from running into the "Max Displacement" label.
            text_file.write(
                f"[Wave #{number}] "
                f"ID: {wave.name}, "
                f"Birth: {wave.birth}, "
                f"Death: {wave.death}, "
                f"Max Displacement: {wave.max_displacement}, "
                f"Max Mass: {wave.max_mass}\n"
            )
def draw(waves, frame, resize_factor):
    """Annotate a frame with the stats of every wave that is still alive.

    For each wave whose death is None, a three-line label is prepared: a
    headline ("Wave Detected!" in green when wave.recognized is True,
    otherwise "Potential Wave" in yellow) followed by the wave's mass and
    displacement. The label is only drawn once the wave has more than 20
    entries in centroid_vec; it is anchored at the mean of the 19 most recent
    centroids, scaled by resize_factor. Bounding boxes are not drawn.

    Args:
      waves: list of waves
      frame: frame on which to draw the wave stats
      resize_factor: factor applied to the centroid coordinates so that they
        match the output frame

    Returns:
      frame: input frame with the wave stats drawn on top
    """
    for wave in waves:
        # Waves that already have a death frame are not annotated.
        if wave.death is not None:
            continue

        # Recognized waves are drawn green, all other candidates yellow.
        if wave.recognized is True:
            drawing_color, headline = (0, 255, 0), "Wave Detected!"
        else:
            drawing_color, headline = (0, 255, 255), "Potential Wave"
        label = (
            f"{headline}\n"
            f"mass: {wave.mass}\n"
            f"displacement: {wave.displacement}"
        )

        # Too little tracking history: nothing is drawn for this wave.
        if len(wave.centroid_vec) <= 20:
            continue

        # Bounding-box drawing (wave.boundingbox_coors scaled by
        # resize_factor) is currently disabled.

        # Use the moving average of the wave centroid as the label location.
        # Entries are read by negative index, newest first.
        recent = [wave.centroid_vec[-back] for back in range(1, 20)]
        mean_x = np.mean([point[0] for point in recent])
        mean_y = np.mean([point[1] for point in recent])
        anchor_x = int(resize_factor * mean_x)
        anchor_y = int(resize_factor * mean_y)

        # Draw the label one line at a time, each 45 px below the previous.
        for line_no, line in enumerate(label.split("\n")):
            frame = cv2.putText(
                frame,
                text=line,
                org=(anchor_x, anchor_y + (50 + line_no * 45)),
                fontFace=cv2.FONT_HERSHEY_SIMPLEX,
                fontScale=1.5,
                color=drawing_color,
                thickness=3,
                lineType=cv2.LINE_AA,
            )
    return frame