Skip to content

Instantly share code, notes, and snippets.

@adlerweb
Created December 4, 2025 22:17
Show Gist options
  • Select an option

  • Save adlerweb/8610f8a7b57c8b95f30aef050402999b to your computer and use it in GitHub Desktop.

Select an option

Save adlerweb/8610f8a7b57c8b95f30aef050402999b to your computer and use it in GitHub Desktop.
LVM: Script to show all LVs (or segments of LVs) on a specific PV. Handy for rebalancing. Beware: Vibecoded, might kill your system and some kittens.
#!/usr/bin/env python3
import sys
import subprocess
import shlex
try:
import humanize
except ImportError:
humanize = None
def run(cmd):
return subprocess.run(
shlex.split(cmd),
capture_output=True,
text=True,
check=True
).stdout
def format_size(bytes_count):
if humanize:
return humanize.naturalsize(bytes_count, binary=True)
units = ["B", "KiB", "MiB", "GiB", "TiB", "PiB"]
size = float(bytes_count)
idx = 0
while size >= 1024 and idx < len(units) - 1:
size /= 1024
idx += 1
return f"{size:.2f} {units[idx]}"
def get_vg_for_pv(pv_dev):
out = run(f"pvs --noheadings -o vg_name {shlex.quote(pv_dev)}")
vg = out.strip()
if not vg:
raise RuntimeError(f"No VG found for PV {pv_dev}")
return vg
def get_extent_size_bytes(vg_name):
out = run(f"vgs --noheadings --units b --nosuffix -o vg_extent_size {shlex.quote(vg_name)}")
s = out.strip()
if not s:
raise RuntimeError(f"Could not determine extent size for VG {vg_name}")
return int(float(s))
def load_lv_ranges(vg_name):
"""
Build a map of LV name -> seg_pe_ranges (including hidden RAID images).
"""
out = run(f"lvs -a --segments --noheadings --separator '|' -o vg_name,lv_name,seg_pe_ranges")
lv_ranges = {}
lv_names = []
for line in out.splitlines():
line = line.strip()
if not line:
continue
parts = [p.strip() for p in line.split("|", 2)]
if len(parts) < 3:
continue
vg, lv_name, ranges = parts
if vg != vg_name:
continue
# Normalize LV name (remove optional brackets around internal LVs)
norm_name = lv_name.strip()
if norm_name.startswith("[") and norm_name.endswith("]"):
norm_name = norm_name[1:-1].strip()
lv_ranges[norm_name] = ranges
lv_names.append(norm_name)
return lv_ranges, lv_names
def resolve_lv_extents(lv_name, pv_dev, lv_ranges, visited=None):
if visited is None:
visited = set()
if lv_name in visited:
return 0
visited.add(lv_name)
ranges_field = lv_ranges.get(lv_name, "")
if not ranges_field or ranges_field == "-":
return 0
total_extents = 0
for entry in ranges_field.replace(", ", ",").split(","):
entry = entry.strip()
if not entry or ":" not in entry:
continue
target, se = entry.split(":", 1)
if target.startswith("[") and target.endswith("]"):
target = target[1:-1].strip()
if target == pv_dev:
try:
start_str, end_str = se.split("-", 1)
start, end = int(start_str), int(end_str)
if end >= start:
total_extents += (end - start + 1)
except ValueError:
continue
elif not target.startswith("/dev/"):
total_extents += resolve_lv_extents(target, pv_dev, lv_ranges, visited)
return total_extents
def main():
if len(sys.argv) < 2:
print(f"Usage: {sys.argv[0]} <pv_device> # e.g. /dev/sdc")
sys.exit(1)
pv_dev = sys.argv[1]
try:
vg_name = get_vg_for_pv(pv_dev)
extent_size = get_extent_size_bytes(vg_name)
lv_ranges, all_lvs = load_lv_ranges(vg_name)
except subprocess.CalledProcessError as e:
print(f"Command failed: {e}")
sys.exit(2)
except RuntimeError as e:
print(str(e))
sys.exit(3)
results = []
for lv in all_lvs:
extents = resolve_lv_extents(lv, pv_dev, lv_ranges, visited=set())
if extents > 0:
size_bytes = extents * extent_size
results.append((lv, size_bytes))
results.sort(key=lambda x: x[1], reverse=True)
for lv_name, size_bytes in results:
print(f"{lv_name:30} {format_size(size_bytes)}")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment