Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
N
news
Project
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Sartika Aritonang
news
Commits
fd87bf9b
Commit
fd87bf9b
authored
May 29, 2020
by
Sartika Aritonang
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Upload New File
parent
706872b7
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
1302 additions
and
0 deletions
+1302
-0
locators.py
stbi/Lib/site-packages/pip/_vendor/distlib/locators.py
+1302
-0
No files found.
stbi/Lib/site-packages/pip/_vendor/distlib/locators.py
0 → 100644
View file @
fd87bf9b
# -*- coding: utf-8 -*-
#
# Copyright (C) 2012-2015 Vinay Sajip.
# Licensed to the Python Software Foundation under a contributor agreement.
# See LICENSE.txt and CONTRIBUTORS.txt.
#
import
gzip
from
io
import
BytesIO
import
json
import
logging
import
os
import
posixpath
import
re
try
:
import
threading
except
ImportError
:
# pragma: no cover
import
dummy_threading
as
threading
import
zlib
from
.
import
DistlibException
from
.compat
import
(
urljoin
,
urlparse
,
urlunparse
,
url2pathname
,
pathname2url
,
queue
,
quote
,
unescape
,
string_types
,
build_opener
,
HTTPRedirectHandler
as
BaseRedirectHandler
,
text_type
,
Request
,
HTTPError
,
URLError
)
from
.database
import
Distribution
,
DistributionPath
,
make_dist
from
.metadata
import
Metadata
,
MetadataInvalidError
from
.util
import
(
cached_property
,
parse_credentials
,
ensure_slash
,
split_filename
,
get_project_data
,
parse_requirement
,
parse_name_and_version
,
ServerProxy
,
normalize_name
)
from
.version
import
get_scheme
,
UnsupportedVersionError
from
.wheel
import
Wheel
,
is_compatible
logger
=
logging
.
getLogger
(
__name__
)
HASHER_HASH
=
re
.
compile
(
r'^(\w+)=([a-f0-9]+)'
)
CHARSET
=
re
.
compile
(
r';\s*charset\s*=\s*(.*)\s*$'
,
re
.
I
)
HTML_CONTENT_TYPE
=
re
.
compile
(
'text/html|application/x(ht)?ml'
)
DEFAULT_INDEX
=
'https://pypi.org/pypi'
def
get_all_distribution_names
(
url
=
None
):
"""
Return all distribution names known by an index.
:param url: The URL of the index.
:return: A list of all known distribution names.
"""
if
url
is
None
:
url
=
DEFAULT_INDEX
client
=
ServerProxy
(
url
,
timeout
=
3.0
)
try
:
return
client
.
list_packages
()
finally
:
client
(
'close'
)()
class
RedirectHandler
(
BaseRedirectHandler
):
"""
A class to work around a bug in some Python 3.2.x releases.
"""
# There's a bug in the base version for some 3.2.x
# (e.g. 3.2.2 on Ubuntu Oneiric). If a Location header
# returns e.g. /abc, it bails because it says the scheme ''
# is bogus, when actually it should use the request's
# URL for the scheme. See Python issue #13696.
def
http_error_302
(
self
,
req
,
fp
,
code
,
msg
,
headers
):
# Some servers (incorrectly) return multiple Location headers
# (so probably same goes for URI). Use first header.
newurl
=
None
for
key
in
(
'location'
,
'uri'
):
if
key
in
headers
:
newurl
=
headers
[
key
]
break
if
newurl
is
None
:
# pragma: no cover
return
urlparts
=
urlparse
(
newurl
)
if
urlparts
.
scheme
==
''
:
newurl
=
urljoin
(
req
.
get_full_url
(),
newurl
)
if
hasattr
(
headers
,
'replace_header'
):
headers
.
replace_header
(
key
,
newurl
)
else
:
headers
[
key
]
=
newurl
return
BaseRedirectHandler
.
http_error_302
(
self
,
req
,
fp
,
code
,
msg
,
headers
)
http_error_301
=
http_error_303
=
http_error_307
=
http_error_302
class
Locator
(
object
):
"""
A base class for locators - things that locate distributions.
"""
source_extensions
=
(
'.tar.gz'
,
'.tar.bz2'
,
'.tar'
,
'.zip'
,
'.tgz'
,
'.tbz'
)
binary_extensions
=
(
'.egg'
,
'.exe'
,
'.whl'
)
excluded_extensions
=
(
'.pdf'
,)
# A list of tags indicating which wheels you want to match. The default
# value of None matches against the tags compatible with the running
# Python. If you want to match other values, set wheel_tags on a locator
# instance to a list of tuples (pyver, abi, arch) which you want to match.
wheel_tags
=
None
downloadable_extensions
=
source_extensions
+
(
'.whl'
,)
def
__init__
(
self
,
scheme
=
'default'
):
"""
Initialise an instance.
:param scheme: Because locators look for most recent versions, they
need to know the version scheme to use. This specifies
the current PEP-recommended scheme - use ``'legacy'``
if you need to support existing distributions on PyPI.
"""
self
.
_cache
=
{}
self
.
scheme
=
scheme
# Because of bugs in some of the handlers on some of the platforms,
# we use our own opener rather than just using urlopen.
self
.
opener
=
build_opener
(
RedirectHandler
())
# If get_project() is called from locate(), the matcher instance
# is set from the requirement passed to locate(). See issue #18 for
# why this can be useful to know.
self
.
matcher
=
None
self
.
errors
=
queue
.
Queue
()
def
get_errors
(
self
):
"""
Return any errors which have occurred.
"""
result
=
[]
while
not
self
.
errors
.
empty
():
# pragma: no cover
try
:
e
=
self
.
errors
.
get
(
False
)
result
.
append
(
e
)
except
self
.
errors
.
Empty
:
continue
self
.
errors
.
task_done
()
return
result
def
clear_errors
(
self
):
"""
Clear any errors which may have been logged.
"""
# Just get the errors and throw them away
self
.
get_errors
()
def
clear_cache
(
self
):
self
.
_cache
.
clear
()
def
_get_scheme
(
self
):
return
self
.
_scheme
def
_set_scheme
(
self
,
value
):
self
.
_scheme
=
value
scheme
=
property
(
_get_scheme
,
_set_scheme
)
def
_get_project
(
self
,
name
):
"""
For a given project, get a dictionary mapping available versions to Distribution
instances.
This should be implemented in subclasses.
If called from a locate() request, self.matcher will be set to a
matcher for the requirement to satisfy, otherwise it will be None.
"""
raise
NotImplementedError
(
'Please implement in the subclass'
)
def
get_distribution_names
(
self
):
"""
Return all the distribution names known to this locator.
"""
raise
NotImplementedError
(
'Please implement in the subclass'
)
def
get_project
(
self
,
name
):
"""
For a given project, get a dictionary mapping available versions to Distribution
instances.
This calls _get_project to do all the work, and just implements a caching layer on top.
"""
if
self
.
_cache
is
None
:
# pragma: no cover
result
=
self
.
_get_project
(
name
)
elif
name
in
self
.
_cache
:
result
=
self
.
_cache
[
name
]
else
:
self
.
clear_errors
()
result
=
self
.
_get_project
(
name
)
self
.
_cache
[
name
]
=
result
return
result
def
score_url
(
self
,
url
):
"""
Give an url a score which can be used to choose preferred URLs
for a given project release.
"""
t
=
urlparse
(
url
)
basename
=
posixpath
.
basename
(
t
.
path
)
compatible
=
True
is_wheel
=
basename
.
endswith
(
'.whl'
)
is_downloadable
=
basename
.
endswith
(
self
.
downloadable_extensions
)
if
is_wheel
:
compatible
=
is_compatible
(
Wheel
(
basename
),
self
.
wheel_tags
)
return
(
t
.
scheme
==
'https'
,
'pypi.org'
in
t
.
netloc
,
is_downloadable
,
is_wheel
,
compatible
,
basename
)
def
prefer_url
(
self
,
url1
,
url2
):
"""
Choose one of two URLs where both are candidates for distribution
archives for the same version of a distribution (for example,
.tar.gz vs. zip).
The current implementation favours https:// URLs over http://, archives
from PyPI over those from other locations, wheel compatibility (if a
wheel) and then the archive name.
"""
result
=
url2
if
url1
:
s1
=
self
.
score_url
(
url1
)
s2
=
self
.
score_url
(
url2
)
if
s1
>
s2
:
result
=
url1
if
result
!=
url2
:
logger
.
debug
(
'Not replacing
%
r with
%
r'
,
url1
,
url2
)
else
:
logger
.
debug
(
'Replacing
%
r with
%
r'
,
url1
,
url2
)
return
result
def
split_filename
(
self
,
filename
,
project_name
):
"""
Attempt to split a filename in project name, version and Python version.
"""
return
split_filename
(
filename
,
project_name
)
def
convert_url_to_download_info
(
self
,
url
,
project_name
):
"""
See if a URL is a candidate for a download URL for a project (the URL
has typically been scraped from an HTML page).
If it is, a dictionary is returned with keys "name", "version",
"filename" and "url"; otherwise, None is returned.
"""
def
same_project
(
name1
,
name2
):
return
normalize_name
(
name1
)
==
normalize_name
(
name2
)
result
=
None
scheme
,
netloc
,
path
,
params
,
query
,
frag
=
urlparse
(
url
)
if
frag
.
lower
()
.
startswith
(
'egg='
):
# pragma: no cover
logger
.
debug
(
'
%
s: version hint in fragment:
%
r'
,
project_name
,
frag
)
m
=
HASHER_HASH
.
match
(
frag
)
if
m
:
algo
,
digest
=
m
.
groups
()
else
:
algo
,
digest
=
None
,
None
origpath
=
path
if
path
and
path
[
-
1
]
==
'/'
:
# pragma: no cover
path
=
path
[:
-
1
]
if
path
.
endswith
(
'.whl'
):
try
:
wheel
=
Wheel
(
path
)
if
not
is_compatible
(
wheel
,
self
.
wheel_tags
):
logger
.
debug
(
'Wheel not compatible:
%
s'
,
path
)
else
:
if
project_name
is
None
:
include
=
True
else
:
include
=
same_project
(
wheel
.
name
,
project_name
)
if
include
:
result
=
{
'name'
:
wheel
.
name
,
'version'
:
wheel
.
version
,
'filename'
:
wheel
.
filename
,
'url'
:
urlunparse
((
scheme
,
netloc
,
origpath
,
params
,
query
,
''
)),
'python-version'
:
', '
.
join
(
[
'.'
.
join
(
list
(
v
[
2
:]))
for
v
in
wheel
.
pyver
]),
}
except
Exception
as
e
:
# pragma: no cover
logger
.
warning
(
'invalid path for wheel:
%
s'
,
path
)
elif
not
path
.
endswith
(
self
.
downloadable_extensions
):
# pragma: no cover
logger
.
debug
(
'Not downloadable:
%
s'
,
path
)
else
:
# downloadable extension
path
=
filename
=
posixpath
.
basename
(
path
)
for
ext
in
self
.
downloadable_extensions
:
if
path
.
endswith
(
ext
):
path
=
path
[:
-
len
(
ext
)]
t
=
self
.
split_filename
(
path
,
project_name
)
if
not
t
:
# pragma: no cover
logger
.
debug
(
'No match for project/version:
%
s'
,
path
)
else
:
name
,
version
,
pyver
=
t
if
not
project_name
or
same_project
(
project_name
,
name
):
result
=
{
'name'
:
name
,
'version'
:
version
,
'filename'
:
filename
,
'url'
:
urlunparse
((
scheme
,
netloc
,
origpath
,
params
,
query
,
''
)),
#'packagetype': 'sdist',
}
if
pyver
:
# pragma: no cover
result
[
'python-version'
]
=
pyver
break
if
result
and
algo
:
result
[
'
%
s_digest'
%
algo
]
=
digest
return
result
def
_get_digest
(
self
,
info
):
"""
Get a digest from a dictionary by looking at a "digests" dictionary
or keys of the form 'algo_digest'.
Returns a 2-tuple (algo, digest) if found, else None. Currently
looks only for SHA256, then MD5.
"""
result
=
None
if
'digests'
in
info
:
digests
=
info
[
'digests'
]
for
algo
in
(
'sha256'
,
'md5'
):
if
algo
in
digests
:
result
=
(
algo
,
digests
[
algo
])
break
if
not
result
:
for
algo
in
(
'sha256'
,
'md5'
):
key
=
'
%
s_digest'
%
algo
if
key
in
info
:
result
=
(
algo
,
info
[
key
])
break
return
result
def
_update_version_data
(
self
,
result
,
info
):
"""
Update a result dictionary (the final result from _get_project) with a
dictionary for a specific version, which typically holds information
gleaned from a filename or URL for an archive for the distribution.
"""
name
=
info
.
pop
(
'name'
)
version
=
info
.
pop
(
'version'
)
if
version
in
result
:
dist
=
result
[
version
]
md
=
dist
.
metadata
else
:
dist
=
make_dist
(
name
,
version
,
scheme
=
self
.
scheme
)
md
=
dist
.
metadata
dist
.
digest
=
digest
=
self
.
_get_digest
(
info
)
url
=
info
[
'url'
]
result
[
'digests'
][
url
]
=
digest
if
md
.
source_url
!=
info
[
'url'
]:
md
.
source_url
=
self
.
prefer_url
(
md
.
source_url
,
url
)
result
[
'urls'
]
.
setdefault
(
version
,
set
())
.
add
(
url
)
dist
.
locator
=
self
result
[
version
]
=
dist
def
locate
(
self
,
requirement
,
prereleases
=
False
):
"""
Find the most recent distribution which matches the given
requirement.
:param requirement: A requirement of the form 'foo (1.0)' or perhaps
'foo (>= 1.0, < 2.0, != 1.3)'
:param prereleases: If ``True``, allow pre-release versions
to be located. Otherwise, pre-release versions
are not returned.
:return: A :class:`Distribution` instance, or ``None`` if no such
distribution could be located.
"""
result
=
None
r
=
parse_requirement
(
requirement
)
if
r
is
None
:
# pragma: no cover
raise
DistlibException
(
'Not a valid requirement:
%
r'
%
requirement
)
scheme
=
get_scheme
(
self
.
scheme
)
self
.
matcher
=
matcher
=
scheme
.
matcher
(
r
.
requirement
)
logger
.
debug
(
'matcher:
%
s (
%
s)'
,
matcher
,
type
(
matcher
)
.
__name__
)
versions
=
self
.
get_project
(
r
.
name
)
if
len
(
versions
)
>
2
:
# urls and digests keys are present
# sometimes, versions are invalid
slist
=
[]
vcls
=
matcher
.
version_class
for
k
in
versions
:
if
k
in
(
'urls'
,
'digests'
):
continue
try
:
if
not
matcher
.
match
(
k
):
logger
.
debug
(
'
%
s did not match
%
r'
,
matcher
,
k
)
else
:
if
prereleases
or
not
vcls
(
k
)
.
is_prerelease
:
slist
.
append
(
k
)
else
:
logger
.
debug
(
'skipping pre-release '
'version
%
s of
%
s'
,
k
,
matcher
.
name
)
except
Exception
:
# pragma: no cover
logger
.
warning
(
'error matching
%
s with
%
r'
,
matcher
,
k
)
pass
# slist.append(k)
if
len
(
slist
)
>
1
:
slist
=
sorted
(
slist
,
key
=
scheme
.
key
)
if
slist
:
logger
.
debug
(
'sorted list:
%
s'
,
slist
)
version
=
slist
[
-
1
]
result
=
versions
[
version
]
if
result
:
if
r
.
extras
:
result
.
extras
=
r
.
extras
result
.
download_urls
=
versions
.
get
(
'urls'
,
{})
.
get
(
version
,
set
())
d
=
{}
sd
=
versions
.
get
(
'digests'
,
{})
for
url
in
result
.
download_urls
:
if
url
in
sd
:
# pragma: no cover
d
[
url
]
=
sd
[
url
]
result
.
digests
=
d
self
.
matcher
=
None
return
result
class
PyPIRPCLocator
(
Locator
):
"""
This locator uses XML-RPC to locate distributions. It therefore
cannot be used with simple mirrors (that only mirror file content).
"""
def
__init__
(
self
,
url
,
**
kwargs
):
"""
Initialise an instance.
:param url: The URL to use for XML-RPC.
:param kwargs: Passed to the superclass constructor.
"""
super
(
PyPIRPCLocator
,
self
)
.
__init__
(
**
kwargs
)
self
.
base_url
=
url
self
.
client
=
ServerProxy
(
url
,
timeout
=
3.0
)
def
get_distribution_names
(
self
):
"""
Return all the distribution names known to this locator.
"""
return
set
(
self
.
client
.
list_packages
())
def
_get_project
(
self
,
name
):
result
=
{
'urls'
:
{},
'digests'
:
{}}
versions
=
self
.
client
.
package_releases
(
name
,
True
)
for
v
in
versions
:
urls
=
self
.
client
.
release_urls
(
name
,
v
)
data
=
self
.
client
.
release_data
(
name
,
v
)
metadata
=
Metadata
(
scheme
=
self
.
scheme
)
metadata
.
name
=
data
[
'name'
]
metadata
.
version
=
data
[
'version'
]
metadata
.
license
=
data
.
get
(
'license'
)
metadata
.
keywords
=
data
.
get
(
'keywords'
,
[])
metadata
.
summary
=
data
.
get
(
'summary'
)
dist
=
Distribution
(
metadata
)
if
urls
:
info
=
urls
[
0
]
metadata
.
source_url
=
info
[
'url'
]
dist
.
digest
=
self
.
_get_digest
(
info
)
dist
.
locator
=
self
result
[
v
]
=
dist
for
info
in
urls
:
url
=
info
[
'url'
]
digest
=
self
.
_get_digest
(
info
)
result
[
'urls'
]
.
setdefault
(
v
,
set
())
.
add
(
url
)
result
[
'digests'
][
url
]
=
digest
return
result
class
PyPIJSONLocator
(
Locator
):
"""
This locator uses PyPI's JSON interface. It's very limited in functionality
and probably not worth using.
"""
def
__init__
(
self
,
url
,
**
kwargs
):
super
(
PyPIJSONLocator
,
self
)
.
__init__
(
**
kwargs
)
self
.
base_url
=
ensure_slash
(
url
)
def
get_distribution_names
(
self
):
"""
Return all the distribution names known to this locator.
"""
raise
NotImplementedError
(
'Not available from this locator'
)
def
_get_project
(
self
,
name
):
result
=
{
'urls'
:
{},
'digests'
:
{}}
url
=
urljoin
(
self
.
base_url
,
'
%
s/json'
%
quote
(
name
))
try
:
resp
=
self
.
opener
.
open
(
url
)
data
=
resp
.
read
()
.
decode
()
# for now
d
=
json
.
loads
(
data
)
md
=
Metadata
(
scheme
=
self
.
scheme
)
data
=
d
[
'info'
]
md
.
name
=
data
[
'name'
]
md
.
version
=
data
[
'version'
]
md
.
license
=
data
.
get
(
'license'
)
md
.
keywords
=
data
.
get
(
'keywords'
,
[])
md
.
summary
=
data
.
get
(
'summary'
)
dist
=
Distribution
(
md
)
dist
.
locator
=
self
urls
=
d
[
'urls'
]
result
[
md
.
version
]
=
dist
for
info
in
d
[
'urls'
]:
url
=
info
[
'url'
]
dist
.
download_urls
.
add
(
url
)
dist
.
digests
[
url
]
=
self
.
_get_digest
(
info
)
result
[
'urls'
]
.
setdefault
(
md
.
version
,
set
())
.
add
(
url
)
result
[
'digests'
][
url
]
=
self
.
_get_digest
(
info
)
# Now get other releases
for
version
,
infos
in
d
[
'releases'
]
.
items
():
if
version
==
md
.
version
:
continue
# already done
omd
=
Metadata
(
scheme
=
self
.
scheme
)
omd
.
name
=
md
.
name
omd
.
version
=
version
odist
=
Distribution
(
omd
)
odist
.
locator
=
self
result
[
version
]
=
odist
for
info
in
infos
:
url
=
info
[
'url'
]
odist
.
download_urls
.
add
(
url
)
odist
.
digests
[
url
]
=
self
.
_get_digest
(
info
)
result
[
'urls'
]
.
setdefault
(
version
,
set
())
.
add
(
url
)
result
[
'digests'
][
url
]
=
self
.
_get_digest
(
info
)
# for info in urls:
# md.source_url = info['url']
# dist.digest = self._get_digest(info)
# dist.locator = self
# for info in urls:
# url = info['url']
# result['urls'].setdefault(md.version, set()).add(url)
# result['digests'][url] = self._get_digest(info)
except
Exception
as
e
:
self
.
errors
.
put
(
text_type
(
e
))
logger
.
exception
(
'JSON fetch failed:
%
s'
,
e
)
return
result
class
Page
(
object
):
"""
This class represents a scraped HTML page.
"""
# The following slightly hairy-looking regex just looks for the contents of
# an anchor link, which has an attribute "href" either immediately preceded
# or immediately followed by a "rel" attribute. The attribute values can be
# declared with double quotes, single quotes or no quotes - which leads to
# the length of the expression.
_href
=
re
.
compile
(
"""
(rel
\\
s*=
\\
s*(?:"(?P<rel1>[^"]*)"|'(?P<rel2>[^']*)'|(?P<rel3>[^>
\\
s
\n
]*))
\\
s+)?
href
\\
s*=
\\
s*(?:"(?P<url1>[^"]*)"|'(?P<url2>[^']*)'|(?P<url3>[^>
\\
s
\n
]*))
(
\\
s+rel
\\
s*=
\\
s*(?:"(?P<rel4>[^"]*)"|'(?P<rel5>[^']*)'|(?P<rel6>[^>
\\
s
\n
]*)))?
"""
,
re
.
I
|
re
.
S
|
re
.
X
)
_base
=
re
.
compile
(
r"""<base\s+href\s*=\s*['"]?([^'">]+)"""
,
re
.
I
|
re
.
S
)
def
__init__
(
self
,
data
,
url
):
"""
Initialise an instance with the Unicode page contents and the URL they
came from.
"""
self
.
data
=
data
self
.
base_url
=
self
.
url
=
url
m
=
self
.
_base
.
search
(
self
.
data
)
if
m
:
self
.
base_url
=
m
.
group
(
1
)
_clean_re
=
re
.
compile
(
r'[^a-z0-9$&+,/:;=?@.#
%
_\\|-]'
,
re
.
I
)
@cached_property
def
links
(
self
):
"""
Return the URLs of all the links on a page together with information
about their "rel" attribute, for determining which ones to treat as
downloads and which ones to queue for further scraping.
"""
def
clean
(
url
):
"Tidy up an URL."
scheme
,
netloc
,
path
,
params
,
query
,
frag
=
urlparse
(
url
)
return
urlunparse
((
scheme
,
netloc
,
quote
(
path
),
params
,
query
,
frag
))
result
=
set
()
for
match
in
self
.
_href
.
finditer
(
self
.
data
):
d
=
match
.
groupdict
(
''
)
rel
=
(
d
[
'rel1'
]
or
d
[
'rel2'
]
or
d
[
'rel3'
]
or
d
[
'rel4'
]
or
d
[
'rel5'
]
or
d
[
'rel6'
])
url
=
d
[
'url1'
]
or
d
[
'url2'
]
or
d
[
'url3'
]
url
=
urljoin
(
self
.
base_url
,
url
)
url
=
unescape
(
url
)
url
=
self
.
_clean_re
.
sub
(
lambda
m
:
'
%%%2
x'
%
ord
(
m
.
group
(
0
)),
url
)
result
.
add
((
url
,
rel
))
# We sort the result, hoping to bring the most recent versions
# to the front
result
=
sorted
(
result
,
key
=
lambda
t
:
t
[
0
],
reverse
=
True
)
return
result
class
SimpleScrapingLocator
(
Locator
):
"""
A locator which scrapes HTML pages to locate downloads for a distribution.
This runs multiple threads to do the I/O; performance is at least as good
as pip's PackageFinder, which works in an analogous fashion.
"""
# These are used to deal with various Content-Encoding schemes.
decoders
=
{
'deflate'
:
zlib
.
decompress
,
'gzip'
:
lambda
b
:
gzip
.
GzipFile
(
fileobj
=
BytesIO
(
d
))
.
read
(),
'none'
:
lambda
b
:
b
,
}
def
__init__
(
self
,
url
,
timeout
=
None
,
num_workers
=
10
,
**
kwargs
):
"""
Initialise an instance.
:param url: The root URL to use for scraping.
:param timeout: The timeout, in seconds, to be applied to requests.
This defaults to ``None`` (no timeout specified).
:param num_workers: The number of worker threads you want to do I/O,
This defaults to 10.
:param kwargs: Passed to the superclass.
"""
super
(
SimpleScrapingLocator
,
self
)
.
__init__
(
**
kwargs
)
self
.
base_url
=
ensure_slash
(
url
)
self
.
timeout
=
timeout
self
.
_page_cache
=
{}
self
.
_seen
=
set
()
self
.
_to_fetch
=
queue
.
Queue
()
self
.
_bad_hosts
=
set
()
self
.
skip_externals
=
False
self
.
num_workers
=
num_workers
self
.
_lock
=
threading
.
RLock
()
# See issue #45: we need to be resilient when the locator is used
# in a thread, e.g. with concurrent.futures. We can't use self._lock
# as it is for coordinating our internal threads - the ones created
# in _prepare_threads.
self
.
_gplock
=
threading
.
RLock
()
self
.
platform_check
=
False
# See issue #112
def
_prepare_threads
(
self
):
"""
Threads are created only when get_project is called, and terminate
before it returns. They are there primarily to parallelise I/O (i.e.
fetching web pages).
"""
self
.
_threads
=
[]
for
i
in
range
(
self
.
num_workers
):
t
=
threading
.
Thread
(
target
=
self
.
_fetch
)
t
.
setDaemon
(
True
)
t
.
start
()
self
.
_threads
.
append
(
t
)
def
_wait_threads
(
self
):
"""
Tell all the threads to terminate (by sending a sentinel value) and
wait for them to do so.
"""
# Note that you need two loops, since you can't say which
# thread will get each sentinel
for
t
in
self
.
_threads
:
self
.
_to_fetch
.
put
(
None
)
# sentinel
for
t
in
self
.
_threads
:
t
.
join
()
self
.
_threads
=
[]
def
_get_project
(
self
,
name
):
result
=
{
'urls'
:
{},
'digests'
:
{}}
with
self
.
_gplock
:
self
.
result
=
result
self
.
project_name
=
name
url
=
urljoin
(
self
.
base_url
,
'
%
s/'
%
quote
(
name
))
self
.
_seen
.
clear
()
self
.
_page_cache
.
clear
()
self
.
_prepare_threads
()
try
:
logger
.
debug
(
'Queueing
%
s'
,
url
)
self
.
_to_fetch
.
put
(
url
)
self
.
_to_fetch
.
join
()
finally
:
self
.
_wait_threads
()
del
self
.
result
return
result
platform_dependent
=
re
.
compile
(
r'\b(linux_(i\d86|x86_64|arm\w+)|'
r'win(32|_amd64)|macosx_?\d+)\b'
,
re
.
I
)
def
_is_platform_dependent
(
self
,
url
):
"""
Does an URL refer to a platform-specific download?
"""
return
self
.
platform_dependent
.
search
(
url
)
def
_process_download
(
self
,
url
):
"""
See if an URL is a suitable download for a project.
If it is, register information in the result dictionary (for
_get_project) about the specific version it's for.
Note that the return value isn't actually used other than as a boolean
value.
"""
if
self
.
platform_check
and
self
.
_is_platform_dependent
(
url
):
info
=
None
else
:
info
=
self
.
convert_url_to_download_info
(
url
,
self
.
project_name
)
logger
.
debug
(
'process_download:
%
s ->
%
s'
,
url
,
info
)
if
info
:
with
self
.
_lock
:
# needed because self.result is shared
self
.
_update_version_data
(
self
.
result
,
info
)
return
info
def
_should_queue
(
self
,
link
,
referrer
,
rel
):
"""
Determine whether a link URL from a referring page and with a
particular "rel" attribute should be queued for scraping.
"""
scheme
,
netloc
,
path
,
_
,
_
,
_
=
urlparse
(
link
)
if
path
.
endswith
(
self
.
source_extensions
+
self
.
binary_extensions
+
self
.
excluded_extensions
):
result
=
False
elif
self
.
skip_externals
and
not
link
.
startswith
(
self
.
base_url
):
result
=
False
elif
not
referrer
.
startswith
(
self
.
base_url
):
result
=
False
elif
rel
not
in
(
'homepage'
,
'download'
):
result
=
False
elif
scheme
not
in
(
'http'
,
'https'
,
'ftp'
):
result
=
False
elif
self
.
_is_platform_dependent
(
link
):
result
=
False
else
:
host
=
netloc
.
split
(
':'
,
1
)[
0
]
if
host
.
lower
()
==
'localhost'
:
result
=
False
else
:
result
=
True
logger
.
debug
(
'should_queue:
%
s (
%
s) from
%
s ->
%
s'
,
link
,
rel
,
referrer
,
result
)
return
result
def
_fetch
(
self
):
"""
Get a URL to fetch from the work queue, get the HTML page, examine its
links for download candidates and candidates for further scraping.
This is a handy method to run in a thread.
"""
while
True
:
url
=
self
.
_to_fetch
.
get
()
try
:
if
url
:
page
=
self
.
get_page
(
url
)
if
page
is
None
:
# e.g. after an error
continue
for
link
,
rel
in
page
.
links
:
if
link
not
in
self
.
_seen
:
try
:
self
.
_seen
.
add
(
link
)
if
(
not
self
.
_process_download
(
link
)
and
self
.
_should_queue
(
link
,
url
,
rel
)):
logger
.
debug
(
'Queueing
%
s from
%
s'
,
link
,
url
)
self
.
_to_fetch
.
put
(
link
)
except
MetadataInvalidError
:
# e.g. invalid versions
pass
except
Exception
as
e
:
# pragma: no cover
self
.
errors
.
put
(
text_type
(
e
))
finally
:
# always do this, to avoid hangs :-)
self
.
_to_fetch
.
task_done
()
if
not
url
:
#logger.debug('Sentinel seen, quitting.')
break
def
get_page
(
self
,
url
):
"""
Get the HTML for an URL, possibly from an in-memory cache.
XXX TODO Note: this cache is never actually cleared. It's assumed that
the data won't get stale over the lifetime of a locator instance (not
necessarily true for the default_locator).
"""
# http://peak.telecommunity.com/DevCenter/EasyInstall#package-index-api
scheme
,
netloc
,
path
,
_
,
_
,
_
=
urlparse
(
url
)
if
scheme
==
'file'
and
os
.
path
.
isdir
(
url2pathname
(
path
)):
url
=
urljoin
(
ensure_slash
(
url
),
'index.html'
)
if
url
in
self
.
_page_cache
:
result
=
self
.
_page_cache
[
url
]
logger
.
debug
(
'Returning
%
s from cache:
%
s'
,
url
,
result
)
else
:
host
=
netloc
.
split
(
':'
,
1
)[
0
]
result
=
None
if
host
in
self
.
_bad_hosts
:
logger
.
debug
(
'Skipping
%
s due to bad host
%
s'
,
url
,
host
)
else
:
req
=
Request
(
url
,
headers
=
{
'Accept-encoding'
:
'identity'
})
try
:
logger
.
debug
(
'Fetching
%
s'
,
url
)
resp
=
self
.
opener
.
open
(
req
,
timeout
=
self
.
timeout
)
logger
.
debug
(
'Fetched
%
s'
,
url
)
headers
=
resp
.
info
()
content_type
=
headers
.
get
(
'Content-Type'
,
''
)
if
HTML_CONTENT_TYPE
.
match
(
content_type
):
final_url
=
resp
.
geturl
()
data
=
resp
.
read
()
encoding
=
headers
.
get
(
'Content-Encoding'
)
if
encoding
:
decoder
=
self
.
decoders
[
encoding
]
# fail if not found
data
=
decoder
(
data
)
encoding
=
'utf-8'
m
=
CHARSET
.
search
(
content_type
)
if
m
:
encoding
=
m
.
group
(
1
)
try
:
data
=
data
.
decode
(
encoding
)
except
UnicodeError
:
# pragma: no cover
data
=
data
.
decode
(
'latin-1'
)
# fallback
result
=
Page
(
data
,
final_url
)
self
.
_page_cache
[
final_url
]
=
result
except
HTTPError
as
e
:
if
e
.
code
!=
404
:
logger
.
exception
(
'Fetch failed:
%
s:
%
s'
,
url
,
e
)
except
URLError
as
e
:
# pragma: no cover
logger
.
exception
(
'Fetch failed:
%
s:
%
s'
,
url
,
e
)
with
self
.
_lock
:
self
.
_bad_hosts
.
add
(
host
)
except
Exception
as
e
:
# pragma: no cover
logger
.
exception
(
'Fetch failed:
%
s:
%
s'
,
url
,
e
)
finally
:
self
.
_page_cache
[
url
]
=
result
# even if None (failure)
return
result
_distname_re
=
re
.
compile
(
'<a href=[^>]*>([^<]+)<'
)
def
get_distribution_names
(
self
):
"""
Return all the distribution names known to this locator.
"""
result
=
set
()
page
=
self
.
get_page
(
self
.
base_url
)
if
not
page
:
raise
DistlibException
(
'Unable to get
%
s'
%
self
.
base_url
)
for
match
in
self
.
_distname_re
.
finditer
(
page
.
data
):
result
.
add
(
match
.
group
(
1
))
return
result
class
DirectoryLocator
(
Locator
):
"""
This class locates distributions in a directory tree.
"""
def
__init__
(
self
,
path
,
**
kwargs
):
"""
Initialise an instance.
:param path: The root of the directory tree to search.
:param kwargs: Passed to the superclass constructor,
except for:
* recursive - if True (the default), subdirectories are
recursed into. If False, only the top-level directory
is searched,
"""
self
.
recursive
=
kwargs
.
pop
(
'recursive'
,
True
)
super
(
DirectoryLocator
,
self
)
.
__init__
(
**
kwargs
)
path
=
os
.
path
.
abspath
(
path
)
if
not
os
.
path
.
isdir
(
path
):
# pragma: no cover
raise
DistlibException
(
'Not a directory:
%
r'
%
path
)
self
.
base_dir
=
path
def
should_include
(
self
,
filename
,
parent
):
"""
Should a filename be considered as a candidate for a distribution
archive? As well as the filename, the directory which contains it
is provided, though not used by the current implementation.
"""
return
filename
.
endswith
(
self
.
downloadable_extensions
)
def
_get_project
(
self
,
name
):
result
=
{
'urls'
:
{},
'digests'
:
{}}
for
root
,
dirs
,
files
in
os
.
walk
(
self
.
base_dir
):
for
fn
in
files
:
if
self
.
should_include
(
fn
,
root
):
fn
=
os
.
path
.
join
(
root
,
fn
)
url
=
urlunparse
((
'file'
,
''
,
pathname2url
(
os
.
path
.
abspath
(
fn
)),
''
,
''
,
''
))
info
=
self
.
convert_url_to_download_info
(
url
,
name
)
if
info
:
self
.
_update_version_data
(
result
,
info
)
if
not
self
.
recursive
:
break
return
result
def
get_distribution_names
(
self
):
"""
Return all the distribution names known to this locator.
"""
result
=
set
()
for
root
,
dirs
,
files
in
os
.
walk
(
self
.
base_dir
):
for
fn
in
files
:
if
self
.
should_include
(
fn
,
root
):
fn
=
os
.
path
.
join
(
root
,
fn
)
url
=
urlunparse
((
'file'
,
''
,
pathname2url
(
os
.
path
.
abspath
(
fn
)),
''
,
''
,
''
))
info
=
self
.
convert_url_to_download_info
(
url
,
None
)
if
info
:
result
.
add
(
info
[
'name'
])
if
not
self
.
recursive
:
break
return
result
class
JSONLocator
(
Locator
):
"""
This locator uses special extended metadata (not available on PyPI) and is
the basis of performant dependency resolution in distlib. Other locators
require archive downloads before dependencies can be determined! As you
might imagine, that can be slow.
"""
def
get_distribution_names
(
self
):
"""
Return all the distribution names known to this locator.
"""
raise
NotImplementedError
(
'Not available from this locator'
)
def
_get_project
(
self
,
name
):
result
=
{
'urls'
:
{},
'digests'
:
{}}
data
=
get_project_data
(
name
)
if
data
:
for
info
in
data
.
get
(
'files'
,
[]):
if
info
[
'ptype'
]
!=
'sdist'
or
info
[
'pyversion'
]
!=
'source'
:
continue
# We don't store summary in project metadata as it makes
# the data bigger for no benefit during dependency
# resolution
dist
=
make_dist
(
data
[
'name'
],
info
[
'version'
],
summary
=
data
.
get
(
'summary'
,
'Placeholder for summary'
),
scheme
=
self
.
scheme
)
md
=
dist
.
metadata
md
.
source_url
=
info
[
'url'
]
# TODO SHA256 digest
if
'digest'
in
info
and
info
[
'digest'
]:
dist
.
digest
=
(
'md5'
,
info
[
'digest'
])
md
.
dependencies
=
info
.
get
(
'requirements'
,
{})
dist
.
exports
=
info
.
get
(
'exports'
,
{})
result
[
dist
.
version
]
=
dist
result
[
'urls'
]
.
setdefault
(
dist
.
version
,
set
())
.
add
(
info
[
'url'
])
return
result
class
DistPathLocator
(
Locator
):
"""
This locator finds installed distributions in a path. It can be useful for
adding to an :class:`AggregatingLocator`.
"""
def
__init__
(
self
,
distpath
,
**
kwargs
):
"""
Initialise an instance.
:param distpath: A :class:`DistributionPath` instance to search.
"""
super
(
DistPathLocator
,
self
)
.
__init__
(
**
kwargs
)
assert
isinstance
(
distpath
,
DistributionPath
)
self
.
distpath
=
distpath
def
_get_project
(
self
,
name
):
dist
=
self
.
distpath
.
get_distribution
(
name
)
if
dist
is
None
:
result
=
{
'urls'
:
{},
'digests'
:
{}}
else
:
result
=
{
dist
.
version
:
dist
,
'urls'
:
{
dist
.
version
:
set
([
dist
.
source_url
])},
'digests'
:
{
dist
.
version
:
set
([
None
])}
}
return
result
class
AggregatingLocator
(
Locator
):
"""
This class allows you to chain and/or merge a list of locators.
"""
def
__init__
(
self
,
*
locators
,
**
kwargs
):
"""
Initialise an instance.
:param locators: The list of locators to search.
:param kwargs: Passed to the superclass constructor,
except for:
* merge - if False (the default), the first successful
search from any of the locators is returned. If True,
the results from all locators are merged (this can be
slow).
"""
self
.
merge
=
kwargs
.
pop
(
'merge'
,
False
)
self
.
locators
=
locators
super
(
AggregatingLocator
,
self
)
.
__init__
(
**
kwargs
)
def
clear_cache
(
self
):
super
(
AggregatingLocator
,
self
)
.
clear_cache
()
for
locator
in
self
.
locators
:
locator
.
clear_cache
()
def
_set_scheme
(
self
,
value
):
self
.
_scheme
=
value
for
locator
in
self
.
locators
:
locator
.
scheme
=
value
scheme
=
property
(
Locator
.
scheme
.
fget
,
_set_scheme
)
def
_get_project
(
self
,
name
):
result
=
{}
for
locator
in
self
.
locators
:
d
=
locator
.
get_project
(
name
)
if
d
:
if
self
.
merge
:
files
=
result
.
get
(
'urls'
,
{})
digests
=
result
.
get
(
'digests'
,
{})
# next line could overwrite result['urls'], result['digests']
result
.
update
(
d
)
df
=
result
.
get
(
'urls'
)
if
files
and
df
:
for
k
,
v
in
files
.
items
():
if
k
in
df
:
df
[
k
]
|=
v
else
:
df
[
k
]
=
v
dd
=
result
.
get
(
'digests'
)
if
digests
and
dd
:
dd
.
update
(
digests
)
else
:
# See issue #18. If any dists are found and we're looking
# for specific constraints, we only return something if
# a match is found. For example, if a DirectoryLocator
# returns just foo (1.0) while we're looking for
# foo (>= 2.0), we'll pretend there was nothing there so
# that subsequent locators can be queried. Otherwise we
# would just return foo (1.0) which would then lead to a
# failure to find foo (>= 2.0), because other locators
# weren't searched. Note that this only matters when
# merge=False.
if
self
.
matcher
is
None
:
found
=
True
else
:
found
=
False
for
k
in
d
:
if
self
.
matcher
.
match
(
k
):
found
=
True
break
if
found
:
result
=
d
break
return
result
def
get_distribution_names
(
self
):
"""
Return all the distribution names known to this locator.
"""
result
=
set
()
for
locator
in
self
.
locators
:
try
:
result
|=
locator
.
get_distribution_names
()
except
NotImplementedError
:
pass
return
result
# We use a legacy scheme simply because most of the dists on PyPI use legacy
# versions which don't conform to PEP 426 / PEP 440.
default_locator
=
AggregatingLocator
(
JSONLocator
(),
SimpleScrapingLocator
(
'https://pypi.org/simple/'
,
timeout
=
3.0
),
scheme
=
'legacy'
)
locate
=
default_locator
.
locate
NAME_VERSION_RE
=
re
.
compile
(
r'(?P<name>[\w-]+)\s*'
r'\(\s*(==\s*)?(?P<ver>[^)]+)\)$'
)
class
DependencyFinder
(
object
):
"""
Locate dependencies for distributions.
"""
def
__init__
(
self
,
locator
=
None
):
"""
Initialise an instance, using the specified locator
to locate distributions.
"""
self
.
locator
=
locator
or
default_locator
self
.
scheme
=
get_scheme
(
self
.
locator
.
scheme
)
def
add_distribution
(
self
,
dist
):
"""
Add a distribution to the finder. This will update internal information
about who provides what.
:param dist: The distribution to add.
"""
logger
.
debug
(
'adding distribution
%
s'
,
dist
)
name
=
dist
.
key
self
.
dists_by_name
[
name
]
=
dist
self
.
dists
[(
name
,
dist
.
version
)]
=
dist
for
p
in
dist
.
provides
:
name
,
version
=
parse_name_and_version
(
p
)
logger
.
debug
(
'Add to provided:
%
s,
%
s,
%
s'
,
name
,
version
,
dist
)
self
.
provided
.
setdefault
(
name
,
set
())
.
add
((
version
,
dist
))
def
remove_distribution
(
self
,
dist
):
"""
Remove a distribution from the finder. This will update internal
information about who provides what.
:param dist: The distribution to remove.
"""
logger
.
debug
(
'removing distribution
%
s'
,
dist
)
name
=
dist
.
key
del
self
.
dists_by_name
[
name
]
del
self
.
dists
[(
name
,
dist
.
version
)]
for
p
in
dist
.
provides
:
name
,
version
=
parse_name_and_version
(
p
)
logger
.
debug
(
'Remove from provided:
%
s,
%
s,
%
s'
,
name
,
version
,
dist
)
s
=
self
.
provided
[
name
]
s
.
remove
((
version
,
dist
))
if
not
s
:
del
self
.
provided
[
name
]
def
get_matcher
(
self
,
reqt
):
"""
Get a version matcher for a requirement.
:param reqt: The requirement
:type reqt: str
:return: A version matcher (an instance of
:class:`distlib.version.Matcher`).
"""
try
:
matcher
=
self
.
scheme
.
matcher
(
reqt
)
except
UnsupportedVersionError
:
# pragma: no cover
# XXX compat-mode if cannot read the version
name
=
reqt
.
split
()[
0
]
matcher
=
self
.
scheme
.
matcher
(
name
)
return
matcher
def
find_providers
(
self
,
reqt
):
"""
Find the distributions which can fulfill a requirement.
:param reqt: The requirement.
:type reqt: str
:return: A set of distribution which can fulfill the requirement.
"""
matcher
=
self
.
get_matcher
(
reqt
)
name
=
matcher
.
key
# case-insensitive
result
=
set
()
provided
=
self
.
provided
if
name
in
provided
:
for
version
,
provider
in
provided
[
name
]:
try
:
match
=
matcher
.
match
(
version
)
except
UnsupportedVersionError
:
match
=
False
if
match
:
result
.
add
(
provider
)
break
return
result
def
try_to_replace
(
self
,
provider
,
other
,
problems
):
"""
Attempt to replace one provider with another. This is typically used
when resolving dependencies from multiple sources, e.g. A requires
(B >= 1.0) while C requires (B >= 1.1).
For successful replacement, ``provider`` must meet all the requirements
which ``other`` fulfills.
:param provider: The provider we are trying to replace with.
:param other: The provider we're trying to replace.
:param problems: If False is returned, this will contain what
problems prevented replacement. This is currently
a tuple of the literal string 'cantreplace',
``provider``, ``other`` and the set of requirements
that ``provider`` couldn't fulfill.
:return: True if we can replace ``other`` with ``provider``, else
False.
"""
rlist
=
self
.
reqts
[
other
]
unmatched
=
set
()
for
s
in
rlist
:
matcher
=
self
.
get_matcher
(
s
)
if
not
matcher
.
match
(
provider
.
version
):
unmatched
.
add
(
s
)
if
unmatched
:
# can't replace other with provider
problems
.
add
((
'cantreplace'
,
provider
,
other
,
frozenset
(
unmatched
)))
result
=
False
else
:
# can replace other with provider
self
.
remove_distribution
(
other
)
del
self
.
reqts
[
other
]
for
s
in
rlist
:
self
.
reqts
.
setdefault
(
provider
,
set
())
.
add
(
s
)
self
.
add_distribution
(
provider
)
result
=
True
return
result
def
find
(
self
,
requirement
,
meta_extras
=
None
,
prereleases
=
False
):
"""
Find a distribution and all distributions it depends on.
:param requirement: The requirement specifying the distribution to
find, or a Distribution instance.
:param meta_extras: A list of meta extras such as :test:, :build: and
so on.
:param prereleases: If ``True``, allow pre-release versions to be
returned - otherwise, don't return prereleases
unless they're all that's available.
Return a set of :class:`Distribution` instances and a set of
problems.
The distributions returned should be such that they have the
:attr:`required` attribute set to ``True`` if they were
from the ``requirement`` passed to ``find()``, and they have the
:attr:`build_time_dependency` attribute set to ``True`` unless they
are post-installation dependencies of the ``requirement``.
The problems should be a tuple consisting of the string
``'unsatisfied'`` and the requirement which couldn't be satisfied
by any distribution known to the locator.
"""
self
.
provided
=
{}
self
.
dists
=
{}
self
.
dists_by_name
=
{}
self
.
reqts
=
{}
meta_extras
=
set
(
meta_extras
or
[])
if
':*:'
in
meta_extras
:
meta_extras
.
remove
(
':*:'
)
# :meta: and :run: are implicitly included
meta_extras
|=
set
([
':test:'
,
':build:'
,
':dev:'
])
if
isinstance
(
requirement
,
Distribution
):
dist
=
odist
=
requirement
logger
.
debug
(
'passed
%
s as requirement'
,
odist
)
else
:
dist
=
odist
=
self
.
locator
.
locate
(
requirement
,
prereleases
=
prereleases
)
if
dist
is
None
:
raise
DistlibException
(
'Unable to locate
%
r'
%
requirement
)
logger
.
debug
(
'located
%
s'
,
odist
)
dist
.
requested
=
True
problems
=
set
()
todo
=
set
([
dist
])
install_dists
=
set
([
odist
])
while
todo
:
dist
=
todo
.
pop
()
name
=
dist
.
key
# case-insensitive
if
name
not
in
self
.
dists_by_name
:
self
.
add_distribution
(
dist
)
else
:
#import pdb; pdb.set_trace()
other
=
self
.
dists_by_name
[
name
]
if
other
!=
dist
:
self
.
try_to_replace
(
dist
,
other
,
problems
)
ireqts
=
dist
.
run_requires
|
dist
.
meta_requires
sreqts
=
dist
.
build_requires
ereqts
=
set
()
if
meta_extras
and
dist
in
install_dists
:
for
key
in
(
'test'
,
'build'
,
'dev'
):
e
=
':
%
s:'
%
key
if
e
in
meta_extras
:
ereqts
|=
getattr
(
dist
,
'
%
s_requires'
%
key
)
all_reqts
=
ireqts
|
sreqts
|
ereqts
for
r
in
all_reqts
:
providers
=
self
.
find_providers
(
r
)
if
not
providers
:
logger
.
debug
(
'No providers found for
%
r'
,
r
)
provider
=
self
.
locator
.
locate
(
r
,
prereleases
=
prereleases
)
# If no provider is found and we didn't consider
# prereleases, consider them now.
if
provider
is
None
and
not
prereleases
:
provider
=
self
.
locator
.
locate
(
r
,
prereleases
=
True
)
if
provider
is
None
:
logger
.
debug
(
'Cannot satisfy
%
r'
,
r
)
problems
.
add
((
'unsatisfied'
,
r
))
else
:
n
,
v
=
provider
.
key
,
provider
.
version
if
(
n
,
v
)
not
in
self
.
dists
:
todo
.
add
(
provider
)
providers
.
add
(
provider
)
if
r
in
ireqts
and
dist
in
install_dists
:
install_dists
.
add
(
provider
)
logger
.
debug
(
'Adding
%
s to install_dists'
,
provider
.
name_and_version
)
for
p
in
providers
:
name
=
p
.
key
if
name
not
in
self
.
dists_by_name
:
self
.
reqts
.
setdefault
(
p
,
set
())
.
add
(
r
)
else
:
other
=
self
.
dists_by_name
[
name
]
if
other
!=
p
:
# see if other can be replaced by p
self
.
try_to_replace
(
p
,
other
,
problems
)
dists
=
set
(
self
.
dists
.
values
())
for
dist
in
dists
:
dist
.
build_time_dependency
=
dist
not
in
install_dists
if
dist
.
build_time_dependency
:
logger
.
debug
(
'
%
s is a build-time dependency only.'
,
dist
.
name_and_version
)
logger
.
debug
(
'find done for
%
s'
,
odist
)
return
dists
,
problems
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment