Delete env_web directory
This commit is contained in:
@@ -1,128 +0,0 @@
|
||||
import contextlib
|
||||
import functools
|
||||
import os
|
||||
import sys
|
||||
from typing import TYPE_CHECKING, List, Optional, Type, cast
|
||||
|
||||
from pip._internal.utils.misc import strtobool
|
||||
|
||||
from .base import BaseDistribution, BaseEnvironment, FilesystemWheel, MemoryWheel, Wheel
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing import Literal, Protocol
|
||||
else:
|
||||
Protocol = object
|
||||
|
||||
__all__ = [
|
||||
"BaseDistribution",
|
||||
"BaseEnvironment",
|
||||
"FilesystemWheel",
|
||||
"MemoryWheel",
|
||||
"Wheel",
|
||||
"get_default_environment",
|
||||
"get_environment",
|
||||
"get_wheel_distribution",
|
||||
"select_backend",
|
||||
]
|
||||
|
||||
|
||||
def _should_use_importlib_metadata() -> bool:
|
||||
"""Whether to use the ``importlib.metadata`` or ``pkg_resources`` backend.
|
||||
|
||||
By default, pip uses ``importlib.metadata`` on Python 3.11+, and
|
||||
``pkg_resourcess`` otherwise. This can be overridden by a couple of ways:
|
||||
|
||||
* If environment variable ``_PIP_USE_IMPORTLIB_METADATA`` is set, it
|
||||
dictates whether ``importlib.metadata`` is used, regardless of Python
|
||||
version.
|
||||
* On Python 3.11+, Python distributors can patch ``importlib.metadata``
|
||||
to add a global constant ``_PIP_USE_IMPORTLIB_METADATA = False``. This
|
||||
makes pip use ``pkg_resources`` (unless the user set the aforementioned
|
||||
environment variable to *True*).
|
||||
"""
|
||||
with contextlib.suppress(KeyError, ValueError):
|
||||
return bool(strtobool(os.environ["_PIP_USE_IMPORTLIB_METADATA"]))
|
||||
if sys.version_info < (3, 11):
|
||||
return False
|
||||
import importlib.metadata
|
||||
|
||||
return bool(getattr(importlib.metadata, "_PIP_USE_IMPORTLIB_METADATA", True))
|
||||
|
||||
|
||||
class Backend(Protocol):
|
||||
NAME: 'Literal["importlib", "pkg_resources"]'
|
||||
Distribution: Type[BaseDistribution]
|
||||
Environment: Type[BaseEnvironment]
|
||||
|
||||
|
||||
@functools.lru_cache(maxsize=None)
|
||||
def select_backend() -> Backend:
|
||||
if _should_use_importlib_metadata():
|
||||
from . import importlib
|
||||
|
||||
return cast(Backend, importlib)
|
||||
from . import pkg_resources
|
||||
|
||||
return cast(Backend, pkg_resources)
|
||||
|
||||
|
||||
def get_default_environment() -> BaseEnvironment:
|
||||
"""Get the default representation for the current environment.
|
||||
|
||||
This returns an Environment instance from the chosen backend. The default
|
||||
Environment instance should be built from ``sys.path`` and may use caching
|
||||
to share instance state accorss calls.
|
||||
"""
|
||||
return select_backend().Environment.default()
|
||||
|
||||
|
||||
def get_environment(paths: Optional[List[str]]) -> BaseEnvironment:
|
||||
"""Get a representation of the environment specified by ``paths``.
|
||||
|
||||
This returns an Environment instance from the chosen backend based on the
|
||||
given import paths. The backend must build a fresh instance representing
|
||||
the state of installed distributions when this function is called.
|
||||
"""
|
||||
return select_backend().Environment.from_paths(paths)
|
||||
|
||||
|
||||
def get_directory_distribution(directory: str) -> BaseDistribution:
|
||||
"""Get the distribution metadata representation in the specified directory.
|
||||
|
||||
This returns a Distribution instance from the chosen backend based on
|
||||
the given on-disk ``.dist-info`` directory.
|
||||
"""
|
||||
return select_backend().Distribution.from_directory(directory)
|
||||
|
||||
|
||||
def get_wheel_distribution(wheel: Wheel, canonical_name: str) -> BaseDistribution:
|
||||
"""Get the representation of the specified wheel's distribution metadata.
|
||||
|
||||
This returns a Distribution instance from the chosen backend based on
|
||||
the given wheel's ``.dist-info`` directory.
|
||||
|
||||
:param canonical_name: Normalized project name of the given wheel.
|
||||
"""
|
||||
return select_backend().Distribution.from_wheel(wheel, canonical_name)
|
||||
|
||||
|
||||
def get_metadata_distribution(
|
||||
metadata_contents: bytes,
|
||||
filename: str,
|
||||
canonical_name: str,
|
||||
) -> BaseDistribution:
|
||||
"""Get the dist representation of the specified METADATA file contents.
|
||||
|
||||
This returns a Distribution instance from the chosen backend sourced from the data
|
||||
in `metadata_contents`.
|
||||
|
||||
:param metadata_contents: Contents of a METADATA file within a dist, or one served
|
||||
via PEP 658.
|
||||
:param filename: Filename for the dist this metadata represents.
|
||||
:param canonical_name: Normalized project name of the given dist.
|
||||
"""
|
||||
return select_backend().Distribution.from_metadata_file_contents(
|
||||
metadata_contents,
|
||||
filename,
|
||||
canonical_name,
|
||||
)
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,84 +0,0 @@
|
||||
# Extracted from https://github.com/pfmoore/pkg_metadata
|
||||
|
||||
from email.header import Header, decode_header, make_header
|
||||
from email.message import Message
|
||||
from typing import Any, Dict, List, Union
|
||||
|
||||
METADATA_FIELDS = [
|
||||
# Name, Multiple-Use
|
||||
("Metadata-Version", False),
|
||||
("Name", False),
|
||||
("Version", False),
|
||||
("Dynamic", True),
|
||||
("Platform", True),
|
||||
("Supported-Platform", True),
|
||||
("Summary", False),
|
||||
("Description", False),
|
||||
("Description-Content-Type", False),
|
||||
("Keywords", False),
|
||||
("Home-page", False),
|
||||
("Download-URL", False),
|
||||
("Author", False),
|
||||
("Author-email", False),
|
||||
("Maintainer", False),
|
||||
("Maintainer-email", False),
|
||||
("License", False),
|
||||
("Classifier", True),
|
||||
("Requires-Dist", True),
|
||||
("Requires-Python", False),
|
||||
("Requires-External", True),
|
||||
("Project-URL", True),
|
||||
("Provides-Extra", True),
|
||||
("Provides-Dist", True),
|
||||
("Obsoletes-Dist", True),
|
||||
]
|
||||
|
||||
|
||||
def json_name(field: str) -> str:
|
||||
return field.lower().replace("-", "_")
|
||||
|
||||
|
||||
def msg_to_json(msg: Message) -> Dict[str, Any]:
|
||||
"""Convert a Message object into a JSON-compatible dictionary."""
|
||||
|
||||
def sanitise_header(h: Union[Header, str]) -> str:
|
||||
if isinstance(h, Header):
|
||||
chunks = []
|
||||
for bytes, encoding in decode_header(h):
|
||||
if encoding == "unknown-8bit":
|
||||
try:
|
||||
# See if UTF-8 works
|
||||
bytes.decode("utf-8")
|
||||
encoding = "utf-8"
|
||||
except UnicodeDecodeError:
|
||||
# If not, latin1 at least won't fail
|
||||
encoding = "latin1"
|
||||
chunks.append((bytes, encoding))
|
||||
return str(make_header(chunks))
|
||||
return str(h)
|
||||
|
||||
result = {}
|
||||
for field, multi in METADATA_FIELDS:
|
||||
if field not in msg:
|
||||
continue
|
||||
key = json_name(field)
|
||||
if multi:
|
||||
value: Union[str, List[str]] = [
|
||||
sanitise_header(v) for v in msg.get_all(field) # type: ignore
|
||||
]
|
||||
else:
|
||||
value = sanitise_header(msg.get(field)) # type: ignore
|
||||
if key == "keywords":
|
||||
# Accept both comma-separated and space-separated
|
||||
# forms, for better compatibility with old data.
|
||||
if "," in value:
|
||||
value = [v.strip() for v in value.split(",")]
|
||||
else:
|
||||
value = value.split()
|
||||
result[key] = value
|
||||
|
||||
payload = msg.get_payload()
|
||||
if payload:
|
||||
result["description"] = payload
|
||||
|
||||
return result
|
||||
@@ -1,702 +0,0 @@
|
||||
import csv
|
||||
import email.message
|
||||
import functools
|
||||
import json
|
||||
import logging
|
||||
import pathlib
|
||||
import re
|
||||
import zipfile
|
||||
from typing import (
|
||||
IO,
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
Collection,
|
||||
Container,
|
||||
Dict,
|
||||
Iterable,
|
||||
Iterator,
|
||||
List,
|
||||
NamedTuple,
|
||||
Optional,
|
||||
Tuple,
|
||||
Union,
|
||||
)
|
||||
|
||||
from pip._vendor.packaging.requirements import Requirement
|
||||
from pip._vendor.packaging.specifiers import InvalidSpecifier, SpecifierSet
|
||||
from pip._vendor.packaging.utils import NormalizedName, canonicalize_name
|
||||
from pip._vendor.packaging.version import LegacyVersion, Version
|
||||
|
||||
from pip._internal.exceptions import NoneMetadataError
|
||||
from pip._internal.locations import site_packages, user_site
|
||||
from pip._internal.models.direct_url import (
|
||||
DIRECT_URL_METADATA_NAME,
|
||||
DirectUrl,
|
||||
DirectUrlValidationError,
|
||||
)
|
||||
from pip._internal.utils.compat import stdlib_pkgs # TODO: Move definition here.
|
||||
from pip._internal.utils.egg_link import egg_link_path_from_sys_path
|
||||
from pip._internal.utils.misc import is_local, normalize_path
|
||||
from pip._internal.utils.urls import url_to_path
|
||||
|
||||
from ._json import msg_to_json
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing import Protocol
|
||||
else:
|
||||
Protocol = object
|
||||
|
||||
DistributionVersion = Union[LegacyVersion, Version]
|
||||
|
||||
InfoPath = Union[str, pathlib.PurePath]
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BaseEntryPoint(Protocol):
|
||||
@property
|
||||
def name(self) -> str:
|
||||
raise NotImplementedError()
|
||||
|
||||
@property
|
||||
def value(self) -> str:
|
||||
raise NotImplementedError()
|
||||
|
||||
@property
|
||||
def group(self) -> str:
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
def _convert_installed_files_path(
|
||||
entry: Tuple[str, ...],
|
||||
info: Tuple[str, ...],
|
||||
) -> str:
|
||||
"""Convert a legacy installed-files.txt path into modern RECORD path.
|
||||
|
||||
The legacy format stores paths relative to the info directory, while the
|
||||
modern format stores paths relative to the package root, e.g. the
|
||||
site-packages directory.
|
||||
|
||||
:param entry: Path parts of the installed-files.txt entry.
|
||||
:param info: Path parts of the egg-info directory relative to package root.
|
||||
:returns: The converted entry.
|
||||
|
||||
For best compatibility with symlinks, this does not use ``abspath()`` or
|
||||
``Path.resolve()``, but tries to work with path parts:
|
||||
|
||||
1. While ``entry`` starts with ``..``, remove the equal amounts of parts
|
||||
from ``info``; if ``info`` is empty, start appending ``..`` instead.
|
||||
2. Join the two directly.
|
||||
"""
|
||||
while entry and entry[0] == "..":
|
||||
if not info or info[-1] == "..":
|
||||
info += ("..",)
|
||||
else:
|
||||
info = info[:-1]
|
||||
entry = entry[1:]
|
||||
return str(pathlib.Path(*info, *entry))
|
||||
|
||||
|
||||
class RequiresEntry(NamedTuple):
|
||||
requirement: str
|
||||
extra: str
|
||||
marker: str
|
||||
|
||||
|
||||
class BaseDistribution(Protocol):
|
||||
@classmethod
|
||||
def from_directory(cls, directory: str) -> "BaseDistribution":
|
||||
"""Load the distribution from a metadata directory.
|
||||
|
||||
:param directory: Path to a metadata directory, e.g. ``.dist-info``.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@classmethod
|
||||
def from_metadata_file_contents(
|
||||
cls,
|
||||
metadata_contents: bytes,
|
||||
filename: str,
|
||||
project_name: str,
|
||||
) -> "BaseDistribution":
|
||||
"""Load the distribution from the contents of a METADATA file.
|
||||
|
||||
This is used to implement PEP 658 by generating a "shallow" dist object that can
|
||||
be used for resolution without downloading or building the actual dist yet.
|
||||
|
||||
:param metadata_contents: The contents of a METADATA file.
|
||||
:param filename: File name for the dist with this metadata.
|
||||
:param project_name: Name of the project this dist represents.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@classmethod
|
||||
def from_wheel(cls, wheel: "Wheel", name: str) -> "BaseDistribution":
|
||||
"""Load the distribution from a given wheel.
|
||||
|
||||
:param wheel: A concrete wheel definition.
|
||||
:param name: File name of the wheel.
|
||||
|
||||
:raises InvalidWheel: Whenever loading of the wheel causes a
|
||||
:py:exc:`zipfile.BadZipFile` exception to be thrown.
|
||||
:raises UnsupportedWheel: If the wheel is a valid zip, but malformed
|
||||
internally.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"{self.raw_name} {self.version} ({self.location})"
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"{self.raw_name} {self.version}"
|
||||
|
||||
@property
|
||||
def location(self) -> Optional[str]:
|
||||
"""Where the distribution is loaded from.
|
||||
|
||||
A string value is not necessarily a filesystem path, since distributions
|
||||
can be loaded from other sources, e.g. arbitrary zip archives. ``None``
|
||||
means the distribution is created in-memory.
|
||||
|
||||
Do not canonicalize this value with e.g. ``pathlib.Path.resolve()``. If
|
||||
this is a symbolic link, we want to preserve the relative path between
|
||||
it and files in the distribution.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@property
|
||||
def editable_project_location(self) -> Optional[str]:
|
||||
"""The project location for editable distributions.
|
||||
|
||||
This is the directory where pyproject.toml or setup.py is located.
|
||||
None if the distribution is not installed in editable mode.
|
||||
"""
|
||||
# TODO: this property is relatively costly to compute, memoize it ?
|
||||
direct_url = self.direct_url
|
||||
if direct_url:
|
||||
if direct_url.is_local_editable():
|
||||
return url_to_path(direct_url.url)
|
||||
else:
|
||||
# Search for an .egg-link file by walking sys.path, as it was
|
||||
# done before by dist_is_editable().
|
||||
egg_link_path = egg_link_path_from_sys_path(self.raw_name)
|
||||
if egg_link_path:
|
||||
# TODO: get project location from second line of egg_link file
|
||||
# (https://github.com/pypa/pip/issues/10243)
|
||||
return self.location
|
||||
return None
|
||||
|
||||
@property
|
||||
def installed_location(self) -> Optional[str]:
|
||||
"""The distribution's "installed" location.
|
||||
|
||||
This should generally be a ``site-packages`` directory. This is
|
||||
usually ``dist.location``, except for legacy develop-installed packages,
|
||||
where ``dist.location`` is the source code location, and this is where
|
||||
the ``.egg-link`` file is.
|
||||
|
||||
The returned location is normalized (in particular, with symlinks removed).
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@property
|
||||
def info_location(self) -> Optional[str]:
|
||||
"""Location of the .[egg|dist]-info directory or file.
|
||||
|
||||
Similarly to ``location``, a string value is not necessarily a
|
||||
filesystem path. ``None`` means the distribution is created in-memory.
|
||||
|
||||
For a modern .dist-info installation on disk, this should be something
|
||||
like ``{location}/{raw_name}-{version}.dist-info``.
|
||||
|
||||
Do not canonicalize this value with e.g. ``pathlib.Path.resolve()``. If
|
||||
this is a symbolic link, we want to preserve the relative path between
|
||||
it and other files in the distribution.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@property
|
||||
def installed_by_distutils(self) -> bool:
|
||||
"""Whether this distribution is installed with legacy distutils format.
|
||||
|
||||
A distribution installed with "raw" distutils not patched by setuptools
|
||||
uses one single file at ``info_location`` to store metadata. We need to
|
||||
treat this specially on uninstallation.
|
||||
"""
|
||||
info_location = self.info_location
|
||||
if not info_location:
|
||||
return False
|
||||
return pathlib.Path(info_location).is_file()
|
||||
|
||||
@property
|
||||
def installed_as_egg(self) -> bool:
|
||||
"""Whether this distribution is installed as an egg.
|
||||
|
||||
This usually indicates the distribution was installed by (older versions
|
||||
of) easy_install.
|
||||
"""
|
||||
location = self.location
|
||||
if not location:
|
||||
return False
|
||||
return location.endswith(".egg")
|
||||
|
||||
@property
|
||||
def installed_with_setuptools_egg_info(self) -> bool:
|
||||
"""Whether this distribution is installed with the ``.egg-info`` format.
|
||||
|
||||
This usually indicates the distribution was installed with setuptools
|
||||
with an old pip version or with ``single-version-externally-managed``.
|
||||
|
||||
Note that this ensure the metadata store is a directory. distutils can
|
||||
also installs an ``.egg-info``, but as a file, not a directory. This
|
||||
property is *False* for that case. Also see ``installed_by_distutils``.
|
||||
"""
|
||||
info_location = self.info_location
|
||||
if not info_location:
|
||||
return False
|
||||
if not info_location.endswith(".egg-info"):
|
||||
return False
|
||||
return pathlib.Path(info_location).is_dir()
|
||||
|
||||
@property
|
||||
def installed_with_dist_info(self) -> bool:
|
||||
"""Whether this distribution is installed with the "modern format".
|
||||
|
||||
This indicates a "modern" installation, e.g. storing metadata in the
|
||||
``.dist-info`` directory. This applies to installations made by
|
||||
setuptools (but through pip, not directly), or anything using the
|
||||
standardized build backend interface (PEP 517).
|
||||
"""
|
||||
info_location = self.info_location
|
||||
if not info_location:
|
||||
return False
|
||||
if not info_location.endswith(".dist-info"):
|
||||
return False
|
||||
return pathlib.Path(info_location).is_dir()
|
||||
|
||||
@property
|
||||
def canonical_name(self) -> NormalizedName:
|
||||
raise NotImplementedError()
|
||||
|
||||
@property
|
||||
def version(self) -> DistributionVersion:
|
||||
raise NotImplementedError()
|
||||
|
||||
@property
|
||||
def setuptools_filename(self) -> str:
|
||||
"""Convert a project name to its setuptools-compatible filename.
|
||||
|
||||
This is a copy of ``pkg_resources.to_filename()`` for compatibility.
|
||||
"""
|
||||
return self.raw_name.replace("-", "_")
|
||||
|
||||
@property
|
||||
def direct_url(self) -> Optional[DirectUrl]:
|
||||
"""Obtain a DirectUrl from this distribution.
|
||||
|
||||
Returns None if the distribution has no `direct_url.json` metadata,
|
||||
or if `direct_url.json` is invalid.
|
||||
"""
|
||||
try:
|
||||
content = self.read_text(DIRECT_URL_METADATA_NAME)
|
||||
except FileNotFoundError:
|
||||
return None
|
||||
try:
|
||||
return DirectUrl.from_json(content)
|
||||
except (
|
||||
UnicodeDecodeError,
|
||||
json.JSONDecodeError,
|
||||
DirectUrlValidationError,
|
||||
) as e:
|
||||
logger.warning(
|
||||
"Error parsing %s for %s: %s",
|
||||
DIRECT_URL_METADATA_NAME,
|
||||
self.canonical_name,
|
||||
e,
|
||||
)
|
||||
return None
|
||||
|
||||
@property
|
||||
def installer(self) -> str:
|
||||
try:
|
||||
installer_text = self.read_text("INSTALLER")
|
||||
except (OSError, ValueError, NoneMetadataError):
|
||||
return "" # Fail silently if the installer file cannot be read.
|
||||
for line in installer_text.splitlines():
|
||||
cleaned_line = line.strip()
|
||||
if cleaned_line:
|
||||
return cleaned_line
|
||||
return ""
|
||||
|
||||
@property
|
||||
def requested(self) -> bool:
|
||||
return self.is_file("REQUESTED")
|
||||
|
||||
@property
|
||||
def editable(self) -> bool:
|
||||
return bool(self.editable_project_location)
|
||||
|
||||
@property
|
||||
def local(self) -> bool:
|
||||
"""If distribution is installed in the current virtual environment.
|
||||
|
||||
Always True if we're not in a virtualenv.
|
||||
"""
|
||||
if self.installed_location is None:
|
||||
return False
|
||||
return is_local(self.installed_location)
|
||||
|
||||
@property
|
||||
def in_usersite(self) -> bool:
|
||||
if self.installed_location is None or user_site is None:
|
||||
return False
|
||||
return self.installed_location.startswith(normalize_path(user_site))
|
||||
|
||||
@property
|
||||
def in_site_packages(self) -> bool:
|
||||
if self.installed_location is None or site_packages is None:
|
||||
return False
|
||||
return self.installed_location.startswith(normalize_path(site_packages))
|
||||
|
||||
def is_file(self, path: InfoPath) -> bool:
|
||||
"""Check whether an entry in the info directory is a file."""
|
||||
raise NotImplementedError()
|
||||
|
||||
def iter_distutils_script_names(self) -> Iterator[str]:
|
||||
"""Find distutils 'scripts' entries metadata.
|
||||
|
||||
If 'scripts' is supplied in ``setup.py``, distutils records those in the
|
||||
installed distribution's ``scripts`` directory, a file for each script.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def read_text(self, path: InfoPath) -> str:
|
||||
"""Read a file in the info directory.
|
||||
|
||||
:raise FileNotFoundError: If ``path`` does not exist in the directory.
|
||||
:raise NoneMetadataError: If ``path`` exists in the info directory, but
|
||||
cannot be read.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def iter_entry_points(self) -> Iterable[BaseEntryPoint]:
|
||||
raise NotImplementedError()
|
||||
|
||||
def _metadata_impl(self) -> email.message.Message:
|
||||
raise NotImplementedError()
|
||||
|
||||
@functools.lru_cache(maxsize=1)
|
||||
def _metadata_cached(self) -> email.message.Message:
|
||||
# When we drop python 3.7 support, move this to the metadata property and use
|
||||
# functools.cached_property instead of lru_cache.
|
||||
metadata = self._metadata_impl()
|
||||
self._add_egg_info_requires(metadata)
|
||||
return metadata
|
||||
|
||||
@property
|
||||
def metadata(self) -> email.message.Message:
|
||||
"""Metadata of distribution parsed from e.g. METADATA or PKG-INFO.
|
||||
|
||||
This should return an empty message if the metadata file is unavailable.
|
||||
|
||||
:raises NoneMetadataError: If the metadata file is available, but does
|
||||
not contain valid metadata.
|
||||
"""
|
||||
return self._metadata_cached()
|
||||
|
||||
@property
|
||||
def metadata_dict(self) -> Dict[str, Any]:
|
||||
"""PEP 566 compliant JSON-serializable representation of METADATA or PKG-INFO.
|
||||
|
||||
This should return an empty dict if the metadata file is unavailable.
|
||||
|
||||
:raises NoneMetadataError: If the metadata file is available, but does
|
||||
not contain valid metadata.
|
||||
"""
|
||||
return msg_to_json(self.metadata)
|
||||
|
||||
@property
|
||||
def metadata_version(self) -> Optional[str]:
|
||||
"""Value of "Metadata-Version:" in distribution metadata, if available."""
|
||||
return self.metadata.get("Metadata-Version")
|
||||
|
||||
@property
|
||||
def raw_name(self) -> str:
|
||||
"""Value of "Name:" in distribution metadata."""
|
||||
# The metadata should NEVER be missing the Name: key, but if it somehow
|
||||
# does, fall back to the known canonical name.
|
||||
return self.metadata.get("Name", self.canonical_name)
|
||||
|
||||
@property
|
||||
def requires_python(self) -> SpecifierSet:
|
||||
"""Value of "Requires-Python:" in distribution metadata.
|
||||
|
||||
If the key does not exist or contains an invalid value, an empty
|
||||
SpecifierSet should be returned.
|
||||
"""
|
||||
value = self.metadata.get("Requires-Python")
|
||||
if value is None:
|
||||
return SpecifierSet()
|
||||
try:
|
||||
# Convert to str to satisfy the type checker; this can be a Header object.
|
||||
spec = SpecifierSet(str(value))
|
||||
except InvalidSpecifier as e:
|
||||
message = "Package %r has an invalid Requires-Python: %s"
|
||||
logger.warning(message, self.raw_name, e)
|
||||
return SpecifierSet()
|
||||
return spec
|
||||
|
||||
def iter_dependencies(self, extras: Collection[str] = ()) -> Iterable[Requirement]:
|
||||
"""Dependencies of this distribution.
|
||||
|
||||
For modern .dist-info distributions, this is the collection of
|
||||
"Requires-Dist:" entries in distribution metadata.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def iter_provided_extras(self) -> Iterable[str]:
|
||||
"""Extras provided by this distribution.
|
||||
|
||||
For modern .dist-info distributions, this is the collection of
|
||||
"Provides-Extra:" entries in distribution metadata.
|
||||
|
||||
The return value of this function is not particularly useful other than
|
||||
display purposes due to backward compatibility issues and the extra
|
||||
names being poorly normalized prior to PEP 685. If you want to perform
|
||||
logic operations on extras, use :func:`is_extra_provided` instead.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def is_extra_provided(self, extra: str) -> bool:
|
||||
"""Check whether an extra is provided by this distribution.
|
||||
|
||||
This is needed mostly for compatibility issues with pkg_resources not
|
||||
following the extra normalization rules defined in PEP 685.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def _iter_declared_entries_from_record(self) -> Optional[Iterator[str]]:
|
||||
try:
|
||||
text = self.read_text("RECORD")
|
||||
except FileNotFoundError:
|
||||
return None
|
||||
# This extra Path-str cast normalizes entries.
|
||||
return (str(pathlib.Path(row[0])) for row in csv.reader(text.splitlines()))
|
||||
|
||||
def _iter_declared_entries_from_legacy(self) -> Optional[Iterator[str]]:
|
||||
try:
|
||||
text = self.read_text("installed-files.txt")
|
||||
except FileNotFoundError:
|
||||
return None
|
||||
paths = (p for p in text.splitlines(keepends=False) if p)
|
||||
root = self.location
|
||||
info = self.info_location
|
||||
if root is None or info is None:
|
||||
return paths
|
||||
try:
|
||||
info_rel = pathlib.Path(info).relative_to(root)
|
||||
except ValueError: # info is not relative to root.
|
||||
return paths
|
||||
if not info_rel.parts: # info *is* root.
|
||||
return paths
|
||||
return (
|
||||
_convert_installed_files_path(pathlib.Path(p).parts, info_rel.parts)
|
||||
for p in paths
|
||||
)
|
||||
|
||||
def iter_declared_entries(self) -> Optional[Iterator[str]]:
|
||||
"""Iterate through file entries declared in this distribution.
|
||||
|
||||
For modern .dist-info distributions, this is the files listed in the
|
||||
``RECORD`` metadata file. For legacy setuptools distributions, this
|
||||
comes from ``installed-files.txt``, with entries normalized to be
|
||||
compatible with the format used by ``RECORD``.
|
||||
|
||||
:return: An iterator for listed entries, or None if the distribution
|
||||
contains neither ``RECORD`` nor ``installed-files.txt``.
|
||||
"""
|
||||
return (
|
||||
self._iter_declared_entries_from_record()
|
||||
or self._iter_declared_entries_from_legacy()
|
||||
)
|
||||
|
||||
def _iter_requires_txt_entries(self) -> Iterator[RequiresEntry]:
|
||||
"""Parse a ``requires.txt`` in an egg-info directory.
|
||||
|
||||
This is an INI-ish format where an egg-info stores dependencies. A
|
||||
section name describes extra other environment markers, while each entry
|
||||
is an arbitrary string (not a key-value pair) representing a dependency
|
||||
as a requirement string (no markers).
|
||||
|
||||
There is a construct in ``importlib.metadata`` called ``Sectioned`` that
|
||||
does mostly the same, but the format is currently considered private.
|
||||
"""
|
||||
try:
|
||||
content = self.read_text("requires.txt")
|
||||
except FileNotFoundError:
|
||||
return
|
||||
extra = marker = "" # Section-less entries don't have markers.
|
||||
for line in content.splitlines():
|
||||
line = line.strip()
|
||||
if not line or line.startswith("#"): # Comment; ignored.
|
||||
continue
|
||||
if line.startswith("[") and line.endswith("]"): # A section header.
|
||||
extra, _, marker = line.strip("[]").partition(":")
|
||||
continue
|
||||
yield RequiresEntry(requirement=line, extra=extra, marker=marker)
|
||||
|
||||
def _iter_egg_info_extras(self) -> Iterable[str]:
|
||||
"""Get extras from the egg-info directory."""
|
||||
known_extras = {""}
|
||||
for entry in self._iter_requires_txt_entries():
|
||||
extra = canonicalize_name(entry.extra)
|
||||
if extra in known_extras:
|
||||
continue
|
||||
known_extras.add(extra)
|
||||
yield extra
|
||||
|
||||
def _iter_egg_info_dependencies(self) -> Iterable[str]:
|
||||
"""Get distribution dependencies from the egg-info directory.
|
||||
|
||||
To ease parsing, this converts a legacy dependency entry into a PEP 508
|
||||
requirement string. Like ``_iter_requires_txt_entries()``, there is code
|
||||
in ``importlib.metadata`` that does mostly the same, but not do exactly
|
||||
what we need.
|
||||
|
||||
Namely, ``importlib.metadata`` does not normalize the extra name before
|
||||
putting it into the requirement string, which causes marker comparison
|
||||
to fail because the dist-info format do normalize. This is consistent in
|
||||
all currently available PEP 517 backends, although not standardized.
|
||||
"""
|
||||
for entry in self._iter_requires_txt_entries():
|
||||
extra = canonicalize_name(entry.extra)
|
||||
if extra and entry.marker:
|
||||
marker = f'({entry.marker}) and extra == "{extra}"'
|
||||
elif extra:
|
||||
marker = f'extra == "{extra}"'
|
||||
elif entry.marker:
|
||||
marker = entry.marker
|
||||
else:
|
||||
marker = ""
|
||||
if marker:
|
||||
yield f"{entry.requirement} ; {marker}"
|
||||
else:
|
||||
yield entry.requirement
|
||||
|
||||
def _add_egg_info_requires(self, metadata: email.message.Message) -> None:
|
||||
"""Add egg-info requires.txt information to the metadata."""
|
||||
if not metadata.get_all("Requires-Dist"):
|
||||
for dep in self._iter_egg_info_dependencies():
|
||||
metadata["Requires-Dist"] = dep
|
||||
if not metadata.get_all("Provides-Extra"):
|
||||
for extra in self._iter_egg_info_extras():
|
||||
metadata["Provides-Extra"] = extra
|
||||
|
||||
|
||||
class BaseEnvironment:
|
||||
"""An environment containing distributions to introspect."""
|
||||
|
||||
@classmethod
|
||||
def default(cls) -> "BaseEnvironment":
|
||||
raise NotImplementedError()
|
||||
|
||||
@classmethod
|
||||
def from_paths(cls, paths: Optional[List[str]]) -> "BaseEnvironment":
|
||||
raise NotImplementedError()
|
||||
|
||||
def get_distribution(self, name: str) -> Optional["BaseDistribution"]:
|
||||
"""Given a requirement name, return the installed distributions.
|
||||
|
||||
The name may not be normalized. The implementation must canonicalize
|
||||
it for lookup.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def _iter_distributions(self) -> Iterator["BaseDistribution"]:
|
||||
"""Iterate through installed distributions.
|
||||
|
||||
This function should be implemented by subclass, but never called
|
||||
directly. Use the public ``iter_distribution()`` instead, which
|
||||
implements additional logic to make sure the distributions are valid.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def iter_all_distributions(self) -> Iterator[BaseDistribution]:
|
||||
"""Iterate through all installed distributions without any filtering."""
|
||||
for dist in self._iter_distributions():
|
||||
# Make sure the distribution actually comes from a valid Python
|
||||
# packaging distribution. Pip's AdjacentTempDirectory leaves folders
|
||||
# e.g. ``~atplotlib.dist-info`` if cleanup was interrupted. The
|
||||
# valid project name pattern is taken from PEP 508.
|
||||
project_name_valid = re.match(
|
||||
r"^([A-Z0-9]|[A-Z0-9][A-Z0-9._-]*[A-Z0-9])$",
|
||||
dist.canonical_name,
|
||||
flags=re.IGNORECASE,
|
||||
)
|
||||
if not project_name_valid:
|
||||
logger.warning(
|
||||
"Ignoring invalid distribution %s (%s)",
|
||||
dist.canonical_name,
|
||||
dist.location,
|
||||
)
|
||||
continue
|
||||
yield dist
|
||||
|
||||
def iter_installed_distributions(
|
||||
self,
|
||||
local_only: bool = True,
|
||||
skip: Container[str] = stdlib_pkgs,
|
||||
include_editables: bool = True,
|
||||
editables_only: bool = False,
|
||||
user_only: bool = False,
|
||||
) -> Iterator[BaseDistribution]:
|
||||
"""Return a list of installed distributions.
|
||||
|
||||
This is based on ``iter_all_distributions()`` with additional filtering
|
||||
options. Note that ``iter_installed_distributions()`` without arguments
|
||||
is *not* equal to ``iter_all_distributions()``, since some of the
|
||||
configurations exclude packages by default.
|
||||
|
||||
:param local_only: If True (default), only return installations
|
||||
local to the current virtualenv, if in a virtualenv.
|
||||
:param skip: An iterable of canonicalized project names to ignore;
|
||||
defaults to ``stdlib_pkgs``.
|
||||
:param include_editables: If False, don't report editables.
|
||||
:param editables_only: If True, only report editables.
|
||||
:param user_only: If True, only report installations in the user
|
||||
site directory.
|
||||
"""
|
||||
it = self.iter_all_distributions()
|
||||
if local_only:
|
||||
it = (d for d in it if d.local)
|
||||
if not include_editables:
|
||||
it = (d for d in it if not d.editable)
|
||||
if editables_only:
|
||||
it = (d for d in it if d.editable)
|
||||
if user_only:
|
||||
it = (d for d in it if d.in_usersite)
|
||||
return (d for d in it if d.canonical_name not in skip)
|
||||
|
||||
|
||||
class Wheel(Protocol):
|
||||
location: str
|
||||
|
||||
def as_zipfile(self) -> zipfile.ZipFile:
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class FilesystemWheel(Wheel):
|
||||
def __init__(self, location: str) -> None:
|
||||
self.location = location
|
||||
|
||||
def as_zipfile(self) -> zipfile.ZipFile:
|
||||
return zipfile.ZipFile(self.location, allowZip64=True)
|
||||
|
||||
|
||||
class MemoryWheel(Wheel):
|
||||
def __init__(self, location: str, stream: IO[bytes]) -> None:
|
||||
self.location = location
|
||||
self.stream = stream
|
||||
|
||||
def as_zipfile(self) -> zipfile.ZipFile:
|
||||
return zipfile.ZipFile(self.stream, allowZip64=True)
|
||||
@@ -1,6 +0,0 @@
|
||||
from ._dists import Distribution
|
||||
from ._envs import Environment
|
||||
|
||||
__all__ = ["NAME", "Distribution", "Environment"]
|
||||
|
||||
NAME = "importlib"
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,55 +0,0 @@
|
||||
import importlib.metadata
|
||||
from typing import Any, Optional, Protocol, cast
|
||||
|
||||
|
||||
class BadMetadata(ValueError):
|
||||
def __init__(self, dist: importlib.metadata.Distribution, *, reason: str) -> None:
|
||||
self.dist = dist
|
||||
self.reason = reason
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"Bad metadata in {self.dist} ({self.reason})"
|
||||
|
||||
|
||||
class BasePath(Protocol):
|
||||
"""A protocol that various path objects conform.
|
||||
|
||||
This exists because importlib.metadata uses both ``pathlib.Path`` and
|
||||
``zipfile.Path``, and we need a common base for type hints (Union does not
|
||||
work well since ``zipfile.Path`` is too new for our linter setup).
|
||||
|
||||
This does not mean to be exhaustive, but only contains things that present
|
||||
in both classes *that we need*.
|
||||
"""
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
raise NotImplementedError()
|
||||
|
||||
@property
|
||||
def parent(self) -> "BasePath":
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
def get_info_location(d: importlib.metadata.Distribution) -> Optional[BasePath]:
|
||||
"""Find the path to the distribution's metadata directory.
|
||||
|
||||
HACK: This relies on importlib.metadata's private ``_path`` attribute. Not
|
||||
all distributions exist on disk, so importlib.metadata is correct to not
|
||||
expose the attribute as public. But pip's code base is old and not as clean,
|
||||
so we do this to avoid having to rewrite too many things. Hopefully we can
|
||||
eliminate this some day.
|
||||
"""
|
||||
return getattr(d, "_path", None)
|
||||
|
||||
|
||||
def get_dist_name(dist: importlib.metadata.Distribution) -> str:
|
||||
"""Get the distribution's project name.
|
||||
|
||||
The ``name`` attribute is only available in Python 3.10 or later. We are
|
||||
targeting exactly that, but Mypy does not know this.
|
||||
"""
|
||||
name = cast(Any, dist).name
|
||||
if not isinstance(name, str):
|
||||
raise BadMetadata(dist, reason="invalid metadata entry 'name'")
|
||||
return name
|
||||
@@ -1,227 +0,0 @@
|
||||
import email.message
|
||||
import importlib.metadata
|
||||
import os
|
||||
import pathlib
|
||||
import zipfile
|
||||
from typing import (
|
||||
Collection,
|
||||
Dict,
|
||||
Iterable,
|
||||
Iterator,
|
||||
Mapping,
|
||||
Optional,
|
||||
Sequence,
|
||||
cast,
|
||||
)
|
||||
|
||||
from pip._vendor.packaging.requirements import Requirement
|
||||
from pip._vendor.packaging.utils import NormalizedName, canonicalize_name
|
||||
from pip._vendor.packaging.version import parse as parse_version
|
||||
|
||||
from pip._internal.exceptions import InvalidWheel, UnsupportedWheel
|
||||
from pip._internal.metadata.base import (
|
||||
BaseDistribution,
|
||||
BaseEntryPoint,
|
||||
DistributionVersion,
|
||||
InfoPath,
|
||||
Wheel,
|
||||
)
|
||||
from pip._internal.utils.misc import normalize_path
|
||||
from pip._internal.utils.temp_dir import TempDirectory
|
||||
from pip._internal.utils.wheel import parse_wheel, read_wheel_metadata_file
|
||||
|
||||
from ._compat import BasePath, get_dist_name
|
||||
|
||||
|
||||
class WheelDistribution(importlib.metadata.Distribution):
|
||||
"""An ``importlib.metadata.Distribution`` read from a wheel.
|
||||
|
||||
Although ``importlib.metadata.PathDistribution`` accepts ``zipfile.Path``,
|
||||
its implementation is too "lazy" for pip's needs (we can't keep the ZipFile
|
||||
handle open for the entire lifetime of the distribution object).
|
||||
|
||||
This implementation eagerly reads the entire metadata directory into the
|
||||
memory instead, and operates from that.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
files: Mapping[pathlib.PurePosixPath, bytes],
|
||||
info_location: pathlib.PurePosixPath,
|
||||
) -> None:
|
||||
self._files = files
|
||||
self.info_location = info_location
|
||||
|
||||
@classmethod
|
||||
def from_zipfile(
|
||||
cls,
|
||||
zf: zipfile.ZipFile,
|
||||
name: str,
|
||||
location: str,
|
||||
) -> "WheelDistribution":
|
||||
info_dir, _ = parse_wheel(zf, name)
|
||||
paths = (
|
||||
(name, pathlib.PurePosixPath(name.split("/", 1)[-1]))
|
||||
for name in zf.namelist()
|
||||
if name.startswith(f"{info_dir}/")
|
||||
)
|
||||
files = {
|
||||
relpath: read_wheel_metadata_file(zf, fullpath)
|
||||
for fullpath, relpath in paths
|
||||
}
|
||||
info_location = pathlib.PurePosixPath(location, info_dir)
|
||||
return cls(files, info_location)
|
||||
|
||||
def iterdir(self, path: InfoPath) -> Iterator[pathlib.PurePosixPath]:
|
||||
# Only allow iterating through the metadata directory.
|
||||
if pathlib.PurePosixPath(str(path)) in self._files:
|
||||
return iter(self._files)
|
||||
raise FileNotFoundError(path)
|
||||
|
||||
def read_text(self, filename: str) -> Optional[str]:
|
||||
try:
|
||||
data = self._files[pathlib.PurePosixPath(filename)]
|
||||
except KeyError:
|
||||
return None
|
||||
try:
|
||||
text = data.decode("utf-8")
|
||||
except UnicodeDecodeError as e:
|
||||
wheel = self.info_location.parent
|
||||
error = f"Error decoding metadata for {wheel}: {e} in {filename} file"
|
||||
raise UnsupportedWheel(error)
|
||||
return text
|
||||
|
||||
|
||||
class Distribution(BaseDistribution):
|
||||
def __init__(
|
||||
self,
|
||||
dist: importlib.metadata.Distribution,
|
||||
info_location: Optional[BasePath],
|
||||
installed_location: Optional[BasePath],
|
||||
) -> None:
|
||||
self._dist = dist
|
||||
self._info_location = info_location
|
||||
self._installed_location = installed_location
|
||||
|
||||
@classmethod
|
||||
def from_directory(cls, directory: str) -> BaseDistribution:
|
||||
info_location = pathlib.Path(directory)
|
||||
dist = importlib.metadata.Distribution.at(info_location)
|
||||
return cls(dist, info_location, info_location.parent)
|
||||
|
||||
@classmethod
|
||||
def from_metadata_file_contents(
|
||||
cls,
|
||||
metadata_contents: bytes,
|
||||
filename: str,
|
||||
project_name: str,
|
||||
) -> BaseDistribution:
|
||||
# Generate temp dir to contain the metadata file, and write the file contents.
|
||||
temp_dir = pathlib.Path(
|
||||
TempDirectory(kind="metadata", globally_managed=True).path
|
||||
)
|
||||
metadata_path = temp_dir / "METADATA"
|
||||
metadata_path.write_bytes(metadata_contents)
|
||||
# Construct dist pointing to the newly created directory.
|
||||
dist = importlib.metadata.Distribution.at(metadata_path.parent)
|
||||
return cls(dist, metadata_path.parent, None)
|
||||
|
||||
@classmethod
|
||||
def from_wheel(cls, wheel: Wheel, name: str) -> BaseDistribution:
|
||||
try:
|
||||
with wheel.as_zipfile() as zf:
|
||||
dist = WheelDistribution.from_zipfile(zf, name, wheel.location)
|
||||
except zipfile.BadZipFile as e:
|
||||
raise InvalidWheel(wheel.location, name) from e
|
||||
except UnsupportedWheel as e:
|
||||
raise UnsupportedWheel(f"{name} has an invalid wheel, {e}")
|
||||
return cls(dist, dist.info_location, pathlib.PurePosixPath(wheel.location))
|
||||
|
||||
@property
|
||||
def location(self) -> Optional[str]:
|
||||
if self._info_location is None:
|
||||
return None
|
||||
return str(self._info_location.parent)
|
||||
|
||||
@property
|
||||
def info_location(self) -> Optional[str]:
|
||||
if self._info_location is None:
|
||||
return None
|
||||
return str(self._info_location)
|
||||
|
||||
@property
|
||||
def installed_location(self) -> Optional[str]:
|
||||
if self._installed_location is None:
|
||||
return None
|
||||
return normalize_path(str(self._installed_location))
|
||||
|
||||
def _get_dist_name_from_location(self) -> Optional[str]:
|
||||
"""Try to get the name from the metadata directory name.
|
||||
|
||||
This is much faster than reading metadata.
|
||||
"""
|
||||
if self._info_location is None:
|
||||
return None
|
||||
stem, suffix = os.path.splitext(self._info_location.name)
|
||||
if suffix not in (".dist-info", ".egg-info"):
|
||||
return None
|
||||
return stem.split("-", 1)[0]
|
||||
|
||||
@property
|
||||
def canonical_name(self) -> NormalizedName:
|
||||
name = self._get_dist_name_from_location() or get_dist_name(self._dist)
|
||||
return canonicalize_name(name)
|
||||
|
||||
@property
|
||||
def version(self) -> DistributionVersion:
|
||||
return parse_version(self._dist.version)
|
||||
|
||||
def is_file(self, path: InfoPath) -> bool:
|
||||
return self._dist.read_text(str(path)) is not None
|
||||
|
||||
def iter_distutils_script_names(self) -> Iterator[str]:
|
||||
# A distutils installation is always "flat" (not in e.g. egg form), so
|
||||
# if this distribution's info location is NOT a pathlib.Path (but e.g.
|
||||
# zipfile.Path), it can never contain any distutils scripts.
|
||||
if not isinstance(self._info_location, pathlib.Path):
|
||||
return
|
||||
for child in self._info_location.joinpath("scripts").iterdir():
|
||||
yield child.name
|
||||
|
||||
def read_text(self, path: InfoPath) -> str:
|
||||
content = self._dist.read_text(str(path))
|
||||
if content is None:
|
||||
raise FileNotFoundError(path)
|
||||
return content
|
||||
|
||||
def iter_entry_points(self) -> Iterable[BaseEntryPoint]:
|
||||
# importlib.metadata's EntryPoint structure sasitfies BaseEntryPoint.
|
||||
return self._dist.entry_points
|
||||
|
||||
def _metadata_impl(self) -> email.message.Message:
|
||||
# From Python 3.10+, importlib.metadata declares PackageMetadata as the
|
||||
# return type. This protocol is unfortunately a disaster now and misses
|
||||
# a ton of fields that we need, including get() and get_payload(). We
|
||||
# rely on the implementation that the object is actually a Message now,
|
||||
# until upstream can improve the protocol. (python/cpython#94952)
|
||||
return cast(email.message.Message, self._dist.metadata)
|
||||
|
||||
def iter_provided_extras(self) -> Iterable[str]:
|
||||
return self.metadata.get_all("Provides-Extra", [])
|
||||
|
||||
def is_extra_provided(self, extra: str) -> bool:
|
||||
return any(
|
||||
canonicalize_name(provided_extra) == canonicalize_name(extra)
|
||||
for provided_extra in self.metadata.get_all("Provides-Extra", [])
|
||||
)
|
||||
|
||||
def iter_dependencies(self, extras: Collection[str] = ()) -> Iterable[Requirement]:
|
||||
contexts: Sequence[Dict[str, str]] = [{"extra": e} for e in extras]
|
||||
for req_string in self.metadata.get_all("Requires-Dist", []):
|
||||
req = Requirement(req_string)
|
||||
if not req.marker:
|
||||
yield req
|
||||
elif not extras and req.marker.evaluate({"extra": ""}):
|
||||
yield req
|
||||
elif any(req.marker.evaluate(context) for context in contexts):
|
||||
yield req
|
||||
@@ -1,189 +0,0 @@
|
||||
import functools
|
||||
import importlib.metadata
|
||||
import logging
|
||||
import os
|
||||
import pathlib
|
||||
import sys
|
||||
import zipfile
|
||||
import zipimport
|
||||
from typing import Iterator, List, Optional, Sequence, Set, Tuple
|
||||
|
||||
from pip._vendor.packaging.utils import NormalizedName, canonicalize_name
|
||||
|
||||
from pip._internal.metadata.base import BaseDistribution, BaseEnvironment
|
||||
from pip._internal.models.wheel import Wheel
|
||||
from pip._internal.utils.deprecation import deprecated
|
||||
from pip._internal.utils.filetypes import WHEEL_EXTENSION
|
||||
|
||||
from ._compat import BadMetadata, BasePath, get_dist_name, get_info_location
|
||||
from ._dists import Distribution
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _looks_like_wheel(location: str) -> bool:
|
||||
if not location.endswith(WHEEL_EXTENSION):
|
||||
return False
|
||||
if not os.path.isfile(location):
|
||||
return False
|
||||
if not Wheel.wheel_file_re.match(os.path.basename(location)):
|
||||
return False
|
||||
return zipfile.is_zipfile(location)
|
||||
|
||||
|
||||
class _DistributionFinder:
|
||||
"""Finder to locate distributions.
|
||||
|
||||
The main purpose of this class is to memoize found distributions' names, so
|
||||
only one distribution is returned for each package name. At lot of pip code
|
||||
assumes this (because it is setuptools's behavior), and not doing the same
|
||||
can potentially cause a distribution in lower precedence path to override a
|
||||
higher precedence one if the caller is not careful.
|
||||
|
||||
Eventually we probably want to make it possible to see lower precedence
|
||||
installations as well. It's useful feature, after all.
|
||||
"""
|
||||
|
||||
FoundResult = Tuple[importlib.metadata.Distribution, Optional[BasePath]]
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._found_names: Set[NormalizedName] = set()
|
||||
|
||||
def _find_impl(self, location: str) -> Iterator[FoundResult]:
|
||||
"""Find distributions in a location."""
|
||||
# Skip looking inside a wheel. Since a package inside a wheel is not
|
||||
# always valid (due to .data directories etc.), its .dist-info entry
|
||||
# should not be considered an installed distribution.
|
||||
if _looks_like_wheel(location):
|
||||
return
|
||||
# To know exactly where we find a distribution, we have to feed in the
|
||||
# paths one by one, instead of dumping the list to importlib.metadata.
|
||||
for dist in importlib.metadata.distributions(path=[location]):
|
||||
info_location = get_info_location(dist)
|
||||
try:
|
||||
raw_name = get_dist_name(dist)
|
||||
except BadMetadata as e:
|
||||
logger.warning("Skipping %s due to %s", info_location, e.reason)
|
||||
continue
|
||||
normalized_name = canonicalize_name(raw_name)
|
||||
if normalized_name in self._found_names:
|
||||
continue
|
||||
self._found_names.add(normalized_name)
|
||||
yield dist, info_location
|
||||
|
||||
def find(self, location: str) -> Iterator[BaseDistribution]:
|
||||
"""Find distributions in a location.
|
||||
|
||||
The path can be either a directory, or a ZIP archive.
|
||||
"""
|
||||
for dist, info_location in self._find_impl(location):
|
||||
if info_location is None:
|
||||
installed_location: Optional[BasePath] = None
|
||||
else:
|
||||
installed_location = info_location.parent
|
||||
yield Distribution(dist, info_location, installed_location)
|
||||
|
||||
def find_linked(self, location: str) -> Iterator[BaseDistribution]:
|
||||
"""Read location in egg-link files and return distributions in there.
|
||||
|
||||
The path should be a directory; otherwise this returns nothing. This
|
||||
follows how setuptools does this for compatibility. The first non-empty
|
||||
line in the egg-link is read as a path (resolved against the egg-link's
|
||||
containing directory if relative). Distributions found at that linked
|
||||
location are returned.
|
||||
"""
|
||||
path = pathlib.Path(location)
|
||||
if not path.is_dir():
|
||||
return
|
||||
for child in path.iterdir():
|
||||
if child.suffix != ".egg-link":
|
||||
continue
|
||||
with child.open() as f:
|
||||
lines = (line.strip() for line in f)
|
||||
target_rel = next((line for line in lines if line), "")
|
||||
if not target_rel:
|
||||
continue
|
||||
target_location = str(path.joinpath(target_rel))
|
||||
for dist, info_location in self._find_impl(target_location):
|
||||
yield Distribution(dist, info_location, path)
|
||||
|
||||
def _find_eggs_in_dir(self, location: str) -> Iterator[BaseDistribution]:
|
||||
from pip._vendor.pkg_resources import find_distributions
|
||||
|
||||
from pip._internal.metadata import pkg_resources as legacy
|
||||
|
||||
with os.scandir(location) as it:
|
||||
for entry in it:
|
||||
if not entry.name.endswith(".egg"):
|
||||
continue
|
||||
for dist in find_distributions(entry.path):
|
||||
yield legacy.Distribution(dist)
|
||||
|
||||
def _find_eggs_in_zip(self, location: str) -> Iterator[BaseDistribution]:
|
||||
from pip._vendor.pkg_resources import find_eggs_in_zip
|
||||
|
||||
from pip._internal.metadata import pkg_resources as legacy
|
||||
|
||||
try:
|
||||
importer = zipimport.zipimporter(location)
|
||||
except zipimport.ZipImportError:
|
||||
return
|
||||
for dist in find_eggs_in_zip(importer, location):
|
||||
yield legacy.Distribution(dist)
|
||||
|
||||
def find_eggs(self, location: str) -> Iterator[BaseDistribution]:
|
||||
"""Find eggs in a location.
|
||||
|
||||
This actually uses the old *pkg_resources* backend. We likely want to
|
||||
deprecate this so we can eventually remove the *pkg_resources*
|
||||
dependency entirely. Before that, this should first emit a deprecation
|
||||
warning for some versions when using the fallback since importing
|
||||
*pkg_resources* is slow for those who don't need it.
|
||||
"""
|
||||
if os.path.isdir(location):
|
||||
yield from self._find_eggs_in_dir(location)
|
||||
if zipfile.is_zipfile(location):
|
||||
yield from self._find_eggs_in_zip(location)
|
||||
|
||||
|
||||
@functools.lru_cache(maxsize=None) # Warn a distribution exactly once.
|
||||
def _emit_egg_deprecation(location: Optional[str]) -> None:
|
||||
deprecated(
|
||||
reason=f"Loading egg at {location} is deprecated.",
|
||||
replacement="to use pip for package installation.",
|
||||
gone_in="24.3",
|
||||
issue=12330,
|
||||
)
|
||||
|
||||
|
||||
class Environment(BaseEnvironment):
|
||||
def __init__(self, paths: Sequence[str]) -> None:
|
||||
self._paths = paths
|
||||
|
||||
@classmethod
|
||||
def default(cls) -> BaseEnvironment:
|
||||
return cls(sys.path)
|
||||
|
||||
@classmethod
|
||||
def from_paths(cls, paths: Optional[List[str]]) -> BaseEnvironment:
|
||||
if paths is None:
|
||||
return cls(sys.path)
|
||||
return cls(paths)
|
||||
|
||||
def _iter_distributions(self) -> Iterator[BaseDistribution]:
|
||||
finder = _DistributionFinder()
|
||||
for location in self._paths:
|
||||
yield from finder.find(location)
|
||||
for dist in finder.find_eggs(location):
|
||||
_emit_egg_deprecation(dist.location)
|
||||
yield dist
|
||||
# This must go last because that's how pkg_resources tie-breaks.
|
||||
yield from finder.find_linked(location)
|
||||
|
||||
def get_distribution(self, name: str) -> Optional[BaseDistribution]:
|
||||
matches = (
|
||||
distribution
|
||||
for distribution in self.iter_all_distributions()
|
||||
if distribution.canonical_name == canonicalize_name(name)
|
||||
)
|
||||
return next(matches, None)
|
||||
@@ -1,278 +0,0 @@
|
||||
import email.message
|
||||
import email.parser
|
||||
import logging
|
||||
import os
|
||||
import zipfile
|
||||
from typing import Collection, Iterable, Iterator, List, Mapping, NamedTuple, Optional
|
||||
|
||||
from pip._vendor import pkg_resources
|
||||
from pip._vendor.packaging.requirements import Requirement
|
||||
from pip._vendor.packaging.utils import NormalizedName, canonicalize_name
|
||||
from pip._vendor.packaging.version import parse as parse_version
|
||||
|
||||
from pip._internal.exceptions import InvalidWheel, NoneMetadataError, UnsupportedWheel
|
||||
from pip._internal.utils.egg_link import egg_link_path_from_location
|
||||
from pip._internal.utils.misc import display_path, normalize_path
|
||||
from pip._internal.utils.wheel import parse_wheel, read_wheel_metadata_file
|
||||
|
||||
from .base import (
|
||||
BaseDistribution,
|
||||
BaseEntryPoint,
|
||||
BaseEnvironment,
|
||||
DistributionVersion,
|
||||
InfoPath,
|
||||
Wheel,
|
||||
)
|
||||
|
||||
__all__ = ["NAME", "Distribution", "Environment"]
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
NAME = "pkg_resources"
|
||||
|
||||
|
||||
class EntryPoint(NamedTuple):
|
||||
name: str
|
||||
value: str
|
||||
group: str
|
||||
|
||||
|
||||
class InMemoryMetadata:
|
||||
"""IMetadataProvider that reads metadata files from a dictionary.
|
||||
|
||||
This also maps metadata decoding exceptions to our internal exception type.
|
||||
"""
|
||||
|
||||
def __init__(self, metadata: Mapping[str, bytes], wheel_name: str) -> None:
|
||||
self._metadata = metadata
|
||||
self._wheel_name = wheel_name
|
||||
|
||||
def has_metadata(self, name: str) -> bool:
|
||||
return name in self._metadata
|
||||
|
||||
def get_metadata(self, name: str) -> str:
|
||||
try:
|
||||
return self._metadata[name].decode()
|
||||
except UnicodeDecodeError as e:
|
||||
# Augment the default error with the origin of the file.
|
||||
raise UnsupportedWheel(
|
||||
f"Error decoding metadata for {self._wheel_name}: {e} in {name} file"
|
||||
)
|
||||
|
||||
def get_metadata_lines(self, name: str) -> Iterable[str]:
|
||||
return pkg_resources.yield_lines(self.get_metadata(name))
|
||||
|
||||
def metadata_isdir(self, name: str) -> bool:
|
||||
return False
|
||||
|
||||
def metadata_listdir(self, name: str) -> List[str]:
|
||||
return []
|
||||
|
||||
def run_script(self, script_name: str, namespace: str) -> None:
|
||||
pass
|
||||
|
||||
|
||||
class Distribution(BaseDistribution):
|
||||
def __init__(self, dist: pkg_resources.Distribution) -> None:
|
||||
self._dist = dist
|
||||
|
||||
@classmethod
|
||||
def from_directory(cls, directory: str) -> BaseDistribution:
|
||||
dist_dir = directory.rstrip(os.sep)
|
||||
|
||||
# Build a PathMetadata object, from path to metadata. :wink:
|
||||
base_dir, dist_dir_name = os.path.split(dist_dir)
|
||||
metadata = pkg_resources.PathMetadata(base_dir, dist_dir)
|
||||
|
||||
# Determine the correct Distribution object type.
|
||||
if dist_dir.endswith(".egg-info"):
|
||||
dist_cls = pkg_resources.Distribution
|
||||
dist_name = os.path.splitext(dist_dir_name)[0]
|
||||
else:
|
||||
assert dist_dir.endswith(".dist-info")
|
||||
dist_cls = pkg_resources.DistInfoDistribution
|
||||
dist_name = os.path.splitext(dist_dir_name)[0].split("-")[0]
|
||||
|
||||
dist = dist_cls(base_dir, project_name=dist_name, metadata=metadata)
|
||||
return cls(dist)
|
||||
|
||||
@classmethod
|
||||
def from_metadata_file_contents(
|
||||
cls,
|
||||
metadata_contents: bytes,
|
||||
filename: str,
|
||||
project_name: str,
|
||||
) -> BaseDistribution:
|
||||
metadata_dict = {
|
||||
"METADATA": metadata_contents,
|
||||
}
|
||||
dist = pkg_resources.DistInfoDistribution(
|
||||
location=filename,
|
||||
metadata=InMemoryMetadata(metadata_dict, filename),
|
||||
project_name=project_name,
|
||||
)
|
||||
return cls(dist)
|
||||
|
||||
@classmethod
|
||||
def from_wheel(cls, wheel: Wheel, name: str) -> BaseDistribution:
|
||||
try:
|
||||
with wheel.as_zipfile() as zf:
|
||||
info_dir, _ = parse_wheel(zf, name)
|
||||
metadata_dict = {
|
||||
path.split("/", 1)[-1]: read_wheel_metadata_file(zf, path)
|
||||
for path in zf.namelist()
|
||||
if path.startswith(f"{info_dir}/")
|
||||
}
|
||||
except zipfile.BadZipFile as e:
|
||||
raise InvalidWheel(wheel.location, name) from e
|
||||
except UnsupportedWheel as e:
|
||||
raise UnsupportedWheel(f"{name} has an invalid wheel, {e}")
|
||||
dist = pkg_resources.DistInfoDistribution(
|
||||
location=wheel.location,
|
||||
metadata=InMemoryMetadata(metadata_dict, wheel.location),
|
||||
project_name=name,
|
||||
)
|
||||
return cls(dist)
|
||||
|
||||
@property
|
||||
def location(self) -> Optional[str]:
|
||||
return self._dist.location
|
||||
|
||||
@property
|
||||
def installed_location(self) -> Optional[str]:
|
||||
egg_link = egg_link_path_from_location(self.raw_name)
|
||||
if egg_link:
|
||||
location = egg_link
|
||||
elif self.location:
|
||||
location = self.location
|
||||
else:
|
||||
return None
|
||||
return normalize_path(location)
|
||||
|
||||
@property
|
||||
def info_location(self) -> Optional[str]:
|
||||
return self._dist.egg_info
|
||||
|
||||
@property
|
||||
def installed_by_distutils(self) -> bool:
|
||||
# A distutils-installed distribution is provided by FileMetadata. This
|
||||
# provider has a "path" attribute not present anywhere else. Not the
|
||||
# best introspection logic, but pip has been doing this for a long time.
|
||||
try:
|
||||
return bool(self._dist._provider.path)
|
||||
except AttributeError:
|
||||
return False
|
||||
|
||||
@property
|
||||
def canonical_name(self) -> NormalizedName:
|
||||
return canonicalize_name(self._dist.project_name)
|
||||
|
||||
@property
|
||||
def version(self) -> DistributionVersion:
|
||||
return parse_version(self._dist.version)
|
||||
|
||||
def is_file(self, path: InfoPath) -> bool:
|
||||
return self._dist.has_metadata(str(path))
|
||||
|
||||
def iter_distutils_script_names(self) -> Iterator[str]:
|
||||
yield from self._dist.metadata_listdir("scripts")
|
||||
|
||||
def read_text(self, path: InfoPath) -> str:
|
||||
name = str(path)
|
||||
if not self._dist.has_metadata(name):
|
||||
raise FileNotFoundError(name)
|
||||
content = self._dist.get_metadata(name)
|
||||
if content is None:
|
||||
raise NoneMetadataError(self, name)
|
||||
return content
|
||||
|
||||
def iter_entry_points(self) -> Iterable[BaseEntryPoint]:
|
||||
for group, entries in self._dist.get_entry_map().items():
|
||||
for name, entry_point in entries.items():
|
||||
name, _, value = str(entry_point).partition("=")
|
||||
yield EntryPoint(name=name.strip(), value=value.strip(), group=group)
|
||||
|
||||
def _metadata_impl(self) -> email.message.Message:
|
||||
"""
|
||||
:raises NoneMetadataError: if the distribution reports `has_metadata()`
|
||||
True but `get_metadata()` returns None.
|
||||
"""
|
||||
if isinstance(self._dist, pkg_resources.DistInfoDistribution):
|
||||
metadata_name = "METADATA"
|
||||
else:
|
||||
metadata_name = "PKG-INFO"
|
||||
try:
|
||||
metadata = self.read_text(metadata_name)
|
||||
except FileNotFoundError:
|
||||
if self.location:
|
||||
displaying_path = display_path(self.location)
|
||||
else:
|
||||
displaying_path = repr(self.location)
|
||||
logger.warning("No metadata found in %s", displaying_path)
|
||||
metadata = ""
|
||||
feed_parser = email.parser.FeedParser()
|
||||
feed_parser.feed(metadata)
|
||||
return feed_parser.close()
|
||||
|
||||
def iter_dependencies(self, extras: Collection[str] = ()) -> Iterable[Requirement]:
|
||||
if extras: # pkg_resources raises on invalid extras, so we sanitize.
|
||||
extras = frozenset(pkg_resources.safe_extra(e) for e in extras)
|
||||
extras = extras.intersection(self._dist.extras)
|
||||
return self._dist.requires(extras)
|
||||
|
||||
def iter_provided_extras(self) -> Iterable[str]:
|
||||
return self._dist.extras
|
||||
|
||||
def is_extra_provided(self, extra: str) -> bool:
|
||||
return pkg_resources.safe_extra(extra) in self._dist.extras
|
||||
|
||||
|
||||
class Environment(BaseEnvironment):
|
||||
def __init__(self, ws: pkg_resources.WorkingSet) -> None:
|
||||
self._ws = ws
|
||||
|
||||
@classmethod
|
||||
def default(cls) -> BaseEnvironment:
|
||||
return cls(pkg_resources.working_set)
|
||||
|
||||
@classmethod
|
||||
def from_paths(cls, paths: Optional[List[str]]) -> BaseEnvironment:
|
||||
return cls(pkg_resources.WorkingSet(paths))
|
||||
|
||||
def _iter_distributions(self) -> Iterator[BaseDistribution]:
|
||||
for dist in self._ws:
|
||||
yield Distribution(dist)
|
||||
|
||||
def _search_distribution(self, name: str) -> Optional[BaseDistribution]:
|
||||
"""Find a distribution matching the ``name`` in the environment.
|
||||
|
||||
This searches from *all* distributions available in the environment, to
|
||||
match the behavior of ``pkg_resources.get_distribution()``.
|
||||
"""
|
||||
canonical_name = canonicalize_name(name)
|
||||
for dist in self.iter_all_distributions():
|
||||
if dist.canonical_name == canonical_name:
|
||||
return dist
|
||||
return None
|
||||
|
||||
def get_distribution(self, name: str) -> Optional[BaseDistribution]:
|
||||
# Search the distribution by looking through the working set.
|
||||
dist = self._search_distribution(name)
|
||||
if dist:
|
||||
return dist
|
||||
|
||||
# If distribution could not be found, call working_set.require to
|
||||
# update the working set, and try to find the distribution again.
|
||||
# This might happen for e.g. when you install a package twice, once
|
||||
# using setup.py develop and again using setup.py install. Now when
|
||||
# running pip uninstall twice, the package gets removed from the
|
||||
# working set in the first uninstall, so we have to populate the
|
||||
# working set again so that pip knows about it and the packages gets
|
||||
# picked up and is successfully uninstalled the second time too.
|
||||
try:
|
||||
# We didn't pass in any version specifiers, so this can never
|
||||
# raise pkg_resources.VersionConflict.
|
||||
self._ws.require(name)
|
||||
except pkg_resources.DistributionNotFound:
|
||||
return None
|
||||
return self._search_distribution(name)
|
||||
Reference in New Issue
Block a user