Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
N
news
Project
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Sartika Aritonang
news
Commits
4ce6afeb
Commit
4ce6afeb
authored
May 29, 2020
by
Sartika Aritonang
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Upload New File
parent
ac64bc4b
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
498 additions
and
0 deletions
+498
-0
pyopenssl.py
...ib/site-packages/pip/_vendor/urllib3/contrib/pyopenssl.py
+498
-0
No files found.
stbi/Lib/site-packages/pip/_vendor/urllib3/contrib/pyopenssl.py
0 → 100644
View file @
4ce6afeb
"""
SSL with SNI_-support for Python 2. Follow these instructions if you would
like to verify SSL certificates in Python 2. Note, the default libraries do
*not* do certificate checking; you need to do additional work to validate
certificates yourself.
This needs the following packages installed:
* pyOpenSSL (tested with 16.0.0)
* cryptography (minimum 1.3.4, from pyopenssl)
* idna (minimum 2.0, from cryptography)
However, pyopenssl depends on cryptography, which depends on idna, so while we
use all three directly here we end up having relatively few packages required.
You can install them with the following command:
pip install pyopenssl cryptography idna
To activate certificate checking, call
:func:`~urllib3.contrib.pyopenssl.inject_into_urllib3` from your Python code
before you begin making HTTP requests. This can be done in a ``sitecustomize``
module, or at any other time before your application begins using ``urllib3``,
like this::
try:
import urllib3.contrib.pyopenssl
urllib3.contrib.pyopenssl.inject_into_urllib3()
except ImportError:
pass
Now you can use :mod:`urllib3` as you normally would, and it will support SNI
when the required modules are installed.
Activating this module also has the positive side effect of disabling SSL/TLS
compression in Python 2 (see `CRIME attack`_).
If you want to configure the default list of supported cipher suites, you can
set the ``urllib3.contrib.pyopenssl.DEFAULT_SSL_CIPHER_LIST`` variable.
.. _sni: https://en.wikipedia.org/wiki/Server_Name_Indication
.. _crime attack: https://en.wikipedia.org/wiki/CRIME_(security_exploit)
"""
from
__future__
import
absolute_import
import
OpenSSL.SSL
from
cryptography
import
x509
from
cryptography.hazmat.backends.openssl
import
backend
as
openssl_backend
from
cryptography.hazmat.backends.openssl.x509
import
_Certificate
try
:
from
cryptography.x509
import
UnsupportedExtension
except
ImportError
:
# UnsupportedExtension is gone in cryptography >= 2.1.0
class
UnsupportedExtension
(
Exception
):
pass
from
socket
import
timeout
,
error
as
SocketError
from
io
import
BytesIO
try
:
# Platform-specific: Python 2
from
socket
import
_fileobject
except
ImportError
:
# Platform-specific: Python 3
_fileobject
=
None
from
..packages.backports.makefile
import
backport_makefile
import
logging
import
ssl
from
..packages
import
six
import
sys
from
..
import
util
__all__
=
[
"inject_into_urllib3"
,
"extract_from_urllib3"
]
# SNI always works.
HAS_SNI
=
True
# Map from urllib3 to PyOpenSSL compatible parameter-values.
_openssl_versions
=
{
util
.
PROTOCOL_TLS
:
OpenSSL
.
SSL
.
SSLv23_METHOD
,
ssl
.
PROTOCOL_TLSv1
:
OpenSSL
.
SSL
.
TLSv1_METHOD
,
}
if
hasattr
(
ssl
,
"PROTOCOL_SSLv3"
)
and
hasattr
(
OpenSSL
.
SSL
,
"SSLv3_METHOD"
):
_openssl_versions
[
ssl
.
PROTOCOL_SSLv3
]
=
OpenSSL
.
SSL
.
SSLv3_METHOD
if
hasattr
(
ssl
,
"PROTOCOL_TLSv1_1"
)
and
hasattr
(
OpenSSL
.
SSL
,
"TLSv1_1_METHOD"
):
_openssl_versions
[
ssl
.
PROTOCOL_TLSv1_1
]
=
OpenSSL
.
SSL
.
TLSv1_1_METHOD
if
hasattr
(
ssl
,
"PROTOCOL_TLSv1_2"
)
and
hasattr
(
OpenSSL
.
SSL
,
"TLSv1_2_METHOD"
):
_openssl_versions
[
ssl
.
PROTOCOL_TLSv1_2
]
=
OpenSSL
.
SSL
.
TLSv1_2_METHOD
_stdlib_to_openssl_verify
=
{
ssl
.
CERT_NONE
:
OpenSSL
.
SSL
.
VERIFY_NONE
,
ssl
.
CERT_OPTIONAL
:
OpenSSL
.
SSL
.
VERIFY_PEER
,
ssl
.
CERT_REQUIRED
:
OpenSSL
.
SSL
.
VERIFY_PEER
+
OpenSSL
.
SSL
.
VERIFY_FAIL_IF_NO_PEER_CERT
,
}
_openssl_to_stdlib_verify
=
dict
((
v
,
k
)
for
k
,
v
in
_stdlib_to_openssl_verify
.
items
())
# OpenSSL will only write 16K at a time
SSL_WRITE_BLOCKSIZE
=
16384
orig_util_HAS_SNI
=
util
.
HAS_SNI
orig_util_SSLContext
=
util
.
ssl_
.
SSLContext
log
=
logging
.
getLogger
(
__name__
)
def
inject_into_urllib3
():
"Monkey-patch urllib3 with PyOpenSSL-backed SSL-support."
_validate_dependencies_met
()
util
.
SSLContext
=
PyOpenSSLContext
util
.
ssl_
.
SSLContext
=
PyOpenSSLContext
util
.
HAS_SNI
=
HAS_SNI
util
.
ssl_
.
HAS_SNI
=
HAS_SNI
util
.
IS_PYOPENSSL
=
True
util
.
ssl_
.
IS_PYOPENSSL
=
True
def
extract_from_urllib3
():
"Undo monkey-patching by :func:`inject_into_urllib3`."
util
.
SSLContext
=
orig_util_SSLContext
util
.
ssl_
.
SSLContext
=
orig_util_SSLContext
util
.
HAS_SNI
=
orig_util_HAS_SNI
util
.
ssl_
.
HAS_SNI
=
orig_util_HAS_SNI
util
.
IS_PYOPENSSL
=
False
util
.
ssl_
.
IS_PYOPENSSL
=
False
def
_validate_dependencies_met
():
"""
Verifies that PyOpenSSL's package-level dependencies have been met.
Throws `ImportError` if they are not met.
"""
# Method added in `cryptography==1.1`; not available in older versions
from
cryptography.x509.extensions
import
Extensions
if
getattr
(
Extensions
,
"get_extension_for_class"
,
None
)
is
None
:
raise
ImportError
(
"'cryptography' module missing required functionality. "
"Try upgrading to v1.3.4 or newer."
)
# pyOpenSSL 0.14 and above use cryptography for OpenSSL bindings. The _x509
# attribute is only present on those versions.
from
OpenSSL.crypto
import
X509
x509
=
X509
()
if
getattr
(
x509
,
"_x509"
,
None
)
is
None
:
raise
ImportError
(
"'pyOpenSSL' module missing required functionality. "
"Try upgrading to v0.14 or newer."
)
def
_dnsname_to_stdlib
(
name
):
"""
Converts a dNSName SubjectAlternativeName field to the form used by the
standard library on the given Python version.
Cryptography produces a dNSName as a unicode string that was idna-decoded
from ASCII bytes. We need to idna-encode that string to get it back, and
then on Python 3 we also need to convert to unicode via UTF-8 (the stdlib
uses PyUnicode_FromStringAndSize on it, which decodes via UTF-8).
If the name cannot be idna-encoded then we return None signalling that
the name given should be skipped.
"""
def
idna_encode
(
name
):
"""
Borrowed wholesale from the Python Cryptography Project. It turns out
that we can't just safely call `idna.encode`: it can explode for
wildcard names. This avoids that problem.
"""
from
pip._vendor
import
idna
try
:
for
prefix
in
[
u"*."
,
u"."
]:
if
name
.
startswith
(
prefix
):
name
=
name
[
len
(
prefix
)
:]
return
prefix
.
encode
(
"ascii"
)
+
idna
.
encode
(
name
)
return
idna
.
encode
(
name
)
except
idna
.
core
.
IDNAError
:
return
None
# Don't send IPv6 addresses through the IDNA encoder.
if
":"
in
name
:
return
name
name
=
idna_encode
(
name
)
if
name
is
None
:
return
None
elif
sys
.
version_info
>=
(
3
,
0
):
name
=
name
.
decode
(
"utf-8"
)
return
name
def
get_subj_alt_name
(
peer_cert
):
"""
Given an PyOpenSSL certificate, provides all the subject alternative names.
"""
# Pass the cert to cryptography, which has much better APIs for this.
if
hasattr
(
peer_cert
,
"to_cryptography"
):
cert
=
peer_cert
.
to_cryptography
()
else
:
# This is technically using private APIs, but should work across all
# relevant versions before PyOpenSSL got a proper API for this.
cert
=
_Certificate
(
openssl_backend
,
peer_cert
.
_x509
)
# We want to find the SAN extension. Ask Cryptography to locate it (it's
# faster than looping in Python)
try
:
ext
=
cert
.
extensions
.
get_extension_for_class
(
x509
.
SubjectAlternativeName
)
.
value
except
x509
.
ExtensionNotFound
:
# No such extension, return the empty list.
return
[]
except
(
x509
.
DuplicateExtension
,
UnsupportedExtension
,
x509
.
UnsupportedGeneralNameType
,
UnicodeError
,
)
as
e
:
# A problem has been found with the quality of the certificate. Assume
# no SAN field is present.
log
.
warning
(
"A problem was encountered with the certificate that prevented "
"urllib3 from finding the SubjectAlternativeName field. This can "
"affect certificate validation. The error was
%
s"
,
e
,
)
return
[]
# We want to return dNSName and iPAddress fields. We need to cast the IPs
# back to strings because the match_hostname function wants them as
# strings.
# Sadly the DNS names need to be idna encoded and then, on Python 3, UTF-8
# decoded. This is pretty frustrating, but that's what the standard library
# does with certificates, and so we need to attempt to do the same.
# We also want to skip over names which cannot be idna encoded.
names
=
[
(
"DNS"
,
name
)
for
name
in
map
(
_dnsname_to_stdlib
,
ext
.
get_values_for_type
(
x509
.
DNSName
))
if
name
is
not
None
]
names
.
extend
(
(
"IP Address"
,
str
(
name
))
for
name
in
ext
.
get_values_for_type
(
x509
.
IPAddress
)
)
return
names
class
WrappedSocket
(
object
):
"""API-compatibility wrapper for Python OpenSSL's Connection-class.
Note: _makefile_refs, _drop() and _reuse() are needed for the garbage
collector of pypy.
"""
def
__init__
(
self
,
connection
,
socket
,
suppress_ragged_eofs
=
True
):
self
.
connection
=
connection
self
.
socket
=
socket
self
.
suppress_ragged_eofs
=
suppress_ragged_eofs
self
.
_makefile_refs
=
0
self
.
_closed
=
False
def
fileno
(
self
):
return
self
.
socket
.
fileno
()
# Copy-pasted from Python 3.5 source code
def
_decref_socketios
(
self
):
if
self
.
_makefile_refs
>
0
:
self
.
_makefile_refs
-=
1
if
self
.
_closed
:
self
.
close
()
def
recv
(
self
,
*
args
,
**
kwargs
):
try
:
data
=
self
.
connection
.
recv
(
*
args
,
**
kwargs
)
except
OpenSSL
.
SSL
.
SysCallError
as
e
:
if
self
.
suppress_ragged_eofs
and
e
.
args
==
(
-
1
,
"Unexpected EOF"
):
return
b
""
else
:
raise
SocketError
(
str
(
e
))
except
OpenSSL
.
SSL
.
ZeroReturnError
:
if
self
.
connection
.
get_shutdown
()
==
OpenSSL
.
SSL
.
RECEIVED_SHUTDOWN
:
return
b
""
else
:
raise
except
OpenSSL
.
SSL
.
WantReadError
:
if
not
util
.
wait_for_read
(
self
.
socket
,
self
.
socket
.
gettimeout
()):
raise
timeout
(
"The read operation timed out"
)
else
:
return
self
.
recv
(
*
args
,
**
kwargs
)
# TLS 1.3 post-handshake authentication
except
OpenSSL
.
SSL
.
Error
as
e
:
raise
ssl
.
SSLError
(
"read error:
%
r"
%
e
)
else
:
return
data
def
recv_into
(
self
,
*
args
,
**
kwargs
):
try
:
return
self
.
connection
.
recv_into
(
*
args
,
**
kwargs
)
except
OpenSSL
.
SSL
.
SysCallError
as
e
:
if
self
.
suppress_ragged_eofs
and
e
.
args
==
(
-
1
,
"Unexpected EOF"
):
return
0
else
:
raise
SocketError
(
str
(
e
))
except
OpenSSL
.
SSL
.
ZeroReturnError
:
if
self
.
connection
.
get_shutdown
()
==
OpenSSL
.
SSL
.
RECEIVED_SHUTDOWN
:
return
0
else
:
raise
except
OpenSSL
.
SSL
.
WantReadError
:
if
not
util
.
wait_for_read
(
self
.
socket
,
self
.
socket
.
gettimeout
()):
raise
timeout
(
"The read operation timed out"
)
else
:
return
self
.
recv_into
(
*
args
,
**
kwargs
)
# TLS 1.3 post-handshake authentication
except
OpenSSL
.
SSL
.
Error
as
e
:
raise
ssl
.
SSLError
(
"read error:
%
r"
%
e
)
def
settimeout
(
self
,
timeout
):
return
self
.
socket
.
settimeout
(
timeout
)
def
_send_until_done
(
self
,
data
):
while
True
:
try
:
return
self
.
connection
.
send
(
data
)
except
OpenSSL
.
SSL
.
WantWriteError
:
if
not
util
.
wait_for_write
(
self
.
socket
,
self
.
socket
.
gettimeout
()):
raise
timeout
()
continue
except
OpenSSL
.
SSL
.
SysCallError
as
e
:
raise
SocketError
(
str
(
e
))
def
sendall
(
self
,
data
):
total_sent
=
0
while
total_sent
<
len
(
data
):
sent
=
self
.
_send_until_done
(
data
[
total_sent
:
total_sent
+
SSL_WRITE_BLOCKSIZE
]
)
total_sent
+=
sent
def
shutdown
(
self
):
# FIXME rethrow compatible exceptions should we ever use this
self
.
connection
.
shutdown
()
def
close
(
self
):
if
self
.
_makefile_refs
<
1
:
try
:
self
.
_closed
=
True
return
self
.
connection
.
close
()
except
OpenSSL
.
SSL
.
Error
:
return
else
:
self
.
_makefile_refs
-=
1
def
getpeercert
(
self
,
binary_form
=
False
):
x509
=
self
.
connection
.
get_peer_certificate
()
if
not
x509
:
return
x509
if
binary_form
:
return
OpenSSL
.
crypto
.
dump_certificate
(
OpenSSL
.
crypto
.
FILETYPE_ASN1
,
x509
)
return
{
"subject"
:
(((
"commonName"
,
x509
.
get_subject
()
.
CN
),),),
"subjectAltName"
:
get_subj_alt_name
(
x509
),
}
def
version
(
self
):
return
self
.
connection
.
get_protocol_version_name
()
def
_reuse
(
self
):
self
.
_makefile_refs
+=
1
def
_drop
(
self
):
if
self
.
_makefile_refs
<
1
:
self
.
close
()
else
:
self
.
_makefile_refs
-=
1
if
_fileobject
:
# Platform-specific: Python 2
def
makefile
(
self
,
mode
,
bufsize
=-
1
):
self
.
_makefile_refs
+=
1
return
_fileobject
(
self
,
mode
,
bufsize
,
close
=
True
)
else
:
# Platform-specific: Python 3
makefile
=
backport_makefile
WrappedSocket
.
makefile
=
makefile
class
PyOpenSSLContext
(
object
):
"""
I am a wrapper class for the PyOpenSSL ``Context`` object. I am responsible
for translating the interface of the standard library ``SSLContext`` object
to calls into PyOpenSSL.
"""
def
__init__
(
self
,
protocol
):
self
.
protocol
=
_openssl_versions
[
protocol
]
self
.
_ctx
=
OpenSSL
.
SSL
.
Context
(
self
.
protocol
)
self
.
_options
=
0
self
.
check_hostname
=
False
@property
def
options
(
self
):
return
self
.
_options
@options.setter
def
options
(
self
,
value
):
self
.
_options
=
value
self
.
_ctx
.
set_options
(
value
)
@property
def
verify_mode
(
self
):
return
_openssl_to_stdlib_verify
[
self
.
_ctx
.
get_verify_mode
()]
@verify_mode.setter
def
verify_mode
(
self
,
value
):
self
.
_ctx
.
set_verify
(
_stdlib_to_openssl_verify
[
value
],
_verify_callback
)
def
set_default_verify_paths
(
self
):
self
.
_ctx
.
set_default_verify_paths
()
def
set_ciphers
(
self
,
ciphers
):
if
isinstance
(
ciphers
,
six
.
text_type
):
ciphers
=
ciphers
.
encode
(
"utf-8"
)
self
.
_ctx
.
set_cipher_list
(
ciphers
)
def
load_verify_locations
(
self
,
cafile
=
None
,
capath
=
None
,
cadata
=
None
):
if
cafile
is
not
None
:
cafile
=
cafile
.
encode
(
"utf-8"
)
if
capath
is
not
None
:
capath
=
capath
.
encode
(
"utf-8"
)
self
.
_ctx
.
load_verify_locations
(
cafile
,
capath
)
if
cadata
is
not
None
:
self
.
_ctx
.
load_verify_locations
(
BytesIO
(
cadata
))
def
load_cert_chain
(
self
,
certfile
,
keyfile
=
None
,
password
=
None
):
self
.
_ctx
.
use_certificate_chain_file
(
certfile
)
if
password
is
not
None
:
if
not
isinstance
(
password
,
six
.
binary_type
):
password
=
password
.
encode
(
"utf-8"
)
self
.
_ctx
.
set_passwd_cb
(
lambda
*
_
:
password
)
self
.
_ctx
.
use_privatekey_file
(
keyfile
or
certfile
)
def
wrap_socket
(
self
,
sock
,
server_side
=
False
,
do_handshake_on_connect
=
True
,
suppress_ragged_eofs
=
True
,
server_hostname
=
None
,
):
cnx
=
OpenSSL
.
SSL
.
Connection
(
self
.
_ctx
,
sock
)
if
isinstance
(
server_hostname
,
six
.
text_type
):
# Platform-specific: Python 3
server_hostname
=
server_hostname
.
encode
(
"utf-8"
)
if
server_hostname
is
not
None
:
cnx
.
set_tlsext_host_name
(
server_hostname
)
cnx
.
set_connect_state
()
while
True
:
try
:
cnx
.
do_handshake
()
except
OpenSSL
.
SSL
.
WantReadError
:
if
not
util
.
wait_for_read
(
sock
,
sock
.
gettimeout
()):
raise
timeout
(
"select timed out"
)
continue
except
OpenSSL
.
SSL
.
Error
as
e
:
raise
ssl
.
SSLError
(
"bad handshake:
%
r"
%
e
)
break
return
WrappedSocket
(
cnx
,
sock
)
def
_verify_callback
(
cnx
,
x509
,
err_no
,
err_depth
,
return_code
):
return
err_no
==
0
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment