# spinners.py
import contextlib
import itertools
import logging
import sys
import time
from typing import IO, Iterator, Optional

from pip._vendor.progress import HIDE_CURSOR, SHOW_CURSOR

from pip._internal.utils.compat import WINDOWS
from pip._internal.utils.logging import get_indentation
# Module-level logger. NonInteractiveSpinner reports through it, and its
# effective level is consulted to decide whether interactive output and
# cursor control codes are emitted at all.
logger = logging.getLogger(__name__)
  11. class SpinnerInterface:
  12. def spin(self) -> None:
  13. raise NotImplementedError()
  14. def finish(self, final_status: str) -> None:
  15. raise NotImplementedError()
  16. class InteractiveSpinner(SpinnerInterface):
  17. def __init__(
  18. self,
  19. message: str,
  20. file: IO[str] = None,
  21. spin_chars: str = "-\\|/",
  22. # Empirically, 8 updates/second looks nice
  23. min_update_interval_seconds: float = 0.125,
  24. ):
  25. self._message = message
  26. if file is None:
  27. file = sys.stdout
  28. self._file = file
  29. self._rate_limiter = RateLimiter(min_update_interval_seconds)
  30. self._finished = False
  31. self._spin_cycle = itertools.cycle(spin_chars)
  32. self._file.write(" " * get_indentation() + self._message + " ... ")
  33. self._width = 0
  34. def _write(self, status: str) -> None:
  35. assert not self._finished
  36. # Erase what we wrote before by backspacing to the beginning, writing
  37. # spaces to overwrite the old text, and then backspacing again
  38. backup = "\b" * self._width
  39. self._file.write(backup + " " * self._width + backup)
  40. # Now we have a blank slate to add our status
  41. self._file.write(status)
  42. self._width = len(status)
  43. self._file.flush()
  44. self._rate_limiter.reset()
  45. def spin(self) -> None:
  46. if self._finished:
  47. return
  48. if not self._rate_limiter.ready():
  49. return
  50. self._write(next(self._spin_cycle))
  51. def finish(self, final_status: str) -> None:
  52. if self._finished:
  53. return
  54. self._write(final_status)
  55. self._file.write("\n")
  56. self._file.flush()
  57. self._finished = True
  58. # Used for dumb terminals, non-interactive installs (no tty), etc.
  59. # We still print updates occasionally (once every 60 seconds by default) to
  60. # act as a keep-alive for systems like Travis-CI that take lack-of-output as
  61. # an indication that a task has frozen.
  62. class NonInteractiveSpinner(SpinnerInterface):
  63. def __init__(self, message: str, min_update_interval_seconds: float = 60.0) -> None:
  64. self._message = message
  65. self._finished = False
  66. self._rate_limiter = RateLimiter(min_update_interval_seconds)
  67. self._update("started")
  68. def _update(self, status: str) -> None:
  69. assert not self._finished
  70. self._rate_limiter.reset()
  71. logger.info("%s: %s", self._message, status)
  72. def spin(self) -> None:
  73. if self._finished:
  74. return
  75. if not self._rate_limiter.ready():
  76. return
  77. self._update("still running...")
  78. def finish(self, final_status: str) -> None:
  79. if self._finished:
  80. return
  81. self._update(f"finished with status '{final_status}'")
  82. self._finished = True
  83. class RateLimiter:
  84. def __init__(self, min_update_interval_seconds: float) -> None:
  85. self._min_update_interval_seconds = min_update_interval_seconds
  86. self._last_update: float = 0
  87. def ready(self) -> bool:
  88. now = time.time()
  89. delta = now - self._last_update
  90. return delta >= self._min_update_interval_seconds
  91. def reset(self) -> None:
  92. self._last_update = time.time()
  93. @contextlib.contextmanager
  94. def open_spinner(message: str) -> Iterator[SpinnerInterface]:
  95. # Interactive spinner goes directly to sys.stdout rather than being routed
  96. # through the logging system, but it acts like it has level INFO,
  97. # i.e. it's only displayed if we're at level INFO or better.
  98. # Non-interactive spinner goes through the logging system, so it is always
  99. # in sync with logging configuration.
  100. if sys.stdout.isatty() and logger.getEffectiveLevel() <= logging.INFO:
  101. spinner: SpinnerInterface = InteractiveSpinner(message)
  102. else:
  103. spinner = NonInteractiveSpinner(message)
  104. try:
  105. with hidden_cursor(sys.stdout):
  106. yield spinner
  107. except KeyboardInterrupt:
  108. spinner.finish("canceled")
  109. raise
  110. except Exception:
  111. spinner.finish("error")
  112. raise
  113. else:
  114. spinner.finish("done")
  115. @contextlib.contextmanager
  116. def hidden_cursor(file: IO[str]) -> Iterator[None]:
  117. # The Windows terminal does not support the hide/show cursor ANSI codes,
  118. # even via colorama. So don't even try.
  119. if WINDOWS:
  120. yield
  121. # We don't want to clutter the output with control characters if we're
  122. # writing to a file, or if the user is running with --quiet.
  123. # See https://github.com/pypa/pip/issues/3418
  124. elif not file.isatty() or logger.getEffectiveLevel() > logging.INFO:
  125. yield
  126. else:
  127. file.write(HIDE_CURSOR)
  128. try:
  129. yield
  130. finally:
  131. file.write(SHOW_CURSOR)