更改enroll命名,添加了注释,向get_error_msg中添加了一些错误代码

This commit is contained in:
ygm1881
2022-05-05 22:59:35 +08:00
parent 51b5e374a3
commit ece69eaf57
4637 changed files with 7699 additions and 608140 deletions
@@ -4,28 +4,28 @@ import logging
import logging.handlers
import os
import sys
import threading
from dataclasses import dataclass
from logging import Filter
from typing import IO, Any, ClassVar, Iterator, List, Optional, TextIO, Type
from typing import IO, Any, Callable, Iterator, Optional, TextIO, Type, cast
from pip._vendor.rich.console import (
Console,
ConsoleOptions,
ConsoleRenderable,
RenderResult,
)
from pip._vendor.rich.highlighter import NullHighlighter
from pip._vendor.rich.logging import RichHandler
from pip._vendor.rich.segment import Segment
from pip._vendor.rich.style import Style
from pip._internal.exceptions import DiagnosticPipError
from pip._internal.utils._log import VERBOSE, getLogger
from pip._internal.utils.compat import WINDOWS
from pip._internal.utils.deprecation import DEPRECATION_MSG_PREFIX
from pip._internal.utils.misc import ensure_dir
try:
import threading
except ImportError:
import dummy_threading as threading # type: ignore
try:
from pip._vendor import colorama
# Lots of different errors can come from this, including SystemError and
# ImportError.
except Exception:
colorama = None
_log_state = threading.local()
subprocess_logger = getLogger("pip.subprocessor")
@@ -119,63 +119,78 @@ class IndentingFormatter(logging.Formatter):
return formatted
@dataclass
class IndentedRenderable:
renderable: ConsoleRenderable
indent: int
def _color_wrap(*colors: str) -> Callable[[str], str]:
def wrapped(inp: str) -> str:
return "".join(list(colors) + [inp, colorama.Style.RESET_ALL])
def __rich_console__(
self, console: Console, options: ConsoleOptions
) -> RenderResult:
segments = console.render(self.renderable, options)
lines = Segment.split_lines(segments)
for line in lines:
yield Segment(" " * self.indent)
yield from line
yield Segment("\n")
return wrapped
class RichPipStreamHandler(RichHandler):
KEYWORDS: ClassVar[Optional[List[str]]] = []
class ColorizedStreamHandler(logging.StreamHandler):
def __init__(self, stream: Optional[TextIO], no_color: bool) -> None:
super().__init__(
console=Console(file=stream, no_color=no_color, soft_wrap=True),
show_time=False,
show_level=False,
show_path=False,
highlighter=NullHighlighter(),
# Don't build up a list of colors if we don't have colorama
if colorama:
COLORS = [
# This needs to be in order from highest logging level to lowest.
(logging.ERROR, _color_wrap(colorama.Fore.RED)),
(logging.WARNING, _color_wrap(colorama.Fore.YELLOW)),
]
else:
COLORS = []
def __init__(self, stream: Optional[TextIO] = None, no_color: bool = None) -> None:
super().__init__(stream)
self._no_color = no_color
if WINDOWS and colorama:
self.stream = colorama.AnsiToWin32(self.stream)
def _using_stdout(self) -> bool:
"""
Return whether the handler is using sys.stdout.
"""
if WINDOWS and colorama:
# Then self.stream is an AnsiToWin32 object.
stream = cast(colorama.AnsiToWin32, self.stream)
return stream.wrapped is sys.stdout
return self.stream is sys.stdout
def should_color(self) -> bool:
# Don't colorize things if we do not have colorama or if told not to
if not colorama or self._no_color:
return False
real_stream = (
self.stream
if not isinstance(self.stream, colorama.AnsiToWin32)
else self.stream.wrapped
)
# Our custom override on Rich's logger, to make things work as we need them to.
def emit(self, record: logging.LogRecord) -> None:
style: Optional[Style] = None
# If the stream is a tty we should color it
if hasattr(real_stream, "isatty") and real_stream.isatty():
return True
# If we are given a diagnostic error to present, present it with indentation.
if record.msg == "[present-diagnostic] %s" and len(record.args) == 1:
diagnostic_error: DiagnosticPipError = record.args[0] # type: ignore[index]
assert isinstance(diagnostic_error, DiagnosticPipError)
# If we have an ANSI term we should color it
if os.environ.get("TERM") == "ANSI":
return True
renderable: ConsoleRenderable = IndentedRenderable(
diagnostic_error, indent=get_indentation()
)
else:
message = self.format(record)
renderable = self.render_message(record, message)
if record.levelno is not None:
if record.levelno >= logging.ERROR:
style = Style(color="red")
elif record.levelno >= logging.WARNING:
style = Style(color="yellow")
# If anything else we should not color it
return False
try:
self.console.print(renderable, overflow="ignore", crop=False, style=style)
except Exception:
self.handleError(record)
def format(self, record: logging.LogRecord) -> str:
msg = super().format(record)
if self.should_color():
for level, color in self.COLORS:
if record.levelno >= level:
msg = color(msg)
break
return msg
# The logging module says handleError() can be customized.
def handleError(self, record: logging.LogRecord) -> None:
"""Called when logging is unable to log some output."""
exc_class, exc = sys.exc_info()[:2]
# If a broken pipe occurred while calling write() or flush() on the
# stdout stream in logging's Handler.emit(), then raise our special
@@ -184,7 +199,7 @@ class RichPipStreamHandler(RichHandler):
if (
exc_class
and exc
and self.console.file is sys.stdout
and self._using_stdout()
and _is_broken_pipe_error(exc_class, exc)
):
raise BrokenStdoutLoggingError()
@@ -260,7 +275,7 @@ def setup_logging(verbosity: int, no_color: bool, user_log_file: Optional[str])
"stderr": "ext://sys.stderr",
}
handler_classes = {
"stream": "pip._internal.utils.logging.RichPipStreamHandler",
"stream": "pip._internal.utils.logging.ColorizedStreamHandler",
"file": "pip._internal.utils.logging.BetterRotatingFileHandler",
}
handlers = ["console", "console_errors", "console_subprocess"] + (
@@ -318,8 +333,8 @@ def setup_logging(verbosity: int, no_color: bool, user_log_file: Optional[str])
"console_subprocess": {
"level": level,
"class": handler_classes["stream"],
"stream": log_streams["stderr"],
"no_color": no_color,
"stream": log_streams["stderr"],
"filters": ["restrict_to_subprocess"],
"formatter": "indent",
},
@@ -32,12 +32,14 @@ from typing import (
cast,
)
from pip._vendor.pkg_resources import Distribution
from pip._vendor.tenacity import retry, stop_after_delay, wait_fixed
from pip import __version__
from pip._internal.exceptions import CommandError
from pip._internal.locations import get_major_minor_version
from pip._internal.locations import get_major_minor_version, site_packages, user_site
from pip._internal.utils.compat import WINDOWS
from pip._internal.utils.egg_link import egg_link_path_from_location
from pip._internal.utils.virtualenv import running_under_virtualenv
__all__ = [
@@ -326,6 +328,64 @@ def is_local(path: str) -> bool:
return path.startswith(normalize_path(sys.prefix))
def dist_is_local(dist: Distribution) -> bool:
"""
Return True if given Distribution object is installed locally
(i.e. within current virtualenv).
Always True if we're not in a virtualenv.
"""
return is_local(dist_location(dist))
def dist_in_usersite(dist: Distribution) -> bool:
"""
Return True if given Distribution is installed in user site.
"""
return dist_location(dist).startswith(normalize_path(user_site))
def dist_in_site_packages(dist: Distribution) -> bool:
"""
Return True if given Distribution is installed in
sysconfig.get_python_lib().
"""
return dist_location(dist).startswith(normalize_path(site_packages))
def get_distribution(req_name: str) -> Optional[Distribution]:
"""Given a requirement name, return the installed Distribution object.
This searches from *all* distributions available in the environment, to
match the behavior of ``pkg_resources.get_distribution()``.
Left for compatibility until direct pkg_resources uses are refactored out.
"""
from pip._internal.metadata import get_default_environment
from pip._internal.metadata.pkg_resources import Distribution as _Dist
dist = get_default_environment().get_distribution(req_name)
if dist is None:
return None
return cast(_Dist, dist)._dist
def dist_location(dist: Distribution) -> str:
"""
Get the site-packages location of this distribution. Generally
this is dist.location, except in the case of develop-installed
packages, where dist.location is the source code location, and we
want to know where the egg-link file is.
The returned location is normalized (in particular, with symlinks removed).
"""
egg_link = egg_link_path_from_location(dist.project_name)
if egg_link:
return normalize_path(egg_link)
return normalize_path(dist.location)
def write_output(msg: Any, *args: Any) -> None:
logger.info(msg, *args)
@@ -1,12 +1,16 @@
import functools
import logging
import re
from typing import NewType, Optional, Tuple, cast
from email.message import Message
from email.parser import FeedParser
from typing import Optional, Tuple
from pip._vendor import pkg_resources
from pip._vendor.packaging import specifiers, version
from pip._vendor.packaging.requirements import Requirement
from pip._vendor.pkg_resources import Distribution
NormalizedExtra = NewType("NormalizedExtra", str)
from pip._internal.exceptions import NoneMetadataError
from pip._internal.utils.misc import display_path
logger = logging.getLogger(__name__)
@@ -34,6 +38,41 @@ def check_requires_python(
return python_version in requires_python_specifier
def get_metadata(dist: Distribution) -> Message:
"""
:raises NoneMetadataError: if the distribution reports `has_metadata()`
True but `get_metadata()` returns None.
"""
metadata_name = "METADATA"
if isinstance(dist, pkg_resources.DistInfoDistribution) and dist.has_metadata(
metadata_name
):
metadata = dist.get_metadata(metadata_name)
elif dist.has_metadata("PKG-INFO"):
metadata_name = "PKG-INFO"
metadata = dist.get_metadata(metadata_name)
else:
logger.warning("No metadata found in %s", display_path(dist.location))
metadata = ""
if metadata is None:
raise NoneMetadataError(dist, metadata_name)
feed_parser = FeedParser()
# The following line errors out if with a "NoneType" TypeError if
# passed metadata=None.
feed_parser.feed(metadata)
return feed_parser.close()
def get_installer(dist: Distribution) -> str:
if dist.has_metadata("INSTALLER"):
for line in dist.get_metadata_lines("INSTALLER"):
if line.strip():
return line.strip()
return ""
@functools.lru_cache(maxsize=512)
def get_requirement(req_string: str) -> Requirement:
"""Construct a packaging.Requirement object with caching"""
@@ -43,15 +82,3 @@ def get_requirement(req_string: str) -> Requirement:
# minimize repeated parsing of the same string to construct equivalent
# Requirement objects.
return Requirement(req_string)
def safe_extra(extra: str) -> NormalizedExtra:
"""Convert an arbitrary string to a standard 'extra' name
Any runs of non-alphanumeric characters are replaced with a single '_',
and the result is always lowercased.
This function is duplicated from ``pkg_resources``. Note that this is not
the same to either ``canonicalize_name`` or ``_egg_link_name``.
"""
return cast(NormalizedExtra, re.sub("[^A-Za-z0-9.-]+", "_", extra).lower())
@@ -1,49 +1,21 @@
import sys
import textwrap
from typing import List, Optional, Sequence
# Shim to wrap setup.py invocation with setuptools
# Note that __file__ is handled via two {!r} *and* %r, to ensure that paths on
# Windows are correctly handled (it should be "C:\\Users" not "C:\Users").
_SETUPTOOLS_SHIM = textwrap.dedent(
"""
exec(compile('''
# This is <pip-setuptools-caller> -- a caller that pip uses to run setup.py
#
# - It imports setuptools before invoking setup.py, to enable projects that directly
# import from `distutils.core` to work with newer packaging standards.
# - It provides a clear error message when setuptools is not installed.
# - It sets `sys.argv[0]` to the underlying `setup.py`, when invoking `setup.py` so
# setuptools doesn't think the script is `-c`. This avoids the following warning:
# manifest_maker: standard file '-c' not found".
# - It generates a shim setup.py, for handling setup.cfg-only projects.
import os, sys, tokenize
try:
import setuptools
except ImportError as error:
print(
"ERROR: Can not execute `setup.py` since setuptools is not available in "
"the build environment.",
file=sys.stderr,
)
sys.exit(1)
__file__ = %r
sys.argv[0] = __file__
if os.path.exists(__file__):
filename = __file__
with tokenize.open(__file__) as f:
setup_py_code = f.read()
else:
filename = "<auto-generated setuptools caller>"
setup_py_code = "from setuptools import setup; setup()"
exec(compile(setup_py_code, filename, "exec"))
''' % ({!r},), "<pip-setuptools-caller>", "exec"))
"""
).rstrip()
#
# We set sys.argv[0] to the path to the underlying setup.py file so
# setuptools / distutils don't take the path to the setup.py to be "-c" when
# invoking via the shim. This avoids e.g. the following manifest_maker
# warning: "warning: manifest_maker: standard file '-c' not found".
_SETUPTOOLS_SHIM = (
"import io, os, sys, setuptools, tokenize; sys.argv[0] = {0!r}; __file__={0!r};"
"f = getattr(tokenize, 'open', open)(__file__) "
"if os.path.exists(__file__) "
"else io.StringIO('from setuptools import setup; setup()');"
"code = f.read().replace('\\r\\n', '\\n');"
"f.close();"
"exec(compile(code, __file__, 'exec'))"
)
def make_setuptools_shim_args(
@@ -13,8 +13,6 @@ from typing import (
Union,
)
from pip._vendor.rich.markup import escape
from pip._internal.cli.spinners import SpinnerInterface, open_spinner
from pip._internal.exceptions import InstallationSubprocessError
from pip._internal.utils.logging import VERBOSE, subprocess_logger
@@ -29,6 +27,9 @@ if TYPE_CHECKING:
CommandArgs = List[Union[str, HiddenText]]
LOG_DIVIDER = "----------------------------------------"
def make_command(*args: Union[str, HiddenText, CommandArgs]) -> CommandArgs:
"""
Create a CommandArgs object.
@@ -68,19 +69,53 @@ def reveal_command_args(args: Union[List[str], CommandArgs]) -> List[str]:
return [arg.secret if isinstance(arg, HiddenText) else arg for arg in args]
def make_subprocess_output_error(
cmd_args: Union[List[str], CommandArgs],
cwd: Optional[str],
lines: List[str],
exit_status: int,
) -> str:
"""
Create and return the error message to use to log a subprocess error
with command output.
:param lines: A list of lines, each ending with a newline.
"""
command = format_command_args(cmd_args)
# We know the joined output value ends in a newline.
output = "".join(lines)
msg = (
# Use a unicode string to avoid "UnicodeEncodeError: 'ascii'
# codec can't encode character ..." in Python 2 when a format
# argument (e.g. `output`) has a non-ascii character.
"Command errored out with exit status {exit_status}:\n"
" command: {command_display}\n"
" cwd: {cwd_display}\n"
"Complete output ({line_count} lines):\n{output}{divider}"
).format(
exit_status=exit_status,
command_display=command,
cwd_display=cwd,
line_count=len(lines),
output=output,
divider=LOG_DIVIDER,
)
return msg
def call_subprocess(
cmd: Union[List[str], CommandArgs],
show_stdout: bool = False,
cwd: Optional[str] = None,
on_returncode: 'Literal["raise", "warn", "ignore"]' = "raise",
extra_ok_returncodes: Optional[Iterable[int]] = None,
command_desc: Optional[str] = None,
extra_environ: Optional[Mapping[str, Any]] = None,
unset_environ: Optional[Iterable[str]] = None,
spinner: Optional[SpinnerInterface] = None,
log_failed_cmd: Optional[bool] = True,
stdout_only: Optional[bool] = False,
*,
command_desc: str,
) -> str:
"""
Args:
@@ -131,6 +166,9 @@ def call_subprocess(
# and we have a spinner.
use_spinner = not showing_subprocess and spinner is not None
if command_desc is None:
command_desc = format_command_args(cmd)
log_subprocess("Running command %s", command_desc)
env = os.environ.copy()
if extra_environ:
@@ -203,25 +241,17 @@ def call_subprocess(
spinner.finish("done")
if proc_had_error:
if on_returncode == "raise":
error = InstallationSubprocessError(
command_description=command_desc,
exit_code=proc.returncode,
output_lines=all_output if not showing_subprocess else None,
)
if log_failed_cmd:
subprocess_logger.error("[present-diagnostic] %s", error)
subprocess_logger.verbose(
"[bold magenta]full command[/]: [blue]%s[/]",
escape(format_command_args(cmd)),
extra={"markup": True},
if not showing_subprocess and log_failed_cmd:
# Then the subprocess streams haven't been logged to the
# console yet.
msg = make_subprocess_output_error(
cmd_args=cmd,
cwd=cwd,
lines=all_output,
exit_status=proc.returncode,
)
subprocess_logger.verbose(
"[bold magenta]cwd[/]: %s",
escape(cwd or "[inherit]"),
extra={"markup": True},
)
raise error
subprocess_logger.error(msg)
raise InstallationSubprocessError(proc.returncode, command_desc)
elif on_returncode == "warn":
subprocess_logger.warning(
'Command "%s" had error code %s in %s',
@@ -251,7 +281,6 @@ def runner_with_spinner_message(message: str) -> Callable[..., None]:
with open_spinner(message) as spinner:
call_subprocess(
cmd,
command_desc=message,
cwd=cwd,
extra_environ=extra_environ,
spinner=spinner,
@@ -4,12 +4,14 @@
import logging
from email.message import Message
from email.parser import Parser
from typing import Tuple
from typing import Dict, Tuple
from zipfile import BadZipFile, ZipFile
from pip._vendor.packaging.utils import canonicalize_name
from pip._vendor.pkg_resources import DistInfoDistribution, Distribution
from pip._internal.exceptions import UnsupportedWheel
from pip._internal.utils.pkg_resources import DictMetadata
VERSION_COMPATIBLE = (1, 0)
@@ -17,6 +19,50 @@ VERSION_COMPATIBLE = (1, 0)
logger = logging.getLogger(__name__)
class WheelMetadata(DictMetadata):
"""Metadata provider that maps metadata decoding exceptions to our
internal exception type.
"""
def __init__(self, metadata: Dict[str, bytes], wheel_name: str) -> None:
super().__init__(metadata)
self._wheel_name = wheel_name
def get_metadata(self, name: str) -> str:
try:
return super().get_metadata(name)
except UnicodeDecodeError as e:
# Augment the default error with the origin of the file.
raise UnsupportedWheel(
f"Error decoding metadata for {self._wheel_name}: {e}"
)
def pkg_resources_distribution_for_wheel(
wheel_zip: ZipFile, name: str, location: str
) -> Distribution:
"""Get a pkg_resources distribution given a wheel.
:raises UnsupportedWheel: on any errors
"""
info_dir, _ = parse_wheel(wheel_zip, name)
metadata_files = [p for p in wheel_zip.namelist() if p.startswith(f"{info_dir}/")]
metadata_text: Dict[str, bytes] = {}
for path in metadata_files:
_, metadata_name = path.split("/", 1)
try:
metadata_text[metadata_name] = read_wheel_metadata_file(wheel_zip, path)
except UnsupportedWheel as e:
raise UnsupportedWheel("{} has an invalid wheel, {}".format(name, str(e)))
metadata = WheelMetadata(metadata_text, location)
return DistInfoDistribution(location=location, metadata=metadata, project_name=name)
def parse_wheel(wheel_zip: ZipFile, name: str) -> Tuple[str, Message]:
"""Extract information from the provided wheel, ensuring it meets basic
standards.