Compare commits

...

12 Commits

Author SHA1 Message Date
Brad Warren
a9cf4d35da Merge branch 'change-cwd' into test-mypy-certbot 2018-05-17 15:19:12 -07:00
Brad Warren
6dfee8b7f6 don't use ~- 2018-05-17 13:47:48 -07:00
Brad Warren
3d15fc8426 Merge branch 'change-cwd' into test-mypy-certbot 2018-05-17 13:33:44 -07:00
Brad Warren
6bf88e6752 cd before running tests
When importing a module, Python first searches the current directory. See
https://docs.python.org/3/tutorial/modules.html#the-module-search-path. This
means that running something like `import certbot` from the root of the Certbot
repo will use the local Certbot files regardless of the version installed on
the system or virtual environment.

Normally this behavior is fine because the local files are what we want to
test, however, during our "oldest" tests, we test against older versions of our
packages to make sure we're keeping compatibility. To make sure our tests use
the correct versions, this commit has our tests cd to an empty temporary
directory before running tests.

We also had to change the package names given to pytest to be the names used in
Python to import the package rather than the name of the files locally to
accommodate this.
2018-05-17 12:35:58 -07:00
Dmitry Figol
6fb026ae6a Reenabling pylint and adding nginx mypy back 2018-05-16 18:20:20 -04:00
Dmitry Figol
543d0e9d0c Merge branch 'certbot-untyped-defs' 2018-05-16 17:35:42 -04:00
Dmitry Figol
255e51205d Revert DNS plugins changes 2018-05-16 17:34:23 -04:00
Dmitry Figol
beed6247ef Implement Brad's suggestions 2018-05-16 17:34:00 -04:00
Dmitry Figol
35b4feaaa9 Merge branch 'master' of https://github.com/certbot/certbot into certbot-untyped-defs 2018-05-16 14:06:51 -04:00
Dmitry Figol
5ac47e095d Merge branch 'master' of https://github.com/certbot/certbot 2018-05-16 14:05:29 -04:00
Dmitry Figol
f840f92c46 Bump acme version in DNS plugins 2018-05-16 05:16:49 -04:00
Dmitry Figol
91a50506c5 Prepare certbot module for mypy check untyped defs
* Fix #5952

* Bump mypy to version 0.600 and fix associated bugs

* Fix pylint bugs after introducing mypy
2018-05-16 04:46:20 -04:00
37 changed files with 322 additions and 214 deletions

View File

@@ -147,9 +147,9 @@ class KeyAuthorizationChallenge(_TokenChallenge):
:param response_cls: Subclass of `KeyAuthorizationChallengeResponse`
that will be used to generate `response`.
:param str typ: type of the challenge
"""
typ = NotImplemented
response_cls = NotImplemented
thumbprint_hash_function = (
KeyAuthorizationChallengeResponse.thumbprint_hash_function)

View File

@@ -12,7 +12,8 @@ import josepy as jose
from acme import errors
# pylint: disable=unused-import, no-name-in-module
from acme.magic_typing import Callable, Text, Union
from acme.magic_typing import Callable, Union, Tuple, Optional
# pylint: enable=unused-import, no-name-in-module
logger = logging.getLogger(__name__)
@@ -135,14 +136,23 @@ def probe_sni(name, host, port=443, timeout=300,
socket_kwargs = {'source_address': source_address}
host_protocol_agnostic = None if host == '::' or host == '0' else host
host_protocol_agnostic = host
if host == '::' or host == '0':
# https://github.com/python/typeshed/pull/2136
# while PR is not merged, we need to ignore
host_protocol_agnostic = None
try:
# pylint: disable=star-args
logger.debug("Attempting to connect to %s:%d%s.", host_protocol_agnostic, port,
" from {0}:{1}".format(source_address[0], source_address[1]) if \
socket_kwargs else "")
sock = socket.create_connection((host_protocol_agnostic, port), **socket_kwargs)
logger.debug(
"Attempting to connect to %s:%d%s.", host_protocol_agnostic, port,
" from {0}:{1}".format(
source_address[0],
source_address[1]
) if socket_kwargs else ""
)
socket_tuple = (host_protocol_agnostic, port) # type: Tuple[Optional[str], int]
sock = socket.create_connection(socket_tuple, **socket_kwargs) # type: ignore
except socket.error as error:
raise errors.Error(error)

View File

@@ -8,6 +8,9 @@ class TypingClass(object):
try:
# mypy doesn't respect modifying sys.modules
from typing import * # pylint: disable=wildcard-import, unused-wildcard-import
from typing import * # pylint: disable=wildcard-import, unused-wildcard-import
# pylint: disable=unused-import
from typing import Collection, IO # type: ignore
# pylint: enable=unused-import
except ImportError:
sys.modules[__name__] = TypingClass()

View File

@@ -8,7 +8,9 @@ import zope.component
from acme import challenges
from acme import messages
# pylint: disable=unused-import, no-name-in-module
from acme.magic_typing import DefaultDict, Dict, List, Set, Collection
# pylint: enable=unused-import, no-name-in-module
from certbot import achallenges
from certbot import errors
from certbot import error_handler
@@ -117,7 +119,7 @@ class AuthHandler(object):
def _solve_challenges(self, aauthzrs):
"""Get Responses for challenges from authenticators."""
resp = []
resp = [] # type: Collection[acme.challenges.ChallengeResponse]
all_achalls = self._get_all_achalls(aauthzrs)
try:
if all_achalls:
@@ -133,10 +135,9 @@ class AuthHandler(object):
def _get_all_achalls(self, aauthzrs):
"""Return all active challenges."""
all_achalls = []
all_achalls = [] # type: Collection[challenges.ChallengeResponse]
for aauthzr in aauthzrs:
all_achalls.extend(aauthzr.achalls)
return all_achalls
def _respond(self, aauthzrs, resp, best_effort):
@@ -146,7 +147,8 @@ class AuthHandler(object):
"""
# TODO: chall_update is a dirty hack to get around acme-spec #105
chall_update = dict()
chall_update = dict() \
# type: Dict[int, List[achallenges.KeyAuthorizationAnnotatedChallenge]]
self._send_responses(aauthzrs, resp, chall_update)
# Check for updated status...
@@ -198,7 +200,7 @@ class AuthHandler(object):
while indices_to_check and rounds < max_rounds:
# TODO: Use retry-after...
time.sleep(min_sleep)
all_failed_achalls = set()
all_failed_achalls = set() # type: Set[achallenges.KeyAuthorizationAnnotatedChallenge]
for index in indices_to_check:
comp_achalls, failed_achalls = self._handle_check(
aauthzrs, index, chall_update[index])
@@ -424,7 +426,7 @@ def _find_smart_path(challbs, preferences, combinations):
# max_cost is now equal to sum(indices) + 1
best_combo = []
best_combo = None
# Set above completing all of the available challenges
best_combo_cost = max_cost
@@ -479,7 +481,7 @@ def _report_no_chall_path(challbs):
msg += (
" You may need to use an authenticator "
"plugin that can do challenges over DNS.")
logger.fatal(msg)
logger.critical(msg)
raise errors.AuthorizationError(msg)
@@ -522,11 +524,11 @@ def _report_failed_challs(failed_achalls):
:class:`certbot.achallenges.AnnotatedChallenge`.
"""
problems = dict()
problems = collections.defaultdict(list)\
# type: DefaultDict[str, List[achallenges.KeyAuthorizationAnnotatedChallenge]]
for achall in failed_achalls:
if achall.error:
problems.setdefault(achall.error.typ, []).append(achall)
problems[achall.error.typ].append(achall)
reporter = zope.component.getUtility(interfaces.IReporter)
for achalls in six.itervalues(problems):
reporter.add_message(

View File

@@ -7,6 +7,7 @@ import re
import traceback
import zope.component
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
from certbot import crypto_util
from certbot import errors
from certbot import interfaces
@@ -226,7 +227,7 @@ def match_and_check_overlaps(cli_config, acceptable_matches, match_func, rv_func
def find_matches(candidate_lineage, return_value, acceptable_matches):
"""Returns a list of matches using _search_lineages."""
acceptable_matches = [func(candidate_lineage) for func in acceptable_matches]
acceptable_matches_rv = []
acceptable_matches_rv = [] # type: List[str]
for item in acceptable_matches:
if isinstance(item, list):
acceptable_matches_rv += item
@@ -340,7 +341,7 @@ def _report_human_readable(config, parsed_certs):
def _describe_certs(config, parsed_certs, parse_failures):
"""Print information about the certs we know about"""
out = []
out = [] # type: List[str]
notify = out.append

View File

@@ -12,10 +12,14 @@ import sys
import configargparse
import six
import zope.component
import zope.interface
from zope.interface import interfaces as zope_interfaces
from acme import challenges
# pylint: disable=unused-import, no-name-in-module
from acme.magic_typing import Any, Dict, Optional
# pylint: enable=unused-import, no-name-in-module
import certbot
@@ -33,7 +37,7 @@ import certbot.plugins.selection as plugin_selection
logger = logging.getLogger(__name__)
# Global, to save us from a lot of argument passing within the scope of this module
helpful_parser = None
helpful_parser = None # type: Optional[HelpfulArgumentParser]
# For help strings, figure out how the user ran us.
# When invoked from letsencrypt-auto, sys.argv[0] is something like:
@@ -196,17 +200,17 @@ def set_by_cli(var):
(CLI or config file) including if the user explicitly set it to the
default. Returns False if the variable was assigned a default value.
"""
detector = set_by_cli.detector
if detector is None:
detector = set_by_cli.detector # type: ignore
if detector is None and helpful_parser is not None:
# Setup on first run: `detector` is a weird version of config in which
# the default value of every attribute is wrangled to be boolean-false
plugins = plugins_disco.PluginsRegistry.find_all()
# reconstructed_args == sys.argv[1:], or whatever was passed to main()
reconstructed_args = helpful_parser.args + [helpful_parser.verb]
detector = set_by_cli.detector = prepare_and_parse_args(
detector = set_by_cli.detector = prepare_and_parse_args( # type: ignore
plugins, reconstructed_args, detect_defaults=True)
# propagate plugin requests: eg --standalone modifies config.authenticator
detector.authenticator, detector.installer = (
detector.authenticator, detector.installer = ( # type: ignore
plugin_selection.cli_plugin_requests(detector))
if not isinstance(getattr(detector, var), _Default):
@@ -220,7 +224,10 @@ def set_by_cli(var):
return True
return False
# static housekeeping var
# functions attributed are not supported by mypy
# https://github.com/python/mypy/issues/2087
set_by_cli.detector = None # type: ignore
@@ -236,8 +243,10 @@ def has_default_value(option, value):
:rtype: bool
"""
return (option in helpful_parser.defaults and
helpful_parser.defaults[option] == value)
if helpful_parser is not None:
return (option in helpful_parser.defaults and
helpful_parser.defaults[option] == value)
return False
def option_was_set(option, value):
@@ -254,11 +263,12 @@ def option_was_set(option, value):
def argparse_type(variable):
"Return our argparse type function for a config variable (default: str)"
"""Return our argparse type function for a config variable (default: str)"""
# pylint: disable=protected-access
for action in helpful_parser.parser._actions:
if action.type is not None and action.dest == variable:
return action.type
if helpful_parser is not None:
for action in helpful_parser.parser._actions:
if action.type is not None and action.dest == variable:
return action.type
return str
def read_file(filename, mode="rb"):
@@ -291,10 +301,12 @@ def flag_default(name):
def config_help(name, hidden=False):
"""Extract the help message for an `.IConfig` attribute."""
# pylint: disable=no-member
if hidden:
return argparse.SUPPRESS
else:
return interfaces.IConfig[name].__doc__
field = interfaces.IConfig.__getitem__(name) # type: zope.interface.interface.Attribute
return field.__doc__
class HelpfulArgumentGroup(object):
@@ -473,7 +485,7 @@ class HelpfulArgumentParser(object):
HELP_TOPICS += list(self.VERBS) + self.COMMANDS_TOPICS + ["manage"]
plugin_names = list(plugins)
self.help_topics = HELP_TOPICS + plugin_names + [None]
self.help_topics = HELP_TOPICS + plugin_names + [None] # type: ignore
self.detect_defaults = detect_defaults
self.args = args
@@ -492,8 +504,11 @@ class HelpfulArgumentParser(object):
short_usage = self._usage_string(plugins, self.help_arg)
self.visible_topics = self.determine_help_topics(self.help_arg)
self.groups = {} # elements are added by .add_group()
self.defaults = {} # elements are added by .parse_args()
# elements are added by .add_group()
self.groups = {} # type: Dict[str, argparse._ArgumentGroup]
# elements are added by .parse_args()
self.defaults = {} # type: Dict[str, Any]
self.parser = configargparse.ArgParser(
prog="certbot",
@@ -805,7 +820,6 @@ class HelpfulArgumentParser(object):
if self.help_arg:
for v in verbs:
self.groups[topic].add_argument(v, help=VERB_HELP_MAP[v]["short"])
return HelpfulArgumentGroup(self, topic)
def add_plugin_args(self, plugins):
@@ -1296,14 +1310,14 @@ def _paths_parser(helpful):
verb = helpful.help_arg
cph = "Path to where certificate is saved (with auth --csr), installed from, or revoked."
section = ["paths", "install", "revoke", "certonly", "manage"]
sections = ["paths", "install", "revoke", "certonly", "manage"]
if verb == "certonly":
add(section, "--cert-path", type=os.path.abspath,
add(sections, "--cert-path", type=os.path.abspath,
default=flag_default("auth_cert_path"), help=cph)
elif verb == "revoke":
add(section, "--cert-path", type=read_file, required=True, help=cph)
add(sections, "--cert-path", type=read_file, required=True, help=cph)
else:
add(section, "--cert-path", type=os.path.abspath, help=cph)
add(sections, "--cert-path", type=os.path.abspath, help=cph)
section = "paths"
if verb in ("install", "revoke"):

View File

@@ -5,7 +5,9 @@ import os
import platform
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
# https://github.com/python/typeshed/blob/master/third_party/
# 2/cryptography/hazmat/primitives/asymmetric/rsa.pyi
from cryptography.hazmat.primitives.asymmetric.rsa import generate_private_key # type: ignore
import josepy as jose
import OpenSSL
import zope.component
@@ -156,11 +158,11 @@ def register(config, account_storage, tos_cb=None):
logger.info("Registering without email!")
# Each new registration shall use a fresh new key
key = jose.JWKRSA(key=jose.ComparableRSAKey(
rsa.generate_private_key(
rsa_key = generate_private_key(
public_exponent=65537,
key_size=config.rsa_key_size,
backend=default_backend())))
backend=default_backend())
key = jose.JWKRSA(key=jose.ComparableRSAKey(rsa_key))
acme = acme_from_config_key(config, key)
# TODO: add phone?
regr = perform_registration(acme, config, tos_cb)
@@ -605,8 +607,10 @@ def validate_key_csr(privkey, csr=None):
if csr.form == "der":
csr_obj = OpenSSL.crypto.load_certificate_request(
OpenSSL.crypto.FILETYPE_ASN1, csr.data)
csr = util.CSR(csr.file, OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, csr_obj), "pem")
cert_buffer = OpenSSL.crypto.dump_certificate_request(
OpenSSL.crypto.FILETYPE_PEM, csr_obj
)
csr = util.CSR(csr.file, cert_buffer, "pem")
# If CSR is provided, it must be readable and valid.
if csr.data and not crypto_util.valid_csr(csr.data):

View File

@@ -8,15 +8,18 @@ import hashlib
import logging
import os
import OpenSSL
import pyrfc3339
import six
import zope.component
from OpenSSL import crypto
from OpenSSL import SSL # type: ignore
from cryptography.hazmat.backends import default_backend
from cryptography import x509 # type: ignore
# https://github.com/python/typeshed/tree/master/third_party/2/cryptography
from cryptography import x509 # type: ignore
from acme import crypto_util as acme_crypto_util
from acme.magic_typing import IO # pylint: disable=unused-import, no-name-in-module
from certbot import errors
from certbot import interfaces
from certbot import util
@@ -47,7 +50,7 @@ def init_save_key(key_size, key_dir, keyname="key-certbot.pem"):
try:
key_pem = make_key(key_size)
except ValueError as err:
logger.exception(err)
logger.error("", exc_info=True)
raise err
config = zope.component.getUtility(interfaces.IConfig)
@@ -111,11 +114,11 @@ def valid_csr(csr):
"""
try:
req = OpenSSL.crypto.load_certificate_request(
OpenSSL.crypto.FILETYPE_PEM, csr)
req = crypto.load_certificate_request(
crypto.FILETYPE_PEM, csr)
return req.verify(req.get_pubkey())
except OpenSSL.crypto.Error as error:
logger.debug(error, exc_info=True)
except crypto.Error:
logger.debug("", exc_info=True)
return False
@@ -129,13 +132,13 @@ def csr_matches_pubkey(csr, privkey):
:rtype: bool
"""
req = OpenSSL.crypto.load_certificate_request(
OpenSSL.crypto.FILETYPE_PEM, csr)
pkey = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, privkey)
req = crypto.load_certificate_request(
crypto.FILETYPE_PEM, csr)
pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, privkey)
try:
return req.verify(pkey)
except OpenSSL.crypto.Error as error:
logger.debug(error, exc_info=True)
except crypto.Error:
logger.debug("", exc_info=True)
return False
@@ -145,26 +148,26 @@ def import_csr_file(csrfile, data):
:param str csrfile: CSR filename
:param str data: contents of the CSR file
:returns: (`OpenSSL.crypto.FILETYPE_PEM`,
:returns: (`crypto.FILETYPE_PEM`,
util.CSR object representing the CSR,
list of domains requested in the CSR)
:rtype: tuple
"""
PEM = OpenSSL.crypto.FILETYPE_PEM
load = OpenSSL.crypto.load_certificate_request
PEM = crypto.FILETYPE_PEM
load = crypto.load_certificate_request
try:
# Try to parse as DER first, then fall back to PEM.
csr = load(OpenSSL.crypto.FILETYPE_ASN1, data)
except OpenSSL.crypto.Error:
csr = load(crypto.FILETYPE_ASN1, data)
except crypto.Error:
try:
csr = load(PEM, data)
except OpenSSL.crypto.Error:
except crypto.Error:
raise errors.Error("Failed to parse CSR file: {0}".format(csrfile))
domains = _get_names_from_loaded_cert_or_req(csr)
# Internally we always use PEM, so re-encode as PEM before returning.
data_pem = OpenSSL.crypto.dump_certificate_request(PEM, csr)
data_pem = crypto.dump_certificate_request(PEM, csr)
return PEM, util.CSR(file=csrfile, data=data_pem, form="pem"), domains
@@ -178,9 +181,9 @@ def make_key(bits):
"""
assert bits >= 1024 # XXX
key = OpenSSL.crypto.PKey()
key.generate_key(OpenSSL.crypto.TYPE_RSA, bits)
return OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, bits)
return crypto.dump_privatekey(crypto.FILETYPE_PEM, key)
def valid_privkey(privkey):
@@ -193,9 +196,9 @@ def valid_privkey(privkey):
"""
try:
return OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, privkey).check()
except (TypeError, OpenSSL.crypto.Error):
return crypto.load_privatekey(
crypto.FILETYPE_PEM, privkey).check()
except (TypeError, crypto.Error):
return False
@@ -224,13 +227,14 @@ def verify_renewable_cert_sig(renewable_cert):
:raises errors.Error: If signature verification fails.
"""
try:
with open(renewable_cert.chain, 'rb') as chain:
chain, _ = pyopenssl_load_certificate(chain.read())
with open(renewable_cert.cert, 'rb') as cert:
cert = x509.load_pem_x509_certificate(cert.read(), default_backend())
with open(renewable_cert.chain, 'rb') as chain_file: # type: IO[bytes]
chain, _ = pyopenssl_load_certificate(chain_file.read())
with open(renewable_cert.cert, 'rb') as cert_file: # type: IO[bytes]
cert = x509.load_pem_x509_certificate(
cert_file.read(), default_backend())
hash_name = cert.signature_hash_algorithm.name
OpenSSL.crypto.verify(chain, cert.signature, cert.tbs_certificate_bytes, hash_name)
except (IOError, ValueError, OpenSSL.crypto.Error) as e:
crypto.verify(chain, cert.signature, cert.tbs_certificate_bytes, hash_name)
except (IOError, ValueError, crypto.Error) as e:
error_str = "verifying the signature of the cert located at {0} has failed. \
Details: {1}".format(renewable_cert.cert, e)
logger.exception(error_str)
@@ -246,11 +250,11 @@ def verify_cert_matches_priv_key(cert_path, key_path):
:raises errors.Error: If they don't match.
"""
try:
context = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD)
context = SSL.Context(SSL.SSLv23_METHOD)
context.use_certificate_file(cert_path)
context.use_privatekey_file(key_path)
context.check_privatekey()
except (IOError, OpenSSL.SSL.Error) as e:
except (IOError, SSL.Error) as e:
error_str = "verifying the cert located at {0} matches the \
private key located at {1} has failed. \
Details: {2}".format(cert_path,
@@ -267,12 +271,12 @@ def verify_fullchain(renewable_cert):
:raises errors.Error: If cert and chain do not combine to fullchain.
"""
try:
with open(renewable_cert.chain) as chain:
chain = chain.read()
with open(renewable_cert.cert) as cert:
cert = cert.read()
with open(renewable_cert.fullchain) as fullchain:
fullchain = fullchain.read()
with open(renewable_cert.chain) as chain_file: # type: IO[str]
chain = chain_file.read()
with open(renewable_cert.cert) as cert_file: # type: IO[str]
cert = cert_file.read()
with open(renewable_cert.fullchain) as fullchain_file: # type: IO[str]
fullchain = fullchain_file.read()
if (cert + chain) != fullchain:
error_str = "fullchain does not match cert + chain for {0}!"
error_str = error_str.format(renewable_cert.lineagename)
@@ -294,43 +298,43 @@ def pyopenssl_load_certificate(data):
openssl_errors = []
for file_type in (OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.FILETYPE_ASN1):
for file_type in (crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1):
try:
return OpenSSL.crypto.load_certificate(file_type, data), file_type
except OpenSSL.crypto.Error as error: # TODO: other errors?
return crypto.load_certificate(file_type, data), file_type
except crypto.Error as error: # TODO: other errors?
openssl_errors.append(error)
raise errors.Error("Unable to load: {0}".format(",".join(
str(error) for error in openssl_errors)))
def _load_cert_or_req(cert_or_req_str, load_func,
typ=OpenSSL.crypto.FILETYPE_PEM):
typ=crypto.FILETYPE_PEM):
try:
return load_func(typ, cert_or_req_str)
except OpenSSL.crypto.Error as error:
logger.exception(error)
except crypto.Error:
logger.error("", exc_info=True)
raise
def _get_sans_from_cert_or_req(cert_or_req_str, load_func,
typ=OpenSSL.crypto.FILETYPE_PEM):
typ=crypto.FILETYPE_PEM):
# pylint: disable=protected-access
return acme_crypto_util._pyopenssl_cert_or_req_san(_load_cert_or_req(
cert_or_req_str, load_func, typ))
def get_sans_from_cert(cert, typ=OpenSSL.crypto.FILETYPE_PEM):
def get_sans_from_cert(cert, typ=crypto.FILETYPE_PEM):
"""Get a list of Subject Alternative Names from a certificate.
:param str cert: Certificate (encoded).
:param typ: `OpenSSL.crypto.FILETYPE_PEM` or `OpenSSL.crypto.FILETYPE_ASN1`
:param typ: `crypto.FILETYPE_PEM` or `crypto.FILETYPE_ASN1`
:returns: A list of Subject Alternative Names.
:rtype: list
"""
return _get_sans_from_cert_or_req(
cert, OpenSSL.crypto.load_certificate, typ)
cert, crypto.load_certificate, typ)
def _get_names_from_cert_or_req(cert_or_req, load_func, typ):
@@ -343,24 +347,24 @@ def _get_names_from_loaded_cert_or_req(loaded_cert_or_req):
return acme_crypto_util._pyopenssl_cert_or_req_all_names(loaded_cert_or_req)
def get_names_from_cert(csr, typ=OpenSSL.crypto.FILETYPE_PEM):
def get_names_from_cert(csr, typ=crypto.FILETYPE_PEM):
"""Get a list of domains from a cert, including the CN if it is set.
:param str cert: Certificate (encoded).
:param typ: `OpenSSL.crypto.FILETYPE_PEM` or `OpenSSL.crypto.FILETYPE_ASN1`
:param typ: `crypto.FILETYPE_PEM` or `crypto.FILETYPE_ASN1`
:returns: A list of domain names.
:rtype: list
"""
return _get_names_from_cert_or_req(
csr, OpenSSL.crypto.load_certificate, typ)
csr, crypto.load_certificate, typ)
def dump_pyopenssl_chain(chain, filetype=OpenSSL.crypto.FILETYPE_PEM):
def dump_pyopenssl_chain(chain, filetype=crypto.FILETYPE_PEM):
"""Dump certificate chain into a bundle.
:param list chain: List of `OpenSSL.crypto.X509` (or wrapped in
:param list chain: List of `crypto.X509` (or wrapped in
:class:`josepy.util.ComparableX509`).
"""
@@ -378,7 +382,7 @@ def notBefore(cert_path):
:rtype: :class:`datetime.datetime`
"""
return _notAfterBefore(cert_path, OpenSSL.crypto.X509.get_notBefore)
return _notAfterBefore(cert_path, crypto.X509.get_notBefore)
def notAfter(cert_path):
@@ -390,15 +394,15 @@ def notAfter(cert_path):
:rtype: :class:`datetime.datetime`
"""
return _notAfterBefore(cert_path, OpenSSL.crypto.X509.get_notAfter)
return _notAfterBefore(cert_path, crypto.X509.get_notAfter)
def _notAfterBefore(cert_path, method):
"""Internal helper function for finding notbefore/notafter.
:param str cert_path: path to a cert in PEM format
:param function method: one of ``OpenSSL.crypto.X509.get_notBefore``
or ``OpenSSL.crypto.X509.get_notAfter``
:param function method: one of ``crypto.X509.get_notBefore``
or ``crypto.X509.get_notAfter``
:returns: the notBefore or notAfter value from the cert at cert_path
:rtype: :class:`datetime.datetime`
@@ -406,7 +410,7 @@ def _notAfterBefore(cert_path, method):
"""
# pylint: disable=redefined-outer-name
with open(cert_path) as f:
x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
x509 = crypto.load_certificate(crypto.FILETYPE_PEM,
f.read())
# pyopenssl always returns bytes
timestamp = method(x509)
@@ -443,7 +447,7 @@ def cert_and_chain_from_fullchain(fullchain_pem):
:rtype: tuple
"""
cert = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, fullchain_pem)).decode()
cert = crypto.dump_certificate(crypto.FILETYPE_PEM,
crypto.load_certificate(crypto.FILETYPE_PEM, fullchain_pem)).decode()
chain = fullchain_pem[len(cert):].lstrip()
return (cert, chain)

View File

@@ -5,6 +5,10 @@ import os
import signal
import traceback
# pylint: disable=unused-import, no-name-in-module
from acme.magic_typing import Any, Callable, Dict, List, Union
# pylint: enable=unused-import, no-name-in-module
from certbot import errors
logger = logging.getLogger(__name__)
@@ -56,9 +60,9 @@ class ErrorHandler(object):
def __init__(self, func=None, *args, **kwargs):
self.call_on_regular_exit = False
self.body_executed = False
self.funcs = []
self.prev_handlers = {}
self.received_signals = []
self.funcs = [] # type: List[Callable[[], Any]]
self.prev_handlers = {} # type: Dict[int, Union[int, None, Callable]]
self.received_signals = [] # type: List[int]
if func is not None:
self.register(func, *args, **kwargs)
@@ -88,6 +92,7 @@ class ErrorHandler(object):
return retval
def register(self, func, *args, **kwargs):
# type: (Callable, *Any, **Any) -> None
"""Sets func to be run with the given arguments during cleanup.
:param function func: function to be called in case of an error
@@ -101,9 +106,8 @@ class ErrorHandler(object):
while self.funcs:
try:
self.funcs[-1]()
except Exception as error: # pylint: disable=broad-except
logger.error("Encountered exception during recovery")
logger.exception(error)
except Exception: # pylint: disable=broad-except
logger.error("Encountered exception during recovery: ", exc_info=True)
self.funcs.pop()
def _set_signal_handlers(self):

View File

@@ -6,6 +6,7 @@ import os
from subprocess import Popen, PIPE
from acme.magic_typing import Set, List # pylint: disable=unused-import, no-name-in-module
from certbot import errors
from certbot import util
@@ -76,7 +77,8 @@ def pre_hook(config):
if cmd:
_run_pre_hook_if_necessary(cmd)
pre_hook.already = set() # type: ignore
executed_pre_hooks = set() # type: Set[str]
def _run_pre_hook_if_necessary(command):
@@ -88,12 +90,12 @@ def _run_pre_hook_if_necessary(command):
:param str command: pre-hook to be run
"""
if command in pre_hook.already:
if command in executed_pre_hooks:
logger.info("Pre-hook command already run, skipping: %s", command)
else:
logger.info("Running pre-hook command: %s", command)
_run_hook(command)
pre_hook.already.add(command)
executed_pre_hooks.add(command)
def post_hook(config):
@@ -127,7 +129,8 @@ def post_hook(config):
logger.info("Running post-hook command: %s", cmd)
_run_hook(cmd)
post_hook.eventually = [] # type: ignore
post_hooks = [] # type: List[str]
def _run_eventually(command):
@@ -139,13 +142,13 @@ def _run_eventually(command):
:param str command: post-hook to register to be run
"""
if command not in post_hook.eventually:
post_hook.eventually.append(command)
if command not in post_hooks:
post_hooks.append(command)
def run_saved_post_hooks():
"""Run any post hooks that were saved up in the course of the 'renew' verb"""
for cmd in post_hook.eventually:
for cmd in post_hooks:
logger.info("Running post-hook command: %s", cmd)
_run_hook(cmd)

View File

@@ -191,9 +191,8 @@ class MemoryHandler(logging.handlers.MemoryHandler):
only happens when flush(force=True) is called.
"""
def __init__(self, target=None):
def __init__(self, target=None, capacity=10000):
# capacity doesn't matter because should_flush() is overridden
capacity = float('inf')
super(MemoryHandler, self).__init__(capacity, target=target)
def close(self):

View File

@@ -11,6 +11,7 @@ import josepy as jose
import zope.component
from acme import errors as acme_errors
from acme.magic_typing import Union # pylint: disable=unused-import, no-name-in-module
import certbot
@@ -520,8 +521,8 @@ def _determine_account(config):
config, account_storage, tos_cb=_tos_cb)
except errors.MissingCommandlineFlag:
raise
except errors.Error as error:
logger.debug(error, exc_info=True)
except errors.Error:
logger.debug("", exc_info=True)
raise errors.Error(
"Unable to register an account with ACME server")
@@ -1271,7 +1272,8 @@ def set_displayer(config):
"""
if config.quiet:
config.noninteractive_mode = True
displayer = display_util.NoninteractiveDisplay(open(os.devnull, "w"))
displayer = display_util.NoninteractiveDisplay(open(os.devnull, "w")) \
# type: Union[None, display_util.NoninteractiveDisplay, display_util.FileDisplay]
elif config.noninteractive_mode:
displayer = display_util.NoninteractiveDisplay(sys.stdout)
else:

View File

@@ -11,6 +11,8 @@ import zope.interface
from josepy import util as jose_util
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
from certbot import achallenges # pylint: disable=unused-import
from certbot import constants
from certbot import crypto_util
from certbot import errors
@@ -331,8 +333,8 @@ class ChallengePerformer(object):
def __init__(self, configurator):
self.configurator = configurator
self.achalls = []
self.indices = []
self.achalls = [] # type: List[achallenges.KeyAuthorizationAnnotatedChallenge]
self.indices = [] # type: List[int]
def add_chall(self, achall, idx=None):
"""Store challenge to be performed when perform() is called.

View File

@@ -10,6 +10,7 @@ from collections import OrderedDict
import zope.interface
import zope.interface.verify
from acme.magic_typing import Dict # pylint: disable=unused-import, no-name-in-module
from certbot import constants
from certbot import errors
from certbot import interfaces
@@ -189,7 +190,7 @@ class PluginsRegistry(collections.Mapping):
@classmethod
def find_all(cls):
"""Find plugins using setuptools entry points."""
plugins = {}
plugins = {} # type: Dict[str, PluginEntryPoint]
# pylint: disable=not-callable
entry_points = itertools.chain(
pkg_resources.iter_entry_points(

View File

@@ -8,6 +8,7 @@ import pkg_resources
import six
import zope.interface
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
from certbot import errors
from certbot import interfaces
@@ -250,7 +251,7 @@ class PluginsRegistryTest(unittest.TestCase):
self.plugin_ep.prepare.assert_called_once_with()
def test_prepare_order(self):
order = []
order = [] # type: List[str]
plugins = dict(
(c, mock.MagicMock(prepare=functools.partial(order.append, c)))
for c in string.ascii_letters)

View File

@@ -5,7 +5,9 @@ import zope.component
import zope.interface
from acme import challenges
from acme.magic_typing import Dict # pylint: disable=unused-import, no-name-in-module
from certbot import achallenges # pylint: disable=unused-import
from certbot import interfaces
from certbot import errors
from certbot import hooks
@@ -98,7 +100,8 @@ when it receives a TLS ClientHello with the SNI extension set to
super(Authenticator, self).__init__(*args, **kwargs)
self.reverter = reverter.Reverter(self.config)
self.reverter.recovery_routine()
self.env = dict()
self.env = dict() \
# type: Dict[achallenges.KeyAuthorizationAnnotatedChallenge, Dict[str, str]]
self.tls_sni_01 = None
@classmethod

View File

@@ -6,6 +6,7 @@ import unittest
import mock
import zope.component
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
from certbot.display import util as display_util
from certbot.tests import util as test_util
from certbot import interfaces
@@ -47,7 +48,7 @@ class PickPluginTest(unittest.TestCase):
self.default = None
self.reg = mock.MagicMock()
self.question = "Question?"
self.ifaces = []
self.ifaces = [] # type: List[interfaces.IPlugin]
def _call(self):
from certbot.plugins.selection import pick_plugin

View File

@@ -3,6 +3,8 @@ import argparse
import collections
import logging
import socket
# https://github.com/python/typeshed/blob/master/stdlib/2and3/socket.pyi
from socket import errno as socket_errors # type: ignore
import OpenSSL
import six
@@ -10,7 +12,10 @@ import zope.interface
from acme import challenges
from acme import standalone as acme_standalone
# pylint: disable=unused-import, no-name-in-module
from acme.magic_typing import DefaultDict, Dict, Set, Tuple, List, Type, TYPE_CHECKING
from certbot import achallenges # pylint: disable=unused-import
from certbot import errors
from certbot import interfaces
@@ -18,6 +23,11 @@ from certbot.plugins import common
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
ServedType = DefaultDict[
acme_standalone.BaseDualNetworkedServers,
Set[achallenges.KeyAuthorizationAnnotatedChallenge]
]
class ServerManager(object):
"""Standalone servers manager.
@@ -33,7 +43,7 @@ class ServerManager(object):
"""
def __init__(self, certs, http_01_resources):
self._instances = {}
self._instances = {} # type: Dict[int, acme_standalone.BaseDualNetworkedServers]
self.certs = certs
self.http_01_resources = http_01_resources
@@ -59,7 +69,8 @@ class ServerManager(object):
address = (listenaddr, port)
try:
if challenge_type is challenges.TLSSNI01:
servers = acme_standalone.TLSSNI01DualNetworkedServers(address, self.certs)
servers = acme_standalone.TLSSNI01DualNetworkedServers(
address, self.certs) # type: acme_standalone.BaseDualNetworkedServers
else: # challenges.HTTP01
servers = acme_standalone.HTTP01DualNetworkedServers(
address, self.http_01_resources)
@@ -103,7 +114,8 @@ class ServerManager(object):
return self._instances.copy()
SUPPORTED_CHALLENGES = [challenges.TLSSNI01, challenges.HTTP01]
SUPPORTED_CHALLENGES = [challenges.TLSSNI01, challenges.HTTP01] \
# type: List[Type[challenges.KeyAuthorizationChallenge]]
class SupportedChallengesAction(argparse.Action):
@@ -179,14 +191,15 @@ class Authenticator(common.Plugin):
self.key = OpenSSL.crypto.PKey()
self.key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)
self.served = collections.defaultdict(set)
self.served = collections.defaultdict(set) # type: ServedType
# Stuff below is shared across threads (i.e. servers read
# values, main thread writes). Due to the nature of CPython's
# GIL, the operations are safe, c.f.
# https://docs.python.org/2/faq/library.html#what-kinds-of-global-value-mutation-are-thread-safe
self.certs = {}
self.http_01_resources = set()
self.certs = {} # type: Dict[bytes, Tuple[OpenSSL.crypto.PKey, OpenSSL.crypto.X509]]
self.http_01_resources = set() \
# type: Set[acme_standalone.HTTP01RequestHandler.HTTP01Resource]
self.servers = ServerManager(self.certs, self.http_01_resources)
@@ -265,13 +278,13 @@ class Authenticator(common.Plugin):
def _handle_perform_error(error):
if error.socket_error.errno == socket.errno.EACCES:
if error.socket_error.errno == socket_errors.EACCES:
raise errors.PluginError(
"Could not bind TCP port {0} because you don't have "
"the appropriate permissions (for example, you "
"aren't running this program as "
"root).".format(error.port))
elif error.socket_error.errno == socket.errno.EADDRINUSE:
elif error.socket_error.errno == socket_errors.EADDRINUSE:
display = zope.component.getUtility(interfaces.IDisplay)
msg = (
"Could not bind TCP port {0} because it is already in "

View File

@@ -2,12 +2,18 @@
import argparse
import socket
import unittest
# https://github.com/python/typeshed/blob/master/stdlib/2and3/socket.pyi
from socket import errno as socket_errors # type: ignore
import josepy as jose
import mock
import six
import OpenSSL.crypto # pylint: disable=unused-import
from acme import challenges
from acme import standalone as acme_standalone # pylint: disable=unused-import
from acme.magic_typing import Dict, Tuple, Set # pylint: disable=unused-import, no-name-in-module
from certbot import achallenges
from certbot import errors
@@ -21,8 +27,9 @@ class ServerManagerTest(unittest.TestCase):
def setUp(self):
from certbot.plugins.standalone import ServerManager
self.certs = {}
self.http_01_resources = {}
self.certs = {} # type: Dict[bytes, Tuple[OpenSSL.crypto.PKey, OpenSSL.crypto.X509]]
self.http_01_resources = {} \
# type: Set[acme_standalone.HTTP01RequestHandler.HTTP01Resource]
self.mgr = ServerManager(self.certs, self.http_01_resources)
def test_init(self):
@@ -159,7 +166,7 @@ class AuthenticatorTest(unittest.TestCase):
@test_util.patch_get_utility()
def test_perform_eaddrinuse_retry(self, mock_get_utility):
mock_utility = mock_get_utility()
errno = socket.errno.EADDRINUSE
errno = socket_errors.EADDRINUSE
error = errors.StandaloneBindError(mock.MagicMock(errno=errno), -1)
self.auth.servers.run.side_effect = [error] + 2 * [mock.MagicMock()]
mock_yesno = mock_utility.yesno
@@ -174,7 +181,7 @@ class AuthenticatorTest(unittest.TestCase):
mock_yesno = mock_utility.yesno
mock_yesno.return_value = False
errno = socket.errno.EADDRINUSE
errno = socket_errors.EADDRINUSE
self.assertRaises(errors.PluginError, self._fail_perform, errno)
self._assert_correct_yesno_call(mock_yesno)
@@ -184,11 +191,11 @@ class AuthenticatorTest(unittest.TestCase):
self.assertFalse(yesno_kwargs.get("default", True))
def test_perform_eacces(self):
errno = socket.errno.EACCES
errno = socket_errors.EACCES
self.assertRaises(errors.PluginError, self._fail_perform, errno)
def test_perform_unexpected_socket_error(self):
errno = socket.errno.ENOTCONN
errno = socket_errors.ENOTCONN
self.assertRaises(
errors.StandaloneBindError, self._fail_perform, errno)

View File

@@ -3,6 +3,7 @@ import json
import logging
import os
from acme.magic_typing import Any, Dict # pylint: disable=unused-import, no-name-in-module
from certbot import errors
logger = logging.getLogger(__name__)
@@ -38,7 +39,7 @@ class PluginStorage(object):
:raises .errors.PluginStorageError: when unable to open or read the file
"""
data = dict()
data = dict() # type: Dict[str, Any]
filedata = ""
try:
with open(self._storagepath, 'r') as fh:

View File

@@ -10,8 +10,12 @@ import six
import zope.component
import zope.interface
from acme import challenges
from acme import challenges # pylint: disable=unused-import
# pylint: disable=unused-import, no-name-in-module
from acme.magic_typing import Dict, Set, DefaultDict, List
# pylint: enable=unused-import, no-name-in-module
from certbot import achallenges # pylint: disable=unused-import
from certbot import cli
from certbot import errors
from certbot import interfaces
@@ -64,10 +68,11 @@ to serve all files under specified web root ({0})."""
def __init__(self, *args, **kwargs):
super(Authenticator, self).__init__(*args, **kwargs)
self.full_roots = {}
self.performed = collections.defaultdict(set)
self.full_roots = {} # type: Dict[str, str]
self.performed = collections.defaultdict(set) \
# type: DefaultDict[str, Set[achallenges.KeyAuthorizationAnnotatedChallenge]]
# stack of dirs successfully created by this authenticator
self._created_dirs = []
self._created_dirs = [] # type: List[str]
def prepare(self): # pylint: disable=missing-docstring
pass
@@ -156,7 +161,6 @@ to serve all files under specified web root ({0})."""
" --help webroot for examples.")
for name, path in path_map.items():
self.full_roots[name] = os.path.join(path, challenges.HTTP01.URI_ROOT_PATH)
logger.debug("Creating root challenges validation dir at %s",
self.full_roots[name])
@@ -207,7 +211,6 @@ to serve all files under specified web root ({0})."""
os.umask(old_umask)
self.performed[root_path].add(achall)
return response
def cleanup(self, achalls): # pylint: disable=missing-docstring
@@ -219,7 +222,7 @@ to serve all files under specified web root ({0})."""
os.remove(validation_path)
self.performed[root_path].remove(achall)
not_removed = []
not_removed = [] # type: List[str]
while len(self._created_dirs) > 0:
path = self._created_dirs.pop()
try:

View File

@@ -11,6 +11,8 @@ import zope.component
import OpenSSL
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
from certbot import cli
from certbot import crypto_util
from certbot import errors
@@ -59,8 +61,8 @@ def _reconstitute(config, full_path):
"""
try:
renewal_candidate = storage.RenewableCert(full_path, config)
except (errors.CertStorageError, IOError) as exc:
logger.warning(exc)
except (errors.CertStorageError, IOError):
logger.warning("", exc_info=True)
logger.warning("Renewal configuration file %s is broken. Skipping.", full_path)
logger.debug("Traceback was:\n%s", traceback.format_exc())
return None
@@ -133,14 +135,15 @@ def _restore_plugin_configs(config, renewalparams):
# longer defined, stored copies of that parameter will be
# deserialized as strings by this logic even if they were
# originally meant to be some other type.
plugin_prefixes = [] # type: List[str]
if renewalparams["authenticator"] == "webroot":
_restore_webroot_config(config, renewalparams)
plugin_prefixes = []
else:
plugin_prefixes = [renewalparams["authenticator"]]
plugin_prefixes.append(renewalparams["authenticator"])
if renewalparams.get("installer", None) is not None:
if renewalparams.get("installer") is not None:
plugin_prefixes.append(renewalparams["installer"])
for plugin_prefix in set(plugin_prefixes):
plugin_prefix = plugin_prefix.replace('-', '_')
for config_item, config_value in six.iteritems(renewalparams):
@@ -316,13 +319,13 @@ def report(msgs, category):
def _renew_describe_results(config, renew_successes, renew_failures,
renew_skipped, parse_failures):
out = []
out = [] # type: List[str]
notify = out.append
disp = zope.component.getUtility(interfaces.IDisplay)
def notify_error(err):
"""Notify and log errors."""
notify(err)
notify(str(err))
logger.error(err)
if config.dry_run:

View File

@@ -82,8 +82,10 @@ class Reverter(object):
self._recover_checkpoint(self.config.temp_checkpoint_dir)
except errors.ReverterError:
# We have a partial or incomplete recovery
logger.fatal("Incomplete or failed recovery for %s",
self.config.temp_checkpoint_dir)
logger.critical(
"Incomplete or failed recovery for %s",
self.config.temp_checkpoint_dir,
)
raise errors.ReverterError("Unable to revert temporary config")
def rollback_checkpoints(self, rollback=1):
@@ -123,7 +125,7 @@ class Reverter(object):
try:
self._recover_checkpoint(cp_dir)
except errors.ReverterError:
logger.fatal("Failed to load checkpoint during rollback")
logger.critical("Failed to load checkpoint during rollback")
raise errors.ReverterError(
"Unable to load checkpoint during rollback")
rollback -= 1
@@ -457,7 +459,7 @@ class Reverter(object):
self._recover_checkpoint(self.config.in_progress_dir)
except errors.ReverterError:
# We have a partial or incomplete recovery
logger.fatal("Incomplete or failed recovery for IN_PROGRESS "
logger.critical("Incomplete or failed recovery for IN_PROGRESS "
"checkpoint - %s",
self.config.in_progress_dir)
raise errors.ReverterError(
@@ -494,7 +496,7 @@ class Reverter(object):
"Certbot probably shut down unexpectedly",
os.linesep, path)
except (IOError, OSError):
logger.fatal(
logger.critical(
"Unable to remove filepaths contained within %s", file_list)
raise errors.ReverterError(
"Unable to remove filepaths contained within "

View File

@@ -10,6 +10,7 @@ import zope.component
from acme import challenges
from acme import client as acme_client
from acme import messages
from acme.magic_typing import Dict # pylint: disable=unused-import, no-name-in-module
from certbot import achallenges
from certbot import errors
@@ -354,12 +355,13 @@ class PollChallengesTest(unittest.TestCase):
acme_util.CHALLENGES, [messages.STATUS_PENDING] * 3, False), [])
]
self.chall_update = {}
self.chall_update = {} # type: Dict[int, achallenges.KeyAuthorizationAnnotatedChallenge]
for i, aauthzr in enumerate(self.aauthzrs):
self.chall_update[i] = [
challb_to_achall(challb, mock.Mock(key="dummy_key"), self.doms[i])
for challb in aauthzr.authzr.body.challenges]
@mock.patch("certbot.auth_handler.time")
def test_poll_challenges(self, unused_mock_time):
self.mock_net.poll.side_effect = self._mock_poll_solve_one_valid

View File

@@ -495,7 +495,8 @@ class SetByCliTest(unittest.TestCase):
for v in ('manual', 'manual_auth_hook', 'manual_public_ip_logging_ok'):
self.assertTrue(_call_set_by_cli(v, args, verb))
cli.set_by_cli.detector = None
# https://github.com/python/mypy/issues/2087
cli.set_by_cli.detector = None # type: ignore
args = ['--manual-auth-hook', 'command']
for v in ('manual_auth_hook', 'manual_public_ip_logging_ok'):

View File

@@ -8,6 +8,7 @@ import unittest
import mock
from six.moves import reload_module # pylint: disable=import-error
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
from certbot.tests.util import TempDirTestCase
class CompleterTest(TempDirTestCase):
@@ -21,7 +22,7 @@ class CompleterTest(TempDirTestCase):
if self.tempdir[-1] != os.sep:
self.tempdir += os.sep
self.paths = []
self.paths = [] # type: List[str]
# create some files and directories in temp_dir
for c in string.ascii_lowercase:
path = os.path.join(self.tempdir, c)

View File

@@ -6,6 +6,9 @@ import sys
import unittest
import mock
# pylint: disable=unused-import, no-name-in-module
from acme.magic_typing import Callable, Dict, Union
# pylint: enable=unused-import, no-name-in-module
def get_signals(signums):
@@ -23,8 +26,7 @@ def set_signals(sig_handler_dict):
def signal_receiver(signums):
"""Context manager to catch signals"""
signals = []
prev_handlers = {}
prev_handlers = get_signals(signums)
prev_handlers = get_signals(signums) # type: Dict[int, Union[int, None, Callable]]
set_signals(dict((s, lambda s, _: signals.append(s)) for s in signums))
yield signals
set_signals(prev_handlers)

View File

@@ -5,6 +5,7 @@ import unittest
import mock
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
from certbot import errors
from certbot.tests import util
@@ -106,8 +107,8 @@ class PreHookTest(HookTest):
super(PreHookTest, self).tearDown()
def _reset_pre_hook_already(self):
from certbot.hooks import pre_hook
pre_hook.already.clear()
from certbot.hooks import executed_pre_hooks
executed_pre_hooks.clear()
def test_certonly(self):
self.config.verb = "certonly"
@@ -184,8 +185,8 @@ class PostHookTest(HookTest):
super(PostHookTest, self).tearDown()
def _reset_post_hook_eventually(self):
from certbot.hooks import post_hook
post_hook.eventually = []
from certbot.hooks import post_hooks
del post_hooks[:]
def test_certonly_and_run_with_hook(self):
for verb in ("certonly", "run",):
@@ -238,8 +239,8 @@ class PostHookTest(HookTest):
self.assertEqual(self._get_eventually(), expected)
def _get_eventually(self):
from certbot.hooks import post_hook
return post_hook.eventually
from certbot.hooks import post_hooks
return post_hooks
class RunSavedPostHooksTest(HookTest):
@@ -248,23 +249,23 @@ class RunSavedPostHooksTest(HookTest):
@classmethod
def _call(cls, *args, **kwargs):
from certbot.hooks import run_saved_post_hooks
return run_saved_post_hooks(*args, **kwargs)
return run_saved_post_hooks()
def _call_with_mock_execute_and_eventually(self, *args, **kwargs):
"""Call run_saved_post_hooks but mock out execute and eventually
certbot.hooks.post_hook.eventually is replaced with
certbot.hooks.post_hooks is replaced with
self.eventually. The mock execute object is returned rather than
the return value of run_saved_post_hooks.
"""
eventually_path = "certbot.hooks.post_hook.eventually"
eventually_path = "certbot.hooks.post_hooks"
with mock.patch(eventually_path, new=self.eventually):
return self._call_with_mock_execute(*args, **kwargs)
def setUp(self):
super(RunSavedPostHooksTest, self).setUp()
self.eventually = []
self.eventually = [] # type: List[str]
def test_empty(self):
self.assertFalse(self._call_with_mock_execute_and_eventually().called)

View File

@@ -10,6 +10,7 @@ import mock
import six
from acme import messages
from acme.magic_typing import Optional # pylint: disable=unused-import, no-name-in-module
from certbot import constants
from certbot import errors
@@ -21,9 +22,9 @@ class PreArgParseSetupTest(unittest.TestCase):
"""Tests for certbot.log.pre_arg_parse_setup."""
@classmethod
def _call(cls, *args, **kwargs):
def _call(cls, *args, **kwargs): # pylint: disable=unused-argument
from certbot.log import pre_arg_parse_setup
return pre_arg_parse_setup(*args, **kwargs)
return pre_arg_parse_setup()
@mock.patch('certbot.log.sys')
@mock.patch('certbot.log.pre_arg_parse_except_hook')
@@ -38,16 +39,16 @@ class PreArgParseSetupTest(unittest.TestCase):
mock_root_logger.setLevel.assert_called_once_with(logging.DEBUG)
self.assertEqual(mock_root_logger.addHandler.call_count, 2)
MemoryHandler = logging.handlers.MemoryHandler
memory_handler = None
memory_handler = None # type: Optional[logging.handlers.MemoryHandler]
for call in mock_root_logger.addHandler.call_args_list:
handler = call[0][0]
if memory_handler is None and isinstance(handler, MemoryHandler):
if memory_handler is None and isinstance(handler, logging.handlers.MemoryHandler):
memory_handler = handler
target = memory_handler.target # type: ignore
else:
self.assertTrue(isinstance(handler, logging.StreamHandler))
self.assertTrue(
isinstance(memory_handler.target, logging.StreamHandler))
isinstance(target, logging.StreamHandler))
mock_register.assert_called_once_with(logging.shutdown)
mock_sys.excepthook(1, 2, 3)

View File

@@ -16,12 +16,14 @@ import josepy as jose
import six
from six.moves import reload_module # pylint: disable=import-error
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
from certbot import account
from certbot import cli
from certbot import constants
from certbot import configuration
from certbot import crypto_util
from certbot import errors
from certbot import interfaces # pylint: disable=unused-import
from certbot import main
from certbot import updater
from certbot import util
@@ -600,14 +602,14 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
if mockisfile:
orig_open = os.path.isfile
def mock_isfile(fn, *args, **kwargs):
def mock_isfile(fn, *args, **kwargs): # pylint: disable=unused-argument
"""Mock os.path.isfile()"""
if (fn.endswith("cert") or
fn.endswith("chain") or
fn.endswith("privkey")):
return True
else:
return orig_open(fn, *args, **kwargs)
return orig_open(fn)
with mock.patch("os.path.isfile") as mock_if:
mock_if.side_effect = mock_isfile
@@ -836,7 +838,7 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
@mock.patch('certbot.main.plugins_disco')
@mock.patch('certbot.main.cli.HelpfulArgumentParser.determine_help_topics')
def test_plugins_no_args(self, _det, mock_disco):
ifaces = []
ifaces = [] # type: List[interfaces.IPlugin]
plugins = mock_disco.PluginsRegistry.find_all()
stdout = six.StringIO()
@@ -851,7 +853,7 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
@mock.patch('certbot.main.plugins_disco')
@mock.patch('certbot.main.cli.HelpfulArgumentParser.determine_help_topics')
def test_plugins_no_args_unprivileged(self, _det, mock_disco):
ifaces = []
ifaces = [] # type: List[interfaces.IPlugin]
plugins = mock_disco.PluginsRegistry.find_all()
def throw_error(directory, mode, uid, strict):
@@ -873,7 +875,7 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
@mock.patch('certbot.main.plugins_disco')
@mock.patch('certbot.main.cli.HelpfulArgumentParser.determine_help_topics')
def test_plugins_init(self, _det, mock_disco):
ifaces = []
ifaces = [] # type: List[interfaces.IPlugin]
plugins = mock_disco.PluginsRegistry.find_all()
stdout = six.StringIO()
@@ -891,7 +893,7 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
@mock.patch('certbot.main.plugins_disco')
@mock.patch('certbot.main.cli.HelpfulArgumentParser.determine_help_topics')
def test_plugins_prepare(self, _det, mock_disco):
ifaces = []
ifaces = [] # type: List[interfaces.IPlugin]
plugins = mock_disco.PluginsRegistry.find_all()
stdout = six.StringIO()
@@ -1040,9 +1042,8 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
mock_client.obtain_certificate.return_value = (mock_certr, 'chain',
mock_key, 'csr')
def write_msg(message, *args, **kwargs):
def write_msg(message, *args, **kwargs): # pylint: disable=unused-argument
"""Write message to stdout."""
_, _ = args, kwargs
stdout.write(message)
try:

View File

@@ -12,7 +12,7 @@ class ReporterTest(unittest.TestCase):
from certbot import reporter
self.reporter = reporter.Reporter(mock.MagicMock(quiet=False))
self.old_stdout = sys.stdout
self.old_stdout = sys.stdout # type: ignore
sys.stdout = six.StringIO()
def tearDown(self):
@@ -21,32 +21,32 @@ class ReporterTest(unittest.TestCase):
def test_multiline_message(self):
self.reporter.add_message("Line 1\nLine 2", self.reporter.LOW_PRIORITY)
self.reporter.print_messages()
output = sys.stdout.getvalue()
output = sys.stdout.getvalue() # type: ignore
self.assertTrue("Line 1\n" in output)
self.assertTrue("Line 2" in output)
def test_tty_print_empty(self):
sys.stdout.isatty = lambda: True
sys.stdout.isatty = lambda: True # type: ignore
self.test_no_tty_print_empty()
def test_no_tty_print_empty(self):
self.reporter.print_messages()
self.assertEqual(sys.stdout.getvalue(), "")
self.assertEqual(sys.stdout.getvalue(), "") # type: ignore
try:
raise ValueError
except ValueError:
self.reporter.print_messages()
self.assertEqual(sys.stdout.getvalue(), "")
self.assertEqual(sys.stdout.getvalue(), "") # type: ignore
def test_tty_successful_exit(self):
sys.stdout.isatty = lambda: True
sys.stdout.isatty = lambda: True # type: ignore
self._successful_exit_common()
def test_no_tty_successful_exit(self):
self._successful_exit_common()
def test_tty_unsuccessful_exit(self):
sys.stdout.isatty = lambda: True
sys.stdout.isatty = lambda: True # type: ignore
self._unsuccessful_exit_common()
def test_no_tty_unsuccessful_exit(self):
@@ -55,7 +55,7 @@ class ReporterTest(unittest.TestCase):
def _successful_exit_common(self):
self._add_messages()
self.reporter.print_messages()
output = sys.stdout.getvalue()
output = sys.stdout.getvalue() # type: ignore
self.assertTrue("IMPORTANT NOTES:" in output)
self.assertTrue("High" in output)
self.assertTrue("Med" in output)
@@ -67,7 +67,7 @@ class ReporterTest(unittest.TestCase):
raise ValueError
except ValueError:
self.reporter.print_messages()
output = sys.stdout.getvalue()
output = sys.stdout.getvalue() # type: ignore
self.assertTrue("IMPORTANT NOTES:" in output)
self.assertTrue("High" in output)
self.assertTrue("Med" not in output)

View File

@@ -20,6 +20,7 @@ from collections import OrderedDict
import configargparse
from acme.magic_typing import Tuple, Union # pylint: disable=unused-import, no-name-in-module
from certbot import constants
from certbot import errors
from certbot import lock
@@ -218,8 +219,12 @@ def safe_open(path, mode="w", chmod=None, buffering=None):
"""
# pylint: disable=star-args
open_args = () if chmod is None else (chmod,)
fdopen_args = () if buffering is None else (buffering,)
open_args = () # type: Union[Tuple[()], Tuple[int]]
if chmod is not None:
open_args = (chmod,)
fdopen_args = () # type: Union[Tuple[()], Tuple[int]]
if buffering is not None:
fdopen_args = (buffering,)
return os.fdopen(
os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR, *open_args),
mode, *fdopen_args)
@@ -303,9 +308,8 @@ def get_filtered_names(all_names):
for name in all_names:
try:
filtered_names.add(enforce_le_validity(name))
except errors.ConfigurationError as error:
logger.debug('Not suggesting name "%s"', name)
logger.debug(error)
except errors.ConfigurationError:
logger.debug('Not suggesting name "%s"', name, exc_info=True)
return filtered_names

View File

@@ -5,6 +5,12 @@ ignore_missing_imports = True
[mypy-acme.*]
check_untyped_defs = True
[mypy-acme.magic_typing_test]
ignore_errors = True
[mypy-certbot.*]
check_untyped_defs = True
[mypy-certbot_apache.*]
check_untyped_defs = True

View File

@@ -34,7 +34,7 @@ version = meta['version']
# specified here to avoid masking the more specific request requirements in
# acme. See https://github.com/pypa/pip/issues/988 for more info.
install_requires = [
'acme>=0.22.1',
'acme>0.24.0',
# We technically need ConfigArgParse 0.10.0 for Python 2.6 support, but
# saying so here causes a runtime error against our temporary fork of 0.9.3
# in which we added 2.6 support (see #2243), so we relax the requirement.

View File

@@ -30,7 +30,7 @@ josepy==1.0.1
logger==1.4
logilab-common==1.4.1
MarkupSafe==1.0
mypy==0.580
mypy==0.600
ndg-httpsclient==0.3.2
oauth2client==2.0.0
pathlib2==2.3.0

View File

@@ -12,12 +12,18 @@ else
pip_install="$(dirname $0)/pip_install_editable.sh"
fi
temp_cwd=$(mktemp -d)
trap "rm -rf $temp_cwd" EXIT
set -x
for requirement in "$@" ; do
$pip_install $requirement
pkg=$(echo $requirement | cut -f1 -d\[) # remove any extras such as [dev]
pkg=$(echo "$pkg" | tr - _ ) # convert package names to Python import names
if [ $pkg = "." ]; then
pkg="certbot"
fi
cd "$temp_cwd"
pytest --numprocesses auto --quiet --pyargs $pkg
cd -
done

View File

@@ -121,8 +121,8 @@ commands =
[testenv:mypy]
basepython = python3
commands =
{[base]pip_install} .[dev3]
{[base]install_packages}
{[base]pip_install} .[dev3]
mypy {[base]source_paths}
[testenv:apacheconftest]