Created
January 17, 2026 20:51
-
-
Save alexlib/4cafda881264c95822adfa39d7454db2 to your computer and use it in GitHub Desktop.
Use OpenCV to detect a large sphere and multiple flow tracers around in a thin slice around a sphere. Part of the Turbulence Structure Laboratory project on settling of spheres through stratified interfaces. An OpenPTV competitor :) uses marimo (read about it, no need to thank me, thank them)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import marimo | |
| __generated_with = "0.19.4" | |
| app = marimo.App(width="medium", auto_download=["ipynb"]) | |
| @app.cell | |
| def _(): | |
| import marimo as mo | |
| import cv2 | |
| import numpy as np | |
| import pandas as pd | |
| import trackpy as tp | |
| from pathlib import Path | |
| import matplotlib.pyplot as plt | |
| from skimage.feature import peak_local_max | |
| import os | |
| return Path, cv2, np, pd, peak_local_max, plt, tp | |
| @app.cell | |
| def parameters(): | |
| year = "2024" | |
| month = "01" | |
| day = "08" | |
| run = 19 | |
| image_ext = "*.tif" | |
| return day, image_ext, month, run, year | |
| @app.cell | |
| def _(Path, day, image_ext, month, run, year): | |
| image_path = Path( | |
| f"/media/user/Elements/Chen/experiment {year} {month} {day}/C001H001S{run:04d}/" | |
| ) | |
| image_files = sorted(image_path.rglob(image_ext)) | |
| print(f"Found {len(image_files)} files") | |
| detections_file = f"{year}_{month}_{day}_{run}_tracers.csv" | |
| sphere_file = f"{year}_{month}_{day}_{run}_sphere.csv" | |
| trajectories_file = f"{year}_{month}_{day}_{run}_trajectories.csv" | |
| return detections_file, image_files, sphere_file, trajectories_file | |
| @app.cell | |
| def _(cv2, np, peak_local_max): | |
| def detect_particles_in_frame(image_path, frame_number): | |
| """ | |
| Detects sphere and tracers in a specific frame. | |
| Returns a list of dictionaries containing particle coordinates within the ROI. | |
| """ | |
| img_gray = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE) | |
| if img_gray is None: | |
| return [] | |
| height, width = img_gray.shape | |
| # --- 1. Detect Sphere --- | |
| blurred = cv2.GaussianBlur(img_gray, (9, 9), 2) | |
| _, sphere_thresh = cv2.threshold(blurred, 200, 255, cv2.THRESH_BINARY) | |
| contours, _ = cv2.findContours( | |
| sphere_thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE | |
| ) | |
| sphere_center = None | |
| sphere_radius = 0 | |
| if contours: | |
| largest_contour = max(contours, key=cv2.contourArea) | |
| M = cv2.moments(largest_contour) | |
| if M["m00"] != 0: | |
| cX = int(M["m10"] / M["m00"]) | |
| cY = int(M["m01"] / M["m00"]) | |
| sphere_center = (cX, cY) | |
| _, radius = cv2.minEnclosingCircle(largest_contour) | |
| sphere_radius = int(radius) | |
| if sphere_center is None: | |
| return [] | |
| # --- 2. Define ROI (5 Diameters Width) --- | |
| sphere_diameter = sphere_radius * 2 | |
| offset_pixels = int( | |
| 2.5 * sphere_diameter | |
| ) # 2.5 radii each side = 5D total width | |
| x_start = max(0, sphere_center[0] - offset_pixels) | |
| x_end = min(width, sphere_center[0] + offset_pixels) | |
| # --- 3. Masking --- | |
| mask = np.zeros_like(img_gray) | |
| mask[:, x_start:x_end] = 255 # Enable strip | |
| cv2.circle(mask, sphere_center, sphere_radius + 5, 0, -1) # Disable sphere | |
| img_roi_masked = cv2.bitwise_and(img_gray, img_gray, mask=mask) | |
| # --- 4. Detect Tracers --- | |
| # We use skimage peak_local_max | |
| # Note: peak_local_max returns (row, col) which corresponds to (y, x) | |
| coordinates = peak_local_max( | |
| img_roi_masked, min_distance=3, threshold_abs=20 | |
| ) | |
| # Format data for Pandas/Trackpy | |
| # Trackpy expects columns: 'y', 'x', 'frame' | |
| detections = [] | |
| for y, x in coordinates: | |
| detections.append( | |
| { | |
| "y": y, | |
| "x": x, | |
| "frame": frame_number, | |
| "brightness": img_gray[y, x], # Optional: store brightness | |
| } | |
| ) | |
| return detections, sphere_center | |
| return (detect_particles_in_frame,) | |
| @app.cell | |
| def _(detect_particles_in_frame, pd, tp): | |
| def run_detecting_pipeline(image_file_list): | |
| """ | |
| 1. Loops through images to detect particles. | |
| 2. Creates a DataFrame. | |
| 3. Links particles into trajectories using Trackpy. | |
| """ | |
| print(f"Detecting features in {len(image_file_list)} frames...") | |
| all_particles, all_spheres = [], [] | |
| # --- Step A: Feature Detection Loop --- | |
| for i, img_path in enumerate(image_file_list): | |
| particles, sphere_center = detect_particles_in_frame( | |
| img_path, frame_number=i | |
| ) | |
| all_particles.extend(particles) | |
| all_spheres.append(sphere_center) | |
| if i % 10 == 0: | |
| print(f"Processed frame {i}/{len(image_file_list)}") | |
| if not all_particles: | |
| print("No particles detected.") | |
| return pd.DataFrame() | |
| # Convert to DataFrame | |
| df = pd.DataFrame(all_particles) | |
| print(f"Total particles detected across all frames: {len(df)}") | |
| df_sphere = pd.DataFrame(all_spheres) | |
| return df, df_sphere | |
| def tracking_pipeline(df, search_range=5, memory=3): | |
| # --- Step B: Linking (The Tracking Algorithm) --- | |
| # search_range: Max pixels a particle travels between frames. | |
| # memory: How many frames a particle can 'vanish' and still be remembered. | |
| print("Linking trajectories...") | |
| t = tp.link(df, search_range=search_range, memory=memory) | |
| # --- Step C: Filtering --- | |
| # Filter out stubs (trajectories that last only a few frames) | |
| # e.g., keep tracks that exist for at least 5 frames | |
| t_filtered = tp.filter_stubs(t, threshold=5) | |
| particle_count = t_filtered["particle"].nunique() | |
| print( | |
| f"Tracking complete. Found {particle_count} unique Lagrangian trajectories." | |
| ) | |
| return t_filtered | |
| return run_detecting_pipeline, tracking_pipeline | |
| @app.cell | |
| def _(cv2, plt, tp): | |
| def visualize_trajectories(df, background_image_path): | |
| """ | |
| Plots the trajectories over the first frame of the video. | |
| """ | |
| img = cv2.imread(background_image_path) | |
| img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) | |
| plt.figure(figsize=(12, 12)) | |
| plt.imshow(img) | |
| # Tp.plot_traj plots the tracks on the current axes | |
| # label=True adds the ID, typically we turn it off for dense fields | |
| tp.plot_traj(df, colorby="particle", alpha=0.5, lw=1) | |
| plt.title("Lagrangian Trajectories (Filtered)") | |
| plt.show() | |
| return | |
| @app.cell | |
| def _( | |
| Path, | |
| detections_file, | |
| image_files, | |
| pd, | |
| run_detecting_pipeline, | |
| sphere_file, | |
| ): | |
| # 2. Run Pipeline | |
| # search_range=20: assumes a particle moves max 20px per frame. Adjust based on your flow speed. | |
| if not Path(detections_file).exists(): | |
| detections, sphere = run_detecting_pipeline(image_files) | |
| detections.to_csv(detections_file) | |
| sphere.to_csv(sphere_file) | |
| else: | |
| detections = pd.read_csv(detections_file) | |
| sphere = pd.read_csv(sphere_file) | |
| return (detections,) | |
| @app.cell | |
| def _(detections, tracking_pipeline): | |
| trajectories = tracking_pipeline(detections, search_range=3, memory=3) | |
| return (trajectories,) | |
| @app.cell | |
| def _(trajectories, trajectories_file): | |
| # 3. View Data | |
| if not trajectories.empty: | |
| print(trajectories.head()) | |
| # 4. Save to CSV | |
| trajectories.to_csv(trajectories_file, index=False) | |
| print(f"Trajectories saved to {trajectories_file}") | |
| return | |
| @app.cell | |
| def _(): | |
| # 5. Visualize | |
| # visualize_trajectories(trajectories, image_files[0]) | |
| return | |
| if __name__ == "__main__": | |
| app.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment