Created
February 25, 2026 12:04
-
-
Save szobov/6c2c6972041b0fe82affc4e48e58f203 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import rclpy | |
| from rclpy.node import Node | |
| from sensor_msgs.msg import PointCloud2, PointField | |
| from std_msgs.msg import Header | |
| import sensor_msgs_py.point_cloud2 as pc2 | |
| import open3d as o3d | |
| import numpy as np | |
| import sys | |
| import os | |
| import time | |
| class PlyPointCloudPublisher(Node): | |
| def __init__(self, ply_path): | |
| super().__init__('ply_pointcloud_publisher_once') | |
| self.publisher_ = self.create_publisher( | |
| PointCloud2, | |
| '/pointcloud', | |
| 10 | |
| ) | |
| if not os.path.exists(ply_path): | |
| self.get_logger().error(f"PLY file not found: {ply_path}") | |
| sys.exit(1) | |
| self.get_logger().info(f"Loading PLY file: {ply_path}") | |
| self.pcd = o3d.io.read_point_cloud(ply_path) | |
| if len(self.pcd.points) == 0: | |
| self.get_logger().error("Loaded point cloud is empty.") | |
| sys.exit(1) | |
| # Convert to numpy and scale mm meters | |
| self.points = np.asarray(self.pcd.points).astype(np.float32) / 1000.0 | |
| if self.pcd.has_colors(): | |
| self.colors = np.asarray(self.pcd.colors) | |
| else: | |
| self.colors = None | |
| self.get_logger().info("PLY loaded and scaled to meters.") | |
| def publish_once(self): | |
| header = Header() | |
| header.stamp = self.get_clock().now().to_msg() | |
| header.frame_id = "base_link" | |
| if self.colors is not None: | |
| cloud_data = [] | |
| for i in range(len(self.points)): | |
| x, y, z = self.points[i] | |
| r, g, b = (self.colors[i] * 255).astype(np.uint8) | |
| rgb = (r << 16) | (g << 8) | b | |
| cloud_data.append([x, y, z, rgb]) | |
| fields = [ | |
| PointField(name='x', offset=0, datatype=PointField.FLOAT32, count=1), | |
| PointField(name='y', offset=4, datatype=PointField.FLOAT32, count=1), | |
| PointField(name='z', offset=8, datatype=PointField.FLOAT32, count=1), | |
| PointField(name='rgb', offset=12, datatype=PointField.UINT32, count=1), | |
| ] | |
| msg = pc2.create_cloud(header, fields, cloud_data) | |
| else: | |
| msg = pc2.create_cloud_xyz32(header, self.points) | |
| # Allow DDS discovery | |
| time.sleep(0.5) | |
| self.publisher_.publish(msg) | |
| self.get_logger().info("Point cloud published once (scaled to meters).") | |
| def main(args=None): | |
| rclpy.init(args=args) | |
| if len(sys.argv) < 2: | |
| print("Usage: ros2 run <your_package> ply_pointcloud_publisher_once.py <path_to_ply>") | |
| sys.exit(1) | |
| ply_path = sys.argv[1] | |
| node = PlyPointCloudPublisher(ply_path) | |
| rclpy.spin_once(node, timeout_sec=0.5) | |
| node.publish_once() | |
| rclpy.spin_once(node, timeout_sec=0.5) | |
| node.destroy_node() | |
| rclpy.shutdown() | |
| if __name__ == '__main__': | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment