Skip to content

Instantly share code, notes, and snippets.

@alexlib
Created January 17, 2026 20:51
Show Gist options
  • Select an option

  • Save alexlib/4cafda881264c95822adfa39d7454db2 to your computer and use it in GitHub Desktop.

Select an option

Save alexlib/4cafda881264c95822adfa39d7454db2 to your computer and use it in GitHub Desktop.
Use OpenCV to detect a large sphere and multiple flow tracers around in a thin slice around a sphere. Part of the Turbulence Structure Laboratory project on settling of spheres through stratified interfaces. An OpenPTV competitor :) uses marimo (read about it, no need to thank me, thank them)
import marimo
__generated_with = "0.19.4"
app = marimo.App(width="medium", auto_download=["ipynb"])
@app.cell
def _():
import marimo as mo
import cv2
import numpy as np
import pandas as pd
import trackpy as tp
from pathlib import Path
import matplotlib.pyplot as plt
from skimage.feature import peak_local_max
import os
return Path, cv2, np, pd, peak_local_max, plt, tp
@app.cell
def parameters():
year = "2024"
month = "01"
day = "08"
run = 19
image_ext = "*.tif"
return day, image_ext, month, run, year
@app.cell
def _(Path, day, image_ext, month, run, year):
image_path = Path(
f"/media/user/Elements/Chen/experiment {year} {month} {day}/C001H001S{run:04d}/"
)
image_files = sorted(image_path.rglob(image_ext))
print(f"Found {len(image_files)} files")
detections_file = f"{year}_{month}_{day}_{run}_tracers.csv"
sphere_file = f"{year}_{month}_{day}_{run}_sphere.csv"
trajectories_file = f"{year}_{month}_{day}_{run}_trajectories.csv"
return detections_file, image_files, sphere_file, trajectories_file
@app.cell
def _(cv2, np, peak_local_max):
def detect_particles_in_frame(image_path, frame_number):
"""
Detects sphere and tracers in a specific frame.
Returns a list of dictionaries containing particle coordinates within the ROI.
"""
img_gray = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
if img_gray is None:
return []
height, width = img_gray.shape
# --- 1. Detect Sphere ---
blurred = cv2.GaussianBlur(img_gray, (9, 9), 2)
_, sphere_thresh = cv2.threshold(blurred, 200, 255, cv2.THRESH_BINARY)
contours, _ = cv2.findContours(
sphere_thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
)
sphere_center = None
sphere_radius = 0
if contours:
largest_contour = max(contours, key=cv2.contourArea)
M = cv2.moments(largest_contour)
if M["m00"] != 0:
cX = int(M["m10"] / M["m00"])
cY = int(M["m01"] / M["m00"])
sphere_center = (cX, cY)
_, radius = cv2.minEnclosingCircle(largest_contour)
sphere_radius = int(radius)
if sphere_center is None:
return []
# --- 2. Define ROI (5 Diameters Width) ---
sphere_diameter = sphere_radius * 2
offset_pixels = int(
2.5 * sphere_diameter
) # 2.5 radii each side = 5D total width
x_start = max(0, sphere_center[0] - offset_pixels)
x_end = min(width, sphere_center[0] + offset_pixels)
# --- 3. Masking ---
mask = np.zeros_like(img_gray)
mask[:, x_start:x_end] = 255 # Enable strip
cv2.circle(mask, sphere_center, sphere_radius + 5, 0, -1) # Disable sphere
img_roi_masked = cv2.bitwise_and(img_gray, img_gray, mask=mask)
# --- 4. Detect Tracers ---
# We use skimage peak_local_max
# Note: peak_local_max returns (row, col) which corresponds to (y, x)
coordinates = peak_local_max(
img_roi_masked, min_distance=3, threshold_abs=20
)
# Format data for Pandas/Trackpy
# Trackpy expects columns: 'y', 'x', 'frame'
detections = []
for y, x in coordinates:
detections.append(
{
"y": y,
"x": x,
"frame": frame_number,
"brightness": img_gray[y, x], # Optional: store brightness
}
)
return detections, sphere_center
return (detect_particles_in_frame,)
@app.cell
def _(detect_particles_in_frame, pd, tp):
def run_detecting_pipeline(image_file_list):
"""
1. Loops through images to detect particles.
2. Creates a DataFrame.
3. Links particles into trajectories using Trackpy.
"""
print(f"Detecting features in {len(image_file_list)} frames...")
all_particles, all_spheres = [], []
# --- Step A: Feature Detection Loop ---
for i, img_path in enumerate(image_file_list):
particles, sphere_center = detect_particles_in_frame(
img_path, frame_number=i
)
all_particles.extend(particles)
all_spheres.append(sphere_center)
if i % 10 == 0:
print(f"Processed frame {i}/{len(image_file_list)}")
if not all_particles:
print("No particles detected.")
return pd.DataFrame()
# Convert to DataFrame
df = pd.DataFrame(all_particles)
print(f"Total particles detected across all frames: {len(df)}")
df_sphere = pd.DataFrame(all_spheres)
return df, df_sphere
def tracking_pipeline(df, search_range=5, memory=3):
# --- Step B: Linking (The Tracking Algorithm) ---
# search_range: Max pixels a particle travels between frames.
# memory: How many frames a particle can 'vanish' and still be remembered.
print("Linking trajectories...")
t = tp.link(df, search_range=search_range, memory=memory)
# --- Step C: Filtering ---
# Filter out stubs (trajectories that last only a few frames)
# e.g., keep tracks that exist for at least 5 frames
t_filtered = tp.filter_stubs(t, threshold=5)
particle_count = t_filtered["particle"].nunique()
print(
f"Tracking complete. Found {particle_count} unique Lagrangian trajectories."
)
return t_filtered
return run_detecting_pipeline, tracking_pipeline
@app.cell
def _(cv2, plt, tp):
def visualize_trajectories(df, background_image_path):
"""
Plots the trajectories over the first frame of the video.
"""
img = cv2.imread(background_image_path)
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
plt.figure(figsize=(12, 12))
plt.imshow(img)
# Tp.plot_traj plots the tracks on the current axes
# label=True adds the ID, typically we turn it off for dense fields
tp.plot_traj(df, colorby="particle", alpha=0.5, lw=1)
plt.title("Lagrangian Trajectories (Filtered)")
plt.show()
return
@app.cell
def _(
Path,
detections_file,
image_files,
pd,
run_detecting_pipeline,
sphere_file,
):
# 2. Run Pipeline
# search_range=20: assumes a particle moves max 20px per frame. Adjust based on your flow speed.
if not Path(detections_file).exists():
detections, sphere = run_detecting_pipeline(image_files)
detections.to_csv(detections_file)
sphere.to_csv(sphere_file)
else:
detections = pd.read_csv(detections_file)
sphere = pd.read_csv(sphere_file)
return (detections,)
@app.cell
def _(detections, tracking_pipeline):
trajectories = tracking_pipeline(detections, search_range=3, memory=3)
return (trajectories,)
@app.cell
def _(trajectories, trajectories_file):
# 3. View Data
if not trajectories.empty:
print(trajectories.head())
# 4. Save to CSV
trajectories.to_csv(trajectories_file, index=False)
print(f"Trajectories saved to {trajectories_file}")
return
@app.cell
def _():
# 5. Visualize
# visualize_trajectories(trajectories, image_files[0])
return
if __name__ == "__main__":
app.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment