Skip to content

Instantly share code, notes, and snippets.

@szobov
Created February 25, 2026 12:04
Show Gist options
  • Select an option

  • Save szobov/6c2c6972041b0fe82affc4e48e58f203 to your computer and use it in GitHub Desktop.

Select an option

Save szobov/6c2c6972041b0fe82affc4e48e58f203 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import PointCloud2, PointField
from std_msgs.msg import Header
import sensor_msgs_py.point_cloud2 as pc2
import open3d as o3d
import numpy as np
import sys
import os
import time
class PlyPointCloudPublisher(Node):
def __init__(self, ply_path):
super().__init__('ply_pointcloud_publisher_once')
self.publisher_ = self.create_publisher(
PointCloud2,
'/pointcloud',
10
)
if not os.path.exists(ply_path):
self.get_logger().error(f"PLY file not found: {ply_path}")
sys.exit(1)
self.get_logger().info(f"Loading PLY file: {ply_path}")
self.pcd = o3d.io.read_point_cloud(ply_path)
if len(self.pcd.points) == 0:
self.get_logger().error("Loaded point cloud is empty.")
sys.exit(1)
# Convert to numpy and scale mm meters
self.points = np.asarray(self.pcd.points).astype(np.float32) / 1000.0
if self.pcd.has_colors():
self.colors = np.asarray(self.pcd.colors)
else:
self.colors = None
self.get_logger().info("PLY loaded and scaled to meters.")
def publish_once(self):
header = Header()
header.stamp = self.get_clock().now().to_msg()
header.frame_id = "base_link"
if self.colors is not None:
cloud_data = []
for i in range(len(self.points)):
x, y, z = self.points[i]
r, g, b = (self.colors[i] * 255).astype(np.uint8)
rgb = (r << 16) | (g << 8) | b
cloud_data.append([x, y, z, rgb])
fields = [
PointField(name='x', offset=0, datatype=PointField.FLOAT32, count=1),
PointField(name='y', offset=4, datatype=PointField.FLOAT32, count=1),
PointField(name='z', offset=8, datatype=PointField.FLOAT32, count=1),
PointField(name='rgb', offset=12, datatype=PointField.UINT32, count=1),
]
msg = pc2.create_cloud(header, fields, cloud_data)
else:
msg = pc2.create_cloud_xyz32(header, self.points)
# Allow DDS discovery
time.sleep(0.5)
self.publisher_.publish(msg)
self.get_logger().info("Point cloud published once (scaled to meters).")
def main(args=None):
rclpy.init(args=args)
if len(sys.argv) < 2:
print("Usage: ros2 run <your_package> ply_pointcloud_publisher_once.py <path_to_ply>")
sys.exit(1)
ply_path = sys.argv[1]
node = PlyPointCloudPublisher(ply_path)
rclpy.spin_once(node, timeout_sec=0.5)
node.publish_once()
rclpy.spin_once(node, timeout_sec=0.5)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment