Created
December 4, 2025 22:17
-
-
Save adlerweb/8610f8a7b57c8b95f30aef050402999b to your computer and use it in GitHub Desktop.
LVM: Script to show all LVs (or segments of LVs) on a specific PV. Handy for rebalancing. Beware: Vibecoded, might kill your system and some kittens.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import sys | |
| import subprocess | |
| import shlex | |
| try: | |
| import humanize | |
| except ImportError: | |
| humanize = None | |
| def run(cmd): | |
| return subprocess.run( | |
| shlex.split(cmd), | |
| capture_output=True, | |
| text=True, | |
| check=True | |
| ).stdout | |
| def format_size(bytes_count): | |
| if humanize: | |
| return humanize.naturalsize(bytes_count, binary=True) | |
| units = ["B", "KiB", "MiB", "GiB", "TiB", "PiB"] | |
| size = float(bytes_count) | |
| idx = 0 | |
| while size >= 1024 and idx < len(units) - 1: | |
| size /= 1024 | |
| idx += 1 | |
| return f"{size:.2f} {units[idx]}" | |
| def get_vg_for_pv(pv_dev): | |
| out = run(f"pvs --noheadings -o vg_name {shlex.quote(pv_dev)}") | |
| vg = out.strip() | |
| if not vg: | |
| raise RuntimeError(f"No VG found for PV {pv_dev}") | |
| return vg | |
| def get_extent_size_bytes(vg_name): | |
| out = run(f"vgs --noheadings --units b --nosuffix -o vg_extent_size {shlex.quote(vg_name)}") | |
| s = out.strip() | |
| if not s: | |
| raise RuntimeError(f"Could not determine extent size for VG {vg_name}") | |
| return int(float(s)) | |
| def load_lv_ranges(vg_name): | |
| """ | |
| Build a map of LV name -> seg_pe_ranges (including hidden RAID images). | |
| """ | |
| out = run(f"lvs -a --segments --noheadings --separator '|' -o vg_name,lv_name,seg_pe_ranges") | |
| lv_ranges = {} | |
| lv_names = [] | |
| for line in out.splitlines(): | |
| line = line.strip() | |
| if not line: | |
| continue | |
| parts = [p.strip() for p in line.split("|", 2)] | |
| if len(parts) < 3: | |
| continue | |
| vg, lv_name, ranges = parts | |
| if vg != vg_name: | |
| continue | |
| # Normalize LV name (remove optional brackets around internal LVs) | |
| norm_name = lv_name.strip() | |
| if norm_name.startswith("[") and norm_name.endswith("]"): | |
| norm_name = norm_name[1:-1].strip() | |
| lv_ranges[norm_name] = ranges | |
| lv_names.append(norm_name) | |
| return lv_ranges, lv_names | |
| def resolve_lv_extents(lv_name, pv_dev, lv_ranges, visited=None): | |
| if visited is None: | |
| visited = set() | |
| if lv_name in visited: | |
| return 0 | |
| visited.add(lv_name) | |
| ranges_field = lv_ranges.get(lv_name, "") | |
| if not ranges_field or ranges_field == "-": | |
| return 0 | |
| total_extents = 0 | |
| for entry in ranges_field.replace(", ", ",").split(","): | |
| entry = entry.strip() | |
| if not entry or ":" not in entry: | |
| continue | |
| target, se = entry.split(":", 1) | |
| if target.startswith("[") and target.endswith("]"): | |
| target = target[1:-1].strip() | |
| if target == pv_dev: | |
| try: | |
| start_str, end_str = se.split("-", 1) | |
| start, end = int(start_str), int(end_str) | |
| if end >= start: | |
| total_extents += (end - start + 1) | |
| except ValueError: | |
| continue | |
| elif not target.startswith("/dev/"): | |
| total_extents += resolve_lv_extents(target, pv_dev, lv_ranges, visited) | |
| return total_extents | |
| def main(): | |
| if len(sys.argv) < 2: | |
| print(f"Usage: {sys.argv[0]} <pv_device> # e.g. /dev/sdc") | |
| sys.exit(1) | |
| pv_dev = sys.argv[1] | |
| try: | |
| vg_name = get_vg_for_pv(pv_dev) | |
| extent_size = get_extent_size_bytes(vg_name) | |
| lv_ranges, all_lvs = load_lv_ranges(vg_name) | |
| except subprocess.CalledProcessError as e: | |
| print(f"Command failed: {e}") | |
| sys.exit(2) | |
| except RuntimeError as e: | |
| print(str(e)) | |
| sys.exit(3) | |
| results = [] | |
| for lv in all_lvs: | |
| extents = resolve_lv_extents(lv, pv_dev, lv_ranges, visited=set()) | |
| if extents > 0: | |
| size_bytes = extents * extent_size | |
| results.append((lv, size_bytes)) | |
| results.sort(key=lambda x: x[1], reverse=True) | |
| for lv_name, size_bytes in results: | |
| print(f"{lv_name:30} {format_size(size_bytes)}") | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment