import time

import cv2
import cv2.aruco as aruco
import numpy as np
import pyrealsense2 as rs


class ArucoDetector:
    """Detect ArUco markers in a RealSense color stream and report their 3D
    positions (meters, camera frame) using the depth stream aligned to color.
    """

    def __init__(self, aruco_dict_type=aruco.DICT_5X5_250):
        """Configure (but do not start) the RealSense pipeline.

        Args:
            aruco_dict_type: OpenCV ArUco dictionary constant to detect
                (defaults to DICT_5X5_250).
        """
        self.pipeline = rs.pipeline()
        self.config = rs.config()
        self.config.enable_stream(rs.stream.depth, 848, 480, rs.format.z16, 60)
        self.config.enable_stream(rs.stream.color, 848, 480, rs.format.bgr8, 60)

        # Align depth to color since the two sensors are physically offset.
        self.align_to = rs.stream.color
        self.align = rs.align(self.align_to)

        # Dictionary/parameters for the requested ArUco tag family.
        self.aruco_dict = aruco.Dictionary_get(aruco_dict_type)
        self.aruco_params = aruco.DetectorParameters_create()

        # Latest known position per marker: {marker_id: [x, y, z]} in meters.
        self.marker_positions = {}

        # Both populated by start_stream(); None while the pipeline is stopped.
        # Initializing profile here prevents the AttributeError the original
        # hit in uninitialize()/__del__ when start_stream() was never called.
        self.intrinsics = None
        self.profile = None

    def __del__(self):
        # Delegate to uninitialize() so the cleanup logic lives in one place
        # (the original duplicated the entire body here).
        self.uninitialize()

    def uninitialize(self):
        """Safely restore auto-exposure, stop the stream and close windows.

        Idempotent: may be called manually and again by __del__, and is safe
        even if start_stream() was never called.
        """
        print("Closing ArucoDetector and releasing resources")
        if self.profile is not None:
            try:
                # Restore auto-exposure BEFORE stopping: the sensor handle
                # comes from the active profile, so touching it after
                # pipeline.stop() (as the original did) is too late.
                color_sensor = self.profile.get_device().first_color_sensor()
                color_sensor.set_option(rs.option.enable_auto_exposure, 1)
            except Exception as e:
                print(f"Error restoring auto exposure: {e}")
            try:
                self.pipeline.stop()
            except Exception as e:
                print(f"Error stopping pipeline: {e}")
            finally:
                self.profile = None
        cv2.destroyAllWindows()

    def start_stream(self):
        """Start the pipeline and cache the color-stream intrinsics."""
        self.profile = self.pipeline.start(self.config)
        color_stream = self.profile.get_stream(rs.stream.color)
        self.intrinsics = color_stream.as_video_stream_profile().get_intrinsics()

    def stop_stream(self):
        """Stop the pipeline (leaves exposure mode and windows untouched)."""
        self.pipeline.stop()
        self.profile = None

    def auto_calibration(self, min_exposure=50, max_exposure=2000, step=50,
                         frames_to_check=5):
        """Sweep manual exposure values and keep the one detecting the most markers.

        Args:
            min_exposure: Minimum exposure value to start from.
            max_exposure: Maximum exposure value to go up to (inclusive).
            step: Step size to increment exposure.
            frames_to_check: Poll attempts per exposure level. NOTE:
                process_frames() is non-blocking, so some attempts may yield
                no frame at all.

        Leaves the sensor in MANUAL exposure mode at the best value found;
        call reset_exposure() to return to auto exposure. Pressing 'q' in the
        preview window aborts early without applying a best exposure.
        """
        color_sensor = self.profile.get_device().first_color_sensor()
        color_sensor.set_option(rs.option.enable_auto_exposure, 0)

        best_exposure = min_exposure
        max_detected_markers = 0

        for exposure_value in range(min_exposure, max_exposure + 1, step):
            color_sensor.set_option(rs.option.exposure, exposure_value)
            print(f"Setting exposure to {exposure_value} and checking markers...")

            markers_detected = 0
            for _ in range(frames_to_check):
                aligned_depth_frame, color_image = self.process_frames()
                if color_image is not None and aligned_depth_frame is not None:
                    # Live RGB preview during calibration; 'q' aborts.
                    cv2.imshow('Calibration - RGB Output', color_image)
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        cv2.destroyAllWindows()
                        return

                    gray = cv2.cvtColor(color_image, cv2.COLOR_BGR2GRAY)
                    corners, ids, _ = aruco.detectMarkers(
                        gray, self.aruco_dict, parameters=self.aruco_params)
                    if ids is not None:
                        markers_detected += len(ids)

                time.sleep(0.02)  # give the camera time to adjust if needed

            print(f"Exposure: {exposure_value}, Markers Detected: {markers_detected}")
            if markers_detected > max_detected_markers:
                max_detected_markers = markers_detected
                best_exposure = exposure_value
                print(f"New best exposure found: {best_exposure} with "
                      f"{markers_detected} markers detected.")

        # After the sweep, apply the winning exposure.
        color_sensor.set_option(rs.option.exposure, best_exposure)
        print(f"Auto-calibration complete. Best exposure set to {best_exposure}.")
        cv2.destroyAllWindows()

    def reset_exposure(self):
        """Switch the color sensor from manual back to built-in auto exposure."""
        color_sensor = self.profile.get_device().first_color_sensor()
        color_sensor.set_option(rs.option.enable_auto_exposure, 1)
        print("Exposure reset to automatic.")

    def process_frames(self):
        """Non-blocking frame fetch (poll_for_frames).

        Returns:
            (aligned_depth_frame, color_image) on success — color_image is a
            BGR numpy array — or (None, None) when no new frames are ready
            or either aligned frame is missing.
        """
        frames = self.pipeline.poll_for_frames()
        if not frames:
            return None, None

        aligned_frames = self.align.process(frames)
        aligned_depth_frame = aligned_frames.get_depth_frame()
        color_frame = aligned_frames.get_color_frame()
        if not aligned_depth_frame or not color_frame:
            return None, None

        return aligned_depth_frame, np.asanyarray(color_frame.get_data())

    # ### helper functions for update_marker_positions ### #

    def calculate_m(self, x_pixel, y_pixel, depth_frame):
        """Deproject a pixel to 3D camera coordinates, in meters.

        Args:
            x_pixel, y_pixel: Pixel coordinates in the color image.
            depth_frame: Aligned depth frame to sample distance from.

        Returns:
            (x, y, z) in meters, with y flipped so up is positive, and z
            shifted by -4.2 mm (spec sheet: depth origin sits behind the
            front glass).
        """
        depth = depth_frame.get_distance(x_pixel, y_pixel)
        x, y, z = rs.rs2_deproject_pixel_to_point(
            self.intrinsics, [x_pixel, y_pixel], depth)
        # Flip y so up is positive; move the z origin to the front glass.
        return x, -y, z - 0.0042

    def display(self, color_image, x_pixel, y_pixel, marker_id):
        """Draw the marker's stored X/Y/Z coordinates next to it, one per line."""
        x, y, z = self.marker_positions[marker_id]

        lines = [f"ID {marker_id}:", f"X={x:.4f}m", f"Y={y:.4f}m", f"Z={z:.4f}m"]
        for j, line in enumerate(lines):
            y_offset = y_pixel + 20 + (15 * j)
            cv2.putText(color_image, line, (x_pixel + 20, y_offset),
                        cv2.FONT_HERSHEY_PLAIN, 1, (0, 0, 255), 2)

    # ######################################################## #

    def update_marker_positions(self, color_image, depth_frame):
        """Detect markers, record their 3D positions and annotate the image.

        Args:
            color_image: BGR frame to detect in (annotated in place).
            depth_frame: Aligned depth frame for deprojection.

        Returns:
            The annotated color image.
        """
        gray = cv2.cvtColor(color_image, cv2.COLOR_BGR2GRAY)
        corners, ids, _ = aruco.detectMarkers(
            gray, self.aruco_dict, parameters=self.aruco_params)

        if ids is not None:
            aruco.drawDetectedMarkers(color_image, corners, ids)

            for i in range(len(ids)):
                # Average the four corners for a better center estimate.
                center = np.mean(corners[i][0], axis=0)
                x_pixel, y_pixel = int(center[0]), int(center[1])

                # Calculate position in 3D space, in meters.
                x, y, z = self.calculate_m(x_pixel, y_pixel, depth_frame)

                marker_id = ids[i][0]
                self.marker_positions[marker_id] = [x, y, z]

                self.display(color_image, x_pixel, y_pixel, marker_id)

        return color_image

    def get_marker_position(self, marker_id):
        """Return the last recorded [x, y, z] for marker_id, or None if unseen."""
        return self.marker_positions.get(marker_id)
b/aruco/DICT_5X5_250/DICT_5X5_id1.png new file mode 100644 index 0000000..46da02e Binary files /dev/null and b/aruco/DICT_5X5_250/DICT_5X5_id1.png differ diff --git a/aruco/DICT_5X5_250/DICT_5X5_id2.png b/aruco/DICT_5X5_250/DICT_5X5_id2.png new file mode 100644 index 0000000..ce89917 Binary files /dev/null and b/aruco/DICT_5X5_250/DICT_5X5_id2.png differ diff --git a/aruco/DICT_5X5_250/DICT_5X5_id3.png b/aruco/DICT_5X5_250/DICT_5X5_id3.png new file mode 100644 index 0000000..372e03b Binary files /dev/null and b/aruco/DICT_5X5_250/DICT_5X5_id3.png differ diff --git a/aruco/DICT_6X6_250/DICT_6X6_id0.png b/aruco/DICT_6X6_250/DICT_6X6_id0.png new file mode 100644 index 0000000..9bd7303 Binary files /dev/null and b/aruco/DICT_6X6_250/DICT_6X6_id0.png differ diff --git a/aruco/DICT_6X6_250/DICT_6X6_id1.png b/aruco/DICT_6X6_250/DICT_6X6_id1.png new file mode 100644 index 0000000..c2de747 Binary files /dev/null and b/aruco/DICT_6X6_250/DICT_6X6_id1.png differ diff --git a/aruco/DICT_6X6_250/DICT_6X6_id2.png b/aruco/DICT_6X6_250/DICT_6X6_id2.png new file mode 100644 index 0000000..a51ad54 Binary files /dev/null and b/aruco/DICT_6X6_250/DICT_6X6_id2.png differ diff --git a/aruco/DICT_6X6_250/DICT_6X6_id3.png b/aruco/DICT_6X6_250/DICT_6X6_id3.png new file mode 100644 index 0000000..231d929 Binary files /dev/null and b/aruco/DICT_6X6_250/DICT_6X6_id3.png differ diff --git a/aruco/DICT_6X6_250/DICT_6X6_id4.png b/aruco/DICT_6X6_250/DICT_6X6_id4.png new file mode 100644 index 0000000..67ec17c Binary files /dev/null and b/aruco/DICT_6X6_250/DICT_6X6_id4.png differ diff --git a/aruco/DICT_6X6_250/DICT_6X6_id5.png b/aruco/DICT_6X6_250/DICT_6X6_id5.png new file mode 100644 index 0000000..41abff7 Binary files /dev/null and b/aruco/DICT_6X6_250/DICT_6X6_id5.png differ diff --git a/aruco/DICT_6X6_250/DICT_6X6_id6.png b/aruco/DICT_6X6_250/DICT_6X6_id6.png new file mode 100644 index 0000000..86de01a Binary files /dev/null and b/aruco/DICT_6X6_250/DICT_6X6_id6.png differ diff --git 
"""Generation of ArUco marker images for printing/testing."""
from typing import List, Union
import cv2
import cv2.aruco as aruco
import numpy as np
import os

# ArUco dictionary constants exposed for convenience.
DICT_4X4 = aruco.DICT_4X4_250
DICT_5X5 = aruco.DICT_5X5_250
DICT_6X6 = aruco.DICT_6X6_250
DICT_7X7 = aruco.DICT_7X7_250

# Maps dictionary constants to the name prefix used in output file names.
DICT_MAPPING = {
    DICT_4X4: "DICT_4X4",
    DICT_5X5: "DICT_5X5",
    DICT_6X6: "DICT_6X6",
    DICT_7X7: "DICT_7X7",
}


def aruco_generator(ids: List[int],
                    tag_types: Union[int, List[int]] = DICT_6X6,
                    size: int = 200,
                    file_format: str = 'png',
                    directory: str = 'aruco_markers') -> None:
    """Generate ArUco marker images for the given IDs and tag types.

    Args:
        ids: List of IDs for the ArUco markers.
        tag_types: ArUco tag type or a list of tag types to be used.
        size: Side length in pixels of each marker image.
        file_format: File format (extension) for the output images.
        directory: Directory where the marker images will be saved.

    Raises:
        KeyError: If a tag type is not one of the DICT_* constants above.
    """
    if isinstance(tag_types, int):
        tag_types = [tag_types]

    # exist_ok avoids the check-then-create race of the original.
    os.makedirs(directory, exist_ok=True)

    for tag_type in tag_types:
        aruco_dict = aruco.Dictionary_get(tag_type)
        tag_type_str = DICT_MAPPING[tag_type]  # string prefix for file names

        # 'marker_id' rather than 'id': don't shadow the builtin.
        for marker_id in ids:
            img = aruco.drawMarker(aruco_dict, marker_id, size)
            file_name = f"{tag_type_str}_id{marker_id}.{file_format}"
            cv2.imwrite(os.path.join(directory, file_name), img)

    print(f"Generated {len(ids) * len(tag_types)} ArUco markers in '{directory}' directory.")


