Skip to content

Instantly share code, notes, and snippets.

@FrancescAlted
Last active September 23, 2025 05:11
Show Gist options
  • Select an option

  • Save FrancescAlted/137ff4aa1e0a82ef3f997c1948af8805 to your computer and use it in GitHub Desktop.

Select an option

Save FrancescAlted/137ff4aa1e0a82ef3f997c1948af8805 to your computer and use it in GitHub Desktop.
A calculation of a 1000x1000x1000 cube using NumPy and Blosc2
import os
import psutil
from time import time
import numpy as np
import blosc2
# --- Experiment Setup ---
n_frames = 1000  # Raise this for more frames
# The grid is square, with as many samples per side as there are frames.
# Plain ints are used here: the values only serve as sample counts below.
width = height = n_frames  # Size of the grid
dtype = np.float64  # Data type for the grid
# --- Coordinate creation ---
x = np.linspace(0, n_frames, n_frames, dtype=dtype)
y = np.linspace(-4 * np.pi, 4 * np.pi, width, dtype=dtype)
z = np.linspace(-4 * np.pi, 4 * np.pi, height, dtype=dtype)
# Put each 1-D coordinate on its own axis so the three operands broadcast to
# an (n_frames, width, height) cube without building full coordinate grids.
X = x[:, np.newaxis, np.newaxis]  # Shape: (N, 1, 1)
Y = y[np.newaxis, :, np.newaxis]  # Shape: (1, N, 1)
Z = z[np.newaxis, np.newaxis, :]  # Shape: (1, 1, N)
# --- Memory profiler ---
def getmem() -> float:
    """Return the resident set size (RSS) of the current process in MiB.

    The byte count reported by psutil is divided by 1024 twice, so the
    result is a float rather than a whole number of mebibytes.
    """
    process = psutil.Process(os.getpid())
    return process.memory_info().rss / 1024 / 1024
def genexpr(a, b, c):
    """Evaluate the benchmark expression element-wise on broadcastable operands.

    The result is ``sin(3 * R - 2 * t) * cos(3 * theta)`` with

    * ``t = a * b * 0.001``
    * ``R = sqrt(b**2 + c**2)``
    * ``theta = arctan2(c, b)``

    Only arithmetic operators and NumPy functions are applied to the
    operands, so whatever type they define for those operations is what is
    returned (an ndarray / NumPy scalar for NumPy or float inputs).
    """
    time_factor = a * b * 0.001
    R = np.sqrt(b**2 + c**2)
    theta = np.arctan2(c, b)
    return np.sin(R * 3 - time_factor * 2) * np.cos(theta * 3)
# --- Run computation and profile time and memory usage ---
def monitor(BLOSC=True, **kwargs):
    """Evaluate ``genexpr`` on the global X, Y, Z and report time and memory.

    One of three execution paths is taken:

    * ``kwargs`` given: they are used as Blosc2 compression parameters.
      The operands are wrapped in ``blosc2.SimpleProxy`` and the resulting
      expression is evaluated with ``.compute(cparams=kwargs)``.
      NOTE: this path is chosen whenever ``kwargs`` is non-empty, regardless
      of the value of ``BLOSC``.
    * ``BLOSC`` true and no ``kwargs``: ``genexpr`` is wrapped with
      ``blosc2.jit`` and called on the NumPy operands.
    * otherwise: ``genexpr`` is called directly on the NumPy operands.

    The elapsed time and the change in process RSS (as reported by
    ``getmem``) are printed. The RSS change is what the "Result occupies"
    line shows, so it is a whole-process measurement, not an exact size of
    the returned object.

    Returns:
        Whatever the selected path produced.
    """
    # Monotonic, high-resolution clock: unlike time.time() it is not affected
    # by system clock adjustments while the computation is running.
    from time import perf_counter

    m0 = getmem()
    t0 = perf_counter()
    if kwargs:
        cparams = dict(kwargs)
        print(f"cparams: {cparams}")
        # The operands could instead be converted with blosc2.asarray(), but
        # SimpleProxy does not need to convert them to blosc2 arrays.
        expr = genexpr(
            blosc2.SimpleProxy(X),
            blosc2.SimpleProxy(Y),
            blosc2.SimpleProxy(Z),
        )
        res = expr.compute(cparams=cparams)
    elif BLOSC:
        genexpr2 = blosc2.jit(genexpr)
        res = genexpr2(X, Y, Z)
    else:
        res = genexpr(X, Y, Z)
    dt = perf_counter() - t0
    if dt > 1e-3:
        print(f'Operation took {round(dt, 3)} s')
    else:
        print(f'Operation took {round(dt * 1000_000, 1)} μs')
    print(f'Result occupies {round((getmem()-m0))} MB')
    return res
# --- Benchmark runs ---
# `res = monitor(...)` only rebinds `res` after the call returns, so without
# the explicit `del` the previous result would stay alive for the whole of the
# next run. Dropping it first keeps peak memory down (a dense float64 cube for
# n_frames = 1000 is about 8 GB).
print("BLOSC=False")
res = monitor(BLOSC=False)
del res
print("BLOSC=True")
res = monitor(BLOSC=True)
del res
res = monitor(BLOSC=True, clevel=0)
del res
res = monitor(BLOSC=True, clevel=1, codec=blosc2.Codec.BLOSCLZ, filters=[blosc2.Filter.SHUFFLE])
# Only results that expose a compression ratio can report one; the last run's
# result is the one inspected here.
if hasattr(res, 'cratio'):
    print(f"res cratio {res.cratio:.2f}x")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment