Compare commits
4 Commits
test-use_d
...
mypy-clean
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1f40032998 | ||
|
|
2d1117810f | ||
|
|
c61e2fc428 | ||
|
|
4fd1ac9c4b |
@@ -6,7 +6,7 @@ import os
|
||||
import re
|
||||
import socket
|
||||
|
||||
import OpenSSL
|
||||
from OpenSSL import SSL, crypto # type: ignore # https://github.com/python/typeshed/issues/2052
|
||||
import josepy as jose
|
||||
|
||||
|
||||
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
|
||||
# https://www.openssl.org/docs/ssl/SSLv23_method.html). _serve_sni
|
||||
# should be changed to use "set_options" to disable SSLv2 and SSLv3,
|
||||
# in case it's used for things other than probing/serving!
|
||||
_DEFAULT_TLSSNI01_SSL_METHOD = OpenSSL.SSL.SSLv23_METHOD # type: ignore
|
||||
_DEFAULT_TLSSNI01_SSL_METHOD = SSL.SSLv23_METHOD # type: ignore
|
||||
|
||||
|
||||
class SSLSocket(object): # pylint: disable=too-few-public-methods
|
||||
@@ -64,9 +64,9 @@ class SSLSocket(object): # pylint: disable=too-few-public-methods
|
||||
logger.debug("Server name (%s) not recognized, dropping SSL",
|
||||
server_name)
|
||||
return
|
||||
new_context = OpenSSL.SSL.Context(self.method)
|
||||
new_context.set_options(OpenSSL.SSL.OP_NO_SSLv2)
|
||||
new_context.set_options(OpenSSL.SSL.OP_NO_SSLv3)
|
||||
new_context = SSL.Context(self.method)
|
||||
new_context.set_options(SSL.OP_NO_SSLv2)
|
||||
new_context.set_options(SSL.OP_NO_SSLv3)
|
||||
new_context.use_privatekey(key)
|
||||
new_context.use_certificate(cert)
|
||||
connection.set_context(new_context)
|
||||
@@ -89,18 +89,18 @@ class SSLSocket(object): # pylint: disable=too-few-public-methods
|
||||
def accept(self): # pylint: disable=missing-docstring
|
||||
sock, addr = self.sock.accept()
|
||||
|
||||
context = OpenSSL.SSL.Context(self.method)
|
||||
context.set_options(OpenSSL.SSL.OP_NO_SSLv2)
|
||||
context.set_options(OpenSSL.SSL.OP_NO_SSLv3)
|
||||
context = SSL.Context(self.method)
|
||||
context.set_options(SSL.OP_NO_SSLv2)
|
||||
context.set_options(SSL.OP_NO_SSLv3)
|
||||
context.set_tlsext_servername_callback(self._pick_certificate_cb)
|
||||
|
||||
ssl_sock = self.FakeConnection(OpenSSL.SSL.Connection(context, sock))
|
||||
ssl_sock = self.FakeConnection(SSL.Connection(context, sock))
|
||||
ssl_sock.set_accept_state()
|
||||
|
||||
logger.debug("Performing handshake with %s", addr)
|
||||
try:
|
||||
ssl_sock.do_handshake()
|
||||
except OpenSSL.SSL.Error as error:
|
||||
except SSL.Error as error:
|
||||
# _pick_certificate_cb might have returned without
|
||||
# creating SSL context (wrong server name)
|
||||
raise socket.error(error)
|
||||
@@ -128,7 +128,7 @@ def probe_sni(name, host, port=443, timeout=300,
|
||||
:rtype: OpenSSL.crypto.X509
|
||||
|
||||
"""
|
||||
context = OpenSSL.SSL.Context(method)
|
||||
context = SSL.Context(method)
|
||||
context.set_timeout(timeout)
|
||||
|
||||
socket_kwargs = {'source_address': source_address}
|
||||
@@ -145,13 +145,13 @@ def probe_sni(name, host, port=443, timeout=300,
|
||||
raise errors.Error(error)
|
||||
|
||||
with contextlib.closing(sock) as client:
|
||||
client_ssl = OpenSSL.SSL.Connection(context, client)
|
||||
client_ssl = SSL.Connection(context, client)
|
||||
client_ssl.set_connect_state()
|
||||
client_ssl.set_tlsext_host_name(name) # pyOpenSSL>=0.13
|
||||
try:
|
||||
client_ssl.do_handshake()
|
||||
client_ssl.shutdown()
|
||||
except OpenSSL.SSL.Error as error:
|
||||
except SSL.Error as error:
|
||||
raise errors.Error(error)
|
||||
return client_ssl.get_peer_certificate()
|
||||
|
||||
@@ -164,18 +164,18 @@ def make_csr(private_key_pem, domains, must_staple=False):
|
||||
OCSP Must Staple: https://tools.ietf.org/html/rfc7633).
|
||||
:returns: buffer PEM-encoded Certificate Signing Request.
|
||||
"""
|
||||
private_key = OpenSSL.crypto.load_privatekey(
|
||||
OpenSSL.crypto.FILETYPE_PEM, private_key_pem)
|
||||
csr = OpenSSL.crypto.X509Req()
|
||||
private_key = crypto.load_privatekey(
|
||||
crypto.FILETYPE_PEM, private_key_pem)
|
||||
csr = crypto.X509Req()
|
||||
extensions = [
|
||||
OpenSSL.crypto.X509Extension(
|
||||
crypto.X509Extension(
|
||||
b'subjectAltName',
|
||||
critical=False,
|
||||
value=', '.join('DNS:' + d for d in domains).encode('ascii')
|
||||
),
|
||||
]
|
||||
if must_staple:
|
||||
extensions.append(OpenSSL.crypto.X509Extension(
|
||||
extensions.append(crypto.X509Extension(
|
||||
b"1.3.6.1.5.5.7.1.24",
|
||||
critical=False,
|
||||
value=b"DER:30:03:02:01:05"))
|
||||
@@ -183,8 +183,8 @@ def make_csr(private_key_pem, domains, must_staple=False):
|
||||
csr.set_pubkey(private_key)
|
||||
csr.set_version(2)
|
||||
csr.sign(private_key, 'sha256')
|
||||
return OpenSSL.crypto.dump_certificate_request(
|
||||
OpenSSL.crypto.FILETYPE_PEM, csr)
|
||||
return crypto.dump_certificate_request(
|
||||
crypto.FILETYPE_PEM, csr)
|
||||
|
||||
def _pyopenssl_cert_or_req_all_names(loaded_cert_or_req):
|
||||
common_name = loaded_cert_or_req.get_subject().CN
|
||||
@@ -221,11 +221,11 @@ def _pyopenssl_cert_or_req_san(cert_or_req):
|
||||
parts_separator = ", "
|
||||
prefix = "DNS" + part_separator
|
||||
|
||||
if isinstance(cert_or_req, OpenSSL.crypto.X509):
|
||||
func = OpenSSL.crypto.dump_certificate
|
||||
if isinstance(cert_or_req, crypto.X509):
|
||||
func = crypto.dump_certificate
|
||||
else:
|
||||
func = OpenSSL.crypto.dump_certificate_request
|
||||
text = func(OpenSSL.crypto.FILETYPE_TEXT, cert_or_req).decode("utf-8")
|
||||
func = crypto.dump_certificate_request
|
||||
text = func(crypto.FILETYPE_TEXT, cert_or_req).decode("utf-8")
|
||||
# WARNING: this function does not support multiple SANs extensions.
|
||||
# Multiple X509v3 extensions of the same type is disallowed by RFC 5280.
|
||||
match = re.search(r"X509v3 Subject Alternative Name:(?: critical)?\s*(.*)", text)
|
||||
@@ -252,12 +252,12 @@ def gen_ss_cert(key, domains, not_before=None,
|
||||
|
||||
"""
|
||||
assert domains, "Must provide one or more hostnames for the cert."
|
||||
cert = OpenSSL.crypto.X509()
|
||||
cert = crypto.X509()
|
||||
cert.set_serial_number(int(binascii.hexlify(os.urandom(16)), 16))
|
||||
cert.set_version(2)
|
||||
|
||||
extensions = [
|
||||
OpenSSL.crypto.X509Extension(
|
||||
crypto.X509Extension(
|
||||
b"basicConstraints", True, b"CA:TRUE, pathlen:0"),
|
||||
]
|
||||
|
||||
@@ -266,7 +266,7 @@ def gen_ss_cert(key, domains, not_before=None,
|
||||
cert.set_issuer(cert.get_subject())
|
||||
|
||||
if force_san or len(domains) > 1:
|
||||
extensions.append(OpenSSL.crypto.X509Extension(
|
||||
extensions.append(crypto.X509Extension(
|
||||
b"subjectAltName",
|
||||
critical=False,
|
||||
value=b", ".join(b"DNS:" + d.encode() for d in domains)
|
||||
@@ -281,7 +281,7 @@ def gen_ss_cert(key, domains, not_before=None,
|
||||
cert.sign(key, "sha256")
|
||||
return cert
|
||||
|
||||
def dump_pyopenssl_chain(chain, filetype=OpenSSL.crypto.FILETYPE_PEM):
|
||||
def dump_pyopenssl_chain(chain, filetype=crypto.FILETYPE_PEM):
|
||||
"""Dump certificate chain into a bundle.
|
||||
|
||||
:param list chain: List of `OpenSSL.crypto.X509` (or wrapped in
|
||||
@@ -298,7 +298,7 @@ def dump_pyopenssl_chain(chain, filetype=OpenSSL.crypto.FILETYPE_PEM):
|
||||
if isinstance(cert, jose.ComparableX509):
|
||||
# pylint: disable=protected-access
|
||||
cert = cert.wrapped
|
||||
return OpenSSL.crypto.dump_certificate(filetype, cert)
|
||||
return crypto.dump_certificate(filetype, cert)
|
||||
|
||||
# assumes that OpenSSL.crypto.dump_certificate includes ending
|
||||
# newline character
|
||||
|
||||
@@ -10,7 +10,7 @@ import unittest
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
import josepy as jose
|
||||
import OpenSSL
|
||||
from OpenSSL import crypto
|
||||
|
||||
|
||||
def vector_path(*names):
|
||||
@@ -39,8 +39,8 @@ def _guess_loader(filename, loader_pem, loader_der):
|
||||
def load_cert(*names):
|
||||
"""Load certificate."""
|
||||
loader = _guess_loader(
|
||||
names[-1], OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.FILETYPE_ASN1)
|
||||
return OpenSSL.crypto.load_certificate(loader, load_vector(*names))
|
||||
names[-1], crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1)
|
||||
return crypto.load_certificate(loader, load_vector(*names))
|
||||
|
||||
|
||||
def load_comparable_cert(*names):
|
||||
@@ -51,8 +51,8 @@ def load_comparable_cert(*names):
|
||||
def load_csr(*names):
|
||||
"""Load certificate request."""
|
||||
loader = _guess_loader(
|
||||
names[-1], OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.FILETYPE_ASN1)
|
||||
return OpenSSL.crypto.load_certificate_request(loader, load_vector(*names))
|
||||
names[-1], crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1)
|
||||
return crypto.load_certificate_request(loader, load_vector(*names))
|
||||
|
||||
|
||||
def load_comparable_csr(*names):
|
||||
@@ -71,8 +71,8 @@ def load_rsa_private_key(*names):
|
||||
def load_pyopenssl_private_key(*names):
|
||||
"""Load pyOpenSSL private key."""
|
||||
loader = _guess_loader(
|
||||
names[-1], OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.FILETYPE_ASN1)
|
||||
return OpenSSL.crypto.load_privatekey(loader, load_vector(*names))
|
||||
names[-1], crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1)
|
||||
return crypto.load_privatekey(loader, load_vector(*names))
|
||||
|
||||
|
||||
def skip_unless(condition, reason): # pragma: no cover
|
||||
|
||||
@@ -50,7 +50,8 @@ class AuthenticatorTest(test_util.TempDirTestCase, dns_test_common.BaseAuthentic
|
||||
|
||||
|
||||
class DigitalOceanClientTest(unittest.TestCase):
|
||||
id = 1
|
||||
|
||||
id_num = 1
|
||||
record_prefix = "_acme-challenge"
|
||||
record_name = record_prefix + "." + DOMAIN
|
||||
record_content = "bar"
|
||||
@@ -70,7 +71,7 @@ class DigitalOceanClientTest(unittest.TestCase):
|
||||
|
||||
domain_mock = mock.MagicMock()
|
||||
domain_mock.name = DOMAIN
|
||||
domain_mock.create_new_domain_record.return_value = {'domain_record': {'id': self.id}}
|
||||
domain_mock.create_new_domain_record.return_value = {'domain_record': {'id': self.id_num}}
|
||||
|
||||
self.manager.get_all_domains.return_value = [wrong_domain_mock, domain_mock]
|
||||
|
||||
|
||||
@@ -13,7 +13,7 @@ import pyrfc3339
|
||||
import six
|
||||
import zope.component
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography import x509
|
||||
from cryptography import x509 # type: ignore
|
||||
|
||||
from acme import crypto_util as acme_crypto_util
|
||||
|
||||
|
||||
@@ -216,11 +216,12 @@ class CertificatesTest(BaseCertManagerTest):
|
||||
cert.is_test_cert = False
|
||||
parsed_certs = [cert]
|
||||
|
||||
mock_config = mock.MagicMock(certname=None, lineagename=None)
|
||||
# pylint: disable=protected-access
|
||||
|
||||
# pylint: disable=protected-access
|
||||
get_report = lambda: cert_manager._report_human_readable(mock_config, parsed_certs)
|
||||
|
||||
mock_config = mock.MagicMock(certname=None, lineagename=None)
|
||||
# pylint: disable=protected-access
|
||||
out = get_report()
|
||||
self.assertTrue("INVALID: EXPIRED" in out)
|
||||
|
||||
|
||||
@@ -54,7 +54,7 @@ _INITIAL_PID = os.getpid()
|
||||
# the dict are attempted to be cleaned up at program exit. If the
|
||||
# program exits before the lock is cleaned up, it is automatically
|
||||
# released, but the file isn't deleted.
|
||||
_LOCKS = OrderedDict()
|
||||
_LOCKS = OrderedDict() # type: OrderedDict[str, lock.LockFile]
|
||||
|
||||
|
||||
def run_script(params, log=logger.error):
|
||||
|
||||
6
mypy.ini
Normal file
6
mypy.ini
Normal file
@@ -0,0 +1,6 @@
|
||||
[mypy]
|
||||
python_version = 2.7
|
||||
ignore_missing_imports = True
|
||||
|
||||
[mypy-acme]
|
||||
check_untyped_defs = True
|
||||
Reference in New Issue
Block a user