Skip to content

Instantly share code, notes, and snippets.

@alexlib
Created February 17, 2026 21:40
Show Gist options
  • Select an option

  • Save alexlib/d4f1bddc9f0430fdb52834dd559302e9 to your computer and use it in GitHub Desktop.

Select an option

Save alexlib/d4f1bddc9f0430fdb52834dd559302e9 to your computer and use it in GitHub Desktop.
Smoke image velocimetry (SIV) — a marimo notebook with Python code, not tested.
# /// script
# requires-python = ">=3.14"
# dependencies = [
# "marimo>=0.19.9",
# "matplotlib==3.10.8",
# "numpy==2.4.2",
# "pydantic-ai-slim==1.57.0",
# "scipy==1.17.0",
# ]
# ///
import marimo
__generated_with = "0.19.9"
app = marimo.App(width="medium")
@app.cell
def _(mo):
    # Intro cell: renders the method description.
    # BUG FIX: the markdown must be a raw string (r"""...""") — otherwise
    # Python escape processing turns the LaTeX "\times" into a tab ("\t")
    # followed by "imes", garbling the rendered formulas.
    mo.md(r"""
# Smoke Image Velocimetry (SIV) Interactive Demo
This notebook implements the **Smoke Image Velocimetry** technique as described by **Mikheev & Dushin (2016)** in *"A Method for Measuring the Dynamics of Velocity Vector Fields in a Turbulent Flow Using Smoke Image-Visualization Videos"*.
### Method Overview
Unlike Particle Image Velocimetry (PIV) which tracks distinct particles, SIV is optimized for continuous scalar fields (smoke) where individual particles are indistinguishable.
**Key Algorithm Steps:**
1. **Preprocessing:** Application of a median filter (typically $3 \times 3$) to remove high-frequency sensor noise.
2. **Criteria Selection:** Thresholding based on the standard deviation of brightness ($\sigma_I$) in the reference window to ignore texture-less regions.
3. **Displacement Estimation:** Minimization of the **Sum of Absolute Differences (SAD)** functional, $\Phi$.
4. **Sub-pixel Refinement:** The paper notes that while $\Phi$ is conical near the minimum, $\Phi^2$ (SAD squared) is parabolic. We fit a quadratic surface to $\Phi^2$ to find the sub-pixel peak.
""")
    return
@app.cell
def _():
    # Shared imports for the notebook; marimo passes the returned names
    # to every other cell that lists them as parameters.
    import numpy as np
    from scipy.ndimage import map_coordinates, gaussian_filter, median_filter
    import matplotlib.pyplot as plt
    import marimo as mo
    return gaussian_filter, map_coordinates, median_filter, mo, np, plt
@app.cell
def _(gaussian_filter, map_coordinates, median_filter, np):
# --- Core SIV Algorithm ---
def preprocess_images(images, background=None, median_kernel=3):
    """Clean up raw frames before SIV matching.

    Each frame is median-filtered (Mikheev & Dushin 2016 recommend a 3x3
    kernel) to suppress high-frequency sensor noise, then the optional
    static background is subtracted in float32.

    Parameters
    ----------
    images : sequence of 2-D arrays
        Raw frames to process.
    background : 2-D array or None
        Static background image to subtract after filtering.
    median_kernel : int
        Side length of the median filter; values <= 0 disable filtering.

    Returns
    -------
    np.ndarray
        Processed frames stacked along axis 0.
    """

    def _clean(frame):
        # Median filter kills salt-and-pepper noise; <= 0 means pass-through.
        if median_kernel > 0:
            out = median_filter(frame, size=median_kernel)
        else:
            out = frame.copy()
        if background is not None:
            out = out.astype(np.float32) - background.astype(np.float32)
        return out

    return np.array([_clean(frame) for frame in images])
def refine_subpixel_quadratic(surface_3x3):
    """Locate the sub-pixel minimum of a 3x3 sample patch.

    A full 2-D quadratic
        F(x, y) = A*x^2 + B*y^2 + C*x*y + D*x + E*y + F0
    is least-squares fitted to the nine samples (grid coordinates -1, 0, 1
    on each axis) using a precomputed pseudo-inverse, and the stationary
    point of the fit is returned as (dx, dy) offsets from the centre cell.
    Intended for Phi^2 (SAD squared), which is parabolic near the optimum.

    Returns
    -------
    (float, float)
        Sub-pixel offsets, or (0.0, 0.0) when the fit is degenerate or the
        predicted optimum falls outside the 3x3 support (model invalid).
    """
    p, t = 0.16666667, 0.33333333  # 1/6 and 1/3 entries of the pseudo-inverse
    # Rows map the 9 samples to the coefficients [A, B, C, D, E, F0].
    fit_matrix = np.array([
        [p, -t, p, p, -t, p, p, -t, p],                          # A (x^2)
        [p, p, p, -t, -t, -t, p, p, p],                          # B (y^2)
        [0.25, -0.0, -0.25, 0.0, -0.0, -0.0, -0.25, 0.0, 0.25],  # C (xy)
        [-p, 0.0, p, -p, 0.0, p, -p, 0.0, p],                    # D (x)
        [-p, -p, -p, 0.0, 0.0, -0.0, p, p, p],                   # E (y)
        [-0.11111111, 0.22222222, -0.11111111,
         0.22222222, 0.55555556, 0.22222222,
         -0.11111111, 0.22222222, -0.11111111],                  # F0
    ])
    A, B, C, D, E, _F0 = fit_matrix @ surface_3x3.flatten()
    # Stationary point of the fit: solve grad F = 0, i.e.
    #   2A*x + C*y = -D
    #   C*x + 2B*y = -E
    det = 4 * A * B - C * C
    if abs(det) < 1e-9:  # degenerate (flat) surface — no usable curvature
        return 0.0, 0.0
    off_x = (-D * (2 * B) - (-E) * C) / det
    off_y = ((2 * A) * (-E) - C * (-D)) / det
    # An optimum predicted far outside the grid means the local fit is invalid.
    if abs(off_x) > 1.0 or abs(off_y) > 1.0:
        return 0.0, 0.0
    return off_x, off_y
def compute_siv_field(
    I1, I2, window_size=32, search_radius=16, grid_step=16, min_contrast=0.01
):
    """Estimate the displacement field between two frames by SAD matching.

    For each node of a regular grid, a ``window_size`` square template from
    ``I1`` is compared against shifted windows of ``I2`` (exhaustive integer
    search up to ``search_radius``), minimising the Sum of Absolute
    Differences (SAD).  The integer minimum is then refined to sub-pixel
    accuracy by fitting a quadratic surface to SAD^2 over its 3x3
    neighbourhood (Mikheev & Dushin 2016: Phi is conical near the minimum,
    Phi^2 is parabolic).

    Parameters
    ----------
    I1, I2 : 2-D arrays
        Reference and displaced frames (any numeric dtype; promoted to float).
    window_size : int
        Side of the square interrogation window, pixels.
    search_radius : int
        Maximum displacement searched in each direction, pixels.
    grid_step : int
        Spacing between grid nodes, pixels.
    min_contrast : float
        Minimum brightness standard deviation (sigma_I) for a window to be
        processed; texture-less windows are masked out.

    Returns
    -------
    x_grid, y_grid : node centre coordinates (pixels)
    u_field, v_field : estimated x / y displacements (pixels)
    mask : bool array, True where the window had enough contrast

    Fixes vs. the original notebook version:
    * The sub-pixel refiner was invoked under a misspelled name
      (``_refine_subpixel_quadratic``) and the resulting NameError was
      silently swallowed by a blanket ``except Exception`` — refinement
      never actually ran.  The refiner is now a local helper and the
      blanket except is removed.
    * Frames are promoted to float64 so unsigned-integer inputs cannot
      wrap around in the ``template - candidate`` subtraction.
    """

    def _quadratic_minimum(surface):
        # Least-squares fit of F = A x^2 + B y^2 + C xy + D x + E y + F0
        # to the 3x3 samples at grid coordinates (-1, 0, 1) x (-1, 0, 1),
        # then the stationary point of the fit via Cramer's rule.
        xs = np.tile([-1.0, 0.0, 1.0], 3)
        ys = np.repeat([-1.0, 0.0, 1.0], 3)
        design = np.column_stack([xs * xs, ys * ys, xs * ys, xs, ys, np.ones(9)])
        A, B, C, D, E, _ = np.linalg.lstsq(design, surface.ravel(), rcond=None)[0]
        det = 4.0 * A * B - C * C
        if abs(det) < 1e-9:  # degenerate fit, no usable curvature
            return 0.0, 0.0
        off_x = (C * E - 2.0 * B * D) / det
        off_y = (C * D - 2.0 * A * E) / det
        # Reject fits whose predicted optimum leaves the 3x3 support.
        if abs(off_x) > 1.0 or abs(off_y) > 1.0:
            return 0.0, 0.0
        return off_x, off_y

    # Promote to float: prevents wrap-around for uint8 frames.
    I1 = np.asarray(I1, dtype=np.float64)
    I2 = np.asarray(I2, dtype=np.float64)
    H, W = I1.shape
    hw = window_size // 2
    y_nodes = range(hw, H - hw, grid_step)
    x_nodes = range(hw, W - hw, grid_step)
    shape = (len(y_nodes), len(x_nodes))
    u_field = np.zeros(shape)
    v_field = np.zeros(shape)
    x_grid = np.zeros(shape)
    y_grid = np.zeros(shape)
    mask = np.ones(shape, dtype=bool)

    for i, cy in enumerate(y_nodes):
        for j, cx in enumerate(x_nodes):
            x_grid[i, j] = cx
            y_grid[i, j] = cy
            # 1. Reference template centred on the node.
            template = I1[cy - hw : cy + hw, cx - hw : cx + hw]
            # 2. Contrast criterion (sigma_I): skip texture-less windows.
            if np.std(template) < min_contrast:
                mask[i, j] = False
                continue
            # 3. Integer SAD search, clamped so candidates stay inside I2.
            # (A production implementation would FFT-accelerate this.)
            lo_dx = max(-search_radius, hw - cx)
            hi_dx = min(search_radius, W - cx - hw)
            lo_dy = max(-search_radius, hw - cy)
            hi_dy = min(search_radius, H - cy - hw)
            best_sad = np.inf
            best_dx = best_dy = 0
            for dy in range(lo_dy, hi_dy + 1):
                for dx in range(lo_dx, hi_dx + 1):
                    candidate = I2[
                        cy + dy - hw : cy + dy + hw,
                        cx + dx - hw : cx + dx + hw,
                    ]
                    sad = np.sum(np.abs(template - candidate))
                    if sad < best_sad:
                        best_sad, best_dx, best_dy = sad, dx, dy
            # 4. Sub-pixel refinement on SAD^2 — only valid when the full
            # 3x3 neighbourhood of the integer minimum is inside the search box.
            sub_dx = sub_dy = 0.0
            if lo_dx < best_dx < hi_dx and lo_dy < best_dy < hi_dy:
                sad_sq = np.empty((3, 3))
                for si, ndy in enumerate((-1, 0, 1)):
                    for sj, ndx in enumerate((-1, 0, 1)):
                        ny, nx = cy + best_dy + ndy, cx + best_dx + ndx
                        neigh = I2[ny - hw : ny + hw, nx - hw : nx + hw]
                        sad_sq[si, sj] = np.sum(np.abs(template - neigh)) ** 2
                sub_dx, sub_dy = _quadratic_minimum(sad_sq)
            u_field[i, j] = best_dx + sub_dx
            v_field[i, j] = best_dy + sub_dy
    return x_grid, y_grid, u_field, v_field, mask
# --- Synthetic Data Generation ---
def generate_vortex_frames(shape=(256, 256), max_displacement=5.0):
    """Generate a synthetic smoke frame pair warped by a Rankine-like vortex.

    Parameters
    ----------
    shape : tuple(int, int)
        Image size (rows, cols).
    max_displacement : float
        Peak displacement magnitude in pixels per frame.

    Returns
    -------
    smoke, smoke2 : 2-D arrays
        Frame 1 (normalised to [0, 1]) and warped+noisy frame 2.
    u_true, v_true : 2-D arrays
        Ground-truth x / y displacement fields.

    Fixes vs. the original notebook version:
    * The velocity components carried a stray factor of r
      (``v_theta * (dy / r) * r``), so displacements far from the core grew
      to ~``2 * core_radius * max_displacement`` — hundreds of pixels —
      wildly exceeding the advertised maximum (and the caller's search
      radius).  The normalised tangential speed is now applied directly,
      so the peak displacement equals ``max_displacement``.
    * A local RandomState replaces seeding the global NumPy RNG; the
      generated sequence is identical but global state is left untouched.
    """
    rng = np.random.RandomState(42)  # same sequence as np.random.seed(42)
    smoke = gaussian_filter(rng.rand(*shape), sigma=8)
    smoke = (smoke - smoke.min()) / (smoke.max() - smoke.min())

    y, x = np.mgrid[0 : shape[0], 0 : shape[1]]
    dy = y - shape[0] // 2
    dx = x - shape[1] // 2
    r = np.sqrt(dy**2 + dx**2) + 1e-5  # epsilon avoids division by zero at the core

    core_radius = 40.0
    # Tangential speed: solid-body rotation inside the core, ~1/r decay
    # outside, normalised so the peak equals max_displacement.
    v_theta = r / (r**2 + core_radius**2)
    v_theta = v_theta / v_theta.max() * max_displacement
    # Unit tangential direction is (-dy/r, dx/r).
    u_true = -v_theta * (dy / r)
    v_true = v_theta * (dx / r)

    # Backward-warp frame 1 along the flow: smoke2(y, x) = smoke(y - v, x - u).
    coords = np.indices(shape).astype(np.float32)
    coords[0] -= v_true
    coords[1] -= u_true
    smoke2 = map_coordinates(smoke, coords, order=1, mode="nearest")
    # Additive Gaussian sensor noise.
    smoke2 += rng.normal(0, 0.005, smoke2.shape)
    return smoke, smoke2, u_true, v_true
# Hand the cell's definitions back to marimo so other cells can use them.
return (
    compute_siv_field,
    generate_vortex_frames,
    preprocess_images,
    refine_subpixel_quadratic,
)
@app.cell
def _(mo):
    # --- UI Controls ---
    # Sliders and the run button that drive the reactive analysis cell.
    window_size = mo.ui.slider(16, 64, step=4, value=32, label="Window Size (px)")
    max_disp = mo.ui.slider(
        1.0, 15.0, step=0.5, value=6.0, label="Max Velocity (px/frame)"
    )
    contrast = mo.ui.slider(
        0.0, 0.05, step=0.001, value=0.005, label="Min Contrast Threshold"
    )
    run_btn = mo.ui.button(label="Run SIV Analysis", kind="success")
    # The rendered table embeds the live widgets via f-string interpolation.
    mo.md(f"""
### 1. Configuration
Adjust the parameters below and click Run.
| Parameter | Control | Description |
| :--- | :--- | :--- |
| **Interrogation Window** | {window_size} | Size of the patch to track ($N_x \\times N_y$). Larger = robust but lower spatial res. |
| **Max Displacement** | {max_disp} | Simulated max velocity. The search radius adapts to $1.5 \\times$ this value. |
| **Contrast Threshold** | {contrast} | Minimum standard deviation ($\\sigma_I$) required to process a window. |
{run_btn}
""")
    return contrast, max_disp, run_btn, window_size
@app.cell
def _(
    compute_siv_field,
    contrast,
    generate_vortex_frames,
    max_disp,
    np,
    plt,
    preprocess_images,
    run_btn,
    window_size,
):
    # --- Execution Cell ---
    # Re-runs whenever a referenced UI element changes; reading run_btn.value
    # also makes the cell reactive to button clicks.
    run_btn.value  # React to button click
    # 1. Generate Synthetic Data
    raw_I1, raw_I2, u_true, v_true = generate_vortex_frames(
        shape=(300, 300), max_displacement=max_disp.value
    )
    # 2. Preprocess (Paper Step 1)
    imgs = preprocess_images([raw_I1, raw_I2], median_kernel=3)
    I1, I2 = imgs[0], imgs[1]
    # 3. SIV Calculation
    w_size = window_size.value
    # Pad the search radius beyond the simulated max velocity so the true
    # displacement stays inside the search range.
    search_r = int(max_disp.value * 1.5) + 2
    step = w_size // 2  # 50% overlap is standard
    x_grid, y_grid, u_est, v_est, mask = compute_siv_field(
        I1,
        I2,
        window_size=w_size,
        search_radius=search_r,
        grid_step=step,
        min_contrast=contrast.value,
    )
    # 4. Prepare Plots (NaN-out rejected vectors so quiver skips them)
    u_plot = np.where(mask, u_est, np.nan)
    v_plot = np.where(mask, v_est, np.nan)
    fig, axes = plt.subplots(1, 3, figsize=(15, 5))
    # Plot A: Input Frame
    axes[0].imshow(I1, cmap="gray", origin="upper")
    axes[0].set_title("Input Smoke (Frame 1)")
    axes[0].axis("off")
    # Plot B: Ground Truth
    skip = 20
    axes[1].imshow(I2, cmap="gray", alpha=0.5, origin="upper")
    y_sub, x_sub = np.mgrid[0 : I1.shape[0] : skip, 0 : I1.shape[1] : skip]
    u_sub = u_true[::skip, ::skip]
    v_sub = v_true[::skip, ::skip]
    # v is negated because image row coordinates increase downwards.
    axes[1].quiver(
        x_sub, y_sub, u_sub, -v_sub, color="lime", scale=50, scale_units="inches"
    )
    axes[1].set_title("Ground Truth Flow")
    axes[1].axis("off")
    # Plot C: SIV Result
    axes[2].imshow(I2, cmap="gray", alpha=0.5, origin="upper")
    axes[2].quiver(
        x_grid,
        y_grid,
        u_plot,
        -v_plot,
        color="red",
        scale=50,
        scale_units="inches",
        width=0.005,
    )
    # Plot rejected (low-contrast) grid points as yellow crosses
    invalid_x = x_grid[~mask]
    invalid_y = y_grid[~mask]
    if len(invalid_x) > 0:
        axes[2].scatter(
            invalid_x,
            invalid_y,
            c="yellow",
            s=10,
            marker="x",
            label="Low Contrast",
            alpha=0.6,
        )
        axes[2].legend(loc="upper right")
    axes[2].set_title("SIV Estimated Flow")
    axes[2].axis("off")
    plt.tight_layout()
    chart = plt.gcf()
    return I1, chart, mask, u_est, u_true, v_est, v_true, x_grid, y_grid
@app.cell
def _(chart, mo):
    # Display the Matplotlib figure with interactive pan/zoom controls.
    mo.mpl.interactive(chart)
    return
@app.cell
def _(I1, mask, mo, np, u_est, u_true, v_est, v_true, x_grid, y_grid):
    # --- Statistics Cell ---
    # Vectorised comparison of the SIV estimate against the ground truth:
    # keep vectors that passed the contrast check, lie inside the image,
    # and are at least 20 px from every border (the synthetic data can show
    # boundary artefacts there).  Row-major boolean indexing selects the
    # same elements in the same order as the original nested loop.
    H, W = I1.shape
    px = x_grid.astype(int)
    py = y_grid.astype(int)
    keep = (
        mask
        & (px >= 0) & (px < W) & (py >= 0) & (py < H)
        & (px > 20) & (px < W - 20) & (py > 20) & (py < H - 20)
    )
    valid_u = u_est[keep]
    valid_v = v_est[keep]
    # Ground truth sampled at the exact window centres for a fair comparison.
    true_u_list = u_true[py[keep], px[keep]]
    true_v_list = v_true[py[keep], px[keep]]
    if len(valid_u) > 0:
        diff_u = valid_u - true_u_list
        diff_v = valid_v - true_v_list
        rmse = np.sqrt(np.mean(diff_u**2 + diff_v**2))
        # Error relative to the local true vector magnitude.
        mag_true = np.sqrt(true_u_list**2 + true_v_list**2)
        relative_err = (
            np.mean(np.sqrt(diff_u**2 + diff_v**2) / (mag_true + 1e-6)) * 100
        )
    else:
        rmse = 0.0
        relative_err = 0.0
    mo.md(f"""
### 2. Accuracy Analysis
Comparing SIV estimation against the synthetic Ground Truth in the center of the domain:
* **RMSE (Root Mean Square Error):** {rmse:.4f} pixels
* **Relative Error:** {relative_err:.2f}%
* **Vectors Processed:** {len(valid_u)}
*Note: RMSE below 0.5 pixels indicates successful sub-pixel refinement.*
""")
    return
@app.cell
def _(compute_siv_field, mo, np, refine_subpixel_quadratic):
    def _():
        # --- Unit Test Cell ---
        # Runs automatically and verifies the math logic against the paper.
        # Fixes vs. the original version: the unused `I_flat` array was
        # removed, and the random texture patch is generated from a seeded
        # local RNG so the health check is deterministic (it previously
        # depended on — and consumed — global NumPy RNG state).
        results = []

        # Test 1: sub-pixel refinement recovers the minimum of a known
        # paraboloid z = (x - 0.2)^2 + (y + 0.3)^2, true minimum (0.2, -0.3).
        grid = np.zeros((3, 3))
        for i, y in enumerate([-1, 0, 1]):
            for j, x in enumerate([-1, 0, 1]):
                grid[i, j] = (x - 0.2) ** 2 + (y + 0.3) ** 2
        dx, dy = refine_subpixel_quadratic(grid)
        test1 = (abs(dx - 0.2) < 1e-4) and (abs(dy - (-0.3)) < 1e-4)
        results.append(
            f"Sub-pixel Refinement Math: {'✅ PASS' if test1 else '❌ FAIL'}"
        )

        # Test 2: contrast thresholding masks out texture-less windows.
        rng = np.random.RandomState(0)
        I_noise = np.ones((50, 50)) * 0.5
        I_noise[20:30, 20:30] = rng.rand(10, 10)  # High contrast spot
        _, _, _, _, mask = compute_siv_field(
            I_noise, I_noise, window_size=16, min_contrast=0.1
        )
        # Centre window overlaps the textured spot -> True; corner is flat -> False.
        center_val = mask[mask.shape[0] // 2, mask.shape[1] // 2]
        corner_val = mask[0, 0]
        test2 = center_val and not corner_val
        results.append(
            f"Contrast Thresholding: {'✅ PASS' if test2 else '❌ FAIL'}"
        )

        return mo.md(
            "### 3. System Health Checks\n"
            + "\n".join([f"- {r}" for r in results])
        )

    _()
    return
# Standard marimo entry point: serve the notebook when run as a script.
if __name__ == "__main__":
    app.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment