Compare commits

...

4 Commits

Author SHA1 Message Date
Erica Portnoy
1f40032998 check_untyped_defs in mypy with clean output for acme 2018-04-13 14:27:33 -07:00
Erica Portnoy
2d1117810f Merge branch 'master' into mypy-clean 2018-04-12 18:31:46 -07:00
Erica Portnoy
c61e2fc428 Get mypy running with clean output 2018-04-12 17:52:40 -07:00
Erica Portnoy
4fd1ac9c4b extract mypy flags into mypy.ini file 2018-04-12 16:15:59 -07:00
8 changed files with 51 additions and 43 deletions

View File

@@ -6,7 +6,7 @@ import os
import re
import socket
import OpenSSL
from OpenSSL import SSL, crypto # type: ignore # https://github.com/python/typeshed/issues/2052
import josepy as jose
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
# https://www.openssl.org/docs/ssl/SSLv23_method.html). _serve_sni
# should be changed to use "set_options" to disable SSLv2 and SSLv3,
# in case it's used for things other than probing/serving!
_DEFAULT_TLSSNI01_SSL_METHOD = OpenSSL.SSL.SSLv23_METHOD # type: ignore
_DEFAULT_TLSSNI01_SSL_METHOD = SSL.SSLv23_METHOD # type: ignore
class SSLSocket(object): # pylint: disable=too-few-public-methods
@@ -64,9 +64,9 @@ class SSLSocket(object): # pylint: disable=too-few-public-methods
logger.debug("Server name (%s) not recognized, dropping SSL",
server_name)
return
new_context = OpenSSL.SSL.Context(self.method)
new_context.set_options(OpenSSL.SSL.OP_NO_SSLv2)
new_context.set_options(OpenSSL.SSL.OP_NO_SSLv3)
new_context = SSL.Context(self.method)
new_context.set_options(SSL.OP_NO_SSLv2)
new_context.set_options(SSL.OP_NO_SSLv3)
new_context.use_privatekey(key)
new_context.use_certificate(cert)
connection.set_context(new_context)
@@ -89,18 +89,18 @@ class SSLSocket(object): # pylint: disable=too-few-public-methods
def accept(self): # pylint: disable=missing-docstring
sock, addr = self.sock.accept()
context = OpenSSL.SSL.Context(self.method)
context.set_options(OpenSSL.SSL.OP_NO_SSLv2)
context.set_options(OpenSSL.SSL.OP_NO_SSLv3)
context = SSL.Context(self.method)
context.set_options(SSL.OP_NO_SSLv2)
context.set_options(SSL.OP_NO_SSLv3)
context.set_tlsext_servername_callback(self._pick_certificate_cb)
ssl_sock = self.FakeConnection(OpenSSL.SSL.Connection(context, sock))
ssl_sock = self.FakeConnection(SSL.Connection(context, sock))
ssl_sock.set_accept_state()
logger.debug("Performing handshake with %s", addr)
try:
ssl_sock.do_handshake()
except OpenSSL.SSL.Error as error:
except SSL.Error as error:
# _pick_certificate_cb might have returned without
# creating SSL context (wrong server name)
raise socket.error(error)
@@ -128,7 +128,7 @@ def probe_sni(name, host, port=443, timeout=300,
:rtype: OpenSSL.crypto.X509
"""
context = OpenSSL.SSL.Context(method)
context = SSL.Context(method)
context.set_timeout(timeout)
socket_kwargs = {'source_address': source_address}
@@ -145,13 +145,13 @@ def probe_sni(name, host, port=443, timeout=300,
raise errors.Error(error)
with contextlib.closing(sock) as client:
client_ssl = OpenSSL.SSL.Connection(context, client)
client_ssl = SSL.Connection(context, client)
client_ssl.set_connect_state()
client_ssl.set_tlsext_host_name(name) # pyOpenSSL>=0.13
try:
client_ssl.do_handshake()
client_ssl.shutdown()
except OpenSSL.SSL.Error as error:
except SSL.Error as error:
raise errors.Error(error)
return client_ssl.get_peer_certificate()
@@ -164,18 +164,18 @@ def make_csr(private_key_pem, domains, must_staple=False):
OCSP Must Staple: https://tools.ietf.org/html/rfc7633).
:returns: buffer PEM-encoded Certificate Signing Request.
"""
private_key = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, private_key_pem)
csr = OpenSSL.crypto.X509Req()
private_key = crypto.load_privatekey(
crypto.FILETYPE_PEM, private_key_pem)
csr = crypto.X509Req()
extensions = [
OpenSSL.crypto.X509Extension(
crypto.X509Extension(
b'subjectAltName',
critical=False,
value=', '.join('DNS:' + d for d in domains).encode('ascii')
),
]
if must_staple:
extensions.append(OpenSSL.crypto.X509Extension(
extensions.append(crypto.X509Extension(
b"1.3.6.1.5.5.7.1.24",
critical=False,
value=b"DER:30:03:02:01:05"))
@@ -183,8 +183,8 @@ def make_csr(private_key_pem, domains, must_staple=False):
csr.set_pubkey(private_key)
csr.set_version(2)
csr.sign(private_key, 'sha256')
return OpenSSL.crypto.dump_certificate_request(
OpenSSL.crypto.FILETYPE_PEM, csr)
return crypto.dump_certificate_request(
crypto.FILETYPE_PEM, csr)
def _pyopenssl_cert_or_req_all_names(loaded_cert_or_req):
common_name = loaded_cert_or_req.get_subject().CN
@@ -221,11 +221,11 @@ def _pyopenssl_cert_or_req_san(cert_or_req):
parts_separator = ", "
prefix = "DNS" + part_separator
if isinstance(cert_or_req, OpenSSL.crypto.X509):
func = OpenSSL.crypto.dump_certificate
if isinstance(cert_or_req, crypto.X509):
func = crypto.dump_certificate
else:
func = OpenSSL.crypto.dump_certificate_request
text = func(OpenSSL.crypto.FILETYPE_TEXT, cert_or_req).decode("utf-8")
func = crypto.dump_certificate_request
text = func(crypto.FILETYPE_TEXT, cert_or_req).decode("utf-8")
# WARNING: this function does not support multiple SANs extensions.
# Multiple X509v3 extensions of the same type is disallowed by RFC 5280.
match = re.search(r"X509v3 Subject Alternative Name:(?: critical)?\s*(.*)", text)
@@ -252,12 +252,12 @@ def gen_ss_cert(key, domains, not_before=None,
"""
assert domains, "Must provide one or more hostnames for the cert."
cert = OpenSSL.crypto.X509()
cert = crypto.X509()
cert.set_serial_number(int(binascii.hexlify(os.urandom(16)), 16))
cert.set_version(2)
extensions = [
OpenSSL.crypto.X509Extension(
crypto.X509Extension(
b"basicConstraints", True, b"CA:TRUE, pathlen:0"),
]
@@ -266,7 +266,7 @@ def gen_ss_cert(key, domains, not_before=None,
cert.set_issuer(cert.get_subject())
if force_san or len(domains) > 1:
extensions.append(OpenSSL.crypto.X509Extension(
extensions.append(crypto.X509Extension(
b"subjectAltName",
critical=False,
value=b", ".join(b"DNS:" + d.encode() for d in domains)
@@ -281,7 +281,7 @@ def gen_ss_cert(key, domains, not_before=None,
cert.sign(key, "sha256")
return cert
def dump_pyopenssl_chain(chain, filetype=OpenSSL.crypto.FILETYPE_PEM):
def dump_pyopenssl_chain(chain, filetype=crypto.FILETYPE_PEM):
"""Dump certificate chain into a bundle.
:param list chain: List of `OpenSSL.crypto.X509` (or wrapped in
@@ -298,7 +298,7 @@ def dump_pyopenssl_chain(chain, filetype=OpenSSL.crypto.FILETYPE_PEM):
if isinstance(cert, jose.ComparableX509):
# pylint: disable=protected-access
cert = cert.wrapped
return OpenSSL.crypto.dump_certificate(filetype, cert)
return crypto.dump_certificate(filetype, cert)
# assumes that OpenSSL.crypto.dump_certificate includes ending
# newline character

View File

@@ -10,7 +10,7 @@ import unittest
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import josepy as jose
import OpenSSL
from OpenSSL import crypto
def vector_path(*names):
@@ -39,8 +39,8 @@ def _guess_loader(filename, loader_pem, loader_der):
def load_cert(*names):
"""Load certificate."""
loader = _guess_loader(
names[-1], OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.FILETYPE_ASN1)
return OpenSSL.crypto.load_certificate(loader, load_vector(*names))
names[-1], crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1)
return crypto.load_certificate(loader, load_vector(*names))
def load_comparable_cert(*names):
@@ -51,8 +51,8 @@ def load_comparable_cert(*names):
def load_csr(*names):
"""Load certificate request."""
loader = _guess_loader(
names[-1], OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.FILETYPE_ASN1)
return OpenSSL.crypto.load_certificate_request(loader, load_vector(*names))
names[-1], crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1)
return crypto.load_certificate_request(loader, load_vector(*names))
def load_comparable_csr(*names):
@@ -71,8 +71,8 @@ def load_rsa_private_key(*names):
def load_pyopenssl_private_key(*names):
"""Load pyOpenSSL private key."""
loader = _guess_loader(
names[-1], OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.FILETYPE_ASN1)
return OpenSSL.crypto.load_privatekey(loader, load_vector(*names))
names[-1], crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1)
return crypto.load_privatekey(loader, load_vector(*names))
def skip_unless(condition, reason): # pragma: no cover

View File

@@ -50,7 +50,8 @@ class AuthenticatorTest(test_util.TempDirTestCase, dns_test_common.BaseAuthentic
class DigitalOceanClientTest(unittest.TestCase):
id = 1
id_num = 1
record_prefix = "_acme-challenge"
record_name = record_prefix + "." + DOMAIN
record_content = "bar"
@@ -70,7 +71,7 @@ class DigitalOceanClientTest(unittest.TestCase):
domain_mock = mock.MagicMock()
domain_mock.name = DOMAIN
domain_mock.create_new_domain_record.return_value = {'domain_record': {'id': self.id}}
domain_mock.create_new_domain_record.return_value = {'domain_record': {'id': self.id_num}}
self.manager.get_all_domains.return_value = [wrong_domain_mock, domain_mock]

View File

@@ -13,7 +13,7 @@ import pyrfc3339
import six
import zope.component
from cryptography.hazmat.backends import default_backend
from cryptography import x509
from cryptography import x509 # type: ignore
from acme import crypto_util as acme_crypto_util

View File

@@ -216,11 +216,12 @@ class CertificatesTest(BaseCertManagerTest):
cert.is_test_cert = False
parsed_certs = [cert]
mock_config = mock.MagicMock(certname=None, lineagename=None)
# pylint: disable=protected-access
# pylint: disable=protected-access
get_report = lambda: cert_manager._report_human_readable(mock_config, parsed_certs)
mock_config = mock.MagicMock(certname=None, lineagename=None)
# pylint: disable=protected-access
out = get_report()
self.assertTrue("INVALID: EXPIRED" in out)

View File

@@ -54,7 +54,7 @@ _INITIAL_PID = os.getpid()
# the dict are attempted to be cleaned up at program exit. If the
# program exits before the lock is cleaned up, it is automatically
# released, but the file isn't deleted.
_LOCKS = OrderedDict()
_LOCKS = OrderedDict() # type: OrderedDict[str, lock.LockFile]
def run_script(params, log=logger.error):

6
mypy.ini Normal file
View File

@@ -0,0 +1,6 @@
[mypy]
python_version = 2.7
ignore_missing_imports = True
[mypy-acme]
check_untyped_defs = True

View File

@@ -140,7 +140,7 @@ basepython = python3
commands =
{[base]pip_install} .[dev3]
{[base]install_packages}
mypy --py2 --ignore-missing-imports {[base]source_paths}
mypy {[base]source_paths}
[testenv:apacheconftest]
#basepython = python2.7