Skip to content

Instantly share code, notes, and snippets.

@chunibyo-wly
Last active November 14, 2025 05:46
Show Gist options
  • Select an option

  • Save chunibyo-wly/ff61644da045728844512229b23bee29 to your computer and use it in GitHub Desktop.

Select an option

Save chunibyo-wly/ff61644da045728844512229b23bee29 to your computer and use it in GitHub Desktop.
Massive point cloud, parallel load and downsample
import argparse
import concurrent.futures
import gc
import numpy as np
import open3d as o3d
from scipy.signal import find_peaks
from tqdm import tqdm
import laspy
def cli():
parser = argparse.ArgumentParser(
description="Point cloud processing pipeline: orientation estimation and downsampling"
)
parser.add_argument(
"input",
help="Input point cloud file path (.laspy format)",
)
parser.add_argument(
"output",
help="Output point cloud file path (.ply format)",
)
parser.add_argument(
"--vox-size-orientation",
type=float,
default=0.05,
help="Voxel size for orientation estimation in meters (default: 0.05)",
)
parser.add_argument(
"--vox-size-output", type=float, default=0.025, help="Voxel size for final output in meters (default: 0.025)"
)
parser.add_argument(
"--param-json",
default="./param.json",
help="Path to save processing parameters JSON file (default: param.json)",
)
args = parser.parse_args()
return args
def voxel_downsample(xyz, rgb, voxel_size, voxel_size_result, h_threshold=0.95):
pc = o3d.geometry.PointCloud()
pc.points = o3d.utility.Vector3dVector(xyz)
pc.colors = o3d.utility.Vector3dVector(rgb / 65535.0)
del xyz, rgb
a, b = pc.voxel_down_sample(voxel_size), pc.voxel_down_sample(voxel_size_result)
a.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=0.50, max_nn=50))
del pc
gc.collect()
normals = np.asarray(a.normals)
mask_nz1 = normals[:, 2] < (1 - h_threshold**2) ** 0.5
mask_nz2 = normals[:, 2] > -((1 - h_threshold**2) ** 0.5)
ind = np.where(np.logical_and(mask_nz1, mask_nz2))[0]
a = a.select_by_index(ind)
a.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=0.50, max_nn=50))
# to degree, 0 ~ 90, 0.1 degree bin
a_degree = np.array(a.normals)
a_degree = np.degrees(np.arctan2(a_degree[:, 1], a_degree[:, 0]))
a_degree[a_degree < 0] += 180.0
a_degree[a_degree >= 90.0] -= 90.0
a_degree = np.round(a_degree * 10, decimals=1).astype(np.uint32)
b_xyz = (np.array(b.points) * 1000.0).astype(np.int32)
b_rgb = (np.array(b.colors) * 255.0).astype(np.uint8)
del a, b
gc.collect()
return a_degree, b_xyz, b_rgb
def process_point_cloud(
src_file,
output_file,
vox_size_orientation=0.05,
vox_size_output=0.025,
):
"""
Process a point cloud file: estimate orientation, downsample, and save results.
Args:
src_file (str): Input point cloud file path
output_file (str): Output point cloud file path
vox_size_orientation (float): Voxel size for orientation estimation (default: 0.05)
vox_size_output (float): Voxel size for final output (default: 0.025)
"""
degrees = []
o3d_result = o3d.geometry.PointCloud()
results = []
with laspy.open(src_file) as file:
futures_collect = []
pbar = tqdm(total=file.header.point_count)
with concurrent.futures.ProcessPoolExecutor() as executor:
for points in file.chunk_iterator(10**7):
xyz = np.array([points.x, points.y, points.z]).T
rgb = np.array([points.red, points.green, points.blue]).T
futures_collect.append(
executor.submit(voxel_downsample, xyz, rgb, vox_size_orientation, vox_size_output)
)
del xyz, rgb
gc.collect()
if len(futures_collect) >= 8:
for fut in concurrent.futures.as_completed(futures_collect):
results.append(fut.result())
futures_collect.clear()
gc.collect()
pbar.update(len(points))
pbar.close()
for fut in concurrent.futures.as_completed(futures_collect):
results.append(fut.result())
futures_collect.clear()
gc.collect()
for res in tqdm(results, desc="Merging results"):
a_degree, b_xyz, b_rgb = res
degrees.extend(a_degree.tolist())
temp_pc_result = o3d.geometry.PointCloud()
temp_pc_result.points = o3d.utility.Vector3dVector(b_xyz.astype(np.float32) / 1000.0)
temp_pc_result.colors = o3d.utility.Vector3dVector(b_rgb.astype(np.float32) / 255.0)
o3d_result += temp_pc_result
gc.collect()
hist, bin_edges = np.histogram(np.asarray(degrees).astype(np.float32) / 10, bins=900, range=(0, 90))
peaks, _ = find_peaks(hist, height=0)
opt_rot = bin_edges[peaks[np.argmax(hist[peaks])]]
o3d_result = o3d_result.translate(-o3d_result.get_center())
o3d_result = o3d_result.rotate(
R=o3d.geometry.get_rotation_matrix_from_xyz((0, 0, -np.radians(opt_rot))),
center=o3d_result.get_center(),
)
o3d.io.write_point_cloud(output_file, o3d_result, write_ascii=False)
def main():
args = cli()
try:
process_point_cloud(
src_file=args.input,
output_file=args.output,
vox_size_orientation=args.vox_size_orientation,
vox_size_output=args.vox_size_output,
)
print("Processing completed successfully!")
return 0
except Exception as e:
print(f"Error during processing: {str(e)}")
return 1
if __name__ == "__main__":
main()
@chunibyo-wly
Copy link
Author

image image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment