From 8a8760705a04d4f82efb1cbacf5c011bb276f43b Mon Sep 17 00:00:00 2001 From: cvhub Date: Mon, 18 Sep 2023 18:25:00 +0800 Subject: [PATCH] supoort yolov5_sam --- anylabeling/app_info.py | 2 +- anylabeling/configs/auto_labeling/models.yaml | 2 + .../yolov5s_mobile_sam_vit_h.yaml | 98 +++ .../services/auto_labeling/model_manager.py | 36 +- .../services/auto_labeling/yolov5_sam.py | 748 ++++++++++++++++++ 5 files changed, 882 insertions(+), 4 deletions(-) create mode 100644 anylabeling/configs/auto_labeling/yolov5s_mobile_sam_vit_h.yaml create mode 100644 anylabeling/services/auto_labeling/yolov5_sam.py diff --git a/anylabeling/app_info.py b/anylabeling/app_info.py index ac9aab40..2565d52d 100644 --- a/anylabeling/app_info.py +++ b/anylabeling/app_info.py @@ -1,4 +1,4 @@ __appname__ = "X-AnyLabeling" __appdescription__ = "Advanced Auto Labeling Solution with Added Features" -__version__ = "0.2.2" +__version__ = "0.2.3" __preferred_device__ = "CPU" # GPU or CPU diff --git a/anylabeling/configs/auto_labeling/models.yaml b/anylabeling/configs/auto_labeling/models.yaml index 0e14651a..3338cbd9 100644 --- a/anylabeling/configs/auto_labeling/models.yaml +++ b/anylabeling/configs/auto_labeling/models.yaml @@ -1,5 +1,7 @@ - model_name: "ch_ppocr_v4-r20230915" config_file: ":/ch_ppocr_v4.yaml" +- model_name: "yolov5s_mobile_sam_vit_h-r20230920" + config_file: ":/yolov5s_mobile_sam_vit_h.yaml" - model_name: "sam_med2d_vit_b-r20230901" config_file: ":/sam_med2d_vit_b.yaml" - model_name: "segment_anything_vit_b_quant-r20230810" diff --git a/anylabeling/configs/auto_labeling/yolov5s_mobile_sam_vit_h.yaml b/anylabeling/configs/auto_labeling/yolov5s_mobile_sam_vit_h.yaml new file mode 100644 index 00000000..7bc64180 --- /dev/null +++ b/anylabeling/configs/auto_labeling/yolov5s_mobile_sam_vit_h.yaml @@ -0,0 +1,98 @@ +type: yolov5_sam +name: yolov5s_mobile_sam_vit_h-r20230920 +display_name: YOLOv5s-MobileSAM (ViT-Huge) +#SAM +encoder_model_path: https://github.com/CVHub520/X-AnyLabeling/releases/download/v0.2.0/mobile_sam.encoder.onnx +decoder_model_path: https://github.com/CVHub520/X-AnyLabeling/releases/download/v0.2.0/sam_vit_h_4b8939.decoder.onnx +cache_size: 10 +target_size: 1024 +max_width: 1024 +max_height: 682 +#Det +model_path: https://github.com/CVHub520/X-AnyLabeling/releases/download/v0.1.0/yolov5s.onnx +input_width: 640 +input_height: 640 +stride: 32 +nms_threshold: 0.45 +confidence_threshold: 0.25 +classes: + - person + - bicycle + - car + - motorcycle + - airplane + - bus + - train + - truck + - boat + - traffic light + - fire hydrant + - stop sign + - parking meter + - bench + - bird + - cat + - dog + - horse + - sheep + - cow + - elephant + - bear + - zebra + - giraffe + - backpack + - umbrella + - handbag + - tie + - suitcase + - frisbee + - skis + - snowboard + - sports ball + - kite + - baseball bat + - baseball glove + - skateboard + - surfboard + - tennis racket + - bottle + - wine glass + - cup + - fork + - knife + - spoon + - bowl + - banana + - apple + - sandwich + - orange + - broccoli + - carrot + - hot dog + - pizza + - donut + - cake + - chair + - couch + - potted plant + - bed + - dining table + - toilet + - tv + - laptop + - mouse + - remote + - keyboard + - cell phone + - microwave + - oven + - toaster + - sink + - refrigerator + - book + - clock + - vase + - scissors + - teddy bear + - hair drier + - toothbrush \ No newline at end of file diff --git a/anylabeling/services/auto_labeling/model_manager.py b/anylabeling/services/auto_labeling/model_manager.py index 
988710ba..828bf5c5 100644 --- a/anylabeling/services/auto_labeling/model_manager.py +++ b/anylabeling/services/auto_labeling/model_manager.py @@ -179,7 +179,8 @@ def load_custom_model(self, config_file): "yolo_nas", "yolox_dwpose", "clrnet", - "ppocr_v4" + "ppocr_v4", + "yolov5_sam", ] ): self.new_model_status.emit( @@ -435,6 +436,30 @@ def _load_model(self, model_id): ) ) return + elif model_config["type"] == "yolov5_sam": + from .yolov5_sam import YOLOv5SegmentAnything + + try: + model_config["model"] = YOLOv5SegmentAnything( + model_config, on_message=self.new_model_status.emit + ) + self.auto_segmentation_model_selected.emit() + except Exception as e: # noqa + print( + "Error in loading model: {error_message}".format( + error_message=str(e) + ) + ) + self.new_model_status.emit( + self.tr( + "Error in loading model: {error_message}".format( + error_message=str(e) + ) + ) + ) + return + # Request next files for prediction + self.request_next_files_requested.emit() elif model_config["type"] == "segment_anything": from .segment_anything import SegmentAnything @@ -625,9 +650,14 @@ def set_auto_labeling_marks(self, marks): """Set auto labeling marks (For example, for segment_anything model, it is the marks for) """ + marks_model_list = [ + "segment_anything", + "sam_med2d", + "yolov5_sam", + ] if ( self.loaded_model_config is None - or self.loaded_model_config["type"] not in ["segment_anything", "sam_med2d"] + or self.loaded_model_config["type"] not in marks_model_list ): return self.loaded_model_config["model"].set_auto_labeling_marks(marks) @@ -714,7 +744,7 @@ def on_next_files_changed(self, next_files): return # Currently only segment_anything-like model supports this feature - if self.loaded_model_config["type"] not in ["segment_anything", "sam_med2d"]: + if self.loaded_model_config["type"] not in ["segment_anything", "sam_med2d", "yolov5_sam"]: return self.loaded_model_config["model"].on_next_files_changed(next_files) diff --git a/anylabeling/services/auto_labeling/yolov5_sam.py b/anylabeling/services/auto_labeling/yolov5_sam.py new file mode 100644 index 00000000..00622d79 --- /dev/null +++ b/anylabeling/services/auto_labeling/yolov5_sam.py @@ -0,0 +1,748 @@ +import logging +import os +import traceback + +import cv2 +import math +import numpy as np +import onnxruntime as ort +from copy import deepcopy +from PyQt5 import QtCore +from PyQt5.QtCore import QThread +from PyQt5.QtCore import QCoreApplication + +from anylabeling.app_info import __preferred_device__ +from anylabeling.utils import GenericWorker +from anylabeling.views.labeling.shape import Shape +from anylabeling.views.labeling.utils.opencv import qt_img_to_rgb_cv_img + +from .lru_cache import LRUCache +from .model import Model +from .types import AutoLabelingResult + + +class SegmentAnythingONNX: + """Segmentation model using SegmentAnything""" + + def __init__(self, encoder_session, decoder_session, target_size, input_size) -> None: + self.target_size = target_size + self.input_size = input_size + self.encoder_session = encoder_session + self.encoder_input_name = self.encoder_session.get_inputs()[0].name + self.decoder_session = decoder_session + + def get_input_points(self, prompt): + """Get input points""" + points = [] + labels = [] + for mark in prompt: + if mark["type"] == "point": + points.append(mark["data"]) + labels.append(mark["label"]) + elif mark["type"] == "rectangle": + points.append([mark["data"][0], mark["data"][1]]) # top left + points.append( + [mark["data"][2], mark["data"][3]] + ) # bottom right + 
labels.append(2) + labels.append(3) + points, labels = np.array(points), np.array(labels) + return points, labels + + def run_encoder(self, encoder_inputs): + """Run encoder""" + output = self.encoder_session.run(None, encoder_inputs) + image_embedding = output[0] + return image_embedding + + @staticmethod + def get_preprocess_shape(oldh: int, oldw: int, long_side_length: int): + """ + Compute the output size given input size and target long side length. + """ + scale = long_side_length * 1.0 / max(oldh, oldw) + newh, neww = oldh * scale, oldw * scale + neww = int(neww + 0.5) + newh = int(newh + 0.5) + return (newh, neww) + + def apply_coords(self, coords: np.ndarray, original_size, target_length): + """ + Expects a numpy array of length 2 in the final dimension. Requires the + original image size in (H, W) format. + """ + old_h, old_w = original_size + new_h, new_w = self.get_preprocess_shape( + original_size[0], original_size[1], target_length + ) + coords = deepcopy(coords).astype(float) + coords[..., 0] = coords[..., 0] * (new_w / old_w) + coords[..., 1] = coords[..., 1] * (new_h / old_h) + return coords + + def run_decoder( + self, image_embedding, original_size, transform_matrix, prompt, transform_prompt + ): + """Run decoder""" + if transform_prompt: + input_points, input_labels = self.get_input_points(prompt) + else: + input_points, input_labels = prompt + + # Add a batch index, concatenate a padding point, and transform. + onnx_coord = np.concatenate( + [input_points, np.array([[0.0, 0.0]])], axis=0 + )[None, :, :] + onnx_label = np.concatenate([input_labels, np.array([-1])], axis=0)[ + None, : + ].astype(np.float32) + onnx_coord = self.apply_coords( + onnx_coord, self.input_size, self.target_size + ).astype(np.float32) + + # Apply the transformation matrix to the coordinates. + onnx_coord = np.concatenate( + [ + onnx_coord, + np.ones((1, onnx_coord.shape[1], 1), dtype=np.float32), + ], + axis=2, + ) + onnx_coord = np.matmul(onnx_coord, transform_matrix.T) + onnx_coord = onnx_coord[:, :, :2].astype(np.float32) + + # Create an empty mask input and an indicator for no mask. + onnx_mask_input = np.zeros((1, 1, 256, 256), dtype=np.float32) + onnx_has_mask_input = np.zeros(1, dtype=np.float32) + + decoder_inputs = { + "image_embeddings": image_embedding, + "point_coords": onnx_coord, + "point_labels": onnx_label, + "mask_input": onnx_mask_input, + "has_mask_input": onnx_has_mask_input, + "orig_im_size": np.array(self.input_size, dtype=np.float32), + } + masks, _, _ = self.decoder_session.run(None, decoder_inputs) + + # Transform the masks back to the original image size. + inv_transform_matrix = np.linalg.inv(transform_matrix) + transformed_masks = self.transform_masks( + masks, original_size, inv_transform_matrix + ) + + return transformed_masks + + def transform_masks(self, masks, original_size, transform_matrix): + """Transform masks + Transform the masks back to the original image size. + """ + output_masks = [] + for batch in range(masks.shape[0]): + batch_masks = [] + for mask_id in range(masks.shape[1]): + mask = masks[batch, mask_id] + mask = cv2.warpAffine( + mask, + transform_matrix[:2], + (original_size[1], original_size[0]), + flags=cv2.INTER_LINEAR, + ) + batch_masks.append(mask) + output_masks.append(batch_masks) + return np.array(output_masks) + + def encode(self, cv_image): + """ + Calculate embedding and metadata for a single image. 
+ """ + original_size = cv_image.shape[:2] + + # Calculate a transformation matrix to convert to self.input_size + scale_x = self.input_size[1] / cv_image.shape[1] + scale_y = self.input_size[0] / cv_image.shape[0] + scale = min(scale_x, scale_y) + transform_matrix = np.array( + [ + [scale, 0, 0], + [0, scale, 0], + [0, 0, 1], + ] + ) + cv_image = cv2.warpAffine( + cv_image, + transform_matrix[:2], + (self.input_size[1], self.input_size[0]), + flags=cv2.INTER_LINEAR, + ) + + encoder_inputs = { + self.encoder_input_name: cv_image.astype(np.float32), + } + image_embedding = self.run_encoder(encoder_inputs) + return { + "image_embedding": image_embedding, + "original_size": original_size, + "transform_matrix": transform_matrix, + } + + def predict_masks(self, embedding, prompt, transform_prompt=True): + """ + Predict masks for a single image. + """ + masks = self.run_decoder( + embedding["image_embedding"], + embedding["original_size"], + embedding["transform_matrix"], + prompt, + transform_prompt, + ) + + return masks + +class YOLOv5SegmentAnything(Model): + """Segmentation model using YOLOv5 by SegmentAnything""" + + class Meta: + required_config_names = [ + "type", + "name", + "display_name", + "cache_size", + "target_size", + "max_width", + "max_height", + "model_path", + "input_width", + "input_height", + "stride", + "nms_threshold", + "confidence_threshold", + "classes", + "encoder_model_path", + "decoder_model_path", + ] + widgets = [ + "output_label", + "output_select_combobox", + "button_add_point", + "button_remove_point", + "button_add_rect", + "button_clear", + "button_finish_object", + ] + output_modes = { + "polygon": QCoreApplication.translate("Model", "Polygon"), + "rectangle": QCoreApplication.translate("Model", "Rectangle"), + } + default_output_mode = "polygon" + + def __init__(self, config_path, on_message) -> None: + # Run the parent class's init method + super().__init__(config_path, on_message) + + """ONNX""" + sess_opts = ort.SessionOptions() + if "OMP_NUM_THREADS" in os.environ: + sess_opts.inter_op_num_threads = int(os.environ["OMP_NUM_THREADS"]) + providers = ['CPUExecutionProvider'] + if __preferred_device__ == "GPU": + providers = ['CUDAExecutionProvider'] + + """YOLOv5""" + # Get model paths + model_abs_path = self.get_model_abs_path(self.config, "model_path") + if not model_abs_path or not os.path.isfile(model_abs_path): + raise FileNotFoundError( + QCoreApplication.translate( + "Model", "Could not download or initialize YOLOv5 model." 
+ ) + ) + self.net = ort.InferenceSession( + model_abs_path, + providers=providers, + sess_options=sess_opts, + ) + self.classes = self.config["classes"] + self.img_size = self.check_img_size( + [self.config["input_width"], self.config["input_height"]], + s=self.config["stride"] + ) + self.runned_flag = False + + """Segment Anything Model""" + # Get encoder and decoder model paths + encoder_model_abs_path = self.get_model_abs_path( + self.config, "encoder_model_path" + ) + if not encoder_model_abs_path or not os.path.isfile( + encoder_model_abs_path + ): + raise FileNotFoundError( + QCoreApplication.translate( + "Model", + "Could not download or initialize encoder of Segment Anything.", + ) + ) + decoder_model_abs_path = self.get_model_abs_path( + self.config, "decoder_model_path" + ) + if not decoder_model_abs_path or not os.path.isfile( + decoder_model_abs_path + ): + raise FileNotFoundError( + QCoreApplication.translate( + "Model", + "Could not download or initialize decoder of Segment Anything.", + ) + ) + + # Load models + self.target_size = self.config["target_size"] + self.input_size = (self.config["max_height"], self.config["max_width"]) + encoder_session = ort.InferenceSession(encoder_model_abs_path, providers=providers, sess_options=sess_opts) + decoder_session = ort.InferenceSession(decoder_model_abs_path, providers=providers, sess_options=sess_opts) + self.model = SegmentAnythingONNX(encoder_session, decoder_session, self.target_size, self.input_size) + + # Mark for auto labeling: [points, rectangles] + self.marks = [] + + # Cache for image embedding + self.cache_size = self.config["cache_size"] + self.preloaded_size = self.cache_size - 3 + self.image_embedding_cache = LRUCache(self.cache_size) + + # Pre-inference worker + self.pre_inference_thread = None + self.pre_inference_worker = None + self.stop_inference = False + + def set_auto_labeling_marks(self, marks): + """Set auto labeling marks""" + self.marks = marks + + def post_process(self, masks, label=None): + """ + Post process masks + """ + # Find contours + masks[masks > 0.0] = 255 + masks[masks <= 0.0] = 0 + masks = masks.astype(np.uint8) + contours, _ = cv2.findContours( + masks, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE + ) + + # Refine contours + approx_contours = [] + for contour in contours: + # Approximate contour + epsilon = 0.001 * cv2.arcLength(contour, True) + approx = cv2.approxPolyDP(contour, epsilon, True) + approx_contours.append(approx) + + # Remove too big contours ( >90% of image size) + if len(approx_contours) > 1: + image_size = masks.shape[0] * masks.shape[1] + areas = [cv2.contourArea(contour) for contour in approx_contours] + filtered_approx_contours = [ + contour + for contour, area in zip(approx_contours, areas) + if area < image_size * 0.9 + ] + + # Remove small contours (area < 20% of average area) + if len(approx_contours) > 1: + areas = [cv2.contourArea(contour) for contour in approx_contours] + avg_area = np.mean(areas) + + filtered_approx_contours = [ + contour + for contour, area in zip(approx_contours, areas) + if area > avg_area * 0.2 + ] + approx_contours = filtered_approx_contours + + # Contours to shapes + shapes = [] + if self.output_mode == "polygon": + for approx in approx_contours: + # Scale points + points = approx.reshape(-1, 2) + points[:, 0] = points[:, 0] + points[:, 1] = points[:, 1] + points = points.tolist() + if len(points) < 3: + continue + points.append(points[0]) + + # Create shape + shape = Shape(flags={}) + for point in points: + point[0] = int(point[0]) + point[1] = 
int(point[1])
+                    shape.add_point(QtCore.QPointF(point[0], point[1]))
+                shape.shape_type = "polygon"
+                shape.closed = True
+                shape.fill_color = "#000000"
+                shape.line_color = "#000000"
+                shape.line_width = 1
+                shape.label = "AUTOLABEL_OBJECT" if label is None else label
+                shape.selected = False
+                shapes.append(shape)
+        elif self.output_mode == "rectangle":
+            x_min = 100000000
+            y_min = 100000000
+            x_max = 0
+            y_max = 0
+            for approx in approx_contours:
+                # Scale points
+                points = approx.reshape(-1, 2)
+                points[:, 0] = points[:, 0]
+                points[:, 1] = points[:, 1]
+                points = points.tolist()
+                if len(points) < 3:
+                    continue
+
+                # Get min/max
+                for point in points:
+                    x_min = min(x_min, point[0])
+                    y_min = min(y_min, point[1])
+                    x_max = max(x_max, point[0])
+                    y_max = max(y_max, point[1])
+
+            # Create shape
+            shape = Shape(flags={})
+            shape.add_point(QtCore.QPointF(x_min, y_min))
+            shape.add_point(QtCore.QPointF(x_max, y_max))
+            shape.shape_type = "rectangle"
+            shape.closed = True
+            shape.fill_color = "#000000"
+            shape.line_color = "#000000"
+            shape.line_width = 1
+            shape.label = "AUTOLABEL_OBJECT" if label is None else label
+            shape.selected = False
+            shapes.append(shape)
+
+        return shapes if label is None else shapes[0]
+
+    def predict_shapes(self, image, filename=None) -> AutoLabelingResult:
+        """
+        Predict shapes from image
+        """
+
+        if image is None or not self.marks:
+            return AutoLabelingResult([], replace=False)
+
+        shapes = []
+        try:
+            cv_image = qt_img_to_rgb_cv_img(image, filename)
+            if self.stop_inference:
+                return AutoLabelingResult([], replace=False)
+            # Use cached image embedding if possible. cv_image is converted
+            # above even on a cache hit, because the YOLOv5 detection branch
+            # below also needs it.
+            cached_data = self.image_embedding_cache.get(filename)
+            if cached_data is not None:
+                image_embedding = cached_data
+            else:
+                image_embedding = self.model.encode(cv_image)
+                self.image_embedding_cache.put(
+                    filename,
+                    image_embedding,
+                )
+            if self.stop_inference:
+                return AutoLabelingResult([], replace=False)
+
+            if not self.runned_flag:
+                processed_img, detections = self.yolo_pre_process(cv_image, self.net)
+                prompts, labels = self.yolo_post_process(cv_image, processed_img, detections)
+                for prompt, label in zip(prompts, labels):
+                    masks = self.model.predict_masks(image_embedding, prompt, transform_prompt=False)
+                    if len(masks.shape) == 4:
+                        masks = masks[0][0]
+                    else:
+                        masks = masks[0]
+                    results = self.post_process(masks, label=label)
+                    shapes.append(results)
+                result = AutoLabelingResult(shapes, replace=True)
+                self.runned_flag = True  # Run only once
+                shapes = []
+                return result
+
+            masks = self.model.predict_masks(image_embedding, self.marks)
+            if len(masks.shape) == 4:
+                masks = masks[0][0]
+            else:
+                masks = masks[0]
+            shapes = self.post_process(masks)
+        except Exception as e:  # noqa
+            logging.warning("Could not inference model")
+            logging.warning(e)
+            traceback.print_exc()
+            return AutoLabelingResult([], replace=False)
+
+        result = AutoLabelingResult(shapes, replace=False)
+        return result
+
+    def yolo_pre_process(self, input_image, net):
+        """
+        Pre-process the input RGB image before feeding it to the network.
+ """ + image = self.letterbox(input_image, self.img_size, stride=self.config['stride'])[0] + image = image.transpose((2, 0, 1)) # HWC to CHW + image = np.ascontiguousarray(image).astype('float32') + image /= 255 # 0 - 255 to 0.0 - 1.0 + if len(image.shape) == 3: + image = image[None] + inputs = net.get_inputs()[0].name + outputs = net.run(None, {inputs: image})[0] + + return image, outputs + + def yolo_post_process(self, img_src, img_processed, outputs): + """ + Post-process the network's output, to get the bounding boxes, key-points and + their confidence scores. + """ + det = self.non_max_suppression(outputs)[0] + output_infos, labels = [], [] + if len(det): + det[:, :4] = self.rescale(img_processed.shape[2:], det[:, :4], img_src.shape).round() + for *xyxy, _, class_id in reversed(det): + x1, y1, x2, y2 = xyxy + prompt = [np.array([[int(x1), int(y1)], [int(x2), int(y2)]]), np.array([2, 3])] + output_infos.append(prompt) + labels.append(self.classes[int(class_id)]) + return output_infos, labels + + def check_img_size(self, img_size, s=32, floor=0): + """Make sure image size is a multiple of stride s in each dimension, and return a new shape list of image.""" + if isinstance(img_size, int): # integer i.e. img_size=640 + new_size = max(self.make_divisible(img_size, int(s)), floor) + elif isinstance(img_size, list): # list i.e. img_size=[640, 480] + new_size = [max(self.make_divisible(x, int(s)), floor) for x in img_size] + else: + raise Exception(f"Unsupported type of img_size: {type(img_size)}") + + if new_size != img_size: + print(f'WARNING: --img-size {img_size} must be multiple of max stride {s}, updating to {new_size}') + return new_size if isinstance(img_size,list) else [new_size]*2 + + def make_divisible(self, x, divisor): + # Upward revision the value x to make it evenly divisible by the divisor. 
+ return math.ceil(x / divisor) * divisor + + @staticmethod + def letterbox(im, new_shape=(640, 640), color=(114, 114, 114), auto=False, scaleup=True, stride=32, return_int=False): + '''Resize and pad image while meeting stride-multiple constraints.''' + shape = im.shape[:2] # current shape [height, width] + if isinstance(new_shape, int): + new_shape = (new_shape, new_shape) + elif isinstance(new_shape, list) and len(new_shape) == 1: + new_shape = (new_shape[0], new_shape[0]) + + # Scale ratio (new / old) + r = min(new_shape[0] / shape[0], new_shape[1] / shape[1]) + if not scaleup: # only scale down, do not scale up (for better val mAP) + r = min(r, 1.0) + + # Compute padding + new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r)) + dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1] # wh padding + + if auto: # minimum rectangle + dw, dh = np.mod(dw, stride), np.mod(dh, stride) # wh padding + + dw /= 2 # divide padding into 2 sides + dh /= 2 + + if shape[::-1] != new_unpad: # resize + im = cv2.resize(im, new_unpad, interpolation=cv2.INTER_LINEAR) + top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1)) + left, right = int(round(dw - 0.1)), int(round(dw + 0.1)) + im = cv2.copyMakeBorder(im, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color) # add border + if not return_int: + return im, r, (dw, dh) + else: + return im, r, (left, top) + + @staticmethod + def xywh2xyxy(x): + '''Convert boxes with shape [n, 4] from [x, y, w, h] to [x1, y1, x2, y2] where x1y1 is top-left, x2y2=bottom-right.''' + y = np.copy(x) + y[:, 0] = x[:, 0] - x[:, 2] / 2 # top left x + y[:, 1] = x[:, 1] - x[:, 3] / 2 # top left y + y[:, 2] = x[:, 0] + x[:, 2] / 2 # bottom right x + y[:, 3] = x[:, 1] + x[:, 3] / 2 # bottom right y + return y + + @staticmethod + def rescale(ori_shape, boxes, target_shape): + '''Rescale the output to the original image shape''' + ratio = min(ori_shape[0] / target_shape[0], ori_shape[1] / target_shape[1]) + padding = ((ori_shape[1] - target_shape[1] * ratio) / 2, (ori_shape[0] - target_shape[0] * ratio) / 2) + boxes[:, [0, 2]] -= padding[0] + boxes[:, [1, 3]] -= padding[1] + boxes[:, :4] /= ratio + boxes[:, 0] = np.clip(boxes[:, 0], 0, target_shape[1]) # x1 + boxes[:, 1] = np.clip(boxes[:, 1], 0, target_shape[0]) # y1 + boxes[:, 2] = np.clip(boxes[:, 2], 0, target_shape[1]) # x2 + boxes[:, 3] = np.clip(boxes[:, 3], 0, target_shape[0]) # y2 + return boxes + + def non_max_suppression(self, prediction, conf_thres=0.25, iou_thres=0.45, classes=None, agnostic=False, multi_label=False, max_det=1000): + """Runs Non-Maximum Suppression (NMS) on inference results. + This code is borrowed from: https://github.com/ultralytics/yolov5/blob/47233e1698b89fc437a4fb9463c815e9171be955/utils/general.py#L775 + Args: + prediction: (tensor), with shape [N, 5 + num_classes], N is the number of bboxes. + conf_thres: (float) confidence threshold. + iou_thres: (float) iou threshold. + classes: (None or list[int]), if a list is provided, nms only keep the classes you provide. + agnostic: (bool), when it is set to True, we do class-independent nms, otherwise, different class would do nms respectively. + multi_label: (bool), when it is set to True, one box can have multi labels, otherwise, one box only huave one label. + max_det:(int), max number of output bboxes. + + Returns: + list of detections, echo item is one tensor with shape (num_boxes, 6), 6 is for [xyxy, conf, cls]. 
+ """ + conf_thres = self.config["confidence_threshold"] + iou_thres = self.config["nms_threshold"] + + num_classes = prediction.shape[2] - 5 # number of classes + pred_candidates = np.logical_and(prediction[..., 4] > conf_thres, np.max(prediction[..., 5:], axis=-1) > conf_thres) # candidates + # Check the parameters. + assert 0 <= conf_thres <= 1, f'conf_thresh must be in 0.0 to 1.0, however {conf_thres} is provided.' + assert 0 <= iou_thres <= 1, f'iou_thres must be in 0.0 to 1.0, however {iou_thres} is provided.' + + # Function settings. + max_wh = 4096 # maximum box width and height + max_nms = 30000 # maximum number of boxes put into torchvision.ops.nms() + multi_label &= num_classes > 1 # multiple labels per box + + output = [np.zeros((0, 6))] * prediction.shape[0] + for img_idx, x in enumerate(prediction): # image index, image inference + x = x[pred_candidates[img_idx]] # confidence + + # If no box remains, skip the next process. + if not x.shape[0]: + continue + + # confidence multiply the objectness + x[:, 5:] *= x[:, 4:5] # conf = obj_conf * cls_conf + + # (center x, center y, width, height) to (x1, y1, x2, y2) + box = self.xywh2xyxy(x[:, :4]) + + # Detections matrix's shape is (n,6), each row represents (xyxy, conf, cls) + if multi_label: + box_idx, class_idx = np.nonzero(x[:, 5:] > conf_thres) + box = box[box_idx] + conf = x[box_idx, class_idx + 5][:, None] + class_idx = class_idx[:, None].astype(float) + x = np.concatenate((box, conf, class_idx), axis=1) + else: + conf = np.max(x[:, 5:], axis=1, keepdims=True) + class_idx = np.argmax(x[:, 5:], axis=1) + x = np.concatenate((box, conf, class_idx[:, None].astype(float)), axis=1)[conf.flatten() > conf_thres] + + # Filter by class, only keep boxes whose category is in classes. + if classes is not None: + x = x[(x[:, 5:6] == np.array(classes)).any(1)] + + # Check shape + num_box = x.shape[0] # number of boxes + if not num_box: # no boxes kept. + continue + elif num_box > max_nms: # excess max boxes' number. 
+                x = x[x[:, 4].argsort()[::-1][:max_nms]]  # sort by confidence
+
+            # Batched NMS
+            class_offset = x[:, 5:6] * (0 if agnostic else max_wh)  # classes
+            boxes, scores = x[:, :4] + class_offset, x[:, 4]  # boxes (offset by class), scores
+            keep_box_idx = self.numpy_nms(boxes, scores, iou_thres)  # NMS
+            if keep_box_idx.shape[0] > max_det:  # limit detections
+                keep_box_idx = keep_box_idx[:max_det]
+
+            output[img_idx] = x[keep_box_idx]
+
+        return output
+
+    @staticmethod
+    def box_area(boxes: np.array):
+        return (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
+
+    def box_iou(self, box1: np.array, box2: np.array):
+        area1 = self.box_area(box1)  # N
+        area2 = self.box_area(box2)  # M
+        # broadcasting
+        lt = np.maximum(box1[:, np.newaxis, :2], box2[:, :2])
+        rb = np.minimum(box1[:, np.newaxis, 2:], box2[:, 2:])
+        wh = rb - lt
+        wh = np.maximum(0, wh)  # [N, M, 2]
+        inter = wh[:, :, 0] * wh[:, :, 1]
+        iou = inter / (area1[:, np.newaxis] + area2 - inter)
+        return iou  # NxM
+
+    def numpy_nms(self, boxes: np.array, scores: np.array, iou_threshold: float):
+        idxs = scores.argsort()
+        keep = []
+        while idxs.size > 0:
+            max_score_index = idxs[-1]
+            max_score_box = boxes[max_score_index][None, :]
+            keep.append(max_score_index)
+            if idxs.size == 1:
+                break
+            idxs = idxs[:-1]
+            other_boxes = boxes[idxs]
+            ious = self.box_iou(max_score_box, other_boxes)
+            idxs = idxs[ious[0] <= iou_threshold]
+        keep = np.array(keep)
+        return keep
+
+    def unload(self):
+        self.stop_inference = True
+        if self.pre_inference_thread:
+            self.pre_inference_thread.quit()
+        del self.net
+
+    def preload_worker(self, files):
+        """
+        Preload next files, run inference and cache results
+        """
+        files = files[: self.preloaded_size]
+        for filename in files:
+            if self.image_embedding_cache.find(filename):
+                continue
+            image = self.load_image_from_filename(filename)
+            if image is None:
+                continue
+            if self.stop_inference:
+                return
+            cv_image = qt_img_to_rgb_cv_img(image)
+            image_embedding = self.model.encode(cv_image)
+            self.image_embedding_cache.put(
+                filename,
+                image_embedding,
+            )
+
+    def on_next_files_changed(self, next_files):
+        """
+        Handle next files changed. This function can preload next files
+        and run inference to save time for user.
+        """
+        if (
+            self.pre_inference_thread is None
+            or not self.pre_inference_thread.isRunning()
+        ):
+            self.pre_inference_thread = QThread()
+            self.pre_inference_worker = GenericWorker(
+                self.preload_worker, next_files
+            )
+            self.pre_inference_worker.finished.connect(
+                self.pre_inference_thread.quit
+            )
+            self.pre_inference_worker.moveToThread(self.pre_inference_thread)
+            self.pre_inference_thread.started.connect(
+                self.pre_inference_worker.run
+            )
+            self.pre_inference_thread.start()
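
For reviewers: the bridge between the YOLOv5 detector and the SAM decoder added in this patch is the box-prompt format built by `yolo_post_process()`. Each detection box becomes two corner points carrying SAM point labels 2 (top-left) and 3 (bottom-right), which `predict_shapes()` then passes to `predict_masks(..., transform_prompt=False)`. A minimal standalone sketch of that conversion follows; the `boxes_to_sam_prompts` helper name is illustrative only and is not part of this patch:

```python
import numpy as np

def boxes_to_sam_prompts(detections, class_names):
    """Convert YOLOv5 detections (x1, y1, x2, y2, conf, class_id) into the
    box-prompt format consumed by SegmentAnythingONNX.run_decoder: two corner
    points labeled 2 (top-left) and 3 (bottom-right), mirroring
    yolo_post_process() above."""
    prompts, labels = [], []
    for x1, y1, x2, y2, _conf, class_id in detections:
        points = np.array([[int(x1), int(y1)], [int(x2), int(y2)]])
        point_labels = np.array([2, 3])  # SAM convention for box corners
        prompts.append([points, point_labels])
        labels.append(class_names[int(class_id)])
    return prompts, labels

# One hypothetical "person" detection at (10, 20)-(110, 220) with confidence 0.88
dets = np.array([[10.0, 20.0, 110.0, 220.0, 0.88, 0.0]])
prompts, labels = boxes_to_sam_prompts(dets, ["person", "bicycle"])
print(prompts[0][0].tolist(), prompts[0][1].tolist(), labels)
# -> [[10, 20], [110, 220]] [2, 3] ['person']
```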