# Backward-compatible alias: aruco/__init__.py imports `generate_aruco`
# from this module, which previously raised ImportError.
generate_aruco = aruco_generator


if __name__ == '__main__':
    # Example usage — guarded so that importing this module (e.g. via the
    # package __init__) no longer writes marker files as a side effect.
    aruco_generator([0, 1, 2, 3], [DICT_5X5], 200, "png", "DICT_5X5_250")
import pyrealsense2 as rs


def reset_camera_settings():
    """Hardware-reset every connected RealSense device."""
    # The context owns the handles to all connected RealSense devices.
    ctx = rs.context()
    for dev in ctx.query_devices():
        print("Resetting device:", dev.get_info(rs.camera_info.name))
        dev.hardware_reset()


if __name__ == '__main__':
    reset_camera_settings()


# --- test_aruco.py ------------------------------------------------------
# The printed ArUco tags measure 1.97 inch (0.05 m) per side.
from ArucoDetector import ArucoDetector
import cv2


def aruco_test():
    """Interactive smoke test: stream, auto-calibrate exposure and show marker positions.

    Keys: 'c' re-runs auto_calibration, 'r' resets to auto exposure,
    'q' quits the loop.
    """
    detector = ArucoDetector(cv2.aruco.DICT_5X5_250)
    detector.start_stream()
    detector.auto_calibration()  # value must stay within the supported range

    while True:
        depth_frame, frame = detector.process_frames()
        if frame is not None and depth_frame is not None:
            # Annotate the frame with detected marker positions and show it.
            frame = detector.update_marker_positions(frame, depth_frame)
            cv2.imshow('RealSense Color', frame)

        key = cv2.waitKey(1) & 0xFF
        if key == ord('c'):
            detector.auto_calibration()
        elif key == ord('r'):
            detector.reset_exposure()
        elif key == ord('q'):
            break

    # uninitialize() already destroys all OpenCV windows.
    detector.uninitialize()


if __name__ == '__main__':
    aruco_test()