-
Notifications
You must be signed in to change notification settings - Fork 17
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Ramiro Serra <[email protected]>
- Loading branch information
1 parent
d23d446
commit 059fcbb
Showing
3 changed files
with
117 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
import rosbag2_py | ||
|
||
import numpy as np | ||
from sensor_msgs.msg import PointCloud2 | ||
from rclpy.serialization import deserialize_message | ||
from scipy.spatial.transform import Rotation | ||
|
||
import sensor_msgs_py.point_cloud2 as pc2 | ||
from rosidl_runtime_py.utilities import get_message | ||
import rclpy | ||
import open3d as o3d | ||
from tf2_msgs.msg import TFMessage | ||
import sensor_msgs_py.numpy_compat as nc | ||
|
||
import tf2_ros | ||
|
||
|
||
def lookup_transform(tf_buffer, target_frame, source_frame, time): | ||
try: | ||
transform_stamped = tf_buffer.lookup_transform( | ||
target_frame, source_frame, time) | ||
return transform_stamped.transform | ||
except tf2_ros.LookupException as e: | ||
print(f"error : {e=}") | ||
return None | ||
|
||
|
||
def get_rosbag_options(path, storage_id, serialization_format='cdr'): | ||
storage_options = rosbag2_py.StorageOptions( | ||
uri=path, storage_id=storage_id) | ||
|
||
converter_options = rosbag2_py.ConverterOptions( | ||
input_serialization_format=serialization_format, | ||
output_serialization_format=serialization_format) | ||
|
||
return storage_options, converter_options | ||
|
||
|
||
def get_tf_tree(reader: rosbag2_py.SequentialReader) -> tf2_ros.Buffer: | ||
storage_filter = rosbag2_py.StorageFilter(topics=["/tf", "/tf_static"]) | ||
reader.set_filter(storage_filter) | ||
topic_types = reader.get_all_topics_and_types() | ||
type_map = { | ||
topic_types[i].name: topic_types[i].type for i in range(len(topic_types))} | ||
tf_buffer = tf2_ros.Buffer() | ||
while reader.has_next(): | ||
(topic, data, t) = reader.read_next() | ||
msg_type = get_message(type_map[topic]) | ||
msg: TFMessage = deserialize_message(data, msg_type) | ||
for tf in msg.transforms: | ||
if topic == "/tf_static": | ||
tf_buffer.set_transform_static(transform=tf, authority="test") | ||
else: | ||
tf_buffer.set_transform(transform=tf, authority="test") | ||
return tf_buffer | ||
|
||
|
||
def get_matrix(msg) -> np.ndarray: | ||
mat = np.identity(4, dtype=float) | ||
rotation = Rotation.from_quat([ | ||
msg.transform.rotation.x, | ||
msg.transform.rotation.y, | ||
msg.transform.rotation.z, | ||
msg.transform.rotation.w, | ||
]) | ||
mat[0:3, 0:3] = rotation.as_matrix() | ||
mat[0, 3] = msg.transform.translation.x | ||
mat[1, 3] = msg.transform.translation.y | ||
mat[2, 3] = msg.transform.translation.z | ||
return mat | ||
|
||
|
||
def main(): | ||
# TODO(@someone): program argument everything that makes sense. | ||
BAG_FILE = '/home/ramiro/ws/src/andino_gz/bag/rosbag2_2024_08_01-12_53_07' | ||
SCAN_TOPIC = '/scan/points' | ||
STORAGE_ID = 'sqlite3' | ||
storage_options, converter_options = get_rosbag_options( | ||
BAG_FILE, STORAGE_ID) | ||
reader = rosbag2_py.SequentialReader() | ||
reader.open(storage_options, converter_options) | ||
tf_tree = get_tf_tree(reader=reader) | ||
reader = rosbag2_py.SequentialReader() | ||
reader.open(storage_options, converter_options) | ||
storage_filter = rosbag2_py.StorageFilter(topics=[SCAN_TOPIC]) | ||
reader.set_filter(storage_filter) | ||
topic_types = reader.get_all_topics_and_types() | ||
type_map = { | ||
topic_types[i].name: topic_types[i].type for i in range(len(topic_types))} | ||
|
||
map_points = o3d.geometry.PointCloud() | ||
i=0 | ||
while reader.has_next(): | ||
(topic, data, t) = reader.read_next() | ||
msg_type = get_message(type_map[topic]) | ||
msg: PointCloud2 = deserialize_message(data, msg_type) | ||
points = nc.structured_to_unstructured(pc2.read_points( | ||
msg, field_names=("x", "y", "z"), skip_nans=True)).T | ||
time = rclpy.time.Time.from_msg(msg.header.stamp) | ||
try: | ||
tf = tf_tree.lookup_transform("odom", "rplidar_laser_link", time) | ||
matrix = get_matrix(tf) | ||
pcd = o3d.geometry.PointCloud() | ||
pcd.points = o3d.utility.Vector3dVector(points.T) | ||
pcd.remove_non_finite_points() | ||
pcd.transform(matrix) | ||
pcd = pcd.voxel_down_sample(0.025) | ||
map_points += pcd | ||
except Exception as e: | ||
print(e) | ||
pass | ||
o3d.io.write_point_cloud("./data.ply", map_points.voxel_down_sample(0.025)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters