123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580 |
- import os
- import re
- import sys
- import typing as t
- from functools import update_wrapper
- from types import ModuleType
- from ._compat import _default_text_stderr
- from ._compat import _default_text_stdout
- from ._compat import _find_binary_writer
- from ._compat import auto_wrap_for_ansi
- from ._compat import binary_streams
- from ._compat import get_filesystem_encoding
- from ._compat import open_stream
- from ._compat import should_strip_ansi
- from ._compat import strip_ansi
- from ._compat import text_streams
- from ._compat import WIN
- from .globals import resolve_color_default
- if t.TYPE_CHECKING:
- import typing_extensions as te
- F = t.TypeVar("F", bound=t.Callable[..., t.Any])
- def _posixify(name: str) -> str:
- return "-".join(name.split()).lower()
def safecall(func: F) -> F:
    """Wrap *func* so that any :exc:`Exception` it raises is swallowed.

    The returned wrapper forwards all arguments to *func*. When the call
    succeeds its result is returned; when it raises, the wrapper returns
    ``None`` instead. Metadata of *func* is copied onto the wrapper.
    """

    def wrapper(*args, **kwargs):  # type: ignore
        try:
            result = func(*args, **kwargs)
        except Exception:
            # Deliberate best-effort: the failure is silently dropped.
            return None
        return result

    return update_wrapper(t.cast(F, wrapper), func)
def make_str(value: t.Any) -> str:
    """Convert *value* into a string.

    ``bytes`` are decoded with the encoding reported by
    ``get_filesystem_encoding()``; if that raises a :exc:`UnicodeError`
    they are decoded as UTF-8 with undecodable bytes replaced. Every other
    value is passed through :class:`str`.
    """
    if not isinstance(value, bytes):
        return str(value)

    try:
        return value.decode(get_filesystem_encoding())
    except UnicodeError:
        return value.decode("utf-8", "replace")
def make_default_short_help(help: str, max_length: int = 45) -> str:
    """Return a condensed, single-line version of a help string.

    Only the first paragraph is considered and all whitespace is collapsed
    to single spaces. The result ends at the first word that closes a
    sentence (ends with ``.``) if that fits within *max_length*; otherwise
    whole words are dropped and ``...`` is appended so the result fits.

    :param help: the full help text.
    :param max_length: the maximum length of the returned text.
    """
    # Keep the first paragraph only, then collapse newlines, tabs and spaces.
    tokens = help.partition("\n\n")[0].split()

    if not tokens:
        return ""

    # A leading "no rewrap" marker is not part of the text.
    if tokens[0] == "\b":
        tokens.pop(0)

    used = 0
    final = len(tokens) - 1

    for pos, token in enumerate(tokens):
        # Every word after the first costs one extra character for the space.
        used += len(token) + (1 if pos else 0)

        if used > max_length:
            # Too long: fall through to truncation.
            break

        if token.endswith("."):
            # Sentence end: cut here without a "..." suffix.
            return " ".join(tokens[: pos + 1])

        if used == max_length and pos != final:
            # Exactly full but more words follow: truncate with "...".
            break
    else:
        # Everything fits.
        return " ".join(tokens)

    # Reserve room for the suffix, then drop trailing words until it fits.
    used += len("...")

    while pos > 0:
        used -= len(tokens[pos]) + 1

        if used <= max_length:
            break

        pos -= 1

    return " ".join(tokens[:pos]) + "..."
class LazyFile:
    """A lazy file works like a regular file but it does not fully open
    the file but it does perform some basic checks early to see if the
    filename parameter does make sense.  This is useful for safely opening
    files for writing.

    Attribute access that is not defined on the wrapper itself opens the
    file on demand and is forwarded to the underlying file object.

    :param filename: the file to open, or ``'-'`` for a standard stream.
    :param mode: the mode passed on when the file is opened.
    :param encoding: the encoding passed on when the file is opened.
    :param errors: the error handling mode passed on when the file is opened.
    :param atomic: passed on to ``open_stream`` when the file is opened.
    """

    def __init__(
        self,
        filename: str,
        mode: str = "r",
        encoding: t.Optional[str] = None,
        errors: t.Optional[str] = "strict",
        atomic: bool = False,
    ) -> None:
        self.name = filename
        self.mode = mode
        self.encoding = encoding
        self.errors = errors
        self.atomic = atomic
        self._f: t.Optional[t.IO]

        if filename == "-":
            # Standard streams are resolved immediately.
            self._f, self.should_close = open_stream(filename, mode, encoding, errors)
        else:
            if "r" in mode:
                # Open and close the file in case we're opening it for
                # reading so that we can catch at least some errors in
                # some cases early.
                open(filename, mode).close()
            self._f = None
            self.should_close = True

    def __getattr__(self, name: str) -> t.Any:
        # ``__getattr__`` only runs when normal lookup fails. If ``_f``
        # itself is missing (an instance created without ``__init__``, as
        # copy/pickle do), ``open()`` would look up ``self._f`` and re-enter
        # this method until the recursion limit is hit. Fail cleanly instead.
        if "_f" not in vars(self):
            raise AttributeError(name)
        return getattr(self.open(), name)

    def __repr__(self) -> str:
        if self._f is not None:
            return repr(self._f)
        return f"<unopened file '{self.name}' {self.mode}>"

    def open(self) -> t.IO:
        """Opens the file if it's not yet open.  This call might fail with
        a :exc:`FileError`.  Not handling this error will produce an error
        that Click shows.
        """
        if self._f is not None:
            return self._f
        try:
            rv, self.should_close = open_stream(
                self.name, self.mode, self.encoding, self.errors, atomic=self.atomic
            )
        except OSError as e:  # noqa: E402
            from .exceptions import FileError

            raise FileError(self.name, hint=e.strerror) from e
        self._f = rv
        return rv

    def close(self) -> None:
        """Closes the underlying file, no matter what."""
        if self._f is not None:
            self._f.close()

    def close_intelligently(self) -> None:
        """This function only closes the file if it was opened by the lazy
        file wrapper.  For instance this will never close stdin.
        """
        if self.should_close:
            self.close()

    def __enter__(self) -> "LazyFile":
        return self

    def __exit__(self, exc_type, exc_value, tb):  # type: ignore
        self.close_intelligently()

    def __iter__(self) -> t.Iterator[t.AnyStr]:
        # Iterating forces the file open, like any other forwarded access.
        return iter(self.open())
class KeepOpenFile:
    """Proxy around a file object whose context-manager exit is a no-op.

    Using the wrapper in a ``with`` statement therefore leaves the wrapped
    file open. All other attribute access and iteration is forwarded to
    the wrapped file.

    :param file: the file object to wrap.
    """

    def __init__(self, file: t.IO) -> None:
        self._file = file

    def __getattr__(self, name: str) -> t.Any:
        # ``__getattr__`` only runs when normal lookup fails. If ``_file``
        # itself is missing (an instance created without ``__init__``, as
        # copy/pickle do), looking it up here would re-enter this method
        # until the recursion limit is hit. Fail cleanly instead.
        if "_file" not in vars(self):
            raise AttributeError(name)
        return getattr(self._file, name)

    def __enter__(self) -> "KeepOpenFile":
        return self

    def __exit__(self, exc_type, exc_value, tb):  # type: ignore
        # Intentionally does not close the wrapped file.
        pass

    def __repr__(self) -> str:
        return repr(self._file)

    def __iter__(self) -> t.Iterator[t.AnyStr]:
        return iter(self._file)
def echo(
    message: t.Optional[t.Any] = None,
    file: t.Optional[t.IO[t.Any]] = None,
    nl: bool = True,
    err: bool = False,
    color: t.Optional[bool] = None,
) -> None:
    """Print a message and newline to stdout or a file. This should be
    used instead of :func:`print` because it provides better support
    for different data, files, and environments.

    Compared to :func:`print`, this does the following:

    -   Ensures that the output encoding is not misconfigured on Linux.
    -   Supports Unicode in the Windows console.
    -   Supports writing to binary outputs, and supports writing bytes
        to text outputs.
    -   Supports colors and styles on Windows.
    -   Removes ANSI color and style codes if the output does not look
        like an interactive terminal.
    -   Always flushes the output.

    :param message: The string or bytes to output. Other objects are
        converted to strings.
    :param file: The file to write to. Defaults to ``stdout``.
    :param err: Write to ``stderr`` instead of ``stdout``.
    :param nl: Print a newline after the message. Enabled by default.
    :param color: Force showing or hiding colors and other styles. By
        default Click will remove color if the output does not look like
        an interactive terminal.

    .. versionchanged:: 6.0
        Support Unicode output on the Windows console. Click does not
        modify ``sys.stdout``, so ``sys.stdout.write()`` and ``print()``
        will still not support Unicode.

    .. versionchanged:: 4.0
        Added the ``color`` parameter.

    .. versionadded:: 3.0
        Added the ``err`` parameter.

    .. versionchanged:: 2.0
        Support colors on Windows if colorama is installed.
    """
    if file is None:
        if err:
            file = _default_text_stderr()
        else:
            file = _default_text_stdout()

    # Convert non bytes/text into the native string type.
    if message is not None and not isinstance(message, (str, bytes, bytearray)):
        out: t.Optional[t.Union[str, bytes, bytearray]] = str(message)
    else:
        out = message

    if nl:
        out = out or ""
        if isinstance(out, str):
            out = out + "\n"
        else:
            # Build a new object rather than using ``+=``: for a
            # ``bytearray`` the augmented form extends the caller's own
            # buffer in place, leaking the newline back into their data.
            out = out + b"\n"

    if not out:
        file.flush()
        return

    # If there is a message and the value looks like bytes, we manually
    # need to find the binary stream and write the message in there.
    # This is done separately so that most stream types will work as you
    # would expect. Eg: you can write to StringIO for other cases.
    if isinstance(out, (bytes, bytearray)):
        binary_file = _find_binary_writer(file)

        if binary_file is not None:
            file.flush()
            binary_file.write(out)
            binary_file.flush()
            return

    # ANSI style code support. For no message or bytes, nothing happens.
    # When outputting to a file instead of a terminal, strip codes.
    else:
        color = resolve_color_default(color)

        if should_strip_ansi(file, color):
            out = strip_ansi(out)
        elif WIN:
            if auto_wrap_for_ansi is not None:
                file = auto_wrap_for_ansi(file)  # type: ignore
            elif not color:
                out = strip_ansi(out)

    file.write(out)  # type: ignore
    file.flush()
def get_binary_stream(name: "te.Literal['stdin', 'stdout', 'stderr']") -> t.BinaryIO:
    """Returns a system stream for byte processing.

    :param name: the name of the stream to open.  Valid names are ``'stdin'``,
                 ``'stdout'`` and ``'stderr'``
    :raises TypeError: if no binary stream is registered under *name*.
    """
    factory = binary_streams.get(name)

    if factory is not None:
        return factory()

    raise TypeError(f"Unknown standard stream '{name}'")
def get_text_stream(
    name: "te.Literal['stdin', 'stdout', 'stderr']",
    encoding: t.Optional[str] = None,
    errors: t.Optional[str] = "strict",
) -> t.TextIO:
    """Returns a system stream for text processing.  This usually returns
    a wrapped stream around a binary stream returned from
    :func:`get_binary_stream` but it also can take shortcuts for already
    correctly configured streams.

    :param name: the name of the stream to open.  Valid names are ``'stdin'``,
                 ``'stdout'`` and ``'stderr'``
    :param encoding: overrides the detected default encoding.
    :param errors: overrides the default error mode.
    :raises TypeError: if no text stream is registered under *name*.
    """
    factory = text_streams.get(name)

    if factory is not None:
        return factory(encoding, errors)

    raise TypeError(f"Unknown standard stream '{name}'")
def open_file(
    filename: str,
    mode: str = "r",
    encoding: t.Optional[str] = None,
    errors: t.Optional[str] = "strict",
    lazy: bool = False,
    atomic: bool = False,
) -> t.IO:
    """Open a file, with extra behavior to handle ``'-'`` to indicate
    a standard stream, lazy open on write, and atomic write. Similar to
    the behavior of the :class:`~click.File` param type.

    If ``'-'`` is given to open ``stdout`` or ``stdin``, the stream is
    wrapped so that using it in a context manager will not close it.
    This makes it possible to use the function without accidentally
    closing a standard stream:

    .. code-block:: python

        with open_file(filename) as f:
            ...

    :param filename: The name of the file to open, or ``'-'`` for
        ``stdin``/``stdout``.
    :param mode: The mode in which to open the file.
    :param encoding: The encoding to decode or encode a file opened in
        text mode.
    :param errors: The error handling mode.
    :param lazy: Wait to open the file until it is accessed. For read
        mode, the file is temporarily opened to raise access errors
        early, then closed until it is read again.
    :param atomic: Write to a temporary file and replace the given file
        on close.

    .. versionadded:: 3.0
    """
    if not lazy:
        stream, owned = open_stream(filename, mode, encoding, errors, atomic=atomic)

        if owned:
            return stream

        # The stream must outlive a ``with`` block, so wrap it in a proxy
        # whose context-manager exit does nothing.
        return t.cast(t.IO, KeepOpenFile(stream))

    deferred = LazyFile(filename, mode, encoding, errors, atomic=atomic)
    return t.cast(t.IO, deferred)
def format_filename(
    filename: t.Union[str, bytes, os.PathLike], shorten: bool = False
) -> str:
    """Formats a filename for user display.  The main purpose of this
    function is to ensure that the filename can be displayed at all.  This
    will decode the filename to unicode if necessary in a way that it will
    not fail.  Optionally, it can shorten the filename to not include the
    full path to the filename.

    Bytes that cannot be decoded, and surrogate code points in a ``str``
    (such as the escapes Python produces for undecodable file name bytes),
    are shown as the replacement character so that the result can always
    be encoded for output.

    :param filename: formats a filename for UI display.  This will also convert
                     the filename into unicode without failing.
    :param shorten: this optionally shortens the filename to strip off the
                    path that leads up to it.
    """
    if shorten:
        name: t.Union[str, bytes] = os.path.basename(filename)
    else:
        name = os.fspath(filename)

    if isinstance(name, bytes):
        return name.decode(sys.getfilesystemencoding(), "replace")

    try:
        # Turn surrogate escapes back into the raw bytes they stand for.
        raw = name.encode("utf-8", "surrogateescape")
    except UnicodeEncodeError:
        # Surrogates outside the escape range cannot be round-tripped.
        raw = name.encode("utf-8", "replace")

    return raw.decode("utf-8", "replace")
def get_app_dir(app_name: str, roaming: bool = True, force_posix: bool = False) -> str:
    r"""Returns the config folder for the application.  The default behavior
    is to return whatever is most appropriate for the operating system.

    To give you an idea, for an app called ``"Foo Bar"``, something like
    the following folders could be returned:

    Mac OS X:
      ``~/Library/Application Support/Foo Bar``
    Mac OS X (POSIX):
      ``~/.foo-bar``
    Unix:
      ``~/.config/foo-bar``
    Unix (POSIX):
      ``~/.foo-bar``
    Windows (roaming):
      ``C:\Users\<user>\AppData\Roaming\Foo Bar``
    Windows (not roaming):
      ``C:\Users\<user>\AppData\Local\Foo Bar``

    .. versionadded:: 2.0

    :param app_name: the application name.  This should be properly capitalized
                     and can contain whitespace.
    :param roaming: controls if the folder should be roaming or not on Windows.
                    Has no effect otherwise.
    :param force_posix: if this is set to `True` then on any POSIX system the
                        folder will be stored in the home folder with a leading
                        dot instead of the XDG config home or darwin's
                        application support folder.
    """
    if WIN:
        key = "APPDATA" if roaming else "LOCALAPPDATA"
        folder = os.environ.get(key)
        if folder is None:
            folder = os.path.expanduser("~")
        return os.path.join(folder, app_name)

    if force_posix:
        return os.path.expanduser(f"~/.{_posixify(app_name)}")

    if sys.platform == "darwin":
        return os.path.join(
            os.path.expanduser("~/Library/Application Support"), app_name
        )

    # An empty XDG_CONFIG_HOME counts as unset; using it verbatim would
    # produce a path relative to the current working directory.
    config_home = os.environ.get("XDG_CONFIG_HOME") or os.path.expanduser("~/.config")
    return os.path.join(config_home, _posixify(app_name))
class PacifyFlushWrapper:
    """This wrapper is used to catch and suppress BrokenPipeErrors resulting
    from ``.flush()`` being called on broken pipe during the shutdown/final-GC
    of the Python interpreter. Notably ``.flush()`` is always called on
    ``sys.stdout`` and ``sys.stderr``. So as to have minimal impact on any
    other cleanup code, and the case where the underlying file is not a broken
    pipe, all calls and attributes are proxied.

    :param wrapped: the file object to proxy.
    """

    def __init__(self, wrapped: t.IO) -> None:
        self.wrapped = wrapped

    def flush(self) -> None:
        """Flush the wrapped file, ignoring only ``EPIPE`` errors."""
        try:
            self.wrapped.flush()
        except OSError as e:
            import errno

            # Any OS error other than a broken pipe is still reported.
            if e.errno != errno.EPIPE:
                raise

    def __getattr__(self, attr: str) -> t.Any:
        # ``__getattr__`` only runs when normal lookup fails. If ``wrapped``
        # itself is missing (an instance created without ``__init__``, as
        # copy/pickle do), looking it up here would re-enter this method
        # until the recursion limit is hit. Fail cleanly instead.
        if "wrapped" not in vars(self):
            raise AttributeError(attr)
        return getattr(self.wrapped, attr)
- def _detect_program_name(
- path: t.Optional[str] = None, _main: t.Optional[ModuleType] = None
- ) -> str:
- """Determine the command used to run the program, for use in help
- text. If a file or entry point was executed, the file name is
- returned. If ``python -m`` was used to execute a module or package,
- ``python -m name`` is returned.
- This doesn't try to be too precise, the goal is to give a concise
- name for help text. Files are only shown as their name without the
- path. ``python`` is only shown for modules, and the full path to
- ``sys.executable`` is not shown.
- :param path: The Python file being executed. Python puts this in
- ``sys.argv[0]``, which is used by default.
- :param _main: The ``__main__`` module. This should only be passed
- during internal testing.
- .. versionadded:: 8.0
- Based on command args detection in the Werkzeug reloader.
- :meta private:
- """
- if _main is None:
- _main = sys.modules["__main__"]
- if not path:
- path = sys.argv[0]
- # The value of __package__ indicates how Python was called. It may
- # not exist if a setuptools script is installed as an egg. It may be
- # set incorrectly for entry points created with pip on Windows.
- if getattr(_main, "__package__", None) is None or (
- os.name == "nt"
- and _main.__package__ == ""
- and not os.path.exists(path)
- and os.path.exists(f"{path}.exe")
- ):
- # Executed a file, like "python app.py".
- return os.path.basename(path)
- # Executed a module, like "python -m example".
- # Rewritten by Python from "-m script" to "/path/to/script.py".
- # Need to look at main module to determine how it was executed.
- py_module = t.cast(str, _main.__package__)
- name = os.path.splitext(os.path.basename(path))[0]
- # A submodule like "example.cli".
- if name != "__main__":
- py_module = f"{py_module}.{name}"
- return f"python -m {py_module.lstrip('.')}"
- def _expand_args(
- args: t.Iterable[str],
- *,
- user: bool = True,
- env: bool = True,
- glob_recursive: bool = True,
- ) -> t.List[str]:
- """Simulate Unix shell expansion with Python functions.
- See :func:`glob.glob`, :func:`os.path.expanduser`, and
- :func:`os.path.expandvars`.
- This is intended for use on Windows, where the shell does not do any
- expansion. It may not exactly match what a Unix shell would do.
- :param args: List of command line arguments to expand.
- :param user: Expand user home directory.
- :param env: Expand environment variables.
- :param glob_recursive: ``**`` matches directories recursively.
- .. versionchanged:: 8.1
- Invalid glob patterns are treated as empty expansions rather
- than raising an error.
- .. versionadded:: 8.0
- :meta private:
- """
- from glob import glob
- out = []
- for arg in args:
- if user:
- arg = os.path.expanduser(arg)
- if env:
- arg = os.path.expandvars(arg)
- try:
- matches = glob(arg, recursive=glob_recursive)
- except re.error:
- matches = []
- if not matches:
- out.append(arg)
- else:
- out.extend(matches)
- return out
|