ruman1609
/
Detection-Of-Road-Damage-Using-Faster-Regional-Convolutional-Neural-Network-Method
Public
forked from FurkanOM/tf-faster-rcnn
-
Notifications
You must be signed in to change notification settings - Fork 3
/
train_utils.py
263 lines (242 loc) · 10.6 KB
/
train_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
import tensorflow as tf
import math
from . import bbox_utils
# DEFAULT = {
# "img_size": 512,
# "feature_map_shape": 32,
# "anchor_ratios": [1., 2., 1./2.],
# "anchor_scales": [128, 256, 512],
# }
# Shared baseline configuration used by every backbone listed in RPN.
DEFAULT = {
    "img_size": 300,
    "feature_map_shape": 19,
    "anchor_ratios": [1., 2., 1./2.],
    "anchor_scales": [128, 256, 512],
}

# Backbone name -> baseline hyper parameters.
# All entries reference the very same DEFAULT object, so it must never be
# mutated in place; get_hyper_params works on a copy.
RPN = {
    "vgg16": DEFAULT,
    "mobilenet_v2": DEFAULT,
    "resnet50": DEFAULT
}


def get_hyper_params(backbone, **kwargs):
    """Generating hyper params in a dynamic way.
    Every call returns a fresh, independent dictionary, the module level
    DEFAULT / RPN tables are left untouched.
    inputs:
        backbone = one of the keys of RPN ("vgg16", "mobilenet_v2", "resnet50")
        **kwargs = any value could be updated in the hyper_params
            only keys that already exist in hyper_params are applied
            and falsy values (None, 0, "", ...) are ignored

    outputs:
        hyper_params = dictionary

    raises:
        KeyError if backbone is not a key of RPN
    """
    import copy

    # Deep copy so that neither the in-place adjustments below nor later
    # changes made by the caller can leak into the shared DEFAULT dict.
    # Without the copy every "vgg16" call would decrement the shared
    # feature_map_shape once more and affect all other backbones too.
    hyper_params = copy.deepcopy(RPN[backbone])
    # vgg16 uses a feature map one unit smaller than the shared default
    # (presumably because of its layer layout, verify against the model).
    if backbone == "vgg16":
        hyper_params["feature_map_shape"] -= 1
    # Fixed training / inference settings
    hyper_params["train_pre_nms_topn"] = 6000
    hyper_params["test_pre_nms_topn"] = 6000
    hyper_params["train_post_nms_topn"] = 2000
    hyper_params["test_post_nms_topn"] = 300
    hyper_params["nms_iou_threshold"] = 0.7
    hyper_params["total_pos_bboxes"] = 128
    hyper_params["total_neg_bboxes"] = 128
    hyper_params["pooling_size"] = (7, 7)
    hyper_params["variances"] = [0.1, 0.1, 0.2, 0.2]
    # Apply caller overrides for known keys only
    for key, value in kwargs.items():
        if key in hyper_params and value:
            hyper_params[key] = value
    # Derived after the overrides so it always matches the final anchor settings
    hyper_params["anchor_count"] = len(hyper_params["anchor_ratios"]) * len(hyper_params["anchor_scales"])
    return hyper_params
def get_step_size(total_items, batch_size):
    """Number of batches needed to go through all items once.
    A trailing, partially filled batch counts as a full step.
    inputs:
        total_items = number of total items
        batch_size = number of items per batch during training or validation

    outputs:
        step_size = total_items / batch_size rounded up to the next integer
    """
    batches = total_items / batch_size
    step_size = math.ceil(batches)
    return step_size
def randomly_select_xyz_mask(mask, select_xyz):
    """Selecting x, y, z number of True elements for corresponding batch and replacing others to False
    inputs:
        mask = (batch_size, [m_bool_value])
        select_xyz = ([x_y_z_number_for_corresponding_batch])
            example = tf.constant([128, 50, 42], dtype=tf.int32)

    outputs:
        selected_valid_mask = (batch_size, [m_bool_value])
            at most select_xyz True values are kept per row,
            a requested count of 0 yields a row full of False
    """
    # Random scores are drawn from [1, maxval). The upper bound is clamped to 2
    # so the range is never empty: when every requested count is 0 the original
    # bound of 0 would make tf.random.uniform fail (it needs minval < maxval).
    maxval = tf.maximum(tf.reduce_max(select_xyz) * 10, 2)
    random_mask = tf.random.uniform(tf.shape(mask), minval=1, maxval=maxval, dtype=tf.int32)
    # True elements get a random score >= 1, False elements get 0
    multiplied_mask = tf.cast(mask, tf.int32) * random_mask
    # argsort of the argsort gives the rank of every element in descending
    # score order, so the True elements occupy the lowest ranks of each row
    sorted_mask = tf.argsort(multiplied_mask, direction="DESCENDING")
    sorted_mask_indices = tf.argsort(sorted_mask)
    # Keep only ranks below the count requested for the row
    selected_mask = tf.less(sorted_mask_indices, tf.expand_dims(select_xyz, 1))
    # Rows with fewer True elements than requested must not gain new ones
    return tf.logical_and(mask, selected_mask)
def faster_rcnn_generator(dataset, anchors, hyper_params):
    """Endless tensorflow data generator for the fit method of the faster rcnn model.
    Loops over the dataset again and again and never terminates on its own.
    inputs:
        dataset = tf.data.Dataset, PaddedBatchDataset
            every element unpacks to (img, gt_boxes, gt_labels)
        anchors = (total_anchors, [y1, x1, y2, x2])
            these values in normalized format between [0, 1]
        hyper_params = dictionary

    outputs:
        yield a one element tuple holding the model inputs
            ((img, gt_boxes, gt_labels, bbox_deltas, bbox_labels), )
            no separate target value is yielded
    """
    while True:
        for img, gt_boxes, gt_labels in dataset:
            # Rpn targets for the current batch
            rpn_targets = calculate_rpn_actual_outputs(anchors, gt_boxes, gt_labels, hyper_params)
            bbox_deltas, bbox_labels = rpn_targets
            model_inputs = (img, gt_boxes, gt_labels, bbox_deltas, bbox_labels)
            yield (model_inputs, )
def rpn_generator(dataset, anchors, hyper_params):
    """Endless tensorflow data generator for the fit method of the rpn model.
    Loops over the dataset again and again and never terminates on its own.
    inputs:
        dataset = tf.data.Dataset, PaddedBatchDataset
            every element unpacks to (img, gt_boxes, gt_labels)
        anchors = (total_anchors, [y1, x1, y2, x2])
            these values in normalized format between [0, 1]
        hyper_params = dictionary

    outputs:
        yield inputs, outputs
            inputs = img
            outputs = (bbox_deltas, bbox_labels)
    """
    while True:
        for img, gt_boxes, gt_labels in dataset:
            # The rpn targets are used directly as the expected model outputs
            rpn_targets = calculate_rpn_actual_outputs(anchors, gt_boxes, gt_labels, hyper_params)
            bbox_deltas, bbox_labels = rpn_targets
            yield img, (bbox_deltas, bbox_labels)
def calculate_rpn_actual_outputs(anchors, gt_boxes, gt_labels, hyper_params):
    """Building the rpn regression and classification targets for one batch.
    Batch operations supported.
    inputs:
        anchors = (total_anchors, [y1, x1, y2, x2])
            these values in normalized format between [0, 1]
        gt_boxes (batch_size, gt_box_size, [y1, x1, y2, x2])
            these values in normalized format between [0, 1]
        gt_labels (batch_size, gt_box_size)
            entries equal to -1 are treated as not valid (presumably padding)
        hyper_params = dictionary

    outputs:
        bbox_deltas = (batch_size, total_anchors, [delta_y, delta_x, delta_h, delta_w])
        bbox_labels = (batch_size, feature_map_shape, feature_map_shape, anchor_count)
            1 = positive anchor, 0 = negative anchor, -1 = anchor is not selected
    """
    n_batch = tf.shape(gt_boxes)[0]
    fmap_size = hyper_params["feature_map_shape"]
    anchors_per_cell = hyper_params["anchor_count"]
    pos_budget = hyper_params["total_pos_bboxes"]
    neg_budget = hyper_params["total_neg_bboxes"]
    variances = hyper_params["variances"]
    # IoU between every anchor (axis 1) and every ground truth box (axis 2)
    iou_map = bbox_utils.generate_iou_map(anchors, gt_boxes)
    # Best matching ground truth box for each anchor
    best_gt_per_anchor = tf.argmax(iou_map, axis=2, output_type=tf.int32)
    # Best matching anchor for each ground truth box
    best_anchor_per_gt = tf.argmax(iou_map, axis=1, output_type=tf.int32)
    # Highest IoU reached by each anchor over all ground truth boxes
    best_iou_per_anchor = tf.reduce_max(iou_map, axis=2)
    # Anchors with an IoU of at least 0.7 are positive candidates
    pos_mask = best_iou_per_anchor >= 0.7
    # Locate the valid ground truth boxes (label different than -1)
    is_valid_gt = tf.not_equal(gt_labels, -1)
    valid_gt_positions = tf.cast(tf.where(is_valid_gt), tf.int32)
    valid_best_anchors = best_anchor_per_gt[is_valid_gt]
    # The best anchor of every valid ground truth box is positive as well,
    # addressed as (batch index, anchor index) pairs
    forced_pos_indices = tf.stack([valid_gt_positions[..., 0], valid_best_anchors], 1)
    forced_pos_updates = tf.fill((tf.shape(valid_gt_positions)[0], ), True)
    forced_pos_mask = tf.scatter_nd(forced_pos_indices, forced_pos_updates, tf.shape(pos_mask))
    pos_mask = tf.logical_or(pos_mask, forced_pos_mask)
    # Keep at most total_pos_bboxes randomly chosen positives per image
    pos_limit = tf.constant([pos_budget], dtype=tf.int32)
    pos_mask = randomly_select_xyz_mask(pos_mask, pos_limit)
    # Negatives fill the rest of the budget that is not used by positives
    pos_count = tf.reduce_sum(tf.cast(pos_mask, tf.int32), axis=-1)
    neg_count = (pos_budget + neg_budget) - pos_count
    # Negative candidates have an IoU below 0.3 and are not positive
    low_iou_mask = best_iou_per_anchor < 0.3
    neg_mask = tf.logical_and(low_iou_mask, tf.logical_not(pos_mask))
    neg_mask = randomly_select_xyz_mask(neg_mask, neg_count)
    # Positives are 1 and everything else is -1, adding the 0 / 1 negative
    # mask lifts the selected negatives from -1 to 0
    pos_labels = tf.where(pos_mask, tf.ones_like(pos_mask, dtype=tf.float32), tf.constant(-1.0, dtype=tf.float32))
    neg_labels = tf.cast(neg_mask, dtype=tf.float32)
    bbox_labels = pos_labels + neg_labels
    # Ground truth box assigned to each anchor
    gt_boxes_map = tf.gather(gt_boxes, best_gt_per_anchor, batch_dims=1)
    # Replace the boxes of non positive anchors with zeros
    expanded_gt_boxes = tf.where(tf.expand_dims(pos_mask, -1), gt_boxes_map, tf.zeros_like(gt_boxes_map))
    # Delta values between anchors and assigned ground truth boxes, scaled by variances
    bbox_deltas = bbox_utils.get_deltas_from_bboxes(anchors, expanded_gt_boxes) / variances
    # Only the labels are laid out on the feature map grid,
    # bbox_deltas keeps its per anchor layout
    label_shape = (n_batch, fmap_size, fmap_size, anchors_per_cell)
    bbox_labels = tf.reshape(bbox_labels, label_shape)
    return bbox_deltas, bbox_labels
def frcnn_cls_loss(*args):
    """Calculating faster rcnn class loss value.
    Entries whose y_true is zero on the whole last axis are left out of the loss.
    inputs:
        *args = could be (y_true, y_pred) or ((y_true, y_pred), )

    outputs:
        loss = CategoricalCrossentropy summed over the remaining entries
            and divided by their count (at least 1)
    """
    if len(args) == 2:
        y_true, y_pred = args
    else:
        y_true, y_pred = args[0]
    # Unreduced loss, one value per entry
    cross_entropy = tf.losses.CategoricalCrossentropy(reduction=tf.losses.Reduction.NONE)
    per_entry_loss = cross_entropy(y_true, y_pred)
    # 1.0 where y_true has any non zero value on the last axis, otherwise 0.0
    has_target = tf.reduce_any(tf.not_equal(y_true, tf.constant(0.0)), axis=-1)
    weights = tf.cast(has_target, dtype=tf.float32)
    # The lower bound of 1 avoids a division by zero when nothing is left
    entry_count = tf.maximum(1.0, tf.reduce_sum(weights))
    masked_loss_sum = tf.reduce_sum(weights * per_entry_loss)
    return masked_loss_sum / entry_count
def rpn_cls_loss(*args):
    """Calculating rpn class loss value.
    Rpn actual class value should be 0 or 1.
    Entries with an actual value of -1 are left out of the loss.
    inputs:
        *args = could be (y_true, y_pred) or ((y_true, y_pred), )

    outputs:
        loss = BinaryCrossentropy value
    """
    if len(args) == 2:
        y_true, y_pred = args
    else:
        y_true, y_pred = args[0]
    # Positions of all the entries with an actual value different than -1
    ignore_value = tf.constant(-1.0, dtype=tf.float32)
    kept_indices = tf.where(tf.not_equal(y_true, ignore_value))
    kept_true = tf.gather_nd(y_true, kept_indices)
    kept_pred = tf.gather_nd(y_pred, kept_indices)
    return tf.losses.BinaryCrossentropy()(kept_true, kept_pred)
# def reg_loss_frcnn(*args):
# """
# The regressor loss
# This is smooth L1 loss calculated only on positive anchors
# Args:
# y_true, the true regressor values
# y_pred, the predicted regressor values
# Returns:
# the loss as a scalar
# """
# # modified: https://github.com/FurkanOM/tf-rpn/issues/2#issuecomment-724155215
# # original: https://github.com/FurkanOM/tf-faster-rcnn/blob/db54f3d873d74cec50b9d21409dcb831f271b7bb/utils/train_utils.py#L203
# y_true, y_pred = args if len(args) == 2 else args[0]
# smooth_l1 = tf.keras.losses.MeanAbsoluteError(reduction=tf.keras.losses.Reduction.NONE)
# batch_size = tf.shape(y_pred)[0]
# y_true = tf.reshape(y_true, [batch_size, -1, 4])
# y_pred = tf.reshape(y_pred, [batch_size, -1, 4])
#
# loss = smooth_l1(y_true, y_pred)
# # loss = tf.reduce_sum(loss, axis=-1)
#
# valid = tf.math.reduce_any(tf.not_equal(y_true, 0.0), axis=-1)
# valid = tf.cast(valid, tf.float32)
# loss = tf.reduce_sum(loss * valid, axis=-1) # loss vector for each batch
# total_pos_boxes = tf.math.maximum(1.0, tf.reduce_sum(valid, axis=-1))
# return tf.math.reduce_mean(tf.truediv(loss, total_pos_boxes))
def reg_loss(*args):
    """Calculating rpn / faster rcnn regression loss value.
    Reg value should be different than zero for actual values.
    Entries whose y_true is zero on the whole last axis are left out of the loss.
    inputs:
        *args = could be (y_true, y_pred) or ((y_true, y_pred), )

    outputs:
        loss = Huber it's almost the same with the smooth L1 loss
            summed over the remaining entries and divided by their count (at least 1)
    """
    if len(args) == 2:
        y_true, y_pred = args
    else:
        y_true, y_pred = args[0]
    # Predictions are regrouped to 4 values per box: (batch_size, -1, 4)
    pred_batch = tf.shape(y_pred)[0]
    y_pred = tf.reshape(y_pred, (pred_batch, -1, 4))
    # Unreduced loss, one value per box
    huber = tf.keras.losses.Huber(reduction=tf.keras.losses.Reduction.NONE)
    per_box_loss = huber(y_true, y_pred)
    # 1.0 where y_true has any non zero value on the last axis, otherwise 0.0
    has_target = tf.reduce_any(tf.not_equal(y_true, tf.constant(0.0)), axis=-1)
    weights = tf.cast(has_target, dtype=tf.float32)
    # The lower bound of 1 avoids a division by zero when nothing is left
    box_count = tf.maximum(1.0, tf.reduce_sum(weights))
    masked_loss_sum = tf.reduce_sum(weights * per_box_loss)
    return masked_loss_sum / box_count