Sartika Aritonang / news · Commits · 2c3b194b

Commit 2c3b194b, authored May 29, 2020 by Sartika Aritonang (parent 9ee50ca8)

Showing 1 changed file with 661 additions and 0 deletions:
stbi/Lib/site-packages/pip/_internal/index/collector.py (new file, mode 100644, +661 −0)
"""
The main purpose of this module is to expose LinkCollector.collect_links().
"""
import
cgi
import
functools
import
itertools
import
logging
import
mimetypes
import
os
import
re
from
collections
import
OrderedDict
from
pip._vendor
import
html5lib
,
requests
from
pip._vendor.distlib.compat
import
unescape
from
pip._vendor.requests.exceptions
import
HTTPError
,
RetryError
,
SSLError
from
pip._vendor.six.moves.urllib
import
parse
as
urllib_parse
from
pip._vendor.six.moves.urllib
import
request
as
urllib_request
from
pip._internal.models.link
import
Link
from
pip._internal.utils.filetypes
import
ARCHIVE_EXTENSIONS
from
pip._internal.utils.misc
import
pairwise
,
redact_auth_from_url
from
pip._internal.utils.typing
import
MYPY_CHECK_RUNNING
from
pip._internal.utils.urls
import
path_to_url
,
url_to_path
from
pip._internal.vcs
import
is_url
,
vcs
if
MYPY_CHECK_RUNNING
:
from
typing
import
(
Callable
,
Iterable
,
List
,
MutableMapping
,
Optional
,
Protocol
,
Sequence
,
Tuple
,
TypeVar
,
Union
,
)
import
xml.etree.ElementTree
from
pip._vendor.requests
import
Response
from
pip._internal.models.search_scope
import
SearchScope
from
pip._internal.network.session
import
PipSession
HTMLElement
=
xml
.
etree
.
ElementTree
.
Element
ResponseHeaders
=
MutableMapping
[
str
,
str
]
# Used in the @lru_cache polyfill.
F
=
TypeVar
(
'F'
)
class
LruCache
(
Protocol
):
def
__call__
(
self
,
maxsize
=
None
):
# type: (Optional[int]) -> Callable[[F], F]
raise
NotImplementedError
logger
=
logging
.
getLogger
(
__name__
)
# Fallback to noop_lru_cache in Python 2
# TODO: this can be removed when python 2 support is dropped!
def
noop_lru_cache
(
maxsize
=
None
):
# type: (Optional[int]) -> Callable[[F], F]
def
_wrapper
(
f
):
# type: (F) -> F
return
f
return
_wrapper
_lru_cache
=
getattr
(
functools
,
"lru_cache"
,
noop_lru_cache
)
# type: LruCache
def
_match_vcs_scheme
(
url
):
# type: (str) -> Optional[str]
"""Look for VCS schemes in the URL.
Returns the matched VCS scheme, or None if there's no match.
"""
for
scheme
in
vcs
.
schemes
:
if
url
.
lower
()
.
startswith
(
scheme
)
and
url
[
len
(
scheme
)]
in
'+:'
:
return
scheme
return
None
def
_is_url_like_archive
(
url
):
# type: (str) -> bool
"""Return whether the URL looks like an archive.
"""
filename
=
Link
(
url
)
.
filename
for
bad_ext
in
ARCHIVE_EXTENSIONS
:
if
filename
.
endswith
(
bad_ext
):
return
True
return
False
class
_NotHTML
(
Exception
):
def
__init__
(
self
,
content_type
,
request_desc
):
# type: (str, str) -> None
super
(
_NotHTML
,
self
)
.
__init__
(
content_type
,
request_desc
)
self
.
content_type
=
content_type
self
.
request_desc
=
request_desc
def
_ensure_html_header
(
response
):
# type: (Response) -> None
"""Check the Content-Type header to ensure the response contains HTML.
Raises `_NotHTML` if the content type is not text/html.
"""
content_type
=
response
.
headers
.
get
(
"Content-Type"
,
""
)
if
not
content_type
.
lower
()
.
startswith
(
"text/html"
):
raise
_NotHTML
(
content_type
,
response
.
request
.
method
)
class
_NotHTTP
(
Exception
):
pass
def
_ensure_html_response
(
url
,
session
):
# type: (str, PipSession) -> None
"""Send a HEAD request to the URL, and ensure the response contains HTML.
Raises `_NotHTTP` if the URL is not available for a HEAD request, or
`_NotHTML` if the content type is not text/html.
"""
scheme
,
netloc
,
path
,
query
,
fragment
=
urllib_parse
.
urlsplit
(
url
)
if
scheme
not
in
{
'http'
,
'https'
}:
raise
_NotHTTP
()
resp
=
session
.
head
(
url
,
allow_redirects
=
True
)
resp
.
raise_for_status
()
_ensure_html_header
(
resp
)
def
_get_html_response
(
url
,
session
):
# type: (str, PipSession) -> Response
"""Access an HTML page with GET, and return the response.
This consists of three parts:
1. If the URL looks suspiciously like an archive, send a HEAD first to
check the Content-Type is HTML, to avoid downloading a large file.
Raise `_NotHTTP` if the content type cannot be determined, or
`_NotHTML` if it is not HTML.
2. Actually perform the request. Raise HTTP exceptions on network failures.
3. Check the Content-Type header to make sure we got HTML, and raise
`_NotHTML` otherwise.
"""
if
_is_url_like_archive
(
url
):
_ensure_html_response
(
url
,
session
=
session
)
logger
.
debug
(
'Getting page
%
s'
,
redact_auth_from_url
(
url
))
resp
=
session
.
get
(
url
,
headers
=
{
"Accept"
:
"text/html"
,
# We don't want to blindly returned cached data for
# /simple/, because authors generally expecting that
# twine upload && pip install will function, but if
# they've done a pip install in the last ~10 minutes
# it won't. Thus by setting this to zero we will not
# blindly use any cached data, however the benefit of
# using max-age=0 instead of no-cache, is that we will
# still support conditional requests, so we will still
# minimize traffic sent in cases where the page hasn't
# changed at all, we will just always incur the round
# trip for the conditional GET now instead of only
# once per 10 minutes.
# For more information, please see pypa/pip#5670.
"Cache-Control"
:
"max-age=0"
,
},
)
resp
.
raise_for_status
()
# The check for archives above only works if the url ends with
# something that looks like an archive. However that is not a
# requirement of an url. Unless we issue a HEAD request on every
# url we cannot know ahead of time for sure if something is HTML
# or not. However we can check after we've downloaded it.
_ensure_html_header
(
resp
)
return
resp
def
_get_encoding_from_headers
(
headers
):
# type: (ResponseHeaders) -> Optional[str]
"""Determine if we have any encoding information in our headers.
"""
if
headers
and
"Content-Type"
in
headers
:
content_type
,
params
=
cgi
.
parse_header
(
headers
[
"Content-Type"
])
if
"charset"
in
params
:
return
params
[
'charset'
]
return
None
def
_determine_base_url
(
document
,
page_url
):
# type: (HTMLElement, str) -> str
"""Determine the HTML document's base URL.
This looks for a ``<base>`` tag in the HTML document. If present, its href
attribute denotes the base URL of anchor tags in the document. If there is
no such tag (or if it does not have a valid href attribute), the HTML
file's URL is used as the base URL.
:param document: An HTML document representation. The current
implementation expects the result of ``html5lib.parse()``.
:param page_url: The URL of the HTML document.
"""
for
base
in
document
.
findall
(
".//base"
):
href
=
base
.
get
(
"href"
)
if
href
is
not
None
:
return
href
return
page_url
def
_clean_url_path_part
(
part
):
# type: (str) -> str
"""
Clean a "part" of a URL path (i.e. after splitting on "@" characters).
"""
# We unquote prior to quoting to make sure nothing is double quoted.
return
urllib_parse
.
quote
(
urllib_parse
.
unquote
(
part
))
def
_clean_file_url_path
(
part
):
# type: (str) -> str
"""
Clean the first part of a URL path that corresponds to a local
filesystem path (i.e. the first part after splitting on "@" characters).
"""
# We unquote prior to quoting to make sure nothing is double quoted.
# Also, on Windows the path part might contain a drive letter which
# should not be quoted. On Linux where drive letters do not
# exist, the colon should be quoted. We rely on urllib.request
# to do the right thing here.
return
urllib_request
.
pathname2url
(
urllib_request
.
url2pathname
(
part
))
# percent-encoded: /
_reserved_chars_re
=
re
.
compile
(
'(@|
%2
F)'
,
re
.
IGNORECASE
)
def
_clean_url_path
(
path
,
is_local_path
):
# type: (str, bool) -> str
"""
Clean the path portion of a URL.
"""
if
is_local_path
:
clean_func
=
_clean_file_url_path
else
:
clean_func
=
_clean_url_path_part
# Split on the reserved characters prior to cleaning so that
# revision strings in VCS URLs are properly preserved.
parts
=
_reserved_chars_re
.
split
(
path
)
cleaned_parts
=
[]
for
to_clean
,
reserved
in
pairwise
(
itertools
.
chain
(
parts
,
[
''
])):
cleaned_parts
.
append
(
clean_func
(
to_clean
))
# Normalize %xx escapes (e.g. %2f -> %2F)
cleaned_parts
.
append
(
reserved
.
upper
())
return
''
.
join
(
cleaned_parts
)
def
_clean_link
(
url
):
# type: (str) -> str
"""
Make sure a link is fully quoted.
For example, if ' ' occurs in the URL, it will be replaced with "
%20
",
and without double-quoting other characters.
"""
# Split the URL into parts according to the general structure
# `scheme://netloc/path;parameters?query#fragment`.
result
=
urllib_parse
.
urlparse
(
url
)
# If the netloc is empty, then the URL refers to a local filesystem path.
is_local_path
=
not
result
.
netloc
path
=
_clean_url_path
(
result
.
path
,
is_local_path
=
is_local_path
)
return
urllib_parse
.
urlunparse
(
result
.
_replace
(
path
=
path
))
def
_create_link_from_element
(
anchor
,
# type: HTMLElement
page_url
,
# type: str
base_url
,
# type: str
):
# type: (...) -> Optional[Link]
"""
Convert an anchor element in a simple repository page to a Link.
"""
href
=
anchor
.
get
(
"href"
)
if
not
href
:
return
None
url
=
_clean_link
(
urllib_parse
.
urljoin
(
base_url
,
href
))
pyrequire
=
anchor
.
get
(
'data-requires-python'
)
pyrequire
=
unescape
(
pyrequire
)
if
pyrequire
else
None
yanked_reason
=
anchor
.
get
(
'data-yanked'
)
if
yanked_reason
:
# This is a unicode string in Python 2 (and 3).
yanked_reason
=
unescape
(
yanked_reason
)
link
=
Link
(
url
,
comes_from
=
page_url
,
requires_python
=
pyrequire
,
yanked_reason
=
yanked_reason
,
)
return
link
class
CacheablePageContent
(
object
):
def
__init__
(
self
,
page
):
# type: (HTMLPage) -> None
assert
page
.
cache_link_parsing
self
.
page
=
page
def
__eq__
(
self
,
other
):
# type: (object) -> bool
return
(
isinstance
(
other
,
type
(
self
))
and
self
.
page
.
url
==
other
.
page
.
url
)
def
__hash__
(
self
):
# type: () -> int
return
hash
(
self
.
page
.
url
)
def
with_cached_html_pages
(
fn
,
# type: Callable[[HTMLPage], Iterable[Link]]
):
# type: (...) -> Callable[[HTMLPage], List[Link]]
"""
Given a function that parses an Iterable[Link] from an HTMLPage, cache the
function's result (keyed by CacheablePageContent), unless the HTMLPage
`page` has `page.cache_link_parsing == False`.
"""
@_lru_cache
(
maxsize
=
None
)
def
wrapper
(
cacheable_page
):
# type: (CacheablePageContent) -> List[Link]
return
list
(
fn
(
cacheable_page
.
page
))
@functools.wraps
(
fn
)
def
wrapper_wrapper
(
page
):
# type: (HTMLPage) -> List[Link]
if
page
.
cache_link_parsing
:
return
wrapper
(
CacheablePageContent
(
page
))
return
list
(
fn
(
page
))
return
wrapper_wrapper
@with_cached_html_pages
def
parse_links
(
page
):
# type: (HTMLPage) -> Iterable[Link]
"""
Parse an HTML document, and yield its anchor elements as Link objects.
"""
document
=
html5lib
.
parse
(
page
.
content
,
transport_encoding
=
page
.
encoding
,
namespaceHTMLElements
=
False
,
)
url
=
page
.
url
base_url
=
_determine_base_url
(
document
,
url
)
for
anchor
in
document
.
findall
(
".//a"
):
link
=
_create_link_from_element
(
anchor
,
page_url
=
url
,
base_url
=
base_url
,
)
if
link
is
None
:
continue
yield
link
class
HTMLPage
(
object
):
"""Represents one page, along with its URL"""
def
__init__
(
self
,
content
,
# type: bytes
encoding
,
# type: Optional[str]
url
,
# type: str
cache_link_parsing
=
True
,
# type: bool
):
# type: (...) -> None
"""
:param encoding: the encoding to decode the given content.
:param url: the URL from which the HTML was downloaded.
:param cache_link_parsing: whether links parsed from this page's url
should be cached. PyPI index urls should
have this set to False, for example.
"""
self
.
content
=
content
self
.
encoding
=
encoding
self
.
url
=
url
self
.
cache_link_parsing
=
cache_link_parsing
def
__str__
(
self
):
# type: () -> str
return
redact_auth_from_url
(
self
.
url
)
def
_handle_get_page_fail
(
link
,
# type: Link
reason
,
# type: Union[str, Exception]
meth
=
None
# type: Optional[Callable[..., None]]
):
# type: (...) -> None
if
meth
is
None
:
meth
=
logger
.
debug
meth
(
"Could not fetch URL
%
s:
%
s - skipping"
,
link
,
reason
)
def
_make_html_page
(
response
,
cache_link_parsing
=
True
):
# type: (Response, bool) -> HTMLPage
encoding
=
_get_encoding_from_headers
(
response
.
headers
)
return
HTMLPage
(
response
.
content
,
encoding
=
encoding
,
url
=
response
.
url
,
cache_link_parsing
=
cache_link_parsing
)
def
_get_html_page
(
link
,
session
=
None
):
# type: (Link, Optional[PipSession]) -> Optional[HTMLPage]
if
session
is
None
:
raise
TypeError
(
"_get_html_page() missing 1 required keyword argument: 'session'"
)
url
=
link
.
url
.
split
(
'#'
,
1
)[
0
]
# Check for VCS schemes that do not support lookup as web pages.
vcs_scheme
=
_match_vcs_scheme
(
url
)
if
vcs_scheme
:
logger
.
debug
(
'Cannot look at
%
s URL
%
s'
,
vcs_scheme
,
link
)
return
None
# Tack index.html onto file:// URLs that point to directories
scheme
,
_
,
path
,
_
,
_
,
_
=
urllib_parse
.
urlparse
(
url
)
if
(
scheme
==
'file'
and
os
.
path
.
isdir
(
urllib_request
.
url2pathname
(
path
))):
# add trailing slash if not present so urljoin doesn't trim
# final segment
if
not
url
.
endswith
(
'/'
):
url
+=
'/'
url
=
urllib_parse
.
urljoin
(
url
,
'index.html'
)
logger
.
debug
(
' file: URL is directory, getting
%
s'
,
url
)
try
:
resp
=
_get_html_response
(
url
,
session
=
session
)
except
_NotHTTP
:
logger
.
debug
(
'Skipping page
%
s because it looks like an archive, and cannot '
'be checked by HEAD.'
,
link
,
)
except
_NotHTML
as
exc
:
logger
.
debug
(
'Skipping page
%
s because the
%
s request got Content-Type:
%
s'
,
link
,
exc
.
request_desc
,
exc
.
content_type
,
)
except
HTTPError
as
exc
:
_handle_get_page_fail
(
link
,
exc
)
except
RetryError
as
exc
:
_handle_get_page_fail
(
link
,
exc
)
except
SSLError
as
exc
:
reason
=
"There was a problem confirming the ssl certificate: "
reason
+=
str
(
exc
)
_handle_get_page_fail
(
link
,
reason
,
meth
=
logger
.
info
)
except
requests
.
ConnectionError
as
exc
:
_handle_get_page_fail
(
link
,
"connection error: {}"
.
format
(
exc
))
except
requests
.
Timeout
:
_handle_get_page_fail
(
link
,
"timed out"
)
else
:
return
_make_html_page
(
resp
,
cache_link_parsing
=
link
.
cache_link_parsing
)
return
None
def
_remove_duplicate_links
(
links
):
# type: (Iterable[Link]) -> List[Link]
"""
Return a list of links, with duplicates removed and ordering preserved.
"""
# We preserve the ordering when removing duplicates because we can.
return
list
(
OrderedDict
.
fromkeys
(
links
))
def
group_locations
(
locations
,
expand_dir
=
False
):
# type: (Sequence[str], bool) -> Tuple[List[str], List[str]]
"""
Divide a list of locations into two groups: "files" (archives) and "urls."
:return: A pair of lists (files, urls).
"""
files
=
[]
urls
=
[]
# puts the url for the given file path into the appropriate list
def
sort_path
(
path
):
# type: (str) -> None
url
=
path_to_url
(
path
)
if
mimetypes
.
guess_type
(
url
,
strict
=
False
)[
0
]
==
'text/html'
:
urls
.
append
(
url
)
else
:
files
.
append
(
url
)
for
url
in
locations
:
is_local_path
=
os
.
path
.
exists
(
url
)
is_file_url
=
url
.
startswith
(
'file:'
)
if
is_local_path
or
is_file_url
:
if
is_local_path
:
path
=
url
else
:
path
=
url_to_path
(
url
)
if
os
.
path
.
isdir
(
path
):
if
expand_dir
:
path
=
os
.
path
.
realpath
(
path
)
for
item
in
os
.
listdir
(
path
):
sort_path
(
os
.
path
.
join
(
path
,
item
))
elif
is_file_url
:
urls
.
append
(
url
)
else
:
logger
.
warning
(
"Path '{0}' is ignored: "
"it is a directory."
.
format
(
path
),
)
elif
os
.
path
.
isfile
(
path
):
sort_path
(
path
)
else
:
logger
.
warning
(
"Url '
%
s' is ignored: it is neither a file "
"nor a directory."
,
url
,
)
elif
is_url
(
url
):
# Only add url with clear scheme
urls
.
append
(
url
)
else
:
logger
.
warning
(
"Url '
%
s' is ignored. It is either a non-existing "
"path or lacks a specific scheme."
,
url
,
)
return
files
,
urls
class
CollectedLinks
(
object
):
"""
Encapsulates the return value of a call to LinkCollector.collect_links().
The return value includes both URLs to project pages containing package
links, as well as individual package Link objects collected from other
sources.
This info is stored separately as:
(1) links from the configured file locations,
(2) links from the configured find_links, and
(3) urls to HTML project pages, as described by the PEP 503 simple
repository API.
"""
def
__init__
(
self
,
files
,
# type: List[Link]
find_links
,
# type: List[Link]
project_urls
,
# type: List[Link]
):
# type: (...) -> None
"""
:param files: Links from file locations.
:param find_links: Links from find_links.
:param project_urls: URLs to HTML project pages, as described by
the PEP 503 simple repository API.
"""
self
.
files
=
files
self
.
find_links
=
find_links
self
.
project_urls
=
project_urls
class
LinkCollector
(
object
):
"""
Responsible for collecting Link objects from all configured locations,
making network requests as needed.
The class's main method is its collect_links() method.
"""
def
__init__
(
self
,
session
,
# type: PipSession
search_scope
,
# type: SearchScope
):
# type: (...) -> None
self
.
search_scope
=
search_scope
self
.
session
=
session
@property
def
find_links
(
self
):
# type: () -> List[str]
return
self
.
search_scope
.
find_links
def
fetch_page
(
self
,
location
):
# type: (Link) -> Optional[HTMLPage]
"""
Fetch an HTML page containing package links.
"""
return
_get_html_page
(
location
,
session
=
self
.
session
)
def
collect_links
(
self
,
project_name
):
# type: (str) -> CollectedLinks
"""Find all available links for the given project name.
:return: All the Link objects (unfiltered), as a CollectedLinks object.
"""
search_scope
=
self
.
search_scope
index_locations
=
search_scope
.
get_index_urls_locations
(
project_name
)
index_file_loc
,
index_url_loc
=
group_locations
(
index_locations
)
fl_file_loc
,
fl_url_loc
=
group_locations
(
self
.
find_links
,
expand_dir
=
True
,
)
file_links
=
[
Link
(
url
)
for
url
in
itertools
.
chain
(
index_file_loc
,
fl_file_loc
)
]
# We trust every directly linked archive in find_links
find_link_links
=
[
Link
(
url
,
'-f'
)
for
url
in
self
.
find_links
]
# We trust every url that the user has given us whether it was given
# via --index-url or --find-links.
# We want to filter out anything that does not have a secure origin.
url_locations
=
[
link
for
link
in
itertools
.
chain
(
# Mark PyPI indices as "cache_link_parsing == False" -- this
# will avoid caching the result of parsing the page for links.
(
Link
(
url
,
cache_link_parsing
=
False
)
for
url
in
index_url_loc
),
(
Link
(
url
)
for
url
in
fl_url_loc
),
)
if
self
.
session
.
is_secure_origin
(
link
)
]
url_locations
=
_remove_duplicate_links
(
url_locations
)
lines
=
[
'{} location(s) to search for versions of {}:'
.
format
(
len
(
url_locations
),
project_name
,
),
]
for
link
in
url_locations
:
lines
.
append
(
'* {}'
.
format
(
link
))
logger
.
debug
(
'
\n
'
.
join
(
lines
))
return
CollectedLinks
(
files
=
file_links
,
find_links
=
find_link_links
,
project_urls
=
url_locations
,
)
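
As the module docstring notes, the file exists mainly to expose LinkCollector.collect_links(). The snippet below is a minimal usage sketch, not part of the committed file: the PipSession() and SearchScope.create() calls, the index URL, and the project name "requests" are illustrative assumptions, and exact constructor signatures can differ between pip versions.

# Illustrative sketch only -- not part of collector.py.
from pip._internal.index.collector import LinkCollector
from pip._internal.models.search_scope import SearchScope
from pip._internal.network.session import PipSession

session = PipSession()  # assumed default construction
search_scope = SearchScope.create(
    find_links=[],
    index_urls=["https://pypi.org/simple"],
)
collector = LinkCollector(session=session, search_scope=search_scope)

# collect_links() returns a CollectedLinks object that separates file links,
# find_links links, and PEP 503 project page URLs.
collected = collector.collect_links("requests")
for page_link in collected.project_urls:
    print(page_link)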