sources.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. import logging
  2. import mimetypes
  3. import os
  4. from collections import defaultdict
  5. from typing import Callable, Dict, Iterable, List, Optional, Tuple
  6. from pip._vendor.packaging.utils import (
  7. InvalidSdistFilename,
  8. InvalidWheelFilename,
  9. canonicalize_name,
  10. parse_sdist_filename,
  11. parse_wheel_filename,
  12. )
  13. from pip._internal.models.candidate import InstallationCandidate
  14. from pip._internal.models.link import Link
  15. from pip._internal.utils.urls import path_to_url, url_to_path
  16. from pip._internal.vcs import is_url
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Type aliases used in the signatures of the link sources below.
# Iterable of installation candidates, as returned by ``page_candidates()``.
FoundCandidates = Iterable[InstallationCandidate]
# Iterable of links, as returned by ``file_links()``.
FoundLinks = Iterable[Link]
# Callable that receives a page link and returns the candidates for it.
CandidatesFromPage = Callable[[Link], Iterable[InstallationCandidate]]
# Callable that receives a link and returns a bool; a false result makes
# ``_RemoteFileSource.page_candidates`` yield nothing for that link.
PageValidator = Callable[[Link], bool]
  22. class LinkSource:
  23. @property
  24. def link(self) -> Optional[Link]:
  25. """Returns the underlying link, if there's one."""
  26. raise NotImplementedError()
  27. def page_candidates(self) -> FoundCandidates:
  28. """Candidates found by parsing an archive listing HTML file."""
  29. raise NotImplementedError()
  30. def file_links(self) -> FoundLinks:
  31. """Links found by specifying archives directly."""
  32. raise NotImplementedError()
  33. def _is_html_file(file_url: str) -> bool:
  34. return mimetypes.guess_type(file_url, strict=False)[0] == "text/html"
  35. class _FlatDirectoryToUrls:
  36. """Scans directory and caches results"""
  37. def __init__(self, path: str) -> None:
  38. self._path = path
  39. self._page_candidates: List[str] = []
  40. self._project_name_to_urls: Dict[str, List[str]] = defaultdict(list)
  41. self._scanned_directory = False
  42. def _scan_directory(self) -> None:
  43. """Scans directory once and populates both page_candidates
  44. and project_name_to_urls at the same time
  45. """
  46. for entry in os.scandir(self._path):
  47. url = path_to_url(entry.path)
  48. if _is_html_file(url):
  49. self._page_candidates.append(url)
  50. continue
  51. # File must have a valid wheel or sdist name,
  52. # otherwise not worth considering as a package
  53. try:
  54. project_filename = parse_wheel_filename(entry.name)[0]
  55. except InvalidWheelFilename:
  56. try:
  57. project_filename = parse_sdist_filename(entry.name)[0]
  58. except InvalidSdistFilename:
  59. continue
  60. self._project_name_to_urls[project_filename].append(url)
  61. self._scanned_directory = True
  62. @property
  63. def page_candidates(self) -> List[str]:
  64. if not self._scanned_directory:
  65. self._scan_directory()
  66. return self._page_candidates
  67. @property
  68. def project_name_to_urls(self) -> Dict[str, List[str]]:
  69. if not self._scanned_directory:
  70. self._scan_directory()
  71. return self._project_name_to_urls
  72. class _FlatDirectorySource(LinkSource):
  73. """Link source specified by ``--find-links=<path-to-dir>``.
  74. This looks the content of the directory, and returns:
  75. * ``page_candidates``: Links listed on each HTML file in the directory.
  76. * ``file_candidates``: Archives in the directory.
  77. """
  78. _paths_to_urls: Dict[str, _FlatDirectoryToUrls] = {}
  79. def __init__(
  80. self,
  81. candidates_from_page: CandidatesFromPage,
  82. path: str,
  83. project_name: str,
  84. ) -> None:
  85. self._candidates_from_page = candidates_from_page
  86. self._project_name = canonicalize_name(project_name)
  87. # Get existing instance of _FlatDirectoryToUrls if it exists
  88. if path in self._paths_to_urls:
  89. self._path_to_urls = self._paths_to_urls[path]
  90. else:
  91. self._path_to_urls = _FlatDirectoryToUrls(path=path)
  92. self._paths_to_urls[path] = self._path_to_urls
  93. @property
  94. def link(self) -> Optional[Link]:
  95. return None
  96. def page_candidates(self) -> FoundCandidates:
  97. for url in self._path_to_urls.page_candidates:
  98. yield from self._candidates_from_page(Link(url))
  99. def file_links(self) -> FoundLinks:
  100. for url in self._path_to_urls.project_name_to_urls[self._project_name]:
  101. yield Link(url)
  102. class _LocalFileSource(LinkSource):
  103. """``--find-links=<path-or-url>`` or ``--[extra-]index-url=<path-or-url>``.
  104. If a URL is supplied, it must be a ``file:`` URL. If a path is supplied to
  105. the option, it is converted to a URL first. This returns:
  106. * ``page_candidates``: Links listed on an HTML file.
  107. * ``file_candidates``: The non-HTML file.
  108. """
  109. def __init__(
  110. self,
  111. candidates_from_page: CandidatesFromPage,
  112. link: Link,
  113. ) -> None:
  114. self._candidates_from_page = candidates_from_page
  115. self._link = link
  116. @property
  117. def link(self) -> Optional[Link]:
  118. return self._link
  119. def page_candidates(self) -> FoundCandidates:
  120. if not _is_html_file(self._link.url):
  121. return
  122. yield from self._candidates_from_page(self._link)
  123. def file_links(self) -> FoundLinks:
  124. if _is_html_file(self._link.url):
  125. return
  126. yield self._link
  127. class _RemoteFileSource(LinkSource):
  128. """``--find-links=<url>`` or ``--[extra-]index-url=<url>``.
  129. This returns:
  130. * ``page_candidates``: Links listed on an HTML file.
  131. * ``file_candidates``: The non-HTML file.
  132. """
  133. def __init__(
  134. self,
  135. candidates_from_page: CandidatesFromPage,
  136. page_validator: PageValidator,
  137. link: Link,
  138. ) -> None:
  139. self._candidates_from_page = candidates_from_page
  140. self._page_validator = page_validator
  141. self._link = link
  142. @property
  143. def link(self) -> Optional[Link]:
  144. return self._link
  145. def page_candidates(self) -> FoundCandidates:
  146. if not self._page_validator(self._link):
  147. return
  148. yield from self._candidates_from_page(self._link)
  149. def file_links(self) -> FoundLinks:
  150. yield self._link
  151. class _IndexDirectorySource(LinkSource):
  152. """``--[extra-]index-url=<path-to-directory>``.
  153. This is treated like a remote URL; ``candidates_from_page`` contains logic
  154. for this by appending ``index.html`` to the link.
  155. """
  156. def __init__(
  157. self,
  158. candidates_from_page: CandidatesFromPage,
  159. link: Link,
  160. ) -> None:
  161. self._candidates_from_page = candidates_from_page
  162. self._link = link
  163. @property
  164. def link(self) -> Optional[Link]:
  165. return self._link
  166. def page_candidates(self) -> FoundCandidates:
  167. yield from self._candidates_from_page(self._link)
  168. def file_links(self) -> FoundLinks:
  169. return ()
  170. def build_source(
  171. location: str,
  172. *,
  173. candidates_from_page: CandidatesFromPage,
  174. page_validator: PageValidator,
  175. expand_dir: bool,
  176. cache_link_parsing: bool,
  177. project_name: str,
  178. ) -> Tuple[Optional[str], Optional[LinkSource]]:
  179. path: Optional[str] = None
  180. url: Optional[str] = None
  181. if os.path.exists(location): # Is a local path.
  182. url = path_to_url(location)
  183. path = location
  184. elif location.startswith("file:"): # A file: URL.
  185. url = location
  186. path = url_to_path(location)
  187. elif is_url(location):
  188. url = location
  189. if url is None:
  190. msg = (
  191. "Location '%s' is ignored: "
  192. "it is either a non-existing path or lacks a specific scheme."
  193. )
  194. logger.warning(msg, location)
  195. return (None, None)
  196. if path is None:
  197. source: LinkSource = _RemoteFileSource(
  198. candidates_from_page=candidates_from_page,
  199. page_validator=page_validator,
  200. link=Link(url, cache_link_parsing=cache_link_parsing),
  201. )
  202. return (url, source)
  203. if os.path.isdir(path):
  204. if expand_dir:
  205. source = _FlatDirectorySource(
  206. candidates_from_page=candidates_from_page,
  207. path=path,
  208. project_name=project_name,
  209. )
  210. else:
  211. source = _IndexDirectorySource(
  212. candidates_from_page=candidates_from_page,
  213. link=Link(url, cache_link_parsing=cache_link_parsing),
  214. )
  215. return (url, source)
  216. elif os.path.isfile(path):
  217. source = _LocalFileSource(
  218. candidates_from_page=candidates_from_page,
  219. link=Link(url, cache_link_parsing=cache_link_parsing),
  220. )
  221. return (url, source)
  222. logger.warning(
  223. "Location '%s' is ignored: it is neither a file nor a directory.",
  224. location,
  225. )
  226. return (url, None)