Skip to content

Instantly share code, notes, and snippets.

@M4nw3l
Created March 8, 2026 15:34
Show Gist options
  • Select an option

  • Save M4nw3l/a0845dd32508885455c4693848772596 to your computer and use it in GitHub Desktop.

Select an option

Save M4nw3l/a0845dd32508885455c4693848772596 to your computer and use it in GitHub Desktop.
Fixes for StaSh’s Pip utility, fixing infinite recursion in Setuptools module stubs and package installation fallback to directory guessing.
# -*- coding: utf-8 -*-
"""
Install and manage python packages
usage: pip.py [-h] [--verbose] sub-command ...
optional arguments:
-h, --help show this help message and exit
--verbose be more chatty
List of sub-commands:
sub-command "pip sub-command -h" for more help on a sub-command
list list packages installed
install install packages
download download packages
search search with the given word fragment
versions find versions available for the given package
uninstall uninstall packages
update update an installed package
"""
from __future__ import print_function, annotations
import sys
import os
import ast
import shutil
import types
import contextlib
import requests
import operator
import traceback
import platform
import json
import six
from distutils import log
from distutils.util import convert_path
from fnmatch import fnmatchcase
# noinspection PyUnresolvedReferences
from six.moves import filterfalse
from stashutils.extensions import create_command
from stashutils.wheels import Wheel, wheel_is_compatible
import itertools
from collections.abc import Iterable, Iterator, Mapping
from fnmatch import fnmatchcase
from glob import glob
from pathlib import Path
_stash = globals()["_stash"]
VersionSpecifier = _stash.libversion.VersionSpecifier # alias for readability
SITE_PACKAGES_FOLDER = _stash.libdist.SITE_PACKAGES_FOLDER
OLD_SITE_PACKAGES_FOLDER = _stash.libdist.SITE_PACKAGES_FOLDER_6
BUNDLED_MODULES = _stash.libdist.BUNDLED_MODULES
BLOCKLIST_PATH = os.path.join(os.path.expandvars("$STASH_ROOT"), "data",
"pip_blocklist.json")
PIP_INDEX_FILE = os.path.join(SITE_PACKAGES_FOLDER, "pip_index.json")
PIP_INFO_FILE = os.path.join(SITE_PACKAGES_FOLDER, ".package_info", "%s.json")
# Some packages use wrong name for their dependencies
PACKAGE_NAME_FIXER = {
"lazy_object_proxy": "lazy-object-proxy",
}
NO_OVERWRITE = False
# Utility constants
FLAG_DIST_ALLOW_SRC = 1
FLAG_DIST_ALLOW_WHL = 2
FLAG_DIST_PREFER_SRC = 4
FLAG_DIST_PREFER_WHL = 8
FLAG_IGNORE_BLOCKLIST = 16
DEFAULT_FLAGS = FLAG_DIST_ALLOW_SRC | FLAG_DIST_ALLOW_WHL | FLAG_DIST_PREFER_WHL
def _setup_stub_(*args, **kwargs):
setuptools = sys.modules["setuptools"]
setuptools._setup_params_ = (args, kwargs)
class PipError(Exception):
"""
Baseclass for pip related errors.
"""
pass
class PackageAlreadyInstalled(PipError):
"""
Error raised when a package is already installed.
"""
pass
class PackageBlocklisted(PipError):
"""
Error raised when a package is fataly blocklisted
:param pkg_name: name of blocklisted package
:type pkg_name: str
:param reason: reason for blocklisting
:type reason: str
"""
def __init__(self, pkg_name, reason):
s = "Package '{}' blocklisted. Reason: {}".format(pkg_name, reason)
PipError.__init__(self, s)
class ClassStub(object):
def __init__(self, *args, **kwargs):
self.___attr = {}
self.___item = {}
self.___call = None
def __call__(self, *args, **kwargs):
if self.___call is None:
name = object.__getattribute__(self, '__name__')
self.___call = ClassStub.new(f"{name}.__call__")
return self.___call
def __getattr__(self, item):
if item == '___attr':
return self.___attr
elif item == '___item':
return self.___item
elif item == '___call':
return self.___call
name = object.__getattribute__(self, '__name__')
key = f"{name}.{item}"
print(f"ClassStub: {key}")
attr = self.___attr.get(item)
if attr is None:
attr = ClassStub.new(item)
self.___attr[item] = attr
return attr
def __getitem__(self, item):
print(f'_item {item}')
val = self.___item.get(item)
if val is None and (isinstance(item, int) or isinstance(item, slice)):
raise IndexError()
return self.___item[item]
#def __setitem__(self, item, value):
#self._item[item] = value
#def __delitem__(self, item):
#del self._item[item]
def __mro_entries__(self, bases):
return (self.__class__, )
@staticmethod
def new(name):
class FakeClass(ClassStub):
pass
FakeClass.__name__ = name
return FakeClass
class ModuleStub(types.ModuleType):
def __init__(self, *args, **kwargs):
super(ModuleStub, self).__init__(*args, **kwargs)
self.___attr = {}
def __getattr__(self, item):
if item == '___attr':
return self.___attr
self_name = object.__getattribute__(self, "__name__")
key = f'{self_name}.{item}'
attr = self.___attr.get(item)
if attr is None:
attr = ClassStub.new(item)
self.___attr[item] = attr
#print(f"ModuleStub.___attr: {key} {attr}")
return attr
def new(new_module):
module_defs = {}
module_name = ''
if isinstance(new_module, list):
module_defs = new_module[1]
module_name = new_module[0]
else:
module_name = new_module
module_names = []
module = None
for name in module_name.split("."):
parent_name = ".".join(module_names)
module_names.append(name)
full_name = ".".join(module_names)
module = ModuleStub(full_name, "")
if full_name == module_name:
module.___attr=module_defs
if parent_name != "":
parent_module = sys.modules[parent_name]
if name not in parent_module.__dict__: # cannot use getattr
# print('setting {}'.format(name))
setattr(sys.modules[parent_name], name, module)
if full_name not in list(sys.modules.keys()):
sys.modules[full_name] = module
return module
class ExceptionStub(Exception):
@staticmethod
def new(new_error):
class FakeException(ExceptionStub):
pass
FakeException.__name__ = new_error
return FakeException
class PackageFinder(object):
"""
This class is copied from setuptools
"""
@classmethod
def find(cls, where=".", exclude=(), include=("*", )):
"""Return a list all Python packages found within directory 'where'
'where' should be supplied as a "cross-platform" (i.e. URL-style)
path; it will be converted to the appropriate local path syntax.
'exclude' is a sequence of package names to exclude; '*' can be used
as a wildcard in the names, such that 'foo.*' will exclude all
subpackages of 'foo' (but not 'foo' itself).
'include' is a sequence of package names to include. If it's
specified, only the named packages will be included. If it's not
specified, all found packages will be included. 'include' can contain
shell style wildcard patterns just like 'exclude'.
The list of included packages is built up first and then any
explicitly excluded packages are removed from it.
"""
out = cls._find_packages_iter(convert_path(where))
out = cls.require_parents(out)
includes = cls._build_filter(*include)
excludes = cls._build_filter("ez_setup", "*__pycache__", *exclude)
out = filter(includes, out)
out = filterfalse(excludes, out)
return list(out)
@staticmethod
def require_parents(packages):
"""
Exclude any apparent package that apparently doesn't include its
parent.
For example, exclude 'foo.bar' if 'foo' is not present.
"""
found = []
for pkg in packages:
base, sep, child = pkg.rpartition(".")
if base and base not in found:
continue
found.append(pkg)
yield pkg
@staticmethod
def _candidate_dirs(base_path):
"""
Return all dirs in base_path that might be packages.
"""
has_dot = lambda name: "." in name
for root, dirs, files in os.walk(base_path, followlinks=True):
# Exclude directories that contain a period, as they cannot be
# packages. Mutate the list to avoid traversal.
dirs[:] = filterfalse(has_dot, dirs)
for dir in dirs:
yield os.path.relpath(os.path.join(root, dir), base_path)
@classmethod
def _find_packages_iter(cls, base_path):
candidates = cls._candidate_dirs(base_path)
return (path.replace(os.path.sep, ".") for path in candidates
if cls._looks_like_package(os.path.join(base_path, path)))
@staticmethod
def _looks_like_package(path):
return os.path.isfile(os.path.join(path, "__init__.py"))
@staticmethod
def _build_filter(*patterns):
"""
Given a list of patterns, return a callable that will be true only if
the input matches one of the patterns.
"""
return lambda name: any(fnmatchcase(name, pat=pat) for pat in patterns)
chain_iter = itertools.chain.from_iterable
def _valid_name(path) -> bool:
# Ignore invalid names that cannot be imported directly
return os.path.basename(path).isidentifier()
class _Filter:
"""
Given a list of patterns, create a callable that will be true only if
the input matches at least one of the patterns.
"""
def __init__(self, *patterns):
self._patterns = dict.fromkeys(patterns)
def __call__(self, item):
return any(fnmatchcase(item, pat) for pat in self._patterns)
def __contains__(self, item):
return item in self._patterns
class _Finder:
"""Base class that exposes functionality for module/package finders"""
ALWAYS_EXCLUDE = ()
DEFAULT_EXCLUDE = ()
@classmethod
def find(
cls,
where = '.',
exclude = (),
include = ('*',),
):
"""Return a list of all Python items (packages or modules, depending on
the finder implementation) found within directory ``where``.
``where`` is the root directory which will be searched.
It should be supplied as a "cross-platform" (i.e. URL-style) path;
it will be converted to the appropriate local path syntax.
``exclude`` is a sequence of names to exclude; ``*`` can be used
as a wildcard in the names.
When finding packages, ``foo.*`` will exclude all subpackages of ``foo``
(but not ``foo`` itself).
``include`` is a sequence of names to include.
If it's specified, only the named items will be included.
If it's not specified, all found items will be included.
``include`` can contain shell style wildcard patterns just like
``exclude``.
"""
exclude = exclude or cls.DEFAULT_EXCLUDE
return list(
cls._find_iter(
convert_path(str(where)),
_Filter(*cls.ALWAYS_EXCLUDE, *exclude),
_Filter(*include),
)
)
@classmethod
def _find_iter(cls, where, exclude, include):
raise NotImplementedError
class PackageFinder82_0(_Finder):
"""
Generate a list of all Python packages found within a directory
"""
ALWAYS_EXCLUDE = ("ez_setup", "*__pycache__")
@classmethod
def _find_iter(cls, where, exclude, include):
"""
All the packages found in 'where' that pass the 'include' filter, but
not the 'exclude' filter.
"""
for root, dirs, files in os.walk(str(where), followlinks=True):
# Copy dirs to iterate over it, then empty dirs.
all_dirs = dirs[:]
dirs[:] = []
for dir in all_dirs:
full_path = os.path.join(root, dir)
rel_path = os.path.relpath(full_path, where)
package = rel_path.replace(os.path.sep, '.')
# Skip directory trees that are not valid packages
if '.' in dir or not cls._looks_like_package(full_path, package):
continue
# Should this package be included?
if include(package) and not exclude(package):
yield package
# Early pruning if there is nothing else to be scanned
if f"{package}*" in exclude or f"{package}.*" in exclude:
continue
# Keep searching subdirectories, as there may be more packages
# down there, even if the parent was excluded.
dirs.append(dir)
@staticmethod
def _looks_like_package(path, _package_name):
"""Does a directory look like a package?"""
return os.path.isfile(os.path.join(path, '__init__.py'))
class PEP420PackageFinder(PackageFinder82_0):
@staticmethod
def _looks_like_package(_path, _package_name):
return True
def get_requires(package, index_file=PIP_INDEX_FILE):
"""
get require of the package
:param package: package name
:type package: str
:return: a list of requires package
:rtype: list
"""
if os.path.exists(index_file):
with open(index_file) as f:
index = json.load(f)
try:
return index[package]
except KeyError: # no such package in index file
raise PipError(
"Cannot find packages in index file. Try to using 'pip dev update-index' to update index file"
)
else:
raise PipError(
"Cannot find index file. Try to using 'pip dev update-index' to update index file"
)
def get_req_by(package, index_file=PIP_INDEX_FILE):
"""
get the packages that require this package
:param package: package name
:type package: str
:return: a list of packages that require this package
:rtype: list
"""
if os.path.exists(index_file):
with open(index_file) as f:
index = json.load(f)
required_by = []
for pkg, req in index.items():
if package in req:
required_by.append(pkg)
else:
raise PipError(
"Cannot find index file. Try to using 'pip dev update-index' to update index file"
)
return required_by
def print_info(package,
pip_info_file=PIP_INFO_FILE,
site_packages=SITE_PACKAGES_FOLDER):
info_file = pip_info_file % package
if os.path.exists(info_file):
with open(info_file) as f:
info = json.load(f)
print("Name: {}".format(info["name"]))
print("Version: {}".format(info["version"]))
print("Summary: {}".format(info["summary"]))
print("Home-page: {}".format(info["project_urls"]["Homepage"]))
print("Author: {}".format(info["author"]))
print("Author-email: {}".format(info["author_email"]))
print("License: {}".format(info["license"]))
print("Location: {}".format(site_packages))
requires = get_requires(package)
required_by = get_req_by(package)
print("requires: {}".format(", ".join(requires)))
print("required-by: {}".format(", ".join(required_by)))
else: # no info_file
print(
_stash.text_color("Package not found: {}".format(package),
"yellow"))
def download_info(pkg_name,
pip_info_file=PIP_INFO_FILE,
site_packages=SITE_PACKAGES_FOLDER):
info_file = pip_info_file % pkg_name
r = requests.get("https://pypi.python.org/pypi/{}/json".format(pkg_name))
info = r.json()["info"]
info_folder = os.path.split(info_file)[0]
if not os.path.exists(info_folder):
os.mkdir(info_folder)
with open(info_file, "w") as f:
json.dump(info, f)
update_req_index()
def update_req_index(
pip_info_file=PIP_INFO_FILE,
site_packages=SITE_PACKAGES_FOLDER,
index_file=PIP_INDEX_FILE,
):
"""
update package requires index file
"""
repository = get_repository("pypi", site_packages=site_packages)
info_list = repository.list()
req_index = {
} # a dict of {package:requires_list} type:dict of {str:list of str}
for package, info in info_list:
info_file = pip_info_file % package
if os.path.exists(info_file):
# load info
with open(info_file) as f:
info = json.load(f)
# filte requires
requires = []
try:
for req in info["requires_dist"]:
if ";" not in req:
# Remove package version
requires.append(req.split(" ")[0])
except TypeError: # some package may have no require
pass
req_index[package] = requires
else: # info file not exists
print(
_stash.text_color(
"Info file of Package {} not found".format(package),
"yellow"))
with open(index_file, "w") as f:
json.dump(req_index, f)
def fake_setuptools_modules():
"""
Create a bunch of stub setuptools modules
"""
find_packages = PackageFinder.find
find_namespace_packages = find_packages # backport PEP420PackageFinder instead?
if sys.version_info>=(3,10,0):
find_packages = PackageFinder82_0.find
find_namespace_packages = PEP420PackageFinder.find
setuptools_modules = [
[
"setuptools", {
"_setup_params_": ((), {}),
"find_packages": find_packages,
"find_namespace_packages": find_namespace_packages
}
],
"setuptools.command",
"setuptools.command.alias",
"setuptools.command.bdist_egg",
"setuptools.command.bdist_rpm",
"setuptools.command.bdist_wininst",
"setuptools.command.build_ext",
"setuptools.command.build_py",
"setuptools.command.develop",
"setuptools.command.easy_install",
"setuptools.command.egg_info",
"setuptools.command.install",
"setuptools.depends.install_egg_info",
"setuptools.command.install_lib",
"setuptools.command.install_scripts",
"setuptools.command.register",
"setuptools.command.rotate",
"setuptools.command.saveopts",
"setuptools.command.sdist",
"setuptools.command.setopt",
"setuptools.command.test",
"setuptools.command.upload",
"setuptools.command.upload_docs",
["setuptools.errors", {
"CCompilerError": ExceptionStub.new("setuptools.errors.CCompilerError"),
"ExecError": ExceptionStub.new("setuptools.errors.ExecError"),
"PlatformError": ExceptionStub.new("setuptools.errors.PlatformError"),
}],
"setuptools.extern",
"setuptools.dist",
"setuptools.extension",
"setuptools.launch",
"setuptools.lib2to3_ex",
"setuptools.msvc9_support",
"setuptools.package_index",
"setuptools.py26compat",
"setuptools.py27compat",
"setuptools.py31compat",
"setuptools.sandbox",
"setuptools.site-patch",
"setuptools.ssl_support",
"setuptools.unicode_utils",
"setuptools.utils",
"setuptools.version",
"setuptools.windows_support",
# 'pkg_resources',
# 'pkg_resources.extern',
]
for m in setuptools_modules:
ModuleStub.new(m)
# First import importable distutils
import distutils.command
import distutils.core
import distutils.util
distutils_command_modules = [
"distutils.command.bdistdistutils.command.bdist_dumb",
"distutils.command.bdist_msi",
"distutils.command.bdist_rpm",
"distutils.command.bdist_wininst",
"distutils.command.build",
"distutils.command.build_clib",
"distutils.command.build_ext",
"distutils.command.build_py",
"distutils.command.build_scripts",
]
for m in distutils_command_modules:
ModuleStub.new(m)
sys.modules["distutils.util"].get_platform = ClassStub.new("distutils.util")
# fix for new problem in issue 169
sys.modules["distutils.command.build_ext"].sub_commands = []
sys.modules["setuptools.command.build_ext"].sub_commands = []
def ensure_pkg_resources():
try:
import pkg_resources
except ImportError:
try:
print("Approximating pkg_resources ...")
GitHubRepository().install("ywangd/pkg_resources", None)
except: # silently fail as it may not be important or necessary
pass
@contextlib.contextmanager
def save_current_sys_modules():
"""
Save the current sys modules and restore them when processing is over
"""
# saved_sys_modules = dict(sys.modules)
save_setuptools = {}
for name in sorted(sys.modules.keys()):
if name == "setuptools" or name.startswith("setuptools."):
save_setuptools[name] = sys.modules.pop(name)
yield
# sys.modules = saved_sys_modules
for name in sorted(sys.modules.keys()):
if name == "setuptools" or name.startswith("setuptools."):
sys.modules.pop(name)
for k, v in save_setuptools.items():
sys.modules[k] = v
# warning: the ConfigParser may refer to a different class depening on the used py version
# though I believe that pip does not use interpolation, so we *should* be safe
# noinspection PyUnresolvedReferences
from six.moves.configparser import ConfigParser, NoSectionError
class CIConfigParer(ConfigParser):
"""
This config parser is case insensitive for section names so that
the behaviour matches pypi queries.
"""
def _get_section_name(self, name):
for section_name in self.sections():
if section_name.lower() == name.lower():
return section_name
else:
raise NoSectionError(name)
def has_section(self, name):
names = [n.lower() for n in self.sections()]
return name.lower() in names
def has_option(self, name, option_name):
section_name = self._get_section_name(name)
return ConfigParser.has_option(self, section_name, option_name)
def items(self, name):
section_name = self._get_section_name(name)
return ConfigParser.items(self, section_name)
def get(self, name, option_name, *args, **kwargs):
section_name = self._get_section_name(name)
return ConfigParser.get(self, section_name, option_name, *args,
**kwargs)
def set(self, name, option_name, value):
section_name = self._get_section_name(name)
return ConfigParser.set(self, section_name, option_name,
value.replace("%", "%%"))
def remove_section(self, name):
section_name = self._get_section_name(name)
return ConfigParser.remove_section(self, section_name)
class PackageConfigHandler(object):
"""
Manager class for packages files for tracking installation of modules
"""
def __init__(self, site_packages=SITE_PACKAGES_FOLDER, verbose=False):
self.verbose = verbose
self.site_packages = site_packages
self.package_cfg = os.path.join(site_packages, ".pypi_packages")
if not os.path.isfile(self.package_cfg):
if self.verbose:
print("Creating package file...")
with open(self.package_cfg, "w") as outs:
outs.close()
self.parser = CIConfigParer()
self.parser.read(self.package_cfg)
def save(self):
with open(self.package_cfg, "w") as outs:
self.parser.write(outs)
def add_module(self, pkg_info):
"""
:param pkg_info: A dict that has name, url, version, summary
:return:
"""
if not self.parser.has_section(pkg_info["name"]):
self.parser.add_section(pkg_info["name"])
self.parser.set(pkg_info["name"], "url", pkg_info["url"])
self.parser.set(pkg_info["name"], "version", pkg_info["version"])
self.parser.set(pkg_info["name"], "summary", pkg_info["summary"])
self.parser.set(pkg_info["name"], "files", pkg_info["files"])
self.parser.set(pkg_info["name"], "dependency", pkg_info["dependency"])
self.save()
def list_modules(self):
return [module for module in self.parser.sections()]
def module_exists(self, name):
return self.parser.has_section(name)
def get_info(self, name):
if self.parser.has_section(name):
tbl = {}
for opt, value in self.parser.items(name):
tbl[opt] = value
return tbl
def remove_module(self, name):
self.parser.remove_section(name)
self.save()
def get_files_installed(self, section_name):
if self.parser.has_option(section_name, "files"):
files = self.parser.get(section_name, "files").strip()
return files.split(",")
else:
return None
def get_dependencies(self, section_name):
if self.parser.has_option(section_name, "dependency"):
dependencies = self.parser.get(section_name, "dependency").strip()
return set(
dependencies.split(",")) if dependencies != "" else set()
else:
return None
def get_all_dependencies(self, exclude_module=()):
all_dependencies = set()
for section_name in self.parser.sections():
if section_name not in exclude_module and self.parser.has_option(
section_name, "dependency"):
dependencies = self.parser.get(section_name,
"dependency").strip()
if dependencies != "":
for dep in dependencies.split(","):
all_dependencies.add(dep)
return all_dependencies
# noinspection PyPep8Naming,PyProtectedMember
class ArchiveFileInstaller(object):
"""
Package Installer for archive files, e.g. zip, gz, bz2
"""
class SetupTransformer(ast.NodeTransformer):
"""
Analyze and Transform AST of a setup file.
1. Create empty modules for any setuptools imports (if it is not already covered)
2. replace setup calls with a stub one to get values of its arguments
"""
def visit_Import(self, node):
for idx, alias in enumerate(node.names):
if alias.name.startswith("setuptools"):
ModuleStub.new(alias.name)
return node
def vist_ImportFrom(self, node):
if node.module.startswith("setuptools"):
ModuleStub.new(node.module)
return node
def visit_Call(self, node):
func_name = self._get_possible_setup_name(node)
print("Setup name: {}".format(func_name))
if func_name is not None and (func_name == "setup"
or func_name.endswith(".setup")):
node.func = ast.copy_location(
ast.Name("_setup_stub_", ast.Load()), node.func)
return node
def _get_possible_setup_name(self, node):
names = []
func = node.func
while isinstance(func, ast.Attribute):
names.append(func.attr)
func = func.value
if isinstance(func, ast.Name):
names.append(func.id)
return ".".join(reversed(names))
else:
return None
def __init__(self, site_packages=SITE_PACKAGES_FOLDER, verbose=False):
self.site_packages = site_packages
self.verbose = verbose
def run(self, pkg_name, archive_filename, extras=[]):
"""
Main method for Installer to do its job.
:param pkg_name: name of package
:type pkg_name: str
:param archive_filename: path to archive
:type param: str
:param extras: extras to install
:type extras: list of str
:return: tuple of (files installed, dependencies)
:rtype: tuple of (list of str, list of str)
"""
extracted_folder = self._unzip(pkg_name, archive_filename)
try:
# locate the setup file
src_dir = os.path.join(extracted_folder,
os.listdir(extracted_folder)[0])
setup_filename = os.path.join(src_dir, "setup.py")
try:
print("Running setup file ...")
return self._run_setup_file(setup_filename, extras=extras)
except Exception as e:
print("{!r}".format(e))
print("Failed to run setup.py")
if self.verbose:
# print traceback
print("")
traceback.print_exc()
print("")
print("Fall back to directory guessing ...")
pkg_name = pkg_name.lower().replace("-", "_")
if os.path.isdir(os.path.join(src_dir, pkg_name)):
ArchiveFileInstaller._safe_move(
os.path.join(src_dir, pkg_name),
os.path.join(self.site_packages, pkg_name),
)
return [os.path.join(self.site_packages, pkg_name)], []
elif os.path.isfile(os.path.join(src_dir, pkg_name + ".py")):
ArchiveFileInstaller._safe_move(
os.path.join(src_dir, pkg_name + ".py"),
os.path.join(self.site_packages, pkg_name + ".py"),
)
return [
os.path.join(self.site_packages, pkg_name + ".py")
], []
elif os.path.isdir(os.path.join(src_dir, "src", pkg_name)):
ArchiveFileInstaller._safe_move(
os.path.join(src_dir, "src", pkg_name),
os.path.join(self.site_packages, pkg_name),
)
return [os.path.join(self.site_packages, pkg_name)], []
else:
raise PipError(
"Cannot locate packages. Manual installation required."
)
finally:
shutil.rmtree(extracted_folder)
os.remove(archive_filename)
def _unzip(self, pkg_name, archive_filename):
import uuid
print("Extracting archive file ...")
extracted_folder = os.path.join(os.getenv("TMPDIR"), uuid.uuid4().hex)
os.mkdir(extracted_folder)
if ".zip" in archive_filename:
d = os.path.join(extracted_folder, pkg_name)
os.mkdir(d)
_stash("unzip -d {} {}".format(d, archive_filename))
elif ".bz2" in archive_filename:
_stash("tar -C {} -jxf {}".format(extracted_folder,
archive_filename))
else: # gzip
_stash("tar -C {} -zxf {}".format(extracted_folder,
archive_filename))
return extracted_folder
def _run_setup_file(self, filename, extras=[]):
"""
Transform and Run AST of the setup file
:param filename: file to run
:type filename: str
:param extras: extras to install
:type extras: list of str
:return: tuple of (files installed, dependencies)
:rtype: tuple of (list of str, list of str)
"""
print("Running setup file: {}...".format(filename))
try:
import pkg_resources
except ImportError:
# pkg_resources install may be in progress
pkg_resources = None
namespace = {
"_setup_stub_": _setup_stub_,
"__file__": filename,
"__name__": "__main__",
"setup_args": None,
"setup_kwargs": None,
}
source_folder = os.path.dirname(filename)
tree = ArchiveFileInstaller._get_cooked_ast(filename)
codeobj = compile(tree, filename, "exec")
# Some setup files import the package to be installed and sometimes opens a file
# in the source folder. So we modify sys.path and change directory into source folder.
saved_cwd = os.getcwd()
saved_sys_path = sys.path[:]
os.chdir(source_folder)
sys.path.insert(0, source_folder)
try:
exec(codeobj, namespace, namespace)
finally:
os.chdir(saved_cwd)
sys.path = saved_sys_path
args, kwargs = sys.modules["setuptools"]._setup_params_
#for k in sorted(kwargs.keys()): print('{}: {!r}'.format(k, kwargs[k]))
if "ext_modules" in kwargs:
ext = kwargs["ext_modules"]
if (ext is not None) and (len(ext) > 0):
print("WARNING: Extension modules are skipped: {}".format(ext))
packages = kwargs["packages"] if "packages" in kwargs else None
py_modules = kwargs["py_modules"] if "py_modules" in kwargs else None
if not packages and not py_modules:
saved_cwd = os.getcwd()
saved_sys_path = sys.path[:]
os.chdir(source_folder)
sys.path.insert(0, source_folder)
try:
packages = [] # PEP420PackageFinder.find()
finally:
os.chdir(saved_cwd)
sys.path = saved_sys_path
if len(packages) == 0:
raise PipError("failed to find packages or py_modules arguments in setup call")
py_modules = kwargs["py_modules"] if "py_modules" in kwargs else []
package_dirs = kwargs.get("package_dir", {})
use_2to3 = kwargs.get("use_2to3", False) and six.PY3
files_installed = []
# handle scripts
# we handle them before the packages because they may be moved
# while handling the packages
scripts = kwargs.get("scripts", [])
for script in scripts:
if self.verbose:
print("Handling commandline script: {s}".format(s=script))
cmdname = script.replace(os.path.dirname(script),
"").replace("/", "")
if "." not in cmdname:
cmdname += ".py"
scriptpath = os.path.join(source_folder, script)
with open(scriptpath, "r") as fin:
content = fin.read()
cmdpath = create_command(cmdname, content)
files_installed.append(cmdpath)
packages = ArchiveFileInstaller._consolidated_packages(packages)
for p in sorted(packages): # folders or files under source root
if p == "": # no packages just files
from_folder = os.path.join(source_folder,
package_dirs.get(p, ""))
for f in ArchiveFileInstaller._find_package_files(from_folder):
target_file = os.path.join(self.site_packages, f)
ArchiveFileInstaller._safe_move(
os.path.join(from_folder, f), target_file)
files_installed.append(target_file)
if use_2to3:
_stash("2to3 -w {} > /dev/null".format(target_file))
else: # packages
target_dir = os.path.join(self.site_packages, p)
if p in package_dirs:
ArchiveFileInstaller._safe_move(
os.path.join(source_folder, package_dirs[p]),
target_dir)
elif "" in package_dirs:
ArchiveFileInstaller._safe_move(
os.path.join(source_folder, package_dirs[""], p),
target_dir)
else:
ArchiveFileInstaller._safe_move(
os.path.join(source_folder, p), target_dir)
files_installed.append(target_dir)
if use_2to3:
_stash(
"""find {} --name '.py' | xargs -n 1 -I %% 2to3 -w %% > /dev/null"""
.format(target_dir))
py_modules = ArchiveFileInstaller._consolidated_packages(py_modules)
for p in sorted(
py_modules
): # files or folders where the file resides, e.g. ['file', 'folder.file']
if "" in package_dirs:
p = os.path.join(package_dirs[""], p)
if os.path.isdir(os.path.join(source_folder, p)): # folder
target_dir = os.path.join(self.site_packages, p)
ArchiveFileInstaller._safe_move(os.path.join(source_folder, p),
target_dir)
files_installed.append(target_dir)
if use_2to3:
_stash(
"""find {} --name '.py' | xargs -n 1 -I %% 2to3 -w %% > /dev/null"""
.format(target_dir))
else: # file
target_file = os.path.join(self.site_packages, p + ".py")
ArchiveFileInstaller._safe_move(
os.path.join(source_folder, p + ".py"), target_file)
files_installed.append(target_file)
if use_2to3:
_stash("2to3 -w {} > /dev/null".format(target_file))
# handle entry points
entry_points = kwargs.get("entry_points", {})
if isinstance(entry_points, (six.binary_type, six.text_type)):
if pkg_resources is not None:
entry_points = {
s: c
for s, c in pkg_resources.split_sections(entry_points)
}
else:
print(
"Warning: pkg_resources not available, skipping entry_points definitions."
)
entry_points = {}
for epn in entry_points:
if self.verbose:
print("Handling entrypoints for: " + epn)
ep = entry_points[epn]
if isinstance(ep, (six.binary_type, six.text_type)):
ep = [ep]
if epn == "console_scripts":
for dec in ep:
name, loc = dec.replace(" ", "").split("=")
modname, funcname = loc.split(":")
if not name.endswith(".py"):
name += ".py"
desc = kwargs.get("description", "")
path = create_command(
name,
("""'''%s'''
from %s import %s
if __name__ == "__main__":
%s()
""" % (desc, modname, funcname, funcname)).encode("utf-8"),
)
files_installed.append(path)
else:
print("Warning: passing entry points for '{n}'.".format(n=epn))
# Recursively Handle dependencies
dependencies = kwargs.get("install_requires", [])
if isinstance(dependencies, (six.text_type, six.binary_type)):
# must be split into lines
dependencies = dependencies.splitlines()
# add extra dependencies
extra_req = kwargs.get("extras_require", [])
for en in extra_req:
if en in extras:
dependencies += list(extra_req.get(en, []))
return files_installed, dependencies
@staticmethod
def _get_cooked_ast(filename):
"""
Get AST of the setup file and also transform it for fake setuptools
and stub setup calls.
"""
# with codecs.open(filename, mode="r", encoding="UTF-8") as ins:
# s = ins.read()
with open(filename, "rb") as ins:
tree = ast.parse(ins.read(), filename=filename, mode="exec")
ArchiveFileInstaller.SetupTransformer().visit(tree)
return tree
@staticmethod
def _consolidated_packages(packages):
packages = sorted(packages)
consolidated = set()
for pkg in packages:
if "." in pkg:
consolidated.add(pkg.split(".")[0]) # append the root folder
elif "/" in pkg:
consolidated.add(pkg.split("/")[0])
else:
consolidated.add(pkg)
return consolidated
@staticmethod
def _find_package_files(directory):
files = []
for f in os.listdir(directory):
if f.endswith(".py") and f != "setup.py" and not os.path.isdir(f):
files.append(f)
return files
@staticmethod
def _safe_move(src, dest):
if not os.path.exists(src):
raise PipError("cannot locate source folder/file: {}".format(src))
if os.path.exists(dest):
if NO_OVERWRITE:
raise PipError(
"cannot overwrite existing target, manual removal required: {}"
.format(dest))
else:
if os.path.isdir(dest):
shutil.rmtree(dest)
else:
os.remove(dest)
shutil.move(src, dest)
# package_config_handler = PackageConfigHandler()
# archive_file_installer = ArchiveFileInstaller()
class PackageRepository(object):
"""
A Package Repository is a manager class to perform various actions
related to a package.
This is a base class providing basic layout of a Repository.
"""
def __init__(self, site_packages=SITE_PACKAGES_FOLDER, verbose=False):
self.site_packages = site_packages
self.verbose = verbose
self.config = PackageConfigHandler(site_packages=self.site_packages,
verbose=self.verbose)
self.installer = ArchiveFileInstaller(site_packages=self.site_packages,
verbose=self.verbose)
def versions(self, pkg_name):
raise PipError("versions only available for PyPI packages")
def download(self, pkg_name, ver_spec):
raise PipError("Action Not Available: download")
def install(self, pkg_name, ver_spec, flags=DEFAULT_FLAGS, extras=[]):
raise PipError("Action Not Available: install")
def _install(
self,
pkg_name,
pkg_info,
archive_filename,
dependency_flags=DEFAULT_FLAGS,
extras=[],
):
if archive_filename.endswith(".whl"):
print("Installing wheel: {}...".format(
os.path.basename(archive_filename)))
wheel = Wheel(archive_filename,
verbose=self.verbose,
extras=extras)
files_installed, dependencies = wheel.install(self.site_packages)
else:
files_installed, dependencies = self.installer.run(
pkg_name, archive_filename, extras=extras)
# never install setuptools as dependency
dependencies = [
dependency for dependency in dependencies
if dependency != "setuptools"
]
name_versions = [
VersionSpecifier.parse_requirement(requirement)
for requirement in dependencies
]
# filter (None, ...)
name_versions = list(filter(lambda e: e[0] is not None, name_versions))
sys.modules["setuptools"]._installed_requirements_.append(pkg_name)
pkg_info["files"] = ",".join(files_installed)
pkg_info["dependency"] = ",".join(name_version[0]
for name_version in name_versions)
self.config.add_module(pkg_info)
print("Package installed: {}".format(pkg_name))
for dep_name, ver_spec, extras in name_versions:
if dep_name.strip().startswith("#") or len(dep_name.strip()) == 0:
# not a dependency
continue
if dep_name == "setuptools": # do not install setuptools
continue
# Some packages have error on dependency names
dep_name = PACKAGE_NAME_FIXER.get(dep_name, dep_name)
# If this dependency is installed before, skipping
# TODO: should we NOT skip if extras are specified?
if dep_name in sys.modules["setuptools"]._installed_requirements_:
print("Dependency already installed: {}".format(dep_name))
continue
if dep_name in BUNDLED_MODULES:
print("Dependency already bundled in distribution: {}".format(
dep_name))
continue
print("Installing dependency: {} (required by: {})".format(
"{}{}".format(dep_name, ver_spec if ver_spec else ""),
pkg_name))
repository = get_repository(dep_name, verbose=self.verbose)
try:
repository.install(dep_name,
ver_spec,
flags=dependency_flags,
extras=extras)
except PackageAlreadyInstalled:
print("AlreadyInstalled")
# well, it is already installed...
# TODO: maybe update package if required?
pass
def search(self, name_fragment):
raise PipError("search only available for PyPI packages")
def list(self):
modules = self.config.list_modules()
return [(module, self.config.get_info(module)) for module in modules]
def remove(self, pkg_name):
if self.config.module_exists(pkg_name):
dependencies = self.config.get_dependencies(pkg_name)
other_dependencies = self.config.get_all_dependencies(
exclude_module=(pkg_name, ))
files_installed = self.config.get_files_installed(pkg_name)
if files_installed:
for f in files_installed:
if os.path.isdir(f):
shutil.rmtree(f)
elif os.path.isfile(f):
os.remove(f)
else:
print(
"Package may have been removed externally without using pip. Deleting from registry ..."
)
else:
if os.path.isdir(
os.path.expanduser(
"~/Documents/site-packages/{}".format(
pkg_name.lower()))):
shutil.rmtree(
os.path.expanduser(
"~/Documents/site-packages/{}".format(
pkg_name.lower())))
elif os.path.isfile(
os.path.expanduser(
"~/Documents/site-packages/{}.py".format(
pkg_name.lower()))):
os.remove(
os.path.expanduser(
"~/Documents/site-packages/{}.py".format(
pkg_name.lower())))
else:
print(
"Package may have been removed externally without using pip. Deleting from registry ..."
)
self.config.remove_module(pkg_name)
print("Package removed.")
if dependencies:
for dependency in dependencies:
# If not other packages depend on it, it may be subject to removal
if dependency not in other_dependencies:
# Only remove the module if it exists in the registry. Otherwise
# it is possibly a builtin module.
# For backwards compatibility, we do not remove any entries
# that do not have a dependency option (since they are manually
# installed before).
if (self.config.module_exists(dependency)
and self.config.get_dependencies(dependency)
is not None):
print("Removing dependency: {}".format(dependency))
self.remove(dependency)
else:
raise PipError("package not installed: {}".format(pkg_name))
def update(self, pkg_name):
if self.config.module_exists(pkg_name):
raise PipError(
"update only available for packages installed from PyPI")
else:
PipError("package not installed: {}".format(pkg_name))
class PyPIRepository(PackageRepository):
"""
This repository performs its actions using the PyPI as a backend store.
"""
def __init__(self, *args, **kwargs):
super(PyPIRepository, self).__init__(*args, **kwargs)
try:
import xmlrpclib
except ImportError:
# py3
import xmlrpc.client as xmlrpclib
# DO NOT USE self.pypi, it's there just for search, it's obsolete/legacy
self.pypi = xmlrpclib.ServerProxy("https://pypi.python.org/pypi")
self.standard_package_names = {}
def _check_blocklist(self, pkg_name):
"""
Check if a package is blocklisted.
The result is a tuple:
- element 0 is True if the package is blocklisted
- element 1 is the reason
- element 2 is True if the install should fail due to this
- element 3 is an optional alternative package to use instead.
:param pkg_name: name of package to check
:type pkg_name: str
:return: a tuple of (blocklisted, reason, fatal, alt).
:rtype: (bool, str, bool, str or None)
"""
if (BLOCKLIST_PATH is None) or (not os.path.exists(BLOCKLIST_PATH)):
# blocklist not available
return (False, "", False, None)
with open(BLOCKLIST_PATH) as fin:
content = json.load(fin)
if pkg_name not in content["blocklist"]:
# package not blocklisted
return (False, "", False, None)
else:
# package blocklisted
reasonid, fatal, alt = content["blocklist"][pkg_name]
reason = content["reasons"].get(reasonid, reasonid)
return (True, reason, fatal, alt)
def get_standard_package_name(self, pkg_name):
if pkg_name not in self.standard_package_names:
try:
r = requests.get(
"https://pypi.python.org/pypi/{}/json".format(pkg_name))
self.standard_package_names[pkg_name] = r.json(
)["info"]["name"]
except:
return pkg_name
return self.standard_package_names[pkg_name]
def search(self, pkg_name):
pkg_name = self.get_standard_package_name(pkg_name)
# XML-RPC replacement would be tricky, because we probably
# have to use simplified / cached index to search in, can't
# find JSON API to search for packages
hits = self.pypi.search({"name": pkg_name}, "and")
if not hits:
raise PipError("No matches found: {}".format(pkg_name))
hits = sorted(hits,
key=lambda pkg: pkg["_pypi_ordering"],
reverse=True)
return hits
def _package_data(self, pkg_name):
r = requests.get(
"https://pypi.python.org/pypi/{}/json".format(pkg_name))
if not r.status_code == requests.codes.ok:
raise PipError("Failed to fetch package release urls")
return r.json()
def _package_releases(self, pkg_data):
return pkg_data["releases"].keys()
def _package_latest_release(self, pkg_data):
return pkg_data["info"]["version"]
def _package_downloads(self, pkg_data, hit):
return pkg_data["releases"][hit]
def _package_info(self, pkg_data):
return pkg_data["info"]
def versions(self, pkg_name):
pkg_name = self.get_standard_package_name(pkg_name)
pkg_data = self._package_data(pkg_name)
releases = self._package_releases(pkg_data)
if not releases:
raise PipError("No matches found: {}".format(pkg_name))
return releases
def download(self, pkg_name, ver_spec, flags=DEFAULT_FLAGS):
print("Querying PyPI ... ")
pkg_name = self.get_standard_package_name(pkg_name)
pkg_data = self._package_data(pkg_name)
hit = self._determin_hit(pkg_data, ver_spec, flags=flags)
if self.verbose:
print("Using {n}=={v}...".format(n=pkg_name, v=hit))
downloads = self._package_downloads(pkg_data, hit)
if not downloads:
raise PipError("No download available for {}: {}".format(
pkg_name, hit))
source = None
wheel = None
for download in downloads:
if any((suffix in download["url"])
for suffix in (".zip", ".bz2", ".gz")):
source = download
# break
if ".whl" in download["url"]:
fn = download["url"][download["url"].rfind("/") + 1:]
if wheel_is_compatible(fn):
wheel = download
target = None
if source is not None and (flags & FLAG_DIST_ALLOW_SRC > 0):
# source is available and allowed
if (wheel is None or (flags & FLAG_DIST_ALLOW_WHL
== 0)) or (flags & FLAG_DIST_PREFER_SRC > 0):
# no wheel is available or source is prefered
# use source
if self.verbose:
print(
"A source distribution is available and will be used.")
target = source
elif flags & FLAG_DIST_ALLOW_WHL > 0:
# a wheel is available and allowed and source is not preffered
# use wheel
if self.verbose:
print(
"A binary distribution is available and will be used.")
target = wheel
elif wheel is not None and (flags & FLAG_DIST_ALLOW_WHL > 0):
# source is not available or allowed, but a wheel is available and allowed
# use wheel
if self.verbose:
print(
"No source distribution found, but a binary distribution was found and will be used."
)
target = wheel
if target is None:
if self.verbose:
print("No allowed distribution found!")
if wheel is not None and (flags & FLAG_DIST_ALLOW_WHL == 0):
print(
"However, a wheel is available. Maybe try without '--no-binary' or with '--only-binary :all:'?"
)
if source is not None and (flags & FLAG_DIST_ALLOW_SRC == 0):
print(
"However, a source distribution is available. Maybe try with '--no-binary :all:'?"
)
raise PipError(
"No allowed distribution found for '{}': {}!".format(
pkg_name, hit))
pkg_info = self._package_info(pkg_data)
pkg_info["url"] = "pypi"
print("Downloading package ...")
worker = _stash("wget {} -o $TMPDIR/{}".format(target["url"],
target["filename"]))
if worker.state.return_value != 0:
raise PipError("failed to download package from {}".format(
target["url"]))
return os.path.join(os.getenv("TMPDIR"), target["filename"]), pkg_info
def install(
self,
pkg_name,
ver_spec,
flags=DEFAULT_FLAGS,
pip_info_file=PIP_INFO_FILE,
extras=[],
):
pkg_name = self.get_standard_package_name(pkg_name)
# check if package is blocklisted
# we only do this for PyPI installs, since non-PyPI installs
# may have the same pkg name for a different package.
# TODO: should this be changed?
blocklisted, reason, fatal, alt = self._check_blocklist(pkg_name)
if blocklisted and not (flags & FLAG_IGNORE_BLOCKLIST > 0):
if fatal:
# raise an exception.
print(
_stash.text_color(
"Package {} is blocklisted and marked fatal. Failing install."
.format(pkg_name),
"red",
))
print(_stash.text_color("Reason: " + reason, "red"))
raise PackageBlocklisted(pkg_name, reason)
elif alt is not None:
# an alternative package exposing the same functionality
# and API is known. Print a warning and use this instead.
print(
_stash.text_color(
"Warning: Using {} instead of {}".format(
alt, pkg_name),
"yellow",
))
print("Reason: " + reason)
pkg_name = alt
# use an empty VersionSpecifier to mark any version as acceptable
ver_spec = _stash.libversion.VersionSpecifier()
# do not use extras. We can not be sure that the package provide the same extras.
extras = []
else:
# this package is probably bundled with pythonista
# we should print a warning, but continue anyway
print(
_stash.text_color(
"Warning: package '{}' is blocklisted, but marked as non-fatal."
.format(pkg_name),
"yellow",
))
print(
"This probably means that the dependency can not be installed, but pythonista ships with the package preinstalled."
)
print("Reason for blocklisting: " + reason)
return
if not self.config.module_exists(pkg_name):
archive_filename, pkg_info = self.download(pkg_name,
ver_spec,
flags=flags)
print("Installing {}@{} from {} ...".format(pkg_name, ver_spec if ver_spec is not None else "latest", archive_filename))
self._install(
pkg_name,
pkg_info,
archive_filename,
dependency_flags=flags,
extras=extras,
)
# save json file of info
info_file = pip_info_file % pkg_name
info_folder = os.path.split(info_file)[0]
if not os.path.exists(info_folder):
os.mkdir(info_folder)
with open(info_file, "w") as f:
json.dump(pkg_info, f)
else:
# todo: maybe update package?
raise PackageAlreadyInstalled("Package already installed")
def update(self, pkg_name):
pkg_name = self.get_standard_package_name(pkg_name)
if self.config.module_exists(pkg_name):
pkg_data = self._package_data(pkg_name)
hit = self._package_latest_release(pkg_data)
current = self.config.get_info(pkg_name)
if not current["version"] == hit:
print("Updating {}".format(pkg_name))
self.remove(pkg_name)
self.install(pkg_name, VersionSpecifier((("==", hit), )))
else:
print("Package already up-to-date.")
else:
raise PipError("package not installed: {}".format(pkg_name))
def _determin_hit(self, pkg_data, ver_spec, flags=None):
"""
Find a release for a package matching a specified version.
:param pkg_data: the package information
:type pkg_data: dict
:param ver_spec: the version specification
:type ver_spec: VersionSpecifier
:param flags: (distribution) options
:type flags: int or None
:return: a version matching the specified version
:rtype: str
"""
pkg_name = pkg_data["info"]["name"]
latest = self._package_latest_release(pkg_data)
# create a sorted list of versions, newest fist.
# we manualle add the latest release in front to improve the chances of finding
# the most recent compatible version
versions = [latest] + _stash.libversion.sort_versions(
self._package_releases(pkg_data))
for hit in versions:
# we return the fist matching hit, so we should sort the hits by descending version
if (flags is not None) and not self._dist_flags_allows_release(
flags, pkg_data, hit):
# hit has no source/binary release and is not allowed by dis
continue
if not self._release_matches_py_version(pkg_data, hit):
# hit contains no compatible releases
continue
if ver_spec is None or ver_spec.match(hit):
# version is allowed
return hit
else:
raise PipError("Version not found: {}{}".format(
pkg_name, ver_spec if ver_spec is not None else ""))
def _release_matches_py_version(self, pkg_data, release):
"""
Check if a release is compatible with the python version.
:param pkg_data: package information
:type pkg_data: dict
:param release: the release to check
:type release: str
:return: whether the releases matches the python version
:rtype: boolean
"""
had_v = False
has_source = False
downloads = self._package_downloads(pkg_data, release)
for download in downloads:
requires_python = download.get("requires_python", None)
if requires_python is not None:
reqs = "python" + requires_python
name, ver_spec, extras = VersionSpecifier.parse_requirement(
reqs)
assert name == "python" # if this if False some large bug happened...
if ver_spec.match(platform.python_version()):
# compatible
return True
else:
# fallback
# TODO: do we require this?
pv = download.get("python_version", None)
if pv is None:
continue
elif pv in ("py2.py3", "py3.py2"):
# compatible with both py versions
return True
elif pv.startswith("2") or pv == "py2":
# py2 release
if not six.PY3:
return True
elif pv.startswith("3") or pv == "py3":
# py3 release
if six.PY3:
return True
elif pv == "source":
# i honestly have no idea what this means
# i first assumed it means "this source is compatible with both", so just return True
# however, this seems to be wrong. Instead, we use this as a fallback yes
has_source = True
had_v = True
if had_v:
# no allowed downloads found
return has_source
else:
# none found, maybe pypi changed
# in this case, just return True
# we did it before without these checks and it worked *most* of the time, so missing this check is not horrible...
return True
def _dist_flags_allows_release(self, flags, pkg_data, release):
"""
Check if a release is allowed by the distribution flags.
:param flags: the (distribution) flags
:type flags: int
:param pkg_data: package information
:type pkg_data: dict
:param release: the release to check
:type release: str
:return: whether the flags allow this release or not
:rtype: boolean
"""
downloads = self._package_downloads(pkg_data, release)
for download in downloads:
pt = download.get("packagetype", None)
if pt is None:
continue
elif pt in ("source", "sdist"):
# source distribution
if flags & FLAG_DIST_ALLOW_SRC > 0:
return True
elif pt in ("bdist_wheel", "wheel", "whl"):
# wheel
if flags & FLAG_DIST_ALLOW_WHL > 0:
return True
# no allowed downloads found
return False
class GitHubRepository(PackageRepository):
"""
This repository performs actions using GitHub as a backend store.
"""
def _get_release_from_version_specifier(self, ver_spec):
if isinstance(ver_spec, VersionSpecifier):
try:
for op, ver in ver_spec.specs:
if op == operator.eq:
return ver
except ValueError:
raise PipError(
"GitHub repository requires exact version match")
else:
return ver_spec if ver_spec is not None else "master"
def versions(self, owner_repo):
owner, repo = owner_repo.split("/")
data = requests.get(
"https://api.github.com/repos/{}/{}/releases".format(owner,
repo)).json()
return [entry["name"] for entry in data]
def download(self, owner_repo, ver_spec):
release = self._get_release_from_version_specifier(ver_spec)
owner, repo = owner_repo.split("/")
metadata = requests.get("https://api.github.com/repos/{}/{}".format(
owner, repo)).json()
_stash(
"wget https://github.com/{0}/{1}/archive/{2}.zip -o $TMPDIR/{2}.zip"
.format(owner, repo, release))
return os.path.join(os.getenv("TMPDIR"), release + ".zip"), {
"name": owner_repo,
"url": "github",
"version": release,
"summary": metadata.get("description", ""),
}
def install(self, owner_repo, ver_spec, flags=DEFAULT_FLAGS, extras=[]):
if not self.config.module_exists(owner_repo):
owner, repo = owner_repo.split("/")
release = self._get_release_from_version_specifier(ver_spec)
archive_filename, pkg_info = self.download(owner_repo, release)
self._install(
"-".join([repo, release]),
pkg_info,
archive_filename,
dependency_flags=flags,
extras=extras,
)
else:
raise PipError("Package already installed")
class UrlRepository(PackageRepository):
"""
This repository deals with a package from a single URL
"""
def download(self, url, ver_spec):
archive_filename = os.path.basename(url)
if os.path.splitext(archive_filename)[1] not in (".zip", ".gz",
".bz2"):
raise PipError(
"cannot find a valid archive file at url: {}".format(url))
_stash("wget {} -o $TMPDIR/{}".format(url, archive_filename))
return os.path.join(os.getenv("TMPDIR"), archive_filename), {
"name": url,
"url": "url",
"version": "",
"summary": "",
}
def install(self, url, ver_spec, flags=DEFAULT_FLAGS, extras=[]):
if not self.config.module_exists(url):
archive_filename, pkg_info = self.download(url, ver_spec)
pkg_name = os.path.splitext(os.path.basename(archive_filename))[0]
self._install(
pkg_name,
pkg_info,
archive_filename,
dependency_flags=flags,
extras=extras,
)
else:
raise PipError("Package already installed")
class LocalRepository(PackageRepository):
"""
This repository deals with a local archive file.
"""
def install(self,
archive_filename,
ver_spec,
flags=DEFAULT_FLAGS,
extras=[]):
pkg_info = {
"name": archive_filename,
"url": "local",
"version": "",
"summary": "",
}
self._install(pkg_name,
pkg_info,
archive_filename,
dependency_flags=flags,
extras=extras)
# url_repository = UrlRepository()
# local_repository = LocalRepository()
# github_repository = GitHubRepository()
# pypi_repository = PyPIRepository()
def get_repository(pkg_name,
site_packages=SITE_PACKAGES_FOLDER,
verbose=False):
"""
The corresponding repository based on the given package name.
:param pkg_name: It can be one of the four following options:
1. An URL pointing to an archive file
2. Path to a local archive file
3. A owner/repo pair pointing to a GitHub repo
4. A name representing a PyPI package.
:param site_packages: folder containing the site-packages
:type site_packages: str
:param verbose: enable additional output
:type verbose: bool
"""
if (pkg_name.startswith("http://") or pkg_name.startswith("https://")
or pkg_name.startswith("ftp://")): # remote archive file
print("Working on URL repository ...")
return UrlRepository(site_packages=site_packages, verbose=verbose)
# local archive file
elif os.path.isfile(pkg_name) and (pkg_name.endswith(".zip")
or pkg_name.endswith(".gz")
or pkg_name.endswith(".bz2")):
print("Working on Local repository ...")
return LocalRepository(site_packages=site_packages, verbose=verbose)
elif "/" in pkg_name: # github, e.g. selectel/pyte
print("Working on GitHub repository ...")
return GitHubRepository(site_packages=site_packages, verbose=verbose)
else: # PyPI
return PyPIRepository(site_packages=site_packages, verbose=verbose)
if __name__ == "__main__":
import argparse
ap = argparse.ArgumentParser()
ap.add_argument("--verbose", action="store_true", help="be more chatty")
ap.add_argument(
"-6",
action="store_const",
help="manage packages for py2 and py3",
dest="site_packages",
const=OLD_SITE_PACKAGES_FOLDER,
default=SITE_PACKAGES_FOLDER,
)
subparsers = ap.add_subparsers(
dest="sub_command",
title="List of sub-commands",
metavar="sub-command",
help='"pip sub-command -h" for more help on a sub-command',
)
show_parser = subparsers.add_parser("show",
help="show information of package ")
show_parser.add_argument("package", help="package name to show")
show_parser.add_argument(
"-f",
"--force",
action="store_true",
dest="forcedownload",
help="force to download info file from pypi",
)
list_parser = subparsers.add_parser("list", help="list packages installed")
install_parser = subparsers.add_parser("install", help="install packages")
install_parser.add_argument(
"requirements",
help="the requirement specifier for installation",
nargs="+",
)
install_parser.add_argument(
"-N",
"--no-overwrite",
action="store_true",
default=False,
help="Do not overwrite existing folder/files",
)
install_parser.add_argument(
"-d",
"--directory",
help="target directory for installation",
)
install_parser.add_argument(
"--no-binary",
action="store",
help="Do not use binary packages",
dest="nobinary",
)
install_parser.add_argument(
"--only-binary",
action="store",
help="Do not use binary packages",
dest="onlybinary",
)
install_parser.add_argument(
"--prefer-binary",
action="store_true",
help=
"Prefer older binary packages over newer source packages", # TODO: do we actually check older sources/wheels?
dest="preferbinary",
)
install_parser.add_argument(
"--ignore-blocklist",
action="store_true",
help="Ignore blocklist",
dest="ignoreblocklist",
)
download_parser = subparsers.add_parser("download",
help="download packages")
download_parser.add_argument(
"requirements",
help="the requirement specifier for download",
nargs="+",
)
download_parser.add_argument(
"-d",
"--directory",
help="the directory to save the downloaded file",
)
search_parser = subparsers.add_parser(
"search", help="search with the given word fragment")
search_parser.add_argument("term", help="the word fragment to search")
versions_parser = subparsers.add_parser(
"versions", help="find versions available for given package")
versions_parser.add_argument("package_name", help="the package name")
remove_parser = subparsers.add_parser("uninstall",
help="uninstall packages")
remove_parser.add_argument(
"packages",
nargs="+",
metavar="package",
help="packages to uninstall",
)
update_parser = subparsers.add_parser("update",
help="update an installed package")
update_parser.add_argument("packages", nargs="+", help="the package name")
dev_parser = subparsers.add_parser("dev")
dev_parser.add_argument("opt")
ns = ap.parse_args()
if ns.site_packages is None:
# choosen site-packages dir may be unavailable on this platform, fallback to default
print(
"Warning: the specified site-packages directory is unavailable, falling back to default."
)
ns.site_packages = SITE_PACKAGES_FOLDER
try:
if ns.sub_command == "list":
repository = get_repository("pypi",
site_packages=ns.site_packages,
verbose=ns.verbose)
info_list = repository.list()
for module, info in info_list:
print("{} ({}) - {}".format(module, info.get("version", "???"),
info.get("summary", "")))
elif ns.sub_command == "install":
if ns.directory is not None:
site_packages = ns.directory
else:
site_packages = ns.site_packages
flags = DEFAULT_FLAGS
if ns.nobinary is not None:
if ns.nobinary == ":all:":
# disable all binaries
flags = flags & ~FLAG_DIST_ALLOW_WHL
flags = flags & ~FLAG_DIST_PREFER_WHL
elif ns.nobinary == ":none:":
# allow all binaries
flags = flags | FLAG_DIST_ALLOW_WHL
else:
# TODO: implement this
print(
"Error: --no-binary does currently only support :all: or :none:"
)
if ns.onlybinary is not None:
if ns.onlybinary == ":all:":
# disable all source
flags = flags & ~FLAG_DIST_ALLOW_SRC
flags = flags & ~FLAG_DIST_PREFER_SRC
elif ns.nobinary == ":none:":
# allow all source
flags = flags | FLAG_DIST_ALLOW_SRC
else:
# TODO: implement this
print(
"Error: --only-binary does currently only support :all: or :none:"
)
if ns.preferbinary:
# set preference to wheels
flags = flags | FLAG_DIST_PREFER_WHL | FLAG_DIST_ALLOW_WHL
flags = flags & ~FLAG_DIST_PREFER_SRC
if ns.ignoreblocklist:
flags = flags | FLAG_IGNORE_BLOCKLIST
for requirement in ns.requirements:
repository = get_repository(requirement,
site_packages=site_packages,
verbose=ns.verbose)
NO_OVERWRITE = ns.no_overwrite
pkg_name, ver_spec, extras = VersionSpecifier.parse_requirement(
requirement)
with save_current_sys_modules():
fake_setuptools_modules()
ensure_pkg_resources() # install pkg_resources if needed
# start with what we have installed (i.e. in the config file)
sys.modules[
"setuptools"]._installed_requirements_ = repository.config.list_modules(
)
repository.install(pkg_name,
ver_spec,
flags=flags,
extras=extras)
update_req_index()
elif ns.sub_command == "download":
for requirement in ns.requirements:
repository = get_repository(requirement,
site_packages=ns.site_packages,
verbose=ns.verbose)
try:
pkg_name, ver_spec, extras = VersionSpecifier.parse_requirement(
requirement)
except ValueError as e:
print(
"Error during parsing of the requirement : {e}".format(
e=e))
archive_filename, pkg_info = repository.download(
pkg_name, ver_spec)
directory = ns.directory or os.getcwd()
shutil.move(archive_filename, directory)
elif ns.sub_command == "search":
repository = get_repository("pypi",
site_packages=ns.site_packages,
verbose=ns.verbose)
search_hits = repository.search(ns.term)
search_hits = sorted(search_hits,
key=lambda pkg: pkg["_pypi_ordering"],
reverse=True)
for hit in search_hits:
print("{} {} - {}".format(hit["name"], hit["version"],
hit["summary"]))
elif ns.sub_command == "versions":
repository = get_repository(ns.package_name,
site_packages=ns.site_packages,
verbose=ns.verbose)
version_hits = repository.versions(ns.package_name)
for hit in version_hits:
print("{} - {}".format(ns.package_name, hit))
elif ns.sub_command == "uninstall":
for package_name in ns.packages:
repository = get_repository("pypi",
site_packages=ns.site_packages,
verbose=ns.verbose)
repository.remove(package_name)
update_req_index()
elif ns.sub_command == "update":
for package_name in ns.packages:
repository = get_repository(package_name,
site_packages=ns.site_packages,
verbose=ns.verbose)
with save_current_sys_modules():
fake_setuptools_modules()
ensure_pkg_resources() # install pkg_resources if needed
# start with what we have installed (i.e. in the config file)
sys.modules[
"setuptools"]._installed_requirements_ = repository.config.list_modules(
)
repository.update(package_name)
elif ns.sub_command == "show":
if ns.forcedownload:
download_info(ns.package, site_packages=ns.site_packages)
print_info(ns.package, site_packages=ns.site_packages)
elif ns.sub_command == "dev":
if ns.opt == "update-index":
update_req_index()
print("index file updated")
else:
raise PipError("unknow dev option: {}".format(ns.opt))
sys.exit(1)
else:
raise PipError("unknown command: {}".format(ns.sub_command))
sys.exit(1)
except PipError as e:
print("Error: {}".format(e))
if ns.verbose:
traceback.print_exc()
sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment