Created
February 17, 2026 21:40
-
-
Save alexlib/d4f1bddc9f0430fdb52834dd559302e9 to your computer and use it in GitHub Desktop.
smoke image velocimetry - marimo notebook with Python code, not tested
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # /// script | |
| # requires-python = ">=3.14" | |
| # dependencies = [ | |
| # "marimo>=0.19.9", | |
| # "matplotlib==3.10.8", | |
| # "numpy==2.4.2", | |
| # "pydantic-ai-slim==1.57.0", | |
| # "scipy==1.17.0", | |
| # ] | |
| # /// | |
| import marimo | |
| __generated_with = "0.19.9" | |
| app = marimo.App(width="medium") | |
| @app.cell | |
| def _(mo): | |
| mo.md(""" | |
| # Smoke Image Velocimetry (SIV) Interactive Demo | |
| This notebook implements the **Smoke Image Velocimetry** technique as described by **Mikheev & Dushin (2016)** in *"A Method for Measuring the Dynamics of Velocity Vector Fields in a Turbulent Flow Using Smoke Image-Visualization Videos"*. | |
| ### Method Overview | |
| Unlike Particle Image Velocimetry (PIV) which tracks distinct particles, SIV is optimized for continuous scalar fields (smoke) where individual particles are indistinguishable. | |
| **Key Algorithm Steps:** | |
| 1. **Preprocessing:** Application of a median filter (typically $3 \times 3$) to remove high-frequency sensor noise. | |
| 2. **Criteria Selection:** Thresholding based on the standard deviation of brightness ($\sigma_I$) in the reference window to ignore texture-less regions. | |
| 3. **Displacement Estimation:** Minimization of the **Sum of Absolute Differences (SAD)** functional, $\Phi$. | |
| 4. **Sub-pixel Refinement:** The paper notes that while $\Phi$ is conical near the minimum, $\Phi^2$ (SAD squared) is parabolic. We fit a quadratic surface to $\Phi^2$ to find the sub-pixel peak. | |
| """) | |
| return | |
| @app.cell | |
| def _(): | |
| import numpy as np | |
| from scipy.ndimage import map_coordinates, gaussian_filter, median_filter | |
| import matplotlib.pyplot as plt | |
| import marimo as mo | |
| return gaussian_filter, map_coordinates, median_filter, mo, np, plt | |
| @app.cell | |
| def _(gaussian_filter, map_coordinates, median_filter, np): | |
| # --- Core SIV Algorithm --- | |
| def preprocess_images(images, background=None, median_kernel=3): | |
| """ | |
| Applies preprocessing: Median filtering and optional background subtraction. | |
| """ | |
| processed = [] | |
| for img in images: | |
| # 1. Median Filter (Mikheev & Dushin 2016 recommend 3x3) | |
| if median_kernel > 0: | |
| img_filt = median_filter(img, size=median_kernel) | |
| else: | |
| img_filt = img.copy() | |
| # 2. Background Subtraction | |
| if background is not None: | |
| img_filt = img_filt.astype(np.float32) - background.astype( | |
| np.float32 | |
| ) | |
| processed.append(img_filt) | |
| return np.array(processed) | |
| def refine_subpixel_quadratic(surface_3x3): | |
| """ | |
| Fits a generic 2D quadratic surface to a 3x3 grid and finds the minimum. | |
| Used for Phi^2 (SAD squared). | |
| Surface: F(x,y) = Ax^2 + By^2 + Cxy + Dx + Ey + F | |
| """ | |
| values = surface_3x3.flatten() | |
| # Precomputed Pseudo-Inverse for 3x3 grid to find coefficients [A, B, C, D, E, F] | |
| M_pinv = np.array( | |
| [ | |
| [ | |
| 0.16666667, | |
| -0.33333333, | |
| 0.16666667, | |
| 0.16666667, | |
| -0.33333333, | |
| 0.16666667, | |
| 0.16666667, | |
| -0.33333333, | |
| 0.16666667, | |
| ], # A | |
| [ | |
| 0.16666667, | |
| 0.16666667, | |
| 0.16666667, | |
| -0.33333333, | |
| -0.33333333, | |
| -0.33333333, | |
| 0.16666667, | |
| 0.16666667, | |
| 0.16666667, | |
| ], # B | |
| [0.25, -0.0, -0.25, 0.0, -0.0, -0.0, -0.25, 0.0, 0.25], # C | |
| [ | |
| -0.16666667, | |
| 0.0, | |
| 0.16666667, | |
| -0.16666667, | |
| 0.0, | |
| 0.16666667, | |
| -0.16666667, | |
| 0.0, | |
| 0.16666667, | |
| ], # D | |
| [ | |
| -0.16666667, | |
| -0.16666667, | |
| -0.16666667, | |
| 0.0, | |
| 0.0, | |
| -0.0, | |
| 0.16666667, | |
| 0.16666667, | |
| 0.16666667, | |
| ], # E | |
| [ | |
| -0.11111111, | |
| 0.22222222, | |
| -0.11111111, | |
| 0.22222222, | |
| 0.55555556, | |
| 0.22222222, | |
| -0.11111111, | |
| 0.22222222, | |
| -0.11111111, | |
| ], # F | |
| ] | |
| ) | |
| coeffs = M_pinv @ values | |
| A, B, C, D, E, F = coeffs | |
| # Minimum condition: Grad(F) = 0 -> System of linear equations | |
| det = 4 * A * B - C * C | |
| if abs(det) < 1e-9: | |
| return 0.0, 0.0 | |
| sub_x = (-D * (2 * B) - (-E) * C) / det | |
| sub_y = ((2 * A) * (-E) - C * (-D)) / det | |
| # If the peak is predicted far outside the 3x3 grid, the local fit is invalid. | |
| if abs(sub_x) > 1.0 or abs(sub_y) > 1.0: | |
| return 0.0, 0.0 | |
| return sub_x, sub_y | |
| def compute_siv_field( | |
| I1, I2, window_size=32, search_radius=16, grid_step=16, min_contrast=0.01 | |
| ): | |
| """ | |
| Computes the displacement field using SAD minimization and Phi^2 sub-pixel refinement. | |
| """ | |
| H, W = I1.shape | |
| # Define grid | |
| y_range = range(window_size // 2, H - window_size // 2, grid_step) | |
| x_range = range(window_size // 2, W - window_size // 2, grid_step) | |
| shape = (len(y_range), len(x_range)) | |
| u_field = np.zeros(shape) | |
| v_field = np.zeros(shape) | |
| x_grid = np.zeros(shape) | |
| y_grid = np.zeros(shape) | |
| mask = np.ones(shape, dtype=bool) | |
| hw = window_size // 2 | |
| for i, cy in enumerate(y_range): | |
| for j, cx in enumerate(x_range): | |
| x_grid[i, j] = cx | |
| y_grid[i, j] = cy | |
| # 1. Extract Reference | |
| template = I1[cy - hw : cy + hw, cx - hw : cx + hw] | |
| # 2. Check Contrast (Sigma_I) | |
| sigma_I = np.std(template) | |
| if sigma_I < min_contrast: | |
| mask[i, j] = False | |
| continue | |
| # 3. Search for Minimum SAD | |
| best_sad = np.inf | |
| best_dx, best_dy = 0, 0 | |
| # Search Bounds | |
| min_dx = max(-search_radius, -cx + hw) | |
| max_dx = min(search_radius, W - cx - hw) | |
| min_dy = max(-search_radius, -cy + hw) | |
| max_dy = min(search_radius, H - cy - hw) | |
| # Integer Search | |
| # Note: In a production C++ env, this is FFT-accelerated. | |
| # Here we use a direct loop for clarity and lack of FFT padding complexity. | |
| for dy in range(min_dy, max_dy + 1): | |
| for dx in range(min_dx, max_dx + 1): | |
| candidate = I2[ | |
| cy + dy - hw : cy + dy + hw, | |
| cx + dx - hw : cx + dx + hw, | |
| ] | |
| sad = np.sum(np.abs(template - candidate)) | |
| if sad < best_sad: | |
| best_sad = sad | |
| best_dx = dx | |
| best_dy = dy | |
| # 4. Sub-pixel Refinement (Quadratic on SAD^2) | |
| try: | |
| sad_sq_matrix = np.zeros((3, 3)) | |
| valid_subpixel = True | |
| for sii, s_dy in enumerate([-1, 0, 1]): | |
| for sjj, s_dx in enumerate([-1, 0, 1]): | |
| check_dx = best_dx + s_dx | |
| check_dy = best_dy + s_dy | |
| if not ( | |
| min_dx <= check_dx <= max_dx | |
| and min_dy <= check_dy <= max_dy | |
| ): | |
| valid_subpixel = False | |
| break | |
| neigh = I2[ | |
| cy + check_dy - hw : cy + check_dy + hw, | |
| cx + check_dx - hw : cx + check_dx + hw, | |
| ] | |
| val = np.sum(np.abs(template - neigh)) | |
| sad_sq_matrix[sii, sjj] = val**2 | |
| if not valid_subpixel: | |
| break | |
| if valid_subpixel: | |
| sub_dx, sub_dy = _refine_subpixel_quadratic(sad_sq_matrix) | |
| u_field[i, j] = best_dx + sub_dx | |
| v_field[i, j] = best_dy + sub_dy | |
| else: | |
| u_field[i, j] = best_dx | |
| v_field[i, j] = best_dy | |
| except Exception: | |
| u_field[i, j] = best_dx | |
| v_field[i, j] = best_dy | |
| return x_grid, y_grid, u_field, v_field, mask | |
| # --- Synthetic Data Generation --- | |
| def generate_vortex_frames(shape=(256, 256), max_displacement=5.0): | |
| """Generates synthetic smoke frames with a Rankine vortex.""" | |
| np.random.seed(42) | |
| noise = np.random.rand(*shape) | |
| smoke = gaussian_filter(noise, sigma=8) | |
| smoke = (smoke - smoke.min()) / (smoke.max() - smoke.min()) | |
| y, x = np.mgrid[0 : shape[0], 0 : shape[1]] | |
| cy, cx = shape[0] // 2, shape[1] // 2 | |
| dy = y - cy | |
| dx = x - cx | |
| r = np.sqrt(dy**2 + dx**2) + 1e-5 | |
| core_radius = 40.0 | |
| v_theta = r / (r**2 + core_radius**2) | |
| v_theta = v_theta / v_theta.max() * max_displacement | |
| u_true = -v_theta * (dy / r) * r | |
| v_true = v_theta * (dx / r) * r | |
| # Apply Warp | |
| coords = np.indices(shape).astype(np.float32) | |
| coords[0] -= v_true | |
| coords[1] -= u_true | |
| smoke2 = map_coordinates(smoke, coords, order=1, mode="nearest") | |
| # Add sensor noise | |
| smoke2 += np.random.normal(0, 0.005, smoke2.shape) | |
| return smoke, smoke2, u_true, v_true | |
| return ( | |
| compute_siv_field, | |
| generate_vortex_frames, | |
| preprocess_images, | |
| refine_subpixel_quadratic, | |
| ) | |
| @app.cell | |
| def _(mo): | |
| # --- UI Controls --- | |
| window_size = mo.ui.slider(16, 64, step=4, value=32, label="Window Size (px)") | |
| max_disp = mo.ui.slider( | |
| 1.0, 15.0, step=0.5, value=6.0, label="Max Velocity (px/frame)" | |
| ) | |
| contrast = mo.ui.slider( | |
| 0.0, 0.05, step=0.001, value=0.005, label="Min Contrast Threshold" | |
| ) | |
| run_btn = mo.ui.button(label="Run SIV Analysis", kind="success") | |
| mo.md(f""" | |
| ### 1. Configuration | |
| Adjust the parameters below and click Run. | |
| | Parameter | Control | Description | | |
| | :--- | :--- | :--- | | |
| | **Interrogation Window** | {window_size} | Size of the patch to track ($N_x \\times N_y$). Larger = robust but lower spatial res. | | |
| | **Max Displacement** | {max_disp} | Simulated max velocity. The search radius adapts to $1.5 \\times$ this value. | | |
| | **Contrast Threshold** | {contrast} | Minimum standard deviation ($\\sigma_I$) required to process a window. | | |
| {run_btn} | |
| """) | |
| return contrast, max_disp, run_btn, window_size | |
| @app.cell | |
| def _( | |
| compute_siv_field, | |
| contrast, | |
| generate_vortex_frames, | |
| max_disp, | |
| np, | |
| plt, | |
| preprocess_images, | |
| run_btn, | |
| window_size, | |
| ): | |
| # --- Execution Cell --- | |
| run_btn.value # React to button click | |
| # 1. Generate Synthetic Data | |
| raw_I1, raw_I2, u_true, v_true = generate_vortex_frames( | |
| shape=(300, 300), max_displacement=max_disp.value | |
| ) | |
| # 2. Preprocess (Paper Step 1) | |
| imgs = preprocess_images([raw_I1, raw_I2], median_kernel=3) | |
| I1, I2 = imgs[0], imgs[1] | |
| # 3. SIV Calculation | |
| w_size = window_size.value | |
| search_r = int(max_disp.value * 1.5) + 2 | |
| step = w_size // 2 # 50% overlap is standard | |
| x_grid, y_grid, u_est, v_est, mask = compute_siv_field( | |
| I1, | |
| I2, | |
| window_size=w_size, | |
| search_radius=search_r, | |
| grid_step=step, | |
| min_contrast=contrast.value, | |
| ) | |
| # 4. Prepare Plots | |
| u_plot = np.where(mask, u_est, np.nan) | |
| v_plot = np.where(mask, v_est, np.nan) | |
| fig, axes = plt.subplots(1, 3, figsize=(15, 5)) | |
| # Plot A: Input Frame | |
| axes[0].imshow(I1, cmap="gray", origin="upper") | |
| axes[0].set_title("Input Smoke (Frame 1)") | |
| axes[0].axis("off") | |
| # Plot B: Ground Truth | |
| skip = 20 | |
| axes[1].imshow(I2, cmap="gray", alpha=0.5, origin="upper") | |
| y_sub, x_sub = np.mgrid[0 : I1.shape[0] : skip, 0 : I1.shape[1] : skip] | |
| u_sub = u_true[::skip, ::skip] | |
| v_sub = v_true[::skip, ::skip] | |
| axes[1].quiver( | |
| x_sub, y_sub, u_sub, -v_sub, color="lime", scale=50, scale_units="inches" | |
| ) | |
| axes[1].set_title("Ground Truth Flow") | |
| axes[1].axis("off") | |
| # Plot C: SIV Result | |
| axes[2].imshow(I2, cmap="gray", alpha=0.5, origin="upper") | |
| axes[2].quiver( | |
| x_grid, | |
| y_grid, | |
| u_plot, | |
| -v_plot, | |
| color="red", | |
| scale=50, | |
| scale_units="inches", | |
| width=0.005, | |
| ) | |
| # Plot rejected points | |
| invalid_x = x_grid[~mask] | |
| invalid_y = y_grid[~mask] | |
| if len(invalid_x) > 0: | |
| axes[2].scatter( | |
| invalid_x, | |
| invalid_y, | |
| c="yellow", | |
| s=10, | |
| marker="x", | |
| label="Low Contrast", | |
| alpha=0.6, | |
| ) | |
| axes[2].legend(loc="upper right") | |
| axes[2].set_title("SIV Estimated Flow") | |
| axes[2].axis("off") | |
| plt.tight_layout() | |
| chart = plt.gcf() | |
| return I1, chart, mask, u_est, u_true, v_est, v_true, x_grid, y_grid | |
| @app.cell | |
| def _(chart, mo): | |
| mo.mpl.interactive(chart) | |
| return | |
| @app.cell | |
| def _(I1, mask, mo, np, u_est, u_true, v_est, v_true, x_grid, y_grid): | |
| # --- Statistics Cell --- | |
| # Collect valid vectors | |
| valid_u, valid_v = [], [] | |
| true_u_list, true_v_list = [], [] | |
| H, W = I1.shape | |
| for i in range(x_grid.shape[0]): | |
| for j in range(x_grid.shape[1]): | |
| if not mask[i, j]: | |
| continue | |
| px, py = int(x_grid[i, j]), int(y_grid[i, j]) | |
| # Boundary check | |
| if 0 <= px < W and 0 <= py < H: | |
| # To compare fairly, we grab the True Velocity at the exact center of the window | |
| t_u = u_true[py, px] | |
| t_v = v_true[py, px] | |
| # Exclude edges where synthetic data might have boundary effects | |
| if 20 < px < W - 20 and 20 < py < H - 20: | |
| valid_u.append(u_est[i, j]) | |
| valid_v.append(v_est[i, j]) | |
| true_u_list.append(t_u) | |
| true_v_list.append(t_v) | |
| if len(valid_u) > 0: | |
| valid_u = np.array(valid_u) | |
| valid_v = np.array(valid_v) | |
| true_u_list = np.array(true_u_list) | |
| true_v_list = np.array(true_v_list) | |
| # Calculate Error | |
| diff_u = valid_u - true_u_list | |
| diff_v = valid_v - true_v_list | |
| rmse = np.sqrt(np.mean(diff_u**2 + diff_v**2)) | |
| # vector magnitude statistics | |
| mag_true = np.sqrt(true_u_list**2 + true_v_list**2) | |
| relative_err = ( | |
| np.mean(np.sqrt(diff_u**2 + diff_v**2) / (mag_true + 1e-6)) * 100 | |
| ) | |
| else: | |
| rmse = 0.0 | |
| relative_err = 0.0 | |
| mo.md(f""" | |
| ### 2. Accuracy Analysis | |
| Comparing SIV estimation against the synthetic Ground Truth in the center of the domain: | |
| * **RMSE (Root Mean Square Error):** {rmse:.4f} pixels | |
| * **Relative Error:** {relative_err:.2f}% | |
| * **Vectors Processed:** {len(valid_u)} | |
| *Note: RMSE below 0.5 pixels indicates successful sub-pixel refinement.* | |
| """) | |
| return | |
| @app.cell | |
| def _(compute_siv_field, mo, np, refine_subpixel_quadratic): | |
| def _(): | |
| # --- Unit Test Cell --- | |
| # This cell runs automatically to verify the math logic matches the paper | |
| results = [] | |
| # Test 1: Quadratic Surface Refinement | |
| # Construct a perfect paraboloid: z = (x - 0.2)^2 + (y + 0.3)^2 | |
| # Min is at (0.2, -0.3) | |
| grid = np.zeros((3, 3)) | |
| for i, y in enumerate([-1, 0, 1]): | |
| for j, x in enumerate([-1, 0, 1]): | |
| grid[i, j] = (x - 0.2) ** 2 + (y + 0.3) ** 2 | |
| dx, dy = refine_subpixel_quadratic(grid) | |
| test1 = (abs(dx - 0.2) < 1e-4) and (abs(dy - (-0.3)) < 1e-4) | |
| results.append( | |
| f"Sub-pixel Refinement Math: {'✅ PASS' if test1 else '❌ FAIL'}" | |
| ) | |
| # Test 2: Contrast Thresholding | |
| I_flat = np.ones((50, 50)) * 0.5 | |
| I_noise = np.ones((50, 50)) * 0.5 | |
| I_noise[20:30, 20:30] = np.random.rand(10, 10) # High contrast spot | |
| _, _, _, _, mask = compute_siv_field( | |
| I_noise, I_noise, window_size=16, min_contrast=0.1 | |
| ) | |
| # Center should be True, Edges False | |
| center_val = mask[mask.shape[0] // 2, mask.shape[1] // 2] | |
| corner_val = mask[0, 0] | |
| test2 = center_val and not corner_val | |
| results.append( | |
| f"Contrast Thresholding: {'✅ PASS' if test2 else '❌ FAIL'}" | |
| ) | |
| return mo.md( | |
| "### 3. System Health Checks\n" | |
| + "\n".join([f"- {r}" for r in results]) | |
| ) | |
| _() | |
| return | |
| if __name__ == "__main__": | |
| app.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment