Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
N
news
Project
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Sartika Aritonang
news
Commits
c985a374
Commit
c985a374
authored
May 29, 2020
by
Sartika Aritonang
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Upload New File
parent
cefd583c
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
511 additions
and
0 deletions
+511
-0
prepare.py
stbi/Lib/site-packages/pip/_internal/operations/prepare.py
+511
-0
No files found.
stbi/Lib/site-packages/pip/_internal/operations/prepare.py
0 → 100644
View file @
c985a374
"""Prepares a distribution for installation
"""
# The following comment should be removed at some point in the future.
# mypy: strict-optional=False
import
logging
import
mimetypes
import
os
import
shutil
from
pip._vendor
import
requests
from
pip._vendor.six
import
PY2
from
pip._internal.distributions
import
(
make_distribution_for_install_requirement
,
)
from
pip._internal.distributions.installed
import
InstalledDistribution
from
pip._internal.exceptions
import
(
DirectoryUrlHashUnsupported
,
HashMismatch
,
HashUnpinned
,
InstallationError
,
PreviousBuildDirError
,
VcsHashUnsupported
,
)
from
pip._internal.utils.hashes
import
MissingHashes
from
pip._internal.utils.logging
import
indent_log
from
pip._internal.utils.misc
import
display_path
,
hide_url
from
pip._internal.utils.temp_dir
import
TempDirectory
from
pip._internal.utils.typing
import
MYPY_CHECK_RUNNING
from
pip._internal.utils.unpacking
import
unpack_file
from
pip._internal.vcs
import
vcs
if
MYPY_CHECK_RUNNING
:
from
typing
import
(
Callable
,
List
,
Optional
,
Tuple
,
)
from
mypy_extensions
import
TypedDict
from
pip._internal.distributions
import
AbstractDistribution
from
pip._internal.index.package_finder
import
PackageFinder
from
pip._internal.models.link
import
Link
from
pip._internal.network.download
import
Downloader
from
pip._internal.req.req_install
import
InstallRequirement
from
pip._internal.req.req_tracker
import
RequirementTracker
from
pip._internal.utils.hashes
import
Hashes
if
PY2
:
CopytreeKwargs
=
TypedDict
(
'CopytreeKwargs'
,
{
'ignore'
:
Callable
[[
str
,
List
[
str
]],
List
[
str
]],
'symlinks'
:
bool
,
},
total
=
False
,
)
else
:
CopytreeKwargs
=
TypedDict
(
'CopytreeKwargs'
,
{
'copy_function'
:
Callable
[[
str
,
str
],
None
],
'ignore'
:
Callable
[[
str
,
List
[
str
]],
List
[
str
]],
'ignore_dangling_symlinks'
:
bool
,
'symlinks'
:
bool
,
},
total
=
False
,
)
logger
=
logging
.
getLogger
(
__name__
)
def
_get_prepared_distribution
(
req
,
# type: InstallRequirement
req_tracker
,
# type: RequirementTracker
finder
,
# type: PackageFinder
build_isolation
# type: bool
):
# type: (...) -> AbstractDistribution
"""Prepare a distribution for installation.
"""
abstract_dist
=
make_distribution_for_install_requirement
(
req
)
with
req_tracker
.
track
(
req
):
abstract_dist
.
prepare_distribution_metadata
(
finder
,
build_isolation
)
return
abstract_dist
def
unpack_vcs_link
(
link
,
location
):
# type: (Link, str) -> None
vcs_backend
=
vcs
.
get_backend_for_scheme
(
link
.
scheme
)
assert
vcs_backend
is
not
None
vcs_backend
.
unpack
(
location
,
url
=
hide_url
(
link
.
url
))
class
File
(
object
):
def
__init__
(
self
,
path
,
content_type
):
# type: (str, str) -> None
self
.
path
=
path
self
.
content_type
=
content_type
def
get_http_url
(
link
,
# type: Link
downloader
,
# type: Downloader
download_dir
=
None
,
# type: Optional[str]
hashes
=
None
,
# type: Optional[Hashes]
):
# type: (...) -> File
temp_dir
=
TempDirectory
(
kind
=
"unpack"
,
globally_managed
=
True
)
# If a download dir is specified, is the file already downloaded there?
already_downloaded_path
=
None
if
download_dir
:
already_downloaded_path
=
_check_download_dir
(
link
,
download_dir
,
hashes
)
if
already_downloaded_path
:
from_path
=
already_downloaded_path
content_type
=
mimetypes
.
guess_type
(
from_path
)[
0
]
else
:
# let's download to a tmp dir
from_path
,
content_type
=
_download_http_url
(
link
,
downloader
,
temp_dir
.
path
,
hashes
)
return
File
(
from_path
,
content_type
)
def
get_file_url
(
link
,
# type: Link
download_dir
=
None
,
# type: Optional[str]
hashes
=
None
# type: Optional[Hashes]
):
# type: (...) -> File
"""Get file and optionally check its hash.
"""
# If a download dir is specified, is the file already there and valid?
already_downloaded_path
=
None
if
download_dir
:
already_downloaded_path
=
_check_download_dir
(
link
,
download_dir
,
hashes
)
if
already_downloaded_path
:
from_path
=
already_downloaded_path
else
:
from_path
=
link
.
file_path
# If --require-hashes is off, `hashes` is either empty, the
# link's embedded hash, or MissingHashes; it is required to
# match. If --require-hashes is on, we are satisfied by any
# hash in `hashes` matching: a URL-based or an option-based
# one; no internet-sourced hash will be in `hashes`.
if
hashes
:
hashes
.
check_against_path
(
from_path
)
content_type
=
mimetypes
.
guess_type
(
from_path
)[
0
]
return
File
(
from_path
,
content_type
)
def
unpack_url
(
link
,
# type: Link
location
,
# type: str
downloader
,
# type: Downloader
download_dir
=
None
,
# type: Optional[str]
hashes
=
None
,
# type: Optional[Hashes]
):
# type: (...) -> Optional[File]
"""Unpack link into location, downloading if required.
:param hashes: A Hashes object, one of whose embedded hashes must match,
or HashMismatch will be raised. If the Hashes is empty, no matches are
required, and unhashable types of requirements (like VCS ones, which
would ordinarily raise HashUnsupported) are allowed.
"""
# non-editable vcs urls
if
link
.
is_vcs
:
unpack_vcs_link
(
link
,
location
)
return
None
# If it's a url to a local directory, we build in-place.
# There is nothing to be done here.
if
link
.
is_existing_dir
():
return
None
# file urls
if
link
.
is_file
:
file
=
get_file_url
(
link
,
download_dir
,
hashes
=
hashes
)
# http urls
else
:
file
=
get_http_url
(
link
,
downloader
,
download_dir
,
hashes
=
hashes
,
)
# unpack the archive to the build dir location. even when only downloading
# archives, they have to be unpacked to parse dependencies
unpack_file
(
file
.
path
,
location
,
file
.
content_type
)
return
file
def
_download_http_url
(
link
,
# type: Link
downloader
,
# type: Downloader
temp_dir
,
# type: str
hashes
,
# type: Optional[Hashes]
):
# type: (...) -> Tuple[str, str]
"""Download link url into temp_dir using provided session"""
download
=
downloader
(
link
)
file_path
=
os
.
path
.
join
(
temp_dir
,
download
.
filename
)
with
open
(
file_path
,
'wb'
)
as
content_file
:
for
chunk
in
download
.
chunks
:
content_file
.
write
(
chunk
)
if
hashes
:
hashes
.
check_against_path
(
file_path
)
return
file_path
,
download
.
response
.
headers
.
get
(
'content-type'
,
''
)
def
_check_download_dir
(
link
,
download_dir
,
hashes
):
# type: (Link, str, Optional[Hashes]) -> Optional[str]
""" Check download_dir for previously downloaded file with correct hash
If a correct file is found return its path else None
"""
download_path
=
os
.
path
.
join
(
download_dir
,
link
.
filename
)
if
not
os
.
path
.
exists
(
download_path
):
return
None
# If already downloaded, does its hash match?
logger
.
info
(
'File was already downloaded
%
s'
,
download_path
)
if
hashes
:
try
:
hashes
.
check_against_path
(
download_path
)
except
HashMismatch
:
logger
.
warning
(
'Previously-downloaded file
%
s has bad hash. '
'Re-downloading.'
,
download_path
)
os
.
unlink
(
download_path
)
return
None
return
download_path
class
RequirementPreparer
(
object
):
"""Prepares a Requirement
"""
def
__init__
(
self
,
build_dir
,
# type: str
download_dir
,
# type: Optional[str]
src_dir
,
# type: str
wheel_download_dir
,
# type: Optional[str]
build_isolation
,
# type: bool
req_tracker
,
# type: RequirementTracker
downloader
,
# type: Downloader
finder
,
# type: PackageFinder
require_hashes
,
# type: bool
use_user_site
,
# type: bool
):
# type: (...) -> None
super
(
RequirementPreparer
,
self
)
.
__init__
()
self
.
src_dir
=
src_dir
self
.
build_dir
=
build_dir
self
.
req_tracker
=
req_tracker
self
.
downloader
=
downloader
self
.
finder
=
finder
# Where still-packed archives should be written to. If None, they are
# not saved, and are deleted immediately after unpacking.
self
.
download_dir
=
download_dir
# Where still-packed .whl files should be written to. If None, they are
# written to the download_dir parameter. Separate to download_dir to
# permit only keeping wheel archives for pip wheel.
self
.
wheel_download_dir
=
wheel_download_dir
# NOTE
# download_dir and wheel_download_dir overlap semantically and may
# be combined if we're willing to have non-wheel archives present in
# the wheelhouse output by 'pip wheel'.
# Is build isolation allowed?
self
.
build_isolation
=
build_isolation
# Should hash-checking be required?
self
.
require_hashes
=
require_hashes
# Should install in user site-packages?
self
.
use_user_site
=
use_user_site
@property
def
_download_should_save
(
self
):
# type: () -> bool
if
not
self
.
download_dir
:
return
False
if
os
.
path
.
exists
(
self
.
download_dir
):
return
True
logger
.
critical
(
'Could not find download directory'
)
raise
InstallationError
(
"Could not find or access download directory '{}'"
.
format
(
self
.
download_dir
))
def
prepare_linked_requirement
(
self
,
req
,
# type: InstallRequirement
):
# type: (...) -> AbstractDistribution
"""Prepare a requirement that would be obtained from req.link
"""
assert
req
.
link
link
=
req
.
link
# TODO: Breakup into smaller functions
if
link
.
scheme
==
'file'
:
path
=
link
.
file_path
logger
.
info
(
'Processing
%
s'
,
display_path
(
path
))
else
:
logger
.
info
(
'Collecting
%
s'
,
req
.
req
or
req
)
download_dir
=
self
.
download_dir
if
link
.
is_wheel
and
self
.
wheel_download_dir
:
# when doing 'pip wheel` we download wheels to a
# dedicated dir.
download_dir
=
self
.
wheel_download_dir
if
link
.
is_wheel
:
if
download_dir
:
# When downloading, we only unpack wheels to get
# metadata.
autodelete_unpacked
=
True
else
:
# When installing a wheel, we use the unpacked
# wheel.
autodelete_unpacked
=
False
else
:
# We always delete unpacked sdists after pip runs.
autodelete_unpacked
=
True
with
indent_log
():
# Since source_dir is only set for editable requirements.
assert
req
.
source_dir
is
None
if
link
.
is_existing_dir
():
# Build local directories in place.
req
.
source_dir
=
link
.
file_path
else
:
req
.
ensure_has_source_dir
(
self
.
build_dir
,
autodelete_unpacked
)
# If a checkout exists, it's unwise to keep going. version
# inconsistencies are logged later, but do not fail the
# installation.
# FIXME: this won't upgrade when there's an existing
# package unpacked in `req.source_dir`
if
os
.
path
.
exists
(
os
.
path
.
join
(
req
.
source_dir
,
'setup.py'
)):
raise
PreviousBuildDirError
(
"pip can't proceed with requirements '{}' due to a"
" pre-existing build directory ({}). This is "
"likely due to a previous installation that failed"
". pip is being responsible and not assuming it "
"can delete this. Please delete it and try again."
.
format
(
req
,
req
.
source_dir
)
)
# Now that we have the real link, we can tell what kind of
# requirements we have and raise some more informative errors
# than otherwise. (For example, we can raise VcsHashUnsupported
# for a VCS URL rather than HashMissing.)
if
self
.
require_hashes
:
# We could check these first 2 conditions inside
# unpack_url and save repetition of conditions, but then
# we would report less-useful error messages for
# unhashable requirements, complaining that there's no
# hash provided.
if
link
.
is_vcs
:
raise
VcsHashUnsupported
()
elif
link
.
is_existing_dir
():
raise
DirectoryUrlHashUnsupported
()
if
not
req
.
original_link
and
not
req
.
is_pinned
:
# Unpinned packages are asking for trouble when a new
# version is uploaded. This isn't a security check, but
# it saves users a surprising hash mismatch in the
# future.
#
# file:/// URLs aren't pinnable, so don't complain
# about them not being pinned.
raise
HashUnpinned
()
hashes
=
req
.
hashes
(
trust_internet
=
not
self
.
require_hashes
)
if
self
.
require_hashes
and
not
hashes
:
# Known-good hashes are missing for this requirement, so
# shim it with a facade object that will provoke hash
# computation and then raise a HashMissing exception
# showing the user what the hash should be.
hashes
=
MissingHashes
()
try
:
local_file
=
unpack_url
(
link
,
req
.
source_dir
,
self
.
downloader
,
download_dir
,
hashes
=
hashes
,
)
except
requests
.
HTTPError
as
exc
:
logger
.
critical
(
'Could not install requirement
%
s because of error
%
s'
,
req
,
exc
,
)
raise
InstallationError
(
'Could not install requirement {} because of HTTP '
'error {} for URL {}'
.
format
(
req
,
exc
,
link
)
)
# For use in later processing, preserve the file path on the
# requirement.
if
local_file
:
req
.
local_file_path
=
local_file
.
path
abstract_dist
=
_get_prepared_distribution
(
req
,
self
.
req_tracker
,
self
.
finder
,
self
.
build_isolation
,
)
if
download_dir
:
if
link
.
is_existing_dir
():
logger
.
info
(
'Link is a directory, ignoring download_dir'
)
elif
local_file
:
download_location
=
os
.
path
.
join
(
download_dir
,
link
.
filename
)
if
not
os
.
path
.
exists
(
download_location
):
shutil
.
copy
(
local_file
.
path
,
download_location
)
logger
.
info
(
'Saved
%
s'
,
display_path
(
download_location
)
)
if
self
.
_download_should_save
:
# Make a .zip of the source_dir we already created.
if
link
.
is_vcs
:
req
.
archive
(
self
.
download_dir
)
return
abstract_dist
def
prepare_editable_requirement
(
self
,
req
,
# type: InstallRequirement
):
# type: (...) -> AbstractDistribution
"""Prepare an editable requirement
"""
assert
req
.
editable
,
"cannot prepare a non-editable req as editable"
logger
.
info
(
'Obtaining
%
s'
,
req
)
with
indent_log
():
if
self
.
require_hashes
:
raise
InstallationError
(
'The editable requirement {} cannot be installed when '
'requiring hashes, because there is no single file to '
'hash.'
.
format
(
req
)
)
req
.
ensure_has_source_dir
(
self
.
src_dir
)
req
.
update_editable
(
not
self
.
_download_should_save
)
abstract_dist
=
_get_prepared_distribution
(
req
,
self
.
req_tracker
,
self
.
finder
,
self
.
build_isolation
,
)
if
self
.
_download_should_save
:
req
.
archive
(
self
.
download_dir
)
req
.
check_if_exists
(
self
.
use_user_site
)
return
abstract_dist
def
prepare_installed_requirement
(
self
,
req
,
# type: InstallRequirement
skip_reason
# type: str
):
# type: (...) -> AbstractDistribution
"""Prepare an already-installed requirement
"""
assert
req
.
satisfied_by
,
"req should have been satisfied but isn't"
assert
skip_reason
is
not
None
,
(
"did not get skip reason skipped but req.satisfied_by "
"is set to {}"
.
format
(
req
.
satisfied_by
)
)
logger
.
info
(
'Requirement
%
s:
%
s (
%
s)'
,
skip_reason
,
req
,
req
.
satisfied_by
.
version
)
with
indent_log
():
if
self
.
require_hashes
:
logger
.
debug
(
'Since it is already installed, we are trusting this '
'package without checking its hash. To ensure a '
'completely repeatable environment, install into an '
'empty virtualenv.'
)
abstract_dist
=
InstalledDistribution
(
req
)
return
abstract_dist
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment