Skip to content

Instantly share code, notes, and snippets.

@mal1k-me
Last active March 12, 2026 09:40
Show Gist options
  • Select an option

  • Save mal1k-me/8483a290d89edfd2df03e98479128c65 to your computer and use it in GitHub Desktop.

Select an option

Save mal1k-me/8483a290d89edfd2df03e98479128c65 to your computer and use it in GitHub Desktop.
opinstall - A pacman wrapper that handles optional dependencies intelligently
#!/usr/bin/env python3
"""
opinstall
=========
A pacman wrapper that handles optional dependencies intelligently.
:goals:
- **Install with awareness**: interactively present optional deps for
every package in a resolved transaction, letting the user cherry-pick
which to include — recursively, level by level, with user-gated
drill-down.
- **Correct install reason**: all user-selected optional deps are marked
``--asdeps`` so they never pollute the explicitly-installed list.
This marking is guaranteed to run regardless of success, failure, or
user cancellation.
- **Safe removal**: prevent ``pacman -Rs`` from cascading into deps
that are optional deps of *other* installed packages. Protect them
before the transaction and revert unconditionally afterwards.
- **Minimal footprint**: pure standard-library Python, thin wrappers
around ``pacman`` and ``expac``, no persistent state or config files.
:anti-goals:
- **Not a full pacman replacement**: only ``install`` and ``remove``
are wrapped. System upgrades, queries, file operations, etc. are
left to pacman/yay directly.
- **Not a post-hoc orphan sweeper**: unlike ``yay -Yc``, this does not
scan for orphans after the fact. It protects deps *during* the
transaction itself — complementary, not redundant.
- **Not an AUR helper**: no PKGBUILD fetching, building, or AUR
interaction of any kind.
:usage:
.. code-block:: bash
opinstall install pkg1 [pkg2 ...]
opinstall remove pkg1 [pkg2 ...]
:requires:
- Python 3.10+ (for ``match`` and ``|`` union type syntax)
- ``expac`` (``pacman -S expac``)
- ``pacman`` and ``sudo`` available in PATH
"""
# ---------------------------------------------------------------------------
# Standard-library imports only — no third-party dependencies.
# ---------------------------------------------------------------------------
from __future__ import annotations
import os
import subprocess
import sys
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import ClassVar, Generator, NoReturn, Sequence
# ===========================================================================
# Entry point
# ===========================================================================
def main() -> None:
"""
Parse CLI arguments, escalate privileges if needed, and dispatch to the
appropriate :class:`Command` subclass.
:raises SystemExit: on invalid usage or unrecognised command.
"""
# Replace current process with a sudo-wrapped re-invocation when not root.
# os.execvp never returns on success, so the rest of main() only runs as root.
_ensure_root()
if len(sys.argv) < 3:
_die(Command.USAGE)
verb, *pkgs = sys.argv[1:]
# Registry lookup: Command subclasses register themselves via __init_subclass__.
cmd_cls = Command.registry.get(verb)
if cmd_cls is None:
_die(f"unknown command: {verb!r}\n\n{Command.USAGE}")
cmd_cls(pkgs).run()
# ===========================================================================
# Helpers
# ===========================================================================
def _ensure_root() -> None:
"""
Re-execute the script under ``sudo`` when the effective UID is not 0.
Uses :func:`os.execvp` to *replace* the current process rather than
spawning a child — this means the escalated process inherits the same
PID and stdio handles cleanly.
:raises SystemExit: if ``sudo`` is not found in PATH.
"""
if os.geteuid() != 0:
print("==> Escalating privileges via sudo...")
try:
# execvp replaces the current process image; argv[0] must be
# the executable name, followed by the full argument list.
os.execvp("sudo", ["sudo", sys.executable, *sys.argv])
except FileNotFoundError:
_die("sudo not found — please run as root.")
def _die(msg: str, code: int = 1) -> NoReturn:
"""
Print *msg* to stderr and exit with *code*.
:param msg: Human-readable error description.
:param code: Exit status code (default 1).
:raises SystemExit: always.
"""
print(f"[error] {msg}", file=sys.stderr)
sys.exit(code)
def _warn(msg: str) -> None:
"""
Print a non-fatal warning to stderr.
:param msg: Warning message text.
"""
print(f"[warn] {msg}", file=sys.stderr)
def _info(msg: str) -> None:
"""
Print a formatted informational step message to stdout.
:param msg: Step description.
"""
print(f"==> {msg}")
# ===========================================================================
# Data models
# ===========================================================================
@dataclass(frozen=True, slots=True)
class OptDep:
"""
A single optional dependency entry as returned by expac.
:param name: Package name (the part before the colon in expac output).
:param reason: Human-readable install rationale (the part after the colon).
May be an empty string when expac omits it.
"""
name: str
reason: str = ""
def __str__(self) -> str:
"""Return a display string, appending the reason when present."""
return f"{self.name}: {self.reason}" if self.reason else self.name
@dataclass(slots=True)
class PackageSet:
"""
A resolved set of packages for a pending transaction, built from a
pacman dry-run.
:param requested: Packages explicitly requested by the user.
:param resolved: Full set pacman would actually act on (includes deps).
"""
requested: Sequence[str]
resolved: list[str] = field(default_factory=list)
@property
def pulled_deps(self) -> list[str]:
"""
Packages in *resolved* that were not explicitly requested — i.e.
dependencies pacman added automatically.
:returns: List of dependency package names.
"""
req = frozenset(self.requested)
return [p for p in self.resolved if p not in req]
# ===========================================================================
# CLI tool wrappers
# ===========================================================================
class CliTool:
"""
Abstract base for thin, stateless wrappers around CLI tools.
Subclasses set :attr:`BINARY` to the executable name and inherit a
shared :meth:`_run` implementation that invokes the tool, optionally
raising on non-zero exit.
All methods are ``@staticmethod`` or ``@classmethod`` — these classes
are namespaces, not service objects. Keeping every subprocess call
behind a known binary makes privilege requirements and side-effects
easy to audit.
:cvar BINARY: Name of the CLI executable (e.g. ``"expac"``, ``"pacman"``).
"""
BINARY: ClassVar[str]
@classmethod
def _run(
cls,
*args: str,
check: bool = False,
capture_output: bool = False,
text: bool = False,
stdin: int | None = None,
stdout: int | None = None,
stderr: int | None = None,
) -> subprocess.CompletedProcess[str]:
"""
Execute ``<BINARY> <args>`` with explicit keyword forwarding.
:param args: Arguments passed directly to the tool.
:param check: If ``True``, raise on non-zero exit.
:param capture_output: Capture stdout/stderr.
:param text: Decode output as text.
:param stdin: File descriptor for stdin.
:param stdout: File descriptor for stdout.
:param stderr: File descriptor for stderr.
:returns: :class:`subprocess.CompletedProcess` instance.
"""
return subprocess.run(
[cls.BINARY, *args],
check=check,
capture_output=capture_output,
text=text,
stdin=stdin,
stdout=stdout,
stderr=stderr,
)
@classmethod
def _capture(cls, *args: str) -> str:
"""
Execute ``<BINARY> <args>``, capture output, and return stripped stdout.
Raises :class:`subprocess.CalledProcessError` on non-zero exit with
stderr attached to the exception for diagnostics.
:param args: Arguments passed directly to the tool.
:returns: Stripped stdout.
:raises subprocess.CalledProcessError: if the tool exits non-zero.
"""
result = cls._run(*args, capture_output=True, text=True)
if result.returncode != 0:
msg = (
result.stderr.strip()
or f"{cls.BINARY} exited with code {result.returncode}"
)
raise subprocess.CalledProcessError(
result.returncode,
[cls.BINARY, *args],
output=result.stdout,
stderr=msg,
)
return result.stdout.strip()
# ===========================================================================
# expac interface
# ===========================================================================
class Expac(CliTool):
"""
Thin, stateless wrapper around the ``expac`` CLI tool.
``expac`` format tokens used here:
- ``%n`` — package name
- ``%o`` — optional dependencies (space-separated ``name:reason`` pairs)
"""
BINARY: ClassVar[str] = "expac"
@staticmethod
def installed_names() -> frozenset[str]:
"""
Return the names of all locally installed packages.
:returns: Frozen set of package name strings.
:raises subprocess.CalledProcessError: if expac exits non-zero.
"""
out = Expac._capture("-Q", "%n")
return frozenset(out.splitlines()) if out else frozenset()
@staticmethod
def opt_map() -> dict[str, frozenset[str]]:
"""
Build a mapping of every installed package to its optional dep names.
Fetched in a single ``expac`` call for efficiency (O(1) subprocesses
regardless of how many packages are installed). Values are frozen
sets for O(1) membership tests.
:returns: ``{pkg_name: frozenset(optdep_names)}`` for all installed
packages.
"""
out = Expac._capture("-Q", "%n\t%o")
result: dict[str, frozenset[str]] = {}
for line in out.splitlines():
# Partition on the first tab — right side may be empty when
# a package declares no optional deps.
name, _, raw = line.partition("\t")
# Optional dep entries are "name:reason" tokens; we only need names.
result[name] = (
frozenset(o.split(":", 1)[0] for o in raw.split())
if raw
else frozenset()
)
return result
@staticmethod
def optdeps_for(
pkg: str,
*,
exclude: frozenset[str] | set[str],
sync: bool = True,
) -> list[OptDep]:
"""
Return the optional dependencies of *pkg* that are not in *exclude*.
Queries the sync db first (useful before installation), falling back
to the local db if the sync query fails or returns nothing (e.g. AUR
or already-installed packages not in any sync repo). Deduplicates by
name to handle packages that appear in multiple repos (e.g. ``extra``
and ``cachyos-extra-v3``).
:param pkg: Package to query.
:param exclude: Names to suppress (already installed / already selected).
:param sync: Query sync db when ``True``, local db when ``False``.
:returns: Ordered, deduplicated list of :class:`OptDep` objects.
"""
out = ""
if sync:
try:
out = Expac._capture("-S", "%o", pkg)
except subprocess.CalledProcessError:
# Package not in sync db — fall through to local query.
pass
# Fallback: package not in sync db (e.g. AUR or already installed)
if not out:
try:
out = Expac._capture("-Q", "%o", pkg)
except subprocess.CalledProcessError:
return []
if not out:
return []
seen: set[str] = set()
deps: list[OptDep] = []
for entry in out.split():
name, _, reason = entry.partition(":")
if name and name not in exclude and name not in seen:
seen.add(name)
deps.append(OptDep(name=name, reason=reason))
return deps
@staticmethod
def batch_optdeps(
pkgs: Sequence[str],
*,
exclude: frozenset[str] | set[str],
sync: bool = True,
) -> dict[str, list[OptDep]]:
"""
Fetch optional deps for multiple packages in one pass, returning
only entries that have uninstalled optional deps after exclusion.
This avoids repeated subprocess calls to ``optdeps_for`` when we
need to probe many packages — e.g. filtering candidates to those
with further optional deps.
:param pkgs: Packages to query.
:param exclude: Names to suppress.
:param sync: Query sync db when ``True``.
:returns: ``{pkg: [OptDep, ...]}`` for packages with non-empty
optdeps only. Packages with no remaining optdeps
are omitted from the dict.
"""
result: dict[str, list[OptDep]] = {}
for pkg in pkgs:
opts = Expac.optdeps_for(pkg, exclude=exclude, sync=sync)
if opts:
result[pkg] = opts
return result
# ===========================================================================
# pacman interface
# ===========================================================================
class Pacman(CliTool):
"""
Thin, stateless wrapper around the ``pacman`` CLI tool.
Like :class:`Expac`, this is a namespace class — all methods are static.
Keeping every ``pacman`` call here makes privilege requirements and
side-effects easy to audit.
"""
BINARY: ClassVar[str] = "pacman"
@staticmethod
def dry_install(pkgs: Sequence[str]) -> list[str]:
"""
Return the full list of packages a ``pacman -S`` would install,
including all pulled-in dependencies.
Uses ``--print-format '%n'`` which emits one bare package name per
line — no table parsing required.
:param pkgs: Packages the user requested.
:returns: Flat list of names pacman would install.
:raises SystemExit: if pacman reports an error.
"""
r = Pacman._run(
"-S",
"--print-format",
"%n",
*pkgs,
capture_output=True,
text=True,
)
if r.returncode != 0:
_die(f"pacman install dry-run failed:\n{r.stderr.strip()}")
return r.stdout.strip().splitlines() if r.stdout.strip() else []
@staticmethod
def dry_remove(pkgs: Sequence[str]) -> list[str]:
"""
Return the full list of packages a ``pacman -Rs`` would remove,
including orphaned dependencies.
Feeds ``/dev/null`` as stdin so pacman sees EOF at the Y/n prompt
and aborts without performing the removal. Both stdout and stderr
are captured so the package table (which pacman may write to either
stream depending on terminal detection) is always available.
:param pkgs: Packages the user requested to remove.
:returns: Flat list of names pacman would remove.
"""
with open("/dev/null") as devnull:
r = Pacman._run(
"-Rs",
*pkgs,
stdin=devnull.fileno(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
# Parse the package table that pacman prints between the
# "Package (N)" header and the "Total Removed Size" footer.
names: list[str] = []
in_table = False
combined = r.stdout + r.stderr
for line in combined.splitlines():
if line.startswith("Package ("):
in_table = True
continue
if line.startswith("Total "):
break
# Table rows: "pkgname version size" (at least 2 fields)
if in_table:
parts = line.split()
if len(parts) >= 2:
names.append(parts[0])
return names
@staticmethod
def install(pkgs: Sequence[str]) -> int:
"""
Run ``pacman -S <pkgs>`` interactively (stdio inherited).
:param pkgs: Packages to install.
:returns: pacman exit code.
"""
return Pacman._run("-S", *pkgs).returncode
@staticmethod
def remove(pkgs: Sequence[str]) -> int:
"""
Run ``pacman -Rs <pkgs>`` interactively (stdio inherited).
:param pkgs: Packages to remove.
:returns: pacman exit code.
"""
return Pacman._run("-Rs", *pkgs).returncode
@staticmethod
def mark(pkgs: Sequence[str], *, explicit: bool) -> None:
"""
Change the install reason of *pkgs* via ``pacman -D``.
:param pkgs: Packages whose install reason should change.
:param explicit: ``True`` for ``--asexplicit``, ``False`` for ``--asdeps``.
:raises subprocess.CalledProcessError: if pacman exits non-zero.
"""
flag = "--asexplicit" if explicit else "--asdeps"
Pacman._run("-D", flag, *pkgs, check=True, capture_output=True)
@staticmethod
def _mark_surviving_as_deps(pkgs: Sequence[str]) -> None:
"""
Mark *pkgs* as dependency-installed, skipping any that are no longer
present on the system (e.g. the user cancelled before they were
installed, or they were already removed).
Never raises — all errors are downgraded to warnings so this is
safe to call from ``finally`` blocks without masking exceptions.
:param pkgs: Package names that should be marked ``--asdeps``.
"""
if not pkgs:
return
try:
still_installed = Expac.installed_names()
except subprocess.CalledProcessError as e:
_warn(f"failed to query installed packages, skipping dep mark: {e}")
return
mark = [p for p in pkgs if p in still_installed]
if not mark:
return
_info(f"Marking as deps: {' '.join(mark)}")
try:
Pacman.mark(mark, explicit=False)
except subprocess.CalledProcessError as e:
_warn(f"failed to mark packages as deps: {e}")
@staticmethod
@contextmanager
def safe_transaction(deps_to_mark: Sequence[str]) -> Generator[None, None, None]:
"""
Context manager that guarantees ``--asdeps`` marking runs on exit.
Wraps the pacman transaction (install or remove) so that no matter
how the block exits — success, ``SystemExit`` (from :func:`_die`),
``KeyboardInterrupt`` (Ctrl-C), or any other exception — the
cleanup in :meth:`_mark_surviving_as_deps` always executes first.
``KeyboardInterrupt`` is caught and converted to a clean
``SystemExit(1)`` after cleanup. All other exceptions propagate
after cleanup.
Usage::
with Pacman.safe_transaction(optdeps):
rc = Pacman.install(pkgs)
if rc != 0:
_die(f"pacman exited with code {rc}", rc)
:param deps_to_mark: Package names to mark ``--asdeps`` on exit.
"""
interrupted = False
try:
yield
except KeyboardInterrupt:
print("\n[cancelled]")
interrupted = True
finally:
Pacman._mark_surviving_as_deps(deps_to_mark)
if interrupted:
sys.exit(1)
# ===========================================================================
# Selection prompt
# ===========================================================================
class Selector:
"""
Interactive numbered-list prompt for selecting optional dependencies.
Accepts a flexible input syntax: individual numbers, comma or space
separated lists, hyphen-delimited ranges, and ``a`` for all — or any
combination thereof (e.g. ``1-3, 5 7`` or ``2,4-6 8``).
:param pkg: Package name — used only for the display header.
:param opts: Ordered list of :class:`OptDep` candidates.
"""
__slots__ = ("pkg", "opts")
def __init__(self, pkg: str, opts: Sequence[OptDep]) -> None:
self.pkg = pkg
self.opts = opts
def prompt(self) -> list[str]:
"""
Display the numbered list and return the names the user selected.
Handles ``EOFError`` (non-interactive stdin) gracefully by returning
an empty list.
:returns: Deduplicated, ordered list of selected package names.
"""
self._print_menu()
try:
raw = input(" > ").strip().lower()
except EOFError:
return []
if not raw:
return []
if raw == "a":
return [o.name for o in self.opts]
return self._parse(raw)
# ------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------
def _print_menu(self) -> None:
"""Render the numbered list of optional deps to stdout."""
print(f"\n Optional deps for [{self.pkg}]:")
for i, opt in enumerate(self.opts, 1):
print(f" {i:>2}) {opt}")
print(
" (numbers · ranges e.g. 1-3 · comma/space separated · 'a' = all · enter = skip)"
)
def _parse(self, raw: str) -> list[str]:
"""
Translate raw user input into a deduplicated list of package names.
:param raw: Stripped, lowercased input string from the prompt.
:returns: Selected package names in input order, without duplicates.
"""
# Treat commas as whitespace so "1,2-4, 6" tokenises cleanly.
tokens = raw.replace(",", " ").split()
indices: list[int] = []
for token in tokens:
if "-" in token:
indices.extend(self._parse_range(token))
else:
idx = self._parse_index(token)
if idx is not None:
indices.append(idx)
# Deduplicate while preserving order and validate bounds.
seen: set[int] = set()
selected: list[str] = []
n = len(self.opts)
for i in indices:
if i in seen:
continue
seen.add(i)
if 0 <= i < n:
selected.append(self.opts[i].name)
else:
_warn(f"index {i + 1} out of range, skipping")
return selected
def _parse_range(self, token: str) -> list[int]:
"""
Parse a ``start-end`` range token into a list of 0-based indices.
:param token: Raw token such as ``"2-5"``.
:returns: List of 0-based indices, empty on parse error.
"""
parts = token.split("-", 1)
try:
start, end = int(parts[0]) - 1, int(parts[1]) - 1
except (ValueError, IndexError):
_warn(f"invalid range {token!r}, skipping")
return []
if start > end:
_warn(f"invalid range {token!r} (start > end), skipping")
return []
return list(range(start, end + 1))
def _parse_index(self, token: str) -> int | None:
"""
Parse a single numeric token into a 0-based index.
:param token: Raw token such as ``"3"``.
:returns: 0-based integer index, or ``None`` on parse error.
"""
try:
return int(token) - 1
except ValueError:
_warn(f"invalid input {token!r}, skipping")
return None
# ===========================================================================
# Command base + registry
# ===========================================================================
class Command:
"""
Abstract base for ``opinstall`` sub-commands.
Subclasses register themselves automatically via :meth:`__init_subclass__`
using their lower-cased class name as the CLI verb. This means adding a
new command only requires subclassing — the dispatch table in :func:`main`
never needs to change.
:cvar registry: Auto-populated ``{verb: subclass}`` mapping.
:cvar USAGE: Usage string printed on invalid invocation.
"""
# Populated by __init_subclass__ — acts as a self-maintaining dispatch table.
registry: ClassVar[dict[str, type[Command]]] = {}
USAGE: ClassVar[str] = (
"Usage:\n"
" opinstall install pkg1 [pkg2 ...]\n"
" opinstall remove pkg1 [pkg2 ...]\n"
)
def __init_subclass__(cls, **kwargs: object) -> None:
"""
Register each concrete subclass under its lower-cased name.
Called automatically by Python when a subclass is defined — no
decorator or explicit registration needed.
"""
super().__init_subclass__(**kwargs)
Command.registry[cls.__name__.lower()] = cls
def __init__(self, pkgs: Sequence[str]) -> None:
"""
:param pkgs: Package names supplied on the command line.
"""
if not pkgs:
_die(f"no packages specified\n\n{self.USAGE}")
self.pkgs = pkgs
def run(self) -> None:
"""
Execute the command. Must be overridden by every subclass.
:raises NotImplementedError: if called on the base class.
"""
raise NotImplementedError
# ===========================================================================
# Concrete commands
# ===========================================================================
class Install(Command):
"""
Install packages and interactively select their optional dependencies.
Resolution covers the *entire* would-be-installed set — not just the
packages explicitly requested, but every dependency pacman pulls in.
This ensures that, for example, a codec library installed as a dep of a
media application also has its optional plugins offered to the user.
Optional deps that are already installed or are themselves part of the
pending transaction are excluded from the selection menu.
:cvar DEEP_RESOLVE_EXISTING: When ``True``, level 1+ drill-down also
re-examines packages that were already in the initial resolve
(e.g. ``mesa``) for any remaining optional deps not yet covered.
When ``False`` (the default), only packages *newly* introduced by
user selections are considered at deeper levels.
:param pkgs: Packages to install (as typed by the user).
"""
DEEP_RESOLVE_EXISTING: ClassVar[bool] = False
def run(self) -> None:
"""
Perform the dry-run, collect optional dep selections, then install.
Resolution is iterative and user-controlled:
- **Level 0**: resolve the full dependency tree of the requested
packages, then present optional-dep selection menus for every
package in the resolved set (no confirmation gate).
- **Level N** (N > 0): check which packages selected in the
previous round have their own optional deps. If any do, list
them and present a single ``[y/N]`` gate for the whole level.
If the user accepts, show each package's selection menu in
turn. Repeat until no further optional deps exist or the user
declines a level.
:raises SystemExit: on pacman error or user cancellation.
"""
_info(f"Resolving install for: {' '.join(self.pkgs)}")
installed = Expac.installed_names()
# Seed the install list with the user's request.
current_list: list[str] = list(self.pkgs)
# All user-selected optional deps accumulated across all levels.
all_selected: list[str] = []
# Track packages already queried for optdeps across all rounds so
# we never re-present the same package at a deeper level.
queried: set[str] = set()
# --- Level 0: initial resolve + optdep menus -----------------------
resolved = Pacman.dry_install(current_list)
if not resolved:
_info("Nothing to install.")
return
_info(f"Would install ({len(resolved)}): {' '.join(resolved)}\n")
# post_install is mutable — grows as the user selects optdeps.
post_install: set[str] = set(installed) | set(resolved)
round_selected: list[str] = []
for pkg in resolved:
queried.add(pkg)
opts = Expac.optdeps_for(pkg, exclude=post_install, sync=True)
if not opts:
continue
chosen = Selector(pkg, opts).prompt()
round_selected.extend(chosen)
# Grow exclusion set so later menus don't re-offer these.
post_install.update(chosen)
if round_selected:
all_selected.extend(round_selected)
current_list = list(dict.fromkeys(current_list + round_selected))
# --- Level 1+: gated drill-down ------------------------------------
# Each round re-resolves the full install tree and checks every
# *new* package (user-selected optdeps AND their pulled-in deps)
# for further optional deps. This ensures transitive hard deps
# like libkate (pulled in by vlc-plugin-kate) also get their
# optdeps offered.
#
# When DEEP_RESOLVE_EXISTING is False, packages that were part of
# the initial level-0 resolve are excluded — only genuinely new
# additions are drilled into.
initial_resolved = frozenset(queried)
while round_selected:
# Re-resolve so we see deps pulled in by the latest selections.
resolved = Pacman.dry_install(current_list)
post_install = set(installed) | set(resolved) | set(all_selected)
# Candidates = every newly-resolved package not yet queried,
# i.e. the user's selections AND any deps they pulled in.
candidates = [p for p in resolved if p not in queried]
if not self.DEEP_RESOLVE_EXISTING:
candidates = [p for p in candidates if p not in initial_resolved]
if not candidates:
break
# Batch-fetch optdeps for all candidates in one pass; returns
# only those with non-empty optdeps after exclusion.
further = Expac.batch_optdeps(candidates, exclude=post_install, sync=True)
if not further:
break
has_further = list(further)
# Show which packages have optdeps, then one gate for the level.
print(
"\n The following have their own optional deps:\n"
+ "".join(f" - {p}\n" for p in has_further)
+ " Resolve their optional deps? [y/N] ",
end="",
)
try:
answer = input().strip().lower()
except EOFError:
answer = ""
if answer != "y":
break
# User accepted — show each package's selection menu.
round_selected = []
for pkg in has_further:
queried.add(pkg)
# Re-fetch with updated exclusion set (earlier selections
# in this round may have satisfied some optdeps).
opts = Expac.optdeps_for(pkg, exclude=post_install, sync=True)
if not opts:
continue
chosen = Selector(pkg, opts).prompt()
round_selected.extend(chosen)
post_install.update(chosen)
if not round_selected:
break
all_selected.extend(round_selected)
current_list = list(dict.fromkeys(current_list + round_selected))
if not all_selected:
_info("No uninstalled optional deps found.\n")
# Final install list: original request + all selected optdeps.
install_list = list(dict.fromkeys(list(self.pkgs) + all_selected))
if all_selected:
_info(f"Selected optional deps: {' '.join(all_selected)}")
# --- Install --------------------------------------------------------
_info(f"Installing: {' '.join(install_list)}")
with Pacman.safe_transaction(all_selected):
rc = Pacman.install(install_list)
if rc != 0:
_die(f"pacman exited with code {rc}", rc)
_info("Done.")
class Remove(Command):
"""
Remove packages without inadvertently deleting optional dependencies
relied upon by other installed packages.
Before running the real removal, the command:
1. Performs a dry-run to find all packages that *would* be removed.
2. Cross-references every pulled-in dependency against the optional-dep
declarations of all installed packages (via :meth:`Expac.opt_map`).
3. Temporarily marks protected deps as ``--asexplicit`` so pacman's
orphan sweep (the ``-s`` flag) does not collect them.
4. Runs the real ``pacman -Rs`` for the user to confirm.
5. Via :meth:`Pacman.safe_transaction` — regardless of confirmation,
cancellation, or error — reverts all surviving protected packages
back to ``--asdeps``, restoring the original system state for those
packages.
:param pkgs: Packages to remove (as typed by the user).
"""
def run(self) -> None:
"""
Perform the safe removal workflow.
:raises SystemExit: if marking packages explicit fails.
"""
_info(f"Resolving removal of: {' '.join(self.pkgs)}")
# --- Dry-run -------------------------------------------------------
pset = PackageSet(
requested=self.pkgs,
resolved=Pacman.dry_remove(self.pkgs),
)
if not pset.resolved:
_info("Nothing would be removed.")
return
_info(f"Would remove ({len(pset.resolved)}): {' '.join(pset.resolved)}\n")
# --- Identify deps to protect --------------------------------------
keep: list[str] = []
if pset.pulled_deps:
_info("Checking pulled-in deps against installed optional dep users:\n")
opt_map = Expac.opt_map()
for dep in pset.pulled_deps:
# Find installed packages that list this dep as optional.
# frozenset.__contains__ is O(1) per lookup.
users = [p for p, opts in opt_map.items() if dep in opts]
if users:
print(f" KEEP: {dep} (optional dep of: {', '.join(users)})")
keep.append(dep)
else:
print(f" REMOVE: {dep}")
print()
# --- Temporarily protect optional deps ----------------------------
if keep:
_info(f"Temporarily marking as explicit: {' '.join(keep)}")
try:
Pacman.mark(keep, explicit=True)
except subprocess.CalledProcessError as e:
_die(f"failed to mark packages explicit: {e}")
print()
# --- Real removal (user confirms) ----------------------------------
_info(f"Removing: {' '.join(self.pkgs)}")
with Pacman.safe_transaction(keep):
Pacman.remove(self.pkgs)
_info("Done.")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